Commit ff92d8be authored by Jim Fulton's avatar Jim Fulton

The storage server is now multi-threaded.

parent da5312e1
...@@ -1346,7 +1346,7 @@ class ClientStub: ...@@ -1346,7 +1346,7 @@ class ClientStub:
self.rpc.callAsync('endVerify') self.rpc.callAsync('endVerify')
def invalidateTransaction(self, tid, args): def invalidateTransaction(self, tid, args):
self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args) self.rpc.callAsync('invalidateTransaction', tid, args)
def serialnos(self, arg): def serialnos(self, arg):
self.rpc.callAsyncNoPoll('serialnos', arg) self.rpc.callAsyncNoPoll('serialnos', arg)
...@@ -1372,11 +1372,11 @@ class ClientStub: ...@@ -1372,11 +1372,11 @@ class ClientStub:
class ClientStub308(ClientStub): class ClientStub308(ClientStub):
def invalidateTransaction(self, tid, args): def invalidateTransaction(self, tid, args):
self.rpc.callAsyncNoPoll( ClientStub.invalidateTransaction(
'invalidateTransaction', tid, [(arg, '') for arg in args]) self, tid, [(arg, '') for arg in args])
def invalidateVerify(self, oid): def invalidateVerify(self, oid):
self.rpc.callAsync('invalidateVerify', (oid, '')) ClientStub.invalidateVerify(self, (oid, ''))
class ZEOStorage308Adapter: class ZEOStorage308Adapter:
......
...@@ -560,14 +560,17 @@ class ManagedServerConnection(Connection): ...@@ -560,14 +560,17 @@ class ManagedServerConnection(Connection):
# Exception types that should not be logged: # Exception types that should not be logged:
unlogged_exception_types = (ZODB.POSException.POSKeyError, ) unlogged_exception_types = (ZODB.POSException.POSKeyError, )
# Servers use a shared server trigger that uses the asyncore socket map
trigger = ZEO.zrpc.trigger.trigger()
call_from_thread = trigger.pull_trigger
def __init__(self, sock, addr, obj, mgr): def __init__(self, sock, addr, obj, mgr):
self.mgr = mgr self.mgr = mgr
Connection.__init__(self, sock, addr, obj, 'S') map = {}
Connection.__init__(self, sock, addr, obj, 'S', map=map)
self.marshal = ServerMarshaller() self.marshal = ServerMarshaller()
self.trigger = ZEO.zrpc.trigger.trigger(map)
self.call_from_thread = self.trigger.pull_trigger
t = threading.Thread(target=server_loop, args=(map,))
t.setDaemon(True)
t.start()
def handshake(self): def handshake(self):
# Send the server's preferred protocol to the client. # Send the server's preferred protocol to the client.
...@@ -601,6 +604,13 @@ class ManagedServerConnection(Connection): ...@@ -601,6 +604,13 @@ class ManagedServerConnection(Connection):
poll = smac.SizedMessageAsyncConnection.handle_write poll = smac.SizedMessageAsyncConnection.handle_write
def server_loop(map):
while len(map) > 1:
asyncore.poll(30.0, map)
for o in map.values():
o.close()
class ManagedClientConnection(Connection): class ManagedClientConnection(Connection):
"""Client-side Connection subclass.""" """Client-side Connection subclass."""
__super_init = Connection.__init__ __super_init = Connection.__init__
...@@ -714,10 +724,6 @@ class ManagedClientConnection(Connection): ...@@ -714,10 +724,6 @@ class ManagedClientConnection(Connection):
self.trigger.pull_trigger() self.trigger.pull_trigger()
# Delay used when we call asyncore.poll() directly.
# Start with a 1 msec delay, double until 1 sec.
delay = 0.001
self.replies_cond.acquire() self.replies_cond.acquire()
try: try:
while 1: while 1:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment