Commit f895cbd2 authored by Jeremy Hylton's avatar Jeremy Hylton

Prevent client from using stale cache data while connecting.

XXX Maybe there should be an option to allow this.

A ZEO client can run in disconnected mode, using data from
its cache, or in connected mode.  Several instance variables
are related to whether the client is connected.

_server: All method calls are invoked through the server
   stub.  When not connect, set to disconnected_stub an
   object that raises ClientDisconnected errors.

_ready: A threading Event that is set only if _server
   is set to a real stub.

_connection: The current zrpc connection or None.

_connection is set as soon as a connection is established,
but _server is set only after cache verification has finished
and clients can safely use the server.  _pending_server holds
a server stub while it is being verified.

Before this change, a client could start using a connection before
verification finished.  If verification took a long time, it could
even commit a new transaction using a mixing of old and new data.
parent c0a9a8d4
......@@ -202,11 +202,33 @@ class ClientStorage:
wait = 1
self._addr = addr # For tests
# A ZEO client can run in disconnected mode, using data from
# its cache, or in connected mode. Several instance variables
# are related to whether the client is connected.
# _server: All method calls are invoked through the server
# stub. When not connect, set to disconnected_stub an
# object that raises ClientDisconnected errors.
# _ready: A threading Event that is set only if _server
# is set to a real stub.
# _connection: The current zrpc connection or None.
# _connection is set as soon as a connection is established,
# but _server is set only after cache verification has finished
# and clients can safely use the server. _pending_server holds
# a server stub while it is being verified.
self._server = disconnected_stub
self._connection = None
self._pending_server = None
self._ready = threading.Event()
self._is_read_only = read_only
self._storage = storage
self._read_only_fallback = read_only_fallback
self._connection = None
# _server_addr is used by sortKey()
self._server_addr = None
self._tfile = None
......@@ -257,16 +279,46 @@ class ClientStorage:
tmax=max_disconnect_poll)
if wait:
self._rpc_mgr.connect(sync=1)
self._wait()
else:
# attempt_connect() will make an attempt that doesn't block
# "too long," for a very vague notion of too long. If that
# doesn't succeed, call connect() to start a thread.
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
# If we're connected at this point, the cache is opened as a
# side effect of verify_cache(). If not, open it now.
if not self.is_connected():
# If the connect hasn't occurred, run with cached data.
if not self._ready.isSet():
self._cache.open()
def _wait(self):
# Wait for a connection to be established.
self._rpc_mgr.connect(sync=1)
# When a synchronous connect() call returns, there is
# a valid _connection object but cache validation may
# still be going on. This code must wait until validation
# finishes, but if the connection isn't a zrpc async
# connection it also needs to poll for input.
if self._connection.is_async():
while 1:
self._ready.wait(30)
if self._ready.isSet():
break
log2(INFO, "Waiting to connect to server")
else:
# If there is no mainloop running, this code needs
# to call poll() to cause asyncore to handle events.
while 1:
cn = self._connection
if cn is None:
# If the connection was closed while we were
# waiting for it to become ready, start over.
return self._wait()
else:
cn.pending(30)
if self._ready.isSet():
break
log2(INFO, "Waiting to connect to server")
def close(self):
"""Storage API: finalize the storage, releasing external resources."""
self._tbuf.close()
......@@ -288,17 +340,22 @@ class ClientStorage:
def is_connected(self):
"""Return whether the storage is currently connected to a server."""
if self._server is disconnected_stub:
return 0
else:
return 1
# This function is used by clients, so we only report that a
# connection exists when the connection is ready to use.
return self._ready.isSet()
def sync(self):
"""Handle any pending invalidation messages.
This is called by the sync method in ZODB.Connection.
"""
self._server._update()
# If there is no connection, return immediately. Technically,
# there are no pending invalidations so they are all handled.
# There doesn't seem to be much benefit to raising an exception.
cn = self._connection
if cn is not None:
cn.pending()
def testConnection(self, conn):
"""Internal: test the given connection.
......@@ -346,23 +403,24 @@ class ClientStorage:
else:
reconnect = 0
self.set_server_addr(conn.get_addr())
stub = self.StorageServerStubClass(conn)
stub = self.StorageServerStubClass(conn)
self._oids = []
self._info.update(stub.get_info())
self.verify_cache(stub)
# XXX The stub should be saved here and set in endVerify() below.
# If we are upgrading from a read-only fallback connection,
# we must close the old connection to prevent it from being
# used while the cache is verified against the new connection.
if self._connection is not None:
self._connection.close()
self._connection = conn
self._server = stub
if reconnect:
log2(INFO, "Reconnected to storage: %s" % self._server_addr)
else:
log2(INFO, "Connected to storage: %s" % self._server_addr)
stub = self.StorageServerStubClass(conn)
self._oids = []
self._info.update(stub.get_info())
self.verify_cache(stub)
def set_server_addr(self, addr):
# Normalize server address and convert to string
if isinstance(addr, types.StringType):
......@@ -396,6 +454,11 @@ class ClientStorage:
The return value (indicating which path we took) is used by
the test suite.
"""
# If verify_cache() finishes the cache verification process,
# it should set self._server. If it goes through full cache
# verification, then endVerify() should self._server.
last_inval_tid = self._cache.getLastTid()
if last_inval_tid is not None:
ltid = server.lastTransaction()
......@@ -403,10 +466,12 @@ class ClientStorage:
log2(INFO, "No verification necessary "
"(last_inval_tid up-to-date)")
self._cache.open()
self._server = server
self._ready.set()
return "no verification"
# log some hints about last transaction
log2(INFO, "last inval tid: %r %s"
log2(INFO, "last inval tid: %r %s\n"
% (last_inval_tid, tid2time(last_inval_tid)))
log2(INFO, "last transaction: %r %s" %
(ltid, ltid and tid2time(ltid)))
......@@ -416,6 +481,8 @@ class ClientStorage:
log2(INFO, "Recovering %d invalidations" % len(pair[1]))
self._cache.open()
self.invalidateTransaction(*pair)
self._server = server
self._ready.set()
return "quick verification"
log2(INFO, "Verifying cache")
......@@ -425,6 +492,7 @@ class ClientStorage:
self._pickler.fast = 1 # Don't use the memo
self._cache.verify(server.zeoVerify)
self._pending_server = server
server.endZeoVerify()
return "full verification"
......@@ -445,6 +513,7 @@ class ClientStorage:
log2(PROBLEM, "Disconnected from storage: %s"
% repr(self._server_addr))
self._connection = None
self._ready.clear()
self._server = disconnected_stub
def __len__(self):
......@@ -847,12 +916,20 @@ class ClientStorage:
while 1:
oid, version = unpick.load()
log2(INFO, "verify invalidate %r" % oid)
if not oid:
break
self._cache.invalidate(oid, version=version)
if self._db is not None:
self._db.invalidate(oid, version=version)
f.close()
log2(INFO, "endVerify finishing")
self._server = self._pending_server
self._ready.set()
self._pending_conn = None
log2(INFO, "endVerify finished")
def invalidateTransaction(self, tid, args):
"""Invalidate objects modified by tid."""
self._cache.setLastTid(tid)
......
......@@ -39,15 +39,6 @@ class StorageServer:
def extensionMethod(self, name):
return ExtensionMethodWrapper(self.rpc, name).call
def _update(self):
"""Handle pending incoming messages.
This method is typically only used when no asyncore mainloop
is already active. It can cause arbitrary callbacks from the
server to the client to be handled.
"""
self.rpc.pending()
def register(self, storage_name, read_only):
self.rpc.call('register', storage_name, read_only)
......
......@@ -202,6 +202,7 @@ class ConnectionTests(CommonSetupTearDown):
self._dostore()
break
except Disconnected:
self._storage.sync()
time.sleep(0.5)
def checkReadOnlyClient(self):
......@@ -345,13 +346,15 @@ class ConnectionTests(CommonSetupTearDown):
try:
self._dostore(oid, data=obj)
break
except (Disconnected, select.error,
threading.ThreadError, socket.error):
except Disconnected:
# Maybe the exception mess is better now
## except (Disconnected, select.error,
## threading.ThreadError, socket.error):
zLOG.LOG("checkReconnection", zLOG.INFO,
"Error after server restart; retrying.",
error=sys.exc_info())
get_transaction().abort()
time.sleep(0.1) # XXX how long to sleep
self._storage.sync()
# XXX This is a bloody pain. We're placing a heavy burden
# on users to catch a plethora of exceptions in order to
# write robust code. Need to think about implementing
......@@ -564,9 +567,8 @@ class ReconnectionTests(CommonSetupTearDown):
try:
self._dostore()
break
except (Disconnected, ReadOnlyError,
select.error, threading.ThreadError, socket.error):
time.sleep(0.1)
except (Disconnected, ReadOnlyError):
self._storage.sync()
else:
self.fail("Couldn't store after starting a read-write server")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment