1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
# -*- coding: utf-8 -*-
## Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
# Sebastien Robin <seb@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from os import path
from logging import getLogger, Formatter
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Tool.BaseTool import BaseTool
from Products.ERP5Type import Permissions
from Products.ERP5Type.Globals import InitializeClass
from Products.ERP5SyncML.SyncMLConstant import ACTIVITY_PRIORITY, \
SynchronizationError
from Products.ERP5SyncML.SyncMLMessage import SyncMLRequest
from Products.ERP5SyncML.Engine.SynchronousEngine import SyncMLSynchronousEngine
from Products.ERP5SyncML.Engine.AsynchronousEngine import SyncMLAsynchronousEngine
from Products.ERP5.ERP5Site import getSite
synchronous_engine = SyncMLSynchronousEngine()
asynchronous_engine = SyncMLAsynchronousEngine()
# Logging channel definitions
# Main logging channel
syncml_logger = getLogger('ERP5SyncML')
# Direct logging to "[instancehome]/log/ERP5SyncML.log", if this
# directory exists. Otherwise, it will end up in root logging
# facility (ie, event.log).
from App.config import getConfiguration
instancehome = getConfiguration().instancehome
if instancehome is not None:
log_directory = path.join(instancehome, 'log')
if path.isdir(log_directory):
from Signals import Signals
from ZConfig.components.logger.loghandler import FileHandler
log_file_handler = FileHandler(path.join(log_directory,
'ERP5SyncML.log'))
# Default zope log format string borrowed from
# ZConfig/components/logger/factory.xml, but without the extra "------"
# line separating entries.
log_file_handler.setFormatter(Formatter(
"%(asctime)s %(levelname)s %(name)s %(message)s",
"%Y-%m-%dT%H:%M:%S"))
Signals.registerZopeSignals([log_file_handler])
syncml_logger.addHandler(log_file_handler)
syncml_logger.propagate = 0
def checkAlertCommand(syncml_request):
"""
This parse the alert commands received and return a
dictionnary mapping database to sync mode
"""
database_alert_list = []
# XXX To be moved on engine
search = getSite().portal_categories.syncml_alert_code.searchFolder
for alert in syncml_request.alert_list:
if alert["data"] == "222":
# 222 is for asking next message, do not care
continue
# Retrieve the category
# XXX Categories must be redefined, ID must be code, not title so
# that we can drop the use of searchFolder
alert_code_category_list = search(reference=alert['data'])
if len(alert_code_category_list) == 1:
alert_code_category = alert_code_category_list[0].getId()
else:
# Must return (405) Command not allowed
raise NotImplementedError("Alert code is %s, got %s category" %
(alert['data'],
len(alert_code_category_list)))
# Copy the whole dict & add the category id
alert["code"] = alert_code_category
database_alert_list.append(alert)
return database_alert_list
class SynchronizationTool(BaseTool):
""" This tool implements the SyncML Protocol
SyncML Protocol defines how to synchronize data between clients and server.
Here is a mapping of the specification with the implementation in this tool :
- client are subscriptions
- server are publications
- change log are managed through the use of signatures. A signature contains
the last data sent and which was successfully synchronized. When running a
new synchronization new data is compared with the one stored in signature
to detect changes.
"""
meta_type = 'ERP5 Synchronizations'
portal_type = 'Synchronization Tool'
id = "portal_synchronizations"
security = ClassSecurityInfo()
security.declareProtected(Permissions.AccessContentsInformation,
'getConflictList')
def getConflictList(self, context=None):
"""
Retrieve the list of all conflicts
Here the list is as follow :
[conflict_1,conflict2,...] where conflict_1 is like:
['publication',publication_id,object.getPath(),property_id,
publisher_value,subscriber_value]
"""
conflict_list = []
for publication in self.searchFolder(portal_type='SyncML Publication'):
for result in publication.searchFolder(
portal_type='SyncML Subscription'):
subscriber = result.getObject()
sub_conflict_list = subscriber.getConflictList()
for conflict in sub_conflict_list:
if context is None or conflict.getOriginValue() == context:
conflict_list.append(conflict.__of__(subscriber))
for result in self.searchFolder(portal_type='SyncML Subscription'):
subscription = result.getObject()
sub_conflict_list = subscription.getConflictList()
for conflict in sub_conflict_list:
if context is None or conflict.getOriginValue() == context:
conflict_list.append(conflict.__of__(subscription))
return conflict_list
security.declareProtected(Permissions.AccessContentsInformation,
'getDocumentConflictList')
def getDocumentConflictList(self, context=None):
"""
Retrieve the list of all conflicts for a given document
Well, this is the same thing as getConflictList with a path
"""
return self.getConflictList(context)
security.declareProtected(Permissions.AccessContentsInformation,
'getSubscriberDocumentVersion')
def getSubscriberDocumentVersion(self, conflict, docid):
"""
Given a 'conflict' and a 'docid' refering to a new version of a
document, applies the conflicting changes to the document's new
version. By so, two differents versions of the same document will be
available.
Thus, the manager will be able to open both version of the document
before selecting which one to keep.
"""
subscriber = conflict.getSubscriber()
publisher_object = conflict.getOrigineValue()
publisher_xml = self.getXMLObject(
object=publisher_object,
xml_mapping=subscriber.getXmlBindingGeneratorMethodId())
directory = publisher_object.aq_parent
object_id = docid
if object_id in directory.objectIds():
directory._delObject(object_id) # XXX Why not manage_delObjects ?
# Import the conduit and get it
conduit = subscriber.getConduit()
conduit.addNode(xml=publisher_xml, object=directory,
object_id=object_id,
signature=conflict.getParentValue())
subscriber_document = directory._getOb(object_id)
for c in self.getConflictList(conflict.getOriginValue()):
if c.getSubscriber() == subscriber:
c.applySubscriberValue(document=subscriber_document)
return subscriber_document
# XXX- ?
def _getCopyId(self, document):
directory = document.aq_inner.aq_parent
if directory.getId() != 'portal_repository':
document_id = document.getId() + '_conflict_copy'
if document_id in directory.objectIds():
directory._delObject(document_id) # XXX manage_delObjects ?
else:
repotool = directory
docid = repotool.getDocidAndRevisionFromObjectId(document.getId())[0]
new_rev = repotool.getFreeRevision(docid) + 10 # make sure it's not gonna provoke conflicts
document_id = repotool._getId(docid, new_rev)
return document_id
#
# XXX-Aurel : the following methods must be moved to a specific part that
# manages protocols to send/receive messages
#
security.declarePublic('readResponse')
def readResponse(self, text='', sync_id=None, from_url=None):
"""
We will look at the url and we will see if we need to send mail, http
response, or just copy to a file.
"""
syncml_logger.info('readResponse sync_id %s, text %s' % (sync_id, text))
if text:
# we are still anonymous at this time, use unrestrictedSearchResults
# to fetch the Subcribers
catalog_tool = self.getPortalObject().portal_catalog.unrestrictedSearchResults
syncml_request = SyncMLRequest(text)
# It is assumed that client & server does not share the same database ID
# (source_reference); this must be checked using constraint
for publication in catalog_tool(portal_type='SyncML Publication',
source_reference=sync_id,
validation_state='validated'):
if publication.getUrlString() == syncml_request.header['target']:
# Do not process in activity first checking, a message ordering
# is required by protocol specification, only use activity when no
# race condition can happen (ie no final tag)
# XXX For now do in activity otherwise we never answer the HTTP request
# directly and so it ends with client/server stuck waiting for answer and
# on the other side we are doing an http request to send them
if publication.getIsActivityEnabled():
return self.activate(
activity="SQLQueue",
tag=publication.getRelativeUrl(),
priority=ACTIVITY_PRIORITY-1).processServerSynchronization(
publication.getPath(), text)
else:
return self.processServerSynchronization(publication.getPath(), text)
for subscription in catalog_tool(portal_type='SyncML Subscription',
source_reference=sync_id,
validation_state='validated'):
if subscription.getSubscriptionUrlString() == syncml_request.header['target']:
if subscription.getIsActivityEnabled():
return self.activate(activity="SQLQueue",
priority=ACTIVITY_PRIORITY-1).processClientSynchronization(
subscription.getPath(),text)
else:
return self.processClientSynchronization(subscription.getPath(), text)
# XXX maybe it is better to generate a syncml error message
raise ValueError("Impossible to find a pub/sub to process message %s:%s"
% (sync_id, syncml_request.header['target']))
# we use from only if we have a file
elif isinstance(from_url, basestring):
if from_url.startswith('file:'):
filename = from_url[len('file:'):]
xml = None
try:
stream = file(filename, 'r')
except IOError:
# XXX-Aurel : Why raising here make unit tests to fail ?
# raise ValueError("Impossible to read file %s, error is %s"
# % (filename, msg))
pass
else:
xml = stream.read()
stream.close()
syncml_logger.debug('readResponse xml from file is %s' % (xml,))
if xml:
return xml
#
# End of part managing protocols
#
#
# Following methods are related to server (Publication)
#
security.declarePrivate('processServerSynchronization')
def processServerSynchronization(self, publication_path, msg=None):
"""
This is the synchronization method for the server
"""
# Read the request from the client
publication = self.unrestrictedTraverse(publication_path)
if publication.getIsActivityEnabled():
engine = asynchronous_engine
else:
engine = synchronous_engine
if msg is None:
# Read message from file
msg = self.readResponse(from_url=publication.getUrlString(),
sync_id=publication.getSourceReference())
if msg is not None:
syncml_request = SyncMLRequest(msg)
#syncml_logger.info("\tXML received from client %s" %(str(syncml_request)))
# Get the subscriber
subscription_url = syncml_request.header['source']
subscriber = publication.getSubscriber(subscription_url) # XXX method to be renamed
# Alert commands are generated at initialization phase or when client ask
# for the remaining messages
database_alert_list = checkAlertCommand(syncml_request)
assert len(database_alert_list) <= 1, "Multi-databases sync no supported"
if len(database_alert_list):
# We are initializing the synchronization
if subscriber and subscriber.getSynchronizationState() not in \
("not_running", "initializing", "finished"):
syncml_logger.error(
'Trying to start a synchronization on server side : %s although synchronisation is already running'
% (subscriber.getPath(),))
# Prevent initilisation if sync already running
return
syncml_response = engine.processServerInitialization(
publication=publication,
syncml_request=syncml_request,
subscriber=subscriber,
alert_dict=database_alert_list[0])
else:
if not subscriber:
raise ValueError("First synchronization message must contains alert command")
else:
# Let engine manage the synchronization
try:
return engine.processServerSynchronization(subscriber, syncml_request)
except SynchronizationError:
return
else:
# This must be implemented following the syncml protocol, not with this hack
raise NotImplementedError("Starting sync process from server is forbidden")
# Return message for unit test purpose
return str(syncml_response)
#
# Following methods are related to client (subscription)
#
security.declareProtected(Permissions.ModifyPortalContent,
'processClientSynchronization')
def processClientSynchronization(self, subscription_path, msg=None):
"""
This is the synchronization method for the client
This is the first method called to launch a synchronization process
"""
subscription = self.unrestrictedTraverse(subscription_path)
if subscription.getIsActivityEnabled():
engine = asynchronous_engine
else:
engine = synchronous_engine
if msg is None and subscription.getSubscriptionUrlString('').find('file') >= 0:
# XXX This is a hack for unit test only, must be removed
msg = self.readResponse(sync_id=subscription.getDestinationReference(),
from_url=subscription.getSubscriptionUrlString())
if msg is None:
# This is a synchronization initialisation call
# Even if call on asynchronous engine, this will not use activities
syncml_response = engine.initializeClientSynchronization(subscription)
else:
syncml_request = SyncMLRequest(msg)
if not subscription.checkCorrectRemoteMessageId(
syncml_request.header['message_id']):
# Message already processed, resend the response
# XXX How to make sure we send the good last response ?
raise NotImplementedError
else:
return engine.processClientSynchronization(syncml_request, subscription)
# Send the message
if subscription.getIsActivityEnabled():
subscription.activate(
after_tag="%s_reset" %(subscription.getPath(),),
activity="SQLQueue",
after_method_id=('processServerSynchronization',
'SQLCatalog_indexSyncMLDocumentList'),
priority=ACTIVITY_PRIORITY,
tag=subscription.getRelativeUrl()).sendMessage(str(syncml_response))
else:
subscription.sendMessage(str(syncml_response))
return str(syncml_response)
InitializeClass(SynchronizationTool)