Commit 179d76ea authored by Jim Fulton

ZEO Servers now provide an option, invalidation-age, that allows
quick verification of ZEO clients less than a given age even if the
number of transactions the client hasn't seen exceeds the
invalidation queue size. This is only recommended if the storage
being served supports efficient iteration from a point near the end
of the transaction history.

Also refactored ZEO/tests/zeoserver and ZEO/runzeo so that the test
server uses, and thus exercises, the option-handling code from runzeo.
parent 8e8e2357
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
Change History Change History
================ ================
3.9.0 (2008-??-??) 3.9.0 (2009-??-??)
==================== ====================
New Features New Features
...@@ -22,7 +22,7 @@ New Features ...@@ -22,7 +22,7 @@ New Features
XXX There are known issues with this implementation that need to be XXX There are known issues with this implementation that need to be
sorted out before it is "released". sorted out before it is "released".
3.9.0a9 (2008-12-??) 3.9.0a9 (2009-01-??)
==================== ====================
New Features New Features
...@@ -42,10 +42,18 @@ New Features ...@@ -42,10 +42,18 @@ New Features
- As a small convenience (mainly for tests), you can now specify - As a small convenience (mainly for tests), you can now specify
initial data as a string argument to the Blob constructor. initial data as a string argument to the Blob constructor.
- The FileStorage iterator now handles large files better. Whenm - ZEO Servers now provide an option, invalidation-age, that allows
quick verification of ZEO clients less than a given age even if the
number of transactions the client hasn't seen exceeds the
invalidation queue size. This is only recommended if the storage
being served supports efficient iteration from a point near the end
of the transaction history.
- The FileStorage iterator now handles large files better. When
iterating from a starting transaction near the end of the file, the iterating from a starting transaction near the end of the file, the
iterator will scan backward from the end of the file to find the iterator will scan backward from the end of the file to find the
starting point. starting point. This enhancement makes it practical to take
advantage of the new storage server invalidation-age option.
3.9.0a8 (2008-12-15) 3.9.0a8 (2008-12-15)
==================== ====================
......
...@@ -34,6 +34,7 @@ import itertools ...@@ -34,6 +34,7 @@ import itertools
import transaction import transaction
import ZODB.serialize import ZODB.serialize
import ZODB.TimeStamp
import ZEO.zrpc.error import ZEO.zrpc.error
import zope.interface import zope.interface
...@@ -48,7 +49,7 @@ from ZODB.ConflictResolution import ResolvedSerial ...@@ -48,7 +49,7 @@ from ZODB.ConflictResolution import ResolvedSerial
from ZODB.POSException import StorageError, StorageTransactionError from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
from ZODB.utils import u64, oid_repr, mktemp from ZODB.utils import u64, p64, oid_repr, mktemp
from ZODB.loglevels import BLATHER from ZODB.loglevels import BLATHER
...@@ -817,6 +818,7 @@ class StorageServer: ...@@ -817,6 +818,7 @@ class StorageServer:
def __init__(self, addr, storages, read_only=0, def __init__(self, addr, storages, read_only=0,
invalidation_queue_size=100, invalidation_queue_size=100,
invalidation_age=None,
transaction_timeout=None, transaction_timeout=None,
monitor_address=None, monitor_address=None,
auth_protocol=None, auth_protocol=None,
...@@ -854,6 +856,13 @@ class StorageServer: ...@@ -854,6 +856,13 @@ class StorageServer:
speed client cache verification when a client disconnects speed client cache verification when a client disconnects
for a short period of time. for a short period of time.
invalidation_age --
If the invalidation queue isn't big enough to support a
quick verification, but the last transaction seen by a
client is younger than the invalidation age, then
invalidations will be computed by iterating over
transactions later than the given transaction.
transaction_timeout -- The maximum amount of time to wait for transaction_timeout -- The maximum amount of time to wait for
a transaction to commit after acquiring the storage lock. a transaction to commit after acquiring the storage lock.
If the transaction takes too long, the client connection If the transaction takes too long, the client connection
...@@ -907,7 +916,7 @@ class StorageServer: ...@@ -907,7 +916,7 @@ class StorageServer:
for name, storage in storages.items(): for name, storage in storages.items():
self._setup_invq(name, storage) self._setup_invq(name, storage)
storage.registerDB(StorageServerDB(self, name)) storage.registerDB(StorageServerDB(self, name))
self.invalidation_age = invalidation_age
self.connections = {} self.connections = {}
self.dispatcher = self.DispatcherClass(addr, self.dispatcher = self.DispatcherClass(addr,
factory=self.new_connection) factory=self.new_connection)
...@@ -1126,30 +1135,34 @@ class StorageServer: ...@@ -1126,30 +1135,34 @@ class StorageServer:
do full cache verification. do full cache verification.
""" """
invq = self.invq[storage_id]
# We make a copy of invq because it might be modified by a # We make a copy of invq because it might be modified by a
# foreign (other than main thread) calling invalidate above. # foreign (other than main thread) calling invalidate above.
invq = invq[:] invq = self.invq[storage_id][:]
if not invq: oids = set()
latest_tid = None
if invq and invq[-1][0] <= tid:
# We have needed data in the queue
for _tid, L in invq:
if _tid <= tid:
break
oids.update(L)
latest_tid = invq[0][0]
elif (self.invalidation_age and
(self.invalidation_age >
(time.time()-ZODB.TimeStamp.TimeStamp(tid).timeTime())
)
):
for t in self.storages[storage_id].iterator(p64(u64(tid)+1)):
for r in t:
oids.add(r.oid)
latest_tid = t.tid
elif not invq:
log("invq empty") log("invq empty")
return None, [] else:
log("tid to old for invq %s < %s" % (u64(tid), u64(invq[-1][0])))
earliest_tid = invq[-1][0]
if earliest_tid > tid:
log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
return None, []
oids = {} return latest_tid, list(oids)
for _tid, L in invq:
if _tid <= tid:
break
for key in L:
oids[key] = 1
latest_tid = invq[0][0]
return latest_tid, oids.keys()
def close_server(self): def close_server(self):
"""Close the dispatcher so that there are no new connections. """Close the dispatcher so that there are no new connections.
......
...@@ -45,6 +45,16 @@ ...@@ -45,6 +45,16 @@
</description> </description>
</key> </key>
<key name="invalidation-age" datatype="float" required="no">
<description>
The maximum age of a client for which quick-verification
invalidations will be provided by iterating over the served
storage. This option should only be used if the served storage
supports efficient iteration from a starting point near the
end of the transaction history (e.g. end of file).
</description>
</key>
<key name="monitor-address" datatype="socket-binding-address" <key name="monitor-address" datatype="socket-binding-address"
required="no"> required="no">
<description> <description>
......
...@@ -100,6 +100,7 @@ class ZEOOptionsMixin: ...@@ -100,6 +100,7 @@ class ZEOOptionsMixin:
self.add("read_only", "zeo.read_only", default=0) self.add("read_only", "zeo.read_only", default=0)
self.add("invalidation_queue_size", "zeo.invalidation_queue_size", self.add("invalidation_queue_size", "zeo.invalidation_queue_size",
default=100) default=100)
self.add("invalidation_age", "zeo.invalidation_age")
self.add("transaction_timeout", "zeo.transaction_timeout", self.add("transaction_timeout", "zeo.transaction_timeout",
"t:", "timeout=", float) "t:", "timeout=", float)
self.add("monitor_address", "zeo.monitor_address.address", self.add("monitor_address", "zeo.monitor_address.address",
...@@ -137,7 +138,7 @@ class ZEOOptions(ZDOptions, ZEOOptionsMixin): ...@@ -137,7 +138,7 @@ class ZEOOptions(ZDOptions, ZEOOptionsMixin):
if s.name is None: if s.name is None:
s.name = '1' s.name = '1'
break break
class ZEOServer: class ZEOServer:
...@@ -243,17 +244,7 @@ class ZEOServer: ...@@ -243,17 +244,7 @@ class ZEOServer:
SignalHandler.registerHandler(SIGUSR2, self.handle_sigusr2) SignalHandler.registerHandler(SIGUSR2, self.handle_sigusr2)
def create_server(self): def create_server(self):
from ZEO.StorageServer import StorageServer self.server = create_server(self.storages, self.options)
self.server = StorageServer(
self.options.address,
self.storages,
read_only=self.options.read_only,
invalidation_queue_size=self.options.invalidation_queue_size,
transaction_timeout=self.options.transaction_timeout,
monitor_address=self.options.monitor_address,
auth_protocol=self.options.auth_protocol,
auth_database=self.options.auth_database,
auth_realm=self.options.auth_realm)
def loop_forever(self): def loop_forever(self):
asyncore.loop() asyncore.loop()
...@@ -332,6 +323,23 @@ class ZEOServer: ...@@ -332,6 +323,23 @@ class ZEOServer:
except IOError: except IOError:
logger.error("PID file '%s' could not be removed" % pidfile) logger.error("PID file '%s' could not be removed" % pidfile)
def create_server(storages, options):
from ZEO.StorageServer import StorageServer
return StorageServer(
options.address,
storages,
read_only = options.read_only,
invalidation_queue_size = options.invalidation_queue_size,
invalidation_age = options.invalidation_age,
transaction_timeout = options.transaction_timeout,
monitor_address = options.monitor_address,
auth_protocol = options.auth_protocol,
auth_database = options.auth_database,
auth_realm = options.auth_realm,
)
# Signal names # Signal names
signames = None signames = None
......
...@@ -36,6 +36,7 @@ class ZEOConfig: ...@@ -36,6 +36,7 @@ class ZEOConfig:
self.address = addr self.address = addr
self.read_only = None self.read_only = None
self.invalidation_queue_size = None self.invalidation_queue_size = None
self.invalidation_age = None
self.monitor_address = None self.monitor_address = None
self.transaction_timeout = None self.transaction_timeout = None
self.authentication_protocol = None self.authentication_protocol = None
...@@ -49,6 +50,8 @@ class ZEOConfig: ...@@ -49,6 +50,8 @@ class ZEOConfig:
print >> f, "read-only", self.read_only and "true" or "false" print >> f, "read-only", self.read_only and "true" or "false"
if self.invalidation_queue_size is not None: if self.invalidation_queue_size is not None:
print >> f, "invalidation-queue-size", self.invalidation_queue_size print >> f, "invalidation-queue-size", self.invalidation_queue_size
if self.invalidation_age is not None:
print >> f, "invalidation-age", self.invalidation_age
if self.monitor_address is not None: if self.monitor_address is not None:
print >> f, "monitor-address %s:%s" % self.monitor_address print >> f, "monitor-address %s:%s" % self.monitor_address
if self.transaction_timeout is not None: if self.transaction_timeout is not None:
......
Invalidation age
================
When a ZEO client with a non-empty cache connects to the server, it
needs to verify whether the data in its cache is current. It does
this in one of two ways:
quick verification
It gets a list of invalidations from the server since the last
transaction the client has seen and applies those to its disk and
in-memory caches. This is only possible if there haven't been too
many transactions since the client was last connected.
full verification
If quick verification isn't possible, the client iterates through
its disk cache, asking the server to verify whether each current
entry is valid.
Unfortunately, for large caches, full verification is so slow that it
is impractical. Quick verification is highly desirable.
To support quick verification, the server keeps a list of recent
invalidations. The size of this list is controlled by the
invalidation_queue_size parameter. If there is a lot of database
activity, the size might need to be quite large to support having
clients be disconnected for more than a few minutes. A very large
invalidation queue size can use a lot of memory.
To supplement the invalidation queue, you can also specify an
invalidation_age parameter. When a client connects and presents the
last transaction id it has seen, we first check whether the
invalidation queue reaches back to that transaction id. If it does, we
send the invalidations from all transactions after that id. Otherwise,
we check whether the age of the given transaction id (the time elapsed
since it was committed) is within the invalidation age. If it is, we
iterate over the storage, starting just after the given id, to collect
the invalidations since that id.
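Schematically, the server-side decision looks like the following sketch
(a simplified rendering of the logic this change adds to
StorageServer.getInvalidations; the helper name is ours, not part of the
API)::

    import time
    from ZODB.TimeStamp import TimeStamp
    from ZODB.utils import p64, u64

    def invalidations_since(invq, storage, tid, invalidation_age):
        # invq is newest-first: invq[0] is the most recent (tid, oids) entry.
        oids = set()
        latest_tid = None
        if invq and invq[-1][0] <= tid:
            # Quick path: the queue still reaches back to the client's tid.
            for _tid, L in invq:
                if _tid <= tid:
                    break
                oids.update(L)
            latest_tid = invq[0][0]
        elif invalidation_age and (
                invalidation_age > time.time() - TimeStamp(tid).timeTime()):
            # Age path: iterate the storage starting just after the
            # client's last-seen transaction.
            for t in storage.iterator(p64(u64(tid) + 1)):
                for r in t:
                    oids.add(r.oid)
                latest_tid = t.tid
        # If neither branch applies, latest_tid stays None and the client
        # falls back to full cache verification.
        return latest_tid, list(oids)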
NOTE: This assumes that iterating from a point near the "end" of a
database is inexpensive. Don't use this option for a storage for which
that is not the case.
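In a real deployment the same knobs live in the server's zeo.conf. A
minimal sketch, with illustrative address, path, and values (the age is
compared in seconds)::

    <zeo>
      address 8100
      invalidation-queue-size 100
      # Clients whose last-seen transaction is younger than this many
      # seconds get quick verification via storage iteration.
      invalidation-age 3600
    </zeo>

    <filestorage 1>
      path /var/zeo/Data.fs
    </filestorage>

In the tests below we pass the equivalent settings to start_server
instead.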
Here's an example. We set up a server, using an
invalidation-queue-size of 5:
>>> addr, admin = start_server(zeo_conf=dict(invalidation_queue_size=5),
... keep=True)
Now, we'll open a client with a persistent cache, set up some data,
and then close the client:
>>> import ZEO, transaction
>>> db = ZEO.DB(addr, client='test')
>>> conn = db.open()
>>> for i in range(9):
... conn.root()[i] = conn.root().__class__()
... conn.root()[i].x = 0
>>> transaction.commit()
>>> db.close()
We'll open another client, and commit some transactions:
>>> db = ZEO.DB(addr)
>>> conn = db.open()
>>> import transaction
>>> for i in range(2):
... conn.root()[i].x = 1
... transaction.commit()
>>> db.close()
If we reopen the first client, we'll do quick verification. We'll
turn on logging so we can see this:
>>> import logging, sys
>>> logging.getLogger().setLevel(logging.INFO)
>>> handler = logging.StreamHandler(sys.stdout)
>>> logging.getLogger().addHandler(handler)
>>> db = ZEO.DB(addr, client='test') # doctest: +ELLIPSIS
('localhost', ...
('localhost', ...) Recovering 2 invalidations
>>> logging.getLogger().removeHandler(handler)
>>> [v.x for v in db.open().root().values()]
[1, 1, 0, 0, 0, 0, 0, 0, 0]
Now, if we disconnect and commit more than 5 transactions, we'll see
that full cache verification is necessary:
>>> db.close()
>>> db = ZEO.DB(addr)
>>> conn = db.open()
>>> import transaction
>>> for i in range(9):
... conn.root()[i].x = 2
... transaction.commit()
>>> db.close()
>>> logging.getLogger().addHandler(handler)
>>> db = ZEO.DB(addr, client='test') # doctest: +ELLIPSIS
('localhost', ...
('localhost', ...) Verifying cache
('localhost', ...) endVerify finishing
('localhost', ...) endVerify finished
>>> logging.getLogger().removeHandler(handler)
>>> [v.x for v in db.open().root().values()]
[2, 2, 2, 2, 2, 2, 2, 2, 2]
>>> db.close()
But if we restart the server with invalidation-age set, we can
do quick verification:
>>> stop_server(admin)
>>> addr, admin = start_server(zeo_conf=dict(invalidation_queue_size=5,
... invalidation_age=100))
>>> db = ZEO.DB(addr)
>>> conn = db.open()
>>> import transaction
>>> for i in range(9):
... conn.root()[i].x = 3
... transaction.commit()
>>> db.close()
>>> logging.getLogger().addHandler(handler)
>>> db = ZEO.DB(addr, client='test') # doctest: +ELLIPSIS
('localhost', ...
('localhost', ...) Recovering 9 invalidations
>>> logging.getLogger().removeHandler(handler)
>>> [v.x for v in db.open().root().values()]
[3, 3, 3, 3, 3, 3, 3, 3, 3]
>>> db.close()
...@@ -68,7 +68,7 @@ class MiscZEOTests: ...@@ -68,7 +68,7 @@ class MiscZEOTests:
"""ZEO tests that don't fit in elsewhere.""" """ZEO tests that don't fit in elsewhere."""
def checkCreativeGetState(self): def checkCreativeGetState(self):
# This test covers persistent objects that provide their own # This test covers persistent objects that provide their own
# __getstate__ which modifies the state of the object. # __getstate__ which modifies the state of the object.
# For details see bug #98275 # For details see bug #98275
...@@ -456,7 +456,7 @@ class CatastrophicClientLoopFailure( ...@@ -456,7 +456,7 @@ class CatastrophicClientLoopFailure(
) )
ZEO.zrpc.connection.client_map[None] = Evil() ZEO.zrpc.connection.client_map[None] = Evil()
try: try:
ZEO.zrpc.connection.client_trigger.pull_trigger() ZEO.zrpc.connection.client_trigger.pull_trigger()
except DisconnectedError: except DisconnectedError:
...@@ -498,7 +498,7 @@ class ConnectionInvalidationOnReconnect( ...@@ -498,7 +498,7 @@ class ConnectionInvalidationOnReconnect(
self._invalidatedCache += 1 self._invalidatedCache += 1
def invalidate(*a, **k): def invalidate(*a, **k):
pass pass
db = DummyDB() db = DummyDB()
storage.registerDB(db) storage.registerDB(db)
...@@ -517,7 +517,7 @@ class ConnectionInvalidationOnReconnect( ...@@ -517,7 +517,7 @@ class ConnectionInvalidationOnReconnect(
# Now, the root object in the connection should have been invalidated: # Now, the root object in the connection should have been invalidated:
self.assertEqual(db._invalidatedCache, base+1) self.assertEqual(db._invalidatedCache, base+1)
class CommonBlobTests: class CommonBlobTests:
...@@ -638,7 +638,7 @@ class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests): ...@@ -638,7 +638,7 @@ class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
for i in range(1000000): for i in range(1000000):
somedata.write("%s\n" % i) somedata.write("%s\n" % i)
somedata.seek(0) somedata.seek(0)
blob = Blob() blob = Blob()
bd_fh = blob.open('w') bd_fh = blob.open('w')
ZODB.utils.cp(somedata, bd_fh) ZODB.utils.cp(somedata, bd_fh)
...@@ -665,7 +665,7 @@ class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests): ...@@ -665,7 +665,7 @@ class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
def check_data(path): def check_data(path):
self.assert_(os.path.exists(path)) self.assert_(os.path.exists(path))
f = open(path, 'rb') f = open(path, 'rb')
somedata.seek(0) somedata.seek(0)
d1 = d2 = 1 d1 = d2 = 1
while d1 or d2: while d1 or d2:
d1 = f.read(8096) d1 = f.read(8096)
...@@ -681,7 +681,7 @@ class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests): ...@@ -681,7 +681,7 @@ class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
self.blobdir, self.blobdir,
ZODB.blob.BushyLayout().getBlobFilePath(oid, revid), ZODB.blob.BushyLayout().getBlobFilePath(oid, revid),
) )
self.assert_(server_filename.startswith(self.blobdir)) self.assert_(server_filename.startswith(self.blobdir))
check_data(server_filename) check_data(server_filename)
...@@ -800,12 +800,12 @@ def multiple_storages_invalidation_queue_is_not_insane(): ...@@ -800,12 +800,12 @@ def multiple_storages_invalidation_queue_is_not_insane():
>>> for i in range(20): >>> for i in range(20):
... o2.x = PersistentDict(); o2 = o2.x ... o2.x = PersistentDict(); o2 = o2.x
... commit() ... commit()
>>> trans, oids = s1.getInvalidations(last) >>> trans, oids = s1.getInvalidations(last)
>>> from ZODB.utils import u64 >>> from ZODB.utils import u64
>>> sorted([int(u64(oid)) for oid in oids]) >>> sorted([int(u64(oid)) for oid in oids])
[10, 11, 12, 13, 14] [10, 11, 12, 13, 14]
>>> server.close_server() >>> server.close_server()
""" """
...@@ -834,7 +834,7 @@ Let's create a file storage and stuff some data into it: ...@@ -834,7 +834,7 @@ Let's create a file storage and stuff some data into it:
>>> db.close() >>> db.close()
Now we'll open a storage server on the data, simulating a restart: Now we'll open a storage server on the data, simulating a restart:
>>> fs = FileStorage('t.fs') >>> fs = FileStorage('t.fs')
>>> sv = StorageServer(('', get_port()), dict(fs=fs)) >>> sv = StorageServer(('', get_port()), dict(fs=fs))
>>> s = ZEOStorage(sv, sv.read_only) >>> s = ZEOStorage(sv, sv.read_only)
...@@ -884,7 +884,7 @@ without this method: ...@@ -884,7 +884,7 @@ without this method:
>>> sv = StorageServer(('', get_port()), dict(fs=fs)) >>> sv = StorageServer(('', get_port()), dict(fs=fs))
>>> st = StorageServerWrapper(sv, 'fs') >>> st = StorageServerWrapper(sv, 'fs')
>>> s = st.server >>> s = st.server
Now, if we ask for the invalidations since the last committed Now, if we ask for the invalidations since the last committed
transaction, we'll get a result: transaction, we'll get a result:
...@@ -1070,7 +1070,7 @@ def client_has_newer_data_than_server(): ...@@ -1070,7 +1070,7 @@ def client_has_newer_data_than_server():
>>> handler.uninstall() >>> handler.uninstall()
>>> stop_server(admin) >>> stop_server(admin)
""" """
def history_over_zeo(): def history_over_zeo():
...@@ -1106,7 +1106,7 @@ def delete_object_multiple_clients(): ...@@ -1106,7 +1106,7 @@ def delete_object_multiple_clients():
"""If we delete on one client, the delete should be reflected on the other. """If we delete on one client, the delete should be reflected on the other.
First, we'll create an object: First, we'll create an object:
>>> addr, _ = start_server() >>> addr, _ = start_server()
>>> db = ZEO.DB(addr) >>> db = ZEO.DB(addr)
>>> conn = db.open() >>> conn = db.open()
...@@ -1116,10 +1116,10 @@ def delete_object_multiple_clients(): ...@@ -1116,10 +1116,10 @@ def delete_object_multiple_clients():
We verify that we can read it in another client, which also loads We verify that we can read it in another client, which also loads
it into the client cache. it into the client cache.
>>> cs = ClientStorage(addr) >>> cs = ClientStorage(addr)
>>> p, s = cs.load(oid) >>> p, s = cs.load(oid)
Now, we'll remove the object: Now, we'll remove the object:
>>> txn = transaction.begin() >>> txn = transaction.begin()
...@@ -1137,7 +1137,7 @@ def delete_object_multiple_clients(): ...@@ -1137,7 +1137,7 @@ def delete_object_multiple_clients():
We'll wait for our other storage to get the invalidation and then We'll wait for our other storage to get the invalidation and then
try to access the object. We'll get a POSKeyError there too: try to access the object. We'll get a POSKeyError there too:
>>> tid = db.storage.lastTransaction() >>> tid = db.storage.lastTransaction()
>>> forker.wait_until(lambda : cs.lastTransaction() == tid) >>> forker.wait_until(lambda : cs.lastTransaction() == tid)
>>> cs.load(oid) # doctest: +ELLIPSIS >>> cs.load(oid) # doctest: +ELLIPSIS
...@@ -1153,7 +1153,7 @@ slow_test_classes = [ ...@@ -1153,7 +1153,7 @@ slow_test_classes = [
BlobAdaptedFileStorageTests, BlobWritableCacheTests, BlobAdaptedFileStorageTests, BlobWritableCacheTests,
DemoStorageTests, FileStorageTests, MappingStorageTests, DemoStorageTests, FileStorageTests, MappingStorageTests,
] ]
quick_test_classes = [ quick_test_classes = [
FileStorageRecoveryTests, ConfigurationTests, HeartbeatTests, FileStorageRecoveryTests, ConfigurationTests, HeartbeatTests,
CatastrophicClientLoopFailure, ConnectionInvalidationOnReconnect, CatastrophicClientLoopFailure, ConnectionInvalidationOnReconnect,
...@@ -1194,7 +1194,7 @@ class ServerManagingClientStorage(ClientStorage): ...@@ -1194,7 +1194,7 @@ class ServerManagingClientStorage(ClientStorage):
shared_blob_dir=True) shared_blob_dir=True)
else: else:
ClientStorage.__init__(self, addr, blob_dir=blob_dir) ClientStorage.__init__(self, addr, blob_dir=blob_dir)
def close(self): def close(self):
ClientStorage.close(self) ClientStorage.close(self)
zope.testing.setupstack.tearDown(self) zope.testing.setupstack.tearDown(self)
...@@ -1228,7 +1228,7 @@ def test_suite(): ...@@ -1228,7 +1228,7 @@ def test_suite():
doctest.DocFileSuite( doctest.DocFileSuite(
'zeo-fan-out.test', 'zdoptions.test', 'zeo-fan-out.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt', 'client-config.test', 'drop_cache_rather_than_verify.txt', 'client-config.test',
'protocols.test', 'zeo_blob_cache.test', 'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown, setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
), ),
) )
......
...@@ -13,20 +13,18 @@ ...@@ -13,20 +13,18 @@
############################################################################## ##############################################################################
"""Helper file used to launch a ZEO server cross platform""" """Helper file used to launch a ZEO server cross platform"""
from ZEO.StorageServer import StorageServer
from ZEO.runzeo import ZEOOptions
import asyncore import asyncore
import os
import sys
import time
import errno import errno
import getopt import getopt
import socket import logging
import os
import signal import signal
import asyncore import socket
import sys
import threading import threading
import logging import time
import ZEO.runzeo
import ZEO.zrpc.connection
def cleanup(storage): def cleanup(storage):
# FileStorage and the Berkeley storages have this method, which deletes # FileStorage and the Berkeley storages have this method, which deletes
...@@ -164,15 +162,14 @@ def main(): ...@@ -164,15 +162,14 @@ def main():
elif opt == '-S': elif opt == '-S':
suicide = False suicide = False
elif opt == '-v': elif opt == '-v':
import ZEO.zrpc.connection
ZEO.zrpc.connection.Connection.current_protocol = arg ZEO.zrpc.connection.Connection.current_protocol = arg
zo = ZEOOptions() zo = ZEO.runzeo.ZEOOptions()
zo.realize(["-C", configfile]) zo.realize(["-C", configfile])
zeo_port = int(zo.address[1]) zeo_port = int(zo.address[1])
if zo.auth_protocol == "plaintext": if zo.auth_protocol == "plaintext":
import ZEO.tests.auth_plaintext __import__('ZEO.tests.auth_plaintext')
# Open the config file and let ZConfig parse the data there. Then remove # Open the config file and let ZConfig parse the data there. Then remove
# the config file, otherwise we'll leave turds. # the config file, otherwise we'll leave turds.
...@@ -185,16 +182,7 @@ def main(): ...@@ -185,16 +182,7 @@ def main():
mon_addr = None mon_addr = None
if zo.monitor_address: if zo.monitor_address:
mon_addr = zo.monitor_address mon_addr = zo.monitor_address
server = StorageServer( server = ZEO.runzeo.create_server({"1": storage}, zo)
zo.address,
{"1": storage},
read_only=zo.read_only,
invalidation_queue_size=zo.invalidation_queue_size,
transaction_timeout=zo.transaction_timeout,
monitor_address=mon_addr,
auth_protocol=zo.auth_protocol,
auth_database=zo.auth_database,
auth_realm=zo.auth_realm)
try: try:
log(label, 'creating the test server, keep: %s', keep) log(label, 'creating the test server, keep: %s', keep)
......