Commit b3dd6973 authored by Julien Muchembled's avatar Julien Muchembled

Optimize resumption of replication by starting from a greater TID

Although data that are already transferred aren't transferred again, checking
that the data are there for a whole partition can still be a lot of work for
big databases. This commit is a major performance improvement in that a storage
node that gets disconnected for a short time now gets fully operational quite
instantaneously because it only has to replicate the new data. Before, the time
to recover depended on the size of the DB.

For OUT_OF_DATE cells, the difficult part was that they are writable and
can then contain holes, so we can't just take the last TID in trans/obj
(we wrongly did that at the beginning, and then committed
6b1f198f as a workaround). We solve that
by storing up to where it was up-to-date: this value is initialized from
the last TIDs in trans/obj when the state switches from UP_TO_DATE/FEEDING.

There's actually one such OUT_OF_DATE TID per assigned cell (backends store
these values in the 'pt' table). Otherwise, a cell that still has a lot to
replicate would still cause all other cells to resume from the a very small
TID, or even ZERO_TID; the worse case is when a new cell is assigned to a node
(as a result of tweak).

For UP_TO_DATE cells of a backup cluster, replication was resumed from the
maximum TID at which all assigned cells are known to be fully replicated.
Like for OUT_OF_DATE cells, the presence of a late cell could cause a lot of
extra work for others, the worst case being when setting up a backup cluster
(it always restarted from ZERO_TID as long as at least 1 cell was still empty).
Because UP_TO_DATE cells are guaranteed to have no holes, there's no need to
store extra information: we simply look at the last TIDs in trans/obj.
We even handle trans & obj independently, to minimize the work in 1 table
(i.e. trans since it's processed first) if the other is late (obj).

There's a small change in the protocol so that OUT_OF_DATE enum value equals 0.
This way, backends can store the OUT_OF_DATE TID (verbatim) in the same column
as the cell state.

Note about MySQL changes in commit ca58ccd7:
what we did as a workaround is not one any more. Now, we do so much on Python
side that it's unlikely we could reduce the number of queries using GROUP BY.
We even stopped doing that for SQLite.
parent fa9664ee
......@@ -122,22 +122,22 @@ def NodeStates():
@Enum
def CellStates():
# Normal state: cell is writable/readable, and it isn't planned to drop it.
UP_TO_DATE
# Write-only cell. Last transactions are missing because storage is/was down
# for a while, or because it is new for the partition. It usually becomes
# UP_TO_DATE when replication is done.
OUT_OF_DATE
# Normal state: cell is writable/readable, and it isn't planned to drop it.
UP_TO_DATE
# Same as UP_TO_DATE, except that it will be discarded as soon as another
# node finishes to replicate it. It means a partition is moved from 1 node
# to another. It is also discarded immediately if out-of-date.
FEEDING
# Not really a state: only used in network packets to tell storages to drop
# partitions.
DISCARDED
# A check revealed that data differs from other replicas. Cell is neither
# readable nor writable.
CORRUPTED
# Not really a state: only used in network packets to tell storages to drop
# partitions.
DISCARDED
# used for logging
node_state_prefix_dict = {
......
......@@ -348,7 +348,7 @@ class ImporterDatabaseManager(DatabaseManager):
def __init__(self, *args, **kw):
super(ImporterDatabaseManager, self).__init__(*args, **kw)
implements(self, """_getNextTID checkSerialRange checkTIDRange
deleteObject deleteTransaction dropPartitions getLastTID
deleteObject deleteTransaction dropPartitions _getLastTID
getReplicationObjectList _getTIDList nonempty""".split())
_getPartition = property(lambda self: self.db._getPartition)
......@@ -379,9 +379,9 @@ class ImporterDatabaseManager(DatabaseManager):
db = self.db = buildDatabaseManager(conf['adapter'],
(conf['database'], conf.get('engine'), conf['wait']))
for x in """getConfiguration _setConfiguration setNumPartitions
query erase getPartitionTable
getUnfinishedTIDDict dropUnfinishedData abortTransaction
storeTransaction lockTransaction
query erase getPartitionTable _iterAssignedCells
updateCellTID getUnfinishedTIDDict dropUnfinishedData
abortTransaction storeTransaction lockTransaction
loadData storeData getOrphanList _pruneData deferCommit
dropPartitionsTemporary
""".split():
......@@ -552,8 +552,8 @@ class ImporterDatabaseManager(DatabaseManager):
return zodb, oid - zodb.shift_oid
def getLastIDs(self):
tid, _, _, oid = self.db.getLastIDs()
return (max(tid, util.p64(self.zodb_ltid)), None, None,
tid, oid = self.db.getLastIDs()
return (max(tid, util.p64(self.zodb_ltid)),
max(oid, util.p64(self.zodb_loid)))
def getObject(self, oid, tid=None, before_tid=None):
......@@ -623,7 +623,7 @@ class ImporterDatabaseManager(DatabaseManager):
def _deleteRange(self, partition, min_tid=None, max_tid=None):
# Even if everything is imported, we can't truncate below
# because it would import again if we restart with this backend.
if u64(min_tid) < self.zodb_ltid:
if min_tid < self.zodb_ltid:
raise NotImplementedError
self.db._deleteRange(partition, min_tid, max_tid)
......
......@@ -21,9 +21,11 @@ from copy import copy
from functools import wraps
from neo.lib import logging, util
from neo.lib.interfaces import abstract, requires
from neo.lib.protocol import CellStates, NonReadableCell, ZERO_TID
from neo.lib.protocol import CellStates, NonReadableCell, MAX_TID, ZERO_TID
from . import DatabaseFailure
READABLE = CellStates.UP_TO_DATE, CellStates.FEEDING
def lazymethod(func):
def getter(self):
cls = self.__class__
......@@ -348,61 +350,47 @@ class DatabaseManager(object):
except TypeError:
return -1
@abstract
def getPartitionTable(self, *nid):
"""Return a whole partition table as a sequence of rows. Each row
is again a tuple of an offset (row ID), the NID of a storage
node, and a cell state."""
def _getAssignedPartitionList(self, *states):
nid = self.getUUID()
if nid is None:
return ()
if states:
return [nid for nid, state in self.getPartitionTable(nid)
if state in states]
return [x[0] for x in self.getPartitionTable(nid)]
# XXX: Consider splitting getLastIDs/_getLastIDs because
# sometimes the last oid is not wanted.
@abstract
def getLastTID(self, max_tid):
"""Return greatest tid in trans table that is <= given 'max_tid'
def _getLastTID(self, partition, max_tid=None):
"""Return tid of last transaction <= 'max_tid' in given 'partition'
Required only to import a DB using Importer backend.
max_tid must be in unpacked format.
tids are in unpacked format.
"""
Data from unassigned partitions must be ignored.
This is important because there may remain data from cells that have
been discarded, either due to --disable-drop-partitions option,
or in the future when dropping partitions is done in background
(because this is an expensive operation).
@requires(_getLastTID)
def getLastTID(self, max_tid=None):
"""Return tid of last transaction <= 'max_tid'
XXX: Given the TODO comment in getLastIDs, getting ids
from readable partitions should be enough.
tids are in unpacked format.
"""
if self.getNumPartitions():
return max(map(self._getLastTID, self._readable_set))
def _getLastIDs(self):
"""Return (trans, obj, max(oid)) where
both 'trans' and 'obj' are {partition: max(tid)}
def _getLastIDs(self, partition):
"""Return max(tid) & max(oid) for objects of given partition
Same as in getLastTID: data from unassigned partitions must be ignored.
Results are in unpacked format
"""
@requires(_getLastIDs)
def getLastIDs(self):
trans, obj, oid = self._getLastIDs()
if trans:
tid = max(trans.itervalues())
if obj:
tid = max(tid, max(obj.itervalues()))
else:
tid = max(obj.itervalues()) if obj else None
# TODO: Replication can't be resumed from the tids in 'trans' and 'obj'
# because outdated cells are writable and may contain recently
# committed data. We must save somewhere where replication was
# interrupted and return this information. For the moment, we
# tell the replicator to resume from the beginning.
trans = obj = {}
return tid, trans, obj, oid
"""Return max(tid) & max(oid) for readable data
It is important to ignore unassigned partitions because there may
remain data from cells that have been discarded, either due to
--disable-drop-partitions option, or in the future when dropping
partitions is done in background (as it is an expensive operation).
"""
x = self._readable_set
if x:
tid, oid = zip(*map(self._getLastIDs, x))
tid = max(self.getLastTID(None), max(tid))
oid = max(oid)
return (None if tid is None else util.p64(tid),
None if oid is None else util.p64(oid))
return None, None
def _getUnfinishedTIDDict(self):
""""""
......@@ -514,6 +502,22 @@ class DatabaseManager(object):
return (util.p64(serial), compression, checksum, data,
None if data_serial is None else util.p64(data_serial))
def _getPartitionTable(self):
"""Return a whole partition table as a sequence of rows. Each row
is again a tuple of an offset (row ID), the NID of a storage
node, and a cell state."""
@requires(_getPartitionTable)
def _iterAssignedCells(self):
my_nid = self.getUUID()
return ((offset, tid) for offset, nid, tid in self._getPartitionTable()
if my_nid == nid)
@requires(_getPartitionTable)
def getPartitionTable(self):
return [(offset, nid, max(0, -state))
for offset, nid, state in self._getPartitionTable()]
@contextmanager
def replicated(self, offset):
readable_set = self._readable_set
......@@ -557,16 +561,78 @@ class DatabaseManager(object):
d.append(p << 48 if i is None else i + 1)
else:
readable_set.clear()
readable_set.update(self._getAssignedPartitionList(
CellStates.UP_TO_DATE, CellStates.FEEDING))
readable_set.update(x[0] for x in self._iterAssignedCells()
if -x[1] in READABLE)
@requires(_changePartitionTable)
@requires(_changePartitionTable, _getLastIDs, _getLastTID)
def changePartitionTable(self, ptid, cell_list, reset=False):
my_nid = self.getUUID()
pt = dict(self._iterAssignedCells())
# In backup mode, the last transactions of a readable cell may be
# incomplete.
backup_tid = self.getBackupTID()
if backup_tid:
backup_tid = util.u64(backup_tid)
def outofdate_tid(offset):
tid = pt.get(offset, 0)
if tid >= 0:
return tid
return -tid in READABLE and (backup_tid or
max(self._getLastIDs(offset)[0],
self._getLastTID(offset))) or 0
cell_list = [(offset, nid, (
None if state == CellStates.DISCARDED else
-state if nid != my_nid or state != CellStates.OUT_OF_DATE else
outofdate_tid(offset)))
for offset, nid, state in cell_list]
self._changePartitionTable(cell_list, reset)
self._updateReadable()
assert isinstance(ptid, (int, long)), ptid
self._setConfiguration('ptid', str(ptid))
@requires(_changePartitionTable)
def updateCellTID(self, partition, tid):
t, = (t for p, t in self._iterAssignedCells() if p == partition)
if t < 0:
return
tid = util.u64(tid)
# Replicator doesn't optimize when there's no new data
# since the node went down.
if t == tid:
return
# In a backup cluster, when a storage node gets down just after
# being the first to replicate fully new transactions from upstream,
# we may end up in a special situation where an OUT_OF_DATE cell
# is actually more up-to-date than an UP_TO_DATE one.
assert t < tid or self.getBackupTID()
self._changePartitionTable([(partition, self.getUUID(), tid)])
def iterCellNextTIDs(self):
p64 = util.p64
backup_tid = self.getBackupTID()
if backup_tid:
next_tid = util.u64(backup_tid)
if next_tid:
next_tid += 1
for offset, tid in self._iterAssignedCells():
if tid >= 0: # OUT_OF_DATE
yield offset, p64(tid and tid + 1)
elif -tid in READABLE:
if backup_tid:
# An UP_TO_DATE cell does not have holes so it's fine to
# resume from the last found records.
tid = self._getLastTID(offset)
yield offset, (
# For trans, a transaction can't be partially
# replicated, so replication can resume from the next
# possible tid.
p64(max(next_tid, tid + 1) if tid else next_tid),
# For obj, the last transaction may be partially
# replicated so it must be checked again (i.e. no +1).
p64(max(next_tid, self._getLastIDs(offset)[0])))
else:
yield offset, None
@abstract
def dropPartitions(self, offset_list):
"""Delete all data for specified partitions"""
......@@ -786,9 +852,16 @@ class DatabaseManager(object):
def truncate(self):
tid = self.getTruncateTID()
if tid:
assert tid != ZERO_TID, tid
for partition in xrange(self.getNumPartitions()):
tid = util.u64(tid)
assert tid, tid
cell_list = []
my_nid = self.getUUID()
for partition, state in self._iterAssignedCells():
if state > tid:
cell_list.append((partition, my_nid, tid))
self._deleteRange(partition, tid)
if cell_list:
self._changePartitionTable(cell_list)
self._setTruncateTID(None)
self.commit()
......
......@@ -38,7 +38,7 @@ from . import LOG_QUERIES, DatabaseFailure
from .manager import DatabaseManager, splitOIDField
from neo.lib import logging, util
from neo.lib.interfaces import implements
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH
from neo.lib.protocol import ZERO_OID, ZERO_TID, ZERO_HASH
class MysqlError(DatabaseFailure):
......@@ -90,7 +90,7 @@ def auto_reconnect(wrapped):
class MySQLDatabaseManager(DatabaseManager):
"""This class manages a database on MySQL."""
VERSION = 2
VERSION = 3
ENGINES = "InnoDB", "RocksDB", "TokuDB"
_engine = ENGINES[0] # default engine
......@@ -257,6 +257,14 @@ class MySQLDatabaseManager(DatabaseManager):
def _migrate2(self, schema_dict):
self._alterTable(schema_dict, 'obj')
def _migrate3(self, schema_dict):
self._alterTable(schema_dict, 'pt', "rid as `partition`, nid,"
" CASE state"
" WHEN 0 THEN -1" # UP_TO_DATE
" WHEN 2 THEN -2" # FEEDING
" ELSE 1-state"
" END as tid")
def _setup(self, dedup=False):
self._config.clear()
q = self.query
......@@ -272,10 +280,10 @@ class MySQLDatabaseManager(DatabaseManager):
# The table "pt" stores a partition table.
schema_dict['pt'] = """CREATE TABLE %s (
rid INT UNSIGNED NOT NULL,
`partition` SMALLINT UNSIGNED NOT NULL,
nid INT NOT NULL,
state TINYINT UNSIGNED NOT NULL,
PRIMARY KEY (rid, nid)
tid BIGINT NOT NULL,
PRIMARY KEY (`partition`, nid)
) ENGINE=""" + engine
if self._use_partition:
......@@ -394,36 +402,23 @@ class MySQLDatabaseManager(DatabaseManager):
q("ALTER TABLE config MODIFY value VARBINARY(%s) NULL" % len(value))
q(sql)
def getPartitionTable(self, *nid):
if nid:
return self.query("SELECT rid, state FROM pt WHERE nid=%u" % nid)
def _getPartitionTable(self):
return self.query("SELECT * FROM pt")
def _sqlmax(self, sql, arg_list):
q = self.query
x = [x for x in arg_list for x, in q(sql % x) if x is not None]
if x: return max(x)
def getLastTID(self, max_tid):
return self._sqlmax(
"SELECT MAX(tid) as t FROM trans FORCE INDEX (PRIMARY)"
" WHERE tid<=%s and `partition`=%%s" % max_tid,
self._getAssignedPartitionList())
def _getLastTID(self, partition, max_tid=None):
x = "WHERE `partition`=%s" % partition
if max_tid:
x += " AND tid<=%s" % max_tid
(tid,), = self.query(
"SELECT MAX(tid) as t FROM trans FORCE INDEX (PRIMARY)" + x)
return tid
def _getLastIDs(self):
offset_list = self._getAssignedPartitionList()
p64 = util.p64
def _getLastIDs(self, partition):
q = self.query
sql = "SELECT MAX(tid) FROM %s WHERE `partition`=%s"
trans, obj = ({partition: p64(tid)
for partition in offset_list
for tid, in q(sql % (t, partition))
if tid is not None}
for t in ('trans FORCE INDEX (PRIMARY)', 'obj FORCE INDEX (tid)'))
oid = self._sqlmax(
"SELECT MAX(oid) FROM obj FORCE INDEX (PRIMARY)"
" WHERE `partition`=%s", offset_list)
return trans, obj, None if oid is None else p64(oid)
x = "WHERE `partition`=%s" % partition
(oid,), = q("SELECT MAX(oid) FROM obj FORCE INDEX (PRIMARY)" + x)
(tid,), = q("SELECT MAX(tid) FROM obj FORCE INDEX (tid)" + x)
return tid, oid
def _getDataLastId(self, partition):
return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
......@@ -489,17 +484,17 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query
if reset:
q("DELETE FROM pt")
for offset, nid, state in cell_list:
for offset, nid, tid in cell_list:
# TODO: this logic should move out of database manager
# add 'dropCells(cell_list)' to API and use one query
if state == CellStates.DISCARDED:
q("DELETE FROM pt WHERE rid = %d AND nid = %d"
if tid is None:
q("DELETE FROM pt WHERE `partition` = %d AND nid = %d"
% (offset, nid))
else:
offset_list.append(offset)
q("INSERT INTO pt VALUES (%d, %d, %d)"
" ON DUPLICATE KEY UPDATE state = %d"
% (offset, nid, state, state))
" ON DUPLICATE KEY UPDATE tid = %d"
% (offset, nid, tid, tid))
if self._use_partition:
for offset in offset_list:
add = """ALTER TABLE %%s ADD PARTITION (
......@@ -750,10 +745,10 @@ class MySQLDatabaseManager(DatabaseManager):
def _deleteRange(self, partition, min_tid=None, max_tid=None):
sql = " WHERE `partition`=%d" % partition
if min_tid:
sql += " AND %d < tid" % util.u64(min_tid)
if max_tid:
sql += " AND tid <= %d" % util.u64(max_tid)
if min_tid is not None:
sql += " AND %d < tid" % min_tid
if max_tid is not None:
sql += " AND tid <= %d" % max_tid
q = self.query
q("DELETE FROM trans" + sql)
sql = " FROM obj" + sql
......
......@@ -25,7 +25,7 @@ from . import LOG_QUERIES
from .manager import DatabaseManager, splitOIDField
from neo.lib import logging, util
from neo.lib.interfaces import implements
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH
from neo.lib.protocol import ZERO_OID, ZERO_TID, ZERO_HASH
def unique_constraint_message(table, *columns):
c = sqlite3.connect(":memory:")
......@@ -68,7 +68,7 @@ class SQLiteDatabaseManager(DatabaseManager):
never be used for small requests.
"""
VERSION = 2
VERSION = 3
def _parse(self, database):
self.db = os.path.expanduser(database)
......@@ -135,6 +135,12 @@ class SQLiteDatabaseManager(DatabaseManager):
def _migrate2(self, schema_dict, index_dict):
self._alterTable(schema_dict, 'obj')
def _migrate3(self, schema_dict, index_dict):
self._alterTable(schema_dict, 'pt', "rid, nid, CASE state"
" WHEN 0 THEN -1" # UP_TO_DATE
" WHEN 2 THEN -2" # FEEDING
" ELSE 1-state END")
def _setup(self, dedup=False):
# BBB: SQLite has transactional DDL but before Python 3.6,
# the binding automatically commits between such statements.
......@@ -154,10 +160,10 @@ class SQLiteDatabaseManager(DatabaseManager):
# The table "pt" stores a partition table.
schema_dict['pt'] = """CREATE TABLE %s (
rid INTEGER NOT NULL,
partition INTEGER NOT NULL,
nid INTEGER NOT NULL,
state INTEGER NOT NULL,
PRIMARY KEY (rid, nid))
tid INTEGER NOT NULL,
PRIMARY KEY (partition, nid))
"""
# The table "trans" stores information on committed transactions.
......@@ -254,42 +260,23 @@ class SQLiteDatabaseManager(DatabaseManager):
else:
q("REPLACE INTO config VALUES (?,?)", (key, str(value)))
def getPartitionTable(self, *nid):
if nid:
return self.query("SELECT rid, state FROM pt WHERE nid=?", nid)
def _getPartitionTable(self):
return self.query("SELECT * FROM pt")
# A test with a table of 20 million lines and SQLite 3.8.7.1 shows that
# it's not worth changing getLastTID:
# - It already returns the result in less than 2 seconds, without reading
# the whole table (this is 4-7 times faster than MySQL).
# - Strangely, a "GROUP BY partition" clause makes SQLite almost twice
# slower.
# - Getting MAX(tid) is immediate with a "AND partition=?" condition so one
# way to speed up the following 2 methods is to repeat the queries for
# each partition (and finish in Python with max() for getLastTID).
def getLastTID(self, max_tid):
return self.query(
"SELECT MAX(tid) FROM pt, trans"
" WHERE nid=? AND rid=partition AND tid<=?",
(self.getUUID(), max_tid,)).next()[0]
def _getLastIDs(self):
p64 = util.p64
def _getLastTID(self, partition, max_tid=None):
x = self.query
if max_tid is None:
x = x("SELECT MAX(tid) FROM trans WHERE partition=?", (partition,))
else:
x = x("SELECT MAX(tid) FROM trans WHERE partition=? AND tid<=?",
(partition, max_tid))
return x.next()[0]
def _getLastIDs(self, *args):
q = self.query
args = self.getUUID(),
trans = {partition: p64(tid)
for partition, tid in q(
"SELECT partition, MAX(tid) FROM pt, trans"
" WHERE nid=? AND rid=partition GROUP BY partition", args)}
obj = {partition: p64(tid)
for partition, tid in q(
"SELECT partition, MAX(tid) FROM pt, obj"
" WHERE nid=? AND rid=partition GROUP BY partition", args)}
oid = q("SELECT MAX(oid) oid FROM pt, obj"
" WHERE nid=? AND rid=partition", args).next()[0]
return trans, obj, None if oid is None else p64(oid)
(oid,), = q("SELECT MAX(oid) FROM obj WHERE `partition`=?", args)
(tid,), = q("SELECT MAX(tid) FROM obj WHERE `partition`=?", args)
return tid, oid
def _getDataLastId(self, partition):
return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
......@@ -357,8 +344,8 @@ class SQLiteDatabaseManager(DatabaseManager):
# whereas we try to replace only 1 value ?
# We don't want to remove the 'NOT NULL' constraint
# so we must simulate a "REPLACE OR FAIL".
q("DELETE FROM pt WHERE rid=? AND nid=?", (offset, nid))
if state != CellStates.DISCARDED:
q("DELETE FROM pt WHERE partition=? AND nid=?", (offset, nid))
if state is not None:
q("INSERT OR FAIL INTO pt VALUES (?,?,?)",
(offset, nid, int(state)))
......@@ -525,12 +512,12 @@ class SQLiteDatabaseManager(DatabaseManager):
def _deleteRange(self, partition, min_tid=None, max_tid=None):
sql = " WHERE partition=?"
args = [partition]
if min_tid:
if min_tid is not None:
sql += " AND ? < tid"
args.append(util.u64(min_tid))
if max_tid:
args.append(min_tid)
if max_tid is not None:
sql += " AND tid <= ?"
args.append(util.u64(max_tid))
args.append(max_tid)
q = self.query
q("DELETE FROM trans" + sql, args)
sql = " FROM obj" + sql
......
......@@ -63,7 +63,7 @@ class InitializationHandler(BaseMasterHandler):
def askLastIDs(self, conn):
dm = self.app.dm
dm.truncate()
ltid, _, _, loid = dm.getLastIDs()
ltid, loid = dm.getLastIDs()
conn.answer(Packets.AnswerLastIDs(loid, ltid))
def askPartitionTable(self, conn):
......@@ -81,14 +81,6 @@ class InitializationHandler(BaseMasterHandler):
dm.commit()
def startOperation(self, conn, backup):
self.app.operational = True
# XXX: see comment in protocol
dm = self.app.dm
if backup:
if dm.getBackupTID():
return
tid = dm.getLastIDs()[0] or ZERO_TID
else:
tid = None
dm._setBackupTID(tid)
dm.commit()
self.app.operational = True
self.app.replicator.startOperation(backup)
......@@ -26,10 +26,7 @@ class MasterOperationHandler(BaseMasterHandler):
def startOperation(self, conn, backup):
# XXX: see comment in protocol
assert self.app.operational and backup
dm = self.app.dm
if not dm.getBackupTID():
dm._setBackupTID(dm.getLastIDs()[0] or ZERO_TID)
dm.commit()
self.app.replicator.startOperation(backup)
def askLockInformation(self, conn, ttid, tid):
self.app.tm.lock(ttid, tid)
......
......@@ -97,15 +97,13 @@ class StorageOperationHandler(EventHandler):
for serial, oid_list in object_dict.iteritems():
for oid in oid_list:
deleteObject(oid, serial)
# XXX: It should be possible not to commit here if it was the last
# chunk, because we'll either commit again when updating
# 'backup_tid' or the partition table.
self.app.dm.commit()
assert not pack_tid, "TODO"
if next_tid:
self.app.dm.commit() # like in answerFetchTransactions
# TODO also provide feedback to master about current replication state (tid)
self.app.replicator.fetchObjects(next_tid, next_oid)
else:
# This will also commit.
self.app.replicator.finish()
@checkConnectionIsReplicatorConnection
......
......@@ -93,7 +93,7 @@ from neo.lib import logging
from neo.lib.protocol import CellStates, NodeTypes, NodeStates, \
Packets, INVALID_TID, ZERO_TID, ZERO_OID
from neo.lib.connection import ClientConnection, ConnectionClosed
from neo.lib.util import add64, dump
from neo.lib.util import add64, dump, p64
from .handlers.storage import StorageOperationHandler
FETCH_COUNT = 1000
......@@ -190,41 +190,50 @@ class Replicator(object):
return add64(tid, -1)
return ZERO_TID
def updateBackupTID(self):
def updateBackupTID(self, commit=False):
dm = self.app.dm
tid = dm.getBackupTID()
if tid:
new_tid = self.getBackupTID()
if tid != new_tid:
dm._setBackupTID(new_tid)
if commit:
dm.commit()
def startOperation(self, backup):
dm = self.app.dm
if backup:
if dm.getBackupTID():
assert not hasattr(self, 'partition_dict'), self.partition_dict
return
tid = dm.getLastIDs()[0] or ZERO_TID
else:
tid = None
dm._setBackupTID(tid)
dm.commit()
try:
partition_dict = self.partition_dict
except AttributeError:
return
for offset, next_tid in dm.iterCellNextTIDs():
if type(next_tid) is not bytes: # readable
p = partition_dict[offset]
p.next_trans, p.next_obj = next_tid
def populate(self):
app = self.app
pt = app.pt
uuid = app.uuid
self.partition_dict = {}
self.replicate_dict = {}
self.source_dict = {}
self.ttid_set = set()
last_tid, last_trans_dict, last_obj_dict, _ = app.dm.getLastIDs()
next_tid = app.dm.getBackupTID() or last_tid
next_tid = add64(next_tid, 1) if next_tid else ZERO_TID
outdated_list = []
for offset in xrange(pt.getPartitions()):
for cell in pt.getCellList(offset):
if cell.getUUID() == uuid and not cell.isCorrupted():
for offset, next_tid in self.app.dm.iterCellNextTIDs():
self.partition_dict[offset] = p = Partition()
if cell.isOutOfDate():
if type(next_tid) is bytes: # OUT_OF_DATE
outdated_list.append(offset)
try:
p.next_trans = add64(last_trans_dict[offset], 1)
except KeyError:
p.next_trans = ZERO_TID
p.next_obj = last_obj_dict.get(offset, ZERO_TID)
p.max_ttid = INVALID_TID
else:
p.next_trans = p.next_obj = next_tid
p.max_ttid = INVALID_TID
else: # readable
p.next_trans, p.next_obj = next_tid or (None, None)
p.max_ttid = None
if outdated_list:
self.app.tm.replicating(outdated_list)
......@@ -236,7 +245,6 @@ class Replicator(object):
discarded_list = []
readable_list = []
app = self.app
last_tid, last_trans_dict, last_obj_dict, _ = app.dm.getLastIDs()
for offset, uuid, state in cell_list:
if uuid == app.uuid:
if state in (CellStates.DISCARDED, CellStates.CORRUPTED):
......@@ -251,11 +259,9 @@ class Replicator(object):
elif state == CellStates.OUT_OF_DATE:
assert offset not in self.partition_dict
self.partition_dict[offset] = p = Partition()
try:
p.next_trans = add64(last_trans_dict[offset], 1)
except KeyError:
p.next_trans = ZERO_TID
p.next_obj = last_obj_dict.get(offset, ZERO_TID)
# New cell. 0 is also what should be stored by the backend.
# Nothing to optimize.
p.next_trans = p.next_obj = ZERO_TID
p.max_ttid = INVALID_TID
added_list.append(offset)
else:
......@@ -289,7 +295,7 @@ class Replicator(object):
next_tid = add64(tid, 1)
p.next_trans = p.next_obj = next_tid
if next_tid:
self.updateBackupTID()
self.updateBackupTID(True)
self._nextPartition()
def _nextPartitionSortKey(self, offset):
......@@ -406,11 +412,14 @@ class Replicator(object):
p = self.partition_dict[offset]
p.next_obj = add64(tid, 1)
self.updateBackupTID()
app = self.app
app.dm.updateCellTID(offset, tid)
app.dm.commit()
if p.max_ttid or offset in self.replicate_dict and \
offset not in self.source_dict:
logging.debug("unfinished transactions: %r", self.ttid_set)
else:
self.app.tm.replicated(offset, tid)
app.tm.replicated(offset, tid)
logging.debug("partition %u replicated up to %s from %r",
offset, dump(tid), self.current_node)
self.getCurrentConnection().setReconnectionNoDelay()
......
......@@ -254,7 +254,7 @@ class StorageDBTests(NeoUnitTestBase):
txn1, objs1 = self.getTransaction([oid1])
txn2, objs2 = self.getTransaction([oid2])
# nothing in database
self.assertEqual(self.db.getLastIDs(), (None, {}, {}, None))
self.assertEqual(self.db.getLastIDs(), (None, None))
self.assertEqual(self.db.getUnfinishedTIDDict(), {})
self.assertEqual(self.db.getObject(oid1), None)
self.assertEqual(self.db.getObject(oid2), None)
......@@ -320,13 +320,17 @@ class StorageDBTests(NeoUnitTestBase):
expected = [(t, oid_list[offset+i]) for t in tids for i in (0, np)]
self.assertEqual(self.db.getReplicationObjectList(ZERO_TID,
MAX_TID, len(expected) + 1, offset, ZERO_OID), expected)
self.db._deleteRange(0, MAX_TID)
self.db._deleteRange(0, max_tid=ZERO_TID)
def deleteRange(partition, min_tid=None, max_tid=None):
self.db._deleteRange(partition,
None if min_tid is None else u64(min_tid),
None if max_tid is None else u64(max_tid))
deleteRange(0, MAX_TID)
deleteRange(0, max_tid=ZERO_TID)
check(0, [], t1, t2, t3)
self.db._deleteRange(0); check(0, [])
self.db._deleteRange(1, t2); check(1, [t1], t1, t2)
self.db._deleteRange(2, max_tid=t2); check(2, [], t3)
self.db._deleteRange(3, t1, t2); check(3, [t3], t1, t3)
deleteRange(0); check(0, [])
deleteRange(1, t2); check(1, [t1], t1, t2)
deleteRange(2, max_tid=t2); check(2, [], t3)
deleteRange(3, t1, t2); check(3, [t3], t1, t3)
def test_getTransaction(self):
oid1, oid2 = self.getOIDs(2)
......
......@@ -718,8 +718,9 @@ class Test(NEOThreadedTest):
@with_cluster()
def testStorageUpgrade1(self, cluster):
if 1:
storage = cluster.storage
# Disable migration steps that aren't idempotent.
with Patch(storage.dm.__class__, _migrate3=lambda *_: None):
t, c = cluster.getTransaction()
storage.dm.setConfiguration("version", None)
c.root()._p_changed = 1
......@@ -1289,7 +1290,7 @@ class Test(NEOThreadedTest):
s1.resetNode()
with Patch(s1.dm, truncate=dieFirst(1)):
s1.start()
self.assertEqual(s0.dm.getLastIDs()[0], truncate_tid)
self.assertFalse(s0.dm.getLastIDs()[0])
self.assertEqual(s1.dm.getLastIDs()[0], r._p_serial)
self.tic()
self.assertEqual(calls, [1, 2])
......@@ -2351,7 +2352,7 @@ class Test(NEOThreadedTest):
oid, tid = big_id_list[i]
for j, expected in (
(1 - i, (dm.getLastTID(u64(MAX_TID)), dm.getLastIDs())),
(i, (u64(tid), (tid, {}, {}, oid)))):
(i, (u64(tid), (tid, oid)))):
oid, tid = big_id_list[j]
# Somehow we abuse 'storeTransaction' because we ask it to
# write data for unassigned partitions. This is not checked
......
......@@ -24,12 +24,12 @@ from neo.lib import logging
from neo.client.exception import NEOStorageError
from neo.master.handlers.backup import BackupHandler
from neo.storage.checker import CHECK_COUNT
from neo.storage.replicator import Replicator
from neo.storage import replicator
from neo.lib.connector import SocketConnector
from neo.lib.connection import ClientConnection
from neo.lib.protocol import CellStates, ClusterStates, Packets, \
ZERO_OID, ZERO_TID, MAX_TID, uuid_str
from neo.lib.util import p64, u64
from neo.lib.util import add64, p64, u64
from .. import expectedFailure, Patch, TransactionalResource
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, \
predictable_random, with_cluster
......@@ -39,9 +39,9 @@ from .test import PCounter, PCounterWithResolution # XXX
def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
def decorator(wrapped):
def wrapper(self):
with NEOCluster(partitions, **upstream_kw) as upstream:
with NEOCluster(partitions=partitions, **upstream_kw) as upstream:
upstream.start()
with NEOCluster(partitions, upstream=upstream,
with NEOCluster(partitions=partitions, upstream=upstream,
**backup_kw) as backup:
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
......@@ -659,33 +659,158 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(2, s0.sqlCount('obj'))
expectedFailure(self.assertEqual)(2, count)
@with_cluster(start_cluster=0, replicas=1)
@with_cluster(replicas=1)
def testResumingReplication(self, cluster):
if 1:
"""
Check from where replication resumes for an OUT_OF_DATE cell that has
a hole, which is possible because OUT_OF_DATE cells are writable.
"""
ask = []
def logReplication(conn, packet):
if isinstance(packet, (Packets.AskFetchTransactions,
Packets.AskFetchObjects)):
ask.append(packet.decode()[2:])
def getTIDList():
return [t.tid for t in c.db().storage.iterator()]
s0, s1 = cluster.storage_list
cluster.start(storage_list=(s0,))
t, c = cluster.getTransaction()
r = c.root()
# s1 is UP_TO_DATE and it has the initial transaction.
# Let's outdate it: replication will have to resume just after this
# transaction, regardless of future written transactions.
# To make sure, we get a hole in the cell, we block replication.
s1.stop()
cluster.join((s1,))
r._p_changed = 1
t.commit()
s1.resetNode()
with Patch(replicator.Replicator, connected=lambda *_: None):
s1.start()
self.tic()
with Patch(Replicator, connected=lambda *_: None):
cluster.enableStorageList((s1,))
cluster.neoctl.tweakPartitionTable()
r._p_changed = 1
t.commit()
self.tic()
s1.stop()
cluster.join((s1,))
t0, t1, t2 = c.db().storage.iterator()
tids = getTIDList()
s1.resetNode()
# Initialization done. Now we check that replication is correct
# and efficient.
with ConnectionFilter() as f:
f.add(logReplication)
s1.start()
self.tic()
self.assertEqual([], cluster.getOutdatedCells())
s0.stop()
cluster.join((s0,))
t0, t1, t2 = c.db().storage.iterator()
self.assertEqual(tids, getTIDList())
t0_next = add64(tids[0], 1)
self.assertEqual(ask, [
(t0_next, tids[2], tids[2:]),
(t0_next, tids[2], ZERO_OID, {tids[2]: [ZERO_OID]}),
])
@backup_test(2, backup_kw=dict(replicas=1))
def testResumingBackupReplication(self, backup):
upstream = backup.upstream
t, c = upstream.getTransaction()
r = c.root()
r[1] = PCounter()
t.commit()
r[2] = ob = PCounter()
tids = []
def newTransaction():
r._p_changed = ob._p_changed = 1
with upstream.moduloTID(0):
t.commit()
self.tic()
tids.append(r._p_serial)
def getTIDList(storage):
return storage.dm.getReplicationTIDList(tids[0], MAX_TID, 9, 0)
newTransaction()
self.assertEqual(u64(ob._p_oid), 2)
getBackupTid = backup.master.pt.getBackupTid
# Check when an OUT_OF_DATE cell has more data than an UP_TO_DATE one.
primary = backup.master.backup_app.primary_partition_dict[0]._uuid
slave, primary = sorted(backup.storage_list,
key=lambda x: x.uuid == primary)
with ConnectionFilter() as f:
@f.delayAnswerFetchTransactions
def delay(conn, x={None: 0, primary.uuid: 0}):
return x.pop(conn.getUUID(), 1)
newTransaction()
self.assertEqual(getBackupTid(), tids[1])
primary.stop()
backup.join((primary,))
primary.resetNode()
primary.start()
self.tic()
primary, slave = slave, primary
self.assertEqual(tids, getTIDList(slave))
self.assertEqual(tids[:1], getTIDList(primary))
self.assertEqual(getBackupTid(), add64(tids[1], -1))
self.assertEqual(f.filtered_count, 3)
self.tic()
self.assertEqual(4, self.checkBackup(backup))
self.assertEqual(getBackupTid(min), tids[1])
# Check that replication resumes from the maximum possible tid
# (for UP_TO_DATE cells of a backup cluster). More precisely:
# - cells are handled independently (done here by blocking replication
# of partition 1 to keep the backup TID low)
# - trans and obj are also handled independently (with FETCH_COUNT=1,
# we interrupt replication of obj in the middle of a transaction)
slave.stop()
backup.join((slave,))
ask = []
def delayReplicate(conn, packet):
if isinstance(packet, Packets.AskFetchObjects):
if len(ask) == 6:
return True
elif not isinstance(packet, Packets.AskFetchTransactions):
return
ask.append(packet.decode())
conn, = upstream.master.getConnectionList(backup.master)
with ConnectionFilter() as f, Patch(replicator.Replicator,
_nextPartitionSortKey=lambda orig, self, offset: offset):
f.add(delayReplicate)
delayReconnect = f.delayAskLastTransaction()
conn.close()
newTransaction()
newTransaction()
newTransaction()
self.assertFalse(ask)
self.assertEqual(f.filtered_count, 1)
with Patch(replicator, FETCH_COUNT=1):
f.remove(delayReconnect)
self.tic()
t1_next = add64(tids[1], 1)
self.assertEqual(ask, [
# trans
(0, 1, t1_next, tids[4], []),
(0, 1, tids[3], tids[4], []),
(0, 1, tids[4], tids[4], []),
# obj
(0, 1, t1_next, tids[4], ZERO_OID, {}),
(0, 1, tids[2], tids[4], p64(2), {}),
(0, 1, tids[3], tids[4], ZERO_OID, {}),
])
del ask[:]
max_ask = None
backup.stop()
newTransaction()
backup.start((primary,))
n = replicator.FETCH_COUNT
t4_next = add64(tids[4], 1)
self.assertEqual(ask, [
(0, n, t4_next, tids[5], []),
(0, n, tids[3], tids[5], ZERO_OID, {tids[3]: [ZERO_OID]}),
(1, n, t1_next, tids[5], []),
(1, n, t1_next, tids[5], ZERO_OID, {}),
])
self.tic()
self.assertEqual(2, self.checkBackup(backup))
@with_cluster(start_cluster=0, replicas=1, partitions=2)
def testReplicationBlockedByUnfinished1(self, cluster,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment