Commit dcbeab41 authored by Jim Fulton

Merge remote-tracking branch 'origin/master' into asyncio

Conflicts:
	.travis.yml
	setup.py
	src/ZEO/ClientStorage.py
	src/ZEO/cache.py
	src/ZEO/tests/testZEO.py
	src/ZEO/zrpc/client.py
	src/ZEO/zrpc/server.py

Also, removed load from the asyncio client implementation, since it
isn't used anymore.
parents da4d4ce1 a7a5fab7
 language: python
 sudo: false
-python:
-- 3.4
-- 3.5
+matrix:
+  include:
+    - os: linux
+      python: 3.4
+    - os: linux
+      python: 3.5
+    - os: osx
+      language: generic
+      env: TERRYFY_PYTHON='macpython 3.4'
+    - os: osx
+      language: generic
+      env: TERRYFY_PYTHON='homebrew 3'
+before_install:
+- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then git clone https://github.com/MacPython/terryfy; fi
+- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then source terryfy/travis_tools.sh; fi
+- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then get_python_environment $TERRYFY_PYTHON venv; fi
+- if [[ "$TERRYFY_PYTHON" == "homebrew 3" ]]; then alias pip=`which pip3` ; fi
 install:
 - pip install -U setuptools
 - python bootstrap.py
......
 Changelog
 =========

-4.2.0 (unreleased)
+4.2.0 (2016-06-15)
 ------------------

+- Changed loadBefore to operate more like load behaved, especially
+  with regard to the load lock.  This allows ZEO to work with the
+  upcoming ZODB 5, which uses loadBefore rather than load.
+
+  Reimplemented load using loadBefore, thus testing loadBefore
+  extensively via existing tests.
+
+- Other changes to work with ZODB 5 (as well as ZODB 4).
+
+- Fixed: the ZEO cache loadBefore method failed to utilize current data.
+
 - Drop support for Python 2.6 and 3.2.

 4.2.0b1 (2015-06-05)
......
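The changelog entry above boils down to deriving load() from loadBefore() with a maximal transaction id as the upper bound; the actual ClientStorage change appears in the excerpt further down. A minimal illustrative sketch of that relationship, assuming only a storage object exposing loadBefore(oid, tid); the helper name is hypothetical, not ZEO API:

    from ZODB.POSException import POSKeyError

    # Largest signed 64-bit value, used as an "infinite" tid upper bound.
    MAX_TID = b'\x7f\xff\xff\xff\xff\xff\xff\xff'

    def load_via_loadBefore(storage, oid):
        # loadBefore returns (data, start_tid, end_tid) or None.
        result = storage.loadBefore(oid, MAX_TID)
        if result is None:
            # No revision visible before the maximal tid: unknown object.
            raise POSKeyError(oid)
        data, tid, _end_tid = result
        return data, tid   # the same shape load() used to return
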
@@ -11,9 +11,8 @@
 # FOR A PARTICULAR PURPOSE.
 #
 ##############################################################################
-"""Setup
-"""
 version = '5.0.0a0'

 from setuptools import setup, find_packages
 import os
 import sys
......
@@ -50,6 +50,9 @@ import ZEO.cache

 logger = logging.getLogger(__name__)

+# max signed 64-bit value ~ infinity :)  Signed cuz LBTree and TimeStamp
+m64 = b'\x7f\xff\xff\xff\xff\xff\xff\xff'
+
 try:
     from ZODB.ConflictResolution import ResolvedSerial
 except ImportError:

@@ -499,7 +502,10 @@ class ClientStorage(object):
         return self._call('loadSerial', oid, serial)

     def load(self, oid, version=''):
-        return self._server.load(oid)
+        result = self.loadBefore(oid, m64)
+        if result is None:
+            raise POSException.POSKeyError(oid)
+        return result[:2]

     def loadBefore(self, oid, tid):
         return self._server.load_before(oid, tid)

@@ -778,7 +784,8 @@
             self._commit_lock.release()

     def lastTransaction(self):
-        return self._cache.getLastTid()
+        with self._lock:
+            return self._cache.getLastTid()

     def tpc_abort(self, txn, timeout=None):
         """Storage API: abort a transaction.

@@ -1081,8 +1088,8 @@ def _check_blob_cache_size(blob_dir, target):
     logger = logging.getLogger(__name__+'.check_blob_cache')

-    layout = open(os.path.join(blob_dir, ZODB.blob.LAYOUT_MARKER)
-                  ).read().strip()
+    with open(os.path.join(blob_dir, ZODB.blob.LAYOUT_MARKER)) as layout_file:
+        layout = layout_file.read().strip()
     if not layout == 'zeocache':
         logger.critical("Invalid blob directory layout %s", layout)
         raise ValueError("Invalid blob directory layout", layout)
......
@@ -478,7 +478,7 @@ class ClientCache(object):
     # @return (data record, serial number, tid), or None if the object is not
     #         in the cache
     # @defreturn 3-tuple: (string, string, string)
-    def load(self, oid):
+    def load(self, oid, before_tid=None):
         ofs = self.current.get(oid)
         if ofs is None:
             self._trace(0x20, oid)

@@ -493,6 +493,9 @@
         assert end_tid == z64, (ofs, self.f.tell(), oid, tid, end_tid)
         assert lver == 0, "Versions aren't supported"

+        if before_tid and tid >= before_tid:
+            return None
+
         data = read(ldata)
         assert len(data) == ldata, (ofs, self.f.tell(), oid, len(data), ldata)

@@ -532,13 +535,22 @@
     def loadBefore(self, oid, before_tid):
         noncurrent_for_oid = self.noncurrent.get(u64(oid))
         if noncurrent_for_oid is None:
-            self._trace(0x24, oid, "", before_tid)
-            return None
+            result = self.load(oid, before_tid)
+            if result:
+                return result[0], result[1], None
+            else:
+                self._trace(0x24, oid, "", before_tid)
+                return result

         items = noncurrent_for_oid.items(None, u64(before_tid)-1)
         if not items:
-            self._trace(0x24, oid, "", before_tid)
-            return None
+            result = self.load(oid, before_tid)
+            if result:
+                return result[0], result[1], None
+            else:
+                self._trace(0x24, oid, "", before_tid)
+                return result

         tid, ofs = items[-1]
         self.f.seek(ofs)

@@ -559,8 +571,12 @@
         assert read(8) == oid, (ofs, self.f.tell(), oid)

         if end_tid < before_tid:
-            self._trace(0x24, oid, "", before_tid)
-            return None
+            result = self.load(oid, before_tid)
+            if result:
+                return result[0], result[1], None
+            else:
+                self._trace(0x24, oid, "", before_tid)
+                return result

         self._n_accesses += 1
         self._trace(0x26, oid, "", saved_tid)
......
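The cache change above makes ClientCache.loadBefore fall back to the cached current record when no matching non-current record exists, instead of reporting a miss. A rough usage sketch of the intended behavior, assuming an anonymous ClientCache as in the project's own tests (p64 packs an integer into an 8-byte tid):

    from ZODB.utils import p64
    import ZEO.cache

    cache = ZEO.cache.ClientCache()   # anonymous, temporary cache file
    oid = p64(1)

    cache.store(oid, p64(1), None, b'first')       # current revision, tid 1
    assert cache.loadBefore(oid, p64(1)) is None   # nothing committed before tid 1
    # Previously this was a miss; with the fix the current record is returned:
    assert cache.loadBefore(oid, p64(2)) == (b'first', p64(1), None)
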
@@ -598,6 +598,10 @@ class InvqTests(CommonSetupTearDown):
         revid2 = self._dostore(oid2)
         revid2 = self._dostore(oid2, revid2)

+        forker.wait_until(
+            lambda :
+            perstorage.lastTransaction() == self._storage.lastTransaction())
+
         perstorage.load(oid, '')
         perstorage.close()

@@ -606,12 +610,6 @@
         revid = self._dostore(oid, revid)

         perstorage = self.openClientStorage(cache="test")
-        forker.wait_until(
-            func=(lambda : perstorage.verify_result == "quick verification"),
-            timeout=60,
-            label="perstorage.verify_result to be quick verification")

         self.assertEqual(perstorage.verify_result, "quick verification")

         self.assertEqual(perstorage.load(oid, ''),
......
@@ -176,8 +176,9 @@ Start a server:

 Open a client storage to it and commit some transactions:

-    >>> import ZEO, transaction
-    >>> db = ZEO.DB(addr)
+    >>> import ZEO, ZODB, transaction
+    >>> client = ZEO.client(addr)
+    >>> db = ZODB.DB(client)
     >>> conn = db.open()
     >>> for i in range(10):
     ...     conn.root().i = i

@@ -185,19 +186,19 @@ Open a client storage to it and commit some transactions:

 Create an iterator:

-    >>> it = conn._storage.iterator()
+    >>> it = client.iterator()
     >>> tid1 = it.next().tid

 Restart the storage:

     >>> stop_server(adminaddr)
-    >>> wait_disconnected(conn._storage)
+    >>> wait_disconnected(client)
     >>> _ = start_server('<filestorage>\npath fs\n</filestorage>', addr=addr)
-    >>> wait_connected(conn._storage)
+    >>> wait_connected(client)

 Now, we'll create a second iterator:

-    >>> it2 = conn._storage.iterator()
+    >>> it2 = client.iterator()

 If we try to advance the first iterator, we should get an error:
......
@@ -162,10 +162,10 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
     else:
         pid = subprocess.Popen(args, env=d, close_fds=True).pid

-    # We need to wait until the server starts, but not forever.
-    # 30 seconds is a somewhat arbitrary upper bound.  A BDBStorage
-    # takes a long time to open -- more than 10 seconds on occasion.
-    for i in range(300):
+    # We need to wait until the server starts, but not forever. 150
+    # seconds is a somewhat arbitrary upper bound, but probably helps
+    # in an address already in use situation.
+    for i in range(1500):
         time.sleep(0.1)
         try:
             if isinstance(adminaddr, str) and not os.path.exists(adminaddr):
......
@@ -1058,9 +1058,9 @@ def client_asyncore_thread_has_name():
     """
     >>> addr, _ = start_server()
     >>> db = ZEO.DB(addr)

-    >>> len([t for t in threading.enumerate()
-    ...      if ' zeo client networking thread' in t.getName()])
-    1
+    >>> any(t for t in threading.enumerate()
+    ...     if ' zeo client networking thread' in t.getName())
+    True

     >>> db.close()
     """

@@ -1299,9 +1299,9 @@ But, if we abort, we'll get up to date data and we'll see the changes.
     >>> sorted(conn2.root.x.items())
     [('x', 1), ('y', 1)]

+    >>> conn2.close()
     >>> cs.close()
     >>> conn1.close()
     """

@@ -1392,7 +1392,8 @@ def gracefully_handle_abort_while_storing_many_blobs():
     >>> logging.getLogger().addHandler(handler)
     >>> addr, _ = start_server(blob_dir='blobs')
-    >>> c = ZEO.connection(addr, blob_dir='cblobs')
+    >>> client = ZEO.client(addr, blob_dir='cblobs')
+    >>> c = ZODB.connection(client)
     >>> c.root.x = ZODB.blob.Blob(b'z'*(1<<20))
     >>> c.root.y = ZODB.blob.Blob(b'z'*(1<<2))
     >>> t = c.transaction_manager.get()

@@ -1409,7 +1410,7 @@ Now we'll try to use the connection, mainly to wait for everything to
 get processed. Before we fixed this by making tpc_finish a synchronous
 call to the server, we'd get some sort of error here.

-    >>> _ = c._storage._call('loadEx', b'\0'*8)
+    >>> _ = client._call('loadEx', b'\0'*8)

     >>> c.close()
......
@@ -314,7 +314,9 @@ class CacheTests(ZODB.tests.util.TestCase):
         # We use large-2 for the same reason we used small-1 above.
         expected_len = large-2
         self.assertEquals(len(cache), expected_len)
-        expected_oids = set(list(range(11, 50))+list(range(106, 110))+list(range(200, 305)))
+        expected_oids = set(list(range(11, 50)) +
+                            list(range(106, 110)) +
+                            list(range(200, 305)))
         self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()),
                           expected_oids)

@@ -336,6 +338,21 @@
         self.cache.setLastTid(p64(3))
         self.cache.setLastTid(p64(4))

+    def test_loadBefore_doesnt_miss_current(self):
+        # Make sure that loadBefore gets current data if there
+        # isn't non-current data
+        cache = self.cache
+        oid = n1
+        cache.store(oid, n1, None, b'first')
+        self.assertEqual(cache.loadBefore(oid, n1), None)
+        self.assertEqual(cache.loadBefore(oid, n2), (b'first', n1, None))
+        self.cache.invalidate(oid, n2)
+        cache.store(oid, n2, None, b'second')
+        self.assertEqual(cache.loadBefore(oid, n1), None)
+        self.assertEqual(cache.loadBefore(oid, n2), (b'first', n1, n2))
+        self.assertEqual(cache.loadBefore(oid, n3), (b'second', n2, None))
+
 def kill_does_not_cause_cache_corruption():
     r"""
......
@@ -242,19 +242,24 @@ class Connection(smac.SizedMessageAsyncConnection, object):
     # Undone oid info returned by vote.
     #
     # Z3101 -- checkCurrentSerialInTransaction
+    #
+    # Z4 -- checkCurrentSerialInTransaction
+    #       No-longer call load.

     # Protocol variables:

     # Our preferred protocol.
-    current_protocol = b"Z3101"
+    current_protocol = b"Z4"

     # If we're a client, an exhaustive list of the server protocols we
     # can accept.
-    servers_we_can_talk_to = [b"Z308", b"Z309", b"Z310", current_protocol]
+    servers_we_can_talk_to = [b"Z308", b"Z309", b"Z310", b"Z3101",
+                              current_protocol]

     # If we're a server, an exhaustive list of the client protocols we
     # can accept.
     clients_we_can_talk_to = [
-        b"Z200", b"Z201", b"Z303", b"Z308", b"Z309", b"Z310", current_protocol]
+        b"Z200", b"Z201", b"Z303", b"Z308", b"Z309", b"Z310", b"Z3101",
+        current_protocol]

     # This is pretty excruciating. Details:
     #
......
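The protocol bump above introduces b"Z4" as the preferred protocol while keeping b"Z3101" in both acceptance lists, so a Z4 peer can still talk to a Z3101 peer. A simplified, hypothetical sketch of the client-side check; this is not the actual zrpc handshake code:

    def negotiate(server_protocol, servers_we_can_talk_to):
        # Client-side sketch: adopt the server's protocol if we recognize it,
        # e.g. a Z4 client falls back to Z3101 against an older server.
        if server_protocol in servers_we_can_talk_to:
            return server_protocol
        raise ValueError("can't talk to a server speaking %r" % (server_protocol,))

    assert negotiate(b"Z3101",
                     [b"Z308", b"Z309", b"Z310", b"Z3101", b"Z4"]) == b"Z3101"
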
@@ -81,6 +81,7 @@ class Dispatcher(asyncore.dispatcher):
                 time.sleep(5)
             else:
                 break
         self.listen(5)

     def writable(self):
......