Commit 6d545b33 authored by Jean-Paul Smets's avatar Jean-Paul Smets

new catalog uid asynchronous allocation


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@899 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent f0a82666
...@@ -280,7 +280,9 @@ class ERP5Generator(PortalGenerator): ...@@ -280,7 +280,9 @@ class ERP5Generator(PortalGenerator):
# Set parameters of the SQL method from the contents of a .zsql file. # Set parameters of the SQL method from the contents of a .zsql file.
sql_method.fromFile(os.path.join(zsql_dir, entry)) sql_method.fromFile(os.path.join(zsql_dir, entry))
# Setup ZSQLCaralog properties # Setup ZSQLCaralog properties
portal_catalog.sql_catalog_object = ('z0_catalog_object', 'z_catalog_category', 'z_catalog_movement', portal_catalog.sql_catalog_produce_reserved = 'z_produce_reserved_uid_list'
portal_catalog.sql_catalog_clear_reserved = 'z_clear_reserved'
portal_catalog.sql_catalog_object = ('z_update_object', 'z_catalog_category', 'z_catalog_movement',
'z_catalog_roles_and_users', 'z_catalog_stock', 'z_catalog_subject',) 'z_catalog_roles_and_users', 'z_catalog_stock', 'z_catalog_subject',)
portal_catalog.sql_uncatalog_object = ('z0_uncatalog_category', 'z0_uncatalog_movement', 'z0_uncatalog_roles_and_users', portal_catalog.sql_uncatalog_object = ('z0_uncatalog_category', 'z0_uncatalog_movement', 'z0_uncatalog_roles_and_users',
'z0_uncatalog_stock', 'z0_uncatalog_subject', 'z_uncatalog_object', ) 'z0_uncatalog_stock', 'z0_uncatalog_subject', 'z_uncatalog_object', )
...@@ -413,14 +415,6 @@ class ERP5Generator(PortalGenerator): ...@@ -413,14 +415,6 @@ class ERP5Generator(PortalGenerator):
# portal_catalog.reindexObject(p.portal_activities) # portal_catalog.reindexObject(p.portal_activities)
#p[MembershipTool.membersfolder_id].immediateReindexObject() #p[MembershipTool.membersfolder_id].immediateReindexObject()
def setupUserFolder(self, p):
try:
# Use NuxUserGroups instead of the standard acl_users.
p.manage_addProduct['NuxUserGroups'].manage_addUserFolderWithGroups()
except:
# No way.
PortalGenerator.setupUserFolder(self, p)
def setup(self, p, create_userfolder): def setup(self, p, create_userfolder):
self.setupTools(p) self.setupTools(p)
self.setupMailHost(p) self.setupMailHost(p)
......
...@@ -29,7 +29,7 @@ elif doAction0 == 'add': ...@@ -29,7 +29,7 @@ elif doAction0 == 'add':
container=context, container=context,
id=str(new_id), id=str(new_id),
RESPONSE=request.RESPONSE) RESPONSE=request.RESPONSE)
context[new_id].flushActivity(invoke=1) # context[new_id].flushActivity(invoke=1)
return request.RESPONSE return request.RESPONSE
return getattr(context,form_id)(request) return getattr(context,form_id)(request)
...@@ -9,7 +9,7 @@ context.portal_types.constructContent(type_name=context.portal_type, ...@@ -9,7 +9,7 @@ context.portal_types.constructContent(type_name=context.portal_type,
container=parent, container=parent,
id=str(new_id), id=str(new_id),
RESPONSE=REQUEST.RESPONSE) RESPONSE=REQUEST.RESPONSE)
parent[new_id].flushActivity(invoke=1) # parent[new_id].flushActivity(invoke=1)
# parent.invokeFactory(type_name=context.portal_type, # parent.invokeFactory(type_name=context.portal_type,
# id=str(parent.generateNewId()), # id=str(parent.generateNewId()),
# RESPONSE=REQUEST.RESPONSE) # RESPONSE=REQUEST.RESPONSE)
......
...@@ -14,6 +14,6 @@ context.portal_types.constructContent(type_name=type_name, ...@@ -14,6 +14,6 @@ context.portal_types.constructContent(type_name=type_name,
container=context, container=context,
id=str(new_id), id=str(new_id),
RESPONSE=REQUEST.RESPONSE) RESPONSE=REQUEST.RESPONSE)
context[new_id].flushActivity(invoke=1) #context[new_id].flushActivity(invoke=1)
return REQUEST.RESPONSE return REQUEST.RESPONSE
...@@ -41,12 +41,16 @@ def InitializeDocument(document_class, document_path=None): ...@@ -41,12 +41,16 @@ def InitializeDocument(document_class, document_path=None):
InitializeClass(document_class) InitializeClass(document_class)
# Register class in ERP5Type.Document # Register class in ERP5Type.Document
product_document_registry.append(((document_class.__name__, document_path))) product_document_registry.append(((document_class.__name__, document_path)))
#LOG('InitializeDocument', 0, document_class.__name__)
def initializeProductDocumentRegistry(): def initializeProductDocumentRegistry():
from Utils import importLocalDocument from Utils import importLocalDocument
for (class_id, document_path) in product_document_registry: for (class_id, document_path) in product_document_registry:
importLocalDocument(class_id, document_path=document_path) importLocalDocument(class_id, document_path=document_path)
print 'Added product document to ERP5Type repository: %s (%s)' % (class_id, document_path) #from Testing import ZopeTestCase
#ZopeTestCase._print('Added product document to ERP5Type repository: %s (%s) \n' % (class_id, document_path))
#LOG('Added product document to ERP5Type repository: %s (%s)' % (class_id, document_path), 0, '')
#print 'Added product document to ERP5Type repository: %s (%s)' % (class_id, document_path)
# Code Generation of __init__.py files # Code Generation of __init__.py files
def generateInitFiles(this_module, global_hook, def generateInitFiles(this_module, global_hook,
...@@ -79,6 +83,7 @@ if not hasattr(ERP5TypeDocumentRepository, '_override_%s'): ERP5TypeDocumentRepo ...@@ -79,6 +83,7 @@ if not hasattr(ERP5TypeDocumentRepository, '_override_%s'): ERP5TypeDocumentRepo
def add%s(folder, id, REQUEST=None, **kw): def add%s(folder, id, REQUEST=None, **kw):
o = ERP5TypeDocumentRepository.%s.%s(id) o = ERP5TypeDocumentRepository.%s.%s(id)
folder._setObject(id, o) folder._setObject(id, o)
o.uid = folder.portal_catalog.newUid()
if kw is not None: o.__of__(folder)._edit(force_update=1, **kw) if kw is not None: o.__of__(folder)._edit(force_update=1, **kw)
# contentCreate already calls reindex 3 times ... # contentCreate already calls reindex 3 times ...
# o.reindexObject() # o.reindexObject()
......
...@@ -16,7 +16,9 @@ from Persistence import Persistent ...@@ -16,7 +16,9 @@ from Persistence import Persistent
import Acquisition import Acquisition
import ExtensionClass import ExtensionClass
from string import lower, split, join from string import lower, split, join
from thread import get_ident
from DateTime import DateTime
from Products.PluginIndexes.common.randid import randid from Products.PluginIndexes.common.randid import randid
from Products.CMFCore.Expression import Expression from Products.CMFCore.Expression import Expression
from Acquisition import aq_parent, aq_inner, aq_base, aq_self from Acquisition import aq_parent, aq_inner, aq_base, aq_self
...@@ -24,6 +26,9 @@ from zLOG import LOG ...@@ -24,6 +26,9 @@ from zLOG import LOG
import time import time
UID_BUFFER_SIZE = 1000
MAX_UID_BUFFER_SIZE = 20000
class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base): class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base):
""" An Object Catalog """ An Object Catalog
...@@ -46,6 +51,7 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -46,6 +51,7 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base):
- optmization: indexing objects should be deferred - optmization: indexing objects should be deferred
until timeout value or end of transaction until timeout value or end of transaction
""" """
_after_clear_reserved = 0
def __init__(self): def __init__(self):
self.schema = {} # mapping from attribute name to column self.schema = {} # mapping from attribute name to column
...@@ -64,6 +70,15 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -64,6 +70,15 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base):
except: except:
pass pass
def clearReserved(self):
"""
Clears reserved uids
"""
method_id = self.sql_catalog_clear_reserved
method = getattr(self, method_id)
method()
self._after_clear_reserved = 1
def __getitem__(self, uid): def __getitem__(self, uid):
""" """
Get an object by UID Get an object by UID
...@@ -151,6 +166,50 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -151,6 +166,50 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base):
return keys return keys
# the cataloging API # the cataloging API
def produceUid(self):
"""
Produces reserved uids in advance
"""
method_id = self.sql_catalog_produce_reserved
method = getattr(self, method_id)
thread_id = get_ident()
uid_list = getattr(self, '_v_uid_buffer', [])
if self._after_clear_reserved:
# Reset uid list after clear reserved
self._after_clear_reserved = 0
uid_list = []
if len(uid_list) < UID_BUFFER_SIZE:
date = DateTime()
new_uid_list = method(count = UID_BUFFER_SIZE, thread_id=thread_id, date=date)
uid_list.extend( filter(lambda x: x != 0, map(lambda x: x.uid, new_uid_list )))
self._v_uid_buffer = uid_list
def newUid(self):
"""
This is where uid generation takes place. We should consider a multi-threaded environment
with multiple ZEO clients on a single ZEO server.
The main risk is the following:
- objects a/b/c/d/e/f are created (a is parent of b which is parent of ... of f)
- one reindexing node N1 starts reindexing f
- another reindexing node N2 starts reindexing e
- there is a strong risk that N1 and N2 start reindexing at the same time
and provide different uid values for a/b/c/d/e
Similar problems may happen with relations and acquisition of uid values (ex. order_uid)
with the risk of graph loops
"""
self.produceUid()
uid_list = getattr(self, '_v_uid_buffer', [])
if len(uid_list) > 0:
return uid_list.pop()
else:
raise CatalogError("Could not retrieve new uid")
def catalogObject(self, object, path, is_object_moved=0): def catalogObject(self, object, path, is_object_moved=0):
""" """
Adds an object to the Catalog by calling Adds an object to the Catalog by calling
...@@ -161,8 +220,7 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -161,8 +220,7 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base):
'uid' is the unique Catalog identifier for this object 'uid' is the unique Catalog identifier for this object
""" """
LOG('Catalog object:',0,str(path)) #LOG('Catalog object:',0,str(path))
parent = object.aq_parent
# Prepare the dictionnary of values # Prepare the dictionnary of values
kw = {} kw = {}
...@@ -183,7 +241,7 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -183,7 +241,7 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base):
if (uid != index): if (uid != index):
# Update uid attribute of object # Update uid attribute of object
uid = int(index) uid = int(index)
LOG("Write Uid",0, "uid %s index %s" % (uid, index)) # LOG("Write Uid",0, "uid %s index %s" % (uid, index))
object.uid = uid object.uid = uid
# We will check if there is an filter on this # We will check if there is an filter on this
# method, if so we may not call this zsqlMethod # method, if so we may not call this zsqlMethod
...@@ -236,25 +294,11 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -236,25 +294,11 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base):
# This can be very dangerous with relations stored in a category table (CMFCategory) # This can be very dangerous with relations stored in a category table (CMFCategory)
# This is why we recommend completely reindexing subobjects after any change of id # This is why we recommend completely reindexing subobjects after any change of id
if self.hasUid(uid): if self.hasUid(uid):
LOG('SQLCatalog WARNING',0,'assigning new uid to already catalogued object %s' % path)
uid = 0 uid = 0
if not uid: if not uid:
# Generate UID # Generate UID
# New style, get radom id index = self.newUid()
index=getattr(self, '_v_nextid', 0)
if index%4000 == 0: index = randid()
while self.hasUid(index):
index=randid()
# We want ids to be somewhat random, but there are
# advantages for having some ids generated
# sequentially when many catalog updates are done at
# once, such as when reindexing or bulk indexing.
# We allocate ids sequentially using a volatile base,
# so different threads get different bases. This
# further reduces conflict and reduces churn in
# here and it result sets when bulk indexing.
self._v_nextid=index+1
# Update uid attribute of object
LOG("Write Uid 2",0, "uid %s index %s" % (uid, index))
object.uid = index object.uid = index
else: else:
index = uid index = uid
...@@ -302,9 +346,7 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -302,9 +346,7 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base):
# Alter row # Alter row
# Create row # Create row
#try: #try:
zope_root = self.getPortalObject().aq_parent if 1:
root_indexable = int(getattr(zope_root,'isIndexable',1))
if root_indexable:
#LOG("Call SQL Method %s with args:" % method_name,0, str(kw)) #LOG("Call SQL Method %s with args:" % method_name,0, str(kw))
method(**kw) method(**kw)
#except: #except:
...@@ -394,8 +436,10 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -394,8 +436,10 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base):
return None return None
def hasUid(self, uid): def hasUid(self, uid):
""" Checks if uid is catalogued """ """ Checks if uid is catalogued for a real object """
return self.getPathForUid(uid) is not None path = self.getPathForUid(uid)
if path is None: return 0
return path != 'reserved'
def getMetadataForUid(self, uid): def getMetadataForUid(self, uid):
""" Accesses a single record for a given uid """ """ Accesses a single record for a given uid """
...@@ -573,6 +617,7 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -573,6 +617,7 @@ class Catalog(Persistent, Acquisition.Implicit, ExtensionClass.Base):
pass pass
# Return the result # Return the result
#LOG('queryResults',0,'kw: %s' % str(kw))
return sql_method(**kw) return sql_method(**kw)
def searchResults(self, REQUEST=None, used=None, **kw): def searchResults(self, REQUEST=None, used=None, **kw):
......
...@@ -137,6 +137,16 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -137,6 +137,16 @@ class ZCatalog(Folder, Persistent, Implicit):
'description' : 'The title of this catalog', 'description' : 'The title of this catalog',
'type' : 'string', 'type' : 'string',
'mode' : 'w' }, 'mode' : 'w' },
{ 'id' : 'sql_catalog_produce_reserved',
'description' : 'A method to produce new uid values in advance',
'type' : 'selection',
'select_variable' : 'getColumnIds',
'mode' : 'w' },
{ 'id' : 'sql_catalog_clear_reserved',
'description' : 'A method to clear reserved uid values',
'type' : 'selection',
'select_variable' : 'getColumnIds',
'mode' : 'w' },
{ 'id' : 'sql_catalog_object', { 'id' : 'sql_catalog_object',
'description' : 'Methods to be called to catalog an object', 'description' : 'Methods to be called to catalog an object',
'type' : 'multiple selection', 'type' : 'multiple selection',
...@@ -224,6 +234,8 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -224,6 +234,8 @@ class ZCatalog(Folder, Persistent, Implicit):
'mode' : 'w' }, 'mode' : 'w' },
) )
sql_catalog_produce_reserved = 'z_produce_reserved_uid_list'
sql_catalog_clear_reserved = 'z_clear_reserved'
sql_catalog_object = ('catalog_object',) sql_catalog_object = ('catalog_object',)
sql_uncatalog_object = ('uncatalog_object',) sql_uncatalog_object = ('uncatalog_object',)
sql_update_object = ('update_object',) sql_update_object = ('update_object',)
...@@ -357,6 +369,14 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -357,6 +369,14 @@ class ZCatalog(Folder, Persistent, Implicit):
RESPONSE.redirect(URL1 + '/manage_catalogAdvanced?manage_tabs_message=Catalog%20Cleared') RESPONSE.redirect(URL1 + '/manage_catalogAdvanced?manage_tabs_message=Catalog%20Cleared')
def manage_catalogClearReserved(self, REQUEST=None, RESPONSE=None, URL1=None):
""" clears the whole enchilada """
self._catalog.clearReserved()
if REQUEST and RESPONSE:
RESPONSE.redirect(URL1 + '/manage_catalogAdvanced?manage_tabs_message=Catalog%20Cleared')
def manage_catalogCreateTables(self, REQUEST=None, RESPONSE=None, URL1=None): def manage_catalogCreateTables(self, REQUEST=None, RESPONSE=None, URL1=None):
""" creates the tables required for cataging objects """ """ creates the tables required for cataging objects """
self._catalog.createTables() self._catalog.createTables()
...@@ -418,6 +438,11 @@ class ZCatalog(Folder, Persistent, Implicit): ...@@ -418,6 +438,11 @@ class ZCatalog(Folder, Persistent, Implicit):
if REQUEST and RESPONSE: if REQUEST and RESPONSE:
RESPONSE.redirect(URL1 + '/manage_catalogSchema?manage_tabs_message=Schema%20Saved') RESPONSE.redirect(URL1 + '/manage_catalogSchema?manage_tabs_message=Schema%20Saved')
def newUid(self):
"""
Allocates a new uid value.
"""
return self._catalog.newUid()
def catalog_object(self, obj, uid=None, idxs=[], is_object_moved=0): def catalog_object(self, obj, uid=None, idxs=[], is_object_moved=0):
""" wrapper around catalog """ """ wrapper around catalog """
......
...@@ -38,13 +38,17 @@ ...@@ -38,13 +38,17 @@
</td> </td>
</tr> </tr>
<tr> <tr>
<td> <td align="left" valign="top">
<p class="form-help">Clearing reserved uids allows to reduce the size of the catalog
after many Zope restart. It can be a dangerous operation if deferred
execution of cataloging of objects has not been completed.
</p>
</td> </td>
</tr> <td align="right" valign="top">
<tr> <form action="<dtml-var URL1>">
<td> <input class="form-element" type="submit"
name="manage_catalogClearReserved:method" value=" Clear Reserved ">
</form>
</td> </td>
</tr> </tr>
</table> </table>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment