Commit c087f634 authored by Jim Fulton's avatar Jim Fulton Committed by GitHub

Merge pull request #63 from zopefoundation/server-sync

Server sync
parents 7eedfd75 fd8e334f
Changelog Changelog
========= =========
- Added a ``ClientStorage`` ``server-sync`` configuration option and
``server_sync`` constructor argument to force a server round trip at
the beginning of transactions to wait for any outstanding
invalidations at the start of the transaction to be delivered.
- The ZEO server register method now returns the storage last - The ZEO server register method now returns the storage last
transaction, allowing the client to avoid an extra round trip during transaction, allowing the client to avoid an extra round trip during
cache verification. cache verification.
......
...@@ -454,6 +454,23 @@ read_only_fallback ...@@ -454,6 +454,23 @@ read_only_fallback
If ``read_only_fallback`` is set, then ``read_only`` is ignored. If ``read_only_fallback`` is set, then ``read_only`` is ignored.
server_sync
Flag, false by default, indicating whether the ``sync`` method
should make a server request. The ``sync`` method is called at the
start of explcitly begin transactions. Making a server requests assures
that any invalidations outstanding at the beginning of a
transaction are processed.
Setting this to True is important when application activity is
spread over multiple ZEO clients. The classic example of this is
when a web browser makes a request to an application server (ZEO
client) that makes a change and then makes a request to another
application server that depends on the change.
Setting this to True makes transactions a little slower because of
the added server round trip. For transactions that don't otherwise
need to access the storage server, the impact can be significant.
wait_timeout wait_timeout
How long to wait for an initial connection, defaulting to 30 How long to wait for an initial connection, defaulting to 30
seconds. If an initial connection can't be made within this time seconds. If an initial connection can't be made within this time
...@@ -565,6 +582,9 @@ read-only-fallback ...@@ -565,6 +582,9 @@ read-only-fallback
If ``read_only_fallback`` is set, then ``read_only`` is ignored. If ``read_only_fallback`` is set, then ``read_only`` is ignored.
server-sync
Sets thr ``server_sync`` option described above.
wait_timeout wait_timeout
How long to wait for an initial connection, defaulting to 30 How long to wait for an initial connection, defaulting to 30
seconds. If an initial connection can't be made within this time seconds. If an initial connection can't be made within this time
......
...@@ -95,8 +95,10 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -95,8 +95,10 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
min_disconnect_poll=1, max_disconnect_poll=None, min_disconnect_poll=1, max_disconnect_poll=None,
wait=True, wait=True,
drop_cache_rather_verify=True, drop_cache_rather_verify=True,
username=None, password=None, realm=None,
credentials=None, credentials=None,
server_sync=False,
# The ZODB-define ZConfig support may ball these:
username=None, password=None, realm=None,
# For tests: # For tests:
_client_factory=ZEO.asyncio.client.ClientThread, _client_factory=ZEO.asyncio.client.ClientThread,
): ):
...@@ -181,6 +183,8 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -181,6 +183,8 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
""" """
assert not username or password or realm
if isinstance(addr, int): if isinstance(addr, int):
addr = ('127.0.0.1', addr) addr = ('127.0.0.1', addr)
...@@ -254,6 +258,8 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -254,6 +258,8 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
blob_cache_size * blob_cache_size_check // 100) blob_cache_size * blob_cache_size_check // 100)
self._check_blob_size() self._check_blob_size()
self.server_sync = server_sync
self._server = _client_factory( self._server = _client_factory(
addr, self, cache, storage, addr, self, cache, storage,
ZEO.asyncio.client.Fallback if read_only_fallback else read_only, ZEO.asyncio.client.Fallback if read_only_fallback else read_only,
...@@ -377,6 +383,14 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -377,6 +383,14 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
'interfaces', ()): 'interfaces', ()):
zope.interface.alsoProvides(self, iface) zope.interface.alsoProvides(self, iface)
if self.protocol_version >= b'Z5':
self.ping = lambda : self._call('ping')
else:
self.ping = lambda : self._call('lastTransaction')
if self.server_sync:
self.sync = self.ping
def set_server_addr(self, addr): def set_server_addr(self, addr):
# Normalize server address and convert to string # Normalize server address and convert to string
if isinstance(addr, str): if isinstance(addr, str):
......
...@@ -73,7 +73,7 @@ registered_methods = set(( 'get_info', 'lastTransaction', ...@@ -73,7 +73,7 @@ registered_methods = set(( 'get_info', 'lastTransaction',
'history', 'record_iternext', 'sendBlob', 'getTid', 'loadSerial', 'history', 'record_iternext', 'sendBlob', 'getTid', 'loadSerial',
'new_oid', 'undoa', 'undoLog', 'undoInfo', 'iterator_start', 'new_oid', 'undoa', 'undoLog', 'undoInfo', 'iterator_start',
'iterator_next', 'iterator_record_start', 'iterator_record_next', 'iterator_next', 'iterator_record_start', 'iterator_record_next',
'iterator_gc', 'server_status', 'set_client_label')) 'iterator_gc', 'server_status', 'set_client_label', 'ping'))
class ZEOStorage: class ZEOStorage:
"""Proxy to underlying storage for a single remote client.""" """Proxy to underlying storage for a single remote client."""
...@@ -616,6 +616,9 @@ class ZEOStorage: ...@@ -616,6 +616,9 @@ class ZEOStorage:
def ruok(self): def ruok(self):
return self.server.ruok() return self.server.ruok()
def ping(self):
pass
class StorageServerDB: class StorageServerDB:
"""Adapter from StorageServerDB to ZODB.interfaces.IStorageWrapper """Adapter from StorageServerDB to ZODB.interfaces.IStorageWrapper
...@@ -951,7 +954,6 @@ class StorageServer: ...@@ -951,7 +954,6 @@ class StorageServer:
return dict((storage_id, self.server_status(storage_id)) return dict((storage_id, self.server_status(storage_id))
for storage_id in self.storages) for storage_id in self.storages)
class StubTimeoutThread: class StubTimeoutThread:
def begin(self, client): def begin(self, client):
......
...@@ -120,6 +120,15 @@ ...@@ -120,6 +120,15 @@
</description> </description>
</key> </key>
<key name="server-sync" datatype="boolean" default="off">
<description>
A flag indicating whether calls to sync() should make a server
request, thus causing the storage to wait for any outstanding
invalidations. The sync method is called when transactions are
explicitly begun.
</description>
</key>
<key name="wait-timeout" datatype="integer" default="30"> <key name="wait-timeout" datatype="integer" default="30">
<description> <description>
How long to wait for an initial connection, defaulting to 30 How long to wait for an initial connection, defaulting to 30
......
...@@ -769,7 +769,7 @@ class ReconnectionTests(CommonSetupTearDown): ...@@ -769,7 +769,7 @@ class ReconnectionTests(CommonSetupTearDown):
# Accesses should fail now # Accesses should fail now
with short_timeout(self): with short_timeout(self):
self.assertRaises(ClientDisconnected, self._storage.history, ZERO) self.assertRaises(ClientDisconnected, self._storage.ping)
# Restart the server, this time read-write # Restart the server, this time read-write
self.startServer(create=0, keep=0) self.startServer(create=0, keep=0)
......
...@@ -62,6 +62,7 @@ class ZEOConfigTestBase(setupstack.TestCase): ...@@ -62,6 +62,7 @@ class ZEOConfigTestBase(setupstack.TestCase):
blob_cache_size_check=10, blob_cache_size_check=10,
read_only=False, read_only=False,
read_only_fallback=False, read_only_fallback=False,
server_sync=False,
wait_timeout=30, wait_timeout=30,
client_label=None, client_label=None,
storage='1', storage='1',
...@@ -106,6 +107,7 @@ class ZEOConfigTest(ZEOConfigTestBase): ...@@ -106,6 +107,7 @@ class ZEOConfigTest(ZEOConfigTestBase):
blob_cache_size=424242, blob_cache_size=424242,
read_only=True, read_only=True,
read_only_fallback=True, read_only_fallback=True,
server_sync=True,
wait_timeout=33, wait_timeout=33,
client_label='test_client', client_label='test_client',
name='Test' name='Test'
......
import unittest
from zope.testing import setupstack
from .. import server, client
from . import forker
if forker.ZEO4_SERVER:
server_ping_method = 'lastTransaction'
server_zss = 'connections'
else:
server_ping_method = 'ping'
server_zss = 'zeo_storages_by_storage_id'
class SyncTests(setupstack.TestCase):
def instrument(self):
self.__ping_calls = 0
server = getattr(forker, self.__name + '_server')
[zs] = getattr(server.server, server_zss)['1']
orig_ping = getattr(zs, server_ping_method)
def ping():
self.__ping_calls += 1
return orig_ping()
setattr(zs, server_ping_method, ping)
def test_server_sync(self):
self.__name = 's%s' % id(self)
addr, stop = server(name=self.__name)
# By default the client sync method is a noop:
c = client(addr)
self.instrument()
c.sync()
self.assertEqual(self.__ping_calls, 0)
c.close()
# But if we pass server_sync:
c = client(addr, server_sync=True)
self.instrument()
c.sync()
self.assertEqual(self.__ping_calls, 1)
c.close()
stop()
...@@ -85,5 +85,6 @@ class ClientStorageConfig: ...@@ -85,5 +85,6 @@ class ClientStorageConfig:
name=config.name, name=config.name,
read_only=config.read_only, read_only=config.read_only,
read_only_fallback=config.read_only_fallback, read_only_fallback=config.read_only_fallback,
server_sync = config.server_sync,
wait_timeout=config.wait_timeout, wait_timeout=config.wait_timeout,
**options) **options)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment