Commit 9dae6a29 authored by Guido van Rossum's avatar Guido van Rossum

Change how a disconnect in mid-transaction is handled. Rather than

trying to fake a tpc_abort() call with the right transaction id, call
a new method abort(), which resets some instance variables and then
calls the strategy's abort() method.  The immediate strategy's abort()
aborts its own transaction and then calls the zeo_storage's
handle_waiting() method to restart the next blocked transaction from
the _waiting list; the delayed strategy's abort() removes the
zeo_storage from the _waiting list.
parent 43abd23e
...@@ -134,7 +134,7 @@ class ZEOStorage: ...@@ -134,7 +134,7 @@ class ZEOStorage:
# any pending transaction. Not sure if this is the cleanest way. # any pending transaction. Not sure if this is the cleanest way.
if self.transaction is not None: if self.transaction is not None:
self.log("disconnected during transaction %s" % self.transaction) self.log("disconnected during transaction %s" % self.transaction)
self.tpc_abort(self.transaction.id) self.abort()
else: else:
self.log("disconnected") self.log("disconnected")
...@@ -329,15 +329,13 @@ class ZEOStorage: ...@@ -329,15 +329,13 @@ class ZEOStorage:
strategy.tpc_abort() strategy.tpc_abort()
self.transaction = None self.transaction = None
self.strategy = None self.strategy = None
# When ZEOStorage.notifyDisconnected() calls self.tpc_abort(),
# it is possible that self.strategy is DelayedCommitStrategy.
# In that case, ZEOStorage.tpc_abort() should *not* call
# self.handle_waiting(), otherwise there could be two
# ZEOStorage instances whose strategy is
# ImmediateCommitStrategy!
if isinstance(strategy, ImmediateCommitStrategy):
self.handle_waiting() self.handle_waiting()
# XXX else, should we remove ourselves from storage._waiting???
def abort(self):
strategy = self.strategy
self.transaction = None
self.strategy = None
strategy.abort(self)
# XXX handle new serialnos # XXX handle new serialnos
...@@ -459,6 +457,9 @@ class ICommitStrategy: ...@@ -459,6 +457,9 @@ class ICommitStrategy:
def tpc_finish(self): pass def tpc_finish(self): pass
# What to do if a connection is closed in mid-transaction
def abort(self, zeo_storage): pass
class ImmediateCommitStrategy: class ImmediateCommitStrategy:
"""The storage is available so do a normal commit.""" """The storage is available so do a normal commit."""
...@@ -533,6 +534,10 @@ class ImmediateCommitStrategy: ...@@ -533,6 +534,10 @@ class ImmediateCommitStrategy:
self.invalidated.extend(inv) self.invalidated.extend(inv)
return oids return oids
def abort(self, zeo_storage):
self.tpc_abort()
zeo_storage.handle_waiting()
class DelayedCommitStrategy: class DelayedCommitStrategy:
"""The storage is unavailable, so log to a file.""" """The storage is unavailable, so log to a file."""
...@@ -596,6 +601,15 @@ class DelayedCommitStrategy: ...@@ -596,6 +601,15 @@ class DelayedCommitStrategy:
meth = getattr(new_strategy, self.name) meth = getattr(new_strategy, self.name)
return meth(*self.args) return meth(*self.args)
def abort(self, zeo_storage):
# Delete (d, zeo_storage) from the _waiting list, if found.
waiting = self.storage._waiting
for i in range(len(waiting)):
d, z = waiting[i]
if z is zeo_storage:
del waiting[i]
break
def run_in_thread(method, *args): def run_in_thread(method, *args):
t = SlowMethodThread(method, args) t = SlowMethodThread(method, args)
t.start() t.start()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment