testERP5Catalog.py 174 KB
Newer Older
1
# -*- coding: utf-8 -*-
Sebastien Robin's avatar
Sebastien Robin committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
##############################################################################
#
# Copyright (c) 2004 Nexedi SARL and Contributors. All Rights Reserved.
#          Sebastien Robin <seb@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

Jérome Perrin's avatar
Jérome Perrin committed
30 31
import unittest
import sys
32
from unittest import expectedFailure
33
from _mysql_exceptions import ProgrammingError
Sebastien Robin's avatar
Sebastien Robin committed
34 35 36

from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
37
from AccessControl import getSecurityManager
38
from AccessControl.SecurityManagement import newSecurityManager
Sebastien Robin's avatar
Sebastien Robin committed
39 40
from zLOG import LOG
from DateTime import DateTime
41
from Products.ERP5Type.tests.utils import LogInterceptor
42 43
from Products.ERP5Type.tests.utils import createZODBPythonScript, todo_erp5, \
                                          getExtraSqlConnectionStringList
44 45 46
from Products.ZSQLCatalog.ZSQLCatalog import HOT_REINDEXING_FINISHED_STATE,\
      HOT_REINDEXING_RECORDING_STATE, HOT_REINDEXING_DOUBLE_INDEXING_STATE
from Products.CMFActivity.Errors import ActivityFlushError
47
from Products.ZSQLCatalog.SQLCatalog import Query, ComplexQuery, SimpleQuery
Sebastien Robin's avatar
Sebastien Robin committed
48

49

50 51 52 53
from OFS.ObjectManager import ObjectManager
from random import randint

class IndexableDocument(ObjectManager):
  """Minimal dummy document used by the catalog tests.

  It provides a uid, an empty path and an empty relative URL, and answers
  every "is..." / "provides..." lookup with a callable returning 0.
  """

  # this property is required for dummy providesIMovement
  __allow_access_to_unprotected_subobjects__ = 1
  isRADContent = 0

  def getUid(self):
    """Return the uid of this document, assigning a random one on first use.

    The generated uid lies between 100001 and 100100 and is stored on the
    instance so that later calls return the same value.
    """
    uid = getattr(self, 'uid', None)
    if uid is None:
      self.uid = uid = randint(1, 100) + 100000
    return uid

  def __getattr__(self, name):
    """Answer missing "is..." / "provides..." attributes with a false getter.

    Raises AttributeError for any other missing attribute.
    """
    # Case for all "is..." magic properties (isMovement, ...)
    if name.startswith(('is', 'provides')):
      return lambda: 0
    # Call form of raise: valid on both Python 2 and Python 3.
    raise AttributeError(name)

  def getPath(self):
    """Return an empty path; the actual value does not matter here."""
    return '' # Whatever

  def getRelativeUrl(self):
    """Return an empty relative URL; the actual value does not matter here."""
    return '' # Whatever

class FooDocument(IndexableDocument):
  """Indexable dummy document that exposes a fixed reference."""

  def getReference(self):
    """Return the constant reference 'foo'."""
    reference = 'foo'
    return reference

class BarDocument(IndexableDocument):
  """Indexable dummy document that does not define any getReference method."""

86
class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
Sebastien Robin's avatar
Sebastien Robin committed
87
  """
88
    Tests for ERP5 Catalog.
Sebastien Robin's avatar
Sebastien Robin committed
89 90 91 92 93
  """

  def getTitle(self):
    """Return the title of this test case."""
    title = 'ERP5Catalog'
    return title

Sebastien Robin's avatar
Sebastien Robin committed
94
  def getBusinessTemplateList(self):
    """Return the tuple of business template ids declared by this test."""
    business_template_list = ['erp5_full_text_mroonga_catalog', 'erp5_base']
    return tuple(business_template_list)
Sebastien Robin's avatar
Sebastien Robin committed
96

Sebastien Robin's avatar
Sebastien Robin committed
97
  # Different variables used for this test
  # Login of the Manager user that afterSetUp creates and logs in as.
  username = 'seb'
  # Ids of the copied SQL connections and catalog; beforeTearDown deletes
  # each of them when it is present.
  new_erp5_sql_connection = 'erp5_sql_connection2'
  new_erp5_deferred_sql_connection = 'erp5_sql_deferred_connection2'
  new_catalog_id = 'erp5_mysql_innodb2'

  # Callables registered through _addCleanup; beforeTearDown calls each one
  # with the test instance as its only argument.
  __cleanups = ()

  def _addCleanup(self, callable):
    self.__cleanups += (callable,)
    return callable

109
  def afterSetUp(self):
    """Create the Manager test user, log in as it and process activities."""
    user_folder = self.getPortal().acl_users
    user_folder._doAddUser(self.username, '', ['Manager'], [])

    self.login(self.username)
    # make sure there is no message any more
    self.tic()
116 117

  def beforeTearDown(self):
    """Undo what the tests may have changed on the portal.

    Restores the default catalog settings, empties the person, organisation,
    region and group containers, removes the copied SQL connections and
    catalog when present, runs the registered cleanups and finally processes
    pending activities.
    """
    portal = self.portal
    catalog_tool = portal.portal_catalog
    # restore default_catalog
    catalog_tool.default_sql_catalog_id = 'erp5_mysql_innodb'
    catalog_tool.hot_reindexing_state = None
    # clear Modules
    container_list = [
      self.getPersonModule(),
      self.getOrganisationModule(),
      self.getCategoryTool().region,
      self.getCategoryTool().group,
    ]
    for container in container_list:
      container.manage_delObjects(list(container.objectIds()))
      container.reindexObject()
    # Remove copied sql_connector and catalog
    for connection_id in (self.new_erp5_sql_connection,
                          self.new_erp5_deferred_sql_connection):
      if connection_id in portal.objectIds():
        portal.manage_delObjects([connection_id])
    if self.new_catalog_id in catalog_tool.objectIds():
      catalog_tool.manage_delObjects([self.new_catalog_id])
    # Each registered cleanup receives the test instance.
    for cleanup in self.__cleanups:
      cleanup(self)
    self.tic()
Sebastien Robin's avatar
Sebastien Robin committed
138

139
  def getSQLPathList(self,connection_id=None, sql=None):
    """
    Return the 'path' column of every row produced by `sql`.

    connection_id -- id of the SQL connection to look up on the portal;
                     when None, self.getSQLConnection() is used.
    sql -- query to run; when None, all distinct catalog paths are selected.
    """
    if sql is None:
      sql = 'select distinct(path) from catalog'
    if connection_id is not None:
      connection = getattr(self.getPortal(), connection_id)
    else:
      connection = self.getSQLConnection()
    return [row['path'] for row in connection.manage_test(sql)]
152

153 154 155 156
  def getSQLPathListWithRolesAndUsers(self, connection_id):
    """Return the distinct catalog paths that join with roles_and_users.

    The query is run through getSQLPathList on the given connection id.
    """
    query = ('select distinct(path) from catalog, roles_and_users'
             '           where catalog.security_uid=roles_and_users.uid')
    return self.getSQLPathList(connection_id, query)
Sebastien Robin's avatar
Sebastien Robin committed
157

158 159
  def checkRelativeUrlInSQLPathList(self,url_list,connection_id=None):
    """Assert that every relative URL of url_list is present in the catalog.

    url_list -- relative URLs; each is prefixed with '/<portal id>/' before
                being looked up in the list returned by getSQLPathList.
    connection_id -- passed through to getSQLPathList.
    """
    path_list = self.getSQLPathList(connection_id=connection_id)
    portal_id = self.getPortalId()
    for url in url_list:
      path = '/' + portal_id + '/' + url
      # assertIn reports the missing path and the list on failure.
      self.assertIn(path, path_list)
      LOG('checkRelativeUrlInSQLPathList found path:',0,path)
Sebastien Robin's avatar
Sebastien Robin committed
165

166 167
  def checkRelativeUrlNotInSQLPathList(self,url_list,connection_id=None):
    """Assert that no relative URL of url_list is present in the catalog.

    url_list -- relative URLs; each is prefixed with '/<portal id>/' before
                being looked up in the list returned by getSQLPathList.
    connection_id -- passed through to getSQLPathList.
    """
    path_list = self.getSQLPathList(connection_id=connection_id)
    portal_id = self.getPortalId()
    for url in url_list:
      path = '/' + portal_id + '/' + url
      # assertNotIn reports the unexpected path and the list on failure.
      self.assertNotIn(path, path_list)
      # The label now names this helper, not its "In" counterpart.
      LOG('checkRelativeUrlNotInSQLPathList not found path:',0,path)
Sebastien Robin's avatar
Sebastien Robin committed
173

Jérome Perrin's avatar
Jérome Perrin committed
174 175 176 177 178 179 180 181
  def test_01_HasEverything(self):
    """Check that the tools and the SQL connection used by the tests exist."""
    # assertNotEqual replaces the deprecated assertNotEquals alias.
    self.assertNotEqual(self.getCategoryTool(), None)
    self.assertNotEqual(self.getSimulationTool(), None)
    self.assertNotEqual(self.getTypeTool(), None)
    self.assertNotEqual(self.getSQLConnection(), None)
    self.assertNotEqual(self.getCatalogTool(), None)

  def test_02_EverythingCatalogued(self):
    """Exactly one 'Organisation Module' must be found through the catalog."""
    catalog_tool = self.getCatalogTool()
    self.tic()
    found_module_list = catalog_tool(portal_type='Organisation Module')
    self.assertEqual(len(found_module_list), 1)
Sebastien Robin's avatar
Sebastien Robin committed
186

Jérome Perrin's avatar
Jérome Perrin committed
187
  def test_03_CreateAndDeleteObject(self):
    """Follow the catalog path list while persons are created and deleted.

    Three persons are created in turn; the first two are removed with
    manage_delObjects, the third one with deleteContent.
    """
    portal_catalog = self.getCatalogTool()
    person_module = self.getPersonModule()

    # First person: not catalogued before tic, catalogued after it, and
    # gone again once deleted and tic'ed.
    first_person = person_module.newContent(id='1', portal_type='Person')
    url_list = [first_person.getRelativeUrl()]
    self.checkRelativeUrlNotInSQLPathList(url_list)
    self.tic()
    self.checkRelativeUrlInSQLPathList(url_list)
    person_module.manage_delObjects('1')
    self.tic()
    self.checkRelativeUrlNotInSQLPathList(url_list)

    # Second person: only checked after activities have been processed; a
    # further tic must leave it catalogued.
    second_person = person_module.newContent(id='2', portal_type='Person')
    self.tic()
    url_list = [second_person.getRelativeUrl()]
    self.checkRelativeUrlInSQLPathList(url_list)
    self.tic()
    self.checkRelativeUrlInSQLPathList(url_list)
    person_module.manage_delObjects('2')
    self.tic()
    self.checkRelativeUrlNotInSQLPathList(url_list)

    # Third person: same scenario, deleted through deleteContent.
    third_person = person_module.newContent(id='3', portal_type='Person')
    url_list = [third_person.getRelativeUrl()]
    self.checkRelativeUrlNotInSQLPathList(url_list)
    self.tic()
    self.checkRelativeUrlInSQLPathList(url_list)
    person_module.deleteContent('3')
    # The path is expected to be absent both before and after the tic.
    self.checkRelativeUrlNotInSQLPathList(url_list)
    self.tic()
    self.checkRelativeUrlNotInSQLPathList(url_list)
Sebastien Robin's avatar
Sebastien Robin committed
220

Jérome Perrin's avatar
Jérome Perrin committed
221
  def test_04_SearchFolderWithDeletedObjects(self):
    """searchFolder must list a person once indexed and drop it once deleted."""
    person_module = self.getPersonModule()

    def getFolderIdList():
      # Ids of the objects currently returned by searchFolder.
      return [brain.getObject().getId()
              for brain in person_module.searchFolder()]

    self.assertEqual([], getFolderIdList())
    person_module.newContent(id='4', portal_type='Person')
    self.tic()
    self.assertEqual(['4'], getFolderIdList())
    self.tic()
    person_module.manage_delObjects('4')
    self.tic()
    self.assertEqual([], getFolderIdList())
Sebastien Robin's avatar
Sebastien Robin committed
235

Jérome Perrin's avatar
Jérome Perrin committed
236
  def test_05_SearchFolderWithImmediateReindexObject(self):
    """searchFolder must follow the creation and deletion of one person."""
    person_module = self.getPersonModule()

    def getFolderIdList():
      # Ids of the objects currently returned by searchFolder.
      return [brain.getObject().getId()
              for brain in person_module.searchFolder()]

    # Nothing is listed to begin with.
    self.assertEqual([], getFolderIdList())

    person_module.newContent(id='4', portal_type='Person')
    self.tic()
    self.assertEqual(['4'], getFolderIdList())

    person_module.manage_delObjects('4')
    self.tic()
    self.assertEqual([], getFolderIdList())
Sebastien Robin's avatar
Sebastien Robin committed
252

Jérome Perrin's avatar
Jérome Perrin committed
253
  def test_06_SearchFolderWithRecursiveImmediateReindexObject(self):
    """A person must be listed right after recursiveImmediateReindexObject."""
    person_module = self.getPersonModule()

    def getFolderIdList():
      # Ids of the objects currently returned by searchFolder.
      return [brain.getObject().getId()
              for brain in person_module.searchFolder()]

    # Nothing is listed to begin with.
    self.assertEqual([], getFolderIdList())

    person_module.newContent(id='4', portal_type='Person')
    # No tic here: the module is reindexed synchronously instead.
    person_module.recursiveImmediateReindexObject()
    self.assertEqual(['4'], getFolderIdList())

    person_module.manage_delObjects('4')
    self.tic()
    self.assertEqual([], getFolderIdList())
Sebastien Robin's avatar
Sebastien Robin committed
269

Jérome Perrin's avatar
Jérome Perrin committed
270
  def test_07_ClearCatalogAndTestNewContent(self):
    """A person created after a catalog clear must be found by searchFolder."""
    person_module = self.getPersonModule()

    # Clear catalog
    catalog_tool = self.getCatalogTool()
    catalog_tool.manage_catalogClear()

    person_module.newContent(id='4', portal_type='Person')
    self.tic()
    found_id_list = [brain.getObject().getId()
                     for brain in person_module.searchFolder()]
    self.assertEqual(['4'], found_id_list)
Sebastien Robin's avatar
Sebastien Robin committed
281

Jérome Perrin's avatar
Jérome Perrin committed
282
  def test_08_ClearCatalogAndTestRecursiveImmediateReindexObject(self):
    """After a catalog clear, recursiveImmediateReindexObject must be enough
    for searchFolder to find a newly created person."""
    person_module = self.getPersonModule()

    # Clear catalog
    catalog_tool = self.getCatalogTool()
    catalog_tool.manage_catalogClear()

    person_module.newContent(id='4', portal_type='Person')
    # No tic here: the module is reindexed synchronously instead.
    person_module.recursiveImmediateReindexObject()
    found_id_list = [brain.getObject().getId()
                     for brain in person_module.searchFolder()]
    self.assertEqual(['4'], found_id_list)
Sebastien Robin's avatar
Sebastien Robin committed
293

Jérome Perrin's avatar
Jérome Perrin committed
294
  def test_09_ClearCatalogAndTestImmediateReindexObject(self):
    """A person created after a catalog clear must be listed after tic."""
    person_module = self.getPersonModule()

    # Clear catalog
    catalog_tool = self.getCatalogTool()
    catalog_tool.manage_catalogClear()

    person_module.newContent(id='4', portal_type='Person')
    self.tic()
    found_id_list = [brain.getObject().getId()
                     for brain in person_module.searchFolder()]
    self.assertEqual(['4'], found_id_list)
Sebastien Robin's avatar
Sebastien Robin committed
305

Jérome Perrin's avatar
Jérome Perrin committed
306
  def test_10_OrderedSearchFolder(self):
    """searchFolder must honour single and multiple sort_on criteria."""
    person_module = self.getPersonModule()

    # Clear catalog
    catalog_tool = self.getCatalogTool()
    catalog_tool.manage_catalogClear()

    # Same title everywhere; descriptions sort in reverse order of the ids.
    for person_id, description in (('a', 'z'), ('b', 'y'), ('c', 'x')):
      person_module.newContent(id=person_id, portal_type='Person',
                               title='a', description=description)
      self.tic()

    def getSortedIdList(sort_on):
      # Ids returned by searchFolder for the given sort_on criteria.
      return [brain.getObject().getId()
              for brain in person_module.searchFolder(sort_on=sort_on)]

    self.assertEqual(['a','b','c'], getSortedIdList([('id','ascending')]))
    self.assertEqual(
      ['c','b','a'],
      getSortedIdList([('title','ascending'), ('description','ascending')]))
    self.assertEqual(
      ['a','b','c'],
      getSortedIdList([('title','ascending'),('description','descending')]))
Sebastien Robin's avatar
Sebastien Robin committed
330

Jérome Perrin's avatar
Jérome Perrin committed
331
  def test_11_CastStringAsInt(self):
    """Sorting on title with the 'int' cast must order titles numerically."""
    person_module = self.getPersonModule()

    # Clear catalog
    catalog_tool = self.getCatalogTool()
    catalog_tool.manage_catalogClear()

    for person_id, title in (('a', '1'), ('b', '2'), ('c', '12')):
      person_module.newContent(id=person_id, portal_type='Person', title=title)
      self.tic()

    def getSortedTitleList(sort_on):
      # Titles returned by searchFolder for the given sort_on criteria.
      return [brain.getObject().getTitle()
              for brain in person_module.searchFolder(sort_on=sort_on)]

    # Without a cast the titles are ordered as strings...
    self.assertEqual(['1','12','2'],
                     getSortedTitleList([('title','ascending')]))
    # ...and as numbers with the 'int' cast.
    self.assertEqual(['1','2','12'],
                     getSortedTitleList([('title','ascending','int')]))
348

Jérome Perrin's avatar
Jérome Perrin committed
349
  def test_12_TransactionalUidBuffer(self):
    """Check that the uid buffer is emptied when the transaction is aborted.

    After newUid() the buffer must hold at least one uid; after abort() it
    must be empty.
    """
    portal_catalog = self.getCatalogTool()
    catalog = portal_catalog.getSQLCatalog()
    self.assertTrue(catalog is not None)

    # Clear out the uid buffer.
    # (Earlier approach, kept for reference:)
    #from Products.ZSQLCatalog.SQLCatalog import uid_buffer_dict, get_ident
    #uid_buffer_key = get_ident()
    #if uid_buffer_key in uid_buffer_dict:
    #  del uid_buffer_dict[uid_buffer_key]
    def getUIDBuffer(*args, **kw):
      # The buffer is only read while holding the class-level
      # _reserved_uid_lock.
      with catalog.__class__._reserved_uid_lock:
        return catalog.getUIDBuffer(*args, **kw)

    getUIDBuffer(force_new_buffer=True)

    # Need to abort a transaction artificially, so commit the current
    # one, first.
    self.commit()

    catalog.newUid()
    uid_buffer = getUIDBuffer()
    self.assertTrue(len(uid_buffer) > 0)

    self.abort()
    uid_buffer = getUIDBuffer()
    self.assertTrue(len(uid_buffer) == 0)
Romain Courteaud's avatar
Romain Courteaud committed
376

Jérome Perrin's avatar
Jérome Perrin committed
377
  def test_13_ERP5Site_reindexAll(self):
    """ERP5Site_reindexAll must re-catalog everything after a catalog clear.

    The paths catalogued before the clear must all be present again after
    the reindex, and no activity message may be left.
    """
    # Flush message queue
    self.tic()
    # Create some objects
    portal = self.getPortal()
    portal_category = self.getCategoryTool()
    portal_category.newContent(portal_type='Base Category',
                               title="GreatTitle1")
    module = portal.getDefaultModule('Organisation')
    module.newContent(portal_type='Organisation',
                      title="GreatTitle2")
    # Flush message queue
    self.tic()
    original_path_list = self.getSQLPathList()
    # Clear catalog
    portal_catalog = self.getCatalogTool()
    portal_catalog.manage_catalogClear()
    sql_connection = self.getSQLConnection()
    # "portal_type != NULL" never matches a row in SQL (the comparison
    # yields NULL), which made this check vacuous; IS NOT NULL actually
    # counts the rows that carry a portal type.
    sql = 'select count(*) from catalog where portal_type IS NOT NULL'
    result = sql_connection.manage_test(sql)
    catalogued_count = result[0]['COUNT(*)']
    self.assertEqual(0, catalogued_count)
    # Commit
    self.commit()
    # Reindex all
    portal.ERP5Site_reindexAll()
    self.tic()
    self.commit()
    # Check that no activity message is left
    sql = 'select count(*) from message'
    result = sql_connection.manage_test(sql)
    message_count = result[0]['COUNT(*)']
    self.assertEqual(0, message_count)
    # Check if all objects are catalogued as before
    new_path_list = self.getSQLPathList()
    self.assertTrue(set(original_path_list).issubset(new_path_list))
413

Jérome Perrin's avatar
Jérome Perrin committed
414 415 416
  def test_14_ReindexWithBrokenCategory(self):
    """A broken category on an object must not prevent its valid categories
    from being catalogued when the object is reindexed."""
    # Flush message queue
    self.tic()
    # Create the categories and an organisation using them
    portal = self.portal
    category_tool = self.getCategoryTool()
    nexedi_group = category_tool.group.newContent(id='nexedi')
    europe_region = category_tool.region.newContent(id='europe')
    organisation_module = portal.getDefaultModule('Organisation')
    organisation = organisation_module.newContent(portal_type='Organisation')
    organisation.setGroup('nexedi')
    self.assertEqual(organisation.getGroupValue(), nexedi_group)
    organisation.setRegion('europe')
    self.assertEqual(organisation.getRegionValue(), europe_region)
    # 'not_exists' is the broken category: it resolves to no value.
    organisation.setRole('not_exists')
    self.assertEqual(organisation.getRoleValue(), None)
    # Flush message queue
    self.tic()
    # Clear catalog
    catalog_tool = self.getCatalogTool()
    catalog_tool.manage_catalogClear()
    sql_connection = self.getSQLConnection()

    # Right after the clear, no strict membership is recorded.
    strict_membership_sql = (
        'SELECT COUNT(*) FROM category '
        'WHERE uid=%s and category_strict_membership = 1'
    ) % organisation.getUid()
    row_list = sql_connection.manage_test(strict_membership_sql)
    self.assertEqual(0, row_list[0]['COUNT(*)'])
    self.tic()
    # Reindex the organisation and process the resulting activities
    organisation.reindexObject()
    self.tic()
    # No activity message may be left
    row_list = sql_connection.manage_test('select count(*) from message')
    self.assertEqual(0, row_list[0]['COUNT(*)'])
    # Check region and group categories are catalogued, and role is not
    expected_count_dict = {
      'region':1,
      'group':1,
      'role':0}
    for base_cat, expected_count in expected_count_dict.items():
      sql = """SELECT COUNT(*) FROM category
            WHERE category.uid=%s and category.category_strict_membership = 1
            AND category.base_category_uid = %s""" % (
        organisation.getUid(),
        category_tool[base_cat].getUid())
      row_list = sql_connection.manage_test(sql)
      self.assertEqual(expected_count, row_list[0]['COUNT(*)'],
            'category %s is not cataloged correctly' % base_cat)

Jérome Perrin's avatar
Jérome Perrin committed
471
  def test_15_getObject(self):
    """Check the three outcomes of portal_catalog.getObject."""
    catalog_tool = self.getCatalogTool()
    # A string uid is rejected with ValueError.
    self.assertRaises(ValueError, catalog_tool.getObject, "StringUID")

    # The uid of a catalogued object gives that object back.
    organisation = self._makeOrganisation()
    found = catalog_tool.getObject(organisation.getUid()).getObject()
    self.assertEqual(organisation, found)
    # A uid that is not in the catalog raises KeyError.
    self.assertRaises(KeyError, catalog_tool.getObject, sys.maxint)
Jérome Perrin's avatar
Jérome Perrin committed
481

482
  def test_getRecordForUid(self):
    """getRecordForUid must give a record whose object is the original one."""
    catalog_tool = self.getCatalogTool()
    organisation = self._makeOrganisation()
    sql_catalog = catalog_tool.getSQLCatalog()
    record = sql_catalog.getRecordForUid(organisation.getUid())
    self.assertEqual(organisation, record.getObject())
Jérome Perrin's avatar
Jérome Perrin committed
487

488 489 490
  def test_path(self):
    """getpath must map a uid to the object path, or raise KeyError."""
    catalog_tool = self.getCatalogTool()
    organisation = self._makeOrganisation()
    expected_path = organisation.getPath()
    self.assertEqual(expected_path,
                     catalog_tool.getpath(organisation.getUid()))
    # A uid that is not in the catalog raises KeyError.
    self.assertRaises(KeyError, catalog_tool.getpath, sys.maxint)

Jérome Perrin's avatar
Jérome Perrin committed
494
  def test_16_newUid(self):
    """newUid must return long integers and never the same uid twice."""
    from Products.ZSQLCatalog.SQLCatalog import UID_BUFFER_SIZE
    catalog_tool = self.getCatalogTool()
    seen_uid_set = set()
    # Three times the buffer size, so that more than one buffer is used.
    for _ in xrange(UID_BUFFER_SIZE * 3):
      new_uid = catalog_tool.newUid()
      self.assertTrue(isinstance(new_uid, long))
      self.assertFalse(new_uid in seen_uid_set)
      seen_uid_set.add(new_uid)
Jérome Perrin's avatar
Jérome Perrin committed
504 505

  def test_17_CreationDate_ModificationDate(self):
    """Check creation_date and modification_date as stored in the catalog.

    Right after creation both catalog columns equal the creation date of
    the object. After an edit, creation_date is unchanged while
    modification_date follows the object and is later than creation_date.
    """
    import time

    portal = self.getPortal()
    sql_connection = self.getSQLConnection()

    module = portal.getDefaultModule('Organisation')
    organisation = module.newContent(portal_type='Organisation',)
    creation_date = organisation.getCreationDate().toZone('UTC').ISO()
    modification_date = organisation.getModificationDate().toZone('UTC').ISO()
    self.commit()
    now = DateTime()
    self.tic()
    sql = """select creation_date, modification_date
             from catalog where uid = %s""" % organisation.getUid()
    result = sql_connection.manage_test(sql)
    self.assertEqual(creation_date,
                      result[0]['creation_date'].ISO())
    self.assertEqual(modification_date,
                      result[0]['modification_date'].ISO())
    self.assertEqual(creation_date,
                      result[0]['modification_date'].ISO())

    # Wait so that the edit happens measurably later than the creation.
    time.sleep(3)
    organisation.edit(title='edited')
    self.tic()
    result = sql_connection.manage_test(sql)
    self.assertEqual(creation_date, result[0]['creation_date'].ISO())
    modification_date = organisation.getModificationDate().toZone('UTC').ISO()
    # Compare like with like: both values are UTC ISO strings. Comparing the
    # string with a DateTime object, as before, could never fail.
    self.assertNotEqual(modification_date, creation_date)
    # The catalog value is compared with the modification date read back
    # from the object rather than with `now`, since the time needed to run
    # the lines above is not known.
    self.assertEqual(modification_date,
                      result[0]['modification_date'].ISO())
    self.assertTrue(organisation.getModificationDate()>now)
    self.assertTrue(result[0]['creation_date']<result[0]['modification_date'])
Jérome Perrin's avatar
Jérome Perrin committed
542

543 544
  # TODO: this test is disabled (and maybe not complete), because this feature
  # is not implemented
Jérome Perrin's avatar
Jérome Perrin committed
545 546
  @todo_erp5
  def test_18_buildSQLQueryAnotherTable(self):
    """Tests that buildSQLQuery works with another query_table than 'catalog'

    Two organisations are created, the first one having the second one as
    destination section. A ZSQL method selecting from the alternate table
    is then fed with the output of buildSQLQuery for: no filter, a filter
    on title, a sort on id and a filter on a related key.
    """
    portal = self.getPortal()
    portal_catalog = self.getCatalogTool()
    # clear catalog
    portal_catalog.manage_catalogClear()
    self.commit()

    # create some content to use destination_section_title as related key
    # FIXME: create the related key here ?
    module = portal.getDefaultModule('Organisation')
    source_organisation = module.newContent( portal_type='Organisation',
                                        title = 'source_organisation')
    destination_organisation = module.newContent( portal_type='Organisation',
                                        title = 'destination_organisation')
    source_organisation.setDestinationSectionValue(destination_organisation)
    source_organisation.recursiveReindexObject()
    destination_organisation.recursiveReindexObject()
    self.tic()

    # buildSQLQuery can use arbitrary table name.
    query_table = "node"
    sql_skeleton = """
    SELECT %(query_table)s.uid,
           %(query_table)s.id
    FROM
      <dtml-in prefix="table" expr="from_table_list">
        <dtml-var table_item> AS <dtml-var table_key>
        <dtml-unless sequence-end>, </dtml-unless>
      </dtml-in>
    <dtml-if where_expression>
    WHERE
      <dtml-var where_expression>
    </dtml-if>
    <dtml-if order_by_expression>
      ORDER BY <dtml-var order_by_expression>
    </dtml-if>
    """ % {'query_table' : query_table}

    portal_skins_custom = portal.portal_skins.custom
    portal_skins_custom.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'testMethod',
          title = '',
          connection_id = 'erp5_sql_connection',
          arguments = "\n".join([ 'from_table_list',
                                  'where_expression',
                                  'order_by_expression' ]),
          template = sql_skeleton)
    testMethod = portal_skins_custom['testMethod']

    default_parameters = {
      'portal_type': 'Organisation',
      'from_table_list': {},
      'where_expression': "",
      'order_by_expression': None,
    }

    # check that we retrieve our 2 organisations by default.
    kw = default_parameters.copy()
    kw.update( portal_catalog.buildSQLQuery(
                  query_table = query_table,
                  **kw) )
    LOG('kw', 0, kw)
    LOG('SQL', 0, testMethod(src__=1, **kw))
    self.assertEqual(len(testMethod(**kw)), 2)

    # check we can make a simple filter on title.
    kw = default_parameters.copy()
    kw.update( portal_catalog.buildSQLQuery(
                  query_table = query_table,
                  title = 'source_organisation',
                  **kw) )
    LOG('kw', 1, kw)
    LOG('SQL', 1, testMethod(src__=1, **kw))
    self.assertEqual( len(testMethod(**kw)), 1,
                       testMethod(src__=1, **kw) )
    self.assertEqual( testMethod(**kw)[0]['uid'],
                        source_organisation.getUid(),
                        testMethod(src__=1, **kw) )

    # check sort
    kw = default_parameters.copy()
    kw.update(portal_catalog.buildSQLQuery(
                  query_table = query_table,
                  sort_on = [('id', 'ascending')],
                  **kw))
    LOG('kw', 2, kw)
    LOG('SQL', 2, testMethod(src__=1, **kw))
    brains = testMethod(**kw)
    self.assertEqual( len(brains), 2,
                       testMethod(src__=1, **kw))
    self.assertFalse( brains[0]['id'] > brains[1]['id'],
                 testMethod(src__=1, **kw) )

    # check related keys works
    # The default parameters go to buildSQLQuery, as in the sections above
    # (they used to be passed to dict.update by mistake), and the searched
    # title is the one actually given to the destination organisation.
    kw = default_parameters.copy()
    kw.update(portal_catalog.buildSQLQuery(
                  query_table = query_table,
                  destination_section_title = 'destination_organisation',
                  **kw))
    LOG('kw', 3, kw)
    LOG('SQL', 3, testMethod(src__=1, **kw))
    brains = testMethod(**kw)
    self.assertEqual( len(brains), 1, testMethod(src__=1, **kw) )
    self.assertEqual( brains[0]['uid'],
                       source_organisation.getUid(),
                       testMethod(src__=1, **kw) )
Sebastien Robin's avatar
Sebastien Robin committed
652

Jérome Perrin's avatar
Jérome Perrin committed
653
  def test_19_SearchFolderWithNonAsciiCharacter(self):
    """searchFolder finds a document whose title has a non-ASCII character.
    """
    module = self.getPersonModule()
    accented_title = 'Sébastien'
    module.newContent(id='5', portal_type='Person', title=accented_title)
    self.tic()

    # Unfiltered listing of the module.
    found_id_list = [brain.getObject().getId()
                     for brain in module.searchFolder()]
    self.assertEqual(['5'], found_id_list)

    # Listing filtered on the accented title itself.
    found_id_list = [brain.getObject().getId()
                     for brain in module.searchFolder(title=accented_title)]
    self.assertEqual(['5'], found_id_list)
664

665 666 667 668 669
  def test_Collation(self):
    """Title searches must match regardless of accents and letter case.
    """
    module = self.getPersonModule()
    accented_title = 'Sébastien'
    module.newContent(id='5', portal_type='Person', title=accented_title)
    self.tic()

    # The exact title, then its unaccented form (searching for Sebastien
    # should also find Sébastien), then the lower case form (catalog searches
    # are case insensitive) must all find the same document.
    for searched_title in (accented_title, 'Sebastien', 'sebastien'):
      found_id_list = [brain.getObject().getId()
                       for brain in module.searchFolder(title=searched_title)]
      self.assertEqual(['5'], found_id_list)
684

685

Jérome Perrin's avatar
Jérome Perrin committed
686
  def test_20_SearchFolderWithDynamicRelatedKey(self):
    """searchFolder accepts dynamic related keys built on the group category.
    """
    # Create some objects
    portal = self.getPortal()
    group = self.getCategoryTool().group
    # Start from an empty group base category.
    group.manage_delObjects(list(group.objectIds()))
    nexedi_group = group.newContent(id='nexedi', title='Nexedi',
                                    description='a')
    storever_group = group.newContent(id='storever', title='Storever',
                                      description='b')
    module = portal.getDefaultModule('Organisation')
    organisation = module.newContent(portal_type='Organisation')
    organisation.setGroup('nexedi')
    self.assertEqual(organisation.getGroupValue(), nexedi_group)
    organisation2 = module.newContent(portal_type='Organisation')
    organisation2.setGroup('storever')
    organisation2.setTitle('Organisation 2')
    self.assertEqual(organisation2.getGroupValue(), storever_group)
    # Flush message queue
    self.tic()

    def search(**kw):
      # Documents returned by a searchFolder on the organisation module.
      return [brain.getObject() for brain in module.searchFolder(**kw)]

    # Group title, with and without the "default_" prefix
    self.assertEqual(search(group_title='Nexedi'), [organisation])
    self.assertEqual(search(default_group_title='Nexedi'), [organisation])
    # Group id
    self.assertEqual(search(group_id='storever'), [organisation2])
    # Group description 'a'
    self.assertEqual(search(group_description='a'), [organisation])
    # Group description 'c', which no group has
    self.assertEqual(search(group_description='c'), [])
    self.assertEqual(search(default_group_description='c'), [])
    # Group relative_url
    group_relative_url = nexedi_group.getRelativeUrl()
    self.assertEqual(search(group_relative_url=group_relative_url),
                     [organisation])
    # Group uid
    self.assertEqual(search(group_uid=nexedi_group.getUid()), [organisation])
    # Group id AND title of the document
    self.assertEqual(search(group_id='storever', title='Organisation 2'),
                     [organisation2])
746

747

Jérome Perrin's avatar
Jérome Perrin committed
748
  def test_21_SearchFolderWithDynamicStrictRelatedKey(self):
    """strict_ related keys match the category set on the document (here
    group/nexedi/erp5) but not its parent category (group/nexedi).
    """
    # Create some objects
    portal = self.getPortal()
    group = self.getCategoryTool().group
    # Start from an empty group base category.
    group.manage_delObjects(list(group.objectIds()))
    nexedi_group = group.newContent(id='nexedi', title='Nexedi',
                                    description='a')
    erp5_sub_group = nexedi_group.newContent(id='erp5', title='ERP5',
                                             description='b')
    module = portal.getDefaultModule('Organisation')
    organisation = module.newContent(portal_type='Organisation')
    organisation.setGroup('nexedi/erp5')
    self.assertEqual(organisation.getGroupValue(), erp5_sub_group)
    # Flush message queue
    self.tic()

    def search(**kw):
      # Documents returned by a searchFolder on the organisation module.
      return [brain.getObject() for brain in module.searchFolder(**kw)]

    # Title of the parent group: no match
    self.assertEqual(search(strict_group_title='Nexedi'), [])
    # Title of the sub group the organisation belongs to
    self.assertEqual(search(strict_group_title='ERP5'), [organisation])
    # Description of the parent group: no match
    self.assertEqual(search(strict_group_description='a'), [])
    # Description of the sub group
    self.assertEqual(search(strict_group_description='b'), [organisation])
783

Jérome Perrin's avatar
Jérome Perrin committed
784
  def test_22_SearchingWithUnicode(self):
    """A catalog search with a unicode title finds a document whose title was
    set from a plain string.
    """
    person_module = self.getPersonModule()
    person_module.newContent(portal_type='Person', title='A Person')
    self.tic()
    result_list = self.getCatalogTool().searchResults(
        portal_type='Person', title=u'A Person')
    # Check the number of results rather than comparing the result object to
    # a list, so that the assertion does not depend on how the result object
    # implements equality.
    self.assertNotEqual(0, len(result_list))
Jérome Perrin's avatar
Jérome Perrin committed
790 791

  def test_23_DeleteObjectRaiseErrorWhenQueryFail(self):
    """Unindexing an object raises once the SQL connection has been deleted.
    """
    portal_catalog = self.getCatalogTool()
    person_module = self.getPersonModule()
    person = person_module.newContent(id='2', portal_type='Person')
    self.tic()
    self.checkRelativeUrlInSQLPathList([person.getRelativeUrl()])
    # We will delete the connector
    # in order to make sure it will not work any more
    self.getPortal().manage_delObjects('erp5_sql_connection')
    # Then it must be impossible to delete an object
    uid = person.getUid()
    self.assertRaises(AttributeError, portal_catalog.unindexObject, person,
                      uid=uid)
    # NOTE(review): presumably rolls the connection removal back so that it
    # does not leak into other tests - confirm against ERP5TypeTestCase.
    self.abort()
Sebastien Robin's avatar
Sebastien Robin committed
809

Jérome Perrin's avatar
Jérome Perrin committed
810
  def test_24_SortOn(self):
    """An ascending sort_on on catalog.title produces one of the accepted
    order_by_expression spellings.
    """
    order_by_expression = self.getCatalogTool().buildSQLQuery(
        sort_on=(('catalog.title', 'ascending'),))['order_by_expression']
    # assertIn reports the unexpected expression when the test fails.
    self.assertIn(order_by_expression,
                  ('catalog.title', '`catalog`.`title` ASC',
                   'catalog.title ASC'))
815

Jérome Perrin's avatar
Jérome Perrin committed
816
  def test_25_SortOnDescending(self):
    """A descending sort_on on catalog.title produces one of the accepted
    order_by_expression spellings.
    """
    order_by_expression = self.getCatalogTool().buildSQLQuery(
        sort_on=(('catalog.title', 'descending'),))['order_by_expression']
    # assertIn reports the unexpected expression when the test fails.
    self.assertIn(order_by_expression,
                  ('catalog.title DESC', '`catalog`.`title` DESC'))
Jérome Perrin's avatar
Jérome Perrin committed
821 822

  def test_26_SortOnUnknownKeys(self):
    """Sorting on the key 'ignored' yields an empty order_by_expression.
    """
    sql_query = self.getCatalogTool().buildSQLQuery(
        select_list=('uid', 'path'),
        sort_on=(('ignored', 'ascending'),))
    self.assertEqual('', sql_query['order_by_expression'])
Jérome Perrin's avatar
Jérome Perrin committed
826 827

  def test_27_SortOnAmbigousKeys(self):
    """Sort keys given without a table name: accepted when the column is
    found on the catalog table, rejected with ValueError when it is not, and
    accepted again once prefixed with a table name.
    """
    catalog_tool = self.getCatalogTool()

    # if the sort key is found on the catalog table, it will use that catalog
    # table.
    order_by_expression = catalog_tool.buildSQLQuery(
        sort_on=(('title', 'ascending'),))['order_by_expression']
    self.assertIn(order_by_expression,
                  ('catalog.title', '`catalog`.`title` ASC'))

    # start_date is not found on catalog and exists both in delivery and
    # movement table: sorting on it without a table name is refused.
    self.assertRaises(ValueError, catalog_tool.buildSQLQuery,
                      sort_on=(('start_date', 'ascending'),))

    # of course, in that case, it's possible to prefix with table name
    order_by_expression = catalog_tool.buildSQLQuery(
        sort_on=(('delivery.start_date', 'ascending'),))['order_by_expression']
    self.assertIn(order_by_expression,
                  ('delivery.start_date', 'delivery.start_date ASC'))
Jérome Perrin's avatar
Jérome Perrin committed
846 847

  def test_28_SortOnMultipleKeys(self):
    """Two sort_on keys produce a two-column order_by_expression.

    Spaces are stripped before comparing, so only the column names, their
    order and the direction keywords matter.
    """
    order_by_expression = self.getCatalogTool().buildSQLQuery(
        sort_on=(('catalog.title', 'ascending'),
                 ('catalog.id', 'asc')))['order_by_expression']
    # assertIn reports the unexpected expression when the test fails.
    self.assertIn(order_by_expression.replace(' ', ''),
                  ('catalog.title,catalog.id',
                   '`catalog`.`title`ASC,`catalog`.`id`ASC',
                   'catalog.titleASC,catalog.idASC'))
854

Jérome Perrin's avatar
Jérome Perrin committed
855
  def test_29_SortOnRelatedKey(self):
    """Sort-on parameter and related key. (Assumes that region_reference is
    a valid related key)

    The sort is checked in both directions, first with the related key also
    used as a search criterion, then with the related key only present in
    sort_on.
    """
    direction_list = (('ascending', '.`reference` ASC'),
                      ('descending', '.`reference` DESC'))
    case_list = (
      ({'region_reference': 'foo'}, None),
      ({}, 'sort_on parameter must be taken into account even if related key '
           'is not a parameter of the current query'),
    )
    for query_kw, message in case_list:
      for sort_order, expected_suffix in direction_list:
        order_by_expression = self.getCatalogTool().buildSQLQuery(
            sort_on=(('region_reference', sort_order),),
            **query_kw)['order_by_expression']
        self.assertTrue(order_by_expression.endswith(expected_suffix),
                        message)

875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916
  def test_sortOnRelatedKeyWithUnsetRelation(self):
    """
      Sorting on a related key must keep the documents for which the
      relation is not set.
    """
    portal = self.getPortalObject()
    subordination = portal.organisation_module.newContent(
        portal_type="Organisation")
    person_module = portal.person_module
    # One person without the relation, one person with it.
    person_module.newContent(portal_type="Person")
    person_module.newContent(portal_type="Person",
                             career_subordination_value=subordination)
    self.tic()
    unsorted_count = len(person_module.searchFolder())
    sorted_count = len(person_module.searchFolder(
        sort_on=[('subordination_title', 'ascending')]))
    self.assertEqual(unsorted_count, sorted_count)

  def test_sortOnRelatedKeyWithoutLeftJoinSupport(self):
    """Check that sorting on a related key that does not support left join
    still returns documents.
    """
    portal = self.getPortalObject()
    # Setting default_address_city gives the catalog an Address to find.
    # NOTE(review): presumably this creates the default address sub-document
    # - confirm.
    self._makeOrganisation(title='abc', default_address_city='abc')

    # now turn the z_related_grand_parent into an old-style method, without
    # RELATED_QUERY_SEPARATOR
    method = portal.portal_catalog.getSQLCatalog().z_related_grand_parent
    old_src = method.src

    # Registered before the method is edited, so that the original source is
    # put back afterwards.
    @self._addCleanup
    def cleanGrandParentMethod(self):
      method.manage_edit(method.title, method.connection_id,
                         method.arguments_src, old_src)

    src = old_src.replace('<dtml-var RELATED_QUERY_SEPARATOR>', ' AND ')
    method.manage_edit(method.title, method.connection_id, method.arguments_src,
                       src)

    self.tic()
    self.assertNotEqual(0, len(portal.portal_catalog(
      portal_type='Address',
      sort_on=[('grand_parent_portal_type', 'ascending')])))
917

918 919 920 921 922 923 924 925 926 927 928 929 930 931
  def _makeOrganisation(self, **kw):
    """Create an Organisation in its default module, edit it and reindex it.

    The group/nexedi category is created first when it is missing. Unless
    `kw` provides its own `group`, the organisation is edited with
    group='group/nexedi'. Every other keyword is passed to edit() unchanged.

    Returns the new organisation.
    """
    group_category = self.getCategoryTool().group
    if not hasattr(group_category, 'nexedi'):
      group_category.newContent(id='nexedi', title='Nexedi Group')
    kw.setdefault('group', 'group/nexedi')
    organisation_module = self.getPortal().getDefaultModule('Organisation')
    new_organisation = organisation_module.newContent(
        portal_type='Organisation')
    new_organisation.edit(**kw)
    self.tic()
    return new_organisation
Jérome Perrin's avatar
Jérome Perrin committed
932 933

  def test_30_SimpleQueryDict(self):
    """A search criterion can be given as a {'query': value} dict.
    """
    organisation_title = 'Nexedi Organisation'
    organisation = self._makeOrganisation(title=organisation_title)
    found_path_list = [
      brain.path
      for brain in self.getCatalogTool()(title={'query': organisation_title})]
    self.assertEqual([organisation.getPath()], found_path_list)

Jérome Perrin's avatar
Jérome Perrin committed
942
  def test_31_RelatedKeySimpleQueryDict(self):
    """A {'query': value} dict can also be given for a related key.
    """
    organisation = self._makeOrganisation()
    found_path_list = [
      brain.path
      for brain in self.getCatalogTool()(
        group_title={'query': 'Nexedi Group'},
        # have to filter on portal type, because the group category is
        # also member of itself
        portal_type=organisation.getPortalTypeName())]
    self.assertEqual([organisation.getPath()], found_path_list)

Jérome Perrin's avatar
Jérome Perrin committed
953
  def test_32_SimpleQueryDictWithOrOperator(self):
    """A query dict holding several values combined with the OR operator.
    """
    organisation_title = 'Nexedi Organisation'
    organisation = self._makeOrganisation(title=organisation_title)

    # 'catalog.title' is not a valid python identifier, hence the ** dict.
    catalog_kw = {
      'catalog.title': {
        'query': (organisation_title, 'something else'),
        'operator': 'or',
      },
    }
    found_path_list = [brain.path
                       for brain in self.getCatalogTool()(**catalog_kw)]
    self.assertEqual([organisation.getPath()], found_path_list)
963

Jérome Perrin's avatar
Jérome Perrin committed
964
  def test_33_SimpleQueryDictWithAndOperator(self):
    """A query dict holding several values combined with the AND operator.
    """
    organisation_title = 'Nexedi Organisation'
    organisation = self._makeOrganisation(title=organisation_title)

    # this is useless, we must find a better use case
    title_query = {'query': (organisation_title, organisation_title),
                   'operator': 'and'}
    found_path_list = [brain.path
                       for brain in self.getCatalogTool()(title=title_query)]
    self.assertEqual([organisation.getPath()], found_path_list)

Jérome Perrin's avatar
Jérome Perrin committed
976
  def test_34_SimpleQueryDictWithMaxRangeParameter(self):
    """A query dict with the 'max' range parameter ( < )
    """
    org_a = self._makeOrganisation(title='A')
    # B and C only have to exist: they must not be found.
    self._makeOrganisation(title='B')
    self._makeOrganisation(title='C')

    found_path_list = [
      brain.path
      for brain in self.getCatalogTool()(
        portal_type='Organisation',
        title={'query': 'B', 'range': 'max'})]
    self.assertEqual([org_a.getPath()], found_path_list)
Jérome Perrin's avatar
Jérome Perrin committed
987 988

  def test_35_SimpleQueryDictWithMinRangeParameter(self):
    """A query dict with the 'min' range parameter ( >= )
    """
    # A only has to exist: it must not be found.
    self._makeOrganisation(title='A')
    org_b = self._makeOrganisation(title='B')
    org_c = self._makeOrganisation(title='C')

    expected_path_list = [org_b.getPath(), org_c.getPath()]
    found_path_list = [
      brain.path
      for brain in self.getCatalogTool()(
        portal_type='Organisation',
        title={'query': 'B', 'range': 'min'})]
    self.failIfDifferentSet(expected_path_list, found_path_list)


Jérome Perrin's avatar
Jérome Perrin committed
1001
  def test_36_SimpleQueryDictWithNgtRangeParameter(self):
    """A query dict with the 'ngt' range parameter ( <= )
    """
    org_a = self._makeOrganisation(title='A')
    org_b = self._makeOrganisation(title='B')
    # C only has to exist: it must not be found.
    self._makeOrganisation(title='C')

    expected_path_list = [org_a.getPath(), org_b.getPath()]
    found_path_list = [
      brain.path
      for brain in self.getCatalogTool()(
        portal_type='Organisation',
        title={'query': 'B', 'range': 'ngt'})]
    self.failIfDifferentSet(expected_path_list, found_path_list)

Jérome Perrin's avatar
Jérome Perrin committed
1013
  def test_37_SimpleQueryDictWithMinMaxRangeParameter(self):
    """A query dict with the 'minmax' range parameter ( >=  < )
    """
    # A and C only have to exist: they must not be found.
    self._makeOrganisation(title='A')
    org_b = self._makeOrganisation(title='B')
    self._makeOrganisation(title='C')

    found_path_list = [
      brain.path
      for brain in self.getCatalogTool()(
        portal_type='Organisation',
        title={'query': ('B', 'C'), 'range': 'minmax'})]
    self.assertEqual([org_b.getPath()], found_path_list)
Jérome Perrin's avatar
Jérome Perrin committed
1024 1025

  def test_38_SimpleQueryDictWithMinNgtRangeParameter(self):
    """A query dict with the 'minngt' range parameter ( >= <= )
    """
    # A only has to exist: it must not be found.
    self._makeOrganisation(title='A')
    org_b = self._makeOrganisation(title='B')
    org_c = self._makeOrganisation(title='C')

    expected_path_list = [org_b.getPath(), org_c.getPath()]
    found_path_list = [
      brain.path
      for brain in self.getCatalogTool()(
        portal_type='Organisation',
        title={'query': ('B', 'C'), 'range': 'minngt'})]
    self.failIfDifferentSet(expected_path_list, found_path_list)

1037 1038 1039 1040 1041 1042
  def test_QueryDictFromRequest(self):
    """A ZPublisher record carrying 'query' and 'range' attributes is
    accepted as a search criterion.
    """
    from ZPublisher.HTTPRequest import record

    # A only has to exist: it must not be found.
    self._makeOrganisation(title='A')
    org_b = self._makeOrganisation(title='B')
    org_c = self._makeOrganisation(title='C')

    # Build the record from the same keys a query dict would have.
    query_dict = {'query': ('B', 'C'), 'range': 'minngt'}
    query_record = record()
    for name, value in query_dict.items():
      setattr(query_record, name, value)

    expected_path_set = {org_b.getPath(), org_c.getPath()}
    found_path_set = {
      brain.path
      for brain in self.getCatalogTool()(portal_type='Organisation',
                                         title=query_record)}
    self.assertEqual(expected_path_set, found_path_set)
1053

Jérome Perrin's avatar
Jérome Perrin committed
1054
  def test_39_DeferredConnection(self):
    """ERP5Catalog uses a deferred connection for full text indexing.
    """
    deferred_connection_id = 'erp5_sql_deferred_connection'
    deferred_connection = getattr(self.getPortal(), deferred_connection_id,
                                  None)
    self.assertTrue(deferred_connection is not None)
    self.assertEqual('Z MySQL Deferred Database Connection',
                     deferred_connection.meta_type)
    # Both full text methods must be bound to the deferred connection.
    for method_id in ('z0_uncatalog_fulltext', 'z_catalog_fulltext_list'):
      sql_method = getattr(self.getCatalogTool().getSQLCatalog(), method_id)
      self.assertEqual(deferred_connection_id, sql_method.connection_id)

Jérome Perrin's avatar
Jérome Perrin committed
1069
  def test_40_DeleteObject(self):
    """Simple test to exercise object deletion: once deleted, a document is
    no longer returned by searchFolder.
    """
    module = self.getOrganisationModule()
    document = module.newContent()
    self.tic()
    module.manage_delObjects([document.getId()])
    self.tic()
    remaining_count = len(module.searchFolder())
    self.assertEqual(0, remaining_count)
1078

Jérome Perrin's avatar
Jérome Perrin committed
1079
  def test_41_ProxyRolesInRestrictedPython(self):
    """test that proxy roles apply to catalog queries within python scripts
    """
    perm = 'View'
    access_perm = 'Access contents information'

    user_folder = self.getPortal().acl_users
    user_folder._doAddUser('alice', '', ['Member', 'Manager', 'Assignor'], [])
    user_folder._doAddUser('bob', '', ['Member'], [])

    # create restricted object
    self.login('alice')
    folder = self.getOrganisationModule()
    ob = folder.newContent()
    # make sure permissions are correctly set
    folder.manage_permission(access_perm, ['Member'], 1)
    folder.manage_permission(perm, ['Member'], 1)
    ob.manage_permission(access_perm, ['Member'], 1)
    ob.manage_permission(perm, ['Manager'], 0)
    self.tic()

    def checkView(document):
      # The security manager is looked up at call time, i.e. after the
      # latest login.
      return getSecurityManager().checkPermission(perm, document)

    # check access: alice can view both, bob can only view the folder
    self.assertEqual(1, checkView(folder))
    self.assertEqual(1, checkView(ob))
    self.login('bob')
    self.assertEqual(1, checkView(folder))
    self.assertEqual(None, checkView(ob))

    # add a script that calls a catalog method
    self.login('alice')
    script = createZODBPythonScript(
        self.getPortal().portal_skins.custom,
        'catalog_test_script',
        '',
        "return len(context.searchFolder())")

    # test without proxy role
    self.assertEqual(1, folder.catalog_test_script())
    self.login('bob')
    self.assertEqual(0, folder.catalog_test_script())

    # test with proxy role and correct role
    self.login('alice')
    script.manage_proxy(['Manager'])
    self.assertEqual(1, folder.catalog_test_script())
    self.login('bob')
    self.assertEqual(1, folder.catalog_test_script())

    # test with proxy role and wrong role
    self.login('alice')
    script.manage_proxy(['Assignor'])
    # proxy roles must overwrite the user's roles, even if he is the owner
    # of the script
    self.assertEqual(0, folder.catalog_test_script())
    self.login('bob')
    self.assertEqual(0, folder.catalog_test_script())
1128

Jérome Perrin's avatar
Jérome Perrin committed
1129
  def test_42_SearchableText(self):
    """Tests SearchableText is working in ERP5Catalog
    """
    folder = self.getOrganisationModule()
    ob = folder.newContent()
    ob.setTitle('The title of this object')
    self.assertTrue('this' in ob.SearchableText(), ob.SearchableText())
    # add some other objects, we need to create a minimum quantity of data for
    # full text queries to work correctly
    for _ in range(10):
      otherob = folder.newContent()
      otherob.setTitle('Something different')
      self.assertFalse('this' in otherob.SearchableText(),
                       otherob.SearchableText())
    # catalog those objects
    self.tic()
    catalog_tool = self.getCatalogTool()
    self.assertEqual([ob],
        [x.getObject() for x in catalog_tool(portal_type='Organisation',
                                             SearchableText='title')])
    self.assertEqual(1,
                     catalog_tool.countResults(portal_type='Organisation',
                                               SearchableText='title')[0][0])

    # 'different' is found in more than 50% of records
    # MySQL ignores such a word, but Mroonga does not ignore.
    if 'ENGINE=Mroonga' in self.portal.erp5_sql_connection.manage_test(
        'SHOW CREATE TABLE full_text')[0][1]:
      # Mroonga
      self.assertEqual(10, catalog_tool.countResults(
                portal_type='Organisation', SearchableText='different')[0][0])
    else:
      # MySQL
      # getObject must be called: comparing bound methods would only hold
      # by accident, because the result is expected to be empty.
      self.assertEqual([],
          [x.getObject() for x in catalog_tool(
                  portal_type='Organisation', SearchableText='different')])
      self.assertEqual(0, catalog_tool.countResults(
                portal_type='Organisation', SearchableText='different')[0][0])
Jérome Perrin's avatar
Jérome Perrin committed
1166 1167

  def test_43_ManagePasteObject(self):
    """A document created by copy and paste is present in the catalog.
    """
    person_module = self.getPersonModule()
    person = person_module.newContent(id='1', portal_type='Person')
    self.tic()
    copy_data = person_module.manage_copyObjects([person.getId()])
    # manage_pasteObjects reports the id given to each pasted document.
    new_id = person_module.manage_pasteObjects(copy_data)[0]['new_id']
    new_person = person_module[new_id]
    self.tic()
    self.checkRelativeUrlInSQLPathList([new_person.getRelativeUrl()])
1178

Jérome Perrin's avatar
Jérome Perrin committed
1179
  def test_44_ParentRelatedKeys(self):
    """A document can be searched by the title of its parent (parent_title).
    """
    portal_catalog = self.getCatalogTool()
    person_module = self.getPersonModule()
    # The module is reindexed explicitly before the search on its title.
    person_module.reindexObject()
    person = person_module.newContent(id='1', portal_type='Person')
    self.tic()
    self.assertEqual([person],
        [x.getObject() for x in portal_catalog(
               parent_title=person_module.getTitle())])
Jérome Perrin's avatar
Jérome Perrin committed
1188 1189

  def test_45_QueryAndComplexQuery(self):
    """Catalog searches driven by Query, SimpleQuery and ComplexQuery objects.
    """
    org_a = self._makeOrganisation(title='abc',description='abc')
    org_b = self._makeOrganisation(title='bcd',description='bcd')
    org_c = self._makeOrganisation(title='efg',description='efg')
    org_e = self._makeOrganisation(title='foo',description='bir')
    org_f = self._makeOrganisation(title='foo',description='bar')

    def searchPathList(**catalog_kw):
      # Paths of the organisations matching the given criteria.
      return [brain.path for brain in self.getCatalogTool()(
                  portal_type='Organisation', **catalog_kw)]

    # uid=[] : every organisation is expected
    uid_query = Query(uid=[])
    self.failIfDifferentSet(
        [org.getPath() for org in (org_a, org_b, org_c, org_e, org_f)],
        searchPathList(query=uid_query))

    # title='abc'
    title_query = Query(title='abc')
    self.failIfDifferentSet([org_a.getPath()],
                            searchPathList(title=title_query))

    # title with b and c
    title_query = Query(title=['%b%','%c%'],operator='AND')
    self.failIfDifferentSet([org_a.getPath(), org_b.getPath()],
                            searchPathList(title=title_query))

    # title='bcd' OR description='efg'
    or_query = ComplexQuery(Query(title='bcd'),
                            Query(description='efg'),
                            operator='OR')
    self.failIfDifferentSet([org_b.getPath(), org_c.getPath()],
                            searchPathList(query=or_query))

    # Recursive Complex Query
    # (title='abc' and description='abc') OR
    #  title='foo' and description='bar'
    abc_query = ComplexQuery(SimpleQuery(title='abc'),
                             SimpleQuery(description='abc'),
                             operator='AND')
    foo_bar_query = ComplexQuery(SimpleQuery(title='foo'),
                                 SimpleQuery(description='bar'),
                                 operator='AND')
    recursive_query = ComplexQuery(abc_query, foo_bar_query, operator='OR')
    self.failIfDifferentSet([org_a.getPath(), org_f.getPath()],
                            searchPathList(query=recursive_query))
1234

Jérome Perrin's avatar
Jérome Perrin committed
1235
  def test_46_TestLimit(self):
    """Catalog results are capped by default_result_limit unless limit=None
    is passed.
    """
    ctool = self.getCatalogTool()
    old_default_result_limit = ctool.default_result_limit
    max_ = ctool.default_result_limit = 3
    try:
      #Create max + 2 Organisations
      for i in xrange(max_ + 2):
        self._makeOrganisation(title='abc%s' % (i), description='abc')
      self.assertEqual(max_,
                       len(self.getCatalogTool()(portal_type='Organisation')))
      self.assertEqual(max_ + 2,
              len(self.getCatalogTool()(portal_type='Organisation',
                                        limit=None)))
    finally:
      # Put the limit back even when an assertion fails.
      ctool.default_result_limit = old_default_result_limit
1247

1248
  def playActivityList(self, method_id_list):
    """Invoke pending activities whose method id is in method_id_list.

    The transaction is committed first.  Then, 100 times in a row, the
    activity tool's message list is fetched, every message whose
    ``method_id`` belongs to ``method_id_list`` is invoked through
    ``manageInvoke``, and the transaction is committed again.

    method_id_list -- container of method ids that may be invoked; messages
                      with any other method id are left untouched.
    """
    self.commit()
    portal_activities = self.getActivityTool()
    for _ in range(100):
      for message in portal_activities.getMessageList():
        if message.method_id not in method_id_list:
          continue
        try:
          portal_activities.manageInvoke(message.object_path,
                                         message.method_id)
        except ActivityFlushError:
          # Deliberately ignored: a message that cannot be flushed is simply
          # retried on a later pass.
          pass
      self.commit()
1262

Jérome Perrin's avatar
Jérome Perrin committed
1263
  def test_48_ERP5Site_hotReindexAll(self):
    """
      Hot reindex catalog -> catalog2, then run a more detailed hot
      reindex catalog2 -> catalog.
      This test uses the environment variable
      extra_sql_connection_string_list.
    """
    portal = self.portal
    self.original_connection_id = 'erp5_sql_connection'
    self.original_deferred_connection_id = \
      self.new_erp5_deferred_sql_connection
    self.new_connection_id = self.new_erp5_sql_connection
    self.new_deferred_connection_id = 'erp5_sql_deferred_connection2'
    new_connection_string = getExtraSqlConnectionStringList()[0]

    # Report when the default connection string is the same as the one used
    # for the hot-reindex catalog.
    # NOTE(review): this branch only emits a 'SKIPPED' message; nothing
    # returns here, so the rest of the test still runs -- confirm intent.
    original_connection = getattr(portal, self.original_connection_id)
    if original_connection.connection_string == new_connection_string:
      message = 'SKIPPED: default connection string is the same as the one for hot-reindex catalog'
      ZopeTestCase._print(message)
      LOG('Testing... ', 0, message)

    portal_category = self.getCategoryTool()
    portal_activities = self.getActivityTool()
    self.base_category = portal_category.newContent(
      portal_type='Base Category',
      title="GreatTitle1")
    module = portal.getDefaultModule('Organisation')

    def newOrganisation(**kw):
      # Every organisation of this test shares the same title.
      return module.newContent(portal_type='Organisation',
                               title="GreatTitle2", **kw)

    self.organisation = newOrganisation()
    # Flush message queue
    self.tic()

    # Create the new connectors and open them.
    # The transactionless connector must not be changed: it is the one that
    # creates portal_ids, otherwise uid conflicts would appear.
    addSQLConnection = \
      portal.manage_addProduct['ZMySQLDA'].manage_addZMySQLConnection
    for connection_id in (self.new_connection_id,
                          self.new_deferred_connection_id):
      addSQLConnection(connection_id, '', new_connection_string)
      portal[connection_id].manage_open_connection()

    # Create the new catalog as a renamed copy of the original one
    portal_catalog = self.getCatalogTool()
    self.original_catalog_id = 'erp5_mysql_innodb'
    self.new_catalog_id = self.original_catalog_id + '2'
    cp_data = portal_catalog.manage_copyObjects(ids=('erp5_mysql_innodb',))
    pasted_id = portal_catalog.manage_pasteObjects(cp_data)[0]['new_id']
    portal_catalog.manage_renameObject(id=pasted_id,
                                       new_id=self.new_catalog_id)

    # Connection ids handed to the hot reindex so that the methods of the
    # new catalog are switched to the new connectors
    new_catalog = portal_catalog[self.new_catalog_id]
    source_sql_connection_id_list = [
      self.original_connection_id,
      self.original_deferred_connection_id,
    ]
    destination_sql_connection_id_list = [
      self.new_connection_id,
      self.new_deferred_connection_id,
    ]
    # Launch the full hot reindexing
    portal_catalog.manage_hotReindexAll(
      source_sql_catalog_id=self.original_catalog_id,
      destination_sql_catalog_id=self.new_catalog_id,
      source_sql_connection_id_list=source_sql_connection_id_list,
      destination_sql_connection_id_list=destination_sql_connection_id_list,
      update_destination_sql_catalog=True)

    # Flush message queue
    self.tic()
    original_path_list = self.getSQLPathList(self.original_connection_id)
    new_path_list = self.getSQLPathList(self.new_connection_id)
    self.assertTrue(set(original_path_list).issubset(new_path_list))
    self.organisation2 = newOrganisation()
    first_deleted_url = self.organisation2.getRelativeUrl()
    self.tic()
    path_list = [self.organisation.getRelativeUrl()]
    self.checkRelativeUrlInSQLPathList(
      path_list, connection_id=self.original_connection_id)
    self.checkRelativeUrlInSQLPathList(
      path_list, connection_id=self.new_connection_id)
    path_list = [first_deleted_url]
    self.checkRelativeUrlNotInSQLPathList(
      path_list, connection_id=self.original_connection_id)
    self.checkRelativeUrlInSQLPathList(
      path_list, connection_id=self.new_connection_id)

    # Make sure some zsql method uses the right connection_id
    zsql_method = portal.portal_skins.erp5_core.Resource_zGetInventoryList
    self.assertEqual(getattr(zsql_method, 'connection_id'),
                     self.new_connection_id)

    self.assertEqual(portal_catalog.getHotReindexingState(),
                     HOT_REINDEXING_FINISHED_STATE)

    # Do a hot reindex the other way round, this time a more complicated one
    portal_catalog.manage_hotReindexAll(
      source_sql_catalog_id=self.new_catalog_id,
      destination_sql_catalog_id=self.original_catalog_id,
      source_sql_connection_id_list=destination_sql_connection_id_list,
      destination_sql_connection_id_list=source_sql_connection_id_list,
      update_destination_sql_catalog=True)
    self.commit()
    self.assertEqual(portal_catalog.getHotReindexingState(),
                     HOT_REINDEXING_RECORDING_STATE)
    self.organisation3 = newOrganisation()
    # Try something more complicated: create a new object, reindex it and
    # then delete it
    self.deleted_organisation = newOrganisation()
    self.deleted_organisation.immediateReindexObject()
    self.commit()
    deleted_url = self.deleted_organisation.getRelativeUrl()
    module.manage_delObjects(ids=[self.deleted_organisation.getId()])
    self.commit()
    # Invoke activities one by one in order to be able to test the double
    # indexing state of hot reindexing
    self.playActivityList((
      'Folder_reindexAll',
      'InventoryModule_reindexMovementList',
      'immediateReindexObject',
      'Folder_reindexObjectList',
      'unindexObject',
      'recursiveImmediateReindexObject',
    ))
    # Try to delete objects in double indexing state
    module.manage_delObjects(ids=[self.organisation2.getId()])
    self.playActivityList((
      'immediateReindexObject',
      'unindexObject',
      'recursiveImmediateReindexObject',
      'playBackRecordedObjectList',
      'getId',
      'setHotReindexingState',
    ))
    self.assertEqual(portal_catalog.getHotReindexingState(),
                     HOT_REINDEXING_DOUBLE_INDEXING_STATE)
    # Double indexing has now started
    self.next_deleted_organisation = newOrganisation(id='toto')
    next_deleted_url = self.next_deleted_organisation.getRelativeUrl()
    self.commit()
    self.playActivityList((
      'immediateReindexObject',
      'recursiveImmediateReindexObject',
    ))
    path_list = [next_deleted_url]
    self.checkRelativeUrlInSQLPathList(
      path_list, connection_id=self.new_connection_id)
    self.checkRelativeUrlInSQLPathList(
      path_list, connection_id=self.original_connection_id)
    module.manage_delObjects(ids=[self.next_deleted_organisation.getId()])
    # Create an object during the double indexing, to check its security
    # after the hot reindexing
    self.organisation4 = newOrganisation()
    self.tic()
    self.assertEqual(portal_catalog.getHotReindexingState(),
                     HOT_REINDEXING_FINISHED_STATE)
    # Check that security uids exist in roles and users by comparing the
    # number of objects in the catalog
    count_catalog = len(self.getSQLPathList(self.original_connection_id))
    count_restricted_catalog = len(
      self.getSQLPathListWithRolesAndUsers(self.original_connection_id))
    self.assertEqual(count_catalog, count_restricted_catalog)

    path_list = [self.organisation3.getRelativeUrl()]
    self.checkRelativeUrlInSQLPathList(
      path_list, connection_id=self.new_connection_id)
    self.checkRelativeUrlInSQLPathList(
      path_list, connection_id=self.original_connection_id)
    path_list = [first_deleted_url, deleted_url, next_deleted_url]
    self.checkRelativeUrlNotInSQLPathList(
      path_list, connection_id=self.new_connection_id)
    self.checkRelativeUrlNotInSQLPathList(
      path_list, connection_id=self.original_connection_id)
    # Make sure the module is there
    path_list = [module.getRelativeUrl()]
    self.checkRelativeUrlInSQLPathList(
      path_list, connection_id=self.new_connection_id)
    self.checkRelativeUrlInSQLPathList(
      path_list, connection_id=self.original_connection_id)
1425 1426 1427

  def test_48bis_ERP5Site_hotReindexAllCheckCachedValues(self):
    """
      Hot reindex catalog -> catalog2 and check that cached values are
      invalidated by the catalog migration.
    """
    portal = self.portal
    original_connection_id = 'erp5_sql_connection'
    original_deferred_connection_id = 'erp5_sql_deferred_connection'
    new_connection_string = getExtraSqlConnectionStringList()[0]

    # Report when the default connection string is the same as the one used
    # for the hot-reindex catalog.
    # NOTE(review): this branch only emits a 'SKIPPED' message; nothing
    # returns here, so the rest of the test still runs -- confirm intent.
    original_connection = getattr(portal, original_connection_id)
    if original_connection.connection_string == new_connection_string:
      message = 'SKIPPED: default connection string is the same as the one for hot-reindex catalog'
      ZopeTestCase._print(message)
      LOG('Testing... ', 0, message)

    # Create the new connectors and open them.
    # The transactionless connector must not be changed: it is the one that
    # creates portal_ids, otherwise uid conflicts would appear.
    addSQLConnection = \
      portal.manage_addProduct['ZMySQLDA'].manage_addZMySQLConnection
    for connection_id in (self.new_erp5_sql_connection,
                          self.new_erp5_deferred_sql_connection):
      addSQLConnection(connection_id, '', new_connection_string)
      portal[connection_id].manage_open_connection()

    # Create the new catalog as a copy of the current one
    portal_catalog = self.getCatalogTool()
    original_catalog = portal_catalog.getSQLCatalog()
    original_catalog_id = original_catalog.getId()
    cp_data = portal_catalog.manage_copyObjects(ids=(original_catalog_id,))
    new_catalog_id = portal_catalog.manage_pasteObjects(cp_data)[0]['new_id']
    new_catalog = portal_catalog[new_catalog_id]

    # Add a new searchable table to both catalogs
    create_dummy_table_sql = """
    CREATE TABLE `dummy` (
    `uid` BIGINT UNSIGNED NOT NULL,
    `dummy_title` varchar(32) NOT NULL default '',
    PRIMARY KEY  (`uid`)
    ) ENGINE=InnoDB;
    """
    drop_dummy_table_sql = """
    DROP TABLE IF EXISTS `dummy`
    """
    zsql_method_list = (
      ('z_create_dummy_table', create_dummy_table_sql),
      ('z0_drop_dummy_table', drop_dummy_table_sql),
    )
    for catalog, connection_id in (
        (original_catalog, original_connection_id),
        (new_catalog, self.new_erp5_sql_connection)):
      for method_id, template in zsql_method_list:
        catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id=method_id, title='', arguments="",
          connection_id=connection_id,
          template=template)

    # Update the catalog configuration and declare the new ZSQLMethods
    sql_clear_catalog = tuple(sorted(
      list(original_catalog.sql_clear_catalog)
      + ['z0_drop_dummy_table', 'z_create_dummy_table']))
    original_catalog.sql_clear_catalog = sql_clear_catalog
    new_catalog.sql_clear_catalog = sql_clear_catalog

    sql_search_tables = tuple(sorted(
      list(original_catalog.sql_search_tables) + ['dummy']))
    original_catalog.sql_search_tables = sql_search_tables
    new_catalog.sql_search_tables = sql_search_tables

    portal_catalog.manage_catalogClear()
    self.commit()
    # The catalog structure changed, so it must be possible to build new
    # queries using the new table columns.
    # Check that the column map follows the new structure of the catalog.
    self.assertTrue(
      'dummy.dummy_title' in portal_catalog.getSQLCatalog().getColumnMap())
    # Check more cached methods of SQLCatalog by building a query
    query = portal_catalog.getSQLCatalog().buildQuery(
      kw={'dummy.dummy_title': 'Foo'})
    self.assertTrue(query.query_list)

    # Prepare the arguments of the hot reindex
    source_sql_connection_id_list = [
      original_connection_id,
      original_deferred_connection_id,
    ]
    destination_sql_connection_id_list = [
      self.new_erp5_sql_connection,
      self.new_erp5_deferred_sql_connection,
    ]
    # Launch the full hot reindexing
    portal_catalog.manage_hotReindexAll(
      source_sql_catalog_id=original_catalog_id,
      destination_sql_catalog_id=new_catalog_id,
      source_sql_connection_id_list=source_sql_connection_id_list,
      destination_sql_connection_id_list=destination_sql_connection_id_list,
      update_destination_sql_catalog=True)

    # Flush message queue
    self.tic()
    self.assertEqual(portal_catalog.getSQLCatalog().getId(), new_catalog_id)
    # Check that the column map follows the new structure of the catalog.
    self.assertTrue(
      'dummy.dummy_title' in portal_catalog.getSQLCatalog().getColumnMap())
    # Check more cached methods of SQLCatalog by building a query
    query = portal_catalog.getSQLCatalog().buildQuery(
      kw={'dummy.dummy_title': 'Foo'})
    self.assertTrue(query.query_list)
    # The SQL connections of the zsql methods found in the skin folders have
    # to be reset: map every destination id back to its source id.
    sql_connection_id_dict = {}
    connection_id_pair_list = zip(destination_sql_connection_id_list,
                                  source_sql_connection_id_list)
    for destination_id, source_id in connection_id_pair_list:
      if source_id == destination_id:
        continue
      sql_connection_id_dict[destination_id] = source_id
    portal_catalog.changeSQLConnectionIds(
      folder=portal.portal_skins,
      sql_connection_id_dict=sql_connection_id_dict)
1544

Jérome Perrin's avatar
Jérome Perrin committed
1545
  def test_47_Unrestricted(self):
    """Test the results of unrestricted search and count.

    A document whose View permission is limited to Manager is created by
    'alice'; 'bob' (Member only) must get nothing from the regular search
    and count methods, and one result from their unrestricted variants.
    """
    user_folder = self.getPortal().acl_users
    user_folder._doAddUser('alice', '', ['Member', 'Manager', 'Assignor'], [])
    user_folder._doAddUser('bob', '', ['Member'], [])

    # alice creates a document that only she can view
    self.login('alice')
    document = self.getOrganisationModule().newContent(title='Object Title')
    document.manage_permission('View', ['Manager'], 0)
    self.tic()

    # the document is invisible to bob ...
    self.login('bob')
    catalog_tool = self.getCatalogTool()
    visible_list = catalog_tool.searchResults(title='Object Title')
    self.assertEqual(0, len(visible_list))
    visible_count = catalog_tool.countResults(title='Object Title')[0][0]
    self.assertEqual(0, visible_count)

    # ... except through the unrestricted searches
    unrestricted_list = catalog_tool.unrestrictedSearchResults(
      title='Object Title')
    self.assertEqual(1, len(unrestricted_list))
    unrestricted_count = catalog_tool.unrestrictedCountResults(
      title='Object Title')[0][0]
    self.assertEqual(1, unrestricted_count)
Aurel's avatar
Aurel committed
1570

Jérome Perrin's avatar
Jérome Perrin committed
1571 1572
  @todo_erp5
  def test_49_IndexInOrderedSearchFolder(self):
    """Check when the SQL produced by searchFolder contains 'use index'.

    With an empty ``sql_catalog_index_on_order_keys`` on the catalog the
    text must never appear; once it is set to ``('catalog.title',)`` the
    text must appear only when sorting on title.
    """
    person_module = self.getPersonModule()

    # Clear catalog
    portal_catalog = self.getCatalogTool()
    portal_catalog.manage_catalogClear()
    catalog = portal_catalog.objectValues()[0]

    # Three persons with the same title, indexed one after the other
    for person_id, description in (('a', 'z'), ('b', 'y'), ('c', 'x')):
      person_module.newContent(id=person_id, portal_type='Person',
                               title='a', description=description)
      self.tic()

    def usesIndex(**kw):
      # True when the SQL source of the search contains 'use index'
      return 'use index' in person_module.searchFolder(src__=1, **kw)

    index_columns = getattr(catalog, 'sql_catalog_index_on_order_keys', None)
    self.assertNotEqual(index_columns, None)
    self.assertEqual(len(index_columns), 0)
    # The catalog must not ask for an index when nothing is defined
    self.assertTrue(not usesIndex())
    self.assertTrue(not usesIndex(sort_on=[('id', 'ascending')]))
    self.assertTrue(not usesIndex(sort_on=[('title', 'ascending')]))

    # Declare that the catalog must ask for an index when ordering by
    # catalog.title
    catalog.sql_catalog_index_on_order_keys = ('catalog.title',)
    index_columns = getattr(catalog, 'sql_catalog_index_on_order_keys', None)
    self.assertNotEqual(index_columns, None)
    self.assertEqual(len(index_columns), 1)
    # The catalog must ask for an index only when ordering by catalog.title
    self.assertTrue(not usesIndex())
    self.assertTrue(not usesIndex(sort_on=[('id', 'ascending')]))
    self.assertTrue(usesIndex(sort_on=[('title', 'ascending')]))
Aurel's avatar
Aurel committed
1609

Jérome Perrin's avatar
Jérome Perrin committed
1610
  def test_50_LocalRolesArgument(self):
    """Test the local_roles= argument of catalog searches and counts.
    """
    self.getPortal().acl_users._doAddUser('bob', '', ['Member'], [])

    # Two documents: only the second one gives bob the Assignee local role
    folder = self.getOrganisationModule()
    ob1 = folder.newContent(title='Object Title')
    ob1.manage_permission('View', ['Member'], 1)
    ob2 = folder.newContent(title='Object Title')
    ob2_id = ob2.getId()
    ob2.manage_addLocalRoles('bob', ['Assignee'])
    self.tic()

    self.login('bob')
    ctool = self.getCatalogTool()

    def searchLength(**kw):
      # number of rows returned by a catalog search on the common title
      return len(ctool.searchResults(title='Object Title', **kw))

    def countValue(**kw):
      # value returned by a catalog count on the common title
      return ctool.countResults(title='Object Title', **kw)[0][0]

    def folderSearchLength(**kw):
      return len(folder.searchFolder(title='Object Title', **kw))

    def folderCountValue(**kw):
      return folder.countFolder(title='Object Title', **kw)[0][0]

    # without local_roles, bob sees both documents
    self.assertEqual(2, searchLength())
    self.assertEqual(2, countValue())

    # with local_roles=, only documents on which bob has such a local role
    # are returned
    self.assertEqual(0, searchLength(local_roles='UnexistingRole'))
    self.assertEqual(0, searchLength(local_roles='Assignor'))
    self.assertEqual(1, searchLength(local_roles='Assignee'))
    self.assertEqual(1, countValue(local_roles='Assignee'))

    # searchFolder and countFolder accept it as well
    self.assertEqual(1, folderSearchLength(local_roles='Assignee'))
    self.assertEqual(1, folderCountValue(local_roles='Assignee'))

    # local_roles can be a list, which means OR (at least one of the roles
    # is required)
    self.assertEqual(1, searchLength(local_roles=['Assignee', 'Auditor']))
    self.assertEqual(1, countValue(local_roles=['Assignee', 'Auditor']))

    # the list can also be written with ';', as used in worklist URLs
    self.assertEqual(1, searchLength(local_roles='Assignee;Auditor'))
    self.assertEqual(1, countValue(local_roles='Assignee;Auditor'))

    # bob must not see a document on which he has the Assignee role when
    # that role does not give the View permission
    ob1.manage_addLocalRoles('bob', ['Assignee'])
    ob1.manage_permission('View', ['Assignor'], 0)
    ob1.reindexObject()
    self.tic()
    user = getSecurityManager().getUser()
    self.assertFalse(user.has_permission('View', ob1))
    self.assertTrue(user.has_role('Assignee', ob1))
    result_list = [brain.getId() for brain in
                   ctool(title='Object Title', local_roles='Assignee')]
    self.assertEqual(1, len(result_list))
    self.assertEqual([ob2_id], result_list)
    self.assertEqual(1, countValue(local_roles='Assignee'))

    # searchFolder and countFolder behave the same way
    self.assertEqual(1, folderSearchLength(local_roles='Assignee'))
    self.assertEqual(1, folderCountValue(local_roles='Assignee'))

Jérome Perrin's avatar
Jérome Perrin committed
1690
  def test_51_SearchWithKeyWords(self):
    """Search persons whose title is a keyword: AND, OR, LIKE, SELECT.

    Each search on such a title must return exactly the person carrying it.
    """
    person_module = self.getPersonModule()
    person_list = [
      (keyword, person_module.newContent(portal_type='Person', title=keyword))
      for keyword in ('AND', 'OR', 'LIKE', 'SELECT')
    ]

    self.tic()
    ctool = self.getCatalogTool()
    for keyword, person in person_list:
      found_list = [brain.getObject() for brain in
                    ctool(portal_type='Person', title=keyword)]
      self.assertEqual([person], found_list)

Jérome Perrin's avatar
Jérome Perrin committed
1711
  def test_52_QueryAndTableAlias(self):
    """
    Make sure aliases can be given for the tables used by related keys.
    In some particular cases this greatly reduces the number of aliases.
    """
    org_a = self._makeOrganisation(title='abc',default_address_city='abc')
    self.getOrganisationModule().immediateReindexObject()
    expected_path_list = [org_a.getPath() + '/default_address']

    def searchPathList(query):
      return [brain.path for brain in self.getCatalogTool()(query=query)]

    # First, no alias at all
    complex_query = ComplexQuery(
      Query(parent_portal_type="Organisation"),
      Query(grand_parent_portal_type="Organisation Module"),
      operator="AND")
    self.failIfDifferentSet(expected_path_list,
                            searchPathList(complex_query))

    # Then the same search with explicit aliases
    parent_alias = ("catalog" , "parent")
    grand_parent_alias = ("catalog", "grand_parent")
    complex_query = ComplexQuery(
      Query(parent_portal_type="Organisation",
            table_alias_list=(parent_alias,)),
      Query(grand_parent_portal_type="Organisation Module",
            table_alias_list=(parent_alias, grand_parent_alias)),
      operator="AND")
    self.failIfDifferentSet(expected_path_list,
                            searchPathList(complex_query))

    # Make sure the generated query has the right list of aliases
    sql_kw = self.getCatalogTool().buildSQLQuery(query=complex_query)
    self.failIfDifferentSet(
      (("catalog","catalog"),
       ("parent","catalog"),
       ("grand_parent","catalog")),
      sql_kw["from_table_list"])
Jérome Perrin's avatar
Jérome Perrin committed
1742 1743

  def test_53_DateFormat(self):
1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787
    org_a = self._makeOrganisation(title='org_a')
    org_b = self._makeOrganisation(title='org_b')
    sql_connection = self.getSQLConnection()
    # Add a method in order to directly put values we want into
    # the catalog.
    def updateDate(organisation,date):
      uid = organisation.getUid()
      sql = "UPDATE catalog SET modification_date='%s' '\
          'WHERE uid=%s" %\
          (date,uid)
      result = sql_connection.manage_test(sql)
    updateDate(org_a,'2007-01-12 01:02:03')
    updateDate(org_b,'2006-02-24 15:09:06')

    catalog_kw = {'modification_date':{'query':'24/02/2006',
                               'format':'%d/%m/%Y',
                               'type':'date'}}
    self.failIfDifferentSet([org_b.getPath()],
        [x.path for x in self.getCatalogTool()(
                portal_type='Organisation',**catalog_kw)])
    catalog_kw = {'modification_date':{'query':'2007-01-12',
                               'format':'%Y-%m-%d',
                               'type':'date'}}
    self.failIfDifferentSet([org_a.getPath()],
        [x.path for x in self.getCatalogTool()(
                portal_type='Organisation',**catalog_kw)])
    catalog_kw = {'modification_date':{'query':'>31/12/2006',
                               'format':'%d/%m/%Y',
                               'type':'date'}}
    self.failIfDifferentSet([org_a.getPath()],
        [x.path for x in self.getCatalogTool()(
                portal_type='Organisation',**catalog_kw)])
    catalog_kw = {'modification_date':{'query':'2006',
                               'format':'%Y',
                               'type':'date'}}
    self.failIfDifferentSet([org_b.getPath()],
        [x.path for x in self.getCatalogTool()(
                portal_type='Organisation',**catalog_kw)])
    catalog_kw = {'modification_date':{'query':'>2006',
                               'format':'%Y',
                               'type':'date'}}
    self.failIfDifferentSet([org_a.getPath()],
        [x.path for x in self.getCatalogTool()(
                portal_type='Organisation',**catalog_kw)])
1788 1789 1790 1791 1792 1793 1794
    # If the date is an empty string, check that all objects are displayed.
    catalog_kw = {'modification_date':{'query':'',
                               'format':'%d/%m/%Y',
                               'type':'date'}}
    self.failIfDifferentSet([org_a.getPath(), org_b.getPath()],
        [x.path for x in self.getCatalogTool()(
                portal_type='Organisation',**catalog_kw)])
1795

Jérome Perrin's avatar
Jérome Perrin committed
1796
  def test_54_FixIntUid(self):
1797 1798 1799 1800 1801 1802 1803
    portal_catalog = self.getCatalogTool()
    portal = self.getPortal()

    module = portal.getDefaultModule('Organisation')
    organisation = module.newContent(portal_type='Organisation',)
    # Ensure that the new uid is long.
    uid = organisation.uid
1804
    self.assertTrue(isinstance(uid, long))
1805 1806 1807
    self.tic()

    # Ensure that the uid did not change after the indexing.
1808
    self.assertEqual(organisation.uid, uid)
1809 1810 1811 1812 1813 1814 1815

    # Force to convert the uid to int.
    self.uid = int(uid)
    self.tic()

    # After the indexing, the uid must be converted to long automatically,
    # and the value must be equivalent.
1816 1817
    self.assertTrue(isinstance(uid, long))
    self.assertEqual(organisation.uid, uid)
1818

Jérome Perrin's avatar
Jérome Perrin committed
1819
  def test_55_FloatFormat(self):
    """Check the where expression built for a formatted float criterion.

    The query '2 567.54' with format '1 234.12' and type 'float' on ``uid``
    must produce one of the two accepted TRUNCATE comparisons.
    """
    float_criterion = {
      'query': '2 567.54',
      'format': '1 234.12',
      'type': 'float',
    }
    sql_kw = self.getCatalogTool().buildSQLQuery(uid=float_criterion)
    sql_src = sql_kw['where_expression']
    accepted_list = (
      "TRUNCATE(catalog.uid,2) = '2567.54'",
      'TRUNCATE(`catalog`.`uid`, 2) = 2567.54',
    )
    self.assertTrue(
      any(accepted in sql_src for accepted in accepted_list), sql_src)
  def test_56_CreateUidDuringClearCatalog(self):
    """
      Register, among the methods run when the catalog is cleared, a python
      script which generates a list of uids, then clear the catalog.
      This exercises the creation of ids while a clear is in progress.
    """
    sql_catalog = self.getCatalogTool().getSQLCatalog()
    script_id = 'z0_zCreateUid'
    # Source of the script: it asks portal_ids for new ids of the 'text_uid'
    # group using the 'uid' generator.
    script_content = "context.getPortalObject().portal_ids.generateNewIdList(id_generator='uid',\
                                                                             id_group='text_uid')"
    createZODBPythonScript(sql_catalog, script_id,
                           '*args,**kw', script_content)
    # The method list is kept sorted, so that the script is launched after
    # the drop tables and before the recreate tables of catalog.
    clear_method_id_list = list(sql_catalog.sql_clear_catalog) + [script_id]
    sql_catalog.sql_clear_catalog = tuple(sorted(clear_method_id_list))
    sql_catalog.manage_catalogClear()

  def test_SearchOnOwner(self):
    """
      owner= can be used as a search key in the catalog to get all documents
      of a specific owner on which this owner has the View permission.
    """
    title = 'The Document'
    viewable_document = self._makeOrganisation(title=title)
    hidden_document = self._makeOrganisation(title=title)
    # Give the View permission to no role at all, without acquisition.
    hidden_document.manage_permission('View', [], 0)
    hidden_document.reindexObject()
    self.tic()
    catalog_tool = self.getCatalogTool()

    def searchByOwner(owner):
      brain_list = catalog_tool(title=title, owner=owner)
      return [brain.getObject() for brain in brain_list]

    # Only the document still viewable is found for the current user ...
    self.assertEqual([viewable_document], searchByOwner(self.username))
    # ... and nothing is found for another owner.
    self.assertEqual([], searchByOwner('somebody else'))

  def test_SubDocumentsSecurityIndexing(self):
    """
      Make sure indexing of security on sub-documents works as expected:
      a user with a local role on a Bank Account finds it with searchFolder
      on the parent Organisation, until the Bank Account is deleted.
    """
    self.getPortal().acl_users._doAddUser('bob', '', ['Member'], [])
    organisation = self._makeOrganisation(title='The Document')
    bank_account = organisation.newContent(portal_type='Bank Account')
    bank_account.manage_addLocalRoles('bob', ['Auditor'])
    self.tic()

    def searchBankAccountList():
      brain_list = organisation.searchFolder(portal_type='Bank Account')
      return [brain.getObject() for brain in brain_list]

    self.login('bob')
    self.assertEqual([bank_account], searchBankAccountList())

    # Once the bank account is in deleted state, searchFolder no longer
    # returns it.
    # This will work as long as Bank Account are associated to a workflow
    # that allows deletion.
    bank_account.delete()
    self.tic()
    self.assertEqual([], searchBankAccountList())

  @todo_erp5
  def test_SubDocumentsWithAcquireLocalRoleSecurityIndexing(self):
    """
      Check that sub object indexation is compatible with ZODB settings
      when the sub object acquires the parent local roles.

      One user is Auditor on the container, another one is Assignor on the
      sub-document. Each of them must find the sub-document in the catalog,
      and only when filtering on the local role he actually has.
    """
    permission = 'View'
    auditor_id = 'local_foo_1'
    assignor_id = 'local_bar_1'
    user_folder = self.getPortal().acl_users
    for user_id in (auditor_id, assignor_id):
      user_folder._doAddUser(user_id, user_id, ['Member', ], [])

    # Create a container, define a local role, and set view permission.
    container = self.getOrganisationModule().newContent(
      portal_type='Organisation')
    container.manage_setLocalRoles(auditor_id, ['Auditor'])
    container.manage_permission(
      permission, ['Owner', 'Auditor', 'Assignor'], 0)

    # By default, local roles are acquired from container for Email portal
    # type. The View permission is acquired from the parent as well.
    email_portal_type = 'Email'
    email = container.newContent(portal_type=email_portal_type)
    email.manage_permission(permission, [], 1)
    email.manage_setLocalRoles(assignor_id, ['Assignor'])

    email.reindexObject()
    self.tic()

    def checkCatalog(expected_list, **kw):
      brain_list = email.portal_catalog(portal_type=email_portal_type, **kw)
      self.assertSameSet(expected_list,
                         [brain.getObject() for brain in brain_list])

    user_and_role_list = (
      (auditor_id, 'Auditor'),
      (assignor_id, 'Assignor'),
    )
    for user_id, own_role in user_and_role_list:
      self.logout()
      self.login(user_id)
      # Without role filter, the sub-document is always found.
      checkCatalog([email, ])
      # With a role filter, it is found only for the role the user has.
      for local_role in ('Owner', 'Assignor', 'Auditor'):
        if local_role == own_role:
          expected_list = [email]
        else:
          expected_list = []
        checkCatalog(expected_list, local_roles=local_role)

  def test_60_ViewableOwnerIndexing(self):
    """
      Check the value stored in the viewable_owner column of the catalog
      table: the owner is expected there only when he can view the document
      thanks to the Owner role, whether or not the portal type of the
      document acquires local roles.
    """
    permission = 'View'
    portal = self.getPortal()
    user_folder = portal.acl_users
    user_folder._doAddUser(
      'super_owner', '', ['Member', 'Author', 'Assignee'], [])
    user_folder._doAddUser('little_owner', '', ['Member', 'Author'], [])

    module = self.getOrganisationModule()
    address_portal_type = portal.portal_types._getOb('Address')
    sql_connection = self.getSQLConnection()

    def assertViewableOwner(document, expected_owner):
      # Index pending documents, then read what the catalog stored.
      self.tic()
      line_list = sql_connection.manage_test(
        'select viewable_owner as owner from catalog where uid=%s'
        % document.getUid())
      self.assertSameSet([expected_owner],
                         [line.owner for line in line_list])

    self.login('super_owner')
    organisation_case_list = (
      # Owner is not catalogued if he can't view the object
      ([], ''),
      # Owner is catalogued when he can view the object
      (['Owner'], 'super_owner'),
      # Owner is not catalogued when he can view the object because he has
      # another role
      (['Assignee'], ''),
    )
    for view_role_list, expected_owner in organisation_case_list:
      organisation = module.newContent(portal_type='Organisation')
      organisation.manage_permission(permission, view_role_list, 0)
      assertViewableOwner(organisation, expected_owner)

    def checkAddress(acquire_local_roles, view_role_list, acquire,
                     expected_owner):
      # An Organisation owned by super_owner contains an Address owned by
      # little_owner; only the View setting of the Address varies.
      address_portal_type.acquire_local_roles = acquire_local_roles
      self.portal.portal_caches.clearAllCache()
      self.logout()
      self.login('super_owner')
      organisation = module.newContent(portal_type='Organisation')
      organisation.manage_permission(permission, ['Owner'], 0)
      self.logout()
      self.login('little_owner')
      address = organisation.newContent(portal_type='Address')
      address.manage_permission(permission, view_role_list, acquire)
      assertViewableOwner(address, expected_owner)

    # First when the portal type does not acquire the local roles, then when
    # it does.
    for acquire_local_roles in (False, True):
      # Owner is not catalogued when he can't view the object
      checkAddress(acquire_local_roles, [], 0, '')
      # Owner is catalogued when he can view the object
      checkAddress(acquire_local_roles, ['Owner'], 0, 'little_owner')
      # Owner is catalogued when he can view the object because permissions
      # are acquired
      checkAddress(acquire_local_roles, [], 1, 'little_owner')

  def test_ExactMatchSearch(self):
    """
      Test exact match search with queries: '%' is a wildcard unless the
      ExactMatch key is requested.
    """
    percent_document = self._makeOrganisation(title='Foo%')
    prefixed_document = self._makeOrganisation(title='FooBar')
    catalog_tool = self.getCatalogTool()

    # by default, % in catalog search is a wildcard:
    wildcard_result = catalog_tool(portal_type='Organisation', title='Foo%')
    self.assertSameSet([percent_document, prefixed_document],
                       [brain.getObject() for brain in wildcard_result])

    # ... but you can force searches with an exact match key
    exact_result = catalog_tool(
      portal_type='Organisation',
      title={'query': 'Foo%', 'key': 'ExactMatch'})
    self.assertEqual([percent_document],
                     [brain.getObject() for brain in exact_result])
  def test_KeywordSearch(self):
    """
      Test keyword search with queries: search on description with the
      default search key, then with the Keyword key forced in the query.
    """
    exact_document = self._makeOrganisation(description='Foo')
    longer_document = self._makeOrganisation(description='Foobar')
    catalog_tool = self.getCatalogTool()

    # description is not a keyword by default. (This might change in the
    # future, in which case this test has to be updated)
    default_result = catalog_tool(portal_type='Organisation',
                                  description='Foo')
    self.assertSameSet([exact_document],
                       [brain.getObject() for brain in default_result])

    # With the Keyword key, both descriptions are expected to match.
    keyword_result = catalog_tool(
      portal_type='Organisation',
      description={'query': 'Foo', 'key': 'Keyword'})
    self.assertEqual({exact_document, longer_document},
                     {brain.getObject() for brain in keyword_result})
  def test_ignore_empty_string(self):
    """
      ERP5Catalog ignores empty strings by default; ignore_empty_string=0
      makes it take them into account.
    """
    described_document = self._makeOrganisation(description='X')
    undescribed_document = self._makeOrganisation(description='')
    catalog_tool = self.getCatalogTool()

    def searchOrganisationSet(**kw):
      search_kw = dict(kw, portal_type='Organisation')
      brain_list = catalog_tool.searchResults(**search_kw)
      return set(brain.getObject() for brain in brain_list)

    # description='' is ignored
    self.assertEqual({undescribed_document, described_document},
                     searchOrganisationSet(description=''))
    # unless we explicitly say we don't want to ignore empty strings
    self.assertEqual({undescribed_document},
                     searchOrganisationSet(ignore_empty_string=0,
                                           description=''))

  def test_ignore_empty_string_related_key(self):
    """
      ERP5Catalog ignores empty strings by default, also on related keys;
      ignore_empty_string=0 makes it take them into account.
    """
    region_category = self.portal.portal_categories.region
    empty_description_region = region_category.newContent(
      portal_type='Category', description='')
    document_with_region = self._makeOrganisation(
      region_value=empty_description_region)
    document_without_region = self._makeOrganisation()
    catalog_tool = self.getCatalogTool()

    def searchOrganisationSet(**kw):
      search_kw = dict(kw, portal_type='Organisation')
      brain_list = catalog_tool.searchResults(**search_kw)
      return set(brain.getObject() for brain in brain_list)

    # region_description='' is ignored: both documents are returned
    self.assertEqual({document_with_region, document_without_region},
                     searchOrganisationSet(region_description=''))
    # when not ignored, only the document whose region has an empty
    # description is returned
    self.assertEqual({document_with_region},
                     searchOrganisationSet(ignore_empty_string=0,
                                           region_description=''))
  def test_complex_query(self):
    """
      Make sure that complex queries work on a real environment: four
      persons are created, three of them with a region, and they are then
      searched with simple and AND-combined queries.
    """
    catalog = self.getCatalogTool()
    person_module = self.getPersonModule()

    # Add categories
    region_category = self.getCategoryTool().region
    africa, asia, europe = [region_category.newContent(id=region_id)
                            for region_id in ('africa', 'asia', 'europe')]

    # (id and first name, last name, region); D has no region at all.
    person_definition_list = (
      ('A', 'ERP5', 'africa'),
      ('B', 'ZOPE', 'asia'),
      ('C', 'PYTHON', 'europe'),
      ('D', 'ERP5', None),
    )
    for name, last_name, region in person_definition_list:
      person_kw = {'id': name, 'first_name': name, 'last_name': last_name}
      if region is not None:
        person_kw['region'] = region
      person_module.newContent(**person_kw)

    self.tic()

    def countResult(query):
      return len(catalog(query=query))

    def personQuery(**criterion):
      # Persons matching the given criterion as well.
      return ComplexQuery(Query(portal_type='Person'),
                          Query(**criterion),
                          operator='AND')

    # simple query
    self.assertEqual(countResult(Query(portal_type='Person')), 4)

    # complex query, on a single region
    self.assertEqual(countResult(personQuery(region_uid=asia.getUid())), 1)

    # complex query, on several regions
    two_region_query = personQuery(
      region_uid=(africa.getUid(), asia.getUid()))
    self.assertEqual(countResult(two_region_query), 2)

    # more complex query
    query_find_european = personQuery(region_uid=europe.getUid())
    self.assertEqual(countResult(query_find_european), 1)

    query_find_name_erp5 = personQuery(title='%ERP5')
    self.assertEqual(countResult(query_find_name_erp5), 2)

    # Combining the two complex queries with OR is expected to be refused.
    self.assertRaises(NotImplementedError, ComplexQuery,
                      query_find_european, query_find_name_erp5,
                      operator='OR')
  def test_check_security_table_content(self):
    """
      Create Person and Organisation documents for many combinations of View
      permission settings and local roles, then check the consistency of the
      lines stored in the roles_and_users table.
    """
    sql_connection = self.getSQLConnection()
    portal = self.getPortalObject()
    portal_types = portal.portal_types

    user_folder = self.getPortal().acl_users
    user_folder._doAddUser('foo', 'foo', ['Member', ], [])
    user_folder._doAddUser('ERP5TypeTestCase', 'ERP5TypeTestCase',
                           ['Member', ], [])
    self.commit()
    # Start from a catalog cleared and entirely reindexed.
    self.getCatalogTool().manage_catalogClear()
    self.getPortal().ERP5Site_reindexAll()
    self.tic()

    # Person does not acquire local roles, Organisation does.
    person = 'Person'
    person_module = portal.person_module
    portal_types._getOb(person).acquire_local_roles = False
    organisation = 'Organisation'
    organisation_module = portal.organisation_module
    portal_types._getOb(organisation).acquire_local_roles = True
    container_and_portal_type_list = (
      (person_module, person),
      (organisation_module, organisation),
    )

    self.portal.portal_caches.clearAllCache()

    def getDocumentKey(portal_type, acquire_view_permission, view_role_list,
                       local_role_dict):
      # Hashable description of one combination.
      return (portal_type, acquire_view_permission,
              tuple(view_role_list),
              tuple([(user, tuple(role_list))
                     for user, role_list in local_role_dict.iteritems()])
             )

    # Create documents for all combinations
    document_dict = {}
    all_view_role_list = (
      [],
      ['Owner'],
      ['Owner', 'Author'],
      ['Author'],
    )
    all_local_role_dict_list = (
      {},
      {'foo': ['Owner']},
      {'foo': ['Author']},
      {'foo': ['Owner'],
       'bar': ['Author']},
      {'foo': ['Owner', 'Author'],
       'bar': ['Whatever']},
      {'foo': ['Owner', 'Author'],
       'bar': ['Whatever', 'Author']},
    )
    for container, portal_type in container_and_portal_type_list:
      for acquire_view_permission in (True, False):
        for view_role_list in all_view_role_list:
          for local_role_dict in all_local_role_dict_list:
            document = container.newContent(portal_type=portal_type)
            document.manage_permission('View', roles=view_role_list,
                                       acquire=acquire_view_permission)
            for user, role_list in local_role_dict.iteritems():
              document.manage_setLocalRoles(userid=user, roles=role_list)
            key = getDocumentKey(portal_type, acquire_view_permission,
                                 view_role_list, local_role_dict)
            document_dict[key] = document
    self.tic()

    def query(sql):
      return sql_connection.manage_test(sql).dictionaries()

    # Check that there is no Owner role in security table
    # Note: this tests *all* lines from security table. Not just the ones
    # inserted in this test.
    owner_line_list = query('SELECT * FROM roles_and_users WHERE allowedRolesAndUsers LIKE "%:Owner"')
    self.assertEqual(len(owner_line_list), 0, repr(owner_line_list))

    # Check that for each "user:<user>:<role>" line there is exactly one
    # "user:<user>" line with the same uid.
    # Also, check that for each "user:<user>" there is at least one
    # "user:<user>:<role>" line with same uid.
    # Also, check if "user:..." lines are well-formed.
    # Note: this tests *all* lines from security table. Not just the ones
    # inserted in this test.
    for line in query('SELECT * FROM roles_and_users WHERE allowedRolesAndUsers LIKE "user:%"'):
      allowed_roles_and_users = line['allowedRolesAndUsers']
      part_list = allowed_roles_and_users.split(':')
      uid = line['uid']
      if len(part_list) not in (2, 3):
        raise Exception('Malformed allowedRolesAndUsers value: %r' % (allowed_roles_and_users, ))
      if len(part_list) == 3:
        user_line = ':'.join(part_list[:-1])
        result = query('SELECT * FROM roles_and_users WHERE allowedRolesAndUsers = "%s" AND uid = %i' % (user_line, uid) )
        self.assertEqual(len(result), 1, repr(result))
      else:
        result = query('SELECT * FROM roles_and_users WHERE allowedRolesAndUsers LIKE "%s:%%" AND uid = %i' % (allowed_roles_and_users, uid) )
        self.assertNotEqual(len(result), 0, 'No line found for allowedRolesAndUsers=%r and uid=%i' % (allowed_roles_and_users, uid))

    def checkNoSecurityLine(local_role_dict, sql_template):
      # For every document created with local_role_dict and viewable by
      # Author, the given query must not return any line.
      for _, portal_type in container_and_portal_type_list:
        for acquire_view_permission in (True, False):
          for view_role_list in (['Owner', 'Author'],
                                 ['Author']):
            key = getDocumentKey(portal_type, acquire_view_permission,
                                 view_role_list, local_role_dict)
            result = query(sql_template % (document_dict[key].uid, ))
            self.assertEqual(len(result), 0,
                             '%r: len(%r) != 0' % (key, result))

    # Check that object that 'bar' can view because of 'Author' role can *not*
    # be found when searching for his other 'Whatever' role.
    checkNoSecurityLine(
      {'foo': ['Owner', 'Author'],
       'bar': ['Whatever', 'Author']},
      'SELECT roles_and_users.uid FROM roles_and_users, catalog WHERE roles_and_users.uid = catalog.security_uid AND catalog.uid = %i AND allowedRolesAndUsers = "user:bar:Whatever"')

    # Check that no 'bar' role are in security table when 'foo' has local
    # roles allowing him to view an object but 'bar' can't.
    checkNoSecurityLine(
      {'foo': ['Owner', 'Author'],
       'bar': ['Whatever']},
      'SELECT roles_and_users.uid, roles_and_users.allowedRolesAndUsers FROM roles_and_users, catalog WHERE roles_and_users.uid = catalog.security_uid AND catalog.uid = %i AND roles_and_users.allowedRolesAndUsers LIKE "user:bar%%"')
  def test_RealOwnerIndexing(self):
2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345
    logout = self.logout
    user1 = 'local_foo'
    user2 = 'local_bar'
    uf = self.getPortal().acl_users
    uf._doAddUser(user1, user1, ['Member', ], [])
    uf._doAddUser(user2, user2, ['Member', ], [])

    perm = 'View'
    folder = self.getOrganisationModule()
    folder.manage_setLocalRoles(user1, ['Author', 'Auditor'])
    folder.manage_setLocalRoles(user2, ['Author', 'Auditor'])
    portal_type = 'Organisation'

    sql_connection = self.getSQLConnection()

2346
    self.login(user2)
2347 2348 2349 2350
    obj2 = folder.newContent(portal_type=portal_type)
    obj2.manage_setLocalRoles(user1, ['Auditor'])
    obj2.manage_permission(perm, ['Owner', 'Auditor'], 0)

2351
    self.login(user1)
2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399

    obj = folder.newContent(portal_type=portal_type)
    obj.manage_setLocalRoles(user1, ['Owner', 'Auditor'])

    # Check that nothing is returned when user can not view the object
    obj.manage_permission(perm, [], 0)
    obj.reindexObject()
    self.tic()
    result = obj.portal_catalog(portal_type=portal_type)
    self.assertSameSet([obj2, ], [x.getObject() for x in result])
    result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
    self.assertSameSet([], [x.getObject() for x in result])
    result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
    self.assertSameSet([obj2, ], [x.getObject() for x in result])

    # Check that object is returned when he can view the object
    obj.manage_permission(perm, ['Auditor'], 0)
    obj.reindexObject()
    self.tic()
    result = obj.portal_catalog(portal_type=portal_type)
    self.assertSameSet([obj2, obj], [x.getObject() for x in result])
    result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
    self.assertSameSet([], [x.getObject() for x in result])
    result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
    self.assertSameSet([obj2, obj], [x.getObject() for x in result])

    # Check that object is returned when he can view the object
    obj.manage_permission(perm, ['Owner'], 0)
    obj.reindexObject()
    self.tic()
    result = obj.portal_catalog(portal_type=portal_type)
    self.assertSameSet([obj2, obj], [x.getObject() for x in result])
    result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
    self.assertSameSet([obj], [x.getObject() for x in result])
    result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
    self.assertSameSet([obj2, ], [x.getObject() for x in result])

    # Add a new table to the catalog
    sql_catalog = self.portal.portal_catalog.getSQLCatalog()

    local_roles_table = "test_local_roles"

    create_local_role_table_sql = """
CREATE TABLE `%s` (
  `uid` BIGINT UNSIGNED NOT NULL,
  `owner_reference` varchar(32) NOT NULL default '',
  PRIMARY KEY  (`uid`),
  KEY `version` (`owner_reference`)
2400
) ENGINE=InnoDB;
2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z_create_%s' % local_roles_table,
          title = '',
          arguments = "",
          connection_id = 'erp5_sql_connection',
          template = create_local_role_table_sql)

    drop_local_role_table_sql = """
DROP TABLE IF EXISTS %s
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z0_drop_%s' % local_roles_table,
          title = '',
          arguments = "",
          connection_id = 'erp5_sql_connection',
          template = drop_local_role_table_sql)

    catalog_local_role_sql = """
REPLACE INTO
  %s
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(uid))">
(
2425
  <dtml-sqlvar expr="uid[loop_item]" type="int">,
2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441
  <dtml-sqlvar expr="Base_getOwnerId[loop_item]" type="string" optional>
)
<dtml-if sequence-end>
<dtml-else>
,
</dtml-if>
</dtml-in>
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z_catalog_%s_list' % local_roles_table,
          title = '',
          connection_id = 'erp5_sql_connection',
          arguments = "\n".join(['uid',
                                 'Base_getOwnerId']),
          template = catalog_local_role_sql)

2442
    self.commit()
2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457
    current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list
    sql_catalog.sql_catalog_object_list = \
      current_sql_catalog_object_list + \
         ('z_catalog_%s_list' % local_roles_table,)
    current_sql_clear_catalog = sql_catalog.sql_clear_catalog
    sql_catalog.sql_clear_catalog = \
      current_sql_clear_catalog + \
         ('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table)
    current_sql_catalog_local_role_keys = \
          sql_catalog.sql_catalog_local_role_keys
    sql_catalog.sql_catalog_local_role_keys = ('Owner | %s.owner_reference' % \
       local_roles_table,)
    current_sql_search_tables = sql_catalog.sql_search_tables
    sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \
        [local_roles_table]
2458
    self.commit()
2459 2460 2461 2462 2463

    try:
      # Clear catalog
      portal_catalog = self.getCatalogTool()
      portal_catalog.manage_catalogClear()
2464
      self.commit()
2465
      self.portal.portal_caches.clearAllCache()
2466
      self.commit()
2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509
      obj2.reindexObject()

      # Check that nothing is returned when user can not view the object
      obj.manage_permission(perm, [], 0)
      obj.reindexObject()
      self.tic()
      result = obj.portal_catalog(portal_type=portal_type)
      self.assertSameSet([obj2, ], [x.getObject() for x in result])
      method = obj.portal_catalog
      result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
      self.assertSameSet([], [x.getObject() for x in result])
      result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
      self.assertSameSet([obj2, ], [x.getObject() for x in result])

      # Check that object is returned when he can view the object
      obj.manage_permission(perm, ['Auditor'], 0)
      obj.reindexObject()
      self.tic()
      result = obj.portal_catalog(portal_type=portal_type)
      self.assertSameSet([obj2, obj], [x.getObject() for x in result])
      result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
      self.assertSameSet([obj], [x.getObject() for x in result])
      result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
      self.assertSameSet([obj2, obj], [x.getObject() for x in result])

      # Check that object is returned when he can view the object
      obj.manage_permission(perm, ['Owner'], 0)
      obj.reindexObject()
      self.tic()
      result = obj.portal_catalog(portal_type=portal_type)
      self.assertSameSet([obj2, obj], [x.getObject() for x in result])
      result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
      self.assertSameSet([obj], [x.getObject() for x in result])
      result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
      self.assertSameSet([obj2, ], [x.getObject() for x in result])
    finally:
      sql_catalog.sql_catalog_object_list = \
        current_sql_catalog_object_list
      sql_catalog.sql_clear_catalog = \
        current_sql_clear_catalog
      sql_catalog.sql_catalog_local_role_keys = \
          current_sql_catalog_local_role_keys
      sql_catalog.sql_search_tables = current_sql_search_tables
2510
      self.commit()
2511

Jérome Perrin's avatar
Jérome Perrin committed
2512
  def test_MonoValueAssigneeIndexing(self):
2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527
    logout = self.logout
    user1 = 'local_foo'
    user2 = 'local_bar'
    uf = self.getPortal().acl_users
    uf._doAddUser(user1, user1, ['Member', ], [])
    uf._doAddUser(user2, user2, ['Member', ], [])

    perm = 'View'
    folder = self.getOrganisationModule()
    folder.manage_setLocalRoles(user1, ['Author', 'Auditor'])
    folder.manage_setLocalRoles(user2, ['Author', 'Auditor'])
    portal_type = 'Organisation'

    sql_connection = self.getSQLConnection()

2528
    self.login(user2)
2529 2530 2531 2532
    obj2 = folder.newContent(portal_type=portal_type)
    obj2.manage_setLocalRoles(user1, ['Auditor'])
    obj2.manage_permission(perm, ['Assignee', 'Auditor'], 0)

2533
    self.login(user1)
2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583

    obj = folder.newContent(portal_type=portal_type)
    obj.manage_setLocalRoles(user1, ['Assignee', 'Auditor'])

    # Check that nothing is returned when user can not view the object
    obj.manage_permission(perm, [], 0)
    obj.reindexObject()
    self.tic()
    result = obj.portal_catalog(portal_type=portal_type)
    self.assertSameSet([obj2, ], [x.getObject() for x in result])
    result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
    self.assertSameSet([], [x.getObject() for x in result])
    result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
    self.assertSameSet([obj2, ], [x.getObject() for x in result])

    # Check that object is returned when he can view the object
    obj.manage_permission(perm, ['Auditor'], 0)
    obj.reindexObject()
    self.tic()
    result = obj.portal_catalog(portal_type=portal_type)
    self.assertSameSet([obj2, obj], [x.getObject() for x in result])
    result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
    self.assertSameSet([], [x.getObject() for x in result])
    result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
    self.assertSameSet([obj2, obj], [x.getObject() for x in result])

    # Check that object is returned when he can view the object
    obj.manage_permission(perm, ['Assignee'], 0)
    obj.reindexObject()
    self.tic()
    result = obj.portal_catalog(portal_type=portal_type)
    self.assertSameSet([obj2, obj], [x.getObject() for x in result])
    result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
    self.assertSameSet([obj], [x.getObject() for x in result])
    result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
    self.assertSameSet([obj2, ], [x.getObject() for x in result])

    # Add a new table to the catalog
    sql_catalog = self.portal.portal_catalog.getSQLCatalog()

    local_roles_table = "test_assignee_local_roles"

    create_local_role_table_sql = """
CREATE TABLE `%s` (
  `uid` BIGINT UNSIGNED NOT NULL,
  `assignee_reference` varchar(32) NOT NULL default '',
  `viewable_assignee_reference` varchar(32) NOT NULL default '',
  PRIMARY KEY  (`uid`),
  KEY `assignee_reference` (`assignee_reference`),
  KEY `viewable_assignee_reference` (`viewable_assignee_reference`)
2584
) ENGINE=InnoDB;
2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z_create_%s' % local_roles_table,
          title = '',
          arguments = "",
          connection_id = 'erp5_sql_connection',
          template = create_local_role_table_sql)

    drop_local_role_table_sql = """
DROP TABLE IF EXISTS %s
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z0_drop_%s' % local_roles_table,
          title = '',
          arguments = "",
          connection_id = 'erp5_sql_connection',
          template = drop_local_role_table_sql)

    catalog_local_role_sql = """
REPLACE INTO
  %s
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(uid))">
(
2609
  <dtml-sqlvar expr="uid[loop_item]" type="int">,
2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627
  <dtml-sqlvar expr="getAssignee[loop_item] or ''" type="string" optional>,
  <dtml-sqlvar expr="getViewPermissionAssignee[loop_item] or ''" type="string" optional>
)
<dtml-if sequence-end>
<dtml-else>
,
</dtml-if>
</dtml-in>
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z_catalog_%s_list' % local_roles_table,
          title = '',
          connection_id = 'erp5_sql_connection',
          arguments = "\n".join(['uid',
                                 'getAssignee',
                                 'getViewPermissionAssignee']),
          template = catalog_local_role_sql)

2628
    self.commit()
2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650
    current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list
    sql_catalog.sql_catalog_object_list = \
      current_sql_catalog_object_list + \
         ('z_catalog_%s_list' % local_roles_table,)
    current_sql_clear_catalog = sql_catalog.sql_clear_catalog
    sql_catalog.sql_clear_catalog = \
      current_sql_clear_catalog + \
         ('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table)
    current_sql_catalog_local_role_keys = \
          sql_catalog.sql_catalog_local_role_keys
    sql_catalog.sql_catalog_local_role_keys = ('Assignee | %s.assignee_reference' % \
       local_roles_table,)

    current_sql_catalog_role_keys = \
          sql_catalog.sql_catalog_role_keys
    sql_catalog.sql_catalog_role_keys = (
        'Assignee | %s.viewable_assignee_reference' % \
       local_roles_table,)

    current_sql_search_tables = sql_catalog.sql_search_tables
    sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \
        [local_roles_table]
2651
    self.commit()
2652 2653 2654 2655 2656

    try:
      # Clear catalog
      portal_catalog = self.getCatalogTool()
      portal_catalog.manage_catalogClear()
2657
      self.commit()
2658
      self.portal.portal_caches.clearAllCache()
2659
      self.commit()
2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704
      obj2.reindexObject()

      # Check that nothing is returned when user can not view the object
      obj.manage_permission(perm, [], 0)
      obj.reindexObject()
      self.tic()
      result = obj.portal_catalog(portal_type=portal_type)
      self.assertSameSet([obj2, ], [x.getObject() for x in result])
      method = obj.portal_catalog
      result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
      self.assertSameSet([], [x.getObject() for x in result])
      result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
      self.assertSameSet([obj2, ], [x.getObject() for x in result])

      # Check that object is returned when he can view the object
      obj.manage_permission(perm, ['Auditor'], 0)
      obj.reindexObject()
      self.tic()
      result = obj.portal_catalog(portal_type=portal_type)
      self.assertSameSet([obj2, obj], [x.getObject() for x in result])
      result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
      self.assertSameSet([obj], [x.getObject() for x in result])
      result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
      self.assertSameSet([obj2, obj], [x.getObject() for x in result])

      # Check that object is returned when he can view the object
      obj.manage_permission(perm, ['Assignee'], 0)
      obj.reindexObject()
      self.tic()
      result = obj.portal_catalog(portal_type=portal_type)
      self.assertSameSet([obj2, obj], [x.getObject() for x in result])
      result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
      self.assertSameSet([obj], [x.getObject() for x in result])
      result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
      self.assertSameSet([obj2, ], [x.getObject() for x in result])
    finally:
      sql_catalog.sql_catalog_object_list = \
        current_sql_catalog_object_list
      sql_catalog.sql_clear_catalog = \
        current_sql_clear_catalog
      sql_catalog.sql_catalog_local_role_keys = \
          current_sql_catalog_local_role_keys
      sql_catalog.sql_catalog_role_keys = \
          current_sql_catalog_role_keys
      sql_catalog.sql_search_tables = current_sql_search_tables
2705
      self.commit()
2706

Jérome Perrin's avatar
Jérome Perrin committed
2707
  def test_UserOrGroupRoleIndexing(self):
2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726
    logout = self.logout
    user1 = 'a_great_user_name'
    user1_group = 'a_great_user_group'
    uf = self.getPortal().acl_users
    uf._doAddUser(user1, user1, ['Member', ], [])
    uf.zodb_groups.addGroup( user1_group, user1_group, user1_group)
    new = uf.zodb_groups.addPrincipalToGroup( user1, user1_group )

    perm = 'View'
    folder = self.getOrganisationModule()
    folder.manage_setLocalRoles(user1, ['Author', 'Auditor'])
    portal_type = 'Organisation'
    organisation = folder.newContent(portal_type=portal_type)

    sql_connection = self.getSQLConnection()
    def query(sql):
      result = sql_connection.manage_test(sql)
      return result.dictionaries()

2727
    self.login(user1)
2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741

    # Add a new table to the catalog
    sql_catalog = self.portal.portal_catalog.getSQLCatalog()

    local_roles_table = "test_user_or_group_local_roles"

    create_local_role_table_sql = """
CREATE TABLE `%s` (
  `uid` BIGINT UNSIGNED NOT NULL,
  `assignee_reference` varchar(32) NOT NULL default '',
  `viewable_assignee_reference` varchar(32) NOT NULL default '',
  PRIMARY KEY  (`uid`),
  KEY `assignee_reference` (`assignee_reference`),
  KEY `viewable_assignee_reference` (`viewable_assignee_reference`)
2742
) ENGINE=InnoDB;
2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z_create_%s' % local_roles_table,
          title = '',
          arguments = "",
          connection_id = 'erp5_sql_connection',
          template = create_local_role_table_sql)

    drop_local_role_table_sql = """
DROP TABLE IF EXISTS %s
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z0_drop_%s' % local_roles_table,
          title = '',
          arguments = "",
          connection_id = 'erp5_sql_connection',
          template = drop_local_role_table_sql)

    catalog_local_role_sql = """
REPLACE INTO
  %s
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(uid))">
(
2767
  <dtml-sqlvar expr="uid[loop_item]" type="int">,
2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785
  <dtml-sqlvar expr="getAssignee[loop_item] or ''" type="string" optional>,
  <dtml-sqlvar expr="getViewPermissionAssignee[loop_item] or ''" type="string" optional>
)
<dtml-if sequence-end>
<dtml-else>
,
</dtml-if>
</dtml-in>
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z_catalog_%s_list' % local_roles_table,
          title = '',
          connection_id = 'erp5_sql_connection',
          arguments = "\n".join(['uid',
                                 'getAssignee',
                                 'getViewPermissionAssignee']),
          template = catalog_local_role_sql)

2786
    self.commit()
2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812
    current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list
    sql_catalog.sql_catalog_object_list = \
      current_sql_catalog_object_list + \
         ('z_catalog_%s_list' % local_roles_table,)
    current_sql_clear_catalog = sql_catalog.sql_clear_catalog
    sql_catalog.sql_clear_catalog = \
      current_sql_clear_catalog + \
         ('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table)
    current_sql_catalog_local_role_keys = \
          sql_catalog.sql_catalog_local_role_keys
    sql_catalog.sql_catalog_local_role_keys = (
        'Owner | viewable_owner',
        'Assignee | %s.assignee_reference' % \
       local_roles_table,)

    current_sql_catalog_role_keys = \
          sql_catalog.sql_catalog_role_keys
    sql_catalog.sql_catalog_role_keys = (
        'Assignee | %s.viewable_assignee_reference' % \
       local_roles_table,)

    current_sql_search_tables = sql_catalog.sql_search_tables
    sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \
        [local_roles_table]

    portal = self.getPortal()
2813
    self.commit()
2814 2815 2816 2817 2818

    try:
      # Clear catalog
      portal_catalog = self.getCatalogTool()
      portal_catalog.manage_catalogClear()
2819
      self.commit()
2820
      self.portal.portal_caches.clearAllCache()
2821
      self.commit()
2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853

      organisation_relative_url = organisation.getRelativeUrl()
      countResults = organisation.portal_catalog.countResults
      count_result_kw = {'relative_url': organisation_relative_url}

      use_case_number = 0
      for view_permission_role_list, security_group_list, \
          global_view, associate_view, assignee_view, both_view in \
          [
              # No view permission
              ([], [], 0, 0, 0, 0),
              ([], [(user1, ['Associate'])], 0, 0, 0, 0),
              ([], [(user1, ['Assignee'])], 0, 0, 0, 0),
              ([], [(user1, ['Assignee', 'Associate'])], 0, 0, 0, 0),
              ([], [(user1_group, ['Assignee'])], 0, 0, 0, 0),
              ([], [(user1_group, ['Assignee', 'Associate'])], 0, 0, 0, 0),
              ([], [(user1, ['Assignee']),
                    (user1_group, ['Assignee'])], 0, 0, 0, 0),
              ([], [(user1, ['Assignee']),
                    (user1_group, ['Assignee', 'Associate'])], 0, 0, 0, 0),

              # View permission for Assignee
              (['Assignee'], [], 0, 0, 0, 0),
              (['Assignee'], [(user1, ['Associate'])], 0, 0, 0, 0),
              (['Assignee'], [(user1, ['Assignee'])], 1, 0, 1, 1),
              (['Assignee'], [(user1, ['Assignee', 'Associate'])], 1, 0, 1, 1),
              (['Assignee'], [(user1_group, ['Assignee'])], 1, 0, 0, 0),
              (['Assignee'], [(user1_group, ['Assignee', 'Associate'])],
                              1, 0, 0, 0),
              (['Assignee'], [(user1, ['Assignee']),
                              (user1_group, ['Assignee'])], 1, 0, 1, 1),
              (['Assignee'], [(user1, ['Assignee']),
2854
                              (user1_group, ['Assignee', 'Associate'])],
2855 2856 2857 2858 2859 2860 2861 2862
                               1, 0, 1, 1),

              # View permission for Associate
              (['Associate'], [], 0, 0, 0, 0),
              (['Associate'], [(user1, ['Associate'])], 1, 1, 0, 0),
              (['Associate'], [(user1, ['Assignee'])], 0, 0, 0, 0),
              (['Associate'], [(user1, ['Assignee', 'Associate'])], 1, 1, 1, 1),
              (['Associate'], [(user1_group, ['Assignee'])], 0, 0, 0, 0),
2863
              (['Associate'], [(user1_group, ['Assignee', 'Associate'])],
2864 2865 2866 2867
                               1, 1, 0, 0),
              (['Associate'], [(user1, ['Assignee']),
                              (user1_group, ['Assignee'])], 0, 0, 0, 0),
              (['Associate'], [(user1, ['Assignee']),
2868
                              (user1_group, ['Assignee', 'Associate'])],
2869 2870 2871 2872 2873 2874
                               1, 1, 1, 1),

              # View permission for Associate and Assignee
              (['Associate', 'Assignee'], [], 0, 0, 0, 0),
              (['Associate', 'Assignee'], [(user1, ['Associate'])], 1, 1, 0, 0),
              (['Associate', 'Assignee'], [(user1, ['Assignee'])], 1, 0, 1, 1),
2875
              (['Associate', 'Assignee'],
2876
                     [(user1, ['Assignee', 'Associate'])], 1, 1, 1, 1),
2877
              (['Associate', 'Assignee'],
2878
                     [(user1_group, ['Assignee'])], 1, 0, 0, 0),
2879
              (['Associate', 'Assignee'],
2880 2881 2882 2883
                     [(user1_group, ['Assignee', 'Associate'])], 1, 1, 0, 0),
              (['Associate', 'Assignee'], [(user1, ['Assignee']),
                              (user1_group, ['Assignee'])], 1, 0, 1, 1),
              (['Associate', 'Assignee'], [(user1, ['Assignee']),
2884
                              (user1_group, ['Assignee', 'Associate'])],
2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938
                               1, 1, 1, 1),
              ]:

        use_case_number += 1
        organisation.manage_permission(perm, view_permission_role_list, 0)
        organisation.manage_delLocalRoles([user1, user1_group])
        for security_group, local_role_list in security_group_list:
          organisation.manage_setLocalRoles(security_group, local_role_list)
        organisation.reindexObject()
        self.tic()

        for expected_result, local_roles in \
            [
                (global_view, None),
                (associate_view, 'Associate'),
                (assignee_view, 'Assignee'),
                (both_view, ['Associate', 'Assignee']),
                ]:

          object_security_uid = query(
            'SELECT security_uid FROM catalog WHERE relative_url="%s"' % \
            organisation_relative_url
              )[0]['security_uid']

          if object_security_uid is not None:
            roles_and_users = query(
              'SELECT allowedRolesAndUsers FROM roles_and_users WHERE uid="%s"' % \
              object_security_uid
                )
          else:
            roles_and_users = ''

          monovalue_references = query(
              'SELECT * FROM test_user_or_group_local_roles WHERE uid="%s"' % \
                  organisation.getUid())[0]
          assignee_reference = monovalue_references['assignee_reference']
          viewable_assignee_reference = \
            monovalue_references['viewable_assignee_reference']

          result = countResults(local_roles=local_roles, **count_result_kw)[0][0]
          if result != expected_result:
            countResults(local_roles=local_roles, src__=1,
                         **count_result_kw)
            self.fail('Use case %s\n\tView permission is given to: %s\n\t' \
                      'Local roles are: %s\n\t' \
                      'local_roles parameter is: %s\n\t' \
                      'Object IS %s returned by portal_catalog!\n\t' \
                      '\n\tSecurity uid is: %s\n\t'
                      'Roles and users:  %s\n\t'
                      'assignee_reference:  %s\n\t'
                      'viewable_assignee_reference:  %s\n\t'
                      '\n\tSQL generated: \n\n%s' \
                      '' % \
                      (use_case_number,
2939
                       view_permission_role_list,
2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958
                       organisation.__ac_local_roles__,
                       local_roles, ['NOT', ''][result],
                       object_security_uid,
                       str([x['allowedRolesAndUsers'] for x in roles_and_users]),
                       assignee_reference,
                       viewable_assignee_reference,
                       countResults(local_roles=local_roles, src__=1,
                                    **count_result_kw)))

    finally:
      sql_catalog.sql_catalog_object_list = \
        current_sql_catalog_object_list
      sql_catalog.sql_clear_catalog = \
        current_sql_clear_catalog
      sql_catalog.sql_catalog_local_role_keys = \
          current_sql_catalog_local_role_keys
      sql_catalog.sql_catalog_role_keys = \
          current_sql_catalog_role_keys
      sql_catalog.sql_search_tables = current_sql_search_tables
2959
      self.commit()
2960

Jérome Perrin's avatar
Jérome Perrin committed
2961
  def test_UserOrGroupLocalRoleIndexing(self):
2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980
    logout = self.logout
    user1 = 'another_great_user_name'
    user1_group = 'another_great_user_group'
    uf = self.getPortal().acl_users
    uf._doAddUser(user1, user1, ['Member', ], [])
    uf.zodb_groups.addGroup( user1_group, user1_group, user1_group)
    new = uf.zodb_groups.addPrincipalToGroup( user1, user1_group )

    perm = 'View'
    folder = self.getOrganisationModule()
    folder.manage_setLocalRoles(user1, ['Author', 'Auditor'])
    portal_type = 'Organisation'
    organisation = folder.newContent(portal_type=portal_type)

    sql_connection = self.getSQLConnection()
    def query(sql):
      result = sql_connection.manage_test(sql)
      return result.dictionaries()

2981
    self.login(user1)
2982 2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993

    # Add a new table to the catalog
    sql_catalog = self.portal.portal_catalog.getSQLCatalog()

    local_roles_table = "another_test_user_or_group_local_roles"

    create_local_role_table_sql = """
CREATE TABLE `%s` (
  `uid` BIGINT UNSIGNED NOT NULL,
  `viewable_assignee_reference` varchar(32) NOT NULL default '',
  PRIMARY KEY  (`uid`),
  KEY `viewable_assignee_reference` (`viewable_assignee_reference`)
2994
) ENGINE=InnoDB;
2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z_create_%s' % local_roles_table,
          title = '',
          arguments = "",
          connection_id = 'erp5_sql_connection',
          template = create_local_role_table_sql)

    drop_local_role_table_sql = """
DROP TABLE IF EXISTS %s
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z0_drop_%s' % local_roles_table,
          title = '',
          arguments = "",
          connection_id = 'erp5_sql_connection',
          template = drop_local_role_table_sql)

    catalog_local_role_sql = """
REPLACE INTO
  %s
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(uid))">
(
3019
  <dtml-sqlvar expr="uid[loop_item]" type="int">,
3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035
  <dtml-sqlvar expr="getViewPermissionAssignee[loop_item] or ''" type="string" optional>
)
<dtml-if sequence-end>
<dtml-else>
,
</dtml-if>
</dtml-in>
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z_catalog_%s_list' % local_roles_table,
          title = '',
          connection_id = 'erp5_sql_connection',
          arguments = "\n".join(['uid',
                                 'getViewPermissionAssignee']),
          template = catalog_local_role_sql)

3036
    self.commit()
3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057
    current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list
    sql_catalog.sql_catalog_object_list = \
      current_sql_catalog_object_list + \
         ('z_catalog_%s_list' % local_roles_table,)
    current_sql_clear_catalog = sql_catalog.sql_clear_catalog
    sql_catalog.sql_clear_catalog = \
      current_sql_clear_catalog + \
         ('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table)

    current_sql_catalog_role_keys = \
          sql_catalog.sql_catalog_role_keys
    sql_catalog.sql_catalog_role_keys = (
        'Owner | viewable_owner',
        'Assignee | %s.viewable_assignee_reference' % \
       local_roles_table,)

    current_sql_search_tables = sql_catalog.sql_search_tables
    sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \
        [local_roles_table]

    portal = self.getPortal()
3058
    self.commit()
3059 3060 3061 3062 3063

    try:
      # Clear catalog
      portal_catalog = self.getCatalogTool()
      portal_catalog.manage_catalogClear()
3064
      self.commit()
3065
      self.portal.portal_caches.clearAllCache()
3066
      self.commit()
3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115

      organisation_relative_url = organisation.getRelativeUrl()
      countResults = organisation.portal_catalog.countResults
      count_result_kw = {'relative_url': organisation_relative_url}

      use_case_number = 0
      for view_permission_role_list, security_group_list, \
          associate_view, assignee_view in \
          [
              # No view permission
              ([], [], 0, 0),
              ([], [(user1, ['Associate'])], 0, 0),
              ([], [(user1, ['Assignee'])], 0, 0),
              ([], [(user1, ['Assignee', 'Associate'])], 0, 0),
              ([], [(user1_group, ['Assignee'])], 0, 0),
              ([], [(user1_group, ['Assignee', 'Associate'])], 0, 0),
              ([], [(user1, ['Assignee']),
                    (user1_group, ['Assignee'])], 0, 0),
              ([], [(user1, ['Assignee']),
                    (user1_group, ['Assignee', 'Associate'])], 0, 0),

              # View permission for Assignee
              (['Assignee'], [], 0, 0),
              (['Assignee'], [(user1, ['Associate'])], 0, 0),
              (['Assignee'], [(user1, ['Assignee'])], 0, 1),
              (['Assignee'], [(user1, ['Assignee', 'Associate'])], 0, 1),
              (['Assignee'], [(user1_group, ['Assignee'])], 0, 1),
              (['Assignee'], [(user1_group, ['Assignee', 'Associate'])], 0, 1),
              (['Assignee'], [(user1, ['Assignee']),
                              (user1_group, ['Assignee'])], 0, 1),
              (['Assignee'], [(user1, ['Assignee']),
                              (user1_group, ['Assignee', 'Associate'])], 0, 1),

              # View permission for Associate
              (['Associate'], [], 0, 0),
              (['Associate'], [(user1, ['Associate'])], 1, 0),
              (['Associate'], [(user1, ['Assignee'])], 0, 0),
              (['Associate'], [(user1, ['Assignee', 'Associate'])], 1, 0),
              (['Associate'], [(user1_group, ['Assignee'])], 0, 0),
              (['Associate'], [(user1_group, ['Assignee', 'Associate'])], 1, 0),
              (['Associate'], [(user1, ['Assignee']),
                              (user1_group, ['Assignee'])], 0, 0),
              (['Associate'], [(user1, ['Assignee']),
                              (user1_group, ['Assignee', 'Associate'])], 1, 0),

              # View permission for Associate and Assignee
              (['Associate', 'Assignee'], [], 0, 0),
              (['Associate', 'Assignee'], [(user1, ['Associate'])], 1, 0),
              (['Associate', 'Assignee'], [(user1, ['Assignee'])], 0, 1),
3116
              (['Associate', 'Assignee'],
3117
                     [(user1, ['Assignee', 'Associate'])], 1, 1),
3118
              (['Associate', 'Assignee'],
3119
                     [(user1_group, ['Assignee'])], 0, 1),
3120
              (['Associate', 'Assignee'],
3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176
                     [(user1_group, ['Assignee', 'Associate'])], 1, 1),
              (['Associate', 'Assignee'], [(user1, ['Assignee']),
                              (user1_group, ['Assignee'])], 0, 1),
              (['Associate', 'Assignee'], [(user1, ['Assignee']),
                              (user1_group, ['Assignee', 'Associate'])], 1, 1),
              ]:

        use_case_number += 1
        organisation.manage_permission(perm, view_permission_role_list, 0)
        organisation.manage_delLocalRoles([user1, user1_group])
        for security_group, local_role_list in security_group_list:
          organisation.manage_setLocalRoles(security_group, local_role_list)
        organisation.reindexObject()
        self.tic()

        for expected_result, local_roles in \
            [
                (associate_view or assignee_view, None),
                (associate_view, 'Associate'),
                (assignee_view, 'Assignee'),
                (associate_view or assignee_view, ['Associate', 'Assignee']),
                ]:

          object_security_uid = query(
            'SELECT security_uid FROM catalog WHERE relative_url="%s"' % \
            organisation_relative_url
              )[0]['security_uid']

          if object_security_uid is not None:
            roles_and_users = query(
              'SELECT allowedRolesAndUsers FROM roles_and_users WHERE uid="%s"' % \
              object_security_uid
                )
          else:
            roles_and_users = ''

          monovalue_references = query(
              'SELECT * FROM %s WHERE uid="%s"' % \
                 (local_roles_table, organisation.getUid()))[0]
          viewable_assignee_reference = \
            monovalue_references['viewable_assignee_reference']

          result = countResults(local_roles=local_roles, **count_result_kw)[0][0]
          if result != expected_result:
            countResults(local_roles=local_roles, src__=1,
                         **count_result_kw)
            self.fail('Use case %s\n\tView permission is given to: %s\n\t' \
                      'Local roles are: %s\n\t' \
                      'local_roles parameter is: %s\n\t' \
                      'Object IS %s returned by portal_catalog!\n\t' \
                      '\n\tSecurity uid is: %s\n\t'
                      'Roles and users:  %s\n\t'
                      'viewable_assignee_reference:  %s\n\t'
                      '\n\tSQL generated: \n\n%s' \
                      '' % \
                      (use_case_number,
3177
                       view_permission_role_list,
3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193
                       organisation.__ac_local_roles__,
                       local_roles, ['NOT', ''][result],
                       object_security_uid,
                       str([x['allowedRolesAndUsers'] for x in roles_and_users]),
                       viewable_assignee_reference,
                       countResults(local_roles=local_roles, src__=1,
                                    **count_result_kw)))

    finally:
      sql_catalog.sql_catalog_object_list = \
        current_sql_catalog_object_list
      sql_catalog.sql_clear_catalog = \
        current_sql_clear_catalog
      sql_catalog.sql_catalog_role_keys = \
          current_sql_catalog_role_keys
      sql_catalog.sql_search_tables = current_sql_search_tables
3194
      self.commit()
3195

3196 3197 3198
  # Low priority bug, which needs a lot of time to be fixed
  # Marked as expectedFailure
  @expectedFailure
  def test_PersonDocumentWithMonovaluedLocalRole(self):
    """Test when user is added, which has local roles on own Person Document

    This is a case when a Person document containing a reference with a local
    role which shall be monovalued is reindexed for the first time.

    The test registers a dedicated catalog table together with the ZSQL
    methods creating, dropping and filling it, declares that the Assignee
    role is stored in its viewable_assignee_reference column, clears the
    catalog, then indexes a new Person on which the user named after its
    reference holds the Assignee local role, and finally inspects the rows
    written for that Person. The catalog settings changed here are restored
    in the finally clause.
    """
    user = 'person_document_user_name'

    sql_connection = self.getSQLConnection()
    def query(sql):
      # Run raw SQL and return the rows as a list of dictionaries.
      result = sql_connection.manage_test(sql)
      return result.dictionaries()

    # Add a new table to the catalog
    sql_catalog = self.portal.portal_catalog.getSQLCatalog()

    local_roles_table = "person_document_test_user_or_group_local_roles"

    create_local_role_table_sql = """
CREATE TABLE `%s` (
  `uid` BIGINT UNSIGNED NOT NULL,
  `viewable_assignee_reference` varchar(32) NOT NULL default '',
  PRIMARY KEY  (`uid`),
  KEY `viewable_assignee_reference` (`viewable_assignee_reference`)
) ENGINE=InnoDB;
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z_create_%s' % local_roles_table,
          title = '',
          arguments = "",
          connection_id = 'erp5_sql_connection',
          template = create_local_role_table_sql)

    drop_local_role_table_sql = """
DROP TABLE IF EXISTS %s
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z0_drop_%s' % local_roles_table,
          title = '',
          arguments = "",
          connection_id = 'erp5_sql_connection',
          template = drop_local_role_table_sql)

    # Indexation method: one row per document, holding its uid and the value
    # of getViewPermissionAssignee (empty string when unset).
    catalog_local_role_sql = """
REPLACE INTO
  %s
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(uid))">
(
  <dtml-sqlvar expr="uid[loop_item]" type="int">,
  <dtml-sqlvar expr="getViewPermissionAssignee[loop_item] or ''" type="string" optional>
)
<dtml-if sequence-end>
<dtml-else>
,
</dtml-if>
</dtml-in>
    """ % local_roles_table
    sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
          id = 'z_catalog_%s_list' % local_roles_table,
          title = '',
          connection_id = 'erp5_sql_connection',
          arguments = "\n".join(['uid',
                                 'getViewPermissionAssignee']),
          template = catalog_local_role_sql)

    self.commit()
    # Remember every catalog setting modified below so that the finally
    # clause can put the original value back.
    current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list
    sql_catalog.sql_catalog_object_list = \
      current_sql_catalog_object_list + \
         ('z_catalog_%s_list' % local_roles_table,)
    current_sql_clear_catalog = sql_catalog.sql_clear_catalog
    sql_catalog.sql_clear_catalog = \
      current_sql_clear_catalog + \
         ('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table)

    current_sql_catalog_role_keys = \
          sql_catalog.sql_catalog_role_keys
    sql_catalog.sql_catalog_role_keys = (
        'Owner | viewable_owner',
        'Assignee | %s.viewable_assignee_reference' % \
       local_roles_table,)

    current_sql_search_tables = sql_catalog.sql_search_tables
    sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \
        [local_roles_table]

    # NOTE(review): `portal` is not used in the rest of this method.
    portal = self.getPortal()
    self.commit()

    try:
      # Clear catalog
      portal_catalog = self.getCatalogTool()
      portal_catalog.manage_catalogClear()
      self.commit()
      self.portal.portal_caches.clearAllCache()
      self.commit()

      person = self.portal.person_module.newContent(portal_type='Person',
          reference=user)
      person.manage_setLocalRoles(user, ['Assignee'])
      self.tic()

      roles_and_users_result = query('select * from roles_and_users where uid = (select security_uid from catalog where uid = %s)' % person.getUid())
      local_roles_table_result = query('select * from %s where uid = %s' % (local_roles_table, person.getUid()))[0]

      # check that the local security table is clean about the created
      # person object: only roles are listed for it
      self.assertSameSet(
          sorted([q['allowedRolesAndUsers'] for q in roles_and_users_result]),
          ['Assignee', 'Assignor', 'Associate', 'Auditor', 'Author', 'Manager']
      )
      # check that user has optimised security declaration
      self.assertEqual(local_roles_table_result['viewable_assignee_reference'], user)
    finally:
      sql_catalog.sql_catalog_object_list = \
        current_sql_catalog_object_list
      sql_catalog.sql_clear_catalog = \
        current_sql_clear_catalog
      sql_catalog.sql_catalog_role_keys = \
          current_sql_catalog_role_keys
      sql_catalog.sql_search_tables = current_sql_search_tables
      self.commit()
3321

Jérome Perrin's avatar
Jérome Perrin committed
3322 3323
  def test_ObjectReindexationConcurency(self):
    """Check how reindexation activities of related documents are distributed.

    Each scenario reindexes a first batch of documents, commits and calls
    distribute(), then reindexes one more document from the same tree as a
    document of the first batch. After a second distribute() the number of
    messages whose processing_node is 0 must be unchanged, i.e. the late
    message is not given processing_node 0 while the first batch is pending.

    The test is skipped when the portal has no portal_activities tool.
    """
    portal = self.portal

    portal_activities = getattr(portal, 'portal_activities', None)
    if portal_activities is None:
      ZopeTestCase._print('\n Skipping test_ObjectReindexationConcurency (portal_activities not found)')
      return

    container = portal.organisation_module
    document_1 = container.newContent()
    document_1_1 = document_1.newContent()
    document_1_2 = document_1.newContent()
    document_2 = container.newContent()
    self.tic()

    def getMessageCount():
      # Total number of messages currently known to the activity tool.
      return len(portal_activities.getMessageList())

    def getNodeZeroMessageCount():
      # Number of messages whose processing_node is 0.
      return len([x for x in portal_activities.getMessageList()
                  if x.processing_node == 0])

    def check(first_document_list, late_document):
      # Run one scenario: reindex first_document_list, then late_document.
      expected_count = len(first_document_list)
      for document in first_document_list:
        document.reindexObject()
      # Nothing is spawned before the transaction is committed.
      self.assertEqual(getMessageCount(), 0)
      self.commit()
      self.assertEqual(getMessageCount(), expected_count)
      portal_activities.distribute()
      self.assertEqual(getNodeZeroMessageCount(), expected_count)
      late_document.reindexObject()
      self.commit()
      self.assertEqual(getMessageCount(), expected_count + 1)
      portal_activities.distribute()
      # The late message must not have been given processing_node 0.
      self.assertEqual(getNodeZeroMessageCount(), expected_count)
      self.tic()

    # First case: parent, then child
    check([document_1], document_1_1)
    # Variation of first case: parent's brother along
    check([document_1, document_2], document_1_1)
    # Second case: child, then parent
    check([document_1_1], document_1)
    # Variation of second case: parent's brother along
    check([document_1_1, document_2], document_1)
    # Third case: child 1, then child 2
    check([document_1_1], document_1_2)
    # Variation of third case: parent's brother along
    check([document_1_1, document_2], document_1_2)

Jérome Perrin's avatar
Jérome Perrin committed
3418
  def test_PercentCharacter(self):
    """
    Check expected behaviour of % character for simple query

    Two organisations are created; each searched title below must match
    the given number of documents.
    """
    organisation_module = self.getOrganisationModule()
    for created_title in ('foo_organisation_1', 'foo_organisation_2'):
      organisation_module.newContent(portal_type='Organisation',
                                     title=created_title)
    self.tic()
    expected_count_and_title_list = (
      (1, 'foo_organisation_1'),
      (1, 'foo_organisation_2'),
      (1, '%organisation_1'),
      (2, 'foo_organisation%'),
      (1, 'foo_org%ion_1'),
    )
    for expected_count, searched_title in expected_count_and_title_list:
      brain_list = organisation_module.portal_catalog(
        portal_type='Organisation', title=searched_title)
      self.assertEqual(expected_count, len(brain_list))

Jérome Perrin's avatar
Jérome Perrin committed
3438
  def test_SearchedStringIsNotStripped(self):
    """
      Check that extra spaces in lookup values are preserved

      Only leading spaces are covered: searching 'foo' and ' foo' must each
      return the single document carrying exactly that reference.
    """
    organisation_module = self.getOrganisationModule()
    plain_document = organisation_module.newContent(
      portal_type='Organisation', reference="foo")
    padded_document = organisation_module.newContent(
      portal_type='Organisation', reference=" foo")
    self.tic()
    # XXX: 'foo ' and ' foo ' (trailing space) are deliberately not checked.
    # Those will hardly work, and it is probably not the responsibility of
    # python code: MySQL ignores trailing spaces in conditions.
    # So it's probably not really part of this test.
    for searched_reference, expected_document_list in (
        ('foo', [plain_document]),
        (' foo', [padded_document]),
      ):
      brain_list = organisation_module.portal_catalog(
        portal_type='Organisation', reference=searched_reference)
      self.assertSameSet(expected_document_list,
                         [brain.getObject() for brain in brain_list])
3458

Jérome Perrin's avatar
Jérome Perrin committed
3459
  def test_WildcardMatchesUnsetValue(self):
    """
      Check that the "%" wildcard matches unset values.

      One document has no description and the other has one; searching with
      description='%' must return both.
    """
    organisation_module = self.getOrganisationModule()
    organisation_module.newContent(portal_type='Organisation',
                                   reference="doc 1")
    organisation_module.newContent(portal_type='Organisation',
                                   reference="doc 2", description="test")
    self.tic()
    search_kw = {
      'portal_type': 'Organisation',
      'reference': 'doc %',
      'description': '%',
    }
    brain_list = organisation_module.portal_catalog(**search_kw)
    self.assertEqual(len(brain_list), 2)

Jérome Perrin's avatar
Jérome Perrin committed
3471
  def test_multipleRelatedKeyDoMultipleJoins(self):
    """
      Check that when multiple related keys are present in the same query,
      each one does a separate join.
      ie:
        Searching for an object whose site_reference is "foo" and
        site_description is "bar" will yield a result set which is the
        intersection of:
          - objects whose site_reference is foo
          - objects whose site_description is bar
        An object is therefore found even when the two conditions are
        satisfied by two different related objects (see object_23 below).
    """
    portal = self.portal
    def _create(**kw):
      # Create an Organisation with the given properties.
      return portal.organisation_module.newContent(portal_type='Organisation', **kw)
    def create(id, related_obect_list):
      # Create an Organisation related to the given objects through both the
      # site and the function relations.
      return _create(id=id,
        site_value_list=related_obect_list,
        function_value_list=related_obect_list)
    def check(expected_result, description, query):
      # Search Organisations with the given query and compare the documents
      # found with expected_result; description is used in the failure message.
      result = [x.getObject() for x in portal.portal_catalog(portal_type='Organisation', query=query)]
      self.assertSameSet(expected_result, result,
        '%s:\nExpected: %r\nGot: %r' % (description,
           [x.getId() for x in expected_result],
           [x.getId() for x in result]))
    # completely artificial example, we just need relations
    related_1 = _create(title='foo1', reference='foo', description='bar')
    related_2 = _create(title='foo2', reference='foo'                   )
    related_3 = _create(                               description='bar')
    related_4 = _create()
    object_1  = create('object_1',  [related_1])
    object_2  = create('object_2',  [related_2])
    object_3  = create('object_3',  [related_3])
    object_4  = create('object_4',  [related_4])
    object_12 = create('object_12', [related_1, related_2])
    object_13 = create('object_13', [related_1, related_3])
    object_14 = create('object_14', [related_1, related_4])
    object_23 = create('object_23', [related_2, related_3])
    object_24 = create('object_24', [related_2, related_4])
    object_34 = create('object_34', [related_3, related_4])
    # Expected results, one column per object: an object is listed when at
    # least one of its related objects satisfies the condition(s).
    reference_object_list =   [object_1, object_2,                     object_12, object_13, object_14, object_23, object_24           ]
    description_object_list = [object_1,           object_3,           object_12, object_13, object_14, object_23,            object_34]
    both_object_list =        [object_1,                               object_12, object_13, object_14, object_23                      ]
    title_object_list =       [                                        object_12                                                       ]
    self.tic()
    # Single join
    check(reference_object_list,
          'site_reference="foo"',
          Query(site_reference='foo'))
    check(description_object_list,
          'site_description="bar"',
          Query(site_description='bar'))
    # Double join on different relations
    check(both_object_list,
          'site_reference="foo" AND function_description="bar"',
          ComplexQuery(Query(site_reference='foo'),
                       Query(function_description='bar'),
                       operator='AND'))
    # Double join on same relation
    check(both_object_list,
          'site_reference="foo" AND site_description="bar"',
          ComplexQuery(Query(site_reference='foo'),
                       Query(site_description='bar'),
                       operator='AND'))
    # Double join on same related key
    check(title_object_list,
          'site_title="foo1" AND site_title="foo2"',
          ComplexQuery(Query(site_title='=foo1'),
                       Query(site_title='=foo2'),
                       operator='AND'))

Jérome Perrin's avatar
Jérome Perrin committed
3541
  def test_SearchFolderWithRelatedDynamicRelatedKey(self):
    """Search group categories through "group_related_*" dynamic related keys.

    Two group categories and one organisation per category are created, then
    the group base category is searched with searchFolder using criteria
    expressed on the related documents.
    """
    # Start from an empty group base category
    portal = self.portal
    category_tool = self.getCategoryTool()
    category_tool.group.manage_delObjects(
      [category_id for category_id in category_tool.group.objectIds()])
    nexedi_group = category_tool.group.newContent(
      id='nexedi', title='Nexedi', reference='a')
    storever_group = category_tool.group.newContent(
      id='storever', title='Storever', reference='b')

    # One organisation in each group
    organisation_module = portal.getDefaultModule('Organisation')
    nexedi_organisation = organisation_module.newContent(
      portal_type='Organisation', title='Nexedi Orga', reference='c')
    nexedi_organisation.setGroup('nexedi')
    self.assertEqual(nexedi_organisation.getGroupValue(), nexedi_group)
    storever_organisation = organisation_module.newContent(
      portal_type='Organisation', title='Storever Orga', reference='d')
    storever_organisation.setGroup('storever')
    storever_organisation.setTitle('Organisation 2')
    self.assertEqual(storever_organisation.getGroupValue(), storever_group)
    # Flush message queue
    self.tic()

    base_category = category_tool.group

    def search(**criterion_dict):
      # Documents returned by searchFolder on the group base category.
      return [brain.getObject()
              for brain in base_category.searchFolder(**criterion_dict)]

    # Title of the related organisation
    self.assertEqual(search(group_related_title='Nexedi Orga'),
                     [nexedi_group])
    self.assertEqual(search(default_group_related_title='Nexedi Orga'),
                     [nexedi_group])
    # Related id 'storever'
    self.assertEqual(search(group_related_id='storever'), [storever_group])
    # Reference 'd' is the one of the organisation in the storever group
    self.assertEqual(search(group_related_reference='d'), [storever_group])
    # No related document has reference 'e'
    self.assertEqual(search(group_related_reference='e'), [])
    self.assertEqual(search(default_group_related_reference='e'), [])
    # Relative url of the related organisation
    organisation_relative_url = nexedi_organisation.getRelativeUrl()
    self.assertEqual(
      search(group_related_relative_url=organisation_relative_url),
      [nexedi_group])
    # Uid of the related organisation
    self.assertEqual(search(group_related_uid=nexedi_organisation.getUid()),
                     [nexedi_group])
    # Id of the related organisation combined with the title of the category
    self.assertEqual(
      search(group_related_id=storever_organisation.getId(),
             title='Storever'),
      [storever_group])

Jérome Perrin's avatar
Jérome Perrin committed
3613
  def test_SearchFolderWithRelatedDynamicStrictRelatedKey(self):
    """Search group categories through "strict_group_related_*" keys.

    A category and a sub-category are created, with one organisation in
    each. Every search must return only the category the matching
    organisation was set to, not its parent or child category.
    """
    # Start from an empty group base category
    portal = self.portal
    category_tool = self.getCategoryTool()
    category_tool.group.manage_delObjects(
      [category_id for category_id in category_tool.group.objectIds()])
    nexedi_group = category_tool.group.newContent(
      id='nexedi', title='Nexedi', reference='a')
    erp5_sub_group = nexedi_group.newContent(
      id='erp5', title='ERP5', reference='b')

    # One organisation in the sub-category, one in its parent
    organisation_module = portal.getDefaultModule('Organisation')
    erp5_organisation = organisation_module.newContent(
      portal_type='Organisation', title='ERP5 Orga', reference='c')
    erp5_organisation.setGroup('nexedi/erp5')
    self.assertEqual(erp5_organisation.getGroupValue(), erp5_sub_group)
    nexedi_organisation = organisation_module.newContent(
      portal_type='Organisation', title='Nexedi Orga', reference='d')
    nexedi_organisation.setGroup('nexedi')
    # Flush message queue
    self.tic()

    base_category = category_tool.group

    def search(**criterion_dict):
      # Documents returned by the catalog, called from the base category.
      return [brain.getObject()
              for brain in base_category.portal_catalog(**criterion_dict)]

    # Title of the organisation set to the parent category
    self.assertEqual(search(strict_group_related_title='Nexedi Orga'),
                     [nexedi_group])
    # Title of the organisation set to the sub-category
    self.assertEqual(search(strict_group_related_title='ERP5 Orga'),
                     [erp5_sub_group])
    # Reference 'd' is the one of the organisation set to the parent category
    self.assertEqual(search(strict_group_related_reference='d'),
                     [nexedi_group])
    # Reference 'c' is the one of the organisation set to the sub-category
    self.assertEqual(search(strict_group_related_reference='c'),
                     [erp5_sub_group])
3662

Jérome Perrin's avatar
Jérome Perrin committed
3663
  def test_EscapingLoginInSescurityQuery(self):
    """Render the portal as a user whose login contains a single quote.

    There is no explicit assertion: the test passes as long as
    portal.view() does not raise.
    """
    login = "aaa.o'connor@fake.ie"
    portal = self.getPortal()
    user_folder = self.portal.acl_users
    user_folder._doAddUser(login, 'secret', ['Member'], [])
    # Become the newly created user before rendering the portal.
    quoted_user = user_folder.getUserById(login).__of__(user_folder)
    newSecurityManager(None, quoted_user)
    portal.view()

Jérome Perrin's avatar
Jérome Perrin committed
3673
  def test_IndexationContextIndependence(self):
    """Check that indexation does not depend on the acquisition context.

    "foo" provides getReference while "bar" does not. When bar, or the
    catalog itself, is reached through foo, getReference becomes reachable
    and returns 'foo'; the reference indexed for bar must nevertheless stay
    None.

    The two dummy documents are set as attributes of the portal and are
    removed in a finally clause, so that a failing assertion does not leave
    them behind.
    """
    def doCatalog(catalog, document):
      # Index document with the given catalog and return the reference
      # the catalog then reports for it.
      catalog.catalogObjectList([document], check_uid=0)
      result = catalog(select_expression='reference', uid=document.getUid())
      self.assertEqual(len(result), 1)
      return result[0].reference

    # Create some dummy documents. Both are instantiated before any of them
    # is attached, so that nothing is attached if a constructor raises.
    portal = self.getPortalObject()
    foo_document = FooDocument()
    bar_document = BarDocument()
    portal.foo = foo_document
    portal.bar = bar_document
    try:
      # Get instances, wrapping them in acquisition context implicitly.
      foo = portal.foo
      bar = portal.bar

      # Consistency checks
      self.assertTrue(getattr(foo, 'getReference', None) is not None)
      self.assertTrue(getattr(bar, 'getReference', None) is None)

      # Clean indexing
      portal_catalog = portal.portal_catalog
      self.assertEqual(doCatalog(portal_catalog, foo), 'foo')
      self.assertEqual(doCatalog(portal_catalog, bar), None)

      # Index an object wrapped in a "poisoned" acquisition chain
      bar_on_foo = portal.foo.bar
      self.assertTrue(getattr(bar_on_foo, 'getReference', None) is not None)
      self.assertEqual(bar_on_foo.getReference(), 'foo')
      self.assertEqual(doCatalog(portal_catalog, bar_on_foo), None)

      # Index an object with catalog wrapped in a "poisoned" acquisition chain
      portal_catalog_on_foo = portal.foo.portal_catalog
      self.assertTrue(getattr(portal_catalog_on_foo, 'getReference', None) is not None)
      self.assertEqual(portal_catalog_on_foo.getReference(), 'foo')
      self.assertEqual(doCatalog(portal_catalog_on_foo, foo), 'foo')
      self.assertEqual(doCatalog(portal_catalog_on_foo, bar), None)

      # Poison everything
      self.assertEqual(doCatalog(portal_catalog_on_foo, bar_on_foo), None)
    finally:
      # Always detach the dummy documents, even when an assertion failed.
      delattr(portal, 'foo')
      delattr(portal, 'bar')

Jérome Perrin's avatar
Jérome Perrin committed
3717
  def test_distinct_select_expression(self):
    """Search Persons with a count(DISTINCT ...) select expression.

    A single Person is created; the search must return exactly one line,
    which resolves to that Person.
    """
    created_person = self.portal.person_module.newContent(
      portal_type='Person')
    self.tic()
    search_kw = {
      'select_expression':
        'count(DISTINCT catalog.reference) AS count_reference',
      'group_by_expression': 'catalog.reference',
      'portal_type': 'Person',
    }
    brain_list = self.getCatalogTool().searchResults(**search_kw)
    self.assertEqual(1, len(brain_list))
    self.assertEqual(created_person, brain_list[0].getObject())
3728

Jérome Perrin's avatar
Jérome Perrin committed
3729
  def test_CatalogUidDuplicates(self):
    """
    Initially, the catalog was changing uids when a duplicate was found.

    This operation was really too dangerous, so now we raise errors in this
    case. Here we check that cataloguing two documents sharing one uid
    raises ValueError.
    """
    # Create an object just to allocate a new valid uid.
    module = self.getPersonModule()
    uid_provider = module.newContent(portal_type='Person')
    self.tic()

    # Make sure that the new object is catalogued.
    catalog_tool = self.getPortalObject().portal_catalog
    self.assertEqual(uid_provider,
                     catalog_tool(uid=uid_provider.uid)[0].getObject())

    # Delete the new object to free the uid.
    available_uid = uid_provider.uid
    module.manage_delObjects(uids=[available_uid])
    self.tic()

    # Make sure that the uid is not used any longer.
    self.assertEqual(0, len(catalog_tool(uid=uid_provider.uid)))

    # Now, we create two new objects without indexing, so the catalog
    # will not know anything about these objects.
    duplicate_list = [
      module.newContent(portal_type='Person', is_indexable=False),
      module.newContent(portal_type='Person', is_indexable=False),
    ]

    # Force to assign the same uid, and catalog them.
    for duplicate in duplicate_list:
      duplicate.uid = available_uid
    for duplicate in duplicate_list:
      duplicate.is_indexable = True
    self.assertRaises(ValueError, catalog_tool.catalogObjectList,
                      duplicate_list)
3762

Jérome Perrin's avatar
Jérome Perrin committed
3763
  def test_SearchFolderWithParenthesis(self):
    """Search a Person by a title containing parentheses.

    The catalog must not split the value into "title=foo AND title=bar":
    searching the full title must return only the created Person.
    """
    module = self.getPersonModule()
    searched_title = 'foo (bar)'
    expected_id = module.newContent(portal_type='Person',
                                    title=searched_title).getId()
    self.tic()
    # The document is listed by an unrestricted search ...
    all_id_list = [brain.getObject().getId()
                   for brain in module.searchFolder()]
    self.assertTrue(expected_id in all_id_list)
    # ... and is the only one matching its title.
    matching_id_list = [brain.getObject().getId()
                        for brain in module.searchFolder(title=searched_title)]
    self.assertEqual([expected_id], matching_id_list)
  def test_SearchFolderWithMultipleSpaces(self):
    """
    Check that searching for a title whose words are separated by two
    spaces only returns the document having exactly this title, and not
    the one whose title has a single space.
    """
    person_module = self.getPersonModule()

    # First document: same words, separated by a single space. It must not
    # be part of the result of the filtered search below.
    title = 'foo bar'
    person_module.newContent(portal_type='Person', title=title)
    self.tic()

    # Second document: every space of the title is doubled.
    title = title.replace(' ', '  ')
    person = person_module.newContent(portal_type='Person', title=title)
    person_id = person.getId()
    self.tic()

    # An unfiltered listing of the module contains the second person.
    # assertIn is used so that a failure reports both the id and the list.
    found_id_list = [x.getObject().getId()
                     for x in person_module.searchFolder()]
    self.assertIn(person_id, found_id_list)

    # Searching on the catalog.title column with the doubled spaces returns
    # the second person only.
    found_id_list = [
      x.getObject().getId()
      for x in person_module.searchFolder(**{'catalog.title': title})]
    self.assertEqual([person_id], found_id_list)
  def test_SearchFolderWithSingleQuote(self):
    """
    Check that searchFolder finds a document whose title contains a single
    quote when searching for that exact title.
    """
    person_module = self.getPersonModule()

    # The title holds a lone single quote; searching for it must return
    # the document created here.
    title = "foo 'bar"
    person = person_module.newContent(portal_type='Person', title=title)
    person_id = person.getId()
    self.tic()

    # An unfiltered listing of the module contains the new person.
    # assertIn is used so that a failure reports both the id and the list.
    found_id_list = [x.getObject().getId()
                     for x in person_module.searchFolder()]
    self.assertIn(person_id, found_id_list)

    # Filtering on the title returns this person and nothing else.
    found_id_list = [x.getObject().getId()
                     for x in person_module.searchFolder(title=title)]
    self.assertEqual([person_id], found_id_list)
  def test_ParameterSelectDict(self):
    """
    Make sure that we are able to retrieve data directly from mysql,
    through select_dict, without retrieving real objects.
    """
    person_module = self.getPersonModule()

    indexed_title = "foo"
    indexed_description = "foobar"
    person = person_module.newContent(
      portal_type='Person',
      title=indexed_title,
      description=indexed_description)
    person_uid = person.getUid()
    self.tic()

    # Run the search first, selecting the title column only...
    result_list = person_module.searchFolder(
      uid=person_uid,
      select_dict={'title': None})

    # ... then modify the document. No self.tic() follows, so from here the
    # values selected by the search and the values on the document differ.
    current_title = 'bar'
    current_description = 'foobarfoo'
    person.setTitle(current_title)
    person.setDescription(current_description)
    self.assertEqual(current_title, person.getTitle())

    # The selected column still gives the title as it was when searching,
    # both as an attribute of the result and through getProperty.
    self.assertEqual([x.title for x in result_list],
                     [indexed_title])
    self.assertEqual([x.getProperty('title') for x in result_list],
                     [indexed_title])

    # description was not part of select_dict: getProperty gives the new
    # value.
    self.assertEqual([x.getProperty('description') for x in result_list],
                     [current_description])

    # The getTitle accessor gives the new title.
    self.assertEqual([x.getTitle() for x in result_list],
                     [current_title])
  def test_countResultsUsesFromExpression(self):
    """
    Check that countResults takes the from_expression parameter into
    account.
    """
    person_module = self.getPersonModule()

    # The person module must hold at least one document.
    module_len = len(person_module)
    if not module_len:
      person_module.newContent(portal_type='Person')
      module_len = len(person_module)
    module_uid = person_module.getUid()

    self.tic()
    catalog = self.getCatalogTool()

    # Sanity checks: searching or counting by parent_uid gives the number
    # of documents of the module.
    self.assertEqual(
      len(catalog.searchResults(parent_uid=module_uid)), module_len)
    self.assertEqual(
      catalog.countResults(parent_uid=module_uid)[0][0], module_len)

    # Replace the catalog table by a sub-select restricted to the content of
    # the module: the count, without any other criterion, must be the same.
    restricted_catalog = (
      '(SELECT sub_catalog.* FROM catalog AS sub_catalog'
      ' WHERE sub_catalog.parent_uid=%i)'
      ' AS catalog' % (module_uid, ))
    count = catalog.countResults(
      from_expression={'catalog': restricted_catalog})[0][0]
    self.assertEqual(count, module_len)
  def test_getParentUid(self):
    """
    Check Assignment.getParentUid for an assignment whose parent person
    has no uid yet.

    Assignment.getParentUid is wrapped for the duration of self.tic() to
    record, at each call, the uid of the person before the call and the
    number of catalog results for the returned uid.
    """
    from Products.ERP5.Document.Assignment import Assignment
    import erp5.portal_type
    person_module = self.getPersonModule()

    # Build the person by hand and reset its uid.
    # NOTE(review): 'after_tag' here and 'tag' on the assignment below are
    # presumably meant to get the assignment processed before the person
    # -- confirm.
    person_id = person_module.generateNewId()
    person = erp5.portal_type.Person(person_id)
    person.setDefaultReindexParameters(activate_kw={'after_tag': self.id()})
    person = person_module[person_module._setObject(person_id, person)]
    self.assertFalse('uid' in person.__dict__)
    person.uid = None

    # Same for an assignment contained in this person.
    assignment_id = person.generateNewId()
    assignment = erp5.portal_type.Assignment(assignment_id)
    assignment.setDefaultReindexParameters(activate_kw={'tag': self.id()})
    assignment = person[person._setObject(assignment_id, assignment)]
    self.assertFalse('uid' in assignment.__dict__)
    assignment.uid = None
    self.commit()

    seen_person_uid_list = []
    catalog_hit_count_list = []
    original_getParentUid = Assignment.getParentUid
    def getParentUid(self):
      # Record the state before delegating to the real method, then how
      # many catalog entries exist for the uid it returned.
      seen_person_uid_list.append(person.uid)
      uid = original_getParentUid(self)
      catalog_hit_count_list.append(len(self.portal_catalog(uid=uid)))
      return uid

    Assignment.getParentUid = getParentUid
    try:
      self.tic()
    finally:
      # Always restore the class, even if self.tic() fails.
      Assignment.getParentUid = original_getParentUid

    # At the first call, the person had no uid and the catalog had no entry
    # for the returned uid.
    self.assertEqual(catalog_hit_count_list[0], 0)
    self.assertEqual(seen_person_uid_list[0], None)
    # Now the person has a uid, and it is what getParentUid returns.
    self.assertTrue(int(person.uid))
    self.assertEqual(person.uid, assignment.getParentUid())

  def test_queriesEndingWithSemicolon(self):
    """
    Check that a query ending with a semicolon can be run through the SQL
    connection.
    """
    sql_connection = self.getPortal().erp5_sql_connection
    row_list = sql_connection.manage_test('select 1 as foo;')
    self.assertEqual(1, row_list[0].foo)
  def _createSomeGroupCategories(self):
    """
    Make sure that the "nexedi" and "tiolive" group categories exist, and
    set their title and description.
    """
    group_category = self.getCategoryTool().group
    # group id -> (title, description)
    group_data_map = {
      'nexedi': ('Nexedi', 'Nexedi Group'),
      'tiolive': ('TIOLive', 'TioLive Group'),
    }
    existing_group_id_list = group_category.objectIds()
    for group_id, (title, description) in group_data_map.items():
      # Only create the categories which are missing; existing ones are
      # reused and edited.
      if group_id not in existing_group_id_list:
        group = group_category.newContent(id=group_id)
      else:
        group = group_category[group_id]
      group.edit(title=title, description=description)

  def test_SelectDictWithDynamicRelatedKey(self):
    """
    Check that the strict_group_title related key can be used as a search
    criterion and in select_dict, also combined with left_join_list and
    group_by_list.
    """
    self._createSomeGroupCategories()

    # Create some organisations, each with a different association to the
    # groups created above.
    module = self.portal.getDefaultModule('Organisation')
    # org1: no group
    org1 = module.newContent(portal_type='Organisation', title='org1')
    # org2: group nexedi
    org2 = module.newContent(portal_type='Organisation', title='org2')
    org2.setGroupList(['nexedi'])
    # org3: group tiolive
    org3 = module.newContent(portal_type='Organisation', title='org3')
    org3.setGroupList(['tiolive'])
    # org4: both groups
    org4 = module.newContent(portal_type='Organisation', title='org4')
    org4.setGroupList(['nexedi', 'tiolive'])
    org_list = (org1, org2, org3, org4)

    # Check that the associations are the expected ones.
    actual_group_title_map = {
      org.getTitle(): sorted(org.getGroupTitleList()) for org in org_list}
    expected_group_title_map = {
      'org1': [],
      'org2': ['Nexedi'],
      'org3': ['TIOLive'],
      'org4': ['Nexedi', 'TIOLive'],
    }
    self.assertEqual(actual_group_title_map, expected_group_title_map)
    # Flush message queue
    self.tic()

    # Searches are restricted to the ids of the organisations created here,
    # to be resilient to preexisting organisations, and sorted on title to
    # make the output predictable.
    org_id_list = sorted(org.getId() for org in org_list)
    search_kw = {'id': org_id_list, 'sort_on': 'title'}

    # Searching on the group title Nexedi must work.
    nexedi_org_list = [
      x.getObject()
      for x in module.searchFolder(strict_group_title='Nexedi', **search_kw)]
    self.assertEqual(nexedi_org_list, [org2, org4])

    # Now fetch the titles of the groups of these organisations with
    # select_dict.
    # By default the catalog returns all items, and the selected
    # strict_group_title is None for documents without group. Besides, some
    # entries appear many times, according to the number of relationships
    # each catalog entry has for that related key.
    expected_row_list = [
      ('org1', None),
      ('org2', 'Nexedi'),
      ('org3', 'TIOLive'),
      ('org4', 'Nexedi'),
      ('org4', 'TIOLive'),
    ]
    search_kw['select_dict'] = {'strict_group_title': None}
    row_list = [(record.title, record.strict_group_title)
                for record in module.searchFolder(**search_kw)]
    self.assertEqual(sorted(row_list), expected_row_list)

    # This also works when a left join is forced on the column. Entries are
    # still repeated according to their relationships, though.
    search_kw['left_join_list'] = ('strict_group_title',)
    row_list = [(record.title, record.strict_group_title)
                for record in module.searchFolder(**search_kw)]
    self.assertEqual(sorted(row_list), expected_row_list)

    # To get each organisation only once, we need to group by one of the
    # catalog keys.
    #
    # Note that this relies on a non-standard behaviour of MySQL: if a
    # selected column is not present in the GROUP BY clause, only the first
    # occurrence is taken. Other databases, like Oracle, assume that selected
    # columns are either GROUPed BY or are inside an aggregation function
    # (COUNT, SUM, GROUP_CONCAT, ...), and consider the query to be in error
    # otherwise.
    search_kw['group_by_list'] = ('uid',)
    grouped_org_list = [x.getObject()
                        for x in module.searchFolder(**search_kw)]
    self.assertEqual(grouped_org_list, [org1, org2, org3, org4])
  def test_BackwardCompatibilityWithOldMethods(self):
    """Dealing with RelatedKey methods missing the proper separator

    The z_related_grand_parent method is edited to replace its
    RELATED_QUERY_SEPARATOR by a plain " AND ". Searches using it must keep
    working, except with left joins, which must raise RuntimeError.
    """
    module = self.getOrganisationModule()
    org_a = self._makeOrganisation(title='abc', default_address_city='abc')
    org_a.setReference(org_a.getId())
    # sometimes the module itself is not indexed yet...
    module.reindexObject()

    # Flush message queue
    self.tic()

    # Query fetching the address of the organisation above by its
    # grand parent, among other things.
    query = {
      'grand_parent_portal_type': "Organisation Module",
      'parent_reference': org_a.getReference(),
    }
    catalog = self.getCatalogTool()

    def searchObjectList(**kw):
      # Documents found by the catalog for the given criteria.
      return [x.getObject() for x in catalog.searchResults(**kw)]

    # The query works normally...
    self.assertEqual(searchObjectList(**query), [org_a.default_address])

    # ... even with a left join.
    query_lj = dict(query, left_join_list=('grand_parent_portal_type',))
    self.assertEqual(searchObjectList(**query_lj), [org_a.default_address])

    # Now turn z_related_grand_parent into an old-style method, without
    # RELATED_QUERY_SEPARATOR.
    method = catalog.getSQLCatalog().z_related_grand_parent
    old_src = method.src

    # Registered before editing the method, to put the original source back.
    @self._addCleanup
    def cleanGrandParentMethod(self):
      method.manage_edit(method.title, method.connection_id,
                         method.arguments_src, old_src)

    new_src = old_src.replace('<dtml-var RELATED_QUERY_SEPARATOR>', ' AND ')
    method.manage_edit(
      method.title, method.connection_id, method.arguments_src, new_src)

    # The plain query still works.
    self.assertEqual(searchObjectList(**query), [org_a.default_address])

    # A left join on grand_parent_portal_type should not work anymore.
    self.assertRaises(RuntimeError, catalog.searchResults, **query_lj)

    # Neither should it work if a left join is attempted on a column that
    # has proper related-key rendering, but is present in the same query as
    # a column that hasn't, as the whole query is converted into implicit
    # inner joins.
    self.tic()
    query_lj.update(left_join_list=('strict_group_title',),
                    select_dict=('strict_group_title',))
    self.assertRaises(RuntimeError, catalog.searchResults, **query_lj)

    # Though it should work for queries which do not use the broken
    # related key.
    del query_lj['grand_parent_portal_type']
    self.assertEqual(searchObjectList(**query_lj), [org_a.default_address])

def test_suite():
  """
  Return a test suite holding all the tests of TestERP5Catalog.
  """
  suite = unittest.TestSuite()
  # unittest.makeSuite is deprecated (and removed in Python 3.13); loading
  # the test case through a TestLoader collects the same "test*" methods.
  suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestERP5Catalog))
  return suite