Commit 53007b4a authored by Jim Fulton's avatar Jim Fulton

Added (back) heartbeats from client to server

parent 4833183f
...@@ -41,7 +41,8 @@ class Protocol(asyncio.Protocol): ...@@ -41,7 +41,8 @@ class Protocol(asyncio.Protocol):
protocols = b"Z309", b"Z310", b"Z3101" protocols = b"Z309", b"Z310", b"Z3101"
def __init__(self, loop, def __init__(self, loop,
addr, client, storage_key, read_only, connect_poll=1): addr, client, storage_key, read_only, connect_poll=1,
heartbeat_interval=60):
"""Create a client interface """Create a client interface
addr is either a host,port tuple or a string file name. addr is either a host,port tuple or a string file name.
...@@ -58,6 +59,7 @@ class Protocol(asyncio.Protocol): ...@@ -58,6 +59,7 @@ class Protocol(asyncio.Protocol):
self.__class__.__name__, addr, storage_key, read_only) self.__class__.__name__, addr, storage_key, read_only)
self.client = client self.client = client
self.connect_poll = connect_poll self.connect_poll = connect_poll
self.heartbeat_interval = heartbeat_interval
self.futures = {} # { message_id -> future } self.futures = {} # { message_id -> future }
self.input = [] # Buffer when assembling messages self.input = [] # Buffer when assembling messages
self.output = [] # Buffer when paused self.output = [] # Buffer when paused
...@@ -144,7 +146,10 @@ class Protocol(asyncio.Protocol): ...@@ -144,7 +146,10 @@ class Protocol(asyncio.Protocol):
self._writeit = writeit self._writeit = writeit
self.heartbeat(write=False)
def connection_lost(self, exc): def connection_lost(self, exc):
self.heartbeat_handle.cancel()
if self.closed: if self.closed:
for f in self.pop_futures(): for f in self.pop_futures():
f.cancel() f.cancel()
...@@ -358,6 +363,12 @@ class Protocol(asyncio.Protocol): ...@@ -358,6 +363,12 @@ class Protocol(asyncio.Protocol):
) )
client_delegated = client_methods[2:] client_delegated = client_methods[2:]
def heartbeat(self, write=True):
if write:
self._write(b'(J\xff\xff\xff\xffK\x00U\x06.replyNt.')
self.heartbeat_handle = self.loop.call_later(
self.heartbeat_interval, self.heartbeat)
class Client: class Client:
"""asyncio low-level ZEO client interface """asyncio low-level ZEO client interface
""" """
......
...@@ -51,13 +51,23 @@ class Loop: ...@@ -51,13 +51,23 @@ class Loop:
def call_soon_threadsafe(self, func, *args): def call_soon_threadsafe(self, func, *args):
func(*args) func(*args)
return Handle()
def call_later(self, delay, func, *args): def call_later(self, delay, func, *args):
self.later.append((delay, func, args)) handle = Handle()
self.later.append((delay, func, args, handle))
return handle
def call_exception_handler(self, context): def call_exception_handler(self, context):
self.exceptions.append(context) self.exceptions.append(context)
class Handle:
cancelled = False
def cancel(self):
self.cancelled = True
class Transport: class Transport:
capacity = 1 << 64 capacity = 1 << 64
......
...@@ -368,7 +368,7 @@ class AsyncTests(setupstack.TestCase, ClientRunner): ...@@ -368,7 +368,7 @@ class AsyncTests(setupstack.TestCase, ClientRunner):
self.assertEqual(sorted(loop.connecting), addrs[1:]) self.assertEqual(sorted(loop.connecting), addrs[1:])
# The failed connection is attempted in the future: # The failed connection is attempted in the future:
delay, func, args = loop.later.pop(0) delay, func, args, _ = loop.later.pop(0)
self.assert_(1 <= delay <= 2) self.assert_(1 <= delay <= 2)
func(*args) func(*args)
self.assertEqual(sorted(loop.connecting), addrs) self.assertEqual(sorted(loop.connecting), addrs)
...@@ -384,10 +384,11 @@ class AsyncTests(setupstack.TestCase, ClientRunner): ...@@ -384,10 +384,11 @@ class AsyncTests(setupstack.TestCase, ClientRunner):
# Now, when the first connection fails, it won't be retried, # Now, when the first connection fails, it won't be retried,
# because we're already connected. # because we're already connected.
self.assertEqual(sorted(loop.later), []) # (first in later is heartbeat)
self.assertEqual(sorted(loop.later[1:]), [])
loop.fail_connecting(addrs[0]) loop.fail_connecting(addrs[0])
self.assertEqual(sorted(loop.connecting), []) self.assertEqual(sorted(loop.connecting), [])
self.assertEqual(sorted(loop.later), []) self.assertEqual(sorted(loop.later[1:]), [])
def test_bad_server_tid(self): def test_bad_server_tid(self):
# If in verification we get a server_tid behing the cache's, make sure # If in verification we get a server_tid behing the cache's, make sure
...@@ -406,9 +407,9 @@ class AsyncTests(setupstack.TestCase, ClientRunner): ...@@ -406,9 +407,9 @@ class AsyncTests(setupstack.TestCase, ClientRunner):
respond(1, None) respond(1, None)
respond(2, 'a'*8) respond(2, 'a'*8)
self.assertFalse(client.connected.done() or transport.data) self.assertFalse(client.connected.done() or transport.data)
delay, func, args = loop.later.pop(0) delay, func, args, _ = loop.later.pop(1) # first in later is heartbeat
self.assert_(8 < delay < 10) self.assert_(8 < delay < 10)
self.assertEqual(len(loop.later), 0) self.assertEqual(len(loop.later), 1) # first in later is heartbeat
func(*args) # connect again func(*args) # connect again
self.assertFalse(protocol is loop.protocol) self.assertFalse(protocol is loop.protocol)
self.assertFalse(transport is loop.transport) self.assertFalse(transport is loop.transport)
...@@ -641,6 +642,35 @@ class AsyncTests(setupstack.TestCase, ClientRunner): ...@@ -641,6 +642,35 @@ class AsyncTests(setupstack.TestCase, ClientRunner):
wrapper.receiveBlobChunk.assert_called_with('oid', 'serial', chunk) wrapper.receiveBlobChunk.assert_called_with('oid', 'serial', chunk)
wrapper.receiveBlobStop.assert_called_with('oid', 'serial') wrapper.receiveBlobStop.assert_called_with('oid', 'serial')
def test_heartbeat(self):
# Protocols run heartbeats on a configurable (sort of)
# heartbeat interval, which defaults to every 60 seconds.
wrapper, cache, loop, client, protocol, transport, send, respond = (
self.start(finish_start=True))
delay, func, args, handle = loop.later.pop()
self.assertEqual(
(delay, func, args, handle),
(60, protocol.heartbeat, (), protocol.heartbeat_handle),
)
self.assertFalse(loop.later or handle.cancelled)
# The heartbeat function sends heartbeat data and reschedules itself.
func()
self.assertEqual(self.parse(transport.pop()), (-1, 0, '.reply', None))
self.assertTrue(protocol.heartbeat_handle != handle)
delay, func, args, handle = loop.later.pop()
self.assertEqual(
(delay, func, args, handle),
(60, protocol.heartbeat, (), protocol.heartbeat_handle),
)
self.assertFalse(loop.later or handle.cancelled)
# The heartbeat is cancelled when the protocol connection is lost:
protocol.connection_lost(None)
self.assertTrue(handle.cancelled)
def unsized(self, data, unpickle=False): def unsized(self, data, unpickle=False):
result = [] result = []
while data: while data:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment