Commit 860e89b9 authored by Jeremy Hylton's avatar Jeremy Hylton

Merge the jeremy-txn-branch to the head.

This branch introduces a new transaction API.  The key features are:
  - top-level functions in transaction -- get(), commit(), abort()
  - explicit transaction manager objects
  - Transaction objects are used for exactly one transaction
  - support for transaction synchronizers

The changes here are still provisional, but we want to get them off an
obscure branch and onto the head for further development.
parent 64fced28
......@@ -27,6 +27,8 @@ import sys
import threading
import time
import transaction
from ZEO import ClientStub
from ZEO.CommitLog import CommitLog
from ZEO.monitor import StorageStats, StatsServer
......@@ -40,7 +42,6 @@ from ZODB.ConflictResolution import ResolvedSerial
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf
from ZODB.Transaction import Transaction
from ZODB.utils import u64, oid_repr
_label = "ZSS" # Default label used for logging.
......@@ -365,7 +366,7 @@ class ZEOStorage:
raise StorageTransactionError("Multiple simultaneous tpc_begin"
" requests from one client.")
self.transaction = t = Transaction()
self.transaction = t = transaction.Transaction()
t.id = id
t.user = user
t.description = description
......
......@@ -13,10 +13,11 @@
##############################################################################
"""Tests of the ZEO cache"""
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
from transaction import Transaction
class TransUndoStorageWithCache:
def checkUndoInvalidation(self):
......
......@@ -16,8 +16,8 @@
import threading
import time
from ZODB.Transaction import Transaction
from persistent.TimeStamp import TimeStamp
import transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import ZEO.ClientStorage
......@@ -97,7 +97,7 @@ class CommitLockTests:
self._storages = []
def _start_txn(self):
txn = Transaction()
txn = transaction.Transaction()
self._storage.tpc_begin(txn)
oid = self._storage.new_oid()
self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
......@@ -112,7 +112,7 @@ class CommitLockTests:
for i in range(self.NUM_CLIENTS):
storage = self._duplicate_client()
txn = Transaction()
txn = transaction.Transaction()
tid = self._get_timestamp()
t = WorkerThread(self, storage, txn)
......
......@@ -29,13 +29,15 @@ from ZEO.zrpc.marshal import Marshaller
from ZEO.tests import forker
from ZODB.DB import DB
from ZODB.Transaction import get_transaction, Transaction
from ZODB.POSException import ReadOnlyError, ConflictError
from ZODB.tests.StorageTestBase import StorageTestBase
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase \
import zodb_pickle, zodb_unpickle, handle_all_serials, handle_serials
import transaction
from transaction import Transaction
ZERO = '\0'*8
class TestServerStub(ZEO.ServerStub.StorageServer):
......@@ -465,7 +467,7 @@ class ConnectionTests(CommonSetupTearDown):
zLOG.LOG("checkReconnection", zLOG.INFO,
"Error after server restart; retrying.",
error=sys.exc_info())
get_transaction().abort()
transaction.abort()
# Give the other thread a chance to run.
time.sleep(0.1)
zLOG.LOG("checkReconnection", zLOG.INFO, "finished")
......@@ -552,7 +554,7 @@ class ConnectionTests(CommonSetupTearDown):
r1 = c1.root()
r1["a"] = MinPO("a")
get_transaction().commit()
transaction.commit()
db2 = DB(self.openClientStorage())
r2 = db2.open().root()
......@@ -560,7 +562,7 @@ class ConnectionTests(CommonSetupTearDown):
self.assertEqual(r2["a"].value, "a")
r2["b"] = MinPO("b")
get_transaction().commit()
transaction.commit()
# make sure the invalidation is received in the other client
for i in range(10):
......
......@@ -16,6 +16,8 @@ import threading
import time
from random import Random
import transaction
from BTrees.check import check, display
from BTrees.OOBTree import OOBTree
......@@ -70,8 +72,7 @@ class StressTask:
self.step = step
self.sleep = sleep
self.added_keys = []
self.cn = self.db.open()
self.cn.setLocalTransaction()
self.cn = self.db.open(txn_mgr=transaction.TransactionManager())
self.cn.sync()
def doStep(self):
......
......@@ -15,9 +15,8 @@
import threading
from ZODB.Transaction import Transaction
import transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import ZEO.ClientStorage
ZERO = '\0'*8
......@@ -25,7 +24,7 @@ ZERO = '\0'*8
class BasicThread(threading.Thread):
def __init__(self, storage, doNextEvent, threadStartedEvent):
self.storage = storage
self.trans = Transaction()
self.trans = transaction.Transaction()
self.doNextEvent = doNextEvent
self.threadStartedEvent = threadStartedEvent
self.gotValueError = 0
......
......@@ -13,7 +13,7 @@
##############################################################################
"""Database connection support
$Id: Connection.py,v 1.140 2004/03/16 16:18:20 jeremy Exp $"""
$Id: Connection.py,v 1.141 2004/04/01 03:56:58 jeremy Exp $"""
import logging
import sys
......@@ -26,12 +26,13 @@ from utils import u64
from persistent import PickleCache
from persistent.interfaces import IPersistent
import transaction
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
from ZODB.POSException \
import ConflictError, ReadConflictError, InvalidObjectReference
from ZODB.TmpStore import TmpStore
from ZODB.Transaction import Transaction, get_transaction
from ZODB.utils import oid_repr, z64
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
......@@ -123,10 +124,9 @@ class Connection(ExportImport, object):
_tmp = None
_code_timestamp = 0
_transaction = None
def __init__(self, version='', cache_size=400,
cache_deactivate_after=None, mvcc=True):
cache_deactivate_after=None, mvcc=True, txn_mgr=None):
"""Create a new Connection.
A Connection instance should by instantiated by the DB
......@@ -153,6 +153,13 @@ class Connection(ExportImport, object):
self._reset_counter = global_reset_counter
self._load_count = 0 # Number of objects unghosted
self._store_count = 0 # Number of objects stored
self._modified = []
# If a transaction manager is passed to the constructor, use
# it instead of the global transaction manager. The instance
# variable will hold either a TM class or the transaction
# module itself, which implements the same interface.
self._txn_mgr = txn_mgr or transaction
# _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while
......@@ -187,21 +194,18 @@ class Connection(ExportImport, object):
self._import = None
def getTransaction(self):
t = self._transaction
if t is None:
# Fall back to thread-bound transactions
t = get_transaction()
return t
# XXX mark this as deprecated?
return self._txn_mgr.get()
def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread"""
# XXX mention that this should only be called when you open
# a connection or at transaction boundaries (but the lattter are
# hard to be sure about).
if self._transaction is None:
self._transaction = Transaction()
return self._transaction
# XXX mark this method as depcrecated? note that it's
# signature changed?
if self._txn_mgr is transaction:
self._txn_mgr = transaction.TransactionManager()
return self._txn_mgr
def _cache_items(self):
# find all items on the lru list
......@@ -307,10 +311,13 @@ class Connection(ExportImport, object):
assert obj._p_oid is None
oid = obj._p_oid = self._storage.new_oid()
obj._p_jar = self
self._added[oid] = obj
if self._added_during_commit is not None:
self._added_during_commit.append(obj)
self.getTransaction().register(obj)
self._txn_mgr.get().register(obj)
# Add to _added after calling register(), so that _added
# can be used as a test for whether the object has been
# registered with the transaction.
self._added[oid] = obj
elif obj._p_jar is not self:
raise InvalidObjectReference(obj, obj._p_jar)
......@@ -453,7 +460,7 @@ class Connection(ExportImport, object):
except: # except what?
f = getattr(f, 'im_self', f)
self._log.error("Close callback failed for %s", f,
sys.exc_info())
exc_info=sys.exc_info())
self.__onCloseCallbacks = None
self._storage = self._tmp = self.new_oid = None
self._debug_info = ()
......@@ -472,7 +479,6 @@ class Connection(ExportImport, object):
oid = obj._p_oid
if oid in self._conflicts:
self.getTransaction().register(obj)
raise ReadConflictError(object=obj)
if oid is None or obj._p_jar is not self:
......@@ -650,7 +656,10 @@ class Connection(ExportImport, object):
# a way, this will be a very confusing warning.
warnings.warn("Assigning to _p_jar is deprecated",
DeprecationWarning)
self.getTransaction().register(obj)
elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
self._txn_mgr.get().register(obj)
def root(self):
"""Return the database root object.
......@@ -729,7 +738,7 @@ class Connection(ExportImport, object):
"""Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)):
self.getTransaction().register(obj)
self._txn_mgr.get().register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
......@@ -771,7 +780,7 @@ class Connection(ExportImport, object):
finally:
self._inv_lock.release()
else:
self.getTransaction().register(obj)
self._txn_mgr.get().register(obj)
raise ReadConflictError(object=obj)
def oldstate(self, obj, tid):
......@@ -829,19 +838,16 @@ class Connection(ExportImport, object):
del obj._p_oid
del obj._p_jar
def tpc_begin(self, transaction, sub=None):
def tpc_begin(self, transaction, sub=False):
self._modified = []
# _creating is a list of oids of new objects, which is used to
# remove them from the cache if a transaction aborts.
self._creating = []
if sub:
if sub and self._tmp is None:
# Sub-transaction!
if self._tmp is None:
_tmp = TmpStore(self._version)
self._tmp = self._storage
self._storage = _tmp
_tmp.registerDB(self._db, 0)
self._storage = TmpStore(self._version, self._storage)
self._storage.tpc_begin(transaction)
......@@ -920,7 +926,7 @@ class Connection(ExportImport, object):
self._flush_invalidations()
def sync(self):
self.getTransaction().abort()
self._txn_mgr.get().abort()
sync = getattr(self._storage, 'sync', 0)
if sync:
sync()
......@@ -949,5 +955,5 @@ class Connection(ExportImport, object):
new._p_oid = oid
new._p_jar = self
new._p_changed = 1
self.getTransaction().register(new)
self._txn_mgr.get().register(new)
self._cache[oid] = new
......@@ -13,7 +13,7 @@
##############################################################################
"""Database objects
$Id: DB.py,v 1.71 2004/03/16 16:28:19 jeremy Exp $"""
$Id: DB.py,v 1.72 2004/04/01 03:56:58 jeremy Exp $"""
import cPickle, cStringIO, sys
from thread import allocate_lock
......@@ -23,9 +23,10 @@ import warnings
from ZODB.broken import find_global
from ZODB.Connection import Connection
from ZODB.serialize import referencesf
from ZODB.Transaction import Transaction, get_transaction
from zLOG import LOG, ERROR
import transaction
class DB(object):
"""The Object Database
-------------------
......@@ -132,7 +133,7 @@ class DB(object):
p = cPickle.Pickler(file, 1)
p.dump((root.__class__, None))
p.dump(root.__getstate__())
t = Transaction()
t = transaction.Transaction()
t.description = 'initial database creation'
storage.tpc_begin(t)
storage.store('\0\0\0\0\0\0\0\0', None, file.getvalue(), '', t)
......@@ -140,13 +141,12 @@ class DB(object):
storage.tpc_finish(t)
# Pass through methods:
for m in ('history',
'supportsUndo', 'supportsVersions', 'undoLog',
'versionEmpty', 'versions'):
for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog',
'versionEmpty', 'versions']:
setattr(self, m, getattr(storage, m))
if hasattr(storage, 'undoInfo'):
self.undoInfo=storage.undoInfo
self.undoInfo = storage.undoInfo
def _cacheMean(self, attr):
......@@ -206,10 +206,10 @@ class DB(object):
self._temps=t
finally: self._r()
def abortVersion(self, version, transaction=None):
if transaction is None:
transaction = get_transaction()
transaction.register(AbortVersion(self, version))
def abortVersion(self, version, txn=None):
if txn is None:
txn = transaction.get()
txn.register(AbortVersion(self, version))
def cacheDetail(self):
"""Return information on objects in the various caches
......@@ -316,10 +316,10 @@ class DB(object):
"""
self._storage.close()
def commitVersion(self, source, destination='', transaction=None):
if transaction is None:
transaction = get_transaction()
transaction.register(CommitVersion(self, source, destination))
def commitVersion(self, source, destination='', txn=None):
if txn is None:
txn = transaction.get()
txn.register(CommitVersion(self, source, destination))
def getCacheSize(self):
return self._cache_size
......@@ -391,7 +391,7 @@ class DB(object):
return len(self._storage)
def open(self, version='', transaction=None, temporary=0, force=None,
waitflag=1, mvcc=True):
waitflag=1, mvcc=True, txn_mgr=None):
"""Return a database Connection for use by application code.
The optional version argument can be used to specify that a
......@@ -424,7 +424,7 @@ class DB(object):
# a one-use connection.
c = self.klass(version=version,
cache_size=self._version_cache_size,
mvcc=mvcc)
mvcc=mvcc, txn_mgr=txn_mgr)
c._setDB(self)
self._temps.append(c)
if transaction is not None:
......@@ -474,13 +474,13 @@ class DB(object):
if self._version_pool_size > len(allocated) or force:
c = self.klass(version=version,
cache_size=self._version_cache_size,
mvcc=mvcc)
mvcc=mvcc, txn_mgr=txn_mgr)
allocated.append(c)
pool.append(c)
elif self._pool_size > len(allocated) or force:
c = self.klass(version=version,
cache_size=self._cache_size,
mvcc=mvcc)
mvcc=mvcc, txn_mgr=txn_mgr)
allocated.append(c)
pool.append(c)
......@@ -611,7 +611,7 @@ class DB(object):
def cacheStatistics(self): return () # :(
def undo(self, id, transaction=None):
def undo(self, id, txn=None):
"""Undo a transaction identified by id.
A transaction can be undone if all of the objects involved in
......@@ -625,12 +625,12 @@ class DB(object):
:Parameters:
- `id`: a storage-specific transaction identifier
- `transaction`: transaction context to use for undo().
- `txn`: transaction context to use for undo().
By default, uses the current transaction.
"""
if transaction is None:
transaction = get_transaction()
transaction.register(TransactionalUndo(self, id))
if txn is None:
txn = transaction.get()
txn.register(TransactionalUndo(self, id))
def versionEmpty(self, version):
return self._storage.versionEmpty(version)
......@@ -663,7 +663,6 @@ class ResourceManager(object):
def __init__(self, db):
self._db = db
# Delegate the actual 2PC methods to the storage
self.tpc_begin = self._db._storage.tpc_begin
self.tpc_vote = self._db._storage.tpc_vote
self.tpc_finish = self._db._storage.tpc_finish
self.tpc_abort = self._db._storage.tpc_abort
......@@ -671,13 +670,19 @@ class ResourceManager(object):
def sortKey(self):
return "%s:%s" % (self._db._storage.sortKey(), id(self))
def tpc_begin(self, txn, sub=False):
# XXX we should never be called with sub=True.
if sub:
raise ValueError, "doesn't supoprt sub-transactions"
self._db._storage.tpc_begin(txn)
# The object registers itself with the txn manager, so the ob
# argument to the methods below is self.
def abort(self, ob, t):
def abort(self, obj, txn):
pass
def commit(self, ob, t):
def commit(self, obj, txn):
pass
class CommitVersion(ResourceManager):
......
......@@ -21,7 +21,7 @@ It is meant to illustrate the simplest possible storage.
The Mapping storage uses a single data structure to map object ids to data.
"""
__version__='$Revision: 1.12 $'[11:-2]
__version__='$Revision: 1.13 $'[11:-2]
from ZODB import utils
from ZODB import BaseStorage
......@@ -68,6 +68,16 @@ class MappingStorage(BaseStorage.BaseStorage):
finally:
self._lock_release()
def getTid(self, oid):
self._lock_acquire()
try:
# The tid is the first 8 bytes of the buffer.
s = self._index[oid]
return s[:8]
finally:
self._lock_release()
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
......
......@@ -22,8 +22,9 @@ class TmpStore:
_bver = ''
def __init__(self, base_version):
def __init__(self, base_version, storage):
self._transaction = None
self._storage = storage
if base_version:
self._bver = base_version
self._file = tempfile.TemporaryFile()
......@@ -34,14 +35,13 @@ class TmpStore:
self._index = {}
# _tindex: map oid to pos for new updates
self._tindex = {}
self._db = None
self._creating = []
def close(self):
self._file.close()
def getName(self):
return self._db.getName()
return self._storage.getName()
def getSize(self):
return self._pos
......@@ -66,14 +66,13 @@ class TmpStore:
def modifiedInVersion(self, oid):
if self._index.has_key(oid):
return self._bver
return self._db._storage.modifiedInVersion(oid)
return self._storage.modifiedInVersion(oid)
def new_oid(self):
return self._db._storage.new_oid()
return self._storage.new_oid()
def registerDB(self, db, limit):
self._db = db
self._storage = db._storage
pass
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
......
......@@ -13,7 +13,7 @@
##############################################################################
"""Transaction management
$Id: Transaction.py,v 1.58 2004/02/27 00:31:53 faassen Exp $
$Id: Transaction.py,v 1.59 2004/04/01 03:56:58 jeremy Exp $
"""
import sys
from thread import get_ident as _get_ident
......@@ -64,6 +64,7 @@ class Transaction:
self._id=id
self._objects=[]
self._append=self._objects.append
raise RuntimeError
def _init(self):
self._objects=[]
......@@ -532,25 +533,27 @@ class DataManagerAdapter(object):
############################################################################
# install get_transaction:
# Map thread ident to its Transaction instance.
_tid2tran = {}
# Get Transaction associated with current thread; if none, create a
# new Transaction and return it.
def get_transaction():
tid = _get_ident()
result = _tid2tran.get(tid)
if result is None:
_tid2tran[tid] = result = Transaction(tid)
return result
# Forget whatever Transaction (if any) is associated with current thread.
def free_transaction():
tid = _get_ident()
try:
del _tid2tran[tid]
except KeyError:
pass
### Map thread ident to its Transaction instance.
##_tid2tran = {}
### Get Transaction associated with current thread; if none, create a
### new Transaction and return it.
##def get_transaction():
## tid = _get_ident()
## result = _tid2tran.get(tid)
## if result is None:
## _tid2tran[tid] = result = Transaction(tid)
## return result
### Forget whatever Transaction (if any) is associated with current thread.
##def free_transaction():
## tid = _get_ident()
## try:
## del _tid2tran[tid]
## except KeyError:
## pass
from transaction import get as get_transaction
import __builtin__
__builtin__.get_transaction = get_transaction
......
......@@ -19,20 +19,20 @@ http://www.zope.org/Documentation/Developer/Models/ZODB/ZODB_Architecture_Storag
All storages should be able to pass these tests.
"""
from ZODB.Transaction import Transaction
from ZODB import POSException
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase \
import zodb_unpickle, zodb_pickle, handle_serials
import transaction
ZERO = '\0'*8
class BasicStorage:
def checkBasics(self):
t = Transaction()
t = transaction.Transaction()
self._storage.tpc_begin(t)
# This should simply return
self._storage.tpc_begin(t)
......@@ -44,10 +44,10 @@ class BasicStorage:
self.assertRaises(
POSException.StorageTransactionError,
self._storage.store,
0, 0, 0, 0, Transaction())
0, 0, 0, 0, transaction.Transaction())
try:
self._storage.abortVersion('dummy', Transaction())
self._storage.abortVersion('dummy', transaction.Transaction())
except (POSException.StorageTransactionError,
POSException.VersionCommitError):
pass # test passed ;)
......@@ -55,7 +55,7 @@ class BasicStorage:
assert 0, "Should have failed, invalid transaction."
try:
self._storage.commitVersion('dummy', 'dummer', Transaction())
self._storage.commitVersion('dummy', 'dummer', transaction.Transaction())
except (POSException.StorageTransactionError,
POSException.VersionCommitError):
pass # test passed ;)
......@@ -65,13 +65,13 @@ class BasicStorage:
self.assertRaises(
POSException.StorageTransactionError,
self._storage.store,
0, 1, 2, 3, Transaction())
0, 1, 2, 3, transaction.Transaction())
self._storage.tpc_abort(t)
def checkSerialIsNoneForInitialRevision(self):
eq = self.assertEqual
oid = self._storage.new_oid()
txn = Transaction()
txn = transaction.Transaction()
self._storage.tpc_begin(txn)
# Use None for serial. Don't use _dostore() here because that coerces
# serial=None to serial=ZERO.
......@@ -120,7 +120,7 @@ class BasicStorage:
def checkWriteAfterAbort(self):
oid = self._storage.new_oid()
t = Transaction()
t = transaction.Transaction()
self._storage.tpc_begin(t)
self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
# Now abort this transaction
......@@ -133,7 +133,7 @@ class BasicStorage:
oid1 = self._storage.new_oid()
revid1 = self._dostore(oid=oid1, data=MinPO(-2))
oid = self._storage.new_oid()
t = Transaction()
t = transaction.Transaction()
self._storage.tpc_begin(t)
self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
# Now abort this transaction
......@@ -176,7 +176,7 @@ class BasicStorage:
def checkTwoArgBegin(self):
# XXX how standard is three-argument tpc_begin()?
t = Transaction()
t = transaction.Transaction()
tid = '\0\0\0\0\0psu'
self._storage.tpc_begin(t, tid)
oid = self._storage.new_oid()
......@@ -205,7 +205,7 @@ class BasicStorage:
def checkNote(self):
oid = self._storage.new_oid()
t = Transaction()
t = transaction.Transaction()
self._storage.tpc_begin(t)
t.note('this is a test')
self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
......
......@@ -13,9 +13,9 @@
##############################################################################
"""Tests for application-level conflict resolution."""
from ZODB.Transaction import Transaction
from ZODB.POSException import ConflictError, UndoError
from persistent import Persistent
from transaction import Transaction
from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
......
......@@ -17,8 +17,8 @@ Any storage that supports the history() method should be able to pass
all these tests.
"""
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from transaction import Transaction
class HistoryStorage:
def checkSimpleHistory(self):
......
......@@ -20,8 +20,8 @@ all these tests.
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
from ZODB.utils import U64, p64
from ZODB.Transaction import Transaction
from transaction import Transaction
class IteratorCompare:
......
......@@ -3,13 +3,13 @@ import sys
import threading
import time
import ZODB
from persistent.mapping import PersistentMapping
import transaction
import ZODB
from ZODB.tests.StorageTestBase \
import zodb_pickle, zodb_unpickle, handle_serials
from ZODB.tests.MinPO import MinPO
from ZODB.Transaction import Transaction
from ZODB.POSException import ConflictError
SHORT_DELAY = 0.01
......@@ -59,6 +59,7 @@ class ZODBClientThread(TestThread):
def runtest(self):
conn = self.db.open()
conn.sync()
root = conn.root()
d = self.get_thread_dict(root)
if d is None:
......@@ -126,7 +127,7 @@ class StorageClientThread(TestThread):
def dostore(self, i):
data = zodb_pickle(MinPO((self.getName(), i)))
t = Transaction()
t = transaction.Transaction()
oid = self.oid()
self.pause()
......
......@@ -127,7 +127,7 @@ class PackableStorageBase:
try:
self._storage.load(ZERO, '')
except KeyError:
from ZODB.Transaction import Transaction
from transaction import Transaction
file = StringIO()
p = cPickle.Pickler(file, 1)
p.dump((PersistentMapping, None))
......
......@@ -12,7 +12,7 @@
#
##############################################################################
from ZODB.POSException import ReadOnlyError, Unsupported
from ZODB.Transaction import Transaction
import transaction
class ReadOnlyStorage:
......@@ -47,7 +47,7 @@ class ReadOnlyStorage:
def checkWriteMethods(self):
self._make_readonly()
self.assertRaises(ReadOnlyError, self._storage.new_oid)
t = Transaction()
t = transaction.Transaction()
self.assertRaises(ReadOnlyError, self._storage.tpc_begin, t)
if self._storage.supportsVersions():
......
......@@ -13,7 +13,7 @@
##############################################################################
"""More recovery and iterator tests."""
from ZODB.Transaction import Transaction
from transaction import Transaction
from ZODB.tests.IteratorStorage import IteratorDeepCompare
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle, snooze
from ZODB import DB
......
......@@ -16,9 +16,10 @@
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import \
zodb_unpickle, zodb_pickle, snooze, handle_serials
from ZODB.Transaction import Transaction
from ZODB.utils import p64, u64
import transaction
ZERO = '\0'*8
class RevisionStorage:
......@@ -142,7 +143,7 @@ class RevisionStorage:
oid = self._storage.new_oid()
def helper(tid, revid, x):
data = zodb_pickle(MinPO(x))
t = Transaction()
t = transaction.Transaction()
try:
self._storage.tpc_begin(t, p64(tid))
r1 = self._storage.store(oid, revid, data, '', t)
......
......@@ -26,9 +26,9 @@ import unittest
from cPickle import Pickler, Unpickler
from cStringIO import StringIO
from ZODB.Transaction import Transaction
from ZODB.utils import u64
import transaction
from ZODB.utils import u64
from ZODB.tests.MinPO import MinPO
ZERO = '\0'*8
......@@ -184,7 +184,7 @@ class StorageTestBase(unittest.TestCase):
if version is None:
version = ''
# Begin the transaction
t = Transaction()
t = transaction.Transaction()
if user is not None:
t.user = user
if description is not None:
......@@ -211,7 +211,7 @@ class StorageTestBase(unittest.TestCase):
def _undo(self, tid, expected_oids=None, note=None):
# Undo a tid that affects a single object (oid).
# XXX This is very specialized
t = Transaction()
t = transaction.Transaction()
t.note(note or "undo")
self._storage.tpc_begin(t)
tid, oids = self._storage.undo(tid, t)
......@@ -224,7 +224,7 @@ class StorageTestBase(unittest.TestCase):
return self._storage.lastTransaction()
def _commitVersion(self, src, dst):
t = Transaction()
t = transaction.Transaction()
t.note("commit %r to %r" % (src, dst))
self._storage.tpc_begin(t)
tid, oids = self._storage.commitVersion(src, dst, t)
......@@ -233,7 +233,7 @@ class StorageTestBase(unittest.TestCase):
return oids
def _abortVersion(self, ver):
t = Transaction()
t = transaction.Transaction()
t.note("abort %r" % ver)
self._storage.tpc_begin(t)
tid, oids = self._storage.abortVersion(ver, t)
......
......@@ -62,7 +62,7 @@ tested? Is it a general restriction?
"""
from ZODB.Transaction import Transaction
from transaction import Transaction
from ZODB.POSException import StorageTransactionError
VERSION = "testversion"
......
......@@ -18,13 +18,15 @@ Any storage that supports undo() must pass these tests.
import time
import types
from persistent import Persistent
from transaction import Transaction
from ZODB import POSException
from ZODB.Transaction import Transaction
from ZODB.serialize import referencesf
from ZODB.utils import p64
from ZODB import DB
from persistent import Persistent
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
......
......@@ -16,8 +16,9 @@
import time
import transaction
from ZODB.serialize import referencesf
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
......@@ -114,7 +115,7 @@ class TransactionalUndoVersionStorage:
version=version, description='version2')
self._x_dostore(description='create2')
t = Transaction()
t = transaction.Transaction()
t.description = 'commit version'
self._storage.tpc_begin(t)
self._storage.commitVersion(version, '', t)
......
......@@ -18,9 +18,10 @@ Any storage that supports versions should be able to pass all these tests.
import time
from transaction import Transaction
from ZODB import POSException
from ZODB.serialize import referencesf
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle, snooze
from ZODB import DB
......
......@@ -483,8 +483,6 @@ class StubStorage:
appended to self._finished.
"""
sortKey = 'StubStorage sortKey'
# internal
_oid = 1
_transaction = None
......@@ -502,6 +500,9 @@ class StubStorage:
self._oid += 1
return oid
def sortKey(self):
return 'StubStorage sortKey'
def tpc_begin(self, transaction):
if transaction is None:
raise TypeError('transaction may not be None')
......
......@@ -11,9 +11,9 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import ZODB.FileStorage
import os, unittest
from ZODB.Transaction import Transaction
import transaction
import ZODB.FileStorage
from ZODB import POSException
from ZODB.tests import StorageTestBase, BasicStorage, \
......@@ -195,7 +195,7 @@ class SlowFileStorageTest(BaseFileStorageTests):
# every 8000 calls. Make sure it gets minimal coverage.
oids = [[self._storage.new_oid(), None] for i in range(100)]
for i in range(100):
t = Transaction()
t = transaction.Transaction()
self._storage.tpc_begin(t)
for j in range(100):
o = MinPO(j)
......
......@@ -11,13 +11,7 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""
Revision information:
$Id: testTransaction.py,v 1.19 2004/02/27 00:31:55 faassen Exp $
"""
"""
"""Test tranasction behavior for variety of cases.
I wrote these unittests to investigate some odd transaction
behavior when doing unittests of integrating non sub transaction
......@@ -42,26 +36,24 @@ TODO
add in tests for objects which are modified multiple times,
for example an object that gets modified in multiple sub txns.
$Id: testTransaction.py,v 1.20 2004/04/01 03:56:57 jeremy Exp $
"""
from types import TupleType
import unittest
from ZODB import Transaction
import transaction
class TransactionTests(unittest.TestCase):
def setUp(self):
Transaction.hosed = 0
self.orig_tm = transaction.manager
transaction.manager = transaction.TransactionManager()
self.sub1 = DataObject()
self.sub2 = DataObject()
self.sub3 = DataObject()
self.nosub1 = DataObject(nost=1)
def tearDown(self):
Transaction.free_transaction()
transaction.manager = self.orig_tm
# basic tests with two sub trans jars
# really we only need one, so tests for
......@@ -124,18 +116,12 @@ class TransactionTests(unittest.TestCase):
assert self.sub1._p_jar.cabort_sub == 1
def testMultipleSubTransactionCommitCommit(self):
# add it
self.sub1.modify()
get_transaction().commit(1)
# add another
self.sub2.modify()
# reset a flag on the original to test it again
self.sub1.ctpc_finish = 0
get_transaction().commit(1)
# this is interesting.. we go through
......@@ -150,7 +136,7 @@ class TransactionTests(unittest.TestCase):
get_transaction().commit()
# we did an implicit sub commit, is this impl artifiact?
# we did an implicit sub commit, is this impl artifact?
assert self.sub3._p_jar.ccommit_sub == 1
assert self.sub1._p_jar.ctpc_finish > 1
......@@ -350,7 +336,6 @@ class TransactionTests(unittest.TestCase):
assert self.nosub1._p_jar.ctpc_finish == 0
assert self.nosub1._p_jar.ccommit == 1
assert self.nosub1._p_jar.ctpc_abort == 1
assert Transaction.hosed == 0
def testExceptionInTpcVote(self):
......@@ -367,7 +352,6 @@ class TransactionTests(unittest.TestCase):
assert self.nosub1._p_jar.ccommit == 1
assert self.nosub1._p_jar.ctpc_abort == 1
assert self.sub1._p_jar.ctpc_abort == 1
assert Transaction.hosed == 0
def testExceptionInTpcBegin(self):
"""
......@@ -406,29 +390,18 @@ class TransactionTests(unittest.TestCase):
except TestTxnException: pass
assert self.nosub1._p_jar.ctpc_abort == 1
assert Transaction.hosed == 0
### More Failure modes...
# now we mix in some sub transactions
###
def testExceptionInSubCommitSub(self):
"""
this tests exhibits some odd behavior,
nothing thats technically incorrect...
basically it seems non deterministic, even
stranger the behavior seems dependent on what
values i test after the fact... very odd,
almost relativistic.
in-retrospect this is from the fact that
dictionaries are used to store jars at some point
"""
# It's harder than normal to verify test results, because
# the subtransaction jars are stored in a dictionary. The
# order in which jars are processed depends on the order
# they come out of the dictionary.
self.sub1.modify()
get_transaction().commit(1)
self.nosub1.modify()
......@@ -442,24 +415,30 @@ class TransactionTests(unittest.TestCase):
try:
get_transaction().commit()
except TestTxnException: pass
except TestTxnException:
pass
# odd this doesn't seem to be entirely deterministic..
if self.sub1._p_jar.ccommit_sub:
assert self.sub1._p_jar.ctpc_abort == 1
self.assertEqual(self.sub1._p_jar.ctpc_abort, 1)
else:
assert self.sub1._p_jar.cabort_sub == 1
self.assertEqual(self.sub1._p_jar.cabort_sub, 1)
self.assertEqual(self.sub2._p_jar.ctpc_abort, 1)
self.assertEqual(self.nosub1._p_jar.ctpc_abort, 1)
if self.sub3._p_jar.ccommit_sub:
assert self.sub3._p_jar.ctpc_abort == 1
self.assertEqual(self.sub3._p_jar.ctpc_abort, 1)
else:
assert self.sub3._p_jar.cabort_sub == 1
assert self.sub2._p_jar.ctpc_abort == 1
assert self.nosub1._p_jar.ctpc_abort == 1
self.assertEqual(self.sub3._p_jar.cabort_sub, 1)
def testExceptionInSubAbortSub(self):
# This test has two errors. When commit_sub() is called on
# sub1, it will fail. If sub1 is handled first, it will raise
# an except and abort_sub() will be called on sub2. If sub2
# is handled first, then commit_sub() will fail after sub2 has
# already begun its top-level transaction and tpc_abort() will
# be called.
self.sub1._p_jar = SubTransactionJar(errors='commit_sub')
self.sub1.modify(nojar=1)
get_transaction().commit(1)
......@@ -482,51 +461,47 @@ class TransactionTests(unittest.TestCase):
# called, then tpc_abort() should be called to abort the
# actual transaction. If not, then calling abort_sub() is
# sufficient.
if self.sub3._p_jar.ccommit_sub == 1:
if self.sub3._p_jar.ccommit_sub:
self.assertEqual(self.sub3._p_jar.ctpc_abort, 1)
else:
self.assertEqual(self.sub3._p_jar.cabort_sub, 1)
# last test, check the hosing mechanism
def testHoserStoppage(self):
# XXX We should consult ZConfig to decide whether we can get into a
# hosed state or not.
return
## def testHoserStoppage(self):
## # It's hard to test the "hosed" state of the database, where
## # hosed means that a failure occurred in the second phase of
## # the two phase commit. It's hard because the database can
## # recover from such an error if it occurs during the very first
## # tpc_finish() call of the second phase.
# It's hard to test the "hosed" state of the database, where
# hosed means that a failure occurred in the second phase of
# the two phase commit. It's hard because the database can
# recover from such an error if it occurs during the very first
# tpc_finish() call of the second phase.
## for obj in self.sub1, self.sub2:
## j = HoserJar(errors='tpc_finish')
## j.reset()
## obj._p_jar = j
## obj.modify(nojar=1)
for obj in self.sub1, self.sub2:
j = HoserJar(errors='tpc_finish')
j.reset()
obj._p_jar = j
obj.modify(nojar=1)
## try:
## get_transaction().commit()
## except TestTxnException:
## pass
try:
get_transaction().commit()
except TestTxnException:
pass
## self.assert_(Transaction.hosed)
self.assert_(Transaction.hosed)
## self.sub2.modify()
self.sub2.modify()
try:
get_transaction().commit()
except Transaction.POSException.TransactionError:
pass
else:
self.fail("Hosed Application didn't stop commits")
## try:
## get_transaction().commit()
## except Transaction.POSException.TransactionError:
## pass
## else:
## self.fail("Hosed Application didn't stop commits")
class DataObject:
def __init__(self, nost=0):
self.nost= nost
self.nost = nost
self._p_jar = None
def modify(self, nojar=0, tracing=0):
......@@ -543,7 +518,7 @@ class TestTxnException(Exception):
class BasicJar:
def __init__(self, errors=(), tracing=0):
if not isinstance(errors, TupleType):
if not isinstance(errors, tuple):
errors = errors,
self.errors = errors
self.tracing = tracing
......@@ -557,7 +532,12 @@ class BasicJar:
self.ccommit_sub = 0
def __repr__(self):
return "<jar %X %s>" % (id(self), self.errors)
return "<%s %X %s>" % (self.__class__.__name__, id(self), self.errors)
def sortKey(self):
# All these jars use the same sort key, and Python's list.sort()
# is stable. These two
return self.__class__.__name__
def check(self, method):
if self.tracing:
......@@ -637,17 +617,17 @@ def test_join():
transaction.interfaces.IDataManager.
>>> from ZODB.tests.sampledm import DataManager
>>> from ZODB.Transaction import DataManagerAdapter
>>> t = Transaction.Transaction()
>>> from transaction._transaction import DataManagerAdapter
>>> t = transaction.Transaction()
>>> dm = DataManager()
>>> t.join(dm)
The end result is that a data manager adapter is one of the
transaction's objects:
>>> isinstance(t._objects[0], DataManagerAdapter)
>>> isinstance(t._resources[0], DataManagerAdapter)
True
>>> t._objects[0]._datamanager is dm
>>> t._resources[0]._datamanager is dm
True
"""
......
......@@ -16,8 +16,10 @@ import unittest
import ZODB
import ZODB.FileStorage
from ZODB.POSException import ReadConflictError, ConflictError
from persistent import Persistent
from persistent.mapping import PersistentMapping
import transaction
class P(Persistent):
pass
......@@ -54,13 +56,22 @@ class ZODBTests(unittest.TestCase):
self._db.close()
self._storage.cleanup()
def checkExportImport(self, abort_it=0, dup_name='test_duplicate'):
def checkExportImport(self, abort_it=False):
self.populate()
get_transaction().begin()
get_transaction().note('duplication')
# Duplicate the 'test' object.
conn = self._db.open()
try:
self.duplicate(conn, abort_it)
finally:
conn.close()
conn = self._db.open()
try:
self.verify(conn, abort_it)
finally:
conn.close()
def duplicate(self, conn, abort_it):
get_transaction().begin()
get_transaction().note('duplication')
root = conn.root()
ob = root['test']
assert len(ob) > 10, 'Insufficient test data'
......@@ -71,7 +82,8 @@ class ZODBTests(unittest.TestCase):
assert f.tell() > 0, 'Did not export correctly'
f.seek(0)
new_ob = ob._p_jar.importFile(f)
root[dup_name] = new_ob
self.assertEqual(new_ob, ob)
root['dup'] = new_ob
f.close()
if abort_it:
get_transaction().abort()
......@@ -80,16 +92,13 @@ class ZODBTests(unittest.TestCase):
except:
get_transaction().abort()
raise
finally:
conn.close()
def verify(self, conn, abort_it):
get_transaction().begin()
# Verify the duplicate.
conn = self._db.open()
try:
root = conn.root()
ob = root['test']
try:
ob2 = root[dup_name]
ob2 = root['dup']
except KeyError:
if abort_it:
# Passed the test.
......@@ -97,17 +106,16 @@ class ZODBTests(unittest.TestCase):
else:
raise
else:
if abort_it:
assert 0, 'Did not abort duplication'
self.failUnless(not abort_it, 'Did not abort duplication')
l1 = list(ob.items())
l1.sort()
l2 = list(ob2.items())
l2.sort()
l1 = map(lambda (k, v): (k, v[0]), l1)
l2 = map(lambda (k, v): (k, v[0]), l2)
assert l1 == l2, 'Duplicate did not match'
assert ob._p_oid != ob2._p_oid, 'Did not duplicate'
assert ob._p_jar == ob2._p_jar, 'Not same connection'
self.assertEqual(l1, l2)
self.assert_(ob._p_oid != ob2._p_oid)
self.assertEqual(ob._p_jar, ob2._p_jar)
oids = {}
for v in ob.values():
oids[v._p_oid] = 1
......@@ -115,11 +123,9 @@ class ZODBTests(unittest.TestCase):
assert not oids.has_key(v._p_oid), (
'Did not fully separate duplicate from original')
get_transaction().commit()
finally:
conn.close()
def checkExportImportAborted(self):
self.checkExportImport(abort_it=1, dup_name='test_duplicate_aborted')
self.checkExportImport(abort_it=True)
def checkVersionOnly(self):
# Make sure the changes to make empty transactions a no-op
......@@ -160,6 +166,44 @@ class ZODBTests(unittest.TestCase):
conn._setDB(self._db) # simulate the connection being reopened
self.assertEqual(len(conn._cache), 0)
def checkExplicitTransactionManager(self):
# Test of transactions that apply to only the connection,
# not the thread.
tm1 = transaction.TransactionManager()
conn1 = self._db.open(txn_mgr=tm1)
tm2 = transaction.TransactionManager()
conn2 = self._db.open(txn_mgr=tm2)
try:
r1 = conn1.root()
r2 = conn2.root()
if r1.has_key('item'):
del r1['item']
tm1.get().commit()
r1.get('item')
r2.get('item')
r1['item'] = 1
tm1.get().commit()
self.assertEqual(r1['item'], 1)
# r2 has not seen a transaction boundary,
# so it should be unchanged.
self.assertEqual(r2.get('item'), None)
conn2.sync()
# Now r2 is updated.
self.assertEqual(r2['item'], 1)
# Now, for good measure, send an update in the other direction.
r2['item'] = 2
tm2.get().commit()
self.assertEqual(r1['item'], 1)
self.assertEqual(r2['item'], 2)
conn1.sync()
conn2.sync()
self.assertEqual(r1['item'], 2)
self.assertEqual(r2['item'], 2)
finally:
conn1.close()
conn2.close()
def checkLocalTransactions(self):
# Test of transactions that apply to only the connection,
# not the thread.
......
......@@ -41,8 +41,9 @@ We will use two different connections with the experimental
setLocalTransaction() method to make sure that the connections act
independently, even though they'll be run from a single thread.
>>> cn1 = db.open()
>>> txn1 = cn1.setLocalTransaction()
>>> import transaction
>>> tm1 = transaction.TransactionManager()
>>> cn1 = db.open(txn_mgr=tm1)
The test will just use some MinPO objects. The next few lines just
setup an initial database state.
......@@ -51,12 +52,12 @@ setup an initial database state.
>>> r = cn1.root()
>>> r["a"] = MinPO(1)
>>> r["b"] = MinPO(1)
>>> txn1.commit()
>>> tm1.get().commit()
Now open a second connection.
>>> cn2 = db.open()
>>> txn2 = cn2.setLocalTransaction()
>>> tm2 = transaction.TransactionManager()
>>> cn2 = db.open(txn_mgr=tm2)
Connection high-water mark
--------------------------
......@@ -104,7 +105,7 @@ will modify "a." The other transaction will then modify "b" and commit.
>>> r1 = cn1.root()
>>> r1["a"].value = 2
>>> cn1.getTransaction().commit()
>>> tm1.get().commit()
>>> txn = db.lastTransaction()
The second connection has its high-water mark set now.
......@@ -141,7 +142,7 @@ It's possible to modify "a", but we get a conflict error when we
commit the transaction.
>>> r2["a"].value = 3
>>> txn2.commit()
>>> tm2.get().commit()
Traceback (most recent call last):
...
ConflictError: database conflict error (oid 0000000000000001, class ZODB.tests.MinPO.MinPO)
......@@ -155,9 +156,7 @@ None
>>> r1 = cn1.root()
>>> r1["a"].value = 3
>>> txn1 is cn1.getTransaction()
True
>>> cn1.getTransaction().commit()
>>> tm1.get().commit()
>>> txn = db.lastTransaction()
>>> cn2._txn_time == txn
True
......@@ -165,7 +164,7 @@ True
>>> r2["b"].value = r2["a"].value + 1
>>> r2["b"].value
3
>>> txn2.commit()
>>> tm2.get().commit()
>>> print cn2._txn_time
None
......@@ -185,7 +184,7 @@ First get the database back in an initial state.
>>> cn1.sync()
>>> r1["a"].value = 0
>>> r1["b"].value = 0
>>> cn1.getTransaction().commit()
>>> tm1.get().commit()
>>> cn2.sync()
>>> r2["a"].value
......@@ -206,7 +205,7 @@ should all have the same effect on non-current objects in cache.
... cn1.sync()
... r1["a"].value = 0
... r1["b"].value = 0
... cn1.getTransaction().commit()
... tm1.get().commit()
... cn2.sync()
... r2["b"].value = 1
... cn2.getTransaction().commit()
......@@ -217,7 +216,7 @@ should all have the same effect on non-current objects in cache.
>>> r1["b"].value
0
>>> r1["a"].value = 1
>>> cn1.getTransaction().commit()
>>> tm1.get().commit()
>>> r1["b"]._p_state
-1
......@@ -280,14 +279,13 @@ non-current revision to load.
>>> ts = TestStorage()
>>> db = DB(ts)
>>> cn1 = db.open()
>>> txn1 = cn1.setLocalTransaction()
>>> cn1 = db.open(txn_mgr=tm1)
>>> r1 = cn1.root()
>>> r1["a"] = MinPO(0)
>>> r1["b"] = MinPO(0)
>>> cn1.getTransaction().commit()
>>> tm1.get().commit()
>>> r1["b"].value = 1
>>> cn1.getTransaction().commit()
>>> tm1.get().commit()
>>> cn1.cacheMinimize() # makes everything in cache a ghost
>>> oid = r1["b"]._p_oid
......@@ -318,12 +316,11 @@ activate "b" will result in a ReadConflictError.
>>> ts = TestStorage()
>>> db = DB(ts)
>>> cn1 = db.open()
>>> txn1 = cn1.setLocalTransaction()
>>> cn1 = db.open(txn_mgr=tm1)
>>> r1 = cn1.root()
>>> r1["a"] = MinPO(0)
>>> r1["b"] = MinPO(0)
>>> cn1.getTransaction().commit()
>>> tm1.get().commit()
>>> cn1.cacheMinimize() # makes everything in cache a ghost
>>> oid = r1["b"]._p_oid
......
......@@ -11,3 +11,4 @@ is syntactically simple, but semantically complex. The semantics
were not easy to express in the interface. This could probably use
more work. The semantics are presented in detail through examples of
a sample data manager in transaction.tests.test_SampleDataManager.
......@@ -12,5 +12,22 @@
#
############################################################################
from ZODB.Transaction import get_transaction
from transaction._transaction import Transaction
from transaction._manager import TransactionManager, ThreadTransactionManager
manager = ThreadTransactionManager()
def get():
return manager.get()
def begin():
return manager.begin()
def commit():
manager.get().commit()
def abort():
manager.get().abort()
# XXX Issue deprecation warning if this variant is used?
get_transaction = get
############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################
"""A TransactionManager controls transaction boundaries.
It coordinates application code and resource managers, so that they
are associated with the right transaction.
"""
import thread
from transaction._transaction import Transaction
class TransactionManager(object):
def __init__(self):
self._txn = None
self._synchs = []
def begin(self):
if self._txn is not None:
self._txn.abort()
self._txn = Transaction(self._synchs, self)
return self._txn
def get(self):
if self._txn is None:
self._txn = Transaction(self._synchs, self)
return self._txn
def free(self, txn):
assert txn is self._txn
self._txn = None
def registerSynch(self, synch):
self.synchs.append(synch)
def unregisterSynch(self, synch):
self._synchs.remove(synch)
class ThreadTransactionManager(object):
"""Thread-aware transaction manager.
Each thread is associated with a unique transaction.
"""
def __init__(self):
# _threads maps thread ids to transactions
self._txns = {}
# _synchs maps a thread id to a list of registered synchronizers.
# The list is passed to the Transaction constructor, because
# it needs to call the synchronizers when it commits.
self._synchs = {}
def begin(self):
tid = thread.get_ident()
txn = self._txns.get(tid)
if txn is not None:
txn.abort()
txn = self._txns[tid] = Transaction(self._synchs.get(tid), self)
return txn
def get(self):
tid = thread.get_ident()
txn = self._txns.get(tid)
if txn is None:
txn = self._txns[tid] = Transaction(self._synchs.get(tid), self)
return txn
def free(self, txn):
tid = thread.get_ident()
assert txn is self._txns.get(tid)
del self._txns[tid]
def registerSynch(self, synch):
tid = thread.get_ident()
L = self._synchs.setdefault(tid, [])
L.append(synch)
def unregisterSynch(self, synch):
tid = thread.get_ident()
L = self._synchs.get(tid)
L.remove(synch)
############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################
"""Transaction objects manage resources for an individual activity.
Compatibility issues
--------------------
The implementation of Transaction objects involves two layers of
backwards compatibility, because this version of transaction supports
both ZODB 3 and ZODB 4. Zope is evolving towards the ZODB4
interfaces.
Transaction has two methods for a resource manager to call to
participate in a transaction -- register() and join(). join() takes a
resource manager and adds it to the list of resources. register() is
for backwards compatibility. It takes a persistent object and
registers its _p_jar attribute. XXX explain adapter
Subtransactions
---------------
A subtransaction applies the transaction notion recursively. It
allows a set of modifications within a transaction to be committed or
aborted as a group. A subtransaction is a strictly local activity;
its changes are not visible to any other database connection until the
top-level transaction commits. In addition to its use to organize a
large transaction, subtransactions can be used to optimize memory use.
ZODB must keep modified objects in memory until a transaction commits
and it can write the changes to the storage. A subtransaction uses a
temporary disk storage for its commits, allowing modified objects to
be flushed from memory when the subtransaction commits.
The commit() and abort() methods take an optional subtransaction
argument that defaults to false. If it is a true, the operation is
performed on a subtransaction.
Subtransactions add a lot of complexity to the transaction
implementation. Some resource managers support subtransactions, but
they are not required to. (ZODB Connection is the only standard
resource manager that supports subtransactions.) Resource managers
that do support subtransactions implement abort_sub() and commit_sub()
methods and support a second argument to tpc_begin().
The second argument to tpc_begin() indicates that a subtransaction
commit is beginning (if it is true). In a subtransaction, there is no
tpc_vote() call. (XXX I don't have any idea why.) The tpc_finish()
or tpc_abort() call applies just to that subtransaction.
Once a resource manager is involved in a subtransaction, all
subsequent transactions will be treated as subtransactions until
abort_sub() or commit_sub() is called. abort_sub() will undo all the
changes of the subtransactions. commit_sub() will begin a top-level
transaction and store all the changes from subtransactions. After
commit_sub(), the transaction must still call tpc_vote() and
tpc_finish().
If the resource manager does not support subtransactions, nothing
happens when the subtransaction commits. Instead, the resource
manager is put on a list of managers to commit when the actual
top-level transaction commits. If this happens, it will not be
possible to abort subtransactions.
Two-phase commit
----------------
A transaction commit involves an interaction between the transaction
object and one or more resource managers. The transaction manager
calls the following four methods on each resource manager; it calls
tpc_begin() on each resource manager before calling commit() on any of
them.
1. tpc_begin()
2. commit()
3. tpc_vote()
4. tpc_finish()
Subtransaction commit
---------------------
When a subtransaction commits, the protocol is different.
1. tpc_begin() is passed a second argument, which indicates that a
subtransaction is begin committed.
2. tpc_vote() is not called.
Once a subtransaction has been committed, the top-level transaction
commit will start with a commit_sub() called instead of a tpc_begin()
call.
Error handling
--------------
When errors occur during two-phase commit, the transaction manager
aborts all the resource managers. The specific methods it calls
depend on whether the error occurs before or after the call to
tpc_vote() on that transaction manager.
If the resource manager has not voted, then the resource manager will
have one or more uncommitted objects. There are two cases that lead
to this state; either the transaction manager has not called commit()
for any objects on this resource manager or the call that failed was a
commit() for one of the objects of this resource manager. For each
uncommitted object, including the object that failed in its commit(),
call abort().
Once uncommitted objects are aborted, tpc_abort() or abort_sub() is
called on each resource manager. abort_sub() is called if the
resource manager was involved in a subtransaction.
Synchronization
---------------
You can register sychronization objects (synchronizers) with the
tranasction manager. The synchronizer must implement
beforeCompletion() and afterCompletion() methods. The transaction
manager calls beforeCompletion() when it starts a top-level two-phase
commit. It calls afterCompletion() when a top-level transaction is
committed or aborted. The methods are passed the current Transaction
as their only argument.
XXX This code isn't tested.
"""
import logging
import sys
import thread
_marker = object()
# The point of this is to avoid hiding exceptions (which the builtin
# hasattr() does).
def myhasattr(obj, attr):
return getattr(obj, attr, _marker) is not _marker
class Status:
ACTIVE = "Active"
COMMITTING = "Committing"
COMMITTED = "Committed"
ABORTING = "Aborting"
ABORTED = "Aborted"
FAILED = "Failed"
class Transaction(object):
def __init__(self, synchronizers=None, manager=None):
self.status = Status.ACTIVE
# List of resource managers, e.g. MultiObjectResourceAdapters.
self._resources = []
self._synchronizers = synchronizers or []
self._manager = manager
# _adapters: Connection/_p_jar -> MultiObjectResourceAdapter[Sub]
self._adapters = {}
self._voted = {} # id(Connection) -> boolean, True if voted
# _voted and other dictionaries use the id() of the resource
# manager as a key, because we can't guess whether the actual
# resource managers will be safe to use as dict keys.
# The user, description, and _extension attributes are accessed
# directly by storages, leading underscore notwithstanding.
self.user = ""
self.description = ""
self._extension = {}
self.log = logging.getLogger("txn.%d" % thread.get_ident())
self.log.debug("new transaction")
# _sub contains all of the resource managers involved in
# subtransactions. It maps id(a resource manager) to the resource
# manager.
self._sub = {}
# _nonsub contains all the resource managers that do not support
# subtransactions that were involved in subtransaction commits.
self._nonsub = {}
def join(self, resource):
if self.status != Status.ACTIVE:
# XXX Should it be possible to join a committing transaction?
# I think some users want it.
raise ValueError("expected txn status %r, but it's %r" % (
Status.ACTIVE, self.status))
# XXX the prepare check is a bit of a hack, perhaps it would
# be better to use interfaces. If this is a ZODB4-style
# resource manager, it needs to be adapted, too.
if myhasattr(resource, "prepare"):
resource = DataManagerAdapter(resource)
self._resources.append(resource)
def register(self, obj):
# The old way of registering transaction participants.
#
# register() is passed either a persisent object or a
# resource manager like the ones defined in ZODB.DB.
# If it is passed a persistent object, that object should
# be stored when the transaction commits. For other
# objects, the object implements the standard two-phase
# commit protocol.
manager = getattr(obj, "_p_jar", obj)
adapter = self._adapters.get(manager)
if adapter is None:
if myhasattr(manager, "commit_sub"):
adapter = MultiObjectResourceAdapterSub(manager)
else:
adapter = MultiObjectResourceAdapter(manager)
adapter.objects.append(obj)
self._adapters[manager] = adapter
self.join(adapter)
else:
# XXX comment out this expensive assert later
# Use id() to guard against proxies.
assert id(obj) not in map(id, adapter.objects)
adapter.objects.append(obj)
# In the presence of subtransactions, an existing adapter
# might be in _adapters but not in _resources.
if adapter not in self._resources:
self._resources.append(adapter)
def begin(self):
# XXX I'm not sure how this should be implemented. Not doing
# anything now, but my best guess is: If nothing has happened
# yet, it's fine. Otherwise, abort this transaction and let
# the txn manager create a new one.
pass
def commit(self, subtransaction=False):
if not subtransaction and self._sub and self._resources:
# This commit is for a top-level transaction that has
# previously committed subtransactions. Do one last
# subtransaction commit to clear out the current objects,
# then commit all the subjars.
self.commit(True)
if not subtransaction:
for s in self._synchronizers:
s.beforeCompletion()
if not subtransaction:
self.status = Status.COMMITTING
self._commitResources(subtransaction)
if subtransaction:
self._resources = []
else:
self.status = Status.COMMITTED
if self._manager:
self._manager.free(self)
for s in self._synchronizers:
s.afterCompletion()
self.log.debug("commit")
def _commitResources(self, subtransaction):
# Execute the two-phase commit protocol.
L = self._getResourceManagers(subtransaction)
try:
for rm in L:
# If you pass subtransaction=True to tpc_begin(), it
# will create a temporary storage for the duration of
# the transaction. To signal that the top-level
# transaction is committing, you must then call
# commit_sub().
if not subtransaction and id(rm) in self._sub:
del self._sub[id(rm)]
rm.commit_sub(self)
else:
rm.tpc_begin(self, subtransaction)
for rm in L:
rm.commit(self)
self.log.debug("commit %r" % rm)
if not subtransaction:
# Not sure why, but it is intentional that you do not
# call tpc_vote() for subtransaction commits.
for rm in L:
rm.tpc_vote(self)
self._voted[id(rm)] = True
try:
for rm in L:
rm.tpc_finish(self)
except:
# XXX do we need to make this warning stronger?
# XXX It would be nice if the system could be configured
# to stop committing transactions at this point.
self.log.critical("A storage error occured during the second "
"phase of the two-phase commit. Resources "
"may be in an inconsistent state.")
raise
except:
# If an error occurs committing a transaction, we try
# to revert the changes in each of the resource managers.
# For top-level transactions, it must be freed from the
# txn manager.
try:
self._cleanup(L)
finally:
if not subtransaction:
self.status = Status.FAILED
if self._manager:
self._manager.free(self)
raise
def _cleanup(self, L):
# Called when an exception occurs during tpc_vote or tpc_finish.
for rm in L:
if id(rm) not in self._voted:
rm.cleanup(self)
for rm in L:
if id(rm) in self._sub:
try:
rm.abort_sub(self)
except Exception, err:
print err
else:
try:
rm.tpc_abort(self)
except Exception, err:
print err
def _getResourceManagers(self, subtransaction):
L = []
if subtransaction:
# If we are in a subtransaction, make sure all resource
# managers are placed in either _sub or _nonsub. When
# the top-level transaction commits, we need to merge
# these back into the resource set.
# If a data manager doesn't support sub-transactions, we
# don't do anything with it now. (That's somewhat okay,
# because subtransactions are mostly just an
# optimization.) Save it until the top-level transaction
# commits.
for rm in self._resources:
if myhasattr(rm, "commit_sub"):
self._sub[id(rm)] = rm
L.append(rm)
else:
self._nonsub[id(rm)] = rm
else:
if self._sub or self._nonsub:
# Merge all of _sub, _nonsub, and _resources.
d = dict(self._sub)
d.update(self._nonsub)
# XXX I think _sub and _nonsub are disjoint, and that
# XXX _resources is empty. If so, we can simplify this code.
assert len(d) == len(self._sub) + len(self._nonsub)
assert not self._resources
for rm in self._resources:
d[id(rm)] = rm
L = d.values()
else:
L = list(self._resources)
L.sort(rm_cmp)
return L
def abort(self, subtransaction=False):
if not subtransaction:
for s in self._synchronizers:
s.beforeCompletion(self)
if subtransaction and self._nonsub:
raise TransactionError("Resource manager does not support "
"subtransaction abort")
tb = None
for rm in self._resources + self._nonsub.values():
try:
rm.abort(self)
except:
if tb is None:
t, v, tb = sys.exc_info()
self.log.error("Failed to abort resource manager: %s",
rm, exc_info=sys.exc_info())
if not subtransaction:
for rm in self._sub.values():
try:
rm.abort_sub(self)
except:
if tb is None:
t, v, tb = sys.exc_info()
self.log.error("Failed to abort_sub resource manager: %s",
rm, exc_info=sys.exc_info())
if not subtransaction:
if self._manager:
self._manager.free(self)
for s in self._synchronizers:
s.afterCompletion(self)
self.log.debug("abort")
if tb is not None:
raise t, v, tb
def note(self, text):
text = text.strip()
if self.description:
self.description += "\n\n" + text
else:
self.description = text
def setUser(self, user_name, path="/"):
self.user = "%s %s" % (path, user_name)
def setExtendedInfo(self, name, value):
self._extension[name] = value
# XXX We need a better name for the adapters.
class MultiObjectResourceAdapter(object):
"""Adapt the old-style register() call to the new-style join().
With join(), a resource mananger like a Connection registers with
the transaction manager. With register(), an individual object
is passed to register().
"""
def __init__(self, jar):
self.manager = jar
self.objects = []
self.ncommitted = 0
def __repr__(self):
return "<%s for %s at %s>" % (self.__class__.__name__,
self.manager, id(self))
def sortKey(self):
return self.manager.sortKey()
def tpc_begin(self, txn, sub=False):
self.manager.tpc_begin(txn, sub)
def tpc_finish(self, txn):
self.manager.tpc_finish(txn)
def tpc_abort(self, txn):
self.manager.tpc_abort(txn)
def commit(self, txn):
for o in self.objects:
self.manager.commit(o, txn)
self.ncommitted += 1
def tpc_vote(self, txn):
self.manager.tpc_vote(txn)
def cleanup(self, txn):
self._abort(self.objects[self.ncommitted:], txn)
def abort(self, txn):
self._abort(self.objects, txn)
def _abort(self, objects, txn):
tb = None
for o in objects:
try:
self.manager.abort(o, txn)
except:
# Capture the first exception and re-raise it after
# aborting all the other objects.
if tb is None:
t, v, tb = sys.exc_info()
txn.log.error("Failed to abort object: %s",
object_hint(o), exc_info=sys.exc_info())
if tb is not None:
raise t, v, tb
class MultiObjectResourceAdapterSub(MultiObjectResourceAdapter):
"""Adapt resource managers that participate in subtransactions."""
def commit_sub(self, txn):
self.manager.commit_sub(txn)
def abort_sub(self, txn):
self.manager.abort_sub(txn)
def tpc_begin(self, txn, sub=False):
self.manager.tpc_begin(txn, sub)
self.sub = sub
def tpc_finish(self, txn):
self.manager.tpc_finish(txn)
if self.sub:
self.objects = []
def rm_cmp(rm1, rm2):
return cmp(rm1.sortKey(), rm2.sortKey())
def object_hint(o):
"""Return a string describing the object.
This function does not raise an exception.
"""
from ZODB.utils import oid_repr
# We should always be able to get __class__.
klass = o.__class__.__name__
# oid would be great, but may this isn't a persistent object.
oid = getattr(o, "_p_oid", _marker)
if oid is not _marker:
oid = oid_repr(oid)
return "%s oid=%s" % (klass, oid)
class DataManagerAdapter(object):
"""Adapt zodb 4-style data managers to zodb3 style
Adapt transaction.interfaces.IDataManager to
ZODB.interfaces.IPureDatamanager
"""
# Note that it is pretty important that this does not have a _p_jar
# attribute. This object will be registered with a zodb3 TM, which
# will then try to get a _p_jar from it, using it as the default.
# (Objects without a _p_jar are their own data managers.)
def __init__(self, datamanager):
self._datamanager = datamanager
self._rollback = None
# XXX I'm not sure why commit() doesn't do anything
def commit(self, transaction):
pass
def abort(self, transaction):
# We need to discard any changes since the last save point, or all
# changes
if self._rollback is None:
# No previous savepoint, so just abort
self._datamanager.abort(transaction)
else:
self._rollback()
def abort_sub(self, transaction):
self._datamanager.abort(transaction)
def commit_sub(self, transaction):
# Nothing to do wrt data, be we begin 2pc for the top-level
# trans
self._sub = False
def tpc_begin(self, transaction, subtransaction=False):
self._sub = subtransaction
def tpc_abort(self, transaction):
if self._sub:
self.abort(self, transaction)
else:
self._datamanager.abort(transaction)
def tpc_finish(self, transaction):
if self._sub:
self._rollback = self._datamanager.savepoint(transaction).rollback
else:
self._datamanager.commit(transaction)
def tpc_vote(self, transaction):
if not self._sub:
self._datamanager.prepare(transaction)
Notes on a future transaction API
=================================
I did a brief review of the current transaction APIs from ZODB 3 and
ZODB 4, considering some of the issues that have come up since last
winter when most of the initial design and implementation of ZODB 4's
transaction API was done.
Participants
------------
There are four participants in the transaction APIs.
1. Application -- Some application code is ultimately in charge of the
transaction process. It uses transactional resources, decides the
scope of individual transactions, and commits or aborts transactions.
2. Resource Manager -- Typically library or framework code that provides
transactional access to some resource -- a ZODB database, a relational
database, or some other resource. It provides an API for application
code that isn't defined by the transaction framework. It collaborates
with the transaction manager to find the current transaction. It
collaborates with the transaction for registration, notification, and
for committing changes.
The ZODB Connection is a resource manager. In ZODB 4, it is called a
data manager. In ZODB 3, it is called a jar. In other literature,
resource manager seems to be common.
3. Transaction -- coordinates the actions of application and resource
managers for a particular activity. The transaction usually has a
short lifetime. The application begins it, resources register with it
as the application runs, then it finishes with a commit or abort.
4. Transaction Manager -- coordinates the use of transaction. The
transaction manager provides policies for associating resource
managers with specific transactions. The question "What is the
current transaction?" is answered by the transaction manager.
I'm taking as a starting point the transaction API that was defined
for ZODB 4. I reviewed it again after a lot of time away, and I still
think it's on the right track.
Current transaction
-------------------
The first question is "What is the current transaction?" This
question is decided by the transaction manager. An application could
chose an application manager that suites its need best.
In the current ZODB, the transaction manager is essentially the
implementation of ZODB.Transaction.get_transaction() and the
associated thread id -> txn dict. I think we can encapsulate this
policy an a first-class object and allow applications to decide which
one they want to use. By default, a thread-based txn manager would be
provided.
The other responsibility of the transaction manager is to decide when
to start a new transaction. The current ZODB transaction manager
starts one whenever a client calls get() and there is no current
transaction. I think there could be some benefit to an explicit new()
operation that will always create a new transaction. A particular
manager could implement the policy that get() called before new()
returns None or raises an exception.
Basic transaction API
---------------------
A transaction module or package can export a very simple API for
interacting with transactions. It hides most of the complexity from
applications that want to use the standard Zope policies. Here's a
sketch of an implementation:
_mgr = TransactionManager()
def get():
"""Return the current transaction."""
return _mgr.get()
def new():
"""Return a new transaction."""
return _mgr.new()
def commit():
"""Commit the current transaction."""
_mgr.get().commit()
def abort():
"""Abort the current transaction."""
_mgr.get().abort()
Application code can just import the transaction module to use the
get(), new(), abort(), and commit() methods.
The individual transaction objects should have a register() method
that is used by a resource manager to register that it has
modifications for this transaction. It's part of the basic API, but
not the basic user API.
Extended transaction API
------------------------
There are a few other methods that might make sense on a transaction:
status() -- return a code or string indicating what state the
transaction is in -- begin, aborted, committed, etc.
note() -- add metadata to txn
The transaction module should have a mechanism for installing a new
transaction manager.
Suspend and resume
------------------
If the transaction manager's job is to decide what the current
transaction is, then it would make sense to have suspend() and
resume() APIs that allow the current activity to be stopped for a
time. The goal of these APIs is to allow more control over
coordination.
It seems like user code would call suspend() and resume() on
individual transaction objects, which would interact with the
transaction manager.
If suspend() and resume() are supported, then we need to think about
whether those events need to be communicated to the resource
managers.
This is a new feature that isn't needed for ZODB 3.3.
Registration and notification
-----------------------------
The transaction object coordinates the activities of resource
managers. When a managed resource is modified, its manager must
register with the current transaction. (It's an error to modify an
object when there is no transaction?)
When the transaction commits or aborts, the transaction calls back to
each registered resource manager. The callbacks form the two-phase
commit protocol. I like the ZODB 4 names and approach prepare() (does
tpc_begin through tpc_vote on the storage).
A resource manager does not register with a transaction if none of its
resources are modified. Some resource managers would like to know
about transaction boundaries anyway. A ZODB Connection would like to
process invalidations at every commit, even if none of its objects
were modified.
It's not clear what the notification interface should look like or
what events are of interest. In theory, transaction begin, abort, and
commit are all interesting; perhaps a combined abort-or-commit event
would be useful. The ZODB use case only needs one event.
The java transaction API has beforeCompletion and afterCompletion,
where after gets passed a status code to indicate abort or commit.
I think these should be sufficient.
Nested transactions / savepoints
--------------------------------
ZODB 3 and ZODB 4 each have a limited form of nested transactions.
They are called subtransactions in ZODB 3 and savepoints in ZODB 4.
The essential mechanism is the same: At the time of subtransaction is
committed, all the modifications up to that time are written out to a
temporary file. The application can later revert to that saved state
or commit the main transaction, which copies modifications from the
temporary file to the real storage.
The savepoint mechanism can be used to implement the subtransaction
model, by creating a savepoint every time a subtransaction starts or
ends.
If a resource manager joins a transaction after a savepoint, we need
to create an initial savepoint for the new resource manager that will
rollback all its changes. If the new resource manager doesn't support
savepoints, we probably need to mark earlier savepoints as invalid.
There are some edges cases to work out here.
It's not clear how nested transactions affect the transaction manager
API. If we just use savepoint(), then there's no issue to sort out.
A nested transaction API may be more convenient. One possibility is
to pass a transaction object to new() indicating that the new
transaction is a child of the current transaction. Example:
transaction.new(transaction.get())
That seems rather wordy. Perhaps:
transaction.child()
where this creates a new nested transaction that is a child of the
current one, raising an exception if there is no current transaction.
This illustrates that a subtransaction feature could create new
requirements for the transaction manager API.
The current ZODB 3 API is that calling commit(1) or commit(True) means
"commit a subtransaction." abort() has the same API. We need to
support this API for backwards compatibility. A new API would be a
new feature that isn't necessary for ZODB 3.3.
ZODB Connection and Transactions
--------------------------------
The Connection has three interactions with a transaction manager.
First, it registers itself with the transaction manager for
synchronization messages. Second, it registers with the current
transaction the first time an object is modified in that transaction.
Third, there is an option to explicitly pass a transaction manager to
the connection constructor via DB.open(); the connection always uses
this transaction manager, regardless of the default manager.
Deadlock and recovery
---------------------
ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering
for resource managers. The sort code doesn't exist in ZODB 4, but
could be added fairly easily.
The transaction managers don't support recovery, where recovery means
restoring a system to a consistent state after a failure during the
second phase of two-phase commit. When a failure occurs in the second
phase, some transaction participations may not know the outcome of the
transaction. (It would be cool to support recovery, but that's not
being discussed now.)
In the absence of real recovery manager means that our transaction
commit implementation needs to play many tricks to avoid the need for
recovery (pseudo-recovery). For example, if the first resource
manager fails in the second phase, we attempt to abort all the other
resource managers. (This isn't strictly correct, because we don't know the
status of the first resource manager if it fails.) If we used
something more like the ZODB 4 implementation, we'd need to make sure
all the pseudo-recovery work is done in the new implementation.
Closing resource managers
-------------------------
The ZODB Connection is explicitly opened and closed by the
application; other resource managers probably get closed to. The
relationship between transactions and closed resource managers is
undefined in the current API. A transaction will probably fail if the
Connection is closed, or succeed by accident if the Connection is
re-opened.
The resource manager - transaction API should include some means for
dealing with close. The likely approach is to raise an error if you
close a resource manager that is currently registered with a
transaction.
First steps
-----------
I would definitely like to see some things in ZODB 3.3:
- simplified module-level transaction calls
- notifications for abort-commit event
- restructured Connection to track modified objects itself
- explicit transaction manager object
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test backwards compatibility for resource managers using register().
The transaction package supports several different APIs for resource
managers. The original ZODB3 API was implemented by ZODB.Connection.
The Connection passed persistent objects to a Transaction's register()
method. It's possible that third-party code also used this API, hence
these tests that the code that adapts the old interface to the current
API works.
These tests use a TestConnection object that implements the old API.
They check that the right methods are called and in roughly the right
order.
Common cases
------------
First, check that a basic transaction commit works.
>>> cn = TestConnection()
>>> cn.register(Object())
>>> cn.register(Object())
>>> cn.register(Object())
>>> transaction.commit()
>>> len(cn.committed)
3
>>> len(cn.aborted)
0
>>> cn.calls
['begin', 'vote', 'finish']
Second, check that a basic transaction abort works. If the
application calls abort(), then the transaction never gets into the
two-phase commit. It just aborts each object.
>>> cn = TestConnection()
>>> cn.register(Object())
>>> cn.register(Object())
>>> cn.register(Object())
>>> transaction.abort()
>>> len(cn.committed)
0
>>> len(cn.aborted)
3
>>> cn.calls
[]
Error handling
--------------
The tricky part of the implementation is recovering from an error that
occurs during the two-phase commit. We override the commit() and
abort() methods of Object to cause errors during commit.
Note that the implementation uses lists internally, so that objects
are committed in the order they are registered. (In the presence of
multiple resource managers, objects from a single resource manager are
committed in order. The order of resource managers depends on
sortKey().) I'm not sure if this is an accident of the implementation
or a feature that should be supported by any implementation.
>>> cn = TestConnection()
>>> cn.register(Object())
>>> cn.register(CommitError())
>>> cn.register(Object())
>>> transaction.commit()
Traceback (most recent call last):
...
RuntimeError: commit
>>> len(cn.committed)
1
>>> len(cn.aborted)
2
"""
import transaction
class Object(object):
def commit(self):
pass
def abort(self):
pass
class CommitError(Object):
def commit(self):
raise RuntimeError("commit")
class AbortError(Object):
def abort(self):
raise RuntimeError("abort")
class BothError(CommitError, AbortError):
pass
class TestConnection:
def __init__(self):
self.committed = []
self.aborted = []
self.calls = []
def register(self, obj):
obj._p_jar = self
transaction.get().register(obj)
def sortKey(self):
return str(id(self))
def tpc_begin(self, txn, sub):
self.calls.append("begin")
def tpc_vote(self, txn):
self.calls.append("vote")
def tpc_finish(self, txn):
self.calls.append("finish")
def tpc_abort(self, txn):
self.calls.append("abort")
def commit(self, obj, txn):
obj.commit()
self.committed.append(obj)
def abort(self, obj, txn):
obj.abort()
self.aborted.append(obj)
import doctest
def test_suite():
return doctest.DocTestSuite()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment