Commit f8ea49e8 authored by Jim Fulton's avatar Jim Fulton

Implemented checkCurrentSerialInTransaction for the basic storages.

parent cd4763a3
......@@ -399,6 +399,19 @@ def copy(source, dest, verbose=0):
dest.tpc_finish(transaction)
# defined outside of BaseStorage to facilitate independent reuse.
# just depends on _transaction attr and getTid method.
def checkCurrentSerialInTransaction(self, oid, serial, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
committed_tid = self.getTid(oid)
if committed_tid != serial:
raise POSException.ReadConflictError(
oid=oid, serials=(committed_tid, serial))
BaseStorage.checkCurrentSerialInTransaction = checkCurrentSerialInTransaction
class TransactionRecord(object):
"""Abstract base class for iterator protocol"""
......
......@@ -24,6 +24,7 @@ import random
import weakref
import tempfile
import threading
import ZODB.BaseStorage
import ZODB.blob
import ZODB.interfaces
import ZODB.MappingStorage
......@@ -304,6 +305,9 @@ class DemoStorage(object):
oid, oldserial, data, blobfilename, '', transaction)
raise
checkCurrentSerialInTransaction = (
ZODB.BaseStorage.checkCurrentSerialInTransaction)
def temporaryDirectory(self):
try:
return self.changes.temporaryDirectory()
......
......@@ -21,6 +21,7 @@ import BTrees
import cPickle
import time
import threading
import ZODB.BaseStorage
import ZODB.interfaces
import ZODB.POSException
import ZODB.TimeStamp
......@@ -261,6 +262,9 @@ class MappingStorage(object):
return self._tid
checkCurrentSerialInTransaction = (
ZODB.BaseStorage.checkCurrentSerialInTransaction)
# ZODB.interfaces.IStorage
@ZODB.utils.locked(opened)
def tpc_abort(self, transaction):
......
......@@ -199,3 +199,89 @@ class BasicStorage:
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
t.commit()
def check_checkCurrentSerialInTransaction(self):
oid = '\0\0\0\0\0\0\0\xf0'
tid = self._dostore(oid)
tid2 = self._dostore(oid, revid=tid)
# stale read
transaction.begin()
t = transaction.get()
self._storage.tpc_begin(t)
try:
self._storage.store('\0\0\0\0\0\0\0\xf1',
'\0\0\0\0\0\0\0\0', 'x', '', t)
self._storage.checkCurrentSerialInTransaction(oid, tid, t)
self._storage.tpc_vote(t)
except POSException.ReadConflictError, v:
self.assert_(v.oid) == oid
self.assert_(v.serials == (tid2, tid))
else:
self.assert_(False, "No conflict error")
self._storage.tpc_abort(t)
# non-stale read, no stress. :)
transaction.begin()
t = transaction.get()
self._storage.tpc_begin(t)
self._storage.store('\0\0\0\0\0\0\0\xf2',
'\0\0\0\0\0\0\0\0', 'x', '', t)
self._storage.checkCurrentSerialInTransaction(oid, tid2, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# non-stale read, competition after vote. The competing
# transaction most produce a tid > this transaction's tid
transaction.begin()
t = transaction.get()
self._storage.tpc_begin(t)
self._storage.store('\0\0\0\0\0\0\0\xf3',
'\0\0\0\0\0\0\0\0', 'x', '', t)
self._storage.checkCurrentSerialInTransaction(oid, tid2, t)
self._storage.tpc_vote(t)
# We'll run the competing trans in a separate thread:
import threading, time
thread = threading.Thread(name='T1',
target=self._dostore, args=(oid,), kwargs=dict(revid=tid2))
thread.start()
time.sleep(.1)
self._storage.tpc_finish(t)
thread.join()
tid3 = self._storage.load(oid)[1]
self.assert_(tid3 > self._storage.load('\0\0\0\0\0\0\0\xf3')[1])
# non-stale competing trans after checkCurrentSerialInTransaction
transaction.begin()
t = transaction.get()
self._storage.tpc_begin(t)
self._storage.store('\0\0\0\0\0\0\0\xf4',
'\0\0\0\0\0\0\0\0', 'x', '', t)
self._storage.checkCurrentSerialInTransaction(oid, tid3, t)
# We'll run the competing trans in a separate thread:
thread = threading.Thread(name='T2',
target=self._dostore, args=(oid,), kwargs=dict(revid=tid3))
thread.start()
time.sleep(.1)
# There are 2 possibilities:
# 1. The store happens before this transaction completes,
# in which case, the vote below fails.
# 2. The store happens after this trans, in which case, the
# tid of the object is greater than this transaction's tid.
try:
self._storage.tpc_vote(t)
except ReadConflictError:
thread.join() # OK :)
else:
self._storage.tpc_finish(t)
thread.join()
tid4 = self._storage.load(oid)[1]
self.assert_(tid4 > self._storage.load('\0\0\0\0\0\0\0\xf4')[1])
......@@ -73,7 +73,8 @@ class MinimalMemoryStorage(BaseStorage, object):
def _clear_temp(self):
pass
def load(self, oid, version):
def load(self, oid, version=''):
assert version == ''
self._lock_acquire()
try:
assert not version
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment