Commit 156107a8 authored by Jim Fulton's avatar Jim Fulton

ZEO Servers now provide an option, invalidation-age, that allows

quick verification of ZEO clients less than a given age even if the
number of transactions the client hasn't seen exceeds the
invalidation queue size. This is only recommended if the storage
being served  supports effecient iteration from a point near the end
of the transaction history.

Also refactored ZEO/tests/zeoserver and ZEO/runzeo to have the test
server use, and this exercisem, the option-handling code from runzeo.
parent f51affd7
......@@ -2,7 +2,7 @@
Change History
================
3.9.0 (2008-??-??)
3.9.0 (2009-??-??)
====================
New Features
......@@ -22,7 +22,7 @@ New Features
XXX There are known issues with this implementation that need to be
sorted out before it is "released".
3.9.0a9 (2008-12-??)
3.9.0a9 (2009-01-??)
====================
New Features
......@@ -42,10 +42,18 @@ New Features
- As a small convenience (mainly for tests), you can now specify
initial data as a string argument to the Blob constructor.
- The FileStorage iterator now handles large files better. Whenm
- ZEO Servers now provide an option, invalidation-age, that allows
quick verification of ZEO clients less than a given age even if the
number of transactions the client hasn't seen exceeds the
invalidation queue size. This is only recommended if the storage
being served supports effecient iteration from a point near the end
of the transaction history.
- The FileStorage iterator now handles large files better. When
iteratng from a starting transaction near the end of the file, the
iterator will scan backward from the end of the file to find the
starting point.
starting point. This enhancement makes it practical to take
advantage of the new storage server invalidation-age option.
3.9.0a8 (2008-12-15)
====================
......
......@@ -34,6 +34,7 @@ import itertools
import transaction
import ZODB.serialize
import ZODB.TimeStamp
import ZEO.zrpc.error
import zope.interface
......@@ -48,7 +49,7 @@ from ZODB.ConflictResolution import ResolvedSerial
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf
from ZODB.utils import u64, oid_repr, mktemp
from ZODB.utils import u64, p64, oid_repr, mktemp
from ZODB.loglevels import BLATHER
......@@ -817,6 +818,7 @@ class StorageServer:
def __init__(self, addr, storages, read_only=0,
invalidation_queue_size=100,
invalidation_age=None,
transaction_timeout=None,
monitor_address=None,
auth_protocol=None,
......@@ -854,6 +856,13 @@ class StorageServer:
speed client cache verification when a client disconnects
for a short period of time.
invalidation_age --
If the invalidation queue isn't big enough to support a
quick verification, but the last transaction seen by a
client is younger than the invalidation age, then
invalidations will be computed by iterating over
transactions later than the given transaction.
transaction_timeout -- The maximum amount of time to wait for
a transaction to commit after acquiring the storage lock.
If the transaction takes too long, the client connection
......@@ -907,7 +916,7 @@ class StorageServer:
for name, storage in storages.items():
self._setup_invq(name, storage)
storage.registerDB(StorageServerDB(self, name))
self.invalidation_age = invalidation_age
self.connections = {}
self.dispatcher = self.DispatcherClass(addr,
factory=self.new_connection)
......@@ -1126,30 +1135,34 @@ class StorageServer:
do full cache verification.
"""
invq = self.invq[storage_id]
# We make a copy of invq because it might be modified by a
# foreign (other than main thread) calling invalidate above.
invq = invq[:]
if not invq:
invq = self.invq[storage_id][:]
oids = set()
latest_tid = None
if invq and invq[-1][0] <= tid:
# We have needed data in the queue
for _tid, L in invq:
if _tid <= tid:
break
oids.update(L)
latest_tid = invq[0][0]
elif (self.invalidation_age and
(self.invalidation_age >
(time.time()-ZODB.TimeStamp.TimeStamp(tid).timeTime())
)
):
for t in self.storages[storage_id].iterator(p64(u64(tid)+1)):
for r in t:
oids.add(r.oid)
latest_tid = t.tid
elif not invq:
log("invq empty")
return None, []
earliest_tid = invq[-1][0]
if earliest_tid > tid:
log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
return None, []
else:
log("tid to old for invq %s < %s" % (u64(tid), u64(invq[-1][0])))
oids = {}
for _tid, L in invq:
if _tid <= tid:
break
for key in L:
oids[key] = 1
latest_tid = invq[0][0]
return latest_tid, oids.keys()
return latest_tid, list(oids)
def close_server(self):
"""Close the dispatcher so that there are no new connections.
......
......@@ -45,6 +45,16 @@
</description>
</key>
<key name="invalidation-age" datatype="float" required="no">
<description>
The maximum age of a client for which quick-verification
invalidations will be provided by iterating over the served
storage. This option should only be used if the served storage
supports efficient iteration from a starting point near the
end of the transaction history (e.g. end of file).
</description>
</key>
<key name="monitor-address" datatype="socket-binding-address"
required="no">
<description>
......
......@@ -100,6 +100,7 @@ class ZEOOptionsMixin:
self.add("read_only", "zeo.read_only", default=0)
self.add("invalidation_queue_size", "zeo.invalidation_queue_size",
default=100)
self.add("invalidation_age", "zeo.invalidation_age")
self.add("transaction_timeout", "zeo.transaction_timeout",
"t:", "timeout=", float)
self.add("monitor_address", "zeo.monitor_address.address",
......@@ -137,7 +138,7 @@ class ZEOOptions(ZDOptions, ZEOOptionsMixin):
if s.name is None:
s.name = '1'
break
class ZEOServer:
......@@ -243,17 +244,7 @@ class ZEOServer:
SignalHandler.registerHandler(SIGUSR2, self.handle_sigusr2)
def create_server(self):
from ZEO.StorageServer import StorageServer
self.server = StorageServer(
self.options.address,
self.storages,
read_only=self.options.read_only,
invalidation_queue_size=self.options.invalidation_queue_size,
transaction_timeout=self.options.transaction_timeout,
monitor_address=self.options.monitor_address,
auth_protocol=self.options.auth_protocol,
auth_database=self.options.auth_database,
auth_realm=self.options.auth_realm)
self.server = create_server(self.storages, self.options)
def loop_forever(self):
asyncore.loop()
......@@ -332,6 +323,23 @@ class ZEOServer:
except IOError:
logger.error("PID file '%s' could not be removed" % pidfile)
def create_server(storages, options):
from ZEO.StorageServer import StorageServer
return StorageServer(
options.address,
storages,
read_only = options.read_only,
invalidation_queue_size = options.invalidation_queue_size,
invalidation_age = options.invalidation_age,
transaction_timeout = options.transaction_timeout,
monitor_address = options.monitor_address,
auth_protocol = options.auth_protocol,
auth_database = options.auth_database,
auth_realm = options.auth_realm,
)
# Signal names
signames = None
......
......@@ -36,6 +36,7 @@ class ZEOConfig:
self.address = addr
self.read_only = None
self.invalidation_queue_size = None
self.invalidation_age = None
self.monitor_address = None
self.transaction_timeout = None
self.authentication_protocol = None
......@@ -49,6 +50,8 @@ class ZEOConfig:
print >> f, "read-only", self.read_only and "true" or "false"
if self.invalidation_queue_size is not None:
print >> f, "invalidation-queue-size", self.invalidation_queue_size
if self.invalidation_age is not None:
print >> f, "invalidation-age", self.invalidation_age
if self.monitor_address is not None:
print >> f, "monitor-address %s:%s" % self.monitor_address
if self.transaction_timeout is not None:
......
Invalidation age
================
When a ZEO client with a non-empty cache connects to the server, it
needs to verify whether the data in its cache is current. It does
this in one of 2 ways:
quick verification
It gets a list of invalidations from the server since the last
transaction the client has seen and applies those to it's disk and
in-memory caches. This is only possible if there haven't been too
many transactions since the client was last connected.
full verification
If quick verification isn't possible, the client iterates through
it's disk cache asking the server to verify whether each current
entry is valid.
Unfortunately, for large caches, full verification is soooooo not
quick that it is impractical. Quick verificatioin is highly
desireable.
To support quick verification, the server keeps a list of recent
invalidations. The size of this list is controlled by the
invalidation_queue_size parameter. If there is a lot of database
activity, the size might need to be quite large to support having
clients be disconnected for more than a few minutes. A very large
invalidation queue size can use a lot of memory.
To suppliment the invalidation queue, you can also specify an
invalidation_age parameter. When a client connects and presents the
last transaction id it has seen, we first check to see if the
invalidation queue has that transaction id. It it does, then we send
all transactions since that id. Otherwise, we check to see if the
difference between storage's last transaction id and the given id is
less than or equal to the invalidation age. If it is, then we iterate
over the storage, starting with the given id, to get the invalidations
since the given id.
NOTE: This assumes that iterating from a point near the "end" of a
database is inexpensive. Don't use this option for a storage for which
that is not the case.
Here's an example. We set up a server, using an
invalidation-queue-size of 5:
>>> addr, admin = start_server(zeo_conf=dict(invalidation_queue_size=5),
... keep=True)
Now, we'll open a client with a persistent cache, set up some data,
and then close client:
>>> import ZEO, transaction
>>> db = ZEO.DB(addr, client='test')
>>> conn = db.open()
>>> for i in range(9):
... conn.root()[i] = conn.root().__class__()
... conn.root()[i].x = 0
>>> transaction.commit()
>>> db.close()
We'll open another client, and commit some transactions:
>>> db = ZEO.DB(addr)
>>> conn = db.open()
>>> import transaction
>>> for i in range(2):
... conn.root()[i].x = 1
... transaction.commit()
>>> db.close()
If we reopen the first client, we'll do quick verification. We'll
turn on logging so we can see this:
>>> import logging, sys
>>> logging.getLogger().setLevel(logging.INFO)
>>> handler = logging.StreamHandler(sys.stdout)
>>> logging.getLogger().addHandler(handler)
>>> db = ZEO.DB(addr, client='test') # doctest: +ELLIPSIS
('localhost', ...
('localhost', ...) Recovering 2 invalidations
>>> logging.getLogger().removeHandler(handler)
>>> [v.x for v in db.open().root().values()]
[1, 1, 0, 0, 0, 0, 0, 0, 0]
Now, if we disconnect and commit more than 5 transactions, we'll see
that verification is necessary:
>>> db.close()
>>> db = ZEO.DB(addr)
>>> conn = db.open()
>>> import transaction
>>> for i in range(9):
... conn.root()[i].x = 2
... transaction.commit()
>>> db.close()
>>> logging.getLogger().addHandler(handler)
>>> db = ZEO.DB(addr, client='test') # doctest: +ELLIPSIS
('localhost', ...
('localhost', ...) Verifying cache
('localhost', ...) endVerify finishing
('localhost', ...) endVerify finished
>>> logging.getLogger().removeHandler(handler)
>>> [v.x for v in db.open().root().values()]
[2, 2, 2, 2, 2, 2, 2, 2, 2]
>>> db.close()
But if we restart the server with invalidation-age set, we can
do quick verification:
>>> stop_server(admin)
>>> addr, admin = start_server(zeo_conf=dict(invalidation_queue_size=5,
... invalidation_age=100))
>>> db = ZEO.DB(addr)
>>> conn = db.open()
>>> import transaction
>>> for i in range(9):
... conn.root()[i].x = 3
... transaction.commit()
>>> db.close()
>>> logging.getLogger().addHandler(handler)
>>> db = ZEO.DB(addr, client='test') # doctest: +ELLIPSIS
('localhost', ...
('localhost', ...) Recovering 9 invalidations
>>> logging.getLogger().removeHandler(handler)
>>> [v.x for v in db.open().root().values()]
[3, 3, 3, 3, 3, 3, 3, 3, 3]
>>> db.close()
......@@ -68,7 +68,7 @@ class MiscZEOTests:
"""ZEO tests that don't fit in elsewhere."""
def checkCreativeGetState(self):
# This test covers persistent objects that provide their own
# This test covers persistent objects that provide their own
# __getstate__ which modifies the state of the object.
# For details see bug #98275
......@@ -456,7 +456,7 @@ class CatastrophicClientLoopFailure(
)
ZEO.zrpc.connection.client_map[None] = Evil()
try:
ZEO.zrpc.connection.client_trigger.pull_trigger()
except DisconnectedError:
......@@ -498,7 +498,7 @@ class ConnectionInvalidationOnReconnect(
self._invalidatedCache += 1
def invalidate(*a, **k):
pass
db = DummyDB()
storage.registerDB(db)
......@@ -517,7 +517,7 @@ class ConnectionInvalidationOnReconnect(
# Now, the root object in the connection should have been invalidated:
self.assertEqual(db._invalidatedCache, base+1)
class CommonBlobTests:
......@@ -638,7 +638,7 @@ class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
for i in range(1000000):
somedata.write("%s\n" % i)
somedata.seek(0)
blob = Blob()
bd_fh = blob.open('w')
ZODB.utils.cp(somedata, bd_fh)
......@@ -665,7 +665,7 @@ class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
def check_data(path):
self.assert_(os.path.exists(path))
f = open(path, 'rb')
somedata.seek(0)
somedata.seek(0)
d1 = d2 = 1
while d1 or d2:
d1 = f.read(8096)
......@@ -681,7 +681,7 @@ class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
self.blobdir,
ZODB.blob.BushyLayout().getBlobFilePath(oid, revid),
)
self.assert_(server_filename.startswith(self.blobdir))
check_data(server_filename)
......@@ -800,12 +800,12 @@ def multiple_storages_invalidation_queue_is_not_insane():
>>> for i in range(20):
... o2.x = PersistentDict(); o2 = o2.x
... commit()
>>> trans, oids = s1.getInvalidations(last)
>>> from ZODB.utils import u64
>>> sorted([int(u64(oid)) for oid in oids])
[10, 11, 12, 13, 14]
>>> server.close_server()
"""
......@@ -834,7 +834,7 @@ Let's create a file storage and stuff some data into it:
>>> db.close()
Now we'll open a storage server on the data, simulating a restart:
>>> fs = FileStorage('t.fs')
>>> sv = StorageServer(('', get_port()), dict(fs=fs))
>>> s = ZEOStorage(sv, sv.read_only)
......@@ -884,7 +884,7 @@ without this method:
>>> sv = StorageServer(('', get_port()), dict(fs=fs))
>>> st = StorageServerWrapper(sv, 'fs')
>>> s = st.server
Now, if we ask for the invalidations since the last committed
transaction, we'll get a result:
......@@ -1070,7 +1070,7 @@ def client_has_newer_data_than_server():
>>> handler.uninstall()
>>> stop_server(admin)
"""
def history_over_zeo():
......@@ -1106,7 +1106,7 @@ def delete_object_multiple_clients():
"""If we delete on one client, the delete should be reflected on the other.
First, we'll create an object:
>>> addr, _ = start_server()
>>> db = ZEO.DB(addr)
>>> conn = db.open()
......@@ -1116,10 +1116,10 @@ def delete_object_multiple_clients():
We verify that we can read it in another client, which also loads
it into the client cache.
>>> cs = ClientStorage(addr)
>>> p, s = cs.load(oid)
Now, we'll remove the object:
>>> txn = transaction.begin()
......@@ -1137,7 +1137,7 @@ def delete_object_multiple_clients():
We'll wait for our other storage to get the invalidation and then
try to access the object. We'll get a POSKeyError there too:
>>> tid = db.storage.lastTransaction()
>>> forker.wait_until(lambda : cs.lastTransaction() == tid)
>>> cs.load(oid) # doctest: +ELLIPSIS
......@@ -1153,7 +1153,7 @@ slow_test_classes = [
BlobAdaptedFileStorageTests, BlobWritableCacheTests,
DemoStorageTests, FileStorageTests, MappingStorageTests,
]
quick_test_classes = [
FileStorageRecoveryTests, ConfigurationTests, HeartbeatTests,
CatastrophicClientLoopFailure, ConnectionInvalidationOnReconnect,
......@@ -1194,7 +1194,7 @@ class ServerManagingClientStorage(ClientStorage):
shared_blob_dir=True)
else:
ClientStorage.__init__(self, addr, blob_dir=blob_dir)
def close(self):
ClientStorage.close(self)
zope.testing.setupstack.tearDown(self)
......@@ -1228,7 +1228,7 @@ def test_suite():
doctest.DocFileSuite(
'zeo-fan-out.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt', 'client-config.test',
'protocols.test', 'zeo_blob_cache.test',
'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
),
)
......
......@@ -13,20 +13,18 @@
##############################################################################
"""Helper file used to launch a ZEO server cross platform"""
from ZEO.StorageServer import StorageServer
from ZEO.runzeo import ZEOOptions
import asyncore
import os
import sys
import time
import errno
import getopt
import socket
import logging
import os
import signal
import asyncore
import socket
import sys
import threading
import logging
import time
import ZEO.runzeo
import ZEO.zrpc.connection
def cleanup(storage):
# FileStorage and the Berkeley storages have this method, which deletes
......@@ -164,15 +162,14 @@ def main():
elif opt == '-S':
suicide = False
elif opt == '-v':
import ZEO.zrpc.connection
ZEO.zrpc.connection.Connection.current_protocol = arg
zo = ZEOOptions()
zo = ZEO.runzeo.ZEOOptions()
zo.realize(["-C", configfile])
zeo_port = int(zo.address[1])
if zo.auth_protocol == "plaintext":
import ZEO.tests.auth_plaintext
__import__('ZEO.tests.auth_plaintext')
# Open the config file and let ZConfig parse the data there. Then remove
# the config file, otherwise we'll leave turds.
......@@ -185,16 +182,7 @@ def main():
mon_addr = None
if zo.monitor_address:
mon_addr = zo.monitor_address
server = StorageServer(
zo.address,
{"1": storage},
read_only=zo.read_only,
invalidation_queue_size=zo.invalidation_queue_size,
transaction_timeout=zo.transaction_timeout,
monitor_address=mon_addr,
auth_protocol=zo.auth_protocol,
auth_database=zo.auth_database,
auth_realm=zo.auth_realm)
server = ZEO.runzeo.create_server({"1": storage}, zo)
try:
log(label, 'creating the test server, keep: %s', keep)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment