Commit f51affd7 authored by Jim Fulton's avatar Jim Fulton

Rearranged the code a bit, especially rolled up some excess

abstraction to make the code a little more readable while trying to
decipher the storage locking logic.
parent cf9f715d
...@@ -123,7 +123,7 @@ class ZEOStorage: ...@@ -123,7 +123,7 @@ class ZEOStorage:
self.database = database self.database = database
def notifyConnected(self, conn): def notifyConnected(self, conn):
self.connection = conn # For restart_other() below self.connection = conn
assert conn.peer_protocol_version is not None assert conn.peer_protocol_version is not None
if conn.peer_protocol_version < 'Z309': if conn.peer_protocol_version < 'Z309':
self.client = ClientStub308(conn) self.client = ClientStub308(conn)
...@@ -143,9 +143,23 @@ class ZEOStorage: ...@@ -143,9 +143,23 @@ class ZEOStorage:
# any pending transaction. # any pending transaction.
if self.transaction is not None: if self.transaction is not None:
self.log("disconnected during transaction %s" % self.transaction) self.log("disconnected during transaction %s" % self.transaction)
self._abort() if not self.locked:
# Delete (d, zeo_storage) from the _waiting list, if found.
waiting = self.storage._waiting
for i in range(len(waiting)):
d, z = waiting[i]
if z is self:
del waiting[i]
self.log("Closed connection removed from waiting list."
" Clients waiting: %d." % len(waiting))
break
if self.transaction:
self.tpc_abort(self.transaction.id)
else: else:
self.log("disconnected") self.log("disconnected")
if self.stats is not None: if self.stats is not None:
self.stats.clients -= 1 self.stats.clients -= 1
...@@ -412,7 +426,6 @@ class ZEOStorage: ...@@ -412,7 +426,6 @@ class ZEOStorage:
if not self._check_tid(id): if not self._check_tid(id):
return return
assert self.locked assert self.locked
self.stats.active_txns -= 1
self.stats.commits += 1 self.stats.commits += 1
self.storage.tpc_finish(self.transaction) self.storage.tpc_finish(self.transaction)
tid = self.storage.lastTransaction() tid = self.storage.lastTransaction()
...@@ -426,7 +439,6 @@ class ZEOStorage: ...@@ -426,7 +439,6 @@ class ZEOStorage:
def tpc_abort(self, id): def tpc_abort(self, id):
if not self._check_tid(id): if not self._check_tid(id):
return return
self.stats.active_txns -= 1
self.stats.aborts += 1 self.stats.aborts += 1
if self.locked: if self.locked:
self.storage.tpc_abort(self.transaction) self.storage.tpc_abort(self.transaction)
...@@ -434,6 +446,7 @@ class ZEOStorage: ...@@ -434,6 +446,7 @@ class ZEOStorage:
def _clear_transaction(self): def _clear_transaction(self):
# Common code at end of tpc_finish() and tpc_abort() # Common code at end of tpc_finish() and tpc_abort()
self.stats.active_txns -= 1
self.transaction = None self.transaction = None
self.txnlog.close() self.txnlog.close()
if self.locked: if self.locked:
...@@ -441,27 +454,115 @@ class ZEOStorage: ...@@ -441,27 +454,115 @@ class ZEOStorage:
self.timeout.end(self) self.timeout.end(self)
self.stats.lock_time = None self.stats.lock_time = None
self.log("Transaction released storage lock", BLATHER) self.log("Transaction released storage lock", BLATHER)
# _handle_waiting() can start another transaction (by
# restarting a waiting one) so must be done last
self._handle_waiting()
def _abort(self):
# called when a connection is closed unexpectedly
if not self.locked:
# Delete (d, zeo_storage) from the _waiting list, if found.
waiting = self.storage._waiting
for i in range(len(waiting)):
d, z = waiting[i]
if z is self:
del waiting[i]
self.log("Closed connection removed from waiting list."
" Clients waiting: %d." % len(waiting))
break
if self.transaction: # Restart any client waiting for the storage lock.
self.stats.active_txns -= 1 while self.storage._waiting:
self.stats.aborts += 1 delay, zeo_storage = self.storage._waiting.pop(0)
self.tpc_abort(self.transaction.id) try:
zeo_storage._restart(delay)
except:
self.log("Unexpected error handling waiting transaction",
level=logging.WARNING, exc_info=True)
zeo_storage.connection.close()
continue
if self.storage._waiting:
n = len(self.storage._waiting)
self.log("Blocked transaction restarted. "
"Clients waiting: %d" % n)
else:
self.log("Blocked transaction restarted.")
break
# The following two methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
# It's a bit vile that undo can cause us to get the lock before vote.
def undo(self, trans_id, id):
self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._undo(trans_id)
else:
return self._wait(lambda: self._undo(trans_id))
def vote(self, id):
self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._vote()
else:
return self._wait(lambda: self._vote())
# When a delayed transaction is restarted, the dance is
# complicated. The restart occurs when one ZEOStorage instance
# finishes as a transaction and finds another instance is in the
# _waiting list.
# It might be better to have a mechanism to explicitly send
# the finishing transaction's reply before restarting the waiting
# transaction. If the restart takes a long time, the previous
# client will be blocked until it finishes.
def _wait(self, thunk):
# Wait for the storage lock to be acquired.
self._thunk = thunk
if self.tpc_transaction():
d = Delay()
self.storage._waiting.append((d, self))
self.log("Transaction blocked waiting for storage. "
"Clients waiting: %d." % len(self.storage._waiting))
return d
else:
self.log("Transaction acquired storage lock.", BLATHER)
return self._restart()
def _restart(self, delay=None):
# Restart when the storage lock is available.
if self.txnlog.stores == 1:
template = "Preparing to commit transaction: %d object, %d bytes"
else:
template = "Preparing to commit transaction: %d objects, %d bytes"
self.log(template % (self.txnlog.stores, self.txnlog.size()),
level=BLATHER)
self.locked = 1
self.timeout.begin(self)
self.stats.lock_time = time.time()
if (self.tid is not None) or (self.status != ' '):
self.storage.tpc_begin(self.transaction, self.tid, self.status)
else:
self.storage.tpc_begin(self.transaction)
loads, loader = self.txnlog.get_loader()
for i in range(loads):
store = loader.load()
store_type = store[0]
store_args = store[1:]
if store_type == 'd':
do_store = self._delete
elif store_type == 's':
do_store = self._store
elif store_type == 'r':
do_store = self._restore
else:
raise ValueError('Invalid store type: %r' % store_type)
if not do_store(*store_args):
break
# Blob support
while self.blob_log:
oid, oldserial, data, blobfilename = self.blob_log.pop()
self.storage.storeBlob(oid, oldserial, data, blobfilename,
'', self.transaction,)
resp = self._thunk()
if delay is not None:
delay.reply(resp)
else:
return resp
# The public methods of the ZEO client API do not do the real work. # The public methods of the ZEO client API do not do the real work.
# They defer work until after the storage lock has been acquired. # They defer work until after the storage lock has been acquired.
...@@ -487,7 +588,7 @@ class ZEOStorage: ...@@ -487,7 +588,7 @@ class ZEOStorage:
assert self.blob_tempfile is None assert self.blob_tempfile is None
self.blob_tempfile = tempfile.mkstemp( self.blob_tempfile = tempfile.mkstemp(
dir=self.storage.temporaryDirectory()) dir=self.storage.temporaryDirectory())
def storeBlobChunk(self, chunk): def storeBlobChunk(self, chunk):
os.write(self.blob_tempfile[0], chunk) os.write(self.blob_tempfile[0], chunk)
...@@ -506,23 +607,6 @@ class ZEOStorage: ...@@ -506,23 +607,6 @@ class ZEOStorage:
def sendBlob(self, oid, serial): def sendBlob(self, oid, serial):
self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial)) self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial))
# The following four methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
def vote(self, id):
self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._vote()
else:
return self._wait(lambda: self._vote())
def undo(self, trans_id, id):
self._check_tid(id, exc=StorageTransactionError)
if self.locked:
return self._undo(trans_id)
else:
return self._wait(lambda: self._undo(trans_id))
def _delete(self, oid, serial): def _delete(self, oid, serial):
err = None err = None
try: try:
...@@ -635,102 +719,6 @@ class ZEOStorage: ...@@ -635,102 +719,6 @@ class ZEOStorage:
self.invalidated.extend(oids) self.invalidated.extend(oids)
return tid, oids return tid, oids
# When a delayed transaction is restarted, the dance is
# complicated. The restart occurs when one ZEOStorage instance
# finishes as a transaction and finds another instance is in the
# _waiting list.
# It might be better to have a mechanism to explicitly send
# the finishing transaction's reply before restarting the waiting
# transaction. If the restart takes a long time, the previous
# client will be blocked until it finishes.
def _wait(self, thunk):
# Wait for the storage lock to be acquired.
self._thunk = thunk
if self.tpc_transaction():
d = Delay()
self.storage._waiting.append((d, self))
self.log("Transaction blocked waiting for storage. "
"Clients waiting: %d." % len(self.storage._waiting))
return d
else:
self.log("Transaction acquired storage lock.", BLATHER)
return self._restart()
def _restart(self, delay=None):
# Restart when the storage lock is available.
if self.txnlog.stores == 1:
template = "Preparing to commit transaction: %d object, %d bytes"
else:
template = "Preparing to commit transaction: %d objects, %d bytes"
self.log(template % (self.txnlog.stores, self.txnlog.size()),
level=BLATHER)
self.locked = 1
self.timeout.begin(self)
self.stats.lock_time = time.time()
if (self.tid is not None) or (self.status != ' '):
self.storage.tpc_begin(self.transaction, self.tid, self.status)
else:
self.storage.tpc_begin(self.transaction)
loads, loader = self.txnlog.get_loader()
for i in range(loads):
store = loader.load()
store_type = store[0]
store_args = store[1:]
if store_type == 'd':
do_store = self._delete
elif store_type == 's':
do_store = self._store
elif store_type == 'r':
do_store = self._restore
else:
raise ValueError('Invalid store type: %r' % store_type)
if not do_store(*store_args):
break
# Blob support
while self.blob_log:
oid, oldserial, data, blobfilename = self.blob_log.pop()
self.storage.storeBlob(oid, oldserial, data, blobfilename,
'', self.transaction,)
resp = self._thunk()
if delay is not None:
delay.reply(resp)
else:
return resp
def _handle_waiting(self):
# Restart any client waiting for the storage lock.
while self.storage._waiting:
delay, zeo_storage = self.storage._waiting.pop(0)
if self._restart_other(zeo_storage, delay):
if self.storage._waiting:
n = len(self.storage._waiting)
self.log("Blocked transaction restarted. "
"Clients waiting: %d" % n)
else:
self.log("Blocked transaction restarted.")
return
def _restart_other(self, zeo_storage, delay):
# Return True if the server restarted.
# call the restart() method on the appropriate server.
try:
zeo_storage._restart(delay)
except:
self.log("Unexpected error handling waiting transaction",
level=logging.WARNING, exc_info=True)
zeo_storage.connection.close()
return 0
else:
return 1
# IStorageIteration support # IStorageIteration support
def iterator_start(self, start, stop): def iterator_start(self, start, stop):
...@@ -809,7 +797,7 @@ class StorageServerDB: ...@@ -809,7 +797,7 @@ class StorageServerDB:
def invalidateCache(self): def invalidateCache(self):
self.server._invalidateCache(self.storage_id) self.server._invalidateCache(self.storage_id)
class StorageServer: class StorageServer:
...@@ -1064,7 +1052,7 @@ class StorageServer: ...@@ -1064,7 +1052,7 @@ class StorageServer:
p.connection.trigger.pull_trigger() p.connection.trigger.pull_trigger()
except ZEO.zrpc.error.DisconnectedError: except ZEO.zrpc.error.DisconnectedError:
pass pass
def invalidate(self, conn, storage_id, tid, invalidated=(), info=None): def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
"""Internal: broadcast info and invalidations to clients. """Internal: broadcast info and invalidations to clients.
...@@ -1110,7 +1098,7 @@ class StorageServer: ...@@ -1110,7 +1098,7 @@ class StorageServer:
# #
# b. A connection is closes while we are iterating. We'll need # b. A connection is closes while we are iterating. We'll need
# to cactch and ignore Disconnected errors. # to cactch and ignore Disconnected errors.
if invalidated: if invalidated:
invq = self.invq[storage_id] invq = self.invq[storage_id]
...@@ -1138,11 +1126,11 @@ class StorageServer: ...@@ -1138,11 +1126,11 @@ class StorageServer:
do full cache verification. do full cache verification.
""" """
invq = self.invq[storage_id] invq = self.invq[storage_id]
# We make a copy of invq because it might be modified by a # We make a copy of invq because it might be modified by a
# foreign (other than main thread) calling invalidate above. # foreign (other than main thread) calling invalidate above.
invq = invq[:] invq = invq[:]
if not invq: if not invq:
...@@ -1421,4 +1409,4 @@ class ZEOStorage308Adapter: ...@@ -1421,4 +1409,4 @@ class ZEOStorage308Adapter:
def __getattr__(self, name): def __getattr__(self, name):
return getattr(self.storage, name) return getattr(self.storage, name)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment