Commit 699cd279 authored by Jim Fulton's avatar Jim Fulton

Simplified vote handling and made it more robust to storage vote

failures.

Also made the disconnected in transaction messages show the locked
status.
parent 0ad400ef
......@@ -145,7 +145,8 @@ class ZEOStorage:
# When this storage closes, we must ensure that it aborts
# any pending transaction.
if self.transaction is not None:
self.log("disconnected during transaction %s" % self.transaction)
self.log("disconnected during %s transaction"
% self.locked and 'locked' or 'unlocked')
self.tpc_abort(self.transaction.id)
else:
self.log("disconnected")
......@@ -442,7 +443,6 @@ class ZEOStorage:
if not self._check_tid(tid):
return
self.stats.aborts += 1
if self.locked:
self.storage.tpc_abort(self.transaction)
self._clear_transaction()
......@@ -470,44 +470,17 @@ class ZEOStorage:
self.locked = self.server.lock_storage(self)
if self.locked:
try:
self._vote()
except Exception:
if delay is not None:
delay.error()
else:
raise
else:
if delay is not None:
delay.reply(None)
else:
if delay == None:
self.log("(%r) queue lock: transactions waiting: %s"
% (self.storage_id, self.server.waiting(self)+1))
delay = Delay()
self.server.unlock_callback(self, delay)
return delay
def _unlock_callback(self, delay):
connection = self.connection
if connection is not None:
connection.call_from_thread(self._try_to_vote, delay)
def _vote(self):
if self.txnlog.stores == 1:
template = "Preparing to commit transaction: %d object, %d bytes"
else:
template = "Preparing to commit transaction: %d objects, %d bytes"
self.log(template % (self.txnlog.stores, self.txnlog.size()),
self.log(
"Preparing to commit transaction: %d objects, %d bytes"
% (self.txnlog.stores, self.txnlog.size()),
level=BLATHER)
if (self.tid is not None) or (self.status != ' '):
self.storage.tpc_begin(self.transaction, self.tid, self.status)
self.storage.tpc_begin(self.transaction,
self.tid, self.status)
else:
self.storage.tpc_begin(self.transaction)
try:
for op, args in self.txnlog:
if not getattr(self, op)(*args):
break
......@@ -517,22 +490,40 @@ class ZEOStorage:
oid, oldserial, data, blobfilename = self.blob_log.pop()
self._store(oid, oldserial, data, blobfilename)
except:
self.storage.tpc_abort(self.transaction)
self._clear_transaction()
raise
if not self.store_failed:
# Only call tpc_vote of no store call failed, otherwise
# the serialnos() call will deliver an exception that will be
# handled by the client in its tpc_vote() method.
# Only call tpc_vote of no store call failed,
# otherwise the serialnos() call will deliver an
# exception that will be handled by the client in
# its tpc_vote() method.
serials = self.storage.tpc_vote(self.transaction)
if serials:
self.serials.extend(serials)
self.client.serialnos(self.serials)
except Exception:
self.storage.tpc_abort(self.transaction)
self._clear_transaction()
if delay is not None:
delay.error()
else:
raise
else:
if delay is not None:
delay.reply(None)
else:
if delay == None:
self.log("(%r) queue lock: transactions waiting: %s"
% (self.storage_id, self.server.waiting(self)+1))
delay = Delay()
self.server.unlock_callback(self, delay)
return delay
def _unlock_callback(self, delay):
connection = self.connection
if connection is not None:
connection.call_from_thread(self._try_to_vote, delay)
# The public methods of the ZEO client API do not do the real work.
# They defer work until after the storage lock has been acquired.
# Most of the real implementations are in methods beginning with
......
......@@ -155,6 +155,50 @@ We can start another client and get the storage lock.
>>> fs.close()
"""
def errors_in_vote_should_clear_lock():
"""
So, we arrange to get an error in vote:
>>> import ZODB.MappingStorage
>>> vote_should_fail = True
>>> class MappingStorage(ZODB.MappingStorage.MappingStorage):
... def tpc_vote(*args):
... if vote_should_fail:
... raise ValueError
... return ZODB.MappingStorage.MappingStorage.tpc_vote(*args)
>>> server = ZEO.tests.servertesting.StorageServer(
... 'x', {'1': MappingStorage()})
>>> zs = ZEO.StorageServer.ZEOStorage(server)
>>> conn = ZEO.tests.servertesting.Connection(1)
>>> zs.notifyConnected(conn)
>>> zs.register('1', 0)
>>> zs.tpc_begin('0', '', '', {})
>>> zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0')
>>> zs.vote('0')
Traceback (most recent call last):
...
ValueError
When we do, the storage server's transaction lock shouldn't be held:
>>> '1' in server._commit_locks
False
Of course, of vote suceeds, the lock will be held:
>>> vote_should_fail = False
>>> zs.tpc_begin('1', '', '', {})
>>> zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '1')
>>> _ = zs.vote('1') # doctest: +ELLIPSIS
1 callAsync serialnos ...
>>> '1' in server._commit_locks
True
"""
def test_suite():
return unittest.TestSuite((
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment