Commit e72a3179 authored by Guido van Rossum's avatar Guido van Rossum

More cleanup and refactoring:

- Get rid of _lock() and _unlock(); _lock() is inlined, and so is
  _unlock(), after factoring out some common code at the end of
  tpc_begin() and tpc_end().

- In the refactored _unlock() code, only call self.timeout.end() if
  self.locked was actually set.

In the TimeoutThread class:

- Add some comments.

- Add some assertions.

- Get rid of the stop() method; we're a daemon thread so we'll be
  killed anyway; close_server() is only used from the test suite.

- Switch from using a lock + an event to a condition variable.  Be
  religious about doing stuff only while holding the lock.

- Inline the timeout() function; it shouldn't reacquire the lock
  anyway.

--Guido & Jeremy
parent 8333c5a5
...@@ -157,18 +157,6 @@ class ZEOStorage: ...@@ -157,18 +157,6 @@ class ZEOStorage:
return 0 return 0
return 1 return 1
# _lock() and _unlock() control the locked flag
def _lock(self):
self.locked = 1
self.timeout.begin(self)
self.stats.lock_time = time.time()
def _unlock(self):
self.locked = 0
self.timeout.end(self)
self.stats.lock_time = None
def register(self, storage_id, read_only): def register(self, storage_id, read_only):
"""Select the storage that this client will use """Select the storage that this client will use
...@@ -360,10 +348,8 @@ class ZEOStorage: ...@@ -360,10 +348,8 @@ class ZEOStorage:
if self.invalidated: if self.invalidated:
self.server.invalidate(self, self.storage_id, tid, self.server.invalidate(self, self.storage_id, tid,
self.invalidated, self.get_size_info()) self.invalidated, self.get_size_info())
self.transaction = None self._clear_transaction()
self._unlock()
# Return the tid, for cache invalidation optimization # Return the tid, for cache invalidation optimization
self._handle_waiting()
return tid return tid
def tpc_abort(self, id): def tpc_abort(self, id):
...@@ -373,8 +359,17 @@ class ZEOStorage: ...@@ -373,8 +359,17 @@ class ZEOStorage:
self.stats.aborts += 1 self.stats.aborts += 1
if self.locked: if self.locked:
self.storage.tpc_abort(self.transaction) self.storage.tpc_abort(self.transaction)
self._clear_transaction()
def _clear_transaction(self):
# Common code at end of tpc_finish() and tpc_abort()
self.transaction = None self.transaction = None
self._unlock() if self.locked:
self.locked = 0
self.timeout.end(self)
self.stats.lock_time = None
# _handle_waiting() can start another transaction (by
# restarting a waiting one) so must be done last
self._handle_waiting() self._handle_waiting()
def _abort(self): def _abort(self):
...@@ -437,7 +432,9 @@ class ZEOStorage: ...@@ -437,7 +432,9 @@ class ZEOStorage:
return self._wait(lambda: self._transactionalUndo(trans_id)) return self._wait(lambda: self._transactionalUndo(trans_id))
def _tpc_begin(self, txn, tid, status): def _tpc_begin(self, txn, tid, status):
self._lock() self.locked = 1
self.timeout.begin(self)
self.stats.lock_time = time.time()
self.storage.tpc_begin(txn, tid, status) self.storage.tpc_begin(txn, tid, status)
def _store(self, oid, serial, data, version): def _store(self, oid, serial, data, version):
...@@ -753,8 +750,6 @@ class StorageServer: ...@@ -753,8 +750,6 @@ class StorageServer:
This is only called from the test suite, AFAICT. This is only called from the test suite, AFAICT.
""" """
for timeout in self.timeouts.values():
timeout.stop()
self.dispatcher.close() self.dispatcher.close()
if self.monitor is not None: if self.monitor is not None:
self.monitor.close() self.monitor.close()
...@@ -786,82 +781,63 @@ class StubTimeoutThread: ...@@ -786,82 +781,63 @@ class StubTimeoutThread:
def end(self, client): def end(self, client):
pass pass
def stop(self):
pass
class TimeoutThread(threading.Thread): class TimeoutThread(threading.Thread):
"""Monitors transaction progress and generates timeouts.""" """Monitors transaction progress and generates timeouts."""
# There is one TimeoutThread per storage, because there's one
# transaction lock per storage.
def __init__(self, timeout): def __init__(self, timeout):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.setDaemon(1) self.setDaemon(1)
self._timeout = timeout self._timeout = timeout
self._client = None self._client = None
self._deadline = None self._deadline = None
self._stop = 0 self._cond = threading.Condition() # Protects _client and _deadline
self._active = threading.Event()
self._lock = threading.Lock()
self._trigger = trigger() self._trigger = trigger()
def stop(self):
self._stop = 1
def begin(self, client): def begin(self, client):
self._lock.acquire() # Called from the restart code the "main" thread, whenever the
# storage lock is being acquired. (Serialized by asyncore.)
self._cond.acquire()
try: try:
self._active.set() assert self._client is None
self._client = client self._client = client
self._deadline = time.time() + self._timeout self._deadline = time.time() + self._timeout
self._cond.notify()
finally: finally:
self._lock.release() self._cond.release()
def end(self, client): def end(self, client):
# The ZEOStorage will call this message for every aborted # Called from the "main" thread whenever the storage lock is
# transaction, regardless of whether the transaction started # being released. (Serialized by asyncore.)
# the 2PC. Ignore here if 2PC never began. self._cond.acquire()
if client is not self._client:
return
self._lock.acquire()
try: try:
self._active.clear() assert self._client is not None
self._client = None self._client = None
self._deadline = None self._deadline = None
finally: finally:
self._lock.release() self._cond.release()
def run(self): def run(self):
while not self._stop: # Code running in the thread.
self._active.wait() while 1:
self._lock.acquire() self._cond.acquire()
try: try:
deadline = self._deadline while self._client is None:
if deadline is None: self._cond.wait()
continue howlong = self._deadline - time.time()
howlong = deadline - time.time() client = self._client # For the howlong <= 0 branch below
finally: finally:
self._lock.release() self._cond.release()
if howlong <= 0: if howlong <= 0:
self.timeout() client.log("Transaction timeout after %s seconds" %
self._timeout)
self._trigger.pull_trigger(lambda: client.connection.close())
else: else:
time.sleep(howlong) time.sleep(howlong)
self.trigger.close() self.trigger.close()
def timeout(self):
self._lock.acquire()
try:
client = self._client
deadline = self._deadline
self._active.clear()
self._client = None
self._deadline = None
finally:
self._lock.release()
if client is None:
return
elapsed = time.time() - (deadline - self._timeout)
client.log("Transaction timeout after %d seconds" % int(elapsed))
self._trigger.pull_trigger(lambda: client.connection.close())
def run_in_thread(method, *args): def run_in_thread(method, *args):
t = SlowMethodThread(method, args) t = SlowMethodThread(method, args)
t.start() t.start()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment