Commit c26e6cd7 authored by Jeremy Hylton's avatar Jeremy Hylton

Merge atomic cache invalidation code from the 3.1 release branch.

parent c6c89bde
...@@ -268,6 +268,12 @@ class ClientStorage: ...@@ -268,6 +268,12 @@ class ClientStorage:
self._oid_lock = threading.Lock() self._oid_lock = threading.Lock()
self._oids = [] # Object ids retrieved from new_oids() self._oids = [] # Object ids retrieved from new_oids()
# Can't read data in one thread while writing data
# (tpc_finish) in another thread. In general, the lock
# must prevent access to the cache while _update_cache
# is executing.
self._lock = threading.Lock()
t = self._ts = get_timestamp() t = self._ts = get_timestamp()
self._serial = `t` self._serial = `t`
self._oid = '\0\0\0\0\0\0\0\0' self._oid = '\0\0\0\0\0\0\0\0'
...@@ -688,11 +694,19 @@ class ClientStorage: ...@@ -688,11 +694,19 @@ class ClientStorage:
specified by the given object id and version, if they exist; specified by the given object id and version, if they exist;
otherwise a KeyError is raised. otherwise a KeyError is raised.
""" """
self._lock.acquire() # for atomic processing of invalidations
try:
p = self._cache.load(oid, version) p = self._cache.load(oid, version)
if p: if p:
return p return p
finally:
self._lock.release()
if self._server is None: if self._server is None:
raise ClientDisconnected() raise ClientDisconnected()
# If an invalidation for oid comes in during zeoLoad, that's OK
# because we'll get oid's new state.
p, s, v, pv, sv = self._server.zeoLoad(oid) p, s, v, pv, sv = self._server.zeoLoad(oid)
self._cache.checkSize(0) self._cache.checkSize(0)
self._cache.store(oid, p, s, v, pv, sv) self._cache.store(oid, p, s, v, pv, sv)
...@@ -708,9 +722,13 @@ class ClientStorage: ...@@ -708,9 +722,13 @@ class ClientStorage:
If no version modified the object, return an empty string. If no version modified the object, return an empty string.
""" """
self._lock.acquire()
try:
v = self._cache.modifiedInVersion(oid) v = self._cache.modifiedInVersion(oid)
if v is not None: if v is not None:
return v return v
finally:
self._lock.release()
return self._server.modifiedInVersion(oid) return self._server.modifiedInVersion(oid)
def new_oid(self): def new_oid(self):
...@@ -847,16 +865,20 @@ class ClientStorage: ...@@ -847,16 +865,20 @@ class ClientStorage:
if transaction is not self._transaction: if transaction is not self._transaction:
return return
try: try:
self._lock.acquire() # for atomic processing of invalidations
try:
self._update_cache()
finally:
self._lock.release()
if f is not None: if f is not None:
f() f()
tid = self._server.tpc_finish(self._serial) tid = self._server.tpc_finish(self._serial)
self._cache.setLastTid(tid)
r = self._check_serials() r = self._check_serials()
assert r is None or len(r) == 0, "unhandled serialnos: %s" % r assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
self._update_cache()
self._cache.setLastTid(tid)
finally: finally:
self.end_transaction() self.end_transaction()
...@@ -866,6 +888,8 @@ class ClientStorage: ...@@ -866,6 +888,8 @@ class ClientStorage:
This iterates over the objects in the transaction buffer and This iterates over the objects in the transaction buffer and
update or invalidate the cache. update or invalidate the cache.
""" """
# Must be called with _lock already acquired.
self._cache.checkSize(self._tbuf.get_size()) self._cache.checkSize(self._tbuf.get_size())
try: try:
self._tbuf.begin_iterate() self._tbuf.begin_iterate()
...@@ -912,10 +936,13 @@ class ClientStorage: ...@@ -912,10 +936,13 @@ class ClientStorage:
"""Storage API: undo a transaction, writing directly to the storage.""" """Storage API: undo a transaction, writing directly to the storage."""
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
# XXX what are the sync issues here?
oids = self._server.undo(transaction_id) oids = self._server.undo(transaction_id)
self._lock.acquire()
try:
for oid in oids: for oid in oids:
self._cache.invalidate(oid, '') self._cache.invalidate(oid, '')
finally:
self._lock.release()
return oids return oids
def undoInfo(self, first=0, last=-20, specification=None): def undoInfo(self, first=0, last=-20, specification=None):
...@@ -969,6 +996,8 @@ class ClientStorage: ...@@ -969,6 +996,8 @@ class ClientStorage:
# oid, version pairs. The DB's invalidate() method expects a # oid, version pairs. The DB's invalidate() method expects a
# dictionary of oids. # dictionary of oids.
self._lock.acquire()
try:
# versions maps version names to dictionary of invalidations # versions maps version names to dictionary of invalidations
versions = {} versions = {}
for oid, version in invs: for oid, version in invs:
...@@ -978,6 +1007,8 @@ class ClientStorage: ...@@ -978,6 +1007,8 @@ class ClientStorage:
if self._db is not None: if self._db is not None:
for v, d in versions.items(): for v, d in versions.items():
self._db.invalidate(d, version=v) self._db.invalidate(d, version=v)
finally:
self._lock.release()
def endVerify(self): def endVerify(self):
"""Server callback to signal end of cache validation.""" """Server callback to signal end of cache validation."""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment