SynchronizationTool.py 38.3 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
## Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#          Sebastien Robin <seb@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

27
"""
Jean-Paul Smets's avatar
Jean-Paul Smets committed
28 29 30 31
ERP portal_synchronizations tool.
"""

from OFS.SimpleItem import SimpleItem
32 33
from Products.ERP5Type.Document.Folder import Folder
from Products.ERP5Type.Base import Base
Jean-Paul Smets's avatar
Jean-Paul Smets committed
34 35 36 37 38
from Products.CMFCore.utils import UniqueObject
from Globals import InitializeClass, DTMLFile, PersistentMapping, Persistent
from AccessControl import ClassSecurityInfo, getSecurityManager
from Products.CMFCore import CMFCorePermissions
from Products.ERP5SyncML import _dtmldir
39
from Products.ERP5SyncML import Conduit
Jean-Paul Smets's avatar
Jean-Paul Smets committed
40
from Publication import Publication,Subscriber
41
from Products.BTreeFolder2.BTreeFolder2 import BTreeFolder2
Jean-Paul Smets's avatar
Jean-Paul Smets committed
42 43
from Subscription import Subscription,Signature
from xml.dom.ext.reader.Sax2 import FromXmlStream, FromXml
Sebastien Robin's avatar
Sebastien Robin committed
44
from xml.dom.minidom import parse, parseString
Sebastien Robin's avatar
Sebastien Robin committed
45
from Products.ERP5Type import Permissions
Jean-Paul Smets's avatar
Jean-Paul Smets committed
46 47
from PublicationSynchronization import PublicationSynchronization
from SubscriptionSynchronization import SubscriptionSynchronization
48
from Products.CMFCore.utils import getToolByName
49
from AccessControl.SecurityManagement import newSecurityManager
50
from AccessControl.SecurityManagement import noSecurityManager
51
from AccessControl.User import UnrestrictedUser
Sebastien Robin's avatar
Sebastien Robin committed
52
from Acquisition import aq_base
53
from xml.parsers.expat import ExpatError # parseString error
54
import urllib
55
import urllib2
56
import socket
57
import os
Jean-Paul Smets's avatar
Jean-Paul Smets committed
58
import string
59 60
import commands
import random
61
from zLOG import LOG
Jean-Paul Smets's avatar
Jean-Paul Smets committed
62

63

64
# XXX need to import here all conduits so getattr(conduit_name) works
Jean-Paul Smets's avatar
Jean-Paul Smets committed
65
from Conduit.ERP5Conduit import ERP5Conduit
66
from Conduit.ERP5ShopOrderConduit import ERP5ShopOrderConduit
Jean-Paul Smets's avatar
Jean-Paul Smets committed
67

68 69
class SynchronizationTool( SubscriptionSynchronization, PublicationSynchronization, 
                           UniqueObject, Folder, Base):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
70 71 72 73 74 75 76 77
  """
    This tool implements the synchronization algorithm
  """


  id       = 'portal_synchronizations'
  meta_type    = 'ERP5 Synchronizations'

78 79 80 81
  # On the server, this is use to keep track of the temporary
  # copies.
  objectsToRemove = [] 
  
Jean-Paul Smets's avatar
Jean-Paul Smets committed
82 83 84 85 86 87 88 89 90 91 92 93 94
  security = ClassSecurityInfo()

  #
  #  Default values.
  #
  list_publications = PersistentMapping()
  list_subscriptions = PersistentMapping()

  # Do we want to use emails ?
  #email = None
  email = 1
  same_export = 1

95 96 97 98
  # Multiple inheritance inconsistency caused by Base must be circumvented
  def __init__( self, *args, **kwargs ):
    Folder.__init__(self, self.id, **kwargs)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115

  #
  #  ZMI methods
  #
  manage_options = ( ( { 'label'   : 'Overview'
             , 'action'   : 'manage_overview'
             }
            , { 'label'   : 'Publications'
             , 'action'   : 'managePublications'
             }
            , { 'label'   : 'Subscriptions'
             , 'action'   : 'manageSubscriptions'
             }
            , { 'label'   : 'Conflicts'
             , 'action'   : 'manageConflicts'
             }
            )
116
           + Folder.manage_options
Jean-Paul Smets's avatar
Jean-Paul Smets committed
117 118 119 120 121 122 123 124 125 126 127
           )

  security.declareProtected( CMFCorePermissions.ManagePortal
               , 'manage_overview' )
  manage_overview = DTMLFile( 'dtml/explainSynchronizationTool', globals() )

  security.declareProtected( CMFCorePermissions.ManagePortal
               , 'managePublications' )
  managePublications = DTMLFile( 'dtml/managePublications', globals() )

  security.declareProtected( CMFCorePermissions.ManagePortal
128 129
               , 'manage_addPublicationForm' )
  manage_addPublicationForm = DTMLFile( 'dtml/manage_addPublication', globals() )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
130 131 132 133 134 135 136 137 138 139

  security.declareProtected( CMFCorePermissions.ManagePortal
               , 'manageSubsciptions' )
  manageSubscriptions = DTMLFile( 'dtml/manageSubscriptions', globals() )

  security.declareProtected( CMFCorePermissions.ManagePortal
               , 'manageConflicts' )
  manageConflicts = DTMLFile( 'dtml/manageConflicts', globals() )

  security.declareProtected( CMFCorePermissions.ManagePortal
140 141
               , 'manage_addSubscriptionForm' )
  manage_addSubscriptionForm = DTMLFile( 'dtml/manage_addSubscription', globals() )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161

  security.declareProtected( CMFCorePermissions.ManagePortal
               , 'editProperties' )
  def editProperties( self
           , publisher=None
           , REQUEST=None
           ):
    """
      Form handler for "tool-wide" properties (including list of
      metadata elements).
    """
    if publisher is not None:
      self.publisher = publisher

    if REQUEST is not None:
      REQUEST[ 'RESPONSE' ].redirect( self.absolute_url()
                    + '/propertiesForm'
                    + '?manage_tabs_message=Tool+updated.'
                    )

162
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_addPublication')
163
  def manage_addPublication(self, title, publication_url, destination_path,
164
            query, xml_mapping, conduit, gpg_key, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
165 166 167
    """
      create a new publication
    """
168 169 170
    #if not('publications' in self.objectIds()):
    #  publications = Folder('publications')
    #  self._setObject(publications.id, publications)
171
    folder = self.getObjectContainer()
172 173
    new_id = self.getPublicationIdFromTitle(title)
    pub = Publication(new_id, title, publication_url, destination_path,
174
                      query, xml_mapping, conduit, gpg_key)
175
    folder._setObject( new_id, pub )
176 177 178
    #if len(self.list_publications) == 0:
    #  self.list_publications = PersistentMapping()
    #self.list_publications[id] = pub
Jean-Paul Smets's avatar
Jean-Paul Smets committed
179 180 181
    if RESPONSE is not None:
      RESPONSE.redirect('managePublications')

182
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_addSubscription')
183
  def manage_addSubscription(self, title, publication_url, subscription_url,
184
                       destination_path, query, xml_mapping, conduit, gpg_key, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
185
    """
Sebastien Robin's avatar
Sebastien Robin committed
186
      XXX should be renamed as addSubscription
Jean-Paul Smets's avatar
Jean-Paul Smets committed
187 188
      create a new subscription
    """
189 190 191
    #if not('subscriptions' in self.objectIds()):
    #  subscriptions = Folder('subscriptions')
    #  self._setObject(subscriptions.id, subscriptions)
192
    folder = self.getObjectContainer()
193 194
    new_id = self.getSubscriptionIdFromTitle(title)
    sub = Subscription(new_id, title, publication_url, subscription_url,
195
                       destination_path, query, xml_mapping, conduit, gpg_key)
196
    folder._setObject( new_id, sub )
197 198 199
    #if len(self.list_subscriptions) == 0:
    #  self.list_subscriptions = PersistentMapping()
    #self.list_subscriptions[id] = sub
Jean-Paul Smets's avatar
Jean-Paul Smets committed
200 201 202
    if RESPONSE is not None:
      RESPONSE.redirect('manageSubscriptions')

203
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_editPublication')
204
  def manage_editPublication(self, title, publication_url, destination_path,
205
                       query, xml_mapping, conduit, gpg_key, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
206 207 208
    """
      modify a publication
    """
209
    pub = self.getPublication(title)
210 211 212 213
    pub.setTitle(title)
    pub.setPublicationUrl(publication_url)
    pub.setDestinationPath(destination_path)
    pub.setQuery(query)
214
    pub.setConduit(conduit)
215 216
    pub.setXMLMapping(xml_mapping)
    pub.setGPGKey(gpg_key)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
217 218 219
    if RESPONSE is not None:
      RESPONSE.redirect('managePublications')

220
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_editSubscription')
221
  def manage_editSubscription(self, title, publication_url, subscription_url,
222
             destination_path, query, xml_mapping, conduit, gpg_key, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
223 224 225
    """
      modify a subscription
    """
226
    sub = self.getSubscription(title)
227 228 229 230
    sub.setTitle(title)
    sub.setPublicationUrl(publication_url)
    sub.setDestinationPath(destination_path)
    sub.setQuery(query)
231
    sub.setConduit(conduit)
232 233 234
    sub.setXMLMapping(xml_mapping)
    sub.setGPGKey(gpg_key)
    sub.setSubscriptionUrl(subscription_url)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
235 236 237
    if RESPONSE is not None:
      RESPONSE.redirect('manageSubscriptions')

238
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_deletePublication')
239
  def manage_deletePublication(self, title, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
240 241 242
    """
      delete a publication
    """
243
    id = self.getPublicationIdFromTitle(title)
244 245
    folder = self.getObjectContainer()
    folder._delObject(id)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
246 247 248
    if RESPONSE is not None:
      RESPONSE.redirect('managePublications')

249
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_deleteSubscription')
250
  def manage_deleteSubscription(self, title, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
251 252 253
    """
      delete a subscription
    """
254
    id = self.getSubscriptionIdFromTitle(title)
255 256
    folder = self.getObjectContainer()
    folder._delObject(id)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
257 258 259
    if RESPONSE is not None:
      RESPONSE.redirect('manageSubscriptions')

260
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_resetPublication')
261
  def manage_resetPublication(self, title, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
262 263 264
    """
      reset a publication
    """
265
    pub = self.getPublication(title)
266
    pub.resetAllSubscribers()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
267 268 269
    if RESPONSE is not None:
      RESPONSE.redirect('managePublications')

270
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_resetSubscription')
271
  def manage_resetSubscription(self, title, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
272 273 274
    """
      reset a subscription
    """
275
    sub = self.getSubscription(title)
276 277
    sub.resetAllSignatures()
    sub.resetAnchors()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
278 279 280
    if RESPONSE is not None:
      RESPONSE.redirect('manageSubscriptions')

281 282 283 284 285 286 287 288 289
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_syncSubscription')
  def manage_syncSubscription(self, title, RESPONSE=None):
    """
      reset a subscription
    """
    self.SubSync(title)
    if RESPONSE is not None:
      RESPONSE.redirect('manageSubscriptions')

Sebastien Robin's avatar
Sebastien Robin committed
290
  security.declareProtected(Permissions.AccessContentsInformation,'getPublicationList')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
291 292 293 294
  def getPublicationList(self):
    """
      Return a list of publications
    """
295 296
    folder = self.getObjectContainer()
    object_list = folder.objectValues()
297 298
    object_list = filter(lambda x: x.id.find('pub')==0,object_list)
    return object_list
Jean-Paul Smets's avatar
Jean-Paul Smets committed
299

300
  security.declareProtected(Permissions.AccessContentsInformation,'getPublication')
301
  def getPublication(self, title):
302
    """
303
      Return the  publications with this id
304
    """
305 306 307
    for p in self.getPublicationList():
      if p.getTitle() == title:
        return p
308
    return None
309

310 311 312 313 314 315 316 317 318 319 320 321
  security.declareProtected(Permissions.AccessContentsInformation,'getObjectContainer')
  def getObjectContainer(self):
    """
    this returns the external mount point if there is one
    """
    folder = self
    portal_url = getToolByName(self,'portal_url')
    root = portal_url.getPortalObject().aq_parent
    if 'external_mount_point' in root.objectIds():
      folder = root.external_mount_point
    return folder

Sebastien Robin's avatar
Sebastien Robin committed
322
  security.declareProtected(Permissions.AccessContentsInformation,'getSubscriptionList')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
323 324 325 326
  def getSubscriptionList(self):
    """
      Return a list of publications
    """
327 328
    folder = self.getObjectContainer()
    object_list = folder.objectValues()
329 330
    object_list = filter(lambda x: x.id.find('sub')==0,object_list)
    return object_list
Jean-Paul Smets's avatar
Jean-Paul Smets committed
331

332
  def getSubscription(self, title):
333 334 335
    """
      Returns the subscription with this id
    """
336 337 338
    for s in self.getSubscriptionList():
      if s.getTitle() == title:
        return s
339 340 341
    return None


Sebastien Robin's avatar
Sebastien Robin committed
342
  security.declareProtected(Permissions.AccessContentsInformation,'getSynchronizationList')
343
  def getSynchronizationList(self):
344 345
    """
      Returns the list of subscriptions and publications
Sebastien Robin's avatar
Sebastien Robin committed
346

347 348 349
    """
    return self.getSubscriptionList() + self.getPublicationList()

Sebastien Robin's avatar
Sebastien Robin committed
350
  security.declareProtected(Permissions.AccessContentsInformation,'getSubscriberList')
351 352 353 354 355 356 357 358 359 360
  def getSubscriberList(self):
    """
      Returns the list of subscribers and subscriptions
    """
    s_list = []
    s_list += self.getSubscriptionList()
    for publication in self.getPublicationList():
      s_list += publication.getSubscriberList()
    return s_list

Sebastien Robin's avatar
Sebastien Robin committed
361
  security.declareProtected(Permissions.AccessContentsInformation,'getConflictList')
362
  def getConflictList(self, context=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
363 364 365 366
    """
    Retrieve the list of all conflicts
    Here the list is as follow :
    [conflict_1,conflict2,...] where conflict_1 is like:
367
    ['publication',publication_id,object.getPath(),property_id,publisher_value,subscriber_value]
Jean-Paul Smets's avatar
Jean-Paul Smets committed
368
    """
369
    path = self.resolveContext(context)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
370 371
    conflict_list = []
    for publication in self.getPublicationList():
Sebastien Robin's avatar
Sebastien Robin committed
372 373 374 375
      for subscriber in publication.getSubscriberList():
        sub_conflict_list = subscriber.getConflictList()
        for conflict in sub_conflict_list:
          #conflict.setDomain('Publication')
376
          conflict.setSubscriber(subscriber)
Sebastien Robin's avatar
Sebastien Robin committed
377
          #conflict.setDomainId(subscriber.getId())
378 379
          if path is None or conflict.getObjectPath() == path:
            conflict_list += [conflict.__of__(subscriber)]
Jean-Paul Smets's avatar
Jean-Paul Smets committed
380 381 382
    for subscription in self.getSubscriptionList():
      sub_conflict_list = subscription.getConflictList()
      for conflict in sub_conflict_list:
383
        #conflict.setDomain('Subscription')
384
        conflict.setSubscriber(subscription)
Sebastien Robin's avatar
Sebastien Robin committed
385
        #conflict.setDomainId(subscription.getId())
386 387 388 389 390 391 392 393
        if path is None or conflict.getObjectPath() == path:
          conflict_list += [conflict.__of__(subscription)]
    #if path is not None: # Retrieve only conflicts for a given path
    #  new_list = []
    #  for conflict in conflict_list:
    #    if conflict.getObjectPath() == path:
    #      new_list += [conflict.__of__(self)]
    #  return new_list
Jean-Paul Smets's avatar
Jean-Paul Smets committed
394 395
    return conflict_list

396 397 398 399 400 401 402 403 404
  security.declareProtected(Permissions.AccessContentsInformation,'getDocumentConflictList')
  def getDocumentConflictList(self, context=None):
    """
    Retrieve the list of all conflicts for a given document
    Well, this is the same thing as getConflictList with a path
    """
    return self.getConflictList(context)


Sebastien Robin's avatar
Sebastien Robin committed
405
  security.declareProtected(Permissions.AccessContentsInformation,'getSynchronizationState')
406
  def getSynchronizationState(self, context):
407
    """
408
    context : the context on which we are looking for state
409

410 411 412
    This functions have to retrieve the synchronization state,
    it will first look in the conflict list, if nothing is found,
    then we have to check on a publication/subscription.
413

414
    This method returns a mapping between subscription and states
Sebastien Robin's avatar
Sebastien Robin committed
415 416 417 418 419

    JPS suggestion:
      path -> object, document, context, etc.
      type -> '/titi/toto' or ('','titi', 'toto') or <Base instance 1562567>
      object = self.resolveContext(context) (method to add)
420
    """
421
    path = self.resolveContext(context)
422 423 424 425 426 427
    conflict_list = self.getConflictList()
    state_list= []
    LOG('getSynchronizationState',0,'path: %s' % str(path))
    for conflict in conflict_list:
      if conflict.getObjectPath() == path:
        LOG('getSynchronizationState',0,'found a conflict: %s' % str(conflict))
428
        state_list += [[conflict.getSubscriber(),self.CONFLICT]]
429
    for domain in self.getSynchronizationList():
430 431 432 433 434 435 436 437 438 439 440 441
      destination = domain.getDestinationPath()
      LOG('getSynchronizationState',0,'destination: %s' % str(destination))
      j_path = '/'.join(path)
      LOG('getSynchronizationState',0,'j_path: %s' % str(j_path))
      if j_path.find(destination)==0:
        o_id = j_path[len(destination)+1:].split('/')[0]
        LOG('getSynchronizationState',0,'o_id: %s' % o_id)
        subscriber_list = []
        if domain.domain_type==self.PUB:
          subscriber_list = domain.getSubscriberList()
        else:
          subscriber_list = [domain]
442
        LOG('getSynchronizationState, subscriber_list:',0,subscriber_list)
443 444 445 446
        for subscriber in subscriber_list:
          signature = subscriber.getSignature(o_id)
          if signature is not None:
            state = signature.getStatus()
447 448
            LOG('getSynchronizationState:',0,'sub.dest :%s, state: %s' % \
                                   (subscriber.getSubscriptionUrl(),str(state)))
449 450 451 452 453 454 455 456
            found = None
            # Make sure there is not already a conflict giving the state
            for state_item in state_list:
              if state_item[0]==subscriber:
                found = 1
            if found is None:
              state_list += [[subscriber,state]]
    return state_list
457

458 459
  security.declareProtected(Permissions.ModifyPortalContent, 'applyPublisherValue')
  def applyPublisherValue(self, conflict):
Sebastien Robin's avatar
Sebastien Robin committed
460 461 462 463 464
    """
      after a conflict resolution, we have decided
      to keep the local version of an object
    """
    object = self.unrestrictedTraverse(conflict.getObjectPath())
465
    subscriber = conflict.getSubscriber()
Sebastien Robin's avatar
Sebastien Robin committed
466
    # get the signature:
Sebastien Robin's avatar
Sebastien Robin committed
467
    LOG('p_sync.applyPublisherValue, subscriber: ',0,subscriber)
Sebastien Robin's avatar
Sebastien Robin committed
468
    signature = subscriber.getSignature(object.getId()) # XXX may be change for rid
469 470
    copy_path = conflict.getCopyPath()
    LOG('p_sync.applyPublisherValue, copy_path: ',0,copy_path)
Sebastien Robin's avatar
Sebastien Robin committed
471 472
    signature.delConflict(conflict)
    if signature.getConflictList() == []:
473 474 475 476 477 478 479 480 481 482 483 484 485
      if copy_path is not None:
        LOG('p_sync.applyPublisherValue, conflict_list empty on : ',0,signature)
        # Delete the copy of the object if the there is one
        directory = object.aq_parent
        copy_id = copy_path[-1]
        LOG('p_sync.applyPublisherValue, copy_id: ',0,copy_id)
        if hasattr(directory.aq_base, 'hasObject'):
          # optimize the case of a BTree folder
          LOG('p_sync.applyPublisherValue, deleting...: ',0,copy_id)
          if directory.hasObject(copy_id):
            directory._delObject(copy_id)
        elif copy_id in directory.objectIds():
          directory._delObject(copy_id)
Sebastien Robin's avatar
Sebastien Robin committed
486 487
      signature.setStatus(self.PUB_CONFLICT_MERGE)

488 489 490 491 492 493
  security.declareProtected(Permissions.ModifyPortalContent, 'applyPublisherDocument')
  def applyPublisherDocument(self, conflict):
    """
    apply the publisher value for all conflict of the given document
    """
    subscriber = conflict.getSubscriber()
Sebastien Robin's avatar
Sebastien Robin committed
494
    LOG('applyPublisherDocument, subscriber: ',0,subscriber)
495 496
    for c in self.getConflictList(conflict.getObjectPath()):
      if c.getSubscriber() == subscriber:
Sebastien Robin's avatar
Sebastien Robin committed
497
        LOG('applyPublisherDocument, applying on conflict: ',0,conflict)
498 499
        c.applyPublisherValue()

500
  security.declareProtected(Permissions.AccessContentsInformation, 'getPublisherDocumentPath')
501 502 503 504 505 506 507
  def getPublisherDocumentPath(self, conflict):
    """
    apply the publisher value for all conflict of the given document
    """
    subscriber = conflict.getSubscriber()
    return conflict.getObjectPath()

508
  security.declareProtected(Permissions.AccessContentsInformation, 'getPublisherDocument')
509 510 511 512 513 514 515 516 517 518
  def getPublisherDocument(self, conflict):
    """
    apply the publisher value for all conflict of the given document
    """
    publisher_object_path = self.getPublisherDocumentPath(conflict)
    LOG('getPublisherDocument publisher_object_path',0,publisher_object_path)
    publisher_object = self.unrestrictedTraverse(publisher_object_path)
    LOG('getPublisherDocument publisher_object',0,publisher_object)
    return publisher_object

519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539

  def getSubscriberDocumentVersion(self, conflict, docid):
    """
    Given a 'conflict' and a 'docid' refering to a new version of a
    document, applies the conflicting changes to the document's new
    version. By so, two differents versions of the same document will be
    available.
    Thus, the manager will be able to open both version of the document
    before selecting which one to keep.
    """
    
    subscriber = conflict.getSubscriber()
    publisher_object_path = conflict.getObjectPath()
    publisher_object = self.unrestrictedTraverse(publisher_object_path)
    publisher_xml = self.getXMLObject(object=publisher_object,xml_mapping\
                                            = subscriber.getXMLMapping())

    directory = publisher_object.aq_parent
    object_id = docid
    if object_id in directory.objectIds():
        directory._delObject(object_id)
540 541 542
        #conduit = ERP5Conduit()
        conduit_name = subscriber.getConduit()
        conduit = getattr(getattr(Conduit,conduit_name),conduit_name)()
543 544 545 546 547 548 549
        conduit.addNode(xml=publisher_xml,object=directory,object_id=object_id)
        subscriber_document = directory._getOb(object_id)
        for c in self.getConflictList(conflict.getObjectPath()):
            if c.getSubscriber() == subscriber:
                c.applySubscriberValue(object=subscriber_document)
        return subscriber_document

550 551 552 553 554 555 556 557 558 559 560 561 562
  def _getCopyId(self, object):
    directory = object.aq_inner.aq_parent
    if directory.getId() != 'portal_repository':    
      object_id = object.getId() + '_conflict_copy'
      if object_id in directory.objectIds():
        directory._delObject(object_id)
    else:
      repotool = directory
      docid, rev = repotool.getDocidAndRevisionFromObjectId(object.getId())
      new_rev = repotool.getFreeRevision(docid) + 10 # make sure it's not gonna provoke conflicts
      object_id = repotool._getId(docid, new_rev)
    return object_id
  
563
  security.declareProtected(Permissions.AccessContentsInformation, 'getSubscriberDocumentPath')
564 565 566 567
  def getSubscriberDocumentPath(self, conflict):
    """
    apply the publisher value for all conflict of the given document
    """
568 569 570
    copy_path = conflict.getCopyPath()
    if copy_path is not None:
        return copy_path
571 572 573 574
    subscriber = conflict.getSubscriber()
    publisher_object_path = conflict.getObjectPath()
    publisher_object = self.unrestrictedTraverse(publisher_object_path)
    publisher_xml = self.getXMLObject(object=publisher_object,xml_mapping = subscriber.getXMLMapping())
575 576
    directory = publisher_object.aq_inner.aq_parent
    object_id = self._getCopyId(publisher_object)
577 578 579
    #conduit = ERP5Conduit()
    conduit_name = subscriber.getConduit()
    conduit = getattr(getattr(Conduit,conduit_name),conduit_name)()
580 581
    conduit.addNode(xml=publisher_xml,object=directory,object_id=object_id)
    subscriber_document = directory._getOb(object_id)
582
    subscriber_document._conflict_resolution = 1
583 584 585
    for c in self.getConflictList(conflict.getObjectPath()):
      if c.getSubscriber() == subscriber:
        c.applySubscriberValue(object=subscriber_document)
586 587 588 589
    copy_path = subscriber_document.getPhysicalPath()
    conflict.setCopyPath(copy_path)
    return copy_path
    
590
  security.declareProtected(Permissions.AccessContentsInformation, 'getSubscriberDocument')
591 592 593 594 595 596 597 598
  def getSubscriberDocument(self, conflict):
    """
    apply the publisher value for all conflict of the given document
    """
    subscriber_object_path = self.getSubscriberDocumentPath(conflict)
    subscriber_object = self.unrestrictedTraverse(subscriber_object_path)
    return subscriber_object

599 600 601 602 603 604 605 606 607 608 609
  security.declareProtected(Permissions.ModifyPortalContent, 'applySubscriberDocument')
  def applySubscriberDocument(self, conflict):
    """
    apply the subscriber value for all conflict of the given document
    """
    subscriber = conflict.getSubscriber()
    for c in self.getConflictList(conflict.getObjectPath()):
      if c.getSubscriber() == subscriber:
        c.applySubscriberValue()

  security.declareProtected(Permissions.ModifyPortalContent, 'applySubscriberValue')
610
  def applySubscriberValue(self, conflict,object=None):
Sebastien Robin's avatar
Sebastien Robin committed
611 612 613 614
    """
      after a conflict resolution, we have decided
      to keep the local version of an object
    """
615 616 617 618 619 620 621
    solve_conflict = 1
    if object is None:
      object = self.unrestrictedTraverse(conflict.getObjectPath())
    else:
      # This means an object was given, this is used in order
      # to see change on a copy, so don't solve conflict
      solve_conflict=0
622
    subscriber = conflict.getSubscriber()
Sebastien Robin's avatar
Sebastien Robin committed
623 624 625
    # get the signature:
    LOG('p_sync.setRemoteObject, subscriber: ',0,subscriber)
    signature = subscriber.getSignature(object.getId()) # XXX may be change for rid
626 627 628
    #conduit = ERP5Conduit()
    conduit_name = subscriber.getConduit()
    conduit = getattr(getattr(Conduit,conduit_name),conduit_name)()
Sebastien Robin's avatar
Sebastien Robin committed
629 630
    for xupdate in conflict.getXupdateList():
      conduit.updateNode(xml=xupdate,object=object,force=1)
631
    if solve_conflict:
632
      copy_path = conflict.getCopyPath()
633 634
      signature.delConflict(conflict)
      if signature.getConflictList() == []:
635 636 637 638 639 640 641 642 643 644
        if copy_path is not None:
          # Delete the copy of the object if the there is one
          directory = object.aq_parent
          copy_id = copy_path[-1]
          if hasattr(directory.aq_base, 'hasObject'):
            # optimize the case of a BTree folder
            if directory.hasObject(id):
              directory._delObject(copy_id)
          elif copy_id in directory.objectIds():
            directory._delObject(copy_id)
645
        signature.setStatus(self.PUB_CONFLICT_MERGE)
Sebastien Robin's avatar
Sebastien Robin committed
646 647

  security.declareProtected(Permissions.ModifyPortalContent, 'manageLocalValue')
648
  def managePublisherValue(self, subscription_url, property_id, object_path, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
649 650 651
    """
    Do whatever needed in order to store the local value on
    the remote server
Sebastien Robin's avatar
Sebastien Robin committed
652 653 654 655 656

    Suggestion (API)
      add method to view document with applied xupdate
      of a given subscriber XX (ex. viewSubscriberDocument?path=ddd&subscriber_id=dddd)
      Version=Version CPS
Jean-Paul Smets's avatar
Jean-Paul Smets committed
657 658
    """
    # Retrieve the conflict object
Sebastien Robin's avatar
Sebastien Robin committed
659
    LOG('manageLocalValue',0,'%s %s %s' % (str(subscription_url),
660
                                           str(property_id),
Sebastien Robin's avatar
Sebastien Robin committed
661 662 663
                                           str(object_path)))
    for conflict in self.getConflictList():
      LOG('manageLocalValue, conflict:',0,conflict)
664 665
      if conflict.getPropertyId() == property_id:
        LOG('manageLocalValue',0,'found the property_id')
Sebastien Robin's avatar
Sebastien Robin committed
666
        if '/'.join(conflict.getObjectPath())==object_path:
667
          if conflict.getSubscriber().getSubscriptionUrl()==subscription_url:
668
            conflict.applyPublisherValue()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
669 670 671
    if RESPONSE is not None:
      RESPONSE.redirect('manageConflicts')

672
  security.declareProtected(Permissions.ModifyPortalContent, 'manageSubscriberValue')
673
  def manageSubscriberValue(self, subscription_url, property_id, object_path, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
674 675 676 677
    """
    Do whatever needed in order to store the remote value locally
    and confirmed that the remote box should keep it's value
    """
Sebastien Robin's avatar
Sebastien Robin committed
678
    LOG('manageLocalValue',0,'%s %s %s' % (str(subscription_url),
679
                                           str(property_id),
Sebastien Robin's avatar
Sebastien Robin committed
680 681 682
                                           str(object_path)))
    for conflict in self.getConflictList():
      LOG('manageLocalValue, conflict:',0,conflict)
683 684
      if conflict.getPropertyId() == property_id:
        LOG('manageLocalValue',0,'found the property_id')
Sebastien Robin's avatar
Sebastien Robin committed
685
        if '/'.join(conflict.getObjectPath())==object_path:
686
          if conflict.getSubscriber().getSubscriptionUrl()==subscription_url:
687
            conflict.applySubscriberValue()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
688 689
    if RESPONSE is not None:
      RESPONSE.redirect('manageConflicts')
690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713
  
  security.declareProtected(Permissions.ModifyPortalContent, 'manageSubscriberDocument')
  def manageSubscriberDocument(self, subscription_url, object_path):
    """
    """
    for conflict in self.getConflictList():
      if '/'.join(conflict.getObjectPath())==object_path:
        if conflict.getSubscriber().getSubscriptionUrl()==subscription_url:
          conflict.applySubscriberDocument()
          break
    self.managePublisherDocument(object_path)
  
  security.declareProtected(Permissions.ModifyPortalContent, 'managePublisherDocument')
  def managePublisherDocument(self, object_path):
    """
    """
    retry = True
    while retry:
      retry = False
      for conflict in self.getConflictList():
        if '/'.join(conflict.getObjectPath())==object_path:
          conflict.applyPublisherDocument()
          retry = True
          break
Jean-Paul Smets's avatar
Jean-Paul Smets committed
714

715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731
  def resolveContext(self, context):
    """
    We try to return a path (like ('','erp5','foo') from the context.
    Context can be :
      - a path
      - an object
      - a string representing a path
    """
    if context is None:
      return context
    elif type(context) is type(()):
      return context
    elif type(context) is type('a'):
      return tuple(context.split('/'))
    else:
      return context.getPhysicalPath()

732
  security.declarePublic('sendResponse')
733
  def sendResponse(self, to_url=None, from_url=None, sync_id=None,xml=None, domain=None, send=1):
734 735 736 737
    """
    We will look at the url and we will see if we need to send mail, http
    response, or just copy to a file.
    """
738
    LOG('sendResponse, self.getPhysicalPath: ',0,self.getPhysicalPath())
739 740 741 742
    LOG('sendResponse, to_url: ',0,to_url)
    LOG('sendResponse, from_url: ',0,from_url)
    LOG('sendResponse, sync_id: ',0,sync_id)
    LOG('sendResponse, xml: ',0,xml)
743 744 745 746 747 748 749
    if domain is not None:
      gpg_key = domain.getGPGKey()
      if gpg_key not in ('',None):
        filename = str(random.randrange(1,2147483600)) + '.txt'
        decrypted = file('/tmp/%s' % filename,'w')
        decrypted.write(xml)
        decrypted.close()
750 751
        (status,output)=commands.getstatusoutput('gzip /tmp/%s' % filename)
        (status,output)=commands.getstatusoutput('gpg --yes --homedir /var/lib/zope/Products/ERP5SyncML/gnupg_keys -r "%s" -se /tmp/%s.gz' % (gpg_key,filename))
752
        LOG('readResponse, gpg output:',0,output)
753
        encrypted = file('/tmp/%s.gz.gpg' % filename,'r')
754 755
        xml = encrypted.read()
        encrypted.close()
756 757 758 759 760
        commands.getstatusoutput('rm -f /tmp/%s.gz' % filename)
        commands.getstatusoutput('rm -f /tmp/%s.gz.gpg' % filename)
    if send:
      if type(to_url) is type('a'):
        if to_url.find('http://')==0:
761 762 763
          # XXX Make sure this is not a problem
          if domain.domain_type == self.PUB:
            return None
764
          # we will send an http response
Sebastien Robin's avatar
Sebastien Robin committed
765
          domain = aq_base(domain)
766
          LOG('sendResponse, will start sendHttpResponse, xml',0,xml)
767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782
          self.activate(activity='RAMQueue').sendHttpResponse(sync_id=sync_id,
                                           to_url=to_url,
                                           xml=xml, domain=domain)
          return None
        elif to_url.find('file://')==0:
          filename = to_url[len('file:/'):]
          stream = file(filename,'w')
          LOG('sendResponse, filename: ',0,filename)
          stream.write(xml)
          stream.close()
          # we have to use local files (unit testing for example
        elif to_url.find('mailto:')==0:
          # we will send an email
          to_address = to_url[len('mailto:'):]
          from_address = from_url[len('mailto:'):]
          self.sendMail(from_address,to_address,sync_id,xml)
783
    return xml
784 785

  security.declarePrivate('sendHttpResponse')
786
  def sendHttpResponse(self, to_url=None, sync_id=None, xml=None, domain=None ):
787
    LOG('sendHttpResponse, self.getPhysicalPath: ',0,self.getPhysicalPath())
788
    LOG('sendHttpResponse, starting with domain:',0,domain)
Sebastien Robin's avatar
Sebastien Robin committed
789
    #LOG('sendHttpResponse, xml:',0,xml)
790 791 792
    if domain is not None:
      if domain.domain_type == self.PUB:
        return xml
793 794 795 796 797 798 799 800 801 802 803 804 805 806 807
    # Retrieve the proxy from os variables
    proxy_url = ''
    if os.environ.has_key('http_proxy'):
      proxy_url = os.environ['http_proxy']
    LOG('sendHttpResponse, proxy_url:',0,proxy_url)
    if proxy_url !='':
      proxy_handler = urllib2.ProxyHandler({"http" :proxy_url})
    else:
      proxy_handler = urllib2.ProxyHandler({})
    pass_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
    auth_handler = urllib2.HTTPBasicAuthHandler(pass_mgr)
    proxy_auth_handler = urllib2.ProxyBasicAuthHandler(pass_mgr)
    opener = urllib2.build_opener(proxy_handler, proxy_auth_handler,auth_handler,urllib2.HTTPHandler)
    urllib2.install_opener(opener)
    to_encode = {'text':xml,'sync_id':sync_id}
808
    encoded = urllib.urlencode(to_encode)
809 810
    if to_url.find('readResponse')<0:
      to_url = to_url + '/portal_synchronizations/readResponse'
811
    request = urllib2.Request(url=to_url,data=encoded)
812 813 814 815 816 817 818 819
    #result = urllib2.urlopen(request).read()
    try:
      result = urllib2.urlopen(request).read()
    except socket.error, msg:
      self.activate(activity='RAMQueue').sendHttpResponse(to_url=to_url,sync_id=sync_id,xml=xml,domain=domain)
      LOG('sendHttpResponse, socket ERROR:',0,msg)
      return

820
    
821
    LOG('sendHttpResponse, before result, domain:',0,domain)
Sebastien Robin's avatar
Sebastien Robin committed
822
    #LOG('sendHttpResponse, result:',0,result)
823 824
    if domain is not None:
      if domain.domain_type == self.SUB:
825
        gpg_key = domain.getGPGKey()
826
        if result not in (None,''):
827 828
          #if gpg_key not in ('',None):
          #  result = self.sendResponse(domain=domain,xml=result,send=0)
829 830 831
          uf = self.acl_users
          user = UnrestrictedUser('syncml','syncml',['Manager','Member'],'')
          newSecurityManager(None, user)
832 833
          #self.activate(activity='RAMQueue').readResponse(sync_id=sync_id,text=result)
          self.readResponse(sync_id=sync_id,text=result)
834 835 836 837 838 839 840 841 842 843 844 845 846 847 848

  security.declarePublic('sync')
  def sync(self):
    """
    This will try to synchronize every subscription
    """
    # Login as a manager to make sure we can create objects
    uf = self.acl_users
    user = UnrestrictedUser('syncml','syncml',['Manager','Member'],'')
    newSecurityManager(None, user)
    message_list = self.portal_activities.getMessageList()
    LOG('sync, message_list:',0,message_list)
    if len(message_list) == 0:
      for subscription in self.getSubscriptionList():
        LOG('sync, subcription:',0,subscription)
849
        self.activate(activity='RAMQueue').SubSync(subscription.getTitle())
850 851 852 853 854 855 856 857

  security.declarePublic('readResponse')
  def readResponse(self, text=None, sync_id=None, to_url=None, from_url=None):
    """
    We will look at the url and we will see if we need to send mail, http
    response, or just copy to a file.
    """
    LOG('readResponse, ',0,'starting')
858
    LOG('readResponse, self.getPhysicalPath: ',0,self.getPhysicalPath())
859
    LOG('readResponse, sync_id: ',0,sync_id)
Sebastien Robin's avatar
Sebastien Robin committed
860
    #LOG('readResponse, text:',0,text)
861 862 863 864 865
    # Login as a manager to make sure we can create objects
    uf = self.acl_users
    user = UnrestrictedUser('syncml','syncml',['Manager','Member'],'')
    newSecurityManager(None, user)

866
    if text is not None:
867 868 869 870 871
      # XXX We will look everywhere for a publication/subsription with
      # the id sync_id, this is not so good, but there is no way yet
      # to know if we will call a publication or subscription XXX
      gpg_key = ''
      for publication in self.getPublicationList():
872
        if publication.getTitle()==sync_id:
873 874 875
          gpg_key = publication.getGPGKey()
      if gpg_key == '':
        for subscription in self.getSubscriptionList():
876
          if subscription.getTitle()==sync_id:
877 878 879 880
            gpg_key = subscription.getGPGKey()
      # decrypt the message if needed
      if gpg_key not in (None,''):
        filename = str(random.randrange(1,2147483600)) + '.txt'
881
        encrypted = file('/tmp/%s.gz.gpg' % filename,'w')
882 883
        encrypted.write(text)
        encrypted.close()
884
        (status,output)=commands.getstatusoutput('gpg --homedir /var/lib/zope/Products/ERP5SyncML/gnupg_keys -r "%s"  --decrypt /tmp/%s.gz.gpg > /tmp/%s.gz' % (gpg_key,filename,filename))
885
        LOG('readResponse, gpg output:',0,output)
886
        (status,output)=commands.getstatusoutput('gunzip /tmp/%s.gz' % filename)
887 888
        decrypted = file('/tmp/%s' % filename,'r')
        text = decrypted.read()
889
        LOG('readResponse, text:',0,text)
890 891
        decrypted.close()
        commands.getstatusoutput('rm -f /tmp/%s' % filename)
892
        commands.getstatusoutput('rm -f /tmp/%s.gz.gpg' % filename)
893 894
      # Get the target and then find the corresponding publication or
      # Subscription
895
      LOG('readResponse, xml before parseSTring',0,text)
Sebastien Robin's avatar
Sebastien Robin committed
896
      xml = parseString(text)
897 898 899 900 901 902 903 904 905
      url = ''
      for subnode in self.getElementNodeList(xml):
        if subnode.nodeName == 'SyncML':
          for subnode1 in self.getElementNodeList(subnode):
            if subnode1.nodeName == 'SyncHdr':
              for subnode2 in self.getElementNodeList(subnode1):
                if subnode2.nodeName == 'Target':
                  url = subnode2.childNodes[0].data 
      for publication in self.getPublicationList():
906
        if publication.getPublicationUrl()==url and publication.getTitle()==sync_id:
907
          result = self.PubSync(sync_id,xml)
908 909 910 911
          # Then encrypt the message
          xml = result['xml']
          xml = self.sendResponse(xml=xml,domain=publication,send=0)
          return xml
912
      for subscription in self.getSubscriptionList():
913
        if subscription.getSubscriptionUrl()==url and subscription.getTitle()==sync_id:
914 915
          result = self.activate(activity='RAMQueue').SubSync(sync_id,xml)
          #result = self.SubSync(sync_id,xml)
916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931

    # we use from only if we have a file 
    elif type(from_url) is type('a'):
      if from_url.find('file://')==0:
        try:
          filename = from_url[len('file:/'):]
          stream = file(filename,'r')
          xml = stream.read()
          #stream.seek(0)
          #LOG('readResponse',0,'Starting... msg: %s' % str(stream.read()))
        except IOError:
          LOG('readResponse, cannot read file: ',0,filename)
          xml = None
        if xml is not None and len(xml)==0:
          xml = None
        return xml
932

933 934 935 936 937 938 939 940 941 942 943 944 945 946
  security.declareProtected(Permissions.ModifyPortalContent, 'getPublicationIdFromTitle')
  def getPublicationIdFromTitle(self, title):
    """
    simply return an id from a title
    """
    return 'pub_' + title

  security.declareProtected(Permissions.ModifyPortalContent, 'getPublicationIdFromTitle')
  def getSubscriptionIdFromTitle(self, title):
    """
    simply return an id from a title
    """
    return 'sub_' + title

947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964
#  security.declarePrivate('notify_sync')
#  def notify_sync(self, event_type, object, infos):
#    """Notification from the event service.
#
#    # XXX very specific to cps
#
#    Called when an object is added/deleted/modified.
#    Update the date of sync
#    """
#    from Products.CPSCore.utils import _isinstance
#    from Products.CPSCore.ProxyBase import ProxyBase
#
#    if event_type in ('sys_modify_object',
#                      'modify_object'):
#      if not(_isinstance(object, ProxyBase)):
#        repotool = getToolByName(self, 'portal_repository')
#        if repotool.isObjectInRepository(object):
#          object_id = object.getId()
965 966


Jean-Paul Smets's avatar
Jean-Paul Smets committed
967
InitializeClass( SynchronizationTool )