Commit 8ab30c70 authored by Tim Peters's avatar Tim Peters

Forward-porting critical ZEO client fixes from ZODB3-3_1-boston-branch.

+ If a storage times out between the vote and the finish, the ZEO cache
  could get populated with objects that don't make it to the storage
  server.

  A new flag self._midtxn_disconnect gets reset in tpc_begin() and set
  in notifyDisconnected().  If tpc_finish() discovers this flag set, it
  raises a ClientDisconnected error before calling tpc_finish() on the
  server.

+ In tpc_finish() we re-order the calls so that the server's tpc_finish()
  is called (and must succeed) before we update the ZEO client cache.

+ The storage name is now prepended to the sort key, to ensure a
  unique global sort order if storages are named uniquely.

+ Added new tests for the above (checkTimeoutAfterVote,
  checkTimeoutProvokingConflicts, checkSortKey).
parent 1e7306d4
...@@ -232,6 +232,11 @@ class ClientStorage: ...@@ -232,6 +232,11 @@ class ClientStorage:
self._username = username self._username = username
self._password = password self._password = password
self._realm = realm self._realm = realm
# Flag tracking disconnections in the middle of a transaction. This
# is reset in tpc_begin() and set in notifyDisconnected().
self._midtxn_disconnect = 0
# _server_addr is used by sortKey() # _server_addr is used by sortKey()
self._server_addr = None self._server_addr = None
self._tfile = None self._tfile = None
...@@ -514,7 +519,7 @@ class ClientStorage: ...@@ -514,7 +519,7 @@ class ClientStorage:
if self._server_addr is None: if self._server_addr is None:
raise ClientDisconnected raise ClientDisconnected
else: else:
return self._server_addr return '%s:%s' % (self._storage, self._server_addr)
def verify_cache(self, server): def verify_cache(self, server):
"""Internal routine called to verify the cache. """Internal routine called to verify the cache.
...@@ -583,6 +588,7 @@ class ClientStorage: ...@@ -583,6 +588,7 @@ class ClientStorage:
self._connection = None self._connection = None
self._ready.clear() self._ready.clear()
self._server = disconnected_stub self._server = disconnected_stub
self._midtxn_disconnect = 1
def __len__(self): def __len__(self):
"""Return the size of the storage.""" """Return the size of the storage."""
...@@ -821,6 +827,7 @@ class ClientStorage: ...@@ -821,6 +827,7 @@ class ClientStorage:
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
self._tpc_cond.acquire() self._tpc_cond.acquire()
self._midtxn_disconnect = 0
while self._transaction is not None: while self._transaction is not None:
# It is allowable for a client to call two tpc_begins in a # It is allowable for a client to call two tpc_begins in a
# row with the same transaction, and the second of these # row with the same transaction, and the second of these
...@@ -891,6 +898,12 @@ class ClientStorage: ...@@ -891,6 +898,12 @@ class ClientStorage:
return return
self._load_lock.acquire() self._load_lock.acquire()
try: try:
if self._midtxn_disconnect:
raise ClientDisconnected(
'Calling tpc_finish() on a disconnected transaction')
tid = self._server.tpc_finish(self._serial)
self._lock.acquire() # for atomic processing of invalidations self._lock.acquire() # for atomic processing of invalidations
try: try:
self._update_cache() self._update_cache()
...@@ -898,8 +911,6 @@ class ClientStorage: ...@@ -898,8 +911,6 @@ class ClientStorage:
f() f()
finally: finally:
self._lock.release() self._lock.release()
tid = self._server.tpc_finish(self._serial)
self._cache.setLastTid(tid) self._cache.setLastTid(tid)
r = self._check_serials() r = self._check_serials()
......
...@@ -29,15 +29,17 @@ import zLOG ...@@ -29,15 +29,17 @@ import zLOG
from ZEO.ClientStorage import ClientStorage from ZEO.ClientStorage import ClientStorage
from ZEO.Exceptions import ClientDisconnected from ZEO.Exceptions import ClientDisconnected
from ZEO.zrpc.marshal import Marshaller from ZEO.zrpc.marshal import Marshaller
from ZEO.zrpc.error import DisconnectedError
from ZEO.tests import forker from ZEO.tests import forker
from ZODB.DB import DB from ZODB.DB import DB
from ZODB.Transaction import get_transaction, Transaction from ZODB.Transaction import get_transaction, Transaction
from ZODB.POSException import ReadOnlyError from ZODB.POSException import ReadOnlyError, ConflictError
from ZODB.tests.StorageTestBase import StorageTestBase from ZODB.tests.StorageTestBase import StorageTestBase
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
from ZODB.tests.StorageTestBase import handle_all_serials, ZERO from ZODB.tests.StorageTestBase import handle_all_serials, ZERO
from ZODB.tests.StorageTestBase import handle_serials
class TestClientStorage(ClientStorage): class TestClientStorage(ClientStorage):
...@@ -844,6 +846,105 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -844,6 +846,105 @@ class TimeoutTests(CommonSetupTearDown):
storage.tpc_abort(txn) storage.tpc_abort(txn)
storage.close() storage.close()
def checkTimeoutAfterVote(self):
raises = self.assertRaises
unless = self.failUnless
self.startServer()
self._storage = storage = self.openClientStorage()
# Assert that the zeo cache is empty
unless(not storage._cache._index)
# Create the object
oid = storage.new_oid()
obj = MinPO(7)
ZERO = '\0'*8
# Now do a store, sleeping before the finish so as to cause a timeout
t = Transaction()
storage.tpc_begin(t)
revid1 = storage.store(oid, ZERO, zodb_pickle(obj), '', t)
storage.tpc_vote(t)
# Now sleep long enough for the storage to time out
time.sleep(3)
storage.sync()
unless(not storage.is_connected())
storage._wait()
unless(storage.is_connected())
# We expect finish to fail
raises(ClientDisconnected, storage.tpc_finish, t)
# The cache should still be empty
unless(not storage._cache._index)
# Load should fail since the object should not be in either the cache
# or the server.
raises(KeyError, storage.load, oid, '')
def checkTimeoutProvokingConflicts(self):
eq = self.assertEqual
raises = self.assertRaises
unless = self.failUnless
self.startServer()
self._storage = storage = self.openClientStorage()
# Assert that the zeo cache is empty
unless(not storage._cache._index)
# Create the object
oid = storage.new_oid()
obj = MinPO(7)
ZERO = '\0'*8
# We need to successfully commit an object now so we have something to
# conflict about.
t = Transaction()
storage.tpc_begin(t)
revid1a = storage.store(oid, ZERO, zodb_pickle(obj), '', t)
revid1b = storage.tpc_vote(t)
revid1 = handle_serials(oid, revid1a, revid1b)
storage.tpc_finish(t)
# Now do a store, sleeping before the finish so as to cause a timeout
obj.value = 8
t = Transaction()
storage.tpc_begin(t)
revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t)
revid2b = storage.tpc_vote(t)
revid2 = handle_serials(oid, revid2a, revid2b)
# Now sleep long enough for the storage to time out
time.sleep(3)
storage.sync()
unless(not storage.is_connected())
storage._wait()
unless(storage.is_connected())
# We expect finish to fail
raises(ClientDisconnected, storage.tpc_finish, t)
# Now we think we've committed the second transaction, but we really
# haven't. A third one should produce a POSKeyError on the server,
# which manifests as a ConflictError on the client.
obj.value = 9
t = Transaction()
storage.tpc_begin(t)
storage.store(oid, revid2, zodb_pickle(obj), '', t)
raises(ConflictError, storage.tpc_vote, t)
# Even aborting won't help
storage.tpc_abort(t)
storage.tpc_finish(t)
# Try again
obj.value = 10
t = Transaction()
storage.tpc_begin(t)
storage.store(oid, revid2, zodb_pickle(obj), '', t)
# Even aborting won't help
raises(ConflictError, storage.tpc_vote, t)
# Abort this one and try a transaction that should succeed
storage.tpc_abort(t)
storage.tpc_finish(t)
# Now do a store, sleeping before the finish so as to cause a timeout
obj.value = 11
t = Transaction()
storage.tpc_begin(t)
revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t)
revid2b = storage.tpc_vote(t)
revid2 = handle_serials(oid, revid2a, revid2b)
storage.tpc_finish(t)
# Now load the object and verify that it has a value of 11
data, revid = storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(11))
eq(revid, revid2)
class MSTThread(threading.Thread): class MSTThread(threading.Thread):
__super_init = threading.Thread.__init__ __super_init = threading.Thread.__init__
......
...@@ -162,6 +162,10 @@ class GenericTests( ...@@ -162,6 +162,10 @@ class GenericTests(
if hasattr(ZODB, "__version__"): if hasattr(ZODB, "__version__"):
ReadOnlyStorage.ReadOnlyStorage.checkWriteMethods(self) ReadOnlyStorage.ReadOnlyStorage.checkWriteMethods(self)
def checkSortKey(self):
key = '%s:%s' % (self._storage._storage, self._storage._server_addr)
self.assertEqual(self._storage.sortKey(), key)
class FileStorageTests(GenericTests): class FileStorageTests(GenericTests):
"""Test ZEO backed by a FileStorage.""" """Test ZEO backed by a FileStorage."""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment