Commit 37c66789 authored by Jim Fulton's avatar Jim Fulton

When calling loadBefore, collapse outstanding calls.

IOW, if there's an outstanding call is made for a given oid and tid,
and another call is made, the second call will use the result of the
outstanding call, rather than making another call to the server.
parent d2095794
...@@ -240,6 +240,17 @@ class Protocol(base.Protocol): ...@@ -240,6 +240,17 @@ class Protocol(base.Protocol):
def promise(self, method, *args): def promise(self, method, *args):
return self.call(Promise(), method, args) return self.call(Promise(), method, args)
def load_before(self, oid, tid):
# Special-case loadBefore, so we collapse outstanding requests
message_id = (oid, tid)
future = self.futures.get(message_id)
if future is None:
future = asyncio.Future(loop=self.loop)
self.futures[message_id] = future
self._write(
self.encode(message_id, False, 'loadBefore', (oid, tid)))
return future.add_done_callback
# Methods called by the server. # Methods called by the server.
# WARNING WARNING we can't call methods that call back to us # WARNING WARNING we can't call methods that call back to us
# syncronously, as that would lead to DEADLOCK! # syncronously, as that would lead to DEADLOCK!
...@@ -519,14 +530,17 @@ class Client(object): ...@@ -519,14 +530,17 @@ class Client(object):
if data is not None: if data is not None:
future.set_result(data) future.set_result(data)
elif self.ready: elif self.ready:
@self.protocol.promise('loadBefore', oid, tid)
def load_before(data): @self.protocol.load_before(oid, tid)
def load_before(load_future):
try:
data = load_future.result()
future.set_result(data) future.set_result(data)
if data: if data:
data, start, end = data data, start, end = data
self.cache.store(oid, start, end, data) self.cache.store(oid, start, end, data)
except Exception as exc:
load_before.catch(future.set_exception) future.set_exception(exc)
else: else:
self._when_ready(self.load_before_threadsafe, future, oid, tid) self._when_ready(self.load_before_threadsafe, future, oid, tid)
......
...@@ -71,6 +71,8 @@ class Base(object): ...@@ -71,6 +71,8 @@ class Base(object):
class ClientTests(Base, setupstack.TestCase, ClientRunner): class ClientTests(Base, setupstack.TestCase, ClientRunner):
maxDiff = None
def start(self, def start(self,
addrs=(('127.0.0.1', 8200), ), loop_addrs=None, addrs=(('127.0.0.1', 8200), ), loop_addrs=None,
read_only=False, read_only=False,
...@@ -191,9 +193,11 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -191,9 +193,11 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
# Loading objects gets special handling to leverage the cache. # Loading objects gets special handling to leverage the cache.
loaded = self.load_before(b'1'*8, m64) loaded = self.load_before(b'1'*8, m64)
# The data wasn't in the cache, so we make a server call: # The data wasn't in the cache, so we made a server call:
self.assertEqual(self.pop(), (5, False, 'loadBefore', (b'1'*8, m64))) self.assertEqual(self.pop(),
self.respond(5, (b'data', b'a'*8, None)) ((b'1'*8, m64), False, 'loadBefore', (b'1'*8, m64)))
# Note load_before uses the oid as the message id.
self.respond((b'1'*8, m64), (b'data', b'a'*8, None))
self.assertEqual(loaded.result(), (b'data', b'a'*8, None)) self.assertEqual(loaded.result(), (b'data', b'a'*8, None))
# If we make another request, it will be satisfied from the cache: # If we make another request, it will be satisfied from the cache:
...@@ -206,9 +210,16 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -206,9 +210,16 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
# Now, if we try to load current again, we'll make a server request. # Now, if we try to load current again, we'll make a server request.
loaded = self.load_before(b'1'*8, m64) loaded = self.load_before(b'1'*8, m64)
self.assertEqual(self.pop(), (6, False, 'loadBefore', (b'1'*8, m64)))
self.respond(6, (b'data2', b'b'*8, None)) # Note that if we make another request for the same object,
# the requests will be collapsed:
loaded2 = self.load_before(b'1'*8, m64)
self.assertEqual(self.pop(),
((b'1'*8, m64), False, 'loadBefore', (b'1'*8, m64)))
self.respond((b'1'*8, m64), (b'data2', b'b'*8, None))
self.assertEqual(loaded.result(), (b'data2', b'b'*8, None)) self.assertEqual(loaded.result(), (b'data2', b'b'*8, None))
self.assertEqual(loaded2.result(), (b'data2', b'b'*8, None))
# Loading non-current data may also be satisfied from cache # Loading non-current data may also be satisfied from cache
loaded = self.load_before(b'1'*8, b'b'*8) loaded = self.load_before(b'1'*8, b'b'*8)
...@@ -219,9 +230,10 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -219,9 +230,10 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
self.assertFalse(transport.data) self.assertFalse(transport.data)
loaded = self.load_before(b'1'*8, b'_'*8) loaded = self.load_before(b'1'*8, b'_'*8)
self.assertEqual(self.pop(), self.assertEqual(
(7, False, 'loadBefore', (b'1'*8, b'_'*8))) self.pop(),
self.respond(7, (b'data0', b'^'*8, b'_'*8)) ((b'1'*8, b'_'*8), False, 'loadBefore', (b'1'*8, b'_'*8)))
self.respond((b'1'*8, b'_'*8), (b'data0', b'^'*8, b'_'*8))
self.assertEqual(loaded.result(), (b'data0', b'^'*8, b'_'*8)) self.assertEqual(loaded.result(), (b'data0', b'^'*8, b'_'*8))
# When committing transactions, we need to update the cache # When committing transactions, we need to update the cache
...@@ -244,8 +256,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -244,8 +256,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
cache.load(b'4'*8)) cache.load(b'4'*8))
self.assertEqual(cache.load(b'1'*8), (b'data2', b'b'*8)) self.assertEqual(cache.load(b'1'*8), (b'data2', b'b'*8))
self.assertEqual(self.pop(), self.assertEqual(self.pop(),
(8, False, 'tpc_finish', (b'd'*8,))) (5, False, 'tpc_finish', (b'd'*8,)))
self.respond(8, b'e'*8) self.respond(5, b'e'*8)
self.assertEqual(committed.result(), b'e'*8) self.assertEqual(committed.result(), b'e'*8)
self.assertEqual(cache.load(b'1'*8), None) self.assertEqual(cache.load(b'1'*8), None)
self.assertEqual(cache.load(b'2'*8), ('committed 2', b'e'*8)) self.assertEqual(cache.load(b'2'*8), ('committed 2', b'e'*8))
...@@ -257,8 +269,9 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -257,8 +269,9 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
loaded = self.load_before(b'1'*8, m64) loaded = self.load_before(b'1'*8, m64)
f1 = self.call('foo', 1, 2) f1 = self.call('foo', 1, 2)
self.assertFalse(loaded.done() or f1.done()) self.assertFalse(loaded.done() or f1.done())
self.assertEqual(self.pop(), [(9, False, 'loadBefore', (b'1'*8, m64)), self.assertEqual(self.pop(),
(10, False, 'foo', (1, 2))], [((b'1'*8, m64), False, 'loadBefore', (b'1'*8, m64)),
(6, False, 'foo', (1, 2))],
) )
exc = TypeError(43) exc = TypeError(43)
...@@ -720,7 +733,9 @@ class MemoryCache(object): ...@@ -720,7 +733,9 @@ class MemoryCache(object):
def store(self, oid, start_tid, end_tid, data): def store(self, oid, start_tid, end_tid, data):
assert start_tid is not None assert start_tid is not None
revisions = self.data[oid] revisions = self.data[oid]
revisions.append((start_tid, end_tid, data)) data = (start_tid, end_tid, data)
if not revisions or data != revisions[-1]:
revisions.append(data)
revisions.sort() revisions.sort()
def loadBefore(self, oid, tid): def loadBefore(self, oid, tid):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment