Commit 29936392 authored by Jim Fulton's avatar Jim Fulton

Simplified vote handling and made it more robust to storage vote

failures.

Also made the disconnected in transaction messages show the locked
status.
parent 0f36389f
...@@ -145,7 +145,8 @@ class ZEOStorage: ...@@ -145,7 +145,8 @@ class ZEOStorage:
# When this storage closes, we must ensure that it aborts # When this storage closes, we must ensure that it aborts
# any pending transaction. # any pending transaction.
if self.transaction is not None: if self.transaction is not None:
self.log("disconnected during transaction %s" % self.transaction) self.log("disconnected during %s transaction"
% self.locked and 'locked' or 'unlocked')
self.tpc_abort(self.transaction.id) self.tpc_abort(self.transaction.id)
else: else:
self.log("disconnected") self.log("disconnected")
...@@ -442,8 +443,7 @@ class ZEOStorage: ...@@ -442,8 +443,7 @@ class ZEOStorage:
if not self._check_tid(tid): if not self._check_tid(tid):
return return
self.stats.aborts += 1 self.stats.aborts += 1
if self.locked: self.storage.tpc_abort(self.transaction)
self.storage.tpc_abort(self.transaction)
self._clear_transaction() self._clear_transaction()
def _clear_transaction(self): def _clear_transaction(self):
...@@ -470,8 +470,40 @@ class ZEOStorage: ...@@ -470,8 +470,40 @@ class ZEOStorage:
self.locked = self.server.lock_storage(self) self.locked = self.server.lock_storage(self)
if self.locked: if self.locked:
try: try:
self._vote() self.log(
"Preparing to commit transaction: %d objects, %d bytes"
% (self.txnlog.stores, self.txnlog.size()),
level=BLATHER)
if (self.tid is not None) or (self.status != ' '):
self.storage.tpc_begin(self.transaction,
self.tid, self.status)
else:
self.storage.tpc_begin(self.transaction)
for op, args in self.txnlog:
if not getattr(self, op)(*args):
break
# Blob support
while self.blob_log and not self.store_failed:
oid, oldserial, data, blobfilename = self.blob_log.pop()
self._store(oid, oldserial, data, blobfilename)
if not self.store_failed:
# Only call tpc_vote of no store call failed,
# otherwise the serialnos() call will deliver an
# exception that will be handled by the client in
# its tpc_vote() method.
serials = self.storage.tpc_vote(self.transaction)
if serials:
self.serials.extend(serials)
self.client.serialnos(self.serials)
except Exception: except Exception:
self.storage.tpc_abort(self.transaction)
self._clear_transaction()
if delay is not None: if delay is not None:
delay.error() delay.error()
else: else:
...@@ -492,47 +524,6 @@ class ZEOStorage: ...@@ -492,47 +524,6 @@ class ZEOStorage:
if connection is not None: if connection is not None:
connection.call_from_thread(self._try_to_vote, delay) connection.call_from_thread(self._try_to_vote, delay)
def _vote(self):
if self.txnlog.stores == 1:
template = "Preparing to commit transaction: %d object, %d bytes"
else:
template = "Preparing to commit transaction: %d objects, %d bytes"
self.log(template % (self.txnlog.stores, self.txnlog.size()),
level=BLATHER)
if (self.tid is not None) or (self.status != ' '):
self.storage.tpc_begin(self.transaction, self.tid, self.status)
else:
self.storage.tpc_begin(self.transaction)
try:
for op, args in self.txnlog:
if not getattr(self, op)(*args):
break
# Blob support
while self.blob_log and not self.store_failed:
oid, oldserial, data, blobfilename = self.blob_log.pop()
self._store(oid, oldserial, data, blobfilename)
except:
self.storage.tpc_abort(self.transaction)
self._clear_transaction()
raise
if not self.store_failed:
# Only call tpc_vote of no store call failed, otherwise
# the serialnos() call will deliver an exception that will be
# handled by the client in its tpc_vote() method.
serials = self.storage.tpc_vote(self.transaction)
if serials:
self.serials.extend(serials)
self.client.serialnos(self.serials)
# The public methods of the ZEO client API do not do the real work. # The public methods of the ZEO client API do not do the real work.
# They defer work until after the storage lock has been acquired. # They defer work until after the storage lock has been acquired.
# Most of the real implementations are in methods beginning with # Most of the real implementations are in methods beginning with
......
...@@ -155,6 +155,50 @@ We can start another client and get the storage lock. ...@@ -155,6 +155,50 @@ We can start another client and get the storage lock.
>>> fs.close() >>> fs.close()
""" """
def errors_in_vote_should_clear_lock():
"""
So, we arrange to get an error in vote:
>>> import ZODB.MappingStorage
>>> vote_should_fail = True
>>> class MappingStorage(ZODB.MappingStorage.MappingStorage):
... def tpc_vote(*args):
... if vote_should_fail:
... raise ValueError
... return ZODB.MappingStorage.MappingStorage.tpc_vote(*args)
>>> server = ZEO.tests.servertesting.StorageServer(
... 'x', {'1': MappingStorage()})
>>> zs = ZEO.StorageServer.ZEOStorage(server)
>>> conn = ZEO.tests.servertesting.Connection(1)
>>> zs.notifyConnected(conn)
>>> zs.register('1', 0)
>>> zs.tpc_begin('0', '', '', {})
>>> zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0')
>>> zs.vote('0')
Traceback (most recent call last):
...
ValueError
When we do, the storage server's transaction lock shouldn't be held:
>>> '1' in server._commit_locks
False
Of course, of vote suceeds, the lock will be held:
>>> vote_should_fail = False
>>> zs.tpc_begin('1', '', '', {})
>>> zs.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '1')
>>> _ = zs.vote('1') # doctest: +ELLIPSIS
1 callAsync serialnos ...
>>> '1' in server._commit_locks
True
"""
def test_suite(): def test_suite():
return unittest.TestSuite(( return unittest.TestSuite((
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment