Commit bc21eda3 authored by Ivan Tyagov

Allow to run transformations in parallel.

parent 38efc461
...@@ -55,18 +55,18 @@ ...@@ -55,18 +55,18 @@
"""\n """\n
Simply a wrapper to real method.\n Simply a wrapper to real method.\n
"""\n """\n
data_stream = context\n \n
total_stream_length = data_stream.getSize()\n total_stream_length = context.getSize()\n
\n \n
if start > total_stream_length:\n if start > total_stream_length:\n
# end reached\n # end reached\n
return\n return\n
\n \n
data_stream_chunk_list = data_stream.readChunkList(start, end)\n data_stream_chunk_list = context.readChunkList(start, end)\n
\n \n
# do call transformation script\n # do call transformation script\n
if transform_script_id is not None:\n if transform_script_id is not None:\n
transform_script = getattr(data_stream, transform_script_id, None)\n transform_script = getattr(context, transform_script_id, None)\n
if transform_script is not None:\n if transform_script is not None:\n
start, end = transform_script(context, data_stream_chunk_list, \\\n start, end = transform_script(context, data_stream_chunk_list, \\\n
start, \\\n start, \\\n
...@@ -77,7 +77,7 @@ if transform_script_id is not None:\n ...@@ -77,7 +77,7 @@ if transform_script_id is not None:\n
# [warning] store current position offset in Data Stream, this can cause easily \n # [warning] store current position offset in Data Stream, this can cause easily \n
# ConflictErrors and it spawns re-index activities on DataStream. Thus \n # ConflictErrors and it spawns re-index activities on DataStream. Thus \n
# disable for now.\n # disable for now.\n
#data_stream.setIntOffsetIndex(end)\n #context.setIntOffsetIndex(end)\n
\n \n
# start another read in another activity\n # start another read in another activity\n
start += chunk_length\n start += chunk_length\n
...@@ -87,13 +87,15 @@ if end > total_stream_length:\n ...@@ -87,13 +87,15 @@ if end > total_stream_length:\n
# no read beyond end of stream\n # no read beyond end of stream\n
end = total_stream_length\n end = total_stream_length\n
\n \n
# some bytes left ...\n if recursive:\n
data_stream.activate().DataStream_readChunkListAndTransform( \\\n # some bytes left ...\n
context.activate().DataStream_readChunkListAndTransform( \\\n
start, \\\n start, \\\n
end, \\\n end, \\\n
chunk_length, \\\n chunk_length, \\\n
transform_script_id,\\\n transform_script_id,\\\n
data_array_reference,\\\n data_array_reference,\\\n
recursive = recursive, \\\n
**kw)\n **kw)\n
...@@ -101,7 +103,7 @@ data_stream.activate().DataStream_readChunkListAndTransform( \\\n ...@@ -101,7 +103,7 @@ data_stream.activate().DataStream_readChunkListAndTransform( \\\n
</item> </item>
<item> <item>
<key> <string>_params</string> </key> <key> <string>_params</string> </key>
<value> <string>start, end, chunk_length, transform_script_id=None, data_array_reference=None, **kw</string> </value> <value> <string>start, end, chunk_length, transform_script_id=None, data_array_reference=None, recursive=1,**kw</string> </value>
</item> </item>
<item> <item>
<key> <string>id</string> </key> <key> <string>id</string> </key>
......
...@@ -50,27 +50,55 @@ ...@@ -50,27 +50,55 @@
</item> </item>
<item> <item>
<key> <string>_body</string> </key> <key> <string>_body</string> </key>
<value> <string>""" \n <value> <string encoding="cdata"><![CDATA[
Read entire stream using activities and pass stream\'s data to handler script\n
who can transform it.\n """ \n
Read entire stream using activities either in sequence or in parallel mode.\n
Pass stream\'s data to handler script who can transform it.\n
Parameters:\n Parameters:\n
* transform_script_id - the script which will transform data\n * transform_script_id - the script which will transform data\n
* chunk_length - the length of a chunk\n * chunk_length - the length of a chunk\n
* data_array_reference - the reference of the output Data Array\n
* parallelize - try to transform in parallel or not, in this case\n
the developer must carefully choose chunk_length to match the record(s) size\n
"""\n """\n
start = 0\n start = 0\n
end = chunk_length\n end = chunk_length\n
context.activate().DataStream_readChunkListAndTransform( \\\n if not parallelize:\n
# sequential case\n
context.activate().DataStream_readChunkListAndTransform( \\\n
start, \\\n start, \\\n
end, \\\n end, \\\n
chunk_length, \\\n chunk_length, \\\n
transform_script_id, \\\n transform_script_id, \\\n
data_array_reference,\\\n data_array_reference,\\\n
recursive =1, \\\n
**kw)\n **kw)\n
</string> </value> else:\n
# parallel case\n
total_size = context.getSize()\n
while total_size > start:\n
start += chunk_length + 1\n
end += chunk_length +1\n
if end > total_size:\n
end = total_size\n
\n
# call transformation in an activity\n
context.activate(activity=\'SQLQueue\').DataStream_readChunkListAndTransform( \\\n
start, \\\n
end, \\\n
chunk_length, \\\n
transform_script_id, \\\n
data_array_reference,\\\n
recursive = 0, \\\n
**kw)\n
]]></string> </value>
</item> </item>
<item> <item>
<key> <string>_params</string> </key> <key> <string>_params</string> </key>
<value> <string>chunk_length=1048576, transform_script_id=None, data_array_reference=None, **kw</string> </value> <value> <string>chunk_length=1048576, transform_script_id=None, data_array_reference=None,parallelize=0, **kw</string> </value>
</item> </item>
<item> <item>
<key> <string>id</string> </key> <key> <string>id</string> </key>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment