Commit 3cd508c3 authored by Guido van Rossum's avatar Guido van Rossum

Fixed a few docstrings that were out of date.

Added a new test: checkMultiStorageTransaction().  This tests for the
deadlocks that we've seen when multiple appservers do transactions
involving multiple ZEO 2.0 storages.  It also nicely tests the timeout
feature that Jeremy added to StorageServer.

WARNING: with the current ZEO code, this occasionally hangs.  That's
the point of this test. :-)
parent 4e912c57
...@@ -18,7 +18,7 @@ import select ...@@ -18,7 +18,7 @@ import select
import socket import socket
import sys import sys
import tempfile import tempfile
import thread import threading
import time import time
import zLOG import zLOG
...@@ -27,30 +27,38 @@ from ZEO.ClientStorage import ClientStorage ...@@ -27,30 +27,38 @@ from ZEO.ClientStorage import ClientStorage
from ZEO.Exceptions import Disconnected from ZEO.Exceptions import Disconnected
from ZEO.zrpc.marshal import Marshaller from ZEO.zrpc.marshal import Marshaller
from ZODB.Transaction import get_transaction from ZODB.Transaction import get_transaction, Transaction
from ZODB.POSException import ReadOnlyError from ZODB.POSException import ReadOnlyError
from ZODB.tests import StorageTestBase from ZODB.tests.StorageTestBase import StorageTestBase
from ZODB.tests.StorageTestBase import zodb_unpickle, MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
from ZODB.tests.StorageTestBase import handle_all_serials, ZERO
class DummyDB: class DummyDB:
def invalidate(self, *args): def invalidate(self, *args):
pass pass
class ConnectionTests(StorageTestBase.StorageTestBase): class ConnectionTests(StorageTestBase):
"""Tests that explicitly manage the server process. """Tests that explicitly manage the server process.
To test the cache or re-connection, these test cases explicit To test the cache or re-connection, these test cases explicit
start and stop a ZEO storage server. start and stop a ZEO storage server.
This must be subclassed; the subclass must provide implementations
of startServer() and shutdownServer().
""" """
__super_tearDown = StorageTestBase.StorageTestBase.tearDown __super_setUp = StorageTestBase.setUp
__super_tearDown = StorageTestBase.tearDown
def setUp(self): def setUp(self):
"""Start a ZEO server using a Unix domain socket """Test setup for connection tests.
The ZEO server uses the storage object returned by the This starts only one server; a test may start more servers by
getStorage() method. calling self._newAddr() and then self.startServer(index=i)
for i in 1, 2, ...
""" """
self.__super_setUp()
zLOG.LOG("testZEO", zLOG.INFO, "setUp() %s" % self.id()) zLOG.LOG("testZEO", zLOG.INFO, "setUp() %s" % self.id())
self.file = tempfile.mktemp() self.file = tempfile.mktemp()
self.addr = [] self.addr = []
...@@ -70,15 +78,17 @@ class ConnectionTests(StorageTestBase.StorageTestBase): ...@@ -70,15 +78,17 @@ class ConnectionTests(StorageTestBase.StorageTestBase):
return 'localhost', random.randrange(25000, 30000, 2) return 'localhost', random.randrange(25000, 30000, 2)
def openClientStorage(self, cache='', cache_size=200000, wait=1, def openClientStorage(self, cache='', cache_size=200000, wait=1,
read_only=0, read_only_fallback=0): read_only=0, read_only_fallback=0,
base = ClientStorage(self.addr, addr=None):
if addr is None:
addr = self.addr
storage = ClientStorage(addr,
client=cache, client=cache,
cache_size=cache_size, cache_size=cache_size,
wait=wait, wait=wait,
min_disconnect_poll=0.1, min_disconnect_poll=0.1,
read_only=read_only, read_only=read_only,
read_only_fallback=read_only_fallback) read_only_fallback=read_only_fallback)
storage = base
storage.registerDB(DummyDB(), None) storage.registerDB(DummyDB(), None)
return storage return storage
...@@ -376,7 +386,7 @@ class ConnectionTests(StorageTestBase.StorageTestBase): ...@@ -376,7 +386,7 @@ class ConnectionTests(StorageTestBase.StorageTestBase):
self._dostore() self._dostore()
break break
except (Disconnected, ReadOnlyError, except (Disconnected, ReadOnlyError,
select.error, thread.error, socket.error): select.error, threading.ThreadError, socket.error):
time.sleep(0.1) time.sleep(0.1)
else: else:
self.fail("Couldn't store after starting a read-write server") self.fail("Couldn't store after starting a read-write server")
...@@ -453,7 +463,8 @@ class ConnectionTests(StorageTestBase.StorageTestBase): ...@@ -453,7 +463,8 @@ class ConnectionTests(StorageTestBase.StorageTestBase):
try: try:
self._dostore(oid, data=obj) self._dostore(oid, data=obj)
break break
except (Disconnected, select.error, thread.error, socket.error): except (Disconnected, select.error,
threading.ThreadError, socket.error):
zLOG.LOG("checkReconnection", zLOG.INFO, zLOG.LOG("checkReconnection", zLOG.INFO,
"Error after server restart; retrying.", "Error after server restart; retrying.",
error=sys.exc_info()) error=sys.exc_info())
...@@ -503,3 +514,107 @@ class ConnectionTests(StorageTestBase.StorageTestBase): ...@@ -503,3 +514,107 @@ class ConnectionTests(StorageTestBase.StorageTestBase):
self._storage = self.openClientStorage() self._storage = self.openClientStorage()
self._dostore() self._dostore()
# Test case for multiple storages participating in a single
# transaction. This is not really a connection test, but it needs
# about the same infrastructure (several storage servers).
# XXX WARNING: with the current ZEO code, this occasionally hangs.
# That's the point of this test. :-)
def checkMultiStorageTransaction(self):
# Configuration parameters (larger values mean more likely deadlocks)
self.nservers = 2
self.nthreads = 2
self.ntrans = 2
self.nobj = 2
# Start extra servers
for i in range(1, self.nservers):
self._newAddr()
self.startServer(index=i)
# Spawn threads that each do some transactions on all storages
threads = []
try:
for i in range(self.nthreads):
t = MSTThread(self, "T%d" % i)
threads.append(t)
t.start()
# Wait for all threads to finish
for t in threads:
t.join(60)
self.failIf(t.isAlive(), "%s didn't die" % t.getName())
finally:
for t in threads:
t.closeclients()
class MSTThread(threading.Thread):
__super_init = threading.Thread.__init__
def __init__(self, testcase, name):
self.__super_init(name=name)
self.testcase = testcase
self.clients = []
def run(self):
tname = self.getName()
testcase = self.testcase
# Create client connections to each server
clients = self.clients
for i in range(len(testcase.addr)):
c = testcase.openClientStorage(addr=testcase.addr[i])
c.__name = "C%d" % i
clients.append(c)
for i in range(testcase.ntrans):
# Because we want a transaction spanning all storages,
# we can't use _dostore(). This is several _dostore() calls
# expanded in-line (mostly).
# Create oid->serial mappings
for c in clients:
c.__oids = []
c.__serials = {}
# Begin a transaction
t = Transaction()
for c in clients:
c.tpc_begin(t)
for j in range(testcase.nobj):
for c in clients:
# Create and store a new object on each server
oid = c.new_oid()
c.__oids.append(oid)
data = MinPO("%s.%s.t%d.o%d" % (tname, c.__name, i, j))
data = zodb_pickle(data)
s = c.store(oid, ZERO, data, '', t)
c.__serials.update(handle_all_serials(oid, s))
# Vote on all servers and handle serials
for c in clients:
s = c.tpc_vote(t)
c.__serials.update(handle_all_serials(None, s))
# Finish on all servers
for c in clients:
c.tpc_finish(t)
for c in clients:
# Check that we got serials for all oids
for oid in c.__oids:
testcase.failUnless(c.__serials.has_key(oid))
# Check that we got serials for no other oids
for oid in c.__serials.keys():
testcase.failUnless(oid in c.__oids)
def closeclients(self):
# Close clients opened by run()
for c in self.clients:
try:
c.close()
except:
pass
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment