Commit 8a27614a authored by Jeremy Hylton's avatar Jeremy Hylton

Swap order of StorageServer and ZEOStorage to avoid patchup at the

end.
parent b0cb1069
...@@ -56,208 +56,6 @@ def log(message, level=zLOG.INFO, label=None, error=None): ...@@ -56,208 +56,6 @@ def log(message, level=zLOG.INFO, label=None, error=None):
class StorageServerError(StorageError): class StorageServerError(StorageError):
"""Error reported when an unpickleable exception is raised.""" """Error reported when an unpickleable exception is raised."""
class StorageServer:
"""The server side implementation of ZEO.
The StorageServer is the 'manager' for incoming connections. Each
connection is associated with its own ZEOStorage instance (defined
below). The StorageServer may handle multiple storages; each
ZEOStorage instance only handles a single storage.
"""
# Classes we instantiate. A subclass might override.
DispatcherClass = Dispatcher
ZEOStorageClass = None # patched up later
ManagedServerConnectionClass = ManagedServerConnection
def __init__(self, addr, storages, read_only=0,
invalidation_queue_size=100,
transaction_timeout=None):
"""StorageServer constructor.
This is typically invoked from the start.py script.
Arguments (the first two are required and positional):
addr -- the address at which the server should listen. This
can be a tuple (host, port) to signify a TCP/IP connection
or a pathname string to signify a Unix domain socket
connection. A hostname may be a DNS name or a dotted IP
address.
storages -- a dictionary giving the storage(s) to handle. The
keys are the storage names, the values are the storage
instances, typically FileStorage or Berkeley storage
instances. By convention, storage names are typically
strings representing small integers starting at '1'.
read_only -- an optional flag saying whether the server should
operate in read-only mode. Defaults to false. Note that
even if the server is operating in writable mode,
individual storages may still be read-only. But if the
server is in read-only mode, no write operations are
allowed, even if the storages are writable. Note that
pack() is considered a read-only operation.
invalidation_queue_size -- The storage server keeps a queue
of the objects modified by the last N transactions, where
N == invalidation_queue_size. This queue is used to
speed client cache verification when a client disconnects
for a short period of time.
transaction_timout -- The maximum amount of time to wait for
a transaction to commit after acquiring the storage lock.
If the transaction takes too long, the client connection
will be closed and the transaction aborted.
"""
self.addr = addr
self.storages = storages
set_label()
msg = ", ".join(
["%s:%s:%s" % (name, storage.isReadOnly() and "RO" or "RW",
storage.getName())
for name, storage in storages.items()])
log("%s created %s with storages: %s" %
(self.__class__.__name__, read_only and "RO" or "RW", msg))
for s in storages.values():
s._waiting = []
self.read_only = read_only
# A list of at most invalidation_queue_size invalidations
self.invq = []
self.invq_bound = invalidation_queue_size
self.connections = {}
self.dispatcher = self.DispatcherClass(addr,
factory=self.new_connection,
reuse_addr=1)
self.timeouts = {}
for name in self.storages.keys():
if transaction_timeout is None:
# An object with no-op methods
timeout = StubTimeoutThread()
else:
timeout = TimeoutThread(transaction_timeout)
timeout.start()
self.timeouts[name] = timeout
def new_connection(self, sock, addr):
"""Internal: factory to create a new connection.
This is called by the Dispatcher class in ZEO.zrpc.server
whenever accept() returns a socket for a new incoming
connection.
"""
z = self.ZEOStorageClass(self, self.read_only)
c = self.ManagedServerConnectionClass(sock, addr, z, self)
log("new connection %s: %s" % (addr, `c`))
return c
def register_connection(self, storage_id, conn):
"""Internal: register a connection with a particular storage.
This is called by ZEOStorage.register().
The dictionary self.connections maps each storage name to a
list of current connections for that storage; this information
is needed to handle invalidation. This function updates this
dictionary.
Returns the timeout object for the appropriate storage.
"""
l = self.connections.get(storage_id)
if l is None:
l = self.connections[storage_id] = []
l.append(conn)
return self.timeouts[storage_id]
def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
"""Internal: broadcast info and invalidations to clients.
This is called from several ZEOStorage methods.
This can do three different things:
- If the invalidated argument is non-empty, it broadcasts
invalidateTransaction() messages to all clients of the given
storage except the current client (the conn argument).
- If the invalidated argument is empty and the info argument
is a non-empty dictionary, it broadcasts info() messages to
all clients of the given storage, including the current
client.
- If both the invalidated argument and the info argument are
non-empty, it broadcasts invalidateTransaction() messages to all
clients except the current, and sends an info() message to
the current client.
"""
if invalidated:
if len(self.invq) >= self.invq_bound:
del self.invq[0]
self.invq.append((tid, invalidated))
for p in self.connections.get(storage_id, ()):
if invalidated and p is not conn:
p.client.invalidateTransaction(tid, invalidated)
elif info is not None:
p.client.info(info)
def get_invalidations(self, tid):
"""Return a tid and list of all objects invalidation since tid.
The tid is the most recent transaction id committed by the server.
Returns None if it is unable to provide a complete list
of invalidations for tid. In this case, client should
do full cache verification.
"""
if not self.invq:
log("invq empty")
return None, []
earliest_tid = self.invq[0][0]
if earliest_tid > tid:
log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
return None, []
oids = {}
for tid, L in self.invq:
for key in L:
oids[key] = 1
latest_tid = self.invq[-1][0]
return latest_tid, oids.keys()
def close_server(self):
"""Close the dispatcher so that there are no new connections.
This is only called from the test suite, AFAICT.
"""
for timeout in self.timeouts.values():
timeout.stop()
self.dispatcher.close()
for storage in self.storages.values():
storage.close()
# Force the asyncore mainloop to exit by hackery, i.e. close
# every socket in the map. loop() will return when the map is
# empty.
for s in asyncore.socket_map.values():
try:
s.close()
except:
pass
def close_conn(self, conn):
"""Internal: remove the given connection from self.connections.
This is the inverse of register_connection().
"""
for cl in self.connections.values():
if conn.obj in cl:
cl.remove(conn.obj)
class ZEOStorage: class ZEOStorage:
"""Proxy to underlying storage for a single remote client.""" """Proxy to underlying storage for a single remote client."""
...@@ -727,6 +525,208 @@ class ZEOStorage: ...@@ -727,6 +525,208 @@ class ZEOStorage:
else: else:
return 1 return 1
class StorageServer:
"""The server side implementation of ZEO.
The StorageServer is the 'manager' for incoming connections. Each
connection is associated with its own ZEOStorage instance (defined
below). The StorageServer may handle multiple storages; each
ZEOStorage instance only handles a single storage.
"""
# Classes we instantiate. A subclass might override.
DispatcherClass = Dispatcher
ZEOStorageClass = ZEOStorage
ManagedServerConnectionClass = ManagedServerConnection
def __init__(self, addr, storages, read_only=0,
invalidation_queue_size=100,
transaction_timeout=None):
"""StorageServer constructor.
This is typically invoked from the start.py script.
Arguments (the first two are required and positional):
addr -- the address at which the server should listen. This
can be a tuple (host, port) to signify a TCP/IP connection
or a pathname string to signify a Unix domain socket
connection. A hostname may be a DNS name or a dotted IP
address.
storages -- a dictionary giving the storage(s) to handle. The
keys are the storage names, the values are the storage
instances, typically FileStorage or Berkeley storage
instances. By convention, storage names are typically
strings representing small integers starting at '1'.
read_only -- an optional flag saying whether the server should
operate in read-only mode. Defaults to false. Note that
even if the server is operating in writable mode,
individual storages may still be read-only. But if the
server is in read-only mode, no write operations are
allowed, even if the storages are writable. Note that
pack() is considered a read-only operation.
invalidation_queue_size -- The storage server keeps a queue
of the objects modified by the last N transactions, where
N == invalidation_queue_size. This queue is used to
speed client cache verification when a client disconnects
for a short period of time.
transaction_timout -- The maximum amount of time to wait for
a transaction to commit after acquiring the storage lock.
If the transaction takes too long, the client connection
will be closed and the transaction aborted.
"""
self.addr = addr
self.storages = storages
set_label()
msg = ", ".join(
["%s:%s:%s" % (name, storage.isReadOnly() and "RO" or "RW",
storage.getName())
for name, storage in storages.items()])
log("%s created %s with storages: %s" %
(self.__class__.__name__, read_only and "RO" or "RW", msg))
for s in storages.values():
s._waiting = []
self.read_only = read_only
# A list of at most invalidation_queue_size invalidations
self.invq = []
self.invq_bound = invalidation_queue_size
self.connections = {}
self.dispatcher = self.DispatcherClass(addr,
factory=self.new_connection,
reuse_addr=1)
self.timeouts = {}
for name in self.storages.keys():
if transaction_timeout is None:
# An object with no-op methods
timeout = StubTimeoutThread()
else:
timeout = TimeoutThread(transaction_timeout)
timeout.start()
self.timeouts[name] = timeout
def new_connection(self, sock, addr):
"""Internal: factory to create a new connection.
This is called by the Dispatcher class in ZEO.zrpc.server
whenever accept() returns a socket for a new incoming
connection.
"""
z = self.ZEOStorageClass(self, self.read_only)
c = self.ManagedServerConnectionClass(sock, addr, z, self)
log("new connection %s: %s" % (addr, `c`))
return c
def register_connection(self, storage_id, conn):
"""Internal: register a connection with a particular storage.
This is called by ZEOStorage.register().
The dictionary self.connections maps each storage name to a
list of current connections for that storage; this information
is needed to handle invalidation. This function updates this
dictionary.
Returns the timeout object for the appropriate storage.
"""
l = self.connections.get(storage_id)
if l is None:
l = self.connections[storage_id] = []
l.append(conn)
return self.timeouts[storage_id]
def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
"""Internal: broadcast info and invalidations to clients.
This is called from several ZEOStorage methods.
This can do three different things:
- If the invalidated argument is non-empty, it broadcasts
invalidateTransaction() messages to all clients of the given
storage except the current client (the conn argument).
- If the invalidated argument is empty and the info argument
is a non-empty dictionary, it broadcasts info() messages to
all clients of the given storage, including the current
client.
- If both the invalidated argument and the info argument are
non-empty, it broadcasts invalidateTransaction() messages to all
clients except the current, and sends an info() message to
the current client.
"""
if invalidated:
if len(self.invq) >= self.invq_bound:
del self.invq[0]
self.invq.append((tid, invalidated))
for p in self.connections.get(storage_id, ()):
if invalidated and p is not conn:
p.client.invalidateTransaction(tid, invalidated)
elif info is not None:
p.client.info(info)
def get_invalidations(self, tid):
"""Return a tid and list of all objects invalidation since tid.
The tid is the most recent transaction id committed by the server.
Returns None if it is unable to provide a complete list
of invalidations for tid. In this case, client should
do full cache verification.
"""
if not self.invq:
log("invq empty")
return None, []
earliest_tid = self.invq[0][0]
if earliest_tid > tid:
log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
return None, []
oids = {}
for tid, L in self.invq:
for key in L:
oids[key] = 1
latest_tid = self.invq[-1][0]
return latest_tid, oids.keys()
def close_server(self):
"""Close the dispatcher so that there are no new connections.
This is only called from the test suite, AFAICT.
"""
for timeout in self.timeouts.values():
timeout.stop()
self.dispatcher.close()
for storage in self.storages.values():
storage.close()
# Force the asyncore mainloop to exit by hackery, i.e. close
# every socket in the map. loop() will return when the map is
# empty.
for s in asyncore.socket_map.values():
try:
s.close()
except:
pass
def close_conn(self, conn):
"""Internal: remove the given connection from self.connections.
This is the inverse of register_connection().
"""
for cl in self.connections.values():
if conn.obj in cl:
cl.remove(conn.obj)
class StubTimeoutThread: class StubTimeoutThread:
def begin(self, client): def begin(self, client):
...@@ -844,6 +844,3 @@ class SlowMethodThread(threading.Thread): ...@@ -844,6 +844,3 @@ class SlowMethodThread(threading.Thread):
self.delay.error(sys.exc_info()) self.delay.error(sys.exc_info())
else: else:
self.delay.reply(result) self.delay.reply(result)
# Patch up class references
StorageServer.ZEOStorageClass = ZEOStorage
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment