Commit c8199844 authored by Jim Fulton's avatar Jim Fulton

A ZEO threading bug could cause transactions to read inconsistent

data.  (This sometimes caused an AssertionError in
Connection._setstate_noncurrent.)
parent d1480d2d
...@@ -1143,15 +1143,12 @@ class ClientStorage(object): ...@@ -1143,15 +1143,12 @@ class ClientStorage(object):
raise ClientDisconnected( raise ClientDisconnected(
'Calling tpc_finish() on a disconnected transaction') 'Calling tpc_finish() on a disconnected transaction')
# The calls to tpc_finish() and _update_cache() should finished = 0
# never run currently with another thread, because the
# tpc_cond condition variable prevents more than one
# thread from calling tpc_finish() at a time.
tid = self._server.tpc_finish(id(txn))
try: try:
self._lock.acquire() # for atomic processing of invalidations self._lock.acquire() # for atomic processing of invalidations
try: try:
tid = self._server.tpc_finish(id(txn))
finished = 1
self._update_cache(tid) self._update_cache(tid)
if f is not None: if f is not None:
f(tid) f(tid)
...@@ -1161,9 +1158,10 @@ class ClientStorage(object): ...@@ -1161,9 +1158,10 @@ class ClientStorage(object):
r = self._check_serials() r = self._check_serials()
assert r is None or len(r) == 0, "unhandled serialnos: %s" % r assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
except: except:
# The server successfully committed. If we get a failure if finished:
# here, our own state will be in question, so reconnect. # The server successfully committed. If we get a failure
self._connection.close() # here, our own state will be in question, so reconnect.
self._connection.close()
raise raise
self.end_transaction() self.end_transaction()
......
...@@ -341,6 +341,10 @@ class Connection(ExportImport, object): ...@@ -341,6 +341,10 @@ class Connection(ExportImport, object):
try: try:
if self._txn_time is None: if self._txn_time is None:
self._txn_time = tid self._txn_time = tid
elif tid < self._txn_time:
raise AssertionError("invalidations out of order, %r < %r"
% (tid, self._txn_time))
self._invalidated.update(oids) self._invalidated.update(oids)
finally: finally:
self._inv_lock.release() self._inv_lock.release()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment