Commit 3c501f4a authored by Jeremy Hylton's avatar Jeremy Hylton

Refactor ZEOStorage to remove the strategy classes.

The strategy classes stopped being useful, because the code had been
modified to always use one of the strategies.  The classes were left
behind because editing the code to use a single class was too much
work at the time.

Move all the logic back into ZEOStorage.  Rename non-public methods on
the class to start with _.  Use lambda: rather than getattr()() to
invoke the method that gets delayed waiting for the storage lock.
parent b4b9b0b7
...@@ -97,6 +97,11 @@ class StorageServer: ...@@ -97,6 +97,11 @@ class StorageServer:
allowed, even if the storages are writable. Note that allowed, even if the storages are writable. Note that
pack() is considered a read-only operation. pack() is considered a read-only operation.
invalidation_queue_size -- The storage server keeps a queue
of the objects modified by the last N transactions, where
N == invalidation_queue_size. This queue is used to
speed client cache verification when a client disconnects
for a short period of time.
""" """
self.addr = addr self.addr = addr
...@@ -236,8 +241,6 @@ class ZEOStorage: ...@@ -236,8 +241,6 @@ class ZEOStorage:
# Classes we instantiate. A subclass might override. # Classes we instantiate. A subclass might override.
ClientStorageStubClass = ClientStub.ClientStorage ClientStorageStubClass = ClientStub.ClientStorage
DelayedCommitStrategyClass = None # patched up later
ImmediateCommitStrategyClass = None # patched up later
def __init__(self, server, read_only=0): def __init__(self, server, read_only=0):
self.server = server self.server = server
...@@ -247,6 +250,7 @@ class ZEOStorage: ...@@ -247,6 +250,7 @@ class ZEOStorage:
self.storage_id = "uninitialized" self.storage_id = "uninitialized"
self.transaction = None self.transaction = None
self.read_only = read_only self.read_only = read_only
self.locked = 0
self.log_label = _label self.log_label = _label
def notifyConnected(self, conn): def notifyConnected(self, conn):
...@@ -265,7 +269,7 @@ class ZEOStorage: ...@@ -265,7 +269,7 @@ class ZEOStorage:
# any pending transaction. # any pending transaction.
if self.transaction is not None: if self.transaction is not None:
self.log("disconnected during transaction %s" % self.transaction) self.log("disconnected during transaction %s" % self.transaction)
self.abort() self._abort()
else: else:
self.log("disconnected") self.log("disconnected")
...@@ -430,15 +434,15 @@ class ZEOStorage: ...@@ -430,15 +434,15 @@ class ZEOStorage:
def pack(self, time, wait=1): def pack(self, time, wait=1):
# Yes, you can pack a read-only server or storage! # Yes, you can pack a read-only server or storage!
if wait: if wait:
return run_in_thread(self.pack_impl, time) return run_in_thread(self._pack_impl, time)
else: else:
# If the client isn't waiting for a reply, start a thread # If the client isn't waiting for a reply, start a thread
# and forget about it. # and forget about it.
t = threading.Thread(target=self.pack_impl, args=(time,)) t = threading.Thread(target=self._pack_impl, args=(time,))
t.start() t.start()
return None return None
def pack_impl(self, time): def _pack_impl(self, time):
self.log("pack(time=%s) started..." % repr(time)) self.log("pack(time=%s) started..." % repr(time))
self.storage.pack(time, referencesf) self.storage.pack(time, referencesf)
self.log("pack(time=%s) complete" % repr(time)) self.log("pack(time=%s) complete" % repr(time))
...@@ -483,206 +487,104 @@ class ZEOStorage: ...@@ -483,206 +487,104 @@ class ZEOStorage:
raise StorageTransactionError("Multiple simultaneous tpc_begin" raise StorageTransactionError("Multiple simultaneous tpc_begin"
" requests from one client.") " requests from one client.")
# (This doesn't require a lock because we're using asyncore) self.transaction = t = Transaction()
self.strategy = self.DelayedCommitStrategyClass(self.storage,
self.wait)
t = Transaction()
t.id = id t.id = id
t.user = user t.user = user
t.description = description t.description = description
t._extension = ext t._extension = ext
self.strategy.tpc_begin(t, tid, status) self.serials = []
self.transaction = t self.invalidated = []
self.txnlog = CommitLog()
self.tid = tid
self.status = status
def tpc_finish(self, id): def tpc_finish(self, id):
if not self.check_tid(id): if not self.check_tid(id):
return return
invalidated = self.strategy.tpc_finish() assert self.locked
self.storage.tpc_finish(self.transaction)
tid = self.storage.lastTransaction() tid = self.storage.lastTransaction()
if invalidated: if self.invalidated:
self.server.invalidate(self, self.storage_id, tid, self.server.invalidate(self, self.storage_id, tid,
invalidated, self.get_size_info()) self.invalidated, self.get_size_info())
self.transaction = None self.transaction = None
self.strategy = None self.locked = 0
# Return the tid, for cache invalidation optimization # Return the tid, for cache invalidation optimization
self.handle_waiting() self._handle_waiting()
return tid return tid
def tpc_abort(self, id): def tpc_abort(self, id):
if not self.check_tid(id): if not self.check_tid(id):
return return
strategy = self.strategy if self.locked:
strategy.tpc_abort() self.storage.tpc_abort(self.transaction)
self.transaction = None
self.strategy = None
self.handle_waiting()
def abort(self):
strategy = self.strategy
self.transaction = None self.transaction = None
self.strategy = None self.locked = 0
strategy.abort(self) self._handle_waiting()
# XXX handle new serialnos def _abort(self):
# called when a connection is closed unexpectedly
if not self.locked:
# Delete (d, zeo_storage) from the _waiting list, if found.
waiting = self.storage._waiting
for i in range(len(waiting)):
d, z = waiting[i]
if z is self:
del waiting[i]
break
if self.transaction:
self.tpc_abort(self.transaction.id)
# The public methods of the ZEO client API do not do the real work.
# They defer work until after the storage lock has been acquired.
# Most of the real implementations are in methods beginning with
# an _.
def storea(self, oid, serial, data, version, id): def storea(self, oid, serial, data, version, id):
self.check_tid(id, exc=StorageTransactionError) self.check_tid(id, exc=StorageTransactionError)
self.strategy.store(oid, serial, data, version) self.txnlog.store(oid, serial, data, version)
# The following four methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
def vote(self, id): def vote(self, id):
self.check_tid(id, exc=StorageTransactionError) self.check_tid(id, exc=StorageTransactionError)
return self.strategy.tpc_vote() if self.locked:
return self._vote()
else:
return self._wait(lambda: self._vote())
def abortVersion(self, src, id): def abortVersion(self, src, id):
self.check_tid(id, exc=StorageTransactionError) self.check_tid(id, exc=StorageTransactionError)
return self.strategy.abortVersion(src) if self.locked:
return self._abortVersion(src)
else:
return self._wait(lambda: self._abortVersion(src))
def commitVersion(self, src, dest, id): def commitVersion(self, src, dest, id):
self.check_tid(id, exc=StorageTransactionError) self.check_tid(id, exc=StorageTransactionError)
return self.strategy.commitVersion(src, dest) if self.locked:
return self._commitVersion(src, dest)
else:
return self._wait(lambda: self._commitVersion(src, dest))
def transactionalUndo(self, trans_id, id): def transactionalUndo(self, trans_id, id):
self.check_tid(id, exc=StorageTransactionError) self.check_tid(id, exc=StorageTransactionError)
return self.strategy.transactionalUndo(trans_id) if self.locked:
return self._transactionalUndo(trans_id)
# When a delayed transaction is restarted, the dance is
# complicated. The restart occurs when one ZEOStorage instance
# finishes as a transaction and finds another instance is in the
# _waiting list.
# XXX It might be better to have a mechanism to explicitly send
# the finishing transaction's reply before restarting the waiting
# transaction. If the restart takes a long time, the previous
# client will be blocked until it finishes.
def wait(self):
if self.storage._transaction:
d = Delay()
self.storage._waiting.append((d, self))
self.log("Transaction blocked waiting for storage. "
"Clients waiting: %d." % len(self.storage._waiting))
return d
else: else:
return self.restart() return self._wait(lambda: self._transactionalUndo(trans_id))
def dontwait(self):
return self.restart()
def handle_waiting(self):
while self.storage._waiting:
delay, zeo_storage = self.storage._waiting.pop(0)
if self.restart_other(zeo_storage, delay):
if self.storage._waiting:
n = len(self.storage._waiting)
self.log("Blocked transaction restarted. "
"Clients waiting: %d" % n)
else:
self.log("Blocked transaction restarted.")
return
def restart_other(self, zeo_storage, delay):
# Return True if the server restarted.
# call the restart() method on the appropriate server.
try:
zeo_storage.restart(delay)
except:
self.log("Unexpected error handling waiting transaction",
level=zLOG.WARNING, error=sys.exc_info())
zeo_storage.connection.close()
return 0
else:
return 1
def restart(self, delay=None):
old_strategy = self.strategy
assert isinstance(old_strategy, DelayedCommitStrategy)
self.strategy = ImmediateCommitStrategy(self.storage,
self.client,
self.log)
resp = old_strategy.restart(self.strategy)
if delay is not None:
delay.reply(resp)
else:
return resp
# A ZEOStorage instance can use different strategies to commit a
# transaction. The current implementation uses different strategies
# depending on whether the underlying storage is available. These
# strategies implement the distributed commit lock.
# If the underlying storage is availabe, start the commit immediately
# using the ImmediateCommitStrategy. If the underlying storage is not
# available because another client is committing a transaction, delay
# the commit as long as possible. At some point it will no longer be
# possible to delay; either the transaction will reach the vote stage
# or a synchronous method like transactionalUndo() will be called.
# When it is no longer possible to delay, the client must block until
# the storage is ready. Then we switch back to the immediate strategy.
class ICommitStrategy:
"""A class that describes that commit strategy interface.
The commit strategy interface does not require the transaction
argument, except for tpc_begin(). The storage interface requires
the client to pass a transaction object/id to each transactional
method. The strategy does not; it requires the caller to only
call methods for a single transaction.
"""
# This isn't a proper Zope interface, because I don't want to
# introduce a dependency between ZODB and Zope interfaces.
def tpc_begin(self, trans, tid, status): pass
def store(self, oid, serial, data, version): pass
def abortVersion(self, src): pass
def commitVersion(self, src, dest): pass
# the trans_id arg to transactionalUndo is not the current txn's id
def transactionalUndo(self, trans_id): pass
def tpc_vote(self): pass
def tpc_abort(self): pass def _tpc_begin(self, txn, tid, status):
self.locked = 1
def tpc_finish(self): pass
# What to do if a connection is closed in mid-transaction
def abort(self, zeo_storage): pass
class ImmediateCommitStrategy:
"""The storage is available so do a normal commit."""
def __init__(self, storage, client, logmethod):
self.storage = storage
self.client = client
self.invalidated = []
self.serials = []
self.log = logmethod
def tpc_begin(self, txn, tid, status):
self.txn = txn
self.storage.tpc_begin(txn, tid, status) self.storage.tpc_begin(txn, tid, status)
def tpc_vote(self): def _store(self, oid, serial, data, version):
# send all the serialnos as a batch
self.client.serialnos(self.serials)
return self.storage.tpc_vote(self.txn)
def tpc_finish(self):
self.storage.tpc_finish(self.txn)
return self.invalidated
def tpc_abort(self):
self.storage.tpc_abort(self.txn)
def store(self, oid, serial, data, version):
try: try:
newserial = self.storage.store(oid, serial, data, version, newserial = self.storage.store(oid, serial, data, version,
self.txn) self.transaction)
except (SystemExit, KeyboardInterrupt): except (SystemExit, KeyboardInterrupt):
raise raise
except Exception, err: except Exception, err:
...@@ -709,8 +611,18 @@ class ImmediateCommitStrategy: ...@@ -709,8 +611,18 @@ class ImmediateCommitStrategy:
self.invalidated.append((oid, version)) self.invalidated.append((oid, version))
self.serials.append((oid, newserial)) self.serials.append((oid, newserial))
def commitVersion(self, src, dest): def _vote(self):
oids = self.storage.commitVersion(src, dest, self.txn) self.client.serialnos(self.serials)
return self.storage.tpc_vote(self.transaction)
def _abortVersion(self, src):
oids = self.storage.abortVersion(src, self.transaction)
inv = [(oid, src) for oid in oids]
self.invalidated.extend(inv)
return oids
def _commitVersion(self, src, dest):
oids = self.storage.commitVersion(src, dest, self.transaction)
inv = [(oid, dest) for oid in oids] inv = [(oid, dest) for oid in oids]
self.invalidated.extend(inv) self.invalidated.extend(inv)
if dest: if dest:
...@@ -718,93 +630,72 @@ class ImmediateCommitStrategy: ...@@ -718,93 +630,72 @@ class ImmediateCommitStrategy:
self.invalidated.extend(inv) self.invalidated.extend(inv)
return oids return oids
def abortVersion(self, src): def _transactionalUndo(self, trans_id):
oids = self.storage.abortVersion(src, self.txn) oids = self.storage.transactionalUndo(trans_id, self.transaction)
inv = [(oid, src) for oid in oids]
self.invalidated.extend(inv)
return oids
def transactionalUndo(self, trans_id):
oids = self.storage.transactionalUndo(trans_id, self.txn)
inv = [(oid, None) for oid in oids] inv = [(oid, None) for oid in oids]
self.invalidated.extend(inv) self.invalidated.extend(inv)
return oids return oids
def abort(self, zeo_storage): # When a delayed transaction is restarted, the dance is
self.tpc_abort() # complicated. The restart occurs when one ZEOStorage instance
zeo_storage.handle_waiting() # finishes as a transaction and finds another instance is in the
# _waiting list.
class DelayedCommitStrategy: # XXX It might be better to have a mechanism to explicitly send
"""The storage is unavailable, so log to a file.""" # the finishing transaction's reply before restarting the waiting
# transaction. If the restart takes a long time, the previous
# client will be blocked until it finishes.
def __init__(self, storage, block): def _wait(self, thunk):
# the block argument is called when we can't delay any longer # Wait for the storage lock to be acquired.
self.storage = storage self._thunk = thunk
self.block = block if self.storage._transaction:
self.log = CommitLog() d = Delay()
self.storage._waiting.append((d, self))
self.log("Transaction blocked waiting for storage. "
"Clients waiting: %d." % len(self.storage._waiting))
return d
else:
return self._restart()
# Store information about the call that blocks def _restart(self, delay=None):
self.name = None # Restart when the storage lock is available.
self.args = None self._tpc_begin(self.transaction, self.tid, self.status)
loads, loader = self.txnlog.get_loader()
for i in range(loads):
# load oid, serial, data, version
self._store(*loader.load())
resp = self._thunk()
if delay is not None:
delay.reply(resp)
else:
return resp
def tpc_begin(self, txn, tid, status): def _handle_waiting(self):
self.txn = txn # Restart any client waiting for the storage lock.
self.tid = tid while self.storage._waiting:
self.status = status delay, zeo_storage = self.storage._waiting.pop(0)
if self._restart_other(zeo_storage, delay):
if self.storage._waiting:
n = len(self.storage._waiting)
self.log("Blocked transaction restarted. "
"Clients waiting: %d" % n)
else:
self.log("Blocked transaction restarted.")
return
def store(self, oid, serial, data, version): def _restart_other(self, zeo_storage, delay):
self.log.store(oid, serial, data, version) # Return True if the server restarted.
# call the restart() method on the appropriate server.
def tpc_abort(self): try:
pass # just forget about this strategy zeo_storage._restart(delay)
except:
def tpc_finish(self): self.log("Unexpected error handling waiting transaction",
# There has to be a tpc_vote() call before tpc_finish() is level=zLOG.WARNING, error=sys.exc_info())
# called, and tpc_vote() always blocks, so a proper zeo_storage.connection.close()
# tpc_finish() call will always be sent to the immediate return 0
# commit strategy object. So, if we get here, it means no else:
# call to tpc_vote() was made, which is a bug in the caller. return 1
raise RuntimeError, "Logic error. This method must not be called."
def tpc_vote(self):
self.name = "tpc_vote"
self.args = ()
return self.block()
def commitVersion(self, src, dest):
self.name = "commitVersion"
self.args = src, dest
return self.block()
def abortVersion(self, src):
self.name = "abortVersion"
self.args = src,
return self.block()
def transactionalUndo(self, trans_id):
self.name = "transactionalUndo"
self.args = trans_id,
return self.block()
def restart(self, new_strategy):
# called by the storage when the storage is available
assert isinstance(new_strategy, ImmediateCommitStrategy)
new_strategy.tpc_begin(self.txn, self.tid, self.status)
loads, loader = self.log.get_loader()
for i in range(loads):
oid, serial, data, version = loader.load()
new_strategy.store(oid, serial, data, version)
meth = getattr(new_strategy, self.name)
return meth(*self.args)
def abort(self, zeo_storage):
# Delete (d, zeo_storage) from the _waiting list, if found.
waiting = self.storage._waiting
for i in range(len(waiting)):
d, z = waiting[i]
if z is zeo_storage:
del waiting[i]
break
def run_in_thread(method, *args): def run_in_thread(method, *args):
t = SlowMethodThread(method, args) t = SlowMethodThread(method, args)
...@@ -842,5 +733,3 @@ class SlowMethodThread(threading.Thread): ...@@ -842,5 +733,3 @@ class SlowMethodThread(threading.Thread):
# Patch up class references # Patch up class references
StorageServer.ZEOStorageClass = ZEOStorage StorageServer.ZEOStorageClass = ZEOStorage
ZEOStorage.DelayedCommitStrategyClass = DelayedCommitStrategy
ZEOStorage.ImmediateCommitStrategyClass = ImmediateCommitStrategy
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment