Commit c3091e24 authored by Jim Fulton's avatar Jim Fulton

Bug fixed

- A file storage bug could cause ZEO clients to have incorrect
  information about current object revisions after reconnecting to a
  database server.

Also added a locking/transaction ordering test.
parent 90cd4ab2
...@@ -12,6 +12,10 @@ Bugs fixed ...@@ -12,6 +12,10 @@ Bugs fixed
2.7 broke the object/connection cache implementation. 2.7 broke the object/connection cache implementation.
(https://bugs.launchpad.net/zodb/+bug/641481) (https://bugs.launchpad.net/zodb/+bug/641481)
- A file storage bug could cause ZEO clients to have incorrect
information about current object revisions after reconnecting to a
database server.
- Updated the 'repozo --kill-old-on-full' option to remove any '.index' - Updated the 'repozo --kill-old-on-full' option to remove any '.index'
files corresponding to backups being removed. files corresponding to backups being removed.
......
...@@ -11,10 +11,14 @@ ...@@ -11,10 +11,14 @@
# FOR A PARTICULAR PURPOSE # FOR A PARTICULAR PURPOSE
# #
############################################################################## ##############################################################################
"""Handy standard storage machinery """Storage base class that is mostly a mistake
$Id$ The base class here is tightly coupled with its subclasses and
its use is not recommended. It's still here for historical reasons.
""" """
from __future__ import with_statement
import cPickle import cPickle
import threading import threading
import time import time
...@@ -306,6 +310,10 @@ class BaseStorage(UndoLogCompatible): ...@@ -306,6 +310,10 @@ class BaseStorage(UndoLogCompatible):
""" """
pass pass
def lastTransaction(self):
with self._lock:
return self._ltid
def getTid(self, oid): def getTid(self, oid):
self._lock_acquire() self._lock_acquire()
try: try:
......
...@@ -1221,10 +1221,6 @@ class FileStorage( ...@@ -1221,10 +1221,6 @@ class FileStorage(
def iterator(self, start=None, stop=None): def iterator(self, start=None, stop=None):
return FileIterator(self._file_name, start, stop) return FileIterator(self._file_name, start, stop)
def lastTransaction(self):
"""Return transaction id for last committed transaction"""
return self._ltid
def lastInvalidations(self, count): def lastInvalidations(self, count):
file = self._file file = self._file
seek = file.seek seek = file.seek
......
...@@ -458,6 +458,36 @@ class IDatabase(IStorageDB): ...@@ -458,6 +458,36 @@ class IDatabase(IStorageDB):
class IStorage(Interface): class IStorage(Interface):
"""A storage is responsible for storing and retrieving data of objects. """A storage is responsible for storing and retrieving data of objects.
Consistency and locking
-----------------------
When transactions are committed, a storage assigns monotonically
increasing transaction identifiers (tids) to the transactions and
to the object versions written by the transactions. ZODB relies
on this to decide if data in object caches are up to date and to
implement multi-version concurrency control.
There are methods in IStorage and in derived interfaces that
provide information about the current revisions (tids) for objects
or for the database as a whole. It is critical for the proper
working of ZODB that the resulting tids are increasing with
respect to the object identifier given or to the databases. That
is, if there are 2 results for an object or for the database, R1
and R2, such that R1 is returned before R2, then the tid returned
by R2 must be greater than or equal to the tid returned by R1.
(When thinking about results for the database, think of these as
results for all objects in the database.)
This implies some sort of locking strategy. The key method is
tcp_finish, which causes new tids to be generated and also,
through the callback passed to it, returns new current tids for
the objects stored in a transaction and for the database as a whole.
The IStorage methods affected are lastTransaction, load, store,
and tpc_finish. Derived interfaces may introduce additional
methods.
""" """
def close(): def close():
...@@ -1294,7 +1324,6 @@ class IBroken(Interface): ...@@ -1294,7 +1324,6 @@ class IBroken(Interface):
__Broken_initargs__ = Attribute("Arguments passed to __init__.") __Broken_initargs__ = Attribute("Arguments passed to __init__.")
__Broken_state__ = Attribute("Value passed to __setstate__.") __Broken_state__ = Attribute("Value passed to __setstate__.")
class BlobError(Exception): class BlobError(Exception):
pass pass
......
...@@ -19,6 +19,8 @@ http://www.zope.org/Documentation/Developer/Models/ZODB/ZODB_Architecture_Storag ...@@ -19,6 +19,8 @@ http://www.zope.org/Documentation/Developer/Models/ZODB/ZODB_Architecture_Storag
All storages should be able to pass these tests. All storages should be able to pass these tests.
""" """
from __future__ import with_statement
from ZODB import POSException from ZODB import POSException
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
...@@ -298,3 +300,101 @@ class BasicStorage: ...@@ -298,3 +300,101 @@ class BasicStorage:
tid4 = self._storage.load(oid)[1] tid4 = self._storage.load(oid)[1]
self.assert_(tid4 > self._storage.load('\0\0\0\0\0\0\0\xf4')[1]) self.assert_(tid4 > self._storage.load('\0\0\0\0\0\0\0\xf4')[1])
def check_tid_ordering_w_commit(self):
# It's important that storages always give a consistent
# ordering for revisions, tids. This is most likely to fail
# around commit. Here we'll do some basic tests to check this.
# We'll use threads to arrange for ordering to go wrong and
# verify that a storage gets it right.
# First, some initial data.
t = transaction.get()
self._storage.tpc_begin(t)
self._storage.store(ZERO, ZERO, 'x', '', t)
self._storage.tpc_vote(t)
tids = []
self._storage.tpc_finish(t, lambda tid: tids.append(tid))
# OK, now we'll start a new transaction, take it to finish,
# and then block finish while we do some other operations.
t = transaction.get()
self._storage.tpc_begin(t)
self._storage.store(ZERO, tids[0], 'y', '', t)
self._storage.tpc_vote(t)
to_join = []
def run_in_thread(func):
t = threading.Thread(target=func)
t.setDaemon(True)
t.start()
to_join.append(t)
started = threading.Event()
finish = threading.Event()
@run_in_thread
def commit():
def callback(tid):
started.set()
tids.append(tid)
finish.wait()
self._storage.tpc_finish(t, callback)
results = {}
started.wait()
attempts = []
attempts_cond = threading.Condition()
def update_attempts():
with attempts_cond:
attempts.append(1)
attempts_cond.notifyAll()
@run_in_thread
def lastTransaction():
update_attempts()
results['lastTransaction'] = self._storage.lastTransaction()
@run_in_thread
def load():
update_attempts()
results['load'] = self._storage.load(ZERO, '')[1]
expected_attempts = 2
if hasattr(self._storage, 'getTid'):
expected_attempts += 1
@run_in_thread
def getTid():
update_attempts()
results['getTid'] = self._storage.getTid(ZERO)
if hasattr(self._storage, 'lastInvalidations'):
expected_attempts += 1
@run_in_thread
def lastInvalidations():
update_attempts()
invals = self._storage.lastInvalidations(1)
if invals:
results['lastInvalidations'] = invals[0][0]
with attempts_cond:
while len(attempts) < expected_attempts:
attempts_cond.wait()
time.sleep(.01) # for good measure :)
finish.set()
for t in to_join:
t.join(1)
self.assertEqual(results.pop('load'), tids[1])
self.assertEqual(results.pop('lastTransaction'), tids[1])
for m, tid in results.items():
self.assertEqual(tid, tids[1])
...@@ -18,6 +18,7 @@ storage to use for unit tests. MappingStorage isn't sufficient. ...@@ -18,6 +18,7 @@ storage to use for unit tests. MappingStorage isn't sufficient.
Since even a minimal storage has some complexity, we run standard Since even a minimal storage has some complexity, we run standard
storage tests against the test storage. storage tests against the test storage.
""" """
from __future__ import with_statement
import bisect import bisect
import threading import threading
...@@ -77,14 +78,11 @@ class MinimalMemoryStorage(BaseStorage, object): ...@@ -77,14 +78,11 @@ class MinimalMemoryStorage(BaseStorage, object):
def load(self, oid, version=''): def load(self, oid, version=''):
assert version == '' assert version == ''
self._lock_acquire() with self._lock:
try:
assert not version assert not version
tid = self._cur[oid] tid = self._cur[oid]
self.hook(oid, tid, '') self.hook(oid, tid, '')
return self._index[(oid, tid)], tid return self._index[(oid, tid)], tid
finally:
self._lock_release()
def _begin(self, tid, u, d, e): def _begin(self, tid, u, d, e):
self._txn = Transaction(tid) self._txn = Transaction(tid)
...@@ -104,22 +102,15 @@ class MinimalMemoryStorage(BaseStorage, object): ...@@ -104,22 +102,15 @@ class MinimalMemoryStorage(BaseStorage, object):
del self._txn del self._txn
def _finish(self, tid, u, d, e): def _finish(self, tid, u, d, e):
self._lock_acquire() with self._lock:
try:
self._index.update(self._txn.index) self._index.update(self._txn.index)
self._cur.update(self._txn.cur()) self._cur.update(self._txn.cur())
self._ltid = self._tid self._ltid = self._tid
finally:
self._lock_release()
def lastTransaction(self):
return self._ltid
def loadBefore(self, the_oid, the_tid): def loadBefore(self, the_oid, the_tid):
# It's okay if loadBefore() is really expensive, because this # It's okay if loadBefore() is really expensive, because this
# storage is just used for testing. # storage is just used for testing.
self._lock_acquire() with self._lock:
try:
tids = [tid for oid, tid in self._index if oid == the_oid] tids = [tid for oid, tid in self._index if oid == the_oid]
if not tids: if not tids:
raise KeyError(the_oid) raise KeyError(the_oid)
...@@ -134,15 +125,9 @@ class MinimalMemoryStorage(BaseStorage, object): ...@@ -134,15 +125,9 @@ class MinimalMemoryStorage(BaseStorage, object):
else: else:
end_tid = tids[j] end_tid = tids[j]
return self._index[(the_oid, tid)], tid, end_tid return self._index[(the_oid, tid)], tid, end_tid
finally:
self._lock_release()
def loadSerial(self, oid, serial): def loadSerial(self, oid, serial):
self._lock_acquire()
try:
return self._index[(oid, serial)] return self._index[(oid, serial)]
finally:
self._lock_release()
def close(self): def close(self):
pass pass
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment