Commit fb47eeec authored by Julien Muchembled's avatar Julien Muchembled

Bump protocol version

...@@ -14,8 +14,9 @@ ...@@ -14,8 +14,9 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from ZODB import BaseStorage, ConflictResolution, POSException from ZODB import BaseStorage, ConflictResolution
from ZODB.POSException import ConflictError, UndoError from ZODB.POSException import (
ConflictError, POSKeyError, ReadOnlyError, UndoError)
from zope.interface import implementer from zope.interface import implementer
import ZODB.interfaces import ZODB.interfaces
...@@ -25,7 +26,7 @@ from .app import Application ...@@ -25,7 +26,7 @@ from .app import Application
from .exception import NEOStorageNotFoundError, NEOStorageDoesNotExistError from .exception import NEOStorageNotFoundError, NEOStorageDoesNotExistError
def raiseReadOnlyError(*args, **kw): def raiseReadOnlyError(*args, **kw):
raise POSException.ReadOnlyError() raise ReadOnlyError
@implementer( @implementer(
ZODB.interfaces.IStorage, ZODB.interfaces.IStorage,
...@@ -39,7 +40,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -39,7 +40,7 @@ class Storage(BaseStorage.BaseStorage,
ConflictResolution.ConflictResolvingStorage): ConflictResolution.ConflictResolvingStorage):
"""Wrapper class for neoclient.""" """Wrapper class for neoclient."""
def __init__(self, master_nodes, name, read_only=False, def __init__(self, master_nodes, name,
compress=None, logfile=None, _app=None, **kw): compress=None, logfile=None, _app=None, **kw):
""" """
Do not pass those parameters (used internally): Do not pass those parameters (used internally):
...@@ -50,30 +51,28 @@ class Storage(BaseStorage.BaseStorage, ...@@ -50,30 +51,28 @@ class Storage(BaseStorage.BaseStorage,
if logfile: if logfile:
logging.setup(logfile) logging.setup(logfile)
BaseStorage.BaseStorage.__init__(self, 'NEOStorage(%s)' % (name, )) BaseStorage.BaseStorage.__init__(self, 'NEOStorage(%s)' % (name, ))
# Warning: _is_read_only is used in BaseStorage, do not rename it.
self._is_read_only = read_only
if read_only:
for method_id in (
'new_oid',
'tpc_begin',
'tpc_vote',
'tpc_abort',
'store',
'deleteObject',
'undo',
'undoLog',
):
setattr(self, method_id, raiseReadOnlyError)
if _app is None: if _app is None:
ssl = [kw.pop(x, None) for x in ('ca', 'cert', 'key')] ssl = [kw.pop(x, None) for x in ('ca', 'cert', 'key')]
_app = Application(master_nodes, name, compress=compress, _app = Application(master_nodes, name, compress=compress,
ssl=ssl if any(ssl) else None, **kw) ssl=ssl if any(ssl) else None, **kw)
self.app = _app self.app = _app
if __debug__ and self._is_read_only:
# For ZODB checkWriteMethods:
self.store = self.undo = raiseReadOnlyError
# For tpc_begin, it's checked in Application because it's used
# internally (e.g. pack) and the caller does not want to clean up
# with tpc_abort.
# For other methods, either the master rejects with
# READ_ONLY_ACCESS or the call is outside of a transaction.
@property @property
def _cache(self): def _cache(self):
return self.app._cache return self.app._cache
@property
def _is_read_only(self): # used in BaseStorage, do not rename it
return self.app.read_only
def load(self, oid, version=''): def load(self, oid, version=''):
# XXX: interface definition states that version parameter is # XXX: interface definition states that version parameter is
# mandatory, while some ZODB tests do not provide it. For now, make # mandatory, while some ZODB tests do not provide it. For now, make
...@@ -82,7 +81,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -82,7 +81,7 @@ class Storage(BaseStorage.BaseStorage,
try: try:
return self.app.load(oid)[:2] return self.app.load(oid)[:2]
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid) raise POSKeyError(oid)
except Exception: except Exception:
logging.exception('oid=%r', oid) logging.exception('oid=%r', oid)
raise raise
...@@ -151,7 +150,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -151,7 +150,7 @@ class Storage(BaseStorage.BaseStorage,
try: try:
return self.app.load(oid, serial)[0] return self.app.load(oid, serial)[0]
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid) raise POSKeyError(oid)
except Exception: except Exception:
logging.exception('oid=%r, serial=%r', oid, serial) logging.exception('oid=%r, serial=%r', oid, serial)
raise raise
...@@ -160,7 +159,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -160,7 +159,7 @@ class Storage(BaseStorage.BaseStorage,
try: try:
return self.app.load(oid, None, tid) return self.app.load(oid, None, tid)
except NEOStorageDoesNotExistError: except NEOStorageDoesNotExistError:
raise POSException.POSKeyError(oid) raise POSKeyError(oid)
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
return None return None
except Exception: except Exception:
...@@ -195,7 +194,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -195,7 +194,7 @@ class Storage(BaseStorage.BaseStorage,
try: try:
data, serial, _ = self.app.load(oid) data, serial, _ = self.app.load(oid)
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid) raise POSKeyError(oid)
except Exception: except Exception:
logging.exception('oid=%r', oid) logging.exception('oid=%r', oid)
raise raise
...@@ -215,7 +214,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -215,7 +214,7 @@ class Storage(BaseStorage.BaseStorage,
try: try:
return self.app.history(oid, *args, **kw) return self.app.history(oid, *args, **kw)
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid) raise POSKeyError(oid)
except Exception: except Exception:
logging.exception('oid=%r', oid) logging.exception('oid=%r', oid)
raise raise
......
...@@ -25,7 +25,8 @@ try: ...@@ -25,7 +25,8 @@ try:
except ImportError: except ImportError:
from cPickle import dumps, loads from cPickle import dumps, loads
_protocol = 1 _protocol = 1
from ZODB.POSException import UndoError, ConflictError, ReadConflictError from ZODB.POSException import (
ConflictError, ReadConflictError, ReadOnlyError, UndoError)
from neo.lib import logging from neo.lib import logging
from neo.lib.compress import decompress_list, getCompress from neo.lib.compress import decompress_list, getCompress
...@@ -72,7 +73,7 @@ class Application(ThreadedApplication): ...@@ -72,7 +73,7 @@ class Application(ThreadedApplication):
wait_for_pack = False wait_for_pack = False
def __init__(self, master_nodes, name, compress=True, cache_size=None, def __init__(self, master_nodes, name, compress=True, cache_size=None,
ignore_wrong_checksum=False, **kw): read_only=False, ignore_wrong_checksum=False, **kw):
super(Application, self).__init__(parseMasterList(master_nodes), super(Application, self).__init__(parseMasterList(master_nodes),
name, **kw) name, **kw)
# Internal Attributes common to all thread # Internal Attributes common to all thread
...@@ -108,6 +109,7 @@ class Application(ThreadedApplication): ...@@ -108,6 +109,7 @@ class Application(ThreadedApplication):
self._connecting_to_storage_node = Lock() self._connecting_to_storage_node = Lock()
self._node_failure_dict = {} self._node_failure_dict = {}
self.compress = getCompress(compress) self.compress = getCompress(compress)
self.read_only = read_only
self.ignore_wrong_checksum = ignore_wrong_checksum self.ignore_wrong_checksum = ignore_wrong_checksum
def __getattr__(self, attr): def __getattr__(self, attr):
...@@ -200,56 +202,65 @@ class Application(ThreadedApplication): ...@@ -200,56 +202,65 @@ class Application(ThreadedApplication):
fail_count = 0 fail_count = 0
ask = self._ask ask = self._ask
handler = self.primary_bootstrap_handler handler = self.primary_bootstrap_handler
while 1: conn = None
self.ignore_invalidations = True try:
# Get network connection to primary master while 1:
while fail_count < self.max_reconnection_to_master: self.ignore_invalidations = True
self.nm.reset() # Get network connection to primary master
if self.primary_master_node is not None: while fail_count < self.max_reconnection_to_master:
# If I know a primary master node, pinpoint it. self.nm.reset()
node = self.primary_master_node if self.primary_master_node is not None:
self.primary_master_node = None # If I know a primary master node, pinpoint it.
else: node = self.primary_master_node
# Otherwise, check one by one. self.primary_master_node = None
master_list = self.nm.getMasterList() else:
if not master_list: # Otherwise, check one by one.
# XXX: On shutdown, it already happened that this list master_list = self.nm.getMasterList()
# is empty, leading to ZeroDivisionError. This if not master_list:
# looks a minor issue so let's wait to have more # XXX: On shutdown, it already happened that this
# information. # list is empty, leading to ZeroDivisionError.
logging.error('%r', self.__dict__) # This looks a minor issue so let's wait to
index = (index + 1) % len(master_list) # have more information.
node = master_list[index] logging.error('%r', self.__dict__)
# Connect to master index = (index + 1) % len(master_list)
conn = MTClientConnection(self, node = master_list[index]
# Connect to master
conn = MTClientConnection(self,
self.notifications_handler, self.notifications_handler,
node=node, node=node,
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT, p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, None, {}) self.uuid, None, self.name, None,
{'read_only': True} if self.read_only else {})
try:
ask(conn, p, handler=handler)
except ConnectionClosed:
conn = None
fail_count += 1
else:
self.primary_master_node = node
break
else:
raise NEOPrimaryMasterLost(
"Too many connection failures to the primary master")
logging.info('Connected to %s', self.primary_master_node)
try: try:
ask(conn, p, handler=handler) # Request identification and required informations to be
# operational. Might raise ConnectionClosed so that the new
# primary can be looked-up again.
logging.info('Initializing from master')
ask(conn, Packets.AskLastTransaction(), handler=handler)
if self.pt.operational():
break
except ConnectionClosed: except ConnectionClosed:
fail_count += 1 conn = self.primary_master_node = None
else: logging.error('Connection to %s lost',
self.primary_master_node = node self.trying_master_node)
break fail_count += 1
else: except:
raise NEOPrimaryMasterLost( if conn is not None:
"Too many connection failures to the primary master") conn.close()
logging.info('Connected to %s', self.primary_master_node) raise
try:
# Request identification and required informations to be
# operational. Might raise ConnectionClosed so that the new
# primary can be looked-up again.
logging.info('Initializing from master')
ask(conn, Packets.AskLastTransaction(), handler=handler)
if self.pt.operational():
break
except ConnectionClosed:
logging.error('Connection to %s lost', self.trying_master_node)
self.primary_master_node = None
fail_count += 1
logging.info("Connected and ready") logging.info("Connected and ready")
return conn return conn
...@@ -497,6 +508,8 @@ class Application(ThreadedApplication): ...@@ -497,6 +508,8 @@ class Application(ThreadedApplication):
def tpc_begin(self, storage, transaction, tid=None, status=' '): def tpc_begin(self, storage, transaction, tid=None, status=' '):
"""Begin a new transaction.""" """Begin a new transaction."""
if self.read_only:
raise ReadOnlyError
# First get a transaction, only one is allowed at a time # First get a transaction, only one is allowed at a time
txn_context = self._txn_container.new(transaction) txn_context = self._txn_container.new(transaction)
# use the given TID or request a new one to the master # use the given TID or request a new one to the master
...@@ -523,6 +536,9 @@ class Application(ThreadedApplication): ...@@ -523,6 +536,9 @@ class Application(ThreadedApplication):
compressed_data = '' compressed_data = ''
compression = 0 compression = 0
checksum = ZERO_HASH checksum = ZERO_HASH
if data_serial is None:
assert oid not in txn_context.resolved_dict, oid
txn_context.delete_list.append(oid)
else: else:
size, compression, compressed_data = self.compress(data) size, compression, compressed_data = self.compress(data)
checksum = makeChecksum(compressed_data) checksum = makeChecksum(compressed_data)
...@@ -573,6 +589,7 @@ class Application(ThreadedApplication): ...@@ -573,6 +589,7 @@ class Application(ThreadedApplication):
'Conflict resolution succeeded for %s@%s with %s', 'Conflict resolution succeeded for %s@%s with %s',
dump(oid), dump(old_serial), dump(serial)) dump(oid), dump(old_serial), dump(serial))
# Mark this conflict as resolved # Mark this conflict as resolved
assert oid not in txn_context.delete_list, oid
resolved_dict[oid] = serial resolved_dict[oid] = serial
# Try to store again # Try to store again
self._store(txn_context, oid, serial, data) self._store(txn_context, oid, serial, data)
...@@ -725,13 +742,22 @@ class Application(ThreadedApplication): ...@@ -725,13 +742,22 @@ class Application(ThreadedApplication):
self.tpc_vote(transaction) self.tpc_vote(transaction)
txn_context = txn_container.pop(transaction) txn_context = txn_container.pop(transaction)
cache_dict = txn_context.cache_dict cache_dict = txn_context.cache_dict
checked_list = [oid for oid, data in cache_dict.iteritems() getPartition = self.pt.getPartition
if data is CHECKED_SERIAL] checked = set()
for oid in checked_list: for oid, data in cache_dict.items():
del cache_dict[oid] if data is CHECKED_SERIAL:
del cache_dict[oid]
checked.add(getPartition(oid))
deleted = txn_context.delete_list
if deleted:
oids = set(cache_dict)
oids.difference_update(deleted)
deleted = map(getPartition, deleted)
else:
oids = list(cache_dict)
ttid = txn_context.ttid ttid = txn_context.ttid
p = Packets.AskFinishTransaction(ttid, list(cache_dict), p = Packets.AskFinishTransaction(ttid, oids, deleted, checked,
checked_list, txn_context.pack) txn_context.pack)
try: try:
tid = self._askPrimary(p, cache_dict=cache_dict, callback=f) tid = self._askPrimary(p, cache_dict=cache_dict, callback=f)
assert tid assert tid
......
...@@ -26,9 +26,9 @@ ...@@ -26,9 +26,9 @@
</key> </key>
<key name="read-only" datatype="boolean"> <key name="read-only" datatype="boolean">
<description> <description>
If true, only reads may be executed against the storage. Note If true, only reads may be executed against the storage.
that the "pack" operation is not considered a write operation If false when cluster is backing up, POSException.ReadOnlyError
and is still allowed on a read-only neostorage. is raised.
</description> </description>
</key> </key>
<key name="logfile" datatype="existing-dirpath"> <key name="logfile" datatype="existing-dirpath">
......
...@@ -38,6 +38,7 @@ class Transaction(object): ...@@ -38,6 +38,7 @@ class Transaction(object):
self.txn = txn self.txn = txn
# data being stored # data being stored
self.data_dict = {} # {oid: (value, serial, [node_id])} self.data_dict = {} # {oid: (value, serial, [node_id])}
self.delete_list = [] # [oid]
# data stored: this will go to the cache on tpc_finish # data stored: this will go to the cache on tpc_finish
self.cache_dict = {} # {oid: value} self.cache_dict = {} # {oid: value}
# conflicts to resolve # conflicts to resolve
......
...@@ -26,7 +26,7 @@ except ImportError: ...@@ -26,7 +26,7 @@ except ImportError:
# The protocol version must be increased whenever upgrading a node may require # The protocol version must be increased whenever upgrading a node may require
# to upgrade other nodes. # to upgrade other nodes.
PROTOCOL_VERSION = 4 PROTOCOL_VERSION = 5
# By encoding the handshake packet with msgpack, the whole NEO stream can be # By encoding the handshake packet with msgpack, the whole NEO stream can be
# decoded with msgpack. The first byte is 0x92, which is different from TLS # decoded with msgpack. The first byte is 0x92, which is different from TLS
# Handshake (0x16). # Handshake (0x16).
...@@ -497,7 +497,14 @@ class Packets(dict): ...@@ -497,7 +497,14 @@ class Packets(dict):
InvalidateObjects = notify(""" InvalidateObjects = notify("""
Notify about a new transaction modifying objects, Notify about a new transaction modifying objects,
invalidating client caches. invalidating client caches. Deleted objects are excluded.
:nodes: M -> C
""")
InvalidatePartitions = notify("""
Notify about a new transaction, listing partitions
with modified or deleted objects.
:nodes: M -> C :nodes: M -> C
""") """)
......
...@@ -150,7 +150,10 @@ class Application(BaseApplication): ...@@ -150,7 +150,10 @@ class Application(BaseApplication):
self.election_handler = master.ElectionHandler(self) self.election_handler = master.ElectionHandler(self)
self.secondary_handler = master.SecondaryHandler(self) self.secondary_handler = master.SecondaryHandler(self)
self.client_service_handler = client.ClientServiceHandler(self) self.client_service_handler = client.ClientServiceHandler(self)
self.client_ro_service_handler = client.ClientReadOnlyServiceHandler(self) self.client_ro_service_handler = client.ClientReadOnlyServiceHandler(
self)
self.client_backup_service_handler = client.ClientBackupServiceHandler(
self)
self.storage_service_handler = storage.StorageServiceHandler(self) self.storage_service_handler = storage.StorageServiceHandler(self)
registerLiveDebugger(on_log=self.log) registerLiveDebugger(on_log=self.log)
...@@ -559,23 +562,26 @@ class Application(BaseApplication): ...@@ -559,23 +562,26 @@ class Application(BaseApplication):
# I have received all the lock answers now: # I have received all the lock answers now:
# - send a Notify Transaction Finished to the initiated client node # - send a Notify Transaction Finished to the initiated client node
# - Invalidate Objects to the other client nodes # - Invalidate Objects to the other client nodes
ttid = txn.getTTID() ttid = txn.ttid
tid = txn.getTID() tid = txn.tid
transaction_node = txn.getNode() transaction_node = txn.node
invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList()) invalidate_objects = Packets.InvalidateObjects(tid, txn.oid_list)
invalidate_partitions = Packets.InvalidatePartitions(
tid, txn.partition_list)
client_list = self.nm.getClientList(only_identified=True) client_list = self.nm.getClientList(only_identified=True)
for client_node in client_list: for client_node in client_list:
if client_node is transaction_node: if client_node is transaction_node:
client_node.send(Packets.AnswerTransactionFinished(ttid, tid), client_node.send(Packets.AnswerTransactionFinished(ttid, tid),
msg_id=txn.getMessageId()) msg_id=txn.msg_id)
else: else:
client_node.send(invalidate_objects) client_node.send(invalidate_partitions
if client_node.extra.get('backup') else
invalidate_objects)
# Unlock Information to relevant storage nodes. # Unlock Information to relevant storage nodes.
notify_unlock = Packets.NotifyUnlockInformation(ttid) notify_unlock = Packets.NotifyUnlockInformation(ttid)
getByUUID = self.nm.getByUUID getByUUID = self.nm.getByUUID
txn_storage_list = txn.getUUIDList() for storage_uuid in txn.involved:
for storage_uuid in txn_storage_list:
getByUUID(storage_uuid).send(notify_unlock) getByUUID(storage_uuid).send(notify_unlock)
# Notify storage nodes about new pack order if any. # Notify storage nodes about new pack order if any.
......
...@@ -41,7 +41,7 @@ class MasterHandler(EventHandler): ...@@ -41,7 +41,7 @@ class MasterHandler(EventHandler):
def askLastIDs(self, conn): def askLastIDs(self, conn):
tm = self.app.tm tm = self.app.tm
conn.answer(Packets.AnswerLastIDs(tm.getLastTID(), tm.getLastOID())) conn.answer(Packets.AnswerLastIDs(tm.getLastTID(), tm.getLastOID(), tm.getFirstTID()))
def askLastTransaction(self, conn): def askLastTransaction(self, conn):
conn.answer(Packets.AnswerLastTransaction( conn.answer(Packets.AnswerLastTransaction(
......
...@@ -239,6 +239,9 @@ class AdministrationHandler(MasterHandler): ...@@ -239,6 +239,9 @@ class AdministrationHandler(MasterHandler):
app = self.app app = self.app
if app.getLastTransaction() <= tid: if app.getLastTransaction() <= tid:
raise AnswerDenied("Truncating after last transaction does nothing") raise AnswerDenied("Truncating after last transaction does nothing")
if app.tm.getFirstTID() > tid:
raise AnswerDenied("Truncating before first transaction is"
" probably not what you intended to do")
if app.pm.getApprovedRejected(add64(tid, 1))[0]: if app.pm.getApprovedRejected(add64(tid, 1))[0]:
# TODO: The protocol must be extended to support safe cases # TODO: The protocol must be extended to support safe cases
# (e.g. no started pack whose id is after truncation tid). # (e.g. no started pack whose id is after truncation tid).
......
...@@ -63,13 +63,12 @@ class BackupHandler(EventHandler): ...@@ -63,13 +63,12 @@ class BackupHandler(EventHandler):
raise RuntimeError("upstream DB truncated") raise RuntimeError("upstream DB truncated")
app.ignore_invalidations = False app.ignore_invalidations = False
def invalidateObjects(self, conn, tid, oid_list): def invalidatePartitions(self, conn, tid, partition_list):
app = self.app app = self.app
if app.ignore_invalidations: if app.ignore_invalidations:
return return
getPartition = app.app.pt.getPartition partition_set = set(partition_list)
partition_set = set(map(getPartition, oid_list)) partition_set.add(app.app.pt.getPartition(tid))
partition_set.add(getPartition(tid))
prev_tid = app.app.getLastTransaction() prev_tid = app.app.getLastTransaction()
app.invalidatePartitions(tid, prev_tid, partition_set) app.invalidatePartitions(tid, prev_tid, partition_set)
......
...@@ -64,7 +64,8 @@ class ClientServiceHandler(MasterHandler): ...@@ -64,7 +64,8 @@ class ClientServiceHandler(MasterHandler):
conn.answer((Errors.Ack if app.tm.vote(app, *args) else conn.answer((Errors.Ack if app.tm.vote(app, *args) else
Errors.IncompleteTransaction)()) Errors.IncompleteTransaction)())
def askFinishTransaction(self, conn, ttid, oid_list, checked_list, pack): def askFinishTransaction(self, conn, ttid, oid_list,
deleted, checked, pack):
app = self.app app = self.app
if pack: if pack:
tid = pack[1] tid = pack[1]
...@@ -74,7 +75,8 @@ class ClientServiceHandler(MasterHandler): ...@@ -74,7 +75,8 @@ class ClientServiceHandler(MasterHandler):
app, app,
ttid, ttid,
oid_list, oid_list,
checked_list, deleted,
checked,
conn.getPeerId(), conn.getPeerId(),
) )
if tid: if tid:
...@@ -131,12 +133,13 @@ class ClientServiceHandler(MasterHandler): ...@@ -131,12 +133,13 @@ class ClientServiceHandler(MasterHandler):
else: else:
pack.waitForPack(conn.delayedAnswer(Packets.WaitedForPack)) pack.waitForPack(conn.delayedAnswer(Packets.WaitedForPack))
# like ClientServiceHandler but read-only & only for tid <= backup_tid
class ClientReadOnlyServiceHandler(ClientServiceHandler): class ClientReadOnlyServiceHandler(ClientServiceHandler):
_read_only_message = 'read-only access as requested by the client'
def _readOnly(self, conn, *args, **kw): def _readOnly(self, conn, *args, **kw):
conn.answer(Errors.ReadOnlyAccess( conn.answer(Errors.ReadOnlyAccess(self._read_only_message))
'read-only access because cluster is in backuping mode'))
askBeginTransaction = _readOnly askBeginTransaction = _readOnly
askNewOIDs = _readOnly askNewOIDs = _readOnly
...@@ -145,9 +148,15 @@ class ClientReadOnlyServiceHandler(ClientServiceHandler): ...@@ -145,9 +148,15 @@ class ClientReadOnlyServiceHandler(ClientServiceHandler):
askPack = _readOnly askPack = _readOnly
abortTransaction = _readOnly abortTransaction = _readOnly
# like ClientReadOnlyServiceHandler but only for tid <= backup_tid
class ClientBackupServiceHandler(ClientReadOnlyServiceHandler):
_read_only_message = 'read-only access because cluster is in backuping mode'
# XXX LastIDs is not used by client at all, and it requires work to determine # XXX LastIDs is not used by client at all, and it requires work to determine
# last_oid up to backup_tid, so just make it non-functional for client. # last_oid up to backup_tid, so just make it non-functional for client.
askLastIDs = _readOnly askLastIDs = ClientReadOnlyServiceHandler._readOnly.__func__ # Py3
# like in MasterHandler but returns backup_tid instead of last_tid # like in MasterHandler but returns backup_tid instead of last_tid
def askLastTransaction(self, conn): def askLastTransaction(self, conn):
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import NotReadyError, PrimaryElected, ProtocolError from neo.lib.exception import NotReadyError, PrimaryElected, ProtocolError
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, \ from neo.lib.protocol import CellStates, ClusterStates, Errors, NodeStates, \
NodeTypes, Packets, uuid_str NodeTypes, Packets, uuid_str
from ..app import monotonic_time from ..app import monotonic_time
...@@ -63,10 +63,17 @@ class IdentificationHandler(EventHandler): ...@@ -63,10 +63,17 @@ class IdentificationHandler(EventHandler):
new_nid = extra.pop('new_nid', None) new_nid = extra.pop('new_nid', None)
state = NodeStates.RUNNING state = NodeStates.RUNNING
if node_type == NodeTypes.CLIENT: if node_type == NodeTypes.CLIENT:
read_only = extra.pop('read_only', 'backup' in extra)
if app.cluster_state == ClusterStates.RUNNING: if app.cluster_state == ClusterStates.RUNNING:
handler = app.client_service_handler handler = (app.client_ro_service_handler if read_only else
app.client_service_handler)
elif app.cluster_state == ClusterStates.BACKINGUP: elif app.cluster_state == ClusterStates.BACKINGUP:
handler = app.client_ro_service_handler if not read_only:
conn.answer(Errors.ReadOnlyAccess(
"read-write access requested"
" but cluster is backing up"))
return
handler = app.client_backup_service_handler
else: else:
raise NotReadyError raise NotReadyError
human_readable_node_type = ' client ' human_readable_node_type = ' client '
......
...@@ -20,87 +20,40 @@ from struct import pack, unpack ...@@ -20,87 +20,40 @@ from struct import pack, unpack
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import ProtocolError from neo.lib.exception import ProtocolError
from neo.lib.handler import DelayEvent, EventQueue from neo.lib.handler import DelayEvent, EventQueue
from neo.lib.protocol import uuid_str, ZERO_OID, ZERO_TID from neo.lib.protocol import uuid_str, ZERO_OID, ZERO_TID, MAX_TID
from neo.lib.util import dump, u64, addTID, tidFromTime from neo.lib.util import dump, u64, addTID, tidFromTime
class Transaction(object): class Transaction(object):
""" """
A pending transaction A pending transaction
""" """
_tid = None tid = None
_msg_id = None oid_list = ()
_oid_list = None failed = \
_failed = frozenset() involved = frozenset()
_prepared = False
# uuid dict hold flag to known who has locked the transaction
_uuid_set = None
_lock_wait_uuid_set = None
def __init__(self, node, storage_readiness, ttid): def __init__(self, node, storage_readiness, ttid):
""" """
Prepare the transaction, set OIDs and UUIDs related to it Prepare the transaction, set OIDs and UUIDs related to it
""" """
self._node = node self.node = node
self._storage_readiness = storage_readiness self.storage_readiness = storage_readiness
self._ttid = ttid self.ttid = ttid
self._birth = time() self._birth = time()
# store storage uuids that must be notified at commit # store storage uuids that must be notified at commit
self._notification_set = set() self._notification_set = set()
def __repr__(self): def __repr__(self):
return "<%s(client=%r, tid=%r, oids=%r, storages=%r, age=%.2fs) at %x>" % ( return "<%s(client=%r, tid=%r, invalidated=%r, storages=%r, age=%.2fs) at %x>" % (
self.__class__.__name__, self.__class__.__name__,
self._node, self.node,
dump(self._tid), dump(self.tid),
map(dump, self._oid_list or ()), map(dump, self.oid_list),
map(uuid_str, self._uuid_set or ()), map(uuid_str, self.involved),
time() - self._birth, time() - self._birth,
id(self), id(self),
) )
def getNode(self):
"""
Return the node that had began the transaction
"""
return self._node
def getTTID(self):
"""
Return the temporary transaction ID.
"""
return self._ttid
def getTID(self):
"""
Return the transaction ID
"""
return self._tid
def getMessageId(self):
"""
Returns the packet ID to use in the answer
"""
return self._msg_id
def getUUIDList(self):
"""
Returns the list of node's UUID that lock the transaction
"""
return list(self._uuid_set)
def getOIDList(self):
"""
Returns the list of OIDs used in the transaction
"""
return list(self._oid_list)
def isPrepared(self):
"""
Returns True if the commit has been requested by the client
"""
return self._prepared
def registerForNotification(self, uuid): def registerForNotification(self, uuid):
""" """
Register a node that requires a notification at commit Register a node that requires a notification at commit
...@@ -114,14 +67,13 @@ class Transaction(object): ...@@ -114,14 +67,13 @@ class Transaction(object):
""" """
return list(self._notification_set) return list(self._notification_set)
def prepare(self, tid, oid_list, uuid_set, msg_id): def prepare(self, tid, oid_list, partition_list, involved, msg_id):
self.tid = tid
self._tid = tid self.oid_list = oid_list
self._oid_list = oid_list self.partition_list = partition_list
self._msg_id = msg_id self.msg_id = msg_id
self._uuid_set = uuid_set self.involved = involved
self._lock_wait_uuid_set = uuid_set.copy() self.locking = involved.copy()
self._prepared = True
def storageLost(self, uuid): def storageLost(self, uuid):
""" """
...@@ -133,16 +85,17 @@ class Transaction(object): ...@@ -133,16 +85,17 @@ class Transaction(object):
# XXX: We might lose information that a storage successfully locked # XXX: We might lose information that a storage successfully locked
# data but was later found to be disconnected. This loss has no impact # data but was later found to be disconnected. This loss has no impact
# on current code, but it might be disturbing to reader or future code. # on current code, but it might be disturbing to reader or future code.
if self._prepared: if self.tid:
self._lock_wait_uuid_set.discard(uuid) locking = self.locking
self._uuid_set.discard(uuid) locking.discard(uuid)
return self.locked() self.involved.discard(uuid)
return not locking
return False return False
def clientLost(self, node): def clientLost(self, node):
if self._node is node: if self.node is node:
if self._prepared: if self.tid:
self._node = None # orphan self.node = None # orphan
else: else:
return True # abort return True # abort
else: else:
...@@ -154,14 +107,9 @@ class Transaction(object): ...@@ -154,14 +107,9 @@ class Transaction(object):
Define that a node has locked the transaction Define that a node has locked the transaction
Returns true if all nodes are locked Returns true if all nodes are locked
""" """
self._lock_wait_uuid_set.remove(uuid) locking = self.locking
return self.locked() locking.remove(uuid)
return not locking
def locked(self):
"""
Returns true if all nodes are locked
"""
return not self._lock_wait_uuid_set
class TransactionManager(EventQueue): class TransactionManager(EventQueue):
...@@ -179,6 +127,9 @@ class TransactionManager(EventQueue): ...@@ -179,6 +127,9 @@ class TransactionManager(EventQueue):
self._ttid_dict = {} self._ttid_dict = {}
self._last_oid = ZERO_OID self._last_oid = ZERO_OID
self._last_tid = ZERO_TID self._last_tid = ZERO_TID
self._first_tid = MAX_TID
# avoid the overhead of min_tid on every _unlockPending
self._unlockPending = self._firstUnlockPending
# queue filled with ttids pointing to transactions with increasing tids # queue filled with ttids pointing to transactions with increasing tids
self._queue = deque() self._queue = deque()
...@@ -212,6 +163,13 @@ class TransactionManager(EventQueue): ...@@ -212,6 +163,13 @@ class TransactionManager(EventQueue):
self._last_oid = oid_list[-1] self._last_oid = oid_list[-1]
return oid_list return oid_list
def setFirstTID(self, tid):
if self._first_tid > tid:
self._first_tid = tid
def getFirstTID(self):
return self._first_tid
def setLastOID(self, oid): def setLastOID(self, oid):
if self._last_oid < oid: if self._last_oid < oid:
self._last_oid = oid self._last_oid = oid
...@@ -302,7 +260,7 @@ class TransactionManager(EventQueue): ...@@ -302,7 +260,7 @@ class TransactionManager(EventQueue):
txn = self[ttid] txn = self[ttid]
# The client does not know which nodes are not expected to have # The client does not know which nodes are not expected to have
# transactions in full. Let's filter out them. # transactions in full. Let's filter out them.
failed = app.getStorageReadySet(txn._storage_readiness) failed = app.getStorageReadySet(txn.storage_readiness)
failed.intersection_update(uuid_list) failed.intersection_update(uuid_list)
if failed: if failed:
operational = app.pt.operational operational = app.pt.operational
...@@ -312,41 +270,42 @@ class TransactionManager(EventQueue): ...@@ -312,41 +270,42 @@ class TransactionManager(EventQueue):
return False return False
all_failed = failed.copy() all_failed = failed.copy()
for t in self._ttid_dict.itervalues(): for t in self._ttid_dict.itervalues():
all_failed |= t._failed all_failed |= t.failed
if not operational(all_failed): if not operational(all_failed):
# Other transactions were voted and unless they're aborted, # Other transactions were voted and unless they're aborted,
# we won't be able to finish this one, because that would make # we won't be able to finish this one, because that would make
# the cluster non-operational. Let's tell the caller to retry # the cluster non-operational. Let's retry later.
# later.
raise DelayEvent raise DelayEvent
# Allow the client to finish the transaction, # Allow the client to finish the transaction,
# even if this will disconnect storage nodes. # even if this will disconnect storage nodes.
txn._failed = failed txn.failed = failed
return True return True
def prepare(self, app, ttid, oid_list, checked_list, msg_id): def prepare(self, app, ttid, oid_list, deleted, checked, msg_id):
""" """
Prepare a transaction to be finished Prepare a transaction to be finished
""" """
txn = self[ttid] txn = self[ttid]
pt = app.pt pt = app.pt
failed = txn._failed failed = txn.failed
if failed and not pt.operational(failed): if failed and not pt.operational(failed):
return None, None return None, None
ready = app.getStorageReadySet(txn._storage_readiness) ready = app.getStorageReadySet(txn.storage_readiness)
getPartition = pt.getPartition getPartition = pt.getPartition
partition_set = set(map(getPartition, oid_list)) partition_set = set(map(getPartition, oid_list))
partition_set.update(map(getPartition, checked_list)) partition_set.update(deleted)
partition_list = list(partition_set)
partition_set.add(getPartition(ttid)) partition_set.add(getPartition(ttid))
partition_set.update(checked)
node_list = [] node_list = []
uuid_set = set() involved = set()
for partition in partition_set: for partition in partition_set:
for cell in pt.getCellList(partition): for cell in pt.getCellList(partition):
node = cell.getNode() node = cell.getNode()
if node.isIdentified(): if node.isIdentified():
uuid = node.getUUID() uuid = node.getUUID()
if uuid in uuid_set: if uuid in involved:
continue continue
if uuid in failed: if uuid in failed:
# This will commit a new PT with outdated cells before # This will commit a new PT with outdated cells before
...@@ -354,7 +313,7 @@ class TransactionManager(EventQueue): ...@@ -354,7 +313,7 @@ class TransactionManager(EventQueue):
# the verification phase. # the verification phase.
node.getConnection().close() node.getConnection().close()
elif uuid in ready: elif uuid in ready:
uuid_set.add(uuid) involved.add(uuid)
node_list.append(node) node_list.append(node)
# A node that was not ready at the beginning of the transaction # A node that was not ready at the beginning of the transaction
# can't have readable cells. And if we're still operational without # can't have readable cells. And if we're still operational without
...@@ -369,8 +328,8 @@ class TransactionManager(EventQueue): ...@@ -369,8 +328,8 @@ class TransactionManager(EventQueue):
tid = self._nextTID(ttid, pt.getPartitions()) tid = self._nextTID(ttid, pt.getPartitions())
self._queue.append(ttid) self._queue.append(ttid)
logging.debug('Finish TXN %s for %s (was %s)', logging.debug('Finish TXN %s for %s (was %s)',
dump(tid), txn.getNode(), dump(ttid)) dump(tid), txn.node, dump(ttid))
txn.prepare(tid, oid_list, uuid_set, msg_id) txn.prepare(tid, oid_list, partition_list, involved, msg_id)
# check if greater and foreign OID was stored # check if greater and foreign OID was stored
if oid_list: if oid_list:
self.setLastOID(max(oid_list)) self.setLastOID(max(oid_list))
...@@ -382,7 +341,7 @@ class TransactionManager(EventQueue): ...@@ -382,7 +341,7 @@ class TransactionManager(EventQueue):
""" """
logging.debug('Abort TXN %s for %s', dump(ttid), uuid_str(uuid)) logging.debug('Abort TXN %s for %s', dump(ttid), uuid_str(uuid))
txn = self[ttid] txn = self[ttid]
if txn.isPrepared(): if txn.tid:
raise ProtocolError("commit already requested for ttid %s" raise ProtocolError("commit already requested for ttid %s"
% dump(ttid)) % dump(ttid))
del self[ttid] del self[ttid]
...@@ -412,6 +371,16 @@ class TransactionManager(EventQueue): ...@@ -412,6 +371,16 @@ class TransactionManager(EventQueue):
if unlock: if unlock:
self._unlockPending() self._unlockPending()
def _firstUnlockPending(self):
"""Set first TID when the first transaction is committed
Masks _unlockPending on reset.
Unmasks and call it when called.
"""
self.setFirstTID(self._ttid_dict[self._queue[0]].tid)
del self._unlockPending
self._unlockPending()
def _unlockPending(self): def _unlockPending(self):
"""Serialize transaction unlocks """Serialize transaction unlocks
...@@ -424,7 +393,7 @@ class TransactionManager(EventQueue): ...@@ -424,7 +393,7 @@ class TransactionManager(EventQueue):
while queue: while queue:
ttid = queue[0] ttid = queue[0]
txn = self._ttid_dict[ttid] txn = self._ttid_dict[ttid]
if not txn.locked(): if txn.locking:
break break
del queue[0], self._ttid_dict[ttid] del queue[0], self._ttid_dict[ttid]
self._on_commit(txn) self._on_commit(txn)
...@@ -433,7 +402,7 @@ class TransactionManager(EventQueue): ...@@ -433,7 +402,7 @@ class TransactionManager(EventQueue):
def clientLost(self, node): def clientLost(self, node):
for txn in self._ttid_dict.values(): for txn in self._ttid_dict.values():
if txn.clientLost(node): if txn.clientLost(node):
tid = txn.getTTID() tid = txn.ttid
del self[tid] del self[tid]
yield tid, txn.getNotificationUUIDList() yield tid, txn.getNotificationUUIDList()
......
...@@ -139,11 +139,12 @@ class VerificationManager(BaseServiceHandler): ...@@ -139,11 +139,12 @@ class VerificationManager(BaseServiceHandler):
def notifyPackCompleted(self, conn, pack_id): def notifyPackCompleted(self, conn, pack_id):
self.app.nm.getByUUID(conn.getUUID()).completed_pack_id = pack_id self.app.nm.getByUUID(conn.getUUID()).completed_pack_id = pack_id
def answerLastIDs(self, conn, ltid, loid): def answerLastIDs(self, conn, ltid, loid, ftid):
self._uuid_set.remove(conn.getUUID()) self._uuid_set.remove(conn.getUUID())
tm = self.app.tm tm = self.app.tm
tm.setLastTID(ltid) tm.setLastTID(ltid)
tm.setLastOID(loid) tm.setLastOID(loid)
tm.setFirstTID(ftid)
def answerPackOrders(self, conn, pack_list): def answerPackOrders(self, conn, pack_list):
self._uuid_set.remove(conn.getUUID()) self._uuid_set.remove(conn.getUUID())
......
# -*- coding: utf-8 -*-
# #
# Copyright (C) 2006-2019 Nexedi SA # Copyright (C) 2006-2019 Nexedi SA
# #
...@@ -70,9 +71,9 @@ class TerminalNeoCTL(object): ...@@ -70,9 +71,9 @@ class TerminalNeoCTL(object):
return getattr(ClusterStates, value.upper()) return getattr(ClusterStates, value.upper())
def asTID(self, value): def asTID(self, value):
if '.' in value: if value.lower().startswith('tid:'):
return tidFromTime(float(value)) return p64(int(value[4:], 0))
return p64(int(value, 0)) return tidFromTime(float(value))
asNode = staticmethod(uuid_int) asNode = staticmethod(uuid_int)
...@@ -386,7 +387,8 @@ class Application(object): ...@@ -386,7 +387,8 @@ class Application(object):
def usage(self): def usage(self):
output_list = ('Available commands:', self._usage(action_dict), output_list = ('Available commands:', self._usage(action_dict),
"TID arguments can be either integers or timestamps as floats," "The syntax of « TID » arguments is either tid:<integer>"
" e.g. '257684787499560686', '0x3937af2eeeeeeee' or '1325421296.'" " (case insensitive) for a TID or <float> for a UNIX timestamp,"
" for 2012-01-01 12:34:56 UTC") " e.g. 'tid:257684787499560686', 'tid:0x3937af2eeeeeeee' or"
" '1325421296' for 2012-01-01 12:34:56 UTC.")
return '\n'.join(output_list) return '\n'.join(output_list)
...@@ -137,7 +137,7 @@ class NeoCTL(BaseApplication): ...@@ -137,7 +137,7 @@ class NeoCTL(BaseApplication):
response = self.__ask(Packets.AskLastIDs()) response = self.__ask(Packets.AskLastIDs())
if response[0] != Packets.AnswerLastIDs: if response[0] != Packets.AnswerLastIDs:
raise RuntimeError(response) raise RuntimeError(response)
return response[1:] return response[1:3]
def getLastTransaction(self): def getLastTransaction(self):
response = self.__ask(Packets.AskLastTransaction()) response = self.__ask(Packets.AskLastTransaction())
......
...@@ -601,6 +601,9 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -601,6 +601,9 @@ class ImporterDatabaseManager(DatabaseManager):
zodb = self.zodb[bisect(self.zodb_index, oid) - 1] zodb = self.zodb[bisect(self.zodb_index, oid) - 1]
return zodb, oid - zodb.shift_oid return zodb, oid - zodb.shift_oid
def getFirstTID(self):
return min(next(zodb.iterator()).tid for zodb in self.zodb)
def getLastIDs(self): def getLastIDs(self):
tid, oid = self.db.getLastIDs() tid, oid = self.db.getLastIDs()
return (max(tid, util.p64(self.zodb_ltid)), return (max(tid, util.p64(self.zodb_ltid)),
......
...@@ -758,6 +758,19 @@ class DatabaseManager(object): ...@@ -758,6 +758,19 @@ class DatabaseManager(object):
# XXX: Consider splitting getLastIDs/_getLastIDs because # XXX: Consider splitting getLastIDs/_getLastIDs because
# sometimes the last oid is not wanted. # sometimes the last oid is not wanted.
def _getFirstTID(self, partition):
"""Return tid of first transaction in given 'partition'
tids are in unpacked format.
"""
@requires(_getFirstTID)
def getFirstTID(self):
"""Return tid of first transaction
"""
x = self._readable_set
return util.p64(min(map(self._getFirstTID, x))) if x else MAX_TID
def _getLastTID(self, partition, max_tid=None): def _getLastTID(self, partition, max_tid=None):
"""Return tid of last transaction <= 'max_tid' in given 'partition' """Return tid of last transaction <= 'max_tid' in given 'partition'
......
...@@ -53,7 +53,7 @@ from .manager import MVCCDatabaseManager, splitOIDField ...@@ -53,7 +53,7 @@ from .manager import MVCCDatabaseManager, splitOIDField
from neo.lib import logging, util from neo.lib import logging, util
from neo.lib.exception import NonReadableCell, UndoPackError from neo.lib.exception import NonReadableCell, UndoPackError
from neo.lib.interfaces import implements from neo.lib.interfaces import implements
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH, MAX_TID
class MysqlError(DatabaseFailure): class MysqlError(DatabaseFailure):
...@@ -457,6 +457,12 @@ class MySQLDatabaseManager(MVCCDatabaseManager): ...@@ -457,6 +457,12 @@ class MySQLDatabaseManager(MVCCDatabaseManager):
def _getPartitionTable(self): def _getPartitionTable(self):
return self.query("SELECT * FROM pt") return self.query("SELECT * FROM pt")
def _getFirstTID(self, partition):
(tid,), = self.query(
"SELECT MIN(tid) as t FROM trans FORCE INDEX (PRIMARY)"
" WHERE `partition`=%s" % partition)
return util.u64(MAX_TID) if tid is None else tid
def _getLastTID(self, partition, max_tid=None): def _getLastTID(self, partition, max_tid=None):
sql = ("SELECT MAX(tid) as t FROM trans FORCE INDEX (PRIMARY)" sql = ("SELECT MAX(tid) as t FROM trans FORCE INDEX (PRIMARY)"
" WHERE `partition`=%s") % partition " WHERE `partition`=%s") % partition
......
...@@ -28,7 +28,7 @@ from .manager import DatabaseManager, splitOIDField ...@@ -28,7 +28,7 @@ from .manager import DatabaseManager, splitOIDField
from neo.lib import logging, util from neo.lib import logging, util
from neo.lib.exception import NonReadableCell, UndoPackError from neo.lib.exception import NonReadableCell, UndoPackError
from neo.lib.interfaces import implements from neo.lib.interfaces import implements
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH, MAX_TID
def unique_constraint_message(table, *columns): def unique_constraint_message(table, *columns):
c = sqlite3.connect(":memory:") c = sqlite3.connect(":memory:")
...@@ -343,6 +343,11 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -343,6 +343,11 @@ class SQLiteDatabaseManager(DatabaseManager):
def _getPartitionTable(self): def _getPartitionTable(self):
return self.query("SELECT * FROM pt") return self.query("SELECT * FROM pt")
def _getFirstTID(self, partition):
tid = self.query("SELECT MIN(tid) FROM trans WHERE partition=?",
(partition,)).fetchone()[0]
return util.u64(MAX_TID) if tid is None else tid
def _getLastTID(self, partition, max_tid=None): def _getLastTID(self, partition, max_tid=None):
x = self.query x = self.query
if max_tid is None: if max_tid is None:
......
...@@ -55,7 +55,8 @@ class InitializationHandler(BaseMasterHandler): ...@@ -55,7 +55,8 @@ class InitializationHandler(BaseMasterHandler):
if packed: if packed:
self.app.completed_pack_id = pack_id = min(packed.itervalues()) self.app.completed_pack_id = pack_id = min(packed.itervalues())
conn.send(Packets.NotifyPackCompleted(pack_id)) conn.send(Packets.NotifyPackCompleted(pack_id))
conn.answer(Packets.AnswerLastIDs(*dm.getLastIDs())) last_tid, last_oid = dm.getLastIDs() # PY3
conn.answer(Packets.AnswerLastIDs(last_tid, last_oid, dm.getFirstTID()))
def askPartitionTable(self, conn): def askPartitionTable(self, conn):
pt = self.app.pt pt = self.app.pt
......
...@@ -13,7 +13,7 @@ AnswerFetchObjects(?p64,?p64,{:}) ...@@ -13,7 +13,7 @@ AnswerFetchObjects(?p64,?p64,{:})
AnswerFetchTransactions(?p64,[],?p64) AnswerFetchTransactions(?p64,[],?p64)
AnswerFinalTID(p64) AnswerFinalTID(p64)
AnswerInformationLocked(p64) AnswerInformationLocked(p64)
AnswerLastIDs(?p64,?p64) AnswerLastIDs(?p64,?p64,p64)
AnswerLastTransaction(p64) AnswerLastTransaction(p64)
AnswerLockedTransactions({p64:?p64}) AnswerLockedTransactions({p64:?p64})
AnswerMonitorInformation([?bin],[?bin],bin) AnswerMonitorInformation([?bin],[?bin],bin)
...@@ -46,7 +46,7 @@ AskClusterState() ...@@ -46,7 +46,7 @@ AskClusterState()
AskFetchObjects(int,int,p64,p64,p64,{p64:[p64]}) AskFetchObjects(int,int,p64,p64,p64,{p64:[p64]})
AskFetchTransactions(int,int,p64,p64,[p64],bool) AskFetchTransactions(int,int,p64,p64,[p64],bool)
AskFinalTID(p64) AskFinalTID(p64)
AskFinishTransaction(p64,[p64],[p64],?(?[p64],p64)) AskFinishTransaction(p64,[p64],[int],[int],?(?[p64],p64))
AskLastIDs() AskLastIDs()
AskLastTransaction() AskLastTransaction()
AskLockInformation(p64,p64,bool) AskLockInformation(p64,p64,bool)
...@@ -76,6 +76,7 @@ CheckReplicas({int:?int},p64,?) ...@@ -76,6 +76,7 @@ CheckReplicas({int:?int},p64,?)
Error(int,bin) Error(int,bin)
FailedVote(p64,[int]) FailedVote(p64,[int])
InvalidateObjects(p64,[p64]) InvalidateObjects(p64,[p64])
InvalidatePartitions(p64,[int])
NotPrimaryMaster(?int,[(bin,int)]) NotPrimaryMaster(?int,[(bin,int)])
NotifyClusterInformation(ClusterStates) NotifyClusterInformation(ClusterStates)
NotifyDeadlock(p64,p64) NotifyDeadlock(p64,p64)
......
...@@ -183,6 +183,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -183,6 +183,7 @@ class StorageDBTests(NeoUnitTestBase):
txn1, objs1 = self.getTransaction([oid1]) txn1, objs1 = self.getTransaction([oid1])
txn2, objs2 = self.getTransaction([oid2]) txn2, objs2 = self.getTransaction([oid2])
# nothing in database # nothing in database
self.assertEqual(self.db.getFirstTID(), MAX_TID)
self.assertEqual(self.db.getLastIDs(), (None, None)) self.assertEqual(self.db.getLastIDs(), (None, None))
self.assertEqual(self.db.getUnfinishedTIDDict(), {}) self.assertEqual(self.db.getUnfinishedTIDDict(), {})
self.assertEqual(self.db.getObject(oid1), None) self.assertEqual(self.db.getObject(oid1), None)
...@@ -199,6 +200,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -199,6 +200,7 @@ class StorageDBTests(NeoUnitTestBase):
([oid2], 'user', 'desc', 'ext', False, p64(2), None)) ([oid2], 'user', 'desc', 'ext', False, p64(2), None))
self.assertEqual(self.db.getTransaction(tid1, False), None) self.assertEqual(self.db.getTransaction(tid1, False), None)
self.assertEqual(self.db.getTransaction(tid2, False), None) self.assertEqual(self.db.getTransaction(tid2, False), None)
self.assertEqual(self.db.getFirstTID(), tid1)
self.assertEqual(self.db.getTransaction(tid1, True), self.assertEqual(self.db.getTransaction(tid1, True),
([oid1], 'user', 'desc', 'ext', False, p64(1), None)) ([oid1], 'user', 'desc', 'ext', False, p64(1), None))
self.assertEqual(self.db.getTransaction(tid2, True), self.assertEqual(self.db.getTransaction(tid2, True),
......
...@@ -200,7 +200,7 @@ class StressApplication(AdminApplication): ...@@ -200,7 +200,7 @@ class StressApplication(AdminApplication):
if conn: if conn:
conn.ask(Packets.AskLastIDs()) conn.ask(Packets.AskLastIDs())
def answerLastIDs(self, ltid, loid): def answerLastIDs(self, ltid, loid, ftid):
self.loid = loid self.loid = loid
self.ltid = ltid self.ltid = ltid
self.em.setTimeout(int(time.time() + 1), self.askLastIDs) self.em.setTimeout(int(time.time() + 1), self.askLastIDs)
......
...@@ -1027,8 +1027,12 @@ class NEOCluster(object): ...@@ -1027,8 +1027,12 @@ class NEOCluster(object):
if not cell.isReadable()] if not cell.isReadable()]
def getZODBStorage(self, **kw): def getZODBStorage(self, **kw):
kw['_app'] = kw.pop('client', self.client) try:
return Storage.Storage(None, self.name, **kw) app = kw.pop('client')
assert not kw, kw
except KeyError:
app = self._newClient(**kw) if kw else self.client
return Storage.Storage(None, self.name, _app=app)
def importZODB(self, dummy_zodb=None, random=random): def importZODB(self, dummy_zodb=None, random=random):
if dummy_zodb is None: if dummy_zodb is None:
......
...@@ -121,28 +121,6 @@ class Test(NEOThreadedTest): ...@@ -121,28 +121,6 @@ class Test(NEOThreadedTest):
self.assertFalse(cluster.storage.sqlCount('bigdata')) self.assertFalse(cluster.storage.sqlCount('bigdata'))
self.assertFalse(cluster.storage.sqlCount('data')) self.assertFalse(cluster.storage.sqlCount('data'))
@with_cluster()
def testDeleteObject(self, cluster):
if 1:
storage = cluster.getZODBStorage()
for clear_cache in 0, 1:
for tst in 'a.', 'bcd.':
oid = storage.new_oid()
serial = None
for data in tst:
txn = transaction.Transaction()
storage.tpc_begin(txn)
if data == '.':
storage.deleteObject(oid, serial, txn)
else:
storage.store(oid, serial, data, '', txn)
storage.tpc_vote(txn)
serial = storage.tpc_finish(txn)
if clear_cache:
storage._cache.clear()
self.assertRaises(POSException.POSKeyError,
storage.load, oid, '')
@with_cluster(storage_count=3, replicas=1, partitions=5) @with_cluster(storage_count=3, replicas=1, partitions=5)
def testIterOIDs(self, cluster): def testIterOIDs(self, cluster):
storage = cluster.getZODBStorage() storage = cluster.getZODBStorage()
......
...@@ -17,13 +17,13 @@ ...@@ -17,13 +17,13 @@
import random, sys, threading, time import random, sys, threading, time
import transaction import transaction
from ZODB.POSException import ReadOnlyError, POSKeyError from ZODB.POSException import (
POSKeyError, ReadOnlyError, StorageTransactionError)
import unittest import unittest
from collections import defaultdict from collections import defaultdict
from functools import wraps from functools import wraps
from itertools import product from itertools import product
from neo.lib import logging from neo.lib import logging
from neo.client.exception import NEOStorageError
from neo.master.handlers.backup import BackupHandler from neo.master.handlers.backup import BackupHandler
from neo.storage.checker import CHECK_COUNT from neo.storage.checker import CHECK_COUNT
from neo.storage.database.manager import DatabaseManager from neo.storage.database.manager import DatabaseManager
...@@ -220,7 +220,7 @@ class ReplicationTests(NEOThreadedTest): ...@@ -220,7 +220,7 @@ class ReplicationTests(NEOThreadedTest):
counts[0] += 1 counts[0] += 1
if counts[0] > 1: if counts[0] > 1:
node_list = orig.im_self.nm.getClientList(only_identified=True) node_list = orig.im_self.nm.getClientList(only_identified=True)
node_list.remove(txn.getNode()) node_list.remove(txn.node)
node_list[0].getConnection().close() node_list[0].getConnection().close()
return orig(txn) return orig(txn)
with NEOCluster(partitions=np, replicas=0, storage_count=1) as upstream: with NEOCluster(partitions=np, replicas=0, storage_count=1) as upstream:
...@@ -266,7 +266,7 @@ class ReplicationTests(NEOThreadedTest): ...@@ -266,7 +266,7 @@ class ReplicationTests(NEOThreadedTest):
def testBackupUpstreamStorageDead(self, backup): def testBackupUpstreamStorageDead(self, backup):
upstream = backup.upstream upstream = backup.upstream
with ConnectionFilter() as f: with ConnectionFilter() as f:
f.delayInvalidateObjects() f.delayInvalidatePartitions()
upstream.importZODB()(1) upstream.importZODB()(1)
count = [0] count = [0]
def _connect(orig, conn): def _connect(orig, conn):
...@@ -1112,7 +1112,9 @@ class ReplicationTests(NEOThreadedTest): ...@@ -1112,7 +1112,9 @@ class ReplicationTests(NEOThreadedTest):
B = backup B = backup
U = B.upstream U = B.upstream
Z = U.getZODBStorage() Z = U.getZODBStorage()
#Zb = B.getZODBStorage() # XXX see below about invalidations with B.newClient() as client, self.assertRaises(ReadOnlyError):
client.last_tid
#Zb = B.getZODBStorage(read_only=True) # XXX see below about invalidations
oid_list = [] oid_list = []
tid_list = [] tid_list = []
...@@ -1157,7 +1159,7 @@ class ReplicationTests(NEOThreadedTest): ...@@ -1157,7 +1159,7 @@ class ReplicationTests(NEOThreadedTest):
# read data from B and verify it is what it should be # read data from B and verify it is what it should be
# XXX we open new ZODB storage every time because invalidations # XXX we open new ZODB storage every time because invalidations
# are not yet implemented in read-only mode. # are not yet implemented in read-only mode.
Zb = B.getZODBStorage() Zb = B.getZODBStorage(read_only=True)
for j, oid in enumerate(oid_list): for j, oid in enumerate(oid_list):
if cutoff <= i < recover and j >= cutoff: if cutoff <= i < recover and j >= cutoff:
self.assertRaises(POSKeyError, Zb.load, oid, '') self.assertRaises(POSKeyError, Zb.load, oid, '')
...@@ -1170,7 +1172,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -1170,7 +1172,6 @@ class ReplicationTests(NEOThreadedTest):
# not-yet-fully fetched backup state (transactions committed at # not-yet-fully fetched backup state (transactions committed at
# [cutoff, recover) should not be there; otherwise transactions # [cutoff, recover) should not be there; otherwise transactions
# should be fully there) # should be fully there)
Zb = B.getZODBStorage()
Btxn_list = list(Zb.iterator()) Btxn_list = list(Zb.iterator())
self.assertEqual(len(Btxn_list), cutoff if cutoff <= i < recover self.assertEqual(len(Btxn_list), cutoff if cutoff <= i < recover
else i+1) else i+1)
...@@ -1185,15 +1186,12 @@ class ReplicationTests(NEOThreadedTest): ...@@ -1185,15 +1186,12 @@ class ReplicationTests(NEOThreadedTest):
# try to commit something to backup storage and make sure it is # try to commit something to backup storage and make sure it is
# really read-only # really read-only
Zb._cache.max_size = 0 # make store() do work in sync way
txn = transaction.Transaction() txn = transaction.Transaction()
self.assertRaises(ReadOnlyError, Zb.tpc_begin, txn) self.assertRaises(ReadOnlyError, Zb.tpc_begin, txn)
self.assertRaises(ReadOnlyError, Zb.new_oid) self.assertRaises(ReadOnlyError, Zb.new_oid)
self.assertRaises(ReadOnlyError, Zb.store, oid_list[-1], # tpc_vote first checks whether the transaction has begun -
tid_list[-1], 'somedata', '', txn)
# tpc_vote first checks whether there were store replies -
# thus not ReadOnlyError # thus not ReadOnlyError
self.assertRaises(NEOStorageError, Zb.tpc_vote, txn) self.assertRaises(StorageTransactionError, Zb.tpc_vote, txn)
if i == loop // 2: if i == loop // 2:
# Check that we survive a disconnection from upstream # Check that we survive a disconnection from upstream
...@@ -1203,8 +1201,6 @@ class ReplicationTests(NEOThreadedTest): ...@@ -1203,8 +1201,6 @@ class ReplicationTests(NEOThreadedTest):
conn.close() conn.close()
self.tic() self.tic()
# close storage because client app is otherwise shared in
# threaded tests and we need to refresh last_tid on next run
# (XXX see above about invalidations not working) # (XXX see above about invalidations not working)
Zb.close() Zb.close()
...@@ -1231,5 +1227,29 @@ class ReplicationTests(NEOThreadedTest): ...@@ -1231,5 +1227,29 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(1, self.checkBackup(backup)) self.assertEqual(1, self.checkBackup(backup))
@backup_test(3)
def testDeleteObject(self, backup):
upstream = backup.upstream
storage = upstream.getZODBStorage()
for clear_cache in 0, 1:
for tst in 'a.', 'bcd.':
oid = storage.new_oid()
serial = None
for data in tst:
txn = transaction.Transaction()
storage.tpc_begin(txn)
if data == '.':
storage.deleteObject(oid, serial, txn)
else:
storage.store(oid, serial, data, '', txn)
storage.tpc_vote(txn)
serial = storage.tpc_finish(txn)
self.tic()
self.assertEqual(3, self.checkBackup(backup))
if clear_cache:
storage._cache.clear()
self.assertRaises(POSKeyError, storage.load, oid, '')
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment