Commit bb816d89 authored by Julien Muchembled's avatar Julien Muchembled

DataStream: queue ingested data in order to append bigger chunks

parent bb0ad311
<catalog_method>
<item key="sql_clear_catalog" type="int">
<value>1</value>
</item>
</catalog_method>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="SQL" module="Products.ZSQLMethods.SQL"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>allow_simple_one_argument_traversal</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>arguments_src</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>cache_time_</string> </key>
<value> <int>0</int> </value>
</item>
<item>
<key> <string>class_file_</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>class_name_</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>connection_hook</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>connection_id</string> </key>
<value> <string>erp5_sql_connection</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>z0_drop_data_stream_queue</string> </value>
</item>
<item>
<key> <string>max_cache_</string> </key>
<value> <int>100</int> </value>
</item>
<item>
<key> <string>max_rows_</string> </key>
<value> <int>1000</int> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<catalog_method>
<item key="sql_uncatalog_object" type="int">
<value>1</value>
</item>
</catalog_method>
DELETE FROM data_stream_queue WHERE <dtml-sqltest uid op=eq type=int multiple>
\ No newline at end of file
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="SQL" module="Products.ZSQLMethods.SQL"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>allow_simple_one_argument_traversal</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>arguments_src</string> </key>
<value> <string>uid</string> </value>
</item>
<item>
<key> <string>cache_time_</string> </key>
<value> <int>0</int> </value>
</item>
<item>
<key> <string>class_file_</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>class_name_</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>connection_hook</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>connection_id</string> </key>
<value> <string>erp5_sql_connection</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>z0_uncatalog_data_stream_queue</string> </value>
</item>
<item>
<key> <string>max_cache_</string> </key>
<value> <int>100</int> </value>
</item>
<item>
<key> <string>max_rows_</string> </key>
<value> <int>1000</int> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<catalog_method>
<item key="sql_clear_catalog" type="int">
<value>1</value>
</item>
</catalog_method>
CREATE TABLE `data_stream_queue` (
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
`uid` BIGINT UNSIGNED NOT NULL,
`chunk` MEDIUMBLOB NOT NULL
) ENGINE=InnoDB;
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="SQL" module="Products.ZSQLMethods.SQL"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_col</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>allow_simple_one_argument_traversal</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>arguments_src</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>cache_time_</string> </key>
<value> <int>0</int> </value>
</item>
<item>
<key> <string>class_file_</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>class_name_</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>connection_hook</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>connection_id</string> </key>
<value> <string>erp5_sql_connection</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>z_create_data_stream_queue</string> </value>
</item>
<item>
<key> <string>max_cache_</string> </key>
<value> <int>100</int> </value>
</item>
<item>
<key> <string>max_rows_</string> </key>
<value> <int>1000</int> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
......@@ -58,4 +58,17 @@ class DataStream(BigFile):
for chunk in data.iterate(start_offset, end_offset - start_offset):
chunk_list.append(chunk)
return chunk_list
\ No newline at end of file
return chunk_list
def appendData(self, data_chunk):
db = self.getPortalObject().erp5_sql_connection()
db.query('INSERT INTO data_stream_queue(uid, chunk) VALUES(%s,%s)'
% (self.getUid(), db.string_literal(data_chunk)))
def processDataStreamQueue(self):
q = self.getPortalObject().erp5_sql_connection().query
r = q('SELECT id, chunk FROM data_stream_queue WHERE uid=%s ORDER BY id'
% self.getUid(), max_rows=0)[1]
self._appendData(''.join(x[1] for x in r))
q('DELETE FROM data_stream_queue WHERE id IN (%s)'
% ','.join(str(x[0]) for x in r))
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Alarm" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>active_sense_method_id</string> </key>
<value> <string>Alarm_processDataStreamQueue</string> </value>
</item>
<item>
<key> <string>automatic_solve</string> </key>
<value> <int>0</int> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>enabled</string> </key>
<value> <int>1</int> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>process_data_stream_queue</string> </value>
</item>
<item>
<key> <string>periodicity_hour</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>periodicity_minute</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>periodicity_minute_frequency</string> </key>
<value> <int>15</int> </value>
</item>
<item>
<key> <string>periodicity_month</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>periodicity_month_day</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>periodicity_start_date</string> </key>
<value>
<object>
<klass>
<global name="DateTime" module="DateTime.DateTime"/>
</klass>
<tuple>
<none/>
</tuple>
<state>
<tuple>
<float>1483228800.0</float>
<string>GMT</string>
</tuple>
</state>
</object>
</value>
</item>
<item>
<key> <string>periodicity_week</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Alarm</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string>Process Data Stream queue</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
portal = context.getPortalObject()
activate = portal.portal_activities.activateObject
for x in portal.Alarm_zDataStreamQueue():
activate(x.path, tag=tag,
serialization_tag='process_data_stream:%s' % x.uid
).processDataStreamQueue()
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>fixit=0, tag=None</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Alarm_processDataStreamQueue</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
SELECT catalog.uid, path FROM data_stream_queue LEFT JOIN catalog USING (uid)
\ No newline at end of file
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="SQL" module="Products.ZSQLMethods.SQL"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>arguments_src</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>connection_id</string> </key>
<value> <string>erp5_sql_connection</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Alarm_zDataStreamQueue</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
erp5_mysql_innodb/z0_drop_data_stream_queue
erp5_mysql_innodb/z0_uncatalog_data_stream_queue
erp5_mysql_innodb/z_create_data_stream_queue
\ No newline at end of file
portal_alarms/process_data_stream_queue
portal_gadgets/WendelinInformationGadget
portal_gadgets/WendelinInformationGadget/**
web_page_module/wendelin_information_gadget.html
\ No newline at end of file
  • @jm ,I'm interested to know about motivation of using SQL backend as an intermediate layer? Considering we use NEO which usually uses SQL backend why is this needed.

    Additionally I think that tests is needed (maybe already covered)

  • All this is urgent development to save wwm-mic-wind-production. I tested manually, using the software/wendelin-scalability/test-many-small-transactions.cfg SR (slapos@3df26781). No idea if all this will end up in the master branch. Maybe not. I guess we'll tell more as part of the WWM project.

  • @jm, thank you. Let us see.

Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment