Commit fb8e39c8 authored by Jeremy Hylton's avatar Jeremy Hylton

Merge jeremy-atomic-invalidation-branch.

Add suspend() and resume() to transaction manager API.
Change implementation so that the thread-aware manager does not
inherit from the thread-agnostic manager.
Add suspended transaction state.
parent b0b6fd8d
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
from transaction.manager import ThreadedTransactionManager from transaction.manager import ThreadedTransactionManager
_manager = ThreadedTransactionManager() _manager = ThreadedTransactionManager()
get_transaction = _manager.new get_transaction = _manager.get
def set_factory(factory): def set_factory(factory):
_manager.txn_factory = factory _manager.txn_factory = factory
...@@ -75,3 +75,43 @@ class ITransaction(Interface): ...@@ -75,3 +75,43 @@ class ITransaction(Interface):
def status(): def status():
"""Return status of the current transaction.""" """Return status of the current transaction."""
def suspend():
"""Suspend the current transaction.
If a transaction is suspended, the transaction manager no
longer treats it as active. The resume() method must be
called before the transaction can be used.
"""
def resume():
"""Resume the current transaction.
If another transaction is active, it must be suspended before
resume() is called.
"""
class ITransactionManager(Interface):
"""Coordinates application use of transactional resources."""
def get():
"""Return the curren transaction.
Calls new() to start a new transaction if one does not exist.
"""
def begin():
"""Return a new transaction.
If a transaction is currently active for the calling thread,
it is aborted.
"""
def commit(txn):
"""Commit txn."""
def abort(txn):
"""Abort txn."""
def savepoint(txn):
"""Return rollback object that can restore txn to current state."""
import logging import logging
from transaction.interfaces import IRollback from transaction.interfaces import *
from transaction.txn import Transaction, Status from transaction.txn import Transaction, Status, Set
# XXX need to change asserts of transaction status into explicit checks # XXX need to change asserts of transaction status into explicit checks
# that raise some exception # that raise some exception
# XXX need lots of error checking # XXX need lots of error checking
class TransactionManager(object): class AbstractTransactionManager(object):
# base class to provide commit logic
txn_factory = Transaction # concrete class must provide logger attribute
def __init__(self):
self.logger = logging.getLogger("txn")
def new(self):
txn = self.txn_factory(self)
self.logger.debug("%s: begin", txn)
return txn
def commit(self, txn): def commit(self, txn):
# commit calls _finishCommit() or abort()
assert txn._status is Status.ACTIVE assert txn._status is Status.ACTIVE
txn._status = Status.PREPARING txn._status = Status.PREPARING
prepare_ok = True prepare_ok = True
...@@ -35,11 +28,11 @@ class TransactionManager(object): ...@@ -35,11 +28,11 @@ class TransactionManager(object):
txn._status = Status.PREPARED txn._status = Status.PREPARED
# XXX An error below is intolerable. What state to use? # XXX An error below is intolerable. What state to use?
if prepare_ok: if prepare_ok:
self._commit(txn) self._finishCommit(txn)
else: else:
self.abort(txn) self.abort(txn)
def _commit(self, txn): def _finishCommit(self, txn):
self.logger.debug("%s: commit", txn) self.logger.debug("%s: commit", txn)
# finish the two-phase commit # finish the two-phase commit
for r in txn._resources: for r in txn._resources:
...@@ -58,6 +51,36 @@ class TransactionManager(object): ...@@ -58,6 +51,36 @@ class TransactionManager(object):
self.logger.debug("%s: savepoint", txn) self.logger.debug("%s: savepoint", txn)
return Rollback([r.savepoint(txn) for r in txn._resources]) return Rollback([r.savepoint(txn) for r in txn._resources])
class TransactionManager(AbstractTransactionManager):
txn_factory = Transaction
__implements__ = ITransactionManager
def __init__(self):
self.logger = logging.getLogger("txn")
self._current = None
def get(self):
if self._current is None:
self._current = self.begin()
return self._current
def begin(self):
txn = self.txn_factory(self)
self.logger.debug("%s: begin", txn)
return txn
def commit(self, txn):
super(TransactionManager, self).commit(txn)
self._current = None
def abort(self, txn):
super(TransactionManager, self).abort(txn)
self._current = None
# XXX need suspend and resume
class Rollback(object): class Rollback(object):
__implements__ = IRollback __implements__ = IRollback
...@@ -72,24 +95,46 @@ class Rollback(object): ...@@ -72,24 +95,46 @@ class Rollback(object):
# make the transaction manager visible to client code # make the transaction manager visible to client code
import thread import thread
class ThreadedTransactionManager(TransactionManager): class ThreadedTransactionManager(AbstractTransactionManager):
# XXX Do we need locking on _pool or _suspend?
# Most methods read and write pool based on the id of the current
# thread, so they should never interfere with each other.
# The suspend() and resume() methods modify the _suspend set,
# but suspend() only adds a new thread. The resume() method
# does need a lock to prevent two different threads from resuming
# the same transaction.
__implements__ = ITransactionManager
def __init__(self): def __init__(self):
TransactionManager.__init__(self) self.logger = logging.getLogger("txn")
self._pool = {} self._pool = {}
self._suspend = Set()
self._lock = thread.allocate_lock()
def new(self): def get(self):
tid = thread.get_ident() tid = thread.get_ident()
txn = self._pool.get(tid) txn = self._pool.get(tid)
if txn is None: if txn is None:
txn = super(ThreadedTransactionManager, self).new() txn = self.begin()
return txn
def begin(self):
tid = thread.get_ident()
txn = self._pool.get(tid)
if txn is not None:
txn.abort()
txn = self.txn_factory(self)
self._pool[tid] = txn self._pool[tid] = txn
return txn return txn
def _commit(self, txn): def _finishCommit(self, txn):
tid = thread.get_ident() tid = thread.get_ident()
assert self._pool[tid] is txn assert self._pool[tid] is txn
super(ThreadedTransactionManager, self)._commit(txn) super(ThreadedTransactionManager, self)._finishCommit(txn)
del self._pool[tid] del self._pool[tid]
def abort(self, txn): def abort(self, txn):
...@@ -97,3 +142,27 @@ class ThreadedTransactionManager(TransactionManager): ...@@ -97,3 +142,27 @@ class ThreadedTransactionManager(TransactionManager):
assert self._pool[tid] is txn assert self._pool[tid] is txn
super(ThreadedTransactionManager, self).abort(txn) super(ThreadedTransactionManager, self).abort(txn)
del self._pool[tid] del self._pool[tid]
# XXX should we require that the transaction calling suspend()
# be the one that is using the transaction?
# XXX need to add locking to suspend() and resume()
def suspend(self, txn):
tid = thread.get_ident()
if self._pool[tid] is txn:
self._suspend.add(txn)
del self._pool[tid]
else:
raise TransactionError("txn %s not owned by thread %s" %
(txn, tid))
def resume(self, txn):
tid = thread.get_ident()
if self._pool.get(tid) is not None:
raise TransactionError("thread %s already has transaction" %
tid)
if txn not in self._suspend:
raise TransactionError("unknown transaction: %s" % txn)
del self._suspend[txn]
self._pool[tid] = txn
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
__metaclass__ = type __metaclass__ = type
from transaction.interfaces import ITransaction, TransactionError from transaction.interfaces import ITransaction, TransactionError
from threading import Lock
class Set(dict): class Set(dict):
...@@ -19,6 +20,7 @@ class Status: ...@@ -19,6 +20,7 @@ class Status:
COMMITTED = "Committed" COMMITTED = "Committed"
ABORTING = "Aborting" ABORTING = "Aborting"
ABORTED = "Aborted" ABORTED = "Aborted"
SUSPENDED = "Suspended"
class Transaction: class Transaction:
...@@ -28,7 +30,9 @@ class Transaction: ...@@ -28,7 +30,9 @@ class Transaction:
self._manager = manager self._manager = manager
self._parent = parent self._parent = parent
self._status = Status.ACTIVE self._status = Status.ACTIVE
self._suspend = None
self._resources = Set() self._resources = Set()
self._lock = Lock()
def __repr__(self): def __repr__(self):
return "<%s %X %s>" % (self.__class__.__name__, id(self), self._status) return "<%s %X %s>" % (self.__class__.__name__, id(self), self._status)
...@@ -69,3 +73,25 @@ class Transaction: ...@@ -69,3 +73,25 @@ class Transaction:
def status(self): def status(self):
"""Return the status of the transaction.""" """Return the status of the transaction."""
return self._status return self._status
def suspend(self):
self._lock.acquire()
try:
if self._status == Status.SUSPENDED:
raise TransactionError("Already suspended")
self._manager.suspend(self)
self._suspend = self._status
self._status = Status.SUSPENDED
finally:
self._lock.release()
def resume(self):
self._lock.acquire()
try:
if self._status != Status.SUSPENDED:
raise TransactionError("Can only resume suspended transaction")
self._manager.resume(self)
self._status = self._suspend
self._suspend = None
finally:
self._lock.release()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment