Commit 6134baab authored by Jim Fulton's avatar Jim Fulton

Fixed read-only fallback.

We were sending lastTransaction requests while register requests were
in-flight.  If register failed, then the lastTransaction request was
invalid, causing the connection to be closed. :(

When we update the server, we'll have register return lastTransaction
and probably info, since the client wants that information on connect.
parent b79c7216
...@@ -78,9 +78,16 @@ class Protocol(asyncio.Protocol): ...@@ -78,9 +78,16 @@ class Protocol(asyncio.Protocol):
self._connecting.cancel() self._connecting.cancel()
if self.transport is not None: if self.transport is not None:
self.transport.close() self.transport.close()
for future in self.futures.values(): for future in self.pop_futures():
future.set_exception(ClientDisconnected("Closed")) future.set_exception(ClientDisconnected("Closed"))
self.futures.clear()
def pop_futures(self):
# Remove and return futures from self.futures. The caller
# will finalize them in some way and callbacks may modify
# self.futures.
futures = list(self.futures.values())
self.futures.clear()
return futures
def protocol_factory(self): def protocol_factory(self):
return self return self
...@@ -137,38 +144,15 @@ class Protocol(asyncio.Protocol): ...@@ -137,38 +144,15 @@ class Protocol(asyncio.Protocol):
self._writeit = writeit self._writeit = writeit
def pause_writing(self):
self.paused.append(1)
def resume_writing(self):
paused = self.paused
del paused[:]
output = self.output
writelines = self.transport.writelines
from struct import pack
while output and not paused:
message = output.pop(0)
if isinstance(message, bytes):
writelines((pack(">I", len(message)), message))
else:
data = message
for message in data:
writelines((pack(">I", len(message)), message))
if paused: # paused again. Put iter back.
output.insert(0, data)
break
def get_peername(self):
return self.transport.get_extra_info('peername')
def connection_lost(self, exc): def connection_lost(self, exc):
if self.closed: if self.closed:
for f in self.futures.values(): for f in self.pop_futures():
f.cancel() f.cancel()
else: else:
logger.info("Disconnected, %s, %r", self, exc)
self.client.disconnected(self) self.client.disconnected(self)
for f in self.futures.values(): # We have to be careful processing the futures, because
# exception callbacks might modufy them.
for f in self.pop_futures():
f.set_exception(ClientDisconnected(exc or 'connection lost')) f.set_exception(ClientDisconnected(exc or 'connection lost'))
def finish_connect(self, protocol_version): def finish_connect(self, protocol_version):
...@@ -199,15 +183,27 @@ class Protocol(asyncio.Protocol): ...@@ -199,15 +183,27 @@ class Protocol(asyncio.Protocol):
'register', self.storage_key, 'register', self.storage_key,
self.read_only if self.read_only is not Fallback else False, self.read_only if self.read_only is not Fallback else False,
) )
# Get lastTransaction in flight right away to make successful if self.read_only is not Fallback:
# connection quicker # Get lastTransaction in flight right away to make
lastTransaction = self.promise('lastTransaction') # successful connection quicker, but only if we're not
# doing read-only fallback. If we might need to retry, we
# can't send lastTransaction because if the registration
# fails, it will be seen as an invalid message and the
# connection will close. :( It would be a lot better of
# registere returned the last transaction (and info while
# it's at it).
lastTransaction = self.promise('lastTransaction')
else:
lastTransaction = None # to make python happy
@register @register
def registered(_): def registered(_):
if self.read_only is Fallback: if self.read_only is Fallback:
self.read_only = False self.read_only = False
self.client.registered(self, lastTransaction) r_lastTransaction = self.promise('lastTransaction')
else:
r_lastTransaction = lastTransaction
self.client.registered(self, r_lastTransaction)
@register.catch @register.catch
def register_failed(exc): def register_failed(exc):
...@@ -215,8 +211,10 @@ class Protocol(asyncio.Protocol): ...@@ -215,8 +211,10 @@ class Protocol(asyncio.Protocol):
self.read_only is Fallback): self.read_only is Fallback):
# We tried a write connection, degrade to a read-only one # We tried a write connection, degrade to a read-only one
self.read_only = True self.read_only = True
register = self.promise( logger.info("%s write connection failed. Trying read-only",
'register', self.storage_key, self.read_only) self)
register = self.promise('register', self.storage_key, True)
# get lastTransaction in flight.
lastTransaction = self.promise('lastTransaction') lastTransaction = self.promise('lastTransaction')
@register @register
...@@ -312,6 +310,30 @@ class Protocol(asyncio.Protocol): ...@@ -312,6 +310,30 @@ class Protocol(asyncio.Protocol):
def promise(self, method, *args): def promise(self, method, *args):
return self.call(Promise(), method, args) return self.call(Promise(), method, args)
def pause_writing(self):
self.paused.append(1)
def resume_writing(self):
paused = self.paused
del paused[:]
output = self.output
writelines = self.transport.writelines
from struct import pack
while output and not paused:
message = output.pop(0)
if isinstance(message, bytes):
writelines((pack(">I", len(message)), message))
else:
data = message
for message in data:
writelines((pack(">I", len(message)), message))
if paused: # paused again. Put iter back.
output.insert(0, data)
break
def get_peername(self):
return self.transport.get_extra_info('peername')
# Methods called by the server. # Methods called by the server.
# WARNING WARNING we can't call methods that call back to us # WARNING WARNING we can't call methods that call back to us
# syncronously, as that would lead to DEADLOCK! # syncronously, as that would lead to DEADLOCK!
...@@ -759,6 +781,10 @@ class ClientThread(ClientRunner): ...@@ -759,6 +781,10 @@ class ClientThread(ClientRunner):
daemon=True, daemon=True,
) )
self.started = threading.Event() self.started = threading.Event()
self.thread.start()
self.started.wait()
if self.exception:
raise self.exception
exception = None exception = None
def run(self): def run(self):
...@@ -782,10 +808,6 @@ class ClientThread(ClientRunner): ...@@ -782,10 +808,6 @@ class ClientThread(ClientRunner):
logger.debug('Stopping client thread') logger.debug('Stopping client thread')
def start(self, wait=True): def start(self, wait=True):
self.thread.start()
self.started.wait()
if self.exception:
raise self.exception
if wait: if wait:
self.wait_for_result(self.connected, self.timeout) self.wait_for_result(self.connected, self.timeout)
......
...@@ -441,21 +441,19 @@ class AsyncTests(setupstack.TestCase, ClientRunner): ...@@ -441,21 +441,19 @@ class AsyncTests(setupstack.TestCase, ClientRunner):
self.assertEqual(self.unsized(transport.pop(2)), b'Z3101') self.assertEqual(self.unsized(transport.pop(2)), b'Z3101')
# We see that the client tried a writable connection: # We see that the client tried a writable connection:
self.assertEqual(self.parse(transport.pop()), self.assertEqual(self.parse(transport.pop()),
[(1, False, 'register', ('TEST', False)), (1, False, 'register', ('TEST', False)))
(2, False, 'lastTransaction', ()),
])
# We respond with a read-only exception: # We respond with a read-only exception:
respond(1, (ReadOnlyError, ReadOnlyError())) respond(1, (ReadOnlyError, ReadOnlyError()))
self.assertTrue(self.is_read_only()) self.assertTrue(self.is_read_only())
# The client tries for a read-only connection: # The client tries for a read-only connection:
self.assertEqual(self.parse(transport.pop()), self.assertEqual(self.parse(transport.pop()),
[(3, False, 'register', ('TEST', True)), [(2, False, 'register', ('TEST', True)),
(4, False, 'lastTransaction', ()), (3, False, 'lastTransaction', ()),
]) ])
# We respond with successfully: # We respond with successfully:
respond(3, None) respond(2, None)
respond(4, 'b'*8) respond(3, 'b'*8)
self.assertTrue(self.is_read_only()) self.assertTrue(self.is_read_only())
# At this point, the client is ready and using the protocol, # At this point, the client is ready and using the protocol,
...@@ -467,8 +465,8 @@ class AsyncTests(setupstack.TestCase, ClientRunner): ...@@ -467,8 +465,8 @@ class AsyncTests(setupstack.TestCase, ClientRunner):
# The client asks for info, and we respond: # The client asks for info, and we respond:
self.assertEqual(self.parse(transport.pop()), self.assertEqual(self.parse(transport.pop()),
(5, False, 'get_info', ())) (4, False, 'get_info', ()))
respond(5, dict(length=42)) respond(4, dict(length=42))
self.assert_(connected.done()) self.assert_(connected.done())
...@@ -477,15 +475,18 @@ class AsyncTests(setupstack.TestCase, ClientRunner): ...@@ -477,15 +475,18 @@ class AsyncTests(setupstack.TestCase, ClientRunner):
loop.protocol.data_received(sized(b'Z3101')) loop.protocol.data_received(sized(b'Z3101'))
self.assertEqual(self.unsized(loop.transport.pop(2)), b'Z3101') self.assertEqual(self.unsized(loop.transport.pop(2)), b'Z3101')
self.assertEqual(self.parse(loop.transport.pop()), self.assertEqual(self.parse(loop.transport.pop()),
[(1, False, 'register', ('TEST', False)), (1, False, 'register', ('TEST', False)))
(2, False, 'lastTransaction', ()),
])
self.assertTrue(self.is_read_only()) self.assertTrue(self.is_read_only())
# We respond and the writable connection succeeds: # We respond and the writable connection succeeds:
respond(1, None) respond(1, None)
self.assertFalse(self.is_read_only()) self.assertFalse(self.is_read_only())
# at this point, a lastTransaction request is emitted:
self.assertEqual(self.parse(loop.transport.pop()),
(2, False, 'lastTransaction', ()))
# Now, the original protocol is closed, and the client is # Now, the original protocol is closed, and the client is
# no-longer ready: # no-longer ready:
self.assertFalse(client.ready) self.assertFalse(client.ready)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment