Commit 99cbe353 authored by Jim Fulton's avatar Jim Fulton

Implemented checkCurrentSerialInTransaction for the basic storages.

parent 82c3c0fe
...@@ -399,6 +399,19 @@ def copy(source, dest, verbose=0): ...@@ -399,6 +399,19 @@ def copy(source, dest, verbose=0):
dest.tpc_finish(transaction) dest.tpc_finish(transaction)
# defined outside of BaseStorage to facilitate independent reuse.
# just depends on _transaction attr and getTid method.
def checkCurrentSerialInTransaction(self, oid, serial, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
committed_tid = self.getTid(oid)
if committed_tid != serial:
raise POSException.ReadConflictError(
oid=oid, serials=(committed_tid, serial))
BaseStorage.checkCurrentSerialInTransaction = checkCurrentSerialInTransaction
class TransactionRecord(object): class TransactionRecord(object):
"""Abstract base class for iterator protocol""" """Abstract base class for iterator protocol"""
......
...@@ -24,6 +24,7 @@ import random ...@@ -24,6 +24,7 @@ import random
import weakref import weakref
import tempfile import tempfile
import threading import threading
import ZODB.BaseStorage
import ZODB.blob import ZODB.blob
import ZODB.interfaces import ZODB.interfaces
import ZODB.MappingStorage import ZODB.MappingStorage
...@@ -304,6 +305,9 @@ class DemoStorage(object): ...@@ -304,6 +305,9 @@ class DemoStorage(object):
oid, oldserial, data, blobfilename, '', transaction) oid, oldserial, data, blobfilename, '', transaction)
raise raise
checkCurrentSerialInTransaction = (
ZODB.BaseStorage.checkCurrentSerialInTransaction)
def temporaryDirectory(self): def temporaryDirectory(self):
try: try:
return self.changes.temporaryDirectory() return self.changes.temporaryDirectory()
......
...@@ -21,6 +21,7 @@ import BTrees ...@@ -21,6 +21,7 @@ import BTrees
import cPickle import cPickle
import time import time
import threading import threading
import ZODB.BaseStorage
import ZODB.interfaces import ZODB.interfaces
import ZODB.POSException import ZODB.POSException
import ZODB.TimeStamp import ZODB.TimeStamp
...@@ -261,6 +262,9 @@ class MappingStorage(object): ...@@ -261,6 +262,9 @@ class MappingStorage(object):
return self._tid return self._tid
checkCurrentSerialInTransaction = (
ZODB.BaseStorage.checkCurrentSerialInTransaction)
# ZODB.interfaces.IStorage # ZODB.interfaces.IStorage
@ZODB.utils.locked(opened) @ZODB.utils.locked(opened)
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
......
...@@ -199,3 +199,89 @@ class BasicStorage: ...@@ -199,3 +199,89 @@ class BasicStorage:
self._storage.tpc_vote(t) self._storage.tpc_vote(t)
self._storage.tpc_finish(t) self._storage.tpc_finish(t)
t.commit() t.commit()
def check_checkCurrentSerialInTransaction(self):
oid = '\0\0\0\0\0\0\0\xf0'
tid = self._dostore(oid)
tid2 = self._dostore(oid, revid=tid)
# stale read
transaction.begin()
t = transaction.get()
self._storage.tpc_begin(t)
try:
self._storage.store('\0\0\0\0\0\0\0\xf1',
'\0\0\0\0\0\0\0\0', 'x', '', t)
self._storage.checkCurrentSerialInTransaction(oid, tid, t)
self._storage.tpc_vote(t)
except POSException.ReadConflictError, v:
self.assert_(v.oid) == oid
self.assert_(v.serials == (tid2, tid))
else:
self.assert_(False, "No conflict error")
self._storage.tpc_abort(t)
# non-stale read, no stress. :)
transaction.begin()
t = transaction.get()
self._storage.tpc_begin(t)
self._storage.store('\0\0\0\0\0\0\0\xf2',
'\0\0\0\0\0\0\0\0', 'x', '', t)
self._storage.checkCurrentSerialInTransaction(oid, tid2, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# non-stale read, competition after vote. The competing
# transaction most produce a tid > this transaction's tid
transaction.begin()
t = transaction.get()
self._storage.tpc_begin(t)
self._storage.store('\0\0\0\0\0\0\0\xf3',
'\0\0\0\0\0\0\0\0', 'x', '', t)
self._storage.checkCurrentSerialInTransaction(oid, tid2, t)
self._storage.tpc_vote(t)
# We'll run the competing trans in a separate thread:
import threading, time
thread = threading.Thread(name='T1',
target=self._dostore, args=(oid,), kwargs=dict(revid=tid2))
thread.start()
time.sleep(.1)
self._storage.tpc_finish(t)
thread.join()
tid3 = self._storage.load(oid)[1]
self.assert_(tid3 > self._storage.load('\0\0\0\0\0\0\0\xf3')[1])
# non-stale competing trans after checkCurrentSerialInTransaction
transaction.begin()
t = transaction.get()
self._storage.tpc_begin(t)
self._storage.store('\0\0\0\0\0\0\0\xf4',
'\0\0\0\0\0\0\0\0', 'x', '', t)
self._storage.checkCurrentSerialInTransaction(oid, tid3, t)
# We'll run the competing trans in a separate thread:
thread = threading.Thread(name='T2',
target=self._dostore, args=(oid,), kwargs=dict(revid=tid3))
thread.start()
time.sleep(.1)
# There are 2 possibilities:
# 1. The store happens before this transaction completes,
# in which case, the vote below fails.
# 2. The store happens after this trans, in which case, the
# tid of the object is greater than this transaction's tid.
try:
self._storage.tpc_vote(t)
except ReadConflictError:
thread.join() # OK :)
else:
self._storage.tpc_finish(t)
thread.join()
tid4 = self._storage.load(oid)[1]
self.assert_(tid4 > self._storage.load('\0\0\0\0\0\0\0\xf4')[1])
...@@ -73,7 +73,8 @@ class MinimalMemoryStorage(BaseStorage, object): ...@@ -73,7 +73,8 @@ class MinimalMemoryStorage(BaseStorage, object):
def _clear_temp(self): def _clear_temp(self):
pass pass
def load(self, oid, version): def load(self, oid, version=''):
assert version == ''
self._lock_acquire() self._lock_acquire()
try: try:
assert not version assert not version
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment