Commit f808f1fb authored by Yoshinori Okuji's avatar Yoshinori Okuji

Implement a transactional uid buffer.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@4161 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 38da0052
...@@ -18,12 +18,13 @@ import ExtensionClass ...@@ -18,12 +18,13 @@ import ExtensionClass
import Globals import Globals
from Globals import DTMLFile, PersistentMapping from Globals import DTMLFile, PersistentMapping
from string import lower, split, join from string import lower, split, join
from thread import allocate_lock from thread import allocate_lock, get_ident
from OFS.Folder import Folder from OFS.Folder import Folder
from AccessControl import ClassSecurityInfo, getSecurityManager from AccessControl import ClassSecurityInfo, getSecurityManager
from BTrees.OIBTree import OIBTree from BTrees.OIBTree import OIBTree
from App.config import getConfiguration from App.config import getConfiguration
from BTrees.Length import Length from BTrees.Length import Length
from Shared.DC.ZRDB.TM import TM
from DateTime import DateTime from DateTime import DateTime
from Products.PluginIndexes.common.randid import randid from Products.PluginIndexes.common.randid import randid
...@@ -57,13 +58,7 @@ try: ...@@ -57,13 +58,7 @@ try:
except ImportError: except ImportError:
psyco = None psyco = None
UID_BUFFER_SIZE = 900 UID_BUFFER_SIZE = 300
MAX_UID_BUFFER_SIZE = 20000
MAX_QUEUE_SIZE = 100
# Put the queue of catalogged objects in RAM for distributed computation.
catalogged_path_dict = {}
catalogged_path_dict_lock = threading.Lock()
valid_method_meta_type_list = ('Z SQL Method', 'Script (Python)') valid_method_meta_type_list = ('Z SQL Method', 'Script (Python)')
...@@ -85,6 +80,72 @@ def manage_addSQLCatalog(self, id, title, ...@@ -85,6 +80,72 @@ def manage_addSQLCatalog(self, id, title,
if REQUEST is not None: if REQUEST is not None:
return self.manage_main(self, REQUEST,update_menu=1) return self.manage_main(self, REQUEST,update_menu=1)
class UidBuffer(TM):
"""Uid Buffer class caches a list of reserved uids in a transaction-safe way."""
def __init__(self):
"""Initialize some variables.
temporary_buffer is used to hold reserved uids created by non-committed transactions.
finished_buffer is used to hold reserved uids created by committed-transactions.
This distinction is important, because uids by non-committed transactions might become
invalid afterwards, so they may not be used by other transactions."""
self.temporary_buffer = {}
self.finished_buffer = []
def _finish(self):
"""Move the uids in the temporary buffer to the finished buffer."""
tid = get_ident()
try:
self.finished_buffer.extend(self.temporary_buffer[tid])
del self.temporary_buffer[tid]
except KeyError:
pass
def _abort(self):
"""Erase the uids in the temporary buffer."""
tid = get_ident()
try:
del self.temporary_buffer[tid]
except KeyError:
pass
def __len__(self):
tid = get_ident()
l = len(self.finished_buffer)
try:
l += len(self.temporary_buffer[tid])
except KeyError:
pass
return l
def remove(self, value):
self._register()
for uid_list in self.temporary_buffer.values():
try:
uid_list.remove(value)
except ValueError:
pass
try:
self.finished_buffer.remove(value)
except ValueError:
pass
def pop(self):
self._register()
tid = get_ident()
try:
return self.temporary_buffer[tid].pop()
except (KeyError, IndexError):
return self.finished_buffer.pop()
def extend(self, iterable):
self._register()
tid = get_ident()
self.temporary_buffer.setdefault(tid, []).extend(iterable)
class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base):
""" An Object Catalog """ An Object Catalog
...@@ -712,13 +773,14 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -712,13 +773,14 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base):
""" """
klass = self.__class__ klass = self.__class__
assert klass._reserved_uid_lock.locked() assert klass._reserved_uid_lock.locked()
uid_list = getattr(self, '_v_uid_buffer', [])
# This checks if the list of local reserved uids was cleared after clearReserved # This checks if the list of local reserved uids was cleared after clearReserved
# had been called. # had been called.
if klass._local_clear_reserved_time != self._last_clear_reserved_time: if klass._local_clear_reserved_time != self._last_clear_reserved_time:
uid_list = [] self._v_uid_buffer = UidBuffer()
klass._local_clear_reserved_time = self._last_clear_reserved_time klass._local_clear_reserved_time = self._last_clear_reserved_time
if len(uid_list) == 0: elif not hasattr(self, '_v_uid_buffer'):
self._v_uid_buffer = UidBuffer()
if len(self._v_uid_buffer) == 0:
method_id = self.sql_catalog_produce_reserved method_id = self.sql_catalog_produce_reserved
method = getattr(self, method_id) method = getattr(self, method_id)
instance_id = klass._instance_id instance_id = klass._instance_id
...@@ -732,9 +794,8 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -732,9 +794,8 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base):
pass pass
instance_id = md5.new(str(random_factor_list)).hexdigest()[:30] instance_id = md5.new(str(random_factor_list)).hexdigest()[:30]
klass._instance_id = instance_id klass._instance_id = instance_id
new_uid_list = method(count = UID_BUFFER_SIZE, instance_id=instance_id) uid_list = [x.uid for x in method(count = UID_BUFFER_SIZE, instance_id = instance_id) if x.uid != 0]
uid_list.extend( filter(lambda x: x != 0, map(lambda x: x.uid, new_uid_list ))) self._v_uid_buffer.extend(uid_list)
self._v_uid_buffer = uid_list
def newUid(self): def newUid(self):
""" """
...@@ -759,9 +820,8 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -759,9 +820,8 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base):
try: try:
klass._reserved_uid_lock.acquire() klass._reserved_uid_lock.acquire()
self.produceUid() self.produceUid()
uid_list = getattr(self, '_v_uid_buffer', []) if len(self._v_uid_buffer) > 0:
if len(uid_list) > 0: uid = self._v_uid_buffer.pop()
uid = uid_list.pop()
if self._max_uid is None: if self._max_uid is None:
self._max_uid = Length() self._max_uid = Length()
if uid > self._max_uid(): if uid > self._max_uid():
...@@ -935,8 +995,7 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -935,8 +995,7 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base):
klass = self.__class__ klass = self.__class__
try: try:
klass._reserved_uid_lock.acquire() klass._reserved_uid_lock.acquire()
uid_list = getattr(aq_base(self), '_v_uid_buffer', []) if hasattr(self, '_v_uid_buffer'):
if uid in uid_list:
# This is the case where: # This is the case where:
# 1. The object got an uid. # 1. The object got an uid.
# 2. The catalog was cleared. # 2. The catalog was cleared.
...@@ -945,8 +1004,10 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): ...@@ -945,8 +1004,10 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base):
# In this case, the uid is not reserved any longer, but # In this case, the uid is not reserved any longer, but
# SQLCatalog believes that it is still reserved. So it is # SQLCatalog believes that it is still reserved. So it is
# necessary to remove the uid from the list explicitly. # necessary to remove the uid from the list explicitly.
uid_list.remove(uid) try:
self._v_uid_buffer = uid_list self._v_uid_buffer.remove(uid)
except ValueError:
pass
finally: finally:
klass._reserved_uid_lock.release() klass._reserved_uid_lock.release()
elif catalog_path is not None: elif catalog_path is not None:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment