Commit 3a8f6f03 authored by Julien Muchembled's avatar Julien Muchembled

Drop support for ZODB3

parent 414573b9
graft tools graft tools
include neo.conf CHANGELOG.rst TODO ZODB3.patch include neo.conf CHANGELOG.rst TODO
...@@ -52,7 +52,7 @@ Requirements ...@@ -52,7 +52,7 @@ Requirements
- MySQLdb: https://github.com/PyMySQL/mysqlclient-python - MySQLdb: https://github.com/PyMySQL/mysqlclient-python
- For client nodes: ZODB 3.10.x or later - For client nodes: ZODB 4.4.5 or later
Installation Installation
============ ============
......
Patch to ZODB 3.10.5 for ZODB unit tests.
See also monkey-patch to Connection.tpc_finish in neo/client/__init__.py
diff --git a/src/ZODB/interfaces.py b/src/ZODB/interfaces.py
index 44ded35..db5c525 100644
--- a/src/ZODB/interfaces.py
+++ b/src/ZODB/interfaces.py
@@ -776,6 +776,10 @@ def tpc_finish(transaction, func = lambda tid: None):
called while the storage transaction lock is held. It takes
the new transaction id generated by the transaction.
+ The return value can be either None or a serial giving new
+ serial for objects whose ids were passed to previous store calls
+ in the same transaction, and for which no serial was returned
+ from either store or tpc_vote for objects passed to store.
"""
def tpc_vote(transaction):
@@ -794,8 +798,6 @@ def tpc_vote(transaction):
The return value can be either None or a sequence of object-id
and serial pairs giving new serials for objects who's ids were
passed to previous store calls in the same transaction.
- After the tpc_vote call, new serials must have been returned,
- either from tpc_vote or store for objects passed to store.
A serial returned in a sequence of oid/serial pairs, may be
the special value ZODB.ConflictResolution.ResolvedSerial to
diff --git a/src/ZODB/tests/BasicStorage.py b/src/ZODB/tests/BasicStorage.py
index 8e72272..61bc801 100644
--- a/src/ZODB/tests/BasicStorage.py
+++ b/src/ZODB/tests/BasicStorage.py
@@ -72,8 +72,10 @@ def checkSerialIsNoneForInitialRevision(self):
r1 = self._storage.store(oid, None, zodb_pickle(MinPO(11)),
'', txn)
r2 = self._storage.tpc_vote(txn)
- self._storage.tpc_finish(txn)
+ serial = self._storage.tpc_finish(txn)
newrevid = handle_serials(oid, r1, r2)
+ if newrevid is None and serial is not None:
+ newrevid = serial
data, revid = self._storage.load(oid, '')
value = zodb_unpickle(data)
eq(value, MinPO(11))
diff --git a/src/ZODB/tests/MTStorage.py b/src/ZODB/tests/MTStorage.py
index e01c802..b57de28 100644
--- a/src/ZODB/tests/MTStorage.py
+++ b/src/ZODB/tests/MTStorage.py
@@ -155,10 +155,12 @@ def dostore(self, i):
r2 = self.storage.tpc_vote(t)
self.pause()
- self.storage.tpc_finish(t)
+ serial = self.storage.tpc_finish(t)
self.pause()
revid = handle_serials(oid, r1, r2)
+ if serial is not None and revid is None:
+ revid = serial
self.oids[oid] = revid
class ExtStorageClientThread(StorageClientThread):
diff --git a/src/ZODB/tests/RevisionStorage.py b/src/ZODB/tests/RevisionStorage.py
index 9113757..ddd2dde 100644
--- a/src/ZODB/tests/RevisionStorage.py
+++ b/src/ZODB/tests/RevisionStorage.py
@@ -150,10 +150,12 @@ def helper(tid, revid, x):
# Finish the transaction
r2 = self._storage.tpc_vote(t)
newrevid = handle_serials(oid, r1, r2)
- self._storage.tpc_finish(t)
+ serial = self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
+ if serial is not None and newrevid is None:
+ newrevid = serial
return newrevid
revid1 = helper(1, None, 1)
revid2 = helper(2, revid1, 2)
diff --git a/src/ZODB/tests/StorageTestBase.py b/src/ZODB/tests/StorageTestBase.py
index 9737ec4..43f29ed 100644
--- a/src/ZODB/tests/StorageTestBase.py
+++ b/src/ZODB/tests/StorageTestBase.py
@@ -134,7 +134,7 @@ def handle_serials(oid, *args):
A helper for function _handle_all_serials().
"""
- return handle_all_serials(oid, *args)[oid]
+ return handle_all_serials(oid, *args).get(oid)
def import_helper(name):
__import__(name)
@@ -191,7 +191,9 @@ def _dostore(self, oid=None, revid=None, data=None,
# Finish the transaction
r2 = self._storage.tpc_vote(t)
revid = handle_serials(oid, r1, r2)
- self._storage.tpc_finish(t)
+ serial = self._storage.tpc_finish(t)
+ if serial is not None and revid is None:
+ revid = serial
except:
self._storage.tpc_abort(t)
raise
@@ -211,8 +213,8 @@ def _undo(self, tid, expected_oids=None, note=None):
self._storage.tpc_begin(t)
undo_result = self._storage.undo(tid, t)
vote_result = self._storage.tpc_vote(t)
- self._storage.tpc_finish(t)
- if expected_oids is not None:
+ serial = self._storage.tpc_finish(t)
+ if expected_oids is not None and serial is None:
oids = undo_result and undo_result[1] or []
oids.extend(oid for (oid, _) in vote_result or ())
self.assertEqual(len(oids), len(expected_oids), repr(oids))
diff --git a/src/ZODB/tests/TransactionalUndoStorage.py b/src/ZODB/tests/TransactionalUndoStorage.py
index 72adac2..203736a 100644
--- a/src/ZODB/tests/TransactionalUndoStorage.py
+++ b/src/ZODB/tests/TransactionalUndoStorage.py
@@ -76,6 +76,12 @@ def _transaction_vote(self, trans):
def _transaction_newserial(self, oid):
return self.__serials[oid]
+ def _transaction_finish(self, t, oid_list):
+ tid = self._storage.tpc_finish(t)
+ if tid is not None:
+ for oid in oid_list:
+ self.__serials[oid] = tid
+
def _multi_obj_transaction(self, objs):
newrevs = {}
t = Transaction()
@@ -85,7 +91,7 @@ def _multi_obj_transaction(self, objs):
self._transaction_store(oid, rev, data, '', t)
newrevs[oid] = None
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [x[0] for x in objs])
for oid in newrevs.keys():
newrevs[oid] = self._transaction_newserial(oid)
return newrevs
@@ -218,9 +224,9 @@ def checkTwoObjectUndo(self):
self._transaction_store(oid2, revid2, p51, '', t)
# Finish the transaction
self._transaction_vote(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
- self._storage.tpc_finish(t)
eq(revid1, revid2)
# Update those same two objects
t = Transaction()
@@ -230,9 +236,9 @@ def checkTwoObjectUndo(self):
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
- self._storage.tpc_finish(t)
eq(revid1, revid2)
# Make sure the objects have the current value
data, revid1 = self._storage.load(oid1, '')
@@ -288,11 +294,12 @@ def checkTwoObjectUndoAtOnce(self):
tid1 = info[1]['id']
t = Transaction()
oids = self._begin_undos_vote(t, tid, tid1)
- self._storage.tpc_finish(t)
+ serial = self._storage.tpc_finish(t)
# We get the finalization stuff called an extra time:
- eq(len(oids), 4)
- unless(oid1 in oids)
- unless(oid2 in oids)
+ if serial is None:
+ eq(len(oids), 4)
+ unless(oid1 in oids)
+ unless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(30))
data, revid2 = self._storage.load(oid2, '')
@@ -326,7 +333,7 @@ def checkTwoObjectUndoAgain(self):
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
@@ -346,7 +353,7 @@ def checkTwoObjectUndoAgain(self):
self._transaction_store(oid2, revid2, p53, '', t)
# Finish the transaction
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
@@ -358,10 +365,11 @@ def checkTwoObjectUndoAgain(self):
tid = info[1]['id']
t = Transaction()
oids = self._begin_undos_vote(t, tid)
- self._storage.tpc_finish(t)
- eq(len(oids), 1)
- self.failUnless(oid1 in oids)
- self.failUnless(not oid2 in oids)
+ serial = self._storage.tpc_finish(t)
+ if serial is None:
+ eq(len(oids), 1)
+ self.failUnless(oid1 in oids)
+ self.failUnless(not oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(33))
data, revid2 = self._storage.load(oid2, '')
@@ -397,7 +405,7 @@ def checkNotUndoable(self):
self._transaction_store(oid1, revid1, p81, '', t)
self._transaction_store(oid2, revid2, p91, '', t)
self._transaction_vote(t)
- self._storage.tpc_finish(t)
+ self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
@@ -683,9 +691,9 @@ def undo(i):
tid = p64(i + 1)
eq(txn.tid, tid)
- L1 = [(rec.oid, rec.tid, rec.data_txn) for rec in txn]
- L2 = [(oid, revid, None) for _tid, oid, revid in orig
- if _tid == tid]
+ L1 = {(rec.oid, rec.tid, rec.data_txn) for rec in txn}
+ L2 = {(oid, revid, None) for _tid, oid, revid in orig
+ if _tid == tid}
eq(L1, L2)
...@@ -25,51 +25,7 @@ def patch(): ...@@ -25,51 +25,7 @@ def patch():
H = lambda f: md5(f.func_code.co_code).hexdigest() H = lambda f: md5(f.func_code.co_code).hexdigest()
# Allow serial to be returned as late as tpc_finish if hasattr(Connection, '_handle_serial'): # merged upstream ?
#
# This makes possible for storage to allocate serial inside tpc_finish,
# removing the requirement to serialise second commit phase (tpc_vote
# to tpc_finish/tpc_abort).
h = H(Connection.tpc_finish)
def tpc_finish(self, transaction):
"""Indicate confirmation that the transaction is done."""
def callback(tid):
if self._mvcc_storage:
# Inter-connection invalidation is not needed when the
# storage provides MVCC.
return
d = dict.fromkeys(self._modified)
self._db.invalidate(tid, d, self)
# It's important that the storage calls the passed function
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
# <patch>
serial = self._storage.tpc_finish(transaction, callback)
if serial is not None:
assert isinstance(serial, bytes), repr(serial)
for oid_iterator in (self._modified, self._creating):
for oid in oid_iterator:
obj = self._cache.get(oid, None)
# Ignore missing objects and don't update ghosts.
if obj is not None and obj._p_changed is not None:
obj._p_changed = 0
obj._p_serial = serial
# </patch>
self._tpc_cleanup()
global OLD_ZODB
OLD_ZODB = h in (
'ab9b1b8d82c40e5fffa84f7bc4ea3a8b', # Python 2.7
)
if OLD_ZODB:
Connection.tpc_finish = tpc_finish
elif hasattr(Connection, '_handle_serial'): # merged upstream ?
assert hasattr(Connection, '_warn_about_returned_serial') assert hasattr(Connection, '_warn_about_returned_serial')
# sync() is used to provide a "network barrier", which is required for # sync() is used to provide a "network barrier", which is required for
......
...@@ -25,9 +25,6 @@ except ImportError: ...@@ -25,9 +25,6 @@ except ImportError:
from cPickle import dumps, loads from cPickle import dumps, loads
_protocol = 1 _protocol = 1
from ZODB.POSException import UndoError, ConflictError, ReadConflictError from ZODB.POSException import UndoError, ConflictError, ReadConflictError
from . import OLD_ZODB
if OLD_ZODB:
from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp from persistent.TimeStamp import TimeStamp
from neo.lib import logging from neo.lib import logging
...@@ -641,9 +638,6 @@ class Application(ThreadedApplication): ...@@ -641,9 +638,6 @@ class Application(ThreadedApplication):
# - If possible, recover from master failure. # - If possible, recover from master failure.
if txn_context.error: if txn_context.error:
raise NEOStorageError(txn_context.error) raise NEOStorageError(txn_context.error)
if OLD_ZODB:
return [(oid, ResolvedSerial)
for oid in txn_context.resolved_dict]
return txn_context.resolved_dict return txn_context.resolved_dict
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
......
...@@ -609,6 +609,10 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -609,6 +609,10 @@ class ImporterDatabaseManager(DatabaseManager):
except TypeError: # loadBefore returned None except TypeError: # loadBefore returned None
return False return False
except POSKeyError: except POSKeyError:
# loadBefore does not distinguish between an oid:
# - that does not exist at any serial
# - that was deleted
# - whose creation was undone
assert not o or o[3] is None, o assert not o or o[3] is None, o
return o return o
if serial != tid: if serial != tid:
...@@ -617,22 +621,15 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -617,22 +621,15 @@ class ImporterDatabaseManager(DatabaseManager):
u_tid = u64(serial) u_tid = u64(serial)
if u_tid <= self.zodb_tid and o: if u_tid <= self.zodb_tid and o:
return o return o
if value: value = zodb.repickle(value)
value = zodb.repickle(value)
checksum = util.makeChecksum(value)
else:
# CAVEAT: Although we think loadBefore should not return an empty
# value for a deleted object (BBB: fixed in ZODB4),
# there's no need to distinguish this case in the above
# except clause because it would be crazy to import a
# NEO DB using this backend.
checksum = None
if not next_serial: if not next_serial:
next_serial = db._getNextTID(db._getPartition(u_oid), u_oid, u_tid) next_serial = db._getNextTID(db._getPartition(u_oid), u_oid, u_tid)
if next_serial: if next_serial:
next_serial = p64(next_serial) next_serial = p64(next_serial)
return (serial, next_serial, return (serial, next_serial, 0,
0, checksum, value, zodb.getDataTid(z_oid, u_tid)) util.makeChecksum(value),
value,
zodb.getDataTid(z_oid, u_tid))
def getTransaction(self, tid, all=False): def getTransaction(self, tid, all=False):
u64 = util.u64 u64 = util.u64
......
...@@ -517,6 +517,15 @@ class TransactionalResource(object): ...@@ -517,6 +517,15 @@ class TransactionalResource(object):
return lambda *_: None return lambda *_: None
return self.__getattribute__(attr) return self.__getattribute__(attr)
try:
from ZODB.Connection import TransactionMetaData
except ImportError: # BBB: ZODB < 5
def getTransactionMetaData(txn, conn):
return txn
else:
def getTransactionMetaData(txn, conn):
return txn.data(conn)
class Patch(object): class Patch(object):
""" """
......
...@@ -330,6 +330,15 @@ class Node(object): ...@@ -330,6 +330,15 @@ class Node(object):
def filterConnection(self, *peers): def filterConnection(self, *peers):
return ConnectionFilter(self.getConnectionList(*peers)) return ConnectionFilter(self.getConnectionList(*peers))
@contextmanager
def patchDeferred(self, method):
deferred = []
with Patch(method.__self__, **{method.__name__:
lambda orig, *args, **kw: deferred.append(
partial(orig, *args, **kw))}) as p:
yield p
self.em.wakeup(*deferred)
class ServerNode(Node): class ServerNode(Node):
_server_class_dict = {} _server_class_dict = {}
......
...@@ -36,7 +36,7 @@ from neo.lib.handler import DelayEvent, EventHandler ...@@ -36,7 +36,7 @@ from neo.lib.handler import DelayEvent, EventHandler
from neo.lib import logging from neo.lib import logging
from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes, from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes,
Packets, Packet, uuid_str, ZERO_OID, ZERO_TID, MAX_TID) Packets, Packet, uuid_str, ZERO_OID, ZERO_TID, MAX_TID)
from .. import Patch, TransactionalResource from .. import Patch, TransactionalResource, getTransactionMetaData
from . import ClientApplication, ConnectionFilter, LockLock, NEOCluster, \ from . import ClientApplication, ConnectionFilter, LockLock, NEOCluster, \
NEOThreadedTest, RandomConflictDict, Serialized, ThreadId, with_cluster NEOThreadedTest, RandomConflictDict, Serialized, ThreadId, with_cluster
from neo.lib.util import add64, makeChecksum, p64, u64 from neo.lib.util import add64, makeChecksum, p64, u64
...@@ -1312,10 +1312,7 @@ class Test(NEOThreadedTest): ...@@ -1312,10 +1312,7 @@ class Test(NEOThreadedTest):
# Check that the storage hasn't answered to the store, # Check that the storage hasn't answered to the store,
# which means that a lock is still taken for r['x'] by t2. # which means that a lock is still taken for r['x'] by t2.
self.tic() self.tic()
try: txn = getTransactionMetaData(txn, c1)
txn = txn.data(c1)
except (AttributeError, KeyError): # BBB: ZODB < 5
pass
txn_context = cluster.client._txn_container.get(txn) txn_context = cluster.client._txn_container.get(txn)
empty = txn_context.queue.empty() empty = txn_context.queue.empty()
ll() ll()
......
...@@ -19,18 +19,22 @@ from cStringIO import StringIO ...@@ -19,18 +19,22 @@ from cStringIO import StringIO
from itertools import izip_longest from itertools import izip_longest
import os, random, shutil, threading, time, unittest import os, random, shutil, threading, time, unittest
import transaction, ZODB import transaction, ZODB
from persistent import Persistent
from neo.client.exception import NEOPrimaryMasterLost from neo.client.exception import NEOPrimaryMasterLost
from neo.lib import logging from neo.lib import logging
from neo.lib.protocol import MAX_TID
from neo.lib.util import cached_property, p64, u64 from neo.lib.util import cached_property, p64, u64
from neo.master.transactions import TransactionManager from neo.master.transactions import TransactionManager
from neo.storage.database import getAdapterKlass, importer, manager from neo.storage.database import getAdapterKlass, importer, manager
from neo.storage.database.importer import \ from neo.storage.database.importer import \
Repickler, TransactionRecord, WriteBack Repickler, TransactionRecord, WriteBack
from .. import expectedFailure, getTempDirectory, random_tree, Patch from .. import expectedFailure, getTempDirectory, random_tree, \
Patch, TransactionalResource, getTransactionMetaData
from . import NEOCluster, NEOThreadedTest from . import NEOCluster, NEOThreadedTest
from ZODB import serialize from ZODB import serialize
from ZODB.DB import TransactionalUndo from ZODB.DB import TransactionalUndo
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
from ZODB.POSException import POSKeyError
class Equal: class Equal:
...@@ -371,6 +375,45 @@ class ImporterTests(NEOThreadedTest): ...@@ -371,6 +375,45 @@ class ImporterTests(NEOThreadedTest):
t.begin() t.begin()
finalCheck(r) finalCheck(r)
def testDeleteAndUndo(self):
fs_path, cfg = self.getFS()
c = ZODB.DB(FileStorage(fs_path)).open()
s = c.db().storage
r = c.root()
tid = r._p_serial
r[''] = delete = Persistent()
transaction.commit()
self.assertEqual(delete._p_oid, p64(1))
del r['']
TransactionalResource(transaction, 0, commit=lambda txn:
s.deleteObject(delete._p_oid, delete._p_serial,
getTransactionMetaData(txn, c)))
transaction.commit()
r[''] = undo = Persistent()
transaction.commit()
c.db().undo(s.undoLog(last=1)[0]['id'])
transaction.commit()
self.assertEqual(undo._p_oid, p64(2))
def check():
self.assertIsNone(s.loadBefore(delete._p_oid, tid))
for oid in delete._p_oid, undo._p_oid, p64(3):
self.assertRaises(POSKeyError, s.loadBefore, oid, MAX_TID)
check() # FileStorage
c.db().close()
importer = {'zodb': [('root', cfg)]}
with NEOCluster(importer=importer) as cluster:
storage = cluster.storage
dm = storage.dm
with storage.patchDeferred(dm._finished):
with storage.patchDeferred(dm.doOperation):
cluster.start()
s = cluster.getZODBStorage()
check() # before import
self.tic()
check() # imported, Importer getObject
self.tic()
check() # imported, direct getObject
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -44,7 +44,7 @@ get3rdParty(x, '3rdparty/' + x, 'https://lab.nexedi.com/nexedi/erp5' ...@@ -44,7 +44,7 @@ get3rdParty(x, '3rdparty/' + x, 'https://lab.nexedi.com/nexedi/erp5'
'/raw/14b0fcdcc31c5791646f9590678ca028f5d221f5/product/ERP5Type/' + x, '/raw/14b0fcdcc31c5791646f9590678ca028f5d221f5/product/ERP5Type/' + x,
'abb7970856540fd02150edd1fa9a3a3e8d0074ec526ab189684ef7ea9b41825f') 'abb7970856540fd02150edd1fa9a3a3e8d0074ec526ab189684ef7ea9b41825f')
zodb_require = ['ZODB3>=3.10dev'] zodb_require = ['ZODB>=4.4.5']
extras_require = { extras_require = {
'admin': [], 'admin': [],
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment