Commit a26ec112 authored by Jeremy Hylton's avatar Jeremy Hylton

Add new tests from the Standby-branch branch

parent dfdcb79b
import random
import threading
import time
import ZODB
from PersistentMapping import PersistentMapping
from ZODB.tests.StorageTestBase \
import StorageTestBase, zodb_pickle, zodb_unpickle, handle_serials
from ZODB.tests.MinPO import MinPO
from ZODB.Transaction import Transaction
from ZODB.POSException import ConflictError
SHORT_DELAY = 0.01
def sort(l):
"Sort a list in place and return it."
l.sort()
return l
class ZODBClientThread(threading.Thread):
__super_init = threading.Thread.__init__
def __init__(self, db, test, commits=10, delay=SHORT_DELAY):
self.__super_init()
self.db = db
self.test = test
self.commits = commits
self.delay = delay
def run(self):
conn = self.db.open()
root = conn.root()
d = self.get_thread_dict(root)
if d is None:
self.test.fail()
else:
for i in range(self.commits):
self.commit(d, i)
self.test.assertEqual(sort(d.keys()), range(self.commits))
def commit(self, d, num):
d[num] = time.time()
time.sleep(self.delay)
get_transaction().commit()
time.sleep(self.delay)
def get_thread_dict(self, root):
name = self.getName()
# arbitrarily limit to 10 re-tries
for i in range(10):
try:
m = PersistentMapping()
root[name] = m
get_transaction().commit()
break
except ConflictError:
get_transaction().abort()
for i in range(10):
try:
return root.get(name)
except ConflictError:
get_transaction().abort()
class StorageClientThread(threading.Thread):
__super_init = threading.Thread.__init__
def __init__(self, storage, test, commits=10, delay=SHORT_DELAY):
self.__super_init()
self.storage = storage
self.test = test
self.commits = commits
self.delay = delay
self.oids = {}
def run(self):
for i in range(self.commits):
self.dostore(i)
self.check()
def check(self):
for oid, revid in self.oids.items():
data, serial = self.storage.load(oid, '')
self.test.assertEqual(serial, revid)
obj = zodb_unpickle(data)
self.test.assertEqual(obj.value[0], self.getName())
def pause(self):
time.sleep(self.delay)
def oid(self):
oid = self.storage.new_oid()
self.oids[oid] = None
return oid
def dostore(self, i):
data = zodb_pickle(MinPO((self.getName(), i)))
t = Transaction()
oid = self.oid()
self.pause()
self.storage.tpc_begin(t)
self.pause()
# Always create a new object, signified by None for revid
r1 = self.storage.store(oid, None, data, '', t)
self.pause()
r2 = self.storage.tpc_vote(t)
self.pause()
self.storage.tpc_finish(t)
self.pause()
revid = handle_serials(oid, r1, r2)
self.oids[oid] = revid
class ExtStorageClientThread(StorageClientThread):
def run(self):
# pick some other storage ops to execute
ops = [getattr(self, meth) for meth in dir(ExtStorageClientThread)
if meth.startswith('do_')]
assert ops, "Didn't find an storage ops in %s" % self.storage
# do a store to guarantee there's at least one oid in self.oids
self.dostore(0)
for i in range(self.commits - 1):
meth = random.choice(ops)
meth()
self.dostore(i)
self.check()
def pick_oid(self):
return random.choice(self.oids.keys())
def do_load(self):
oid = self.pick_oid()
self.storage.load(oid, '')
def do_loadSerial(self):
oid = self.pick_oid()
self.storage.loadSerial(oid, self.oids[oid])
def do_modifiedInVersion(self):
oid = self.pick_oid()
self.storage.modifiedInVersion(oid)
def do_undoLog(self):
self.storage.undoLog(0, -20)
def do_iterator(self):
try:
iter = self.storage.iterator()
except AttributeError:
# XXX It's hard to detect that a ZEO ClientStorage
# doesn't have this method, but does have all the others.
return
for obj in iter:
pass
class MTStorage:
"Test a storage with multiple client threads executing concurrently."
def _checkNThreads(self, n, constructor, *args):
threads = [constructor(*args) for i in range(n)]
for t in threads:
t.start()
for t in threads:
t.join()
def check2ZODBThreads(self):
db = ZODB.DB(self._storage)
self._checkNThreads(2, ZODBClientThread, db, self)
def check7ZODBThreads(self):
db = ZODB.DB(self._storage)
self._checkNThreads(7, ZODBClientThread, db, self)
def check2StorageThreads(self):
self._checkNThreads(2, StorageClientThread, self._storage, self)
def check7StorageThreads(self):
self._checkNThreads(7, StorageClientThread, self._storage, self)
def check4ExtStorageThread(self):
self._checkNThreads(4, ExtStorageClientThread, self._storage, self)
from ZODB.POSException import ReadOnlyError
from ZODB.Transaction import Transaction
class ReadOnlyStorage:
def _create_data(self):
# test a read-only storage that already has some data
self.oids = {}
for i in range(10):
oid = self._storage.new_oid()
revid = self._dostore(oid)
self.oids[oid] = revid
def _make_readonly(self):
self._storage.close()
self.open(read_only=1)
self.assert_(self._storage.isReadOnly())
def checkReadMethods(self):
self._create_data()
self._make_readonly()
# XXX not going to bother checking all read methods
for oid in self.oids.keys():
data, revid = self._storage.load(oid, '')
self.assertEqual(revid, self.oids[oid])
self.assert_(not self._storage.modifiedInVersion(oid))
_data = self._storage.loadSerial(oid, revid)
self.assertEqual(data, _data)
def checkWriteMethods(self):
self._make_readonly()
self.assertRaises(ReadOnlyError, self._storage.new_oid)
self.assertRaises(ReadOnlyError, self._storage.undo,
'\000' * 8)
t = Transaction()
self._storage.tpc_begin(t)
self.assertRaises(ReadOnlyError, self._storage.abortVersion,
'', t)
self._storage.tpc_abort(t)
t = Transaction()
self._storage.tpc_begin(t)
self.assertRaises(ReadOnlyError, self._storage.commitVersion,
'', '', t)
self._storage.tpc_abort(t)
t = Transaction()
self._storage.tpc_begin(t)
self.assertRaises(ReadOnlyError, self._storage.store,
'\000' * 8, None, '', '', t)
self._storage.tpc_abort(t)
if self._storage.supportsTransactionalUndo():
t = Transaction()
self._storage.tpc_begin(t)
self.assertRaises(ReadOnlyError, self._storage.transactionalUndo,
'\000' * 8, t)
self._storage.tpc_abort(t)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment