diff --git a/product/ZSQLCatalog/SQLCatalog.py b/product/ZSQLCatalog/SQLCatalog.py index 66368501a61ddd674b653b6f0fdc666de9570d81..39c7af268b503c11684ca02c67bc86e88f97bd96 100755 --- a/product/ZSQLCatalog/SQLCatalog.py +++ b/product/ZSQLCatalog/SQLCatalog.py @@ -18,12 +18,13 @@ import ExtensionClass import Globals from Globals import DTMLFile, PersistentMapping from string import lower, split, join -from thread import allocate_lock +from thread import allocate_lock, get_ident from OFS.Folder import Folder from AccessControl import ClassSecurityInfo, getSecurityManager from BTrees.OIBTree import OIBTree from App.config import getConfiguration from BTrees.Length import Length +from Shared.DC.ZRDB.TM import TM from DateTime import DateTime from Products.PluginIndexes.common.randid import randid @@ -57,13 +58,7 @@ try: except ImportError: psyco = None -UID_BUFFER_SIZE = 900 -MAX_UID_BUFFER_SIZE = 20000 -MAX_QUEUE_SIZE = 100 - -# Put the queue of catalogged objects in RAM for distributed computation. -catalogged_path_dict = {} -catalogged_path_dict_lock = threading.Lock() +UID_BUFFER_SIZE = 300 valid_method_meta_type_list = ('Z SQL Method', 'Script (Python)') @@ -85,6 +80,72 @@ def manage_addSQLCatalog(self, id, title, if REQUEST is not None: return self.manage_main(self, REQUEST,update_menu=1) + +class UidBuffer(TM): + """Uid Buffer class caches a list of reserved uids in a transaction-safe way.""" + + def __init__(self): + """Initialize some variables. + + temporary_buffer is used to hold reserved uids created by non-committed transactions. + + finished_buffer is used to hold reserved uids created by committed-transactions. + + This distinction is important, because uids by non-committed transactions might become + invalid afterwards, so they may not be used by other transactions.""" + self.temporary_buffer = {} + self.finished_buffer = [] + + def _finish(self): + """Move the uids in the temporary buffer to the finished buffer.""" + tid = get_ident() + try: + self.finished_buffer.extend(self.temporary_buffer[tid]) + del self.temporary_buffer[tid] + except KeyError: + pass + + def _abort(self): + """Erase the uids in the temporary buffer.""" + tid = get_ident() + try: + del self.temporary_buffer[tid] + except KeyError: + pass + + def __len__(self): + tid = get_ident() + l = len(self.finished_buffer) + try: + l += len(self.temporary_buffer[tid]) + except KeyError: + pass + return l + + def remove(self, value): + self._register() + for uid_list in self.temporary_buffer.values(): + try: + uid_list.remove(value) + except ValueError: + pass + try: + self.finished_buffer.remove(value) + except ValueError: + pass + + def pop(self): + self._register() + tid = get_ident() + try: + return self.temporary_buffer[tid].pop() + except (KeyError, IndexError): + return self.finished_buffer.pop() + + def extend(self, iterable): + self._register() + tid = get_ident() + self.temporary_buffer.setdefault(tid, []).extend(iterable) class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): """ An Object Catalog @@ -712,13 +773,14 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): """ klass = self.__class__ assert klass._reserved_uid_lock.locked() - uid_list = getattr(self, '_v_uid_buffer', []) # This checks if the list of local reserved uids was cleared after clearReserved # had been called. if klass._local_clear_reserved_time != self._last_clear_reserved_time: - uid_list = [] + self._v_uid_buffer = UidBuffer() klass._local_clear_reserved_time = self._last_clear_reserved_time - if len(uid_list) == 0: + elif not hasattr(self, '_v_uid_buffer'): + self._v_uid_buffer = UidBuffer() + if len(self._v_uid_buffer) == 0: method_id = self.sql_catalog_produce_reserved method = getattr(self, method_id) instance_id = klass._instance_id @@ -732,9 +794,8 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): pass instance_id = md5.new(str(random_factor_list)).hexdigest()[:30] klass._instance_id = instance_id - new_uid_list = method(count = UID_BUFFER_SIZE, instance_id=instance_id) - uid_list.extend( filter(lambda x: x != 0, map(lambda x: x.uid, new_uid_list ))) - self._v_uid_buffer = uid_list + uid_list = [x.uid for x in method(count = UID_BUFFER_SIZE, instance_id = instance_id) if x.uid != 0] + self._v_uid_buffer.extend(uid_list) def newUid(self): """ @@ -759,9 +820,8 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): try: klass._reserved_uid_lock.acquire() self.produceUid() - uid_list = getattr(self, '_v_uid_buffer', []) - if len(uid_list) > 0: - uid = uid_list.pop() + if len(self._v_uid_buffer) > 0: + uid = self._v_uid_buffer.pop() if self._max_uid is None: self._max_uid = Length() if uid > self._max_uid(): @@ -935,8 +995,7 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): klass = self.__class__ try: klass._reserved_uid_lock.acquire() - uid_list = getattr(aq_base(self), '_v_uid_buffer', []) - if uid in uid_list: + if hasattr(self, '_v_uid_buffer'): # This is the case where: # 1. The object got an uid. # 2. The catalog was cleared. @@ -945,8 +1004,10 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): # In this case, the uid is not reserved any longer, but # SQLCatalog believes that it is still reserved. So it is # necessary to remove the uid from the list explicitly. - uid_list.remove(uid) - self._v_uid_buffer = uid_list + try: + self._v_uid_buffer.remove(uid) + except ValueError: + pass finally: klass._reserved_uid_lock.release() elif catalog_path is not None: