Commit 4a2407dc authored by Kirill Smelkov

Sync with NEO/py v1.9

parents 5bf40022 1b57a7ae
Change History
==============
1.9 (2018-03-13)
----------------
Many performance improvements have been made to storage nodes in this
release, and some of them required changes in the storage format. In short,
the migration is done automatically, but you may want to read the UPGRADE
notes for more details.
Performance:
- Speed up replication by sending bigger network packets,
and by not fetching object next_serial needlessly.
- Speed up reads by indexing 'obj' primarily by 'oid' (instead of 'tid').
- Optimize storage layout of raw data for replication.
Other storage changes:
- Disable data deduplication by default. A new --dedup option re-enables it.
- importer: do not crash if a backup cluster tries to replicate.
- importer: preserve 'packed' flag.
Master:
- Fix possible failure when reading data in a backup cluster with replicas.
- Fix generation of TID.
- Fix resumption of backup replication (internal or not).
Client:
- New 'cache-size' Storage option.
- Cache hit/miss statistics.
- Fix accounting of cache size.
- Preserve 'packed' flag on iteration.
- Fix a case where, at startup or after nodes came back, full load balancing
could be prevented until some data were written.
Other:
- neolog: --from option now also tries to parse with `dateutil`_.
- neolog: add support for xz-compressed logs, using the external xzcat command.
.. _dateutil: https://dateutil.readthedocs.io/
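As an aside on the neolog item above, here is a quick illustration (not neolog
code) of the kind of free-form timestamps that `dateutil`_ accepts, and hence
what ``--from`` can now be given::

    # Illustration only: dateutil parses free-form date strings.
    from dateutil.parser import parse
    print(parse("2018-03-13 10:30"))     # 2018-03-13 10:30:00
    print(parse("13 March 2018 10:30"))  # same instant, different spelling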
1.8.1 (2017-11-07)
------------------
......
NEO 1.9
=======
The `obj` table in MySQL/SQLite backends is automatically upgraded at startup.
Note however that rewriting this table can take a long time if you have
many object records.
This migration is reversible, but you would have to dig into the migration
code (``DatabaseManager._migrate2()``) and manually run the opposite SQL queries.
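For illustration, a hypothetical downgrade of the MySQL backend could look as
follows. This is only a sketch mirroring ``_migrate2()`` in reverse, assuming
the default InnoDB engine, no MySQL table partitioning, and the 1.8 key
ordering visible in the diff below; the function name is illustrative and
``q`` stands for the backend's ``query`` method::

    # Hypothetical, untested sketch: rebuild 'obj' with the 1.8 layout.
    def downgrade_obj_table(q):
        q("DROP TABLE IF EXISTS old_obj")
        q("""CREATE TABLE old_obj (
                 `partition` SMALLINT UNSIGNED NOT NULL,
                 oid BIGINT UNSIGNED NOT NULL,
                 tid BIGINT UNSIGNED NOT NULL,
                 data_id BIGINT UNSIGNED NULL,
                 value_tid BIGINT UNSIGNED NULL,
                 PRIMARY KEY (`partition`, tid, oid),
                 KEY (`partition`, oid, tid),
                 KEY (data_id)
             ) ENGINE=InnoDB SELECT * FROM obj""")
        q("DROP TABLE obj")
        q("ALTER TABLE old_obj RENAME TO obj")
        # The stored schema version would also have to be lowered, e.g.:
        q("UPDATE config SET value='1' WHERE name='version'")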
No automatic migration is done for the change in the `data` table (as with NEO
1.4, it is optional and would cost too much). For optimal performance, you may
prefer to set up new nodes and drop the old ones once the replication
is finished.
Because there is no change in the protocol, nodes running 1.8.1 and 1.9 can
actually be mixed, avoiding any downtime due to the migration.
NEO 1.6
=======
......
......@@ -136,9 +136,6 @@ class Storage(BaseStorage.BaseStorage,
def supportsUndo(self):
return True
def supportsTransactionalUndo(self):
return True
def loadEx(self, oid, version):
try:
data, serial, _ = self.app.load(oid)
......
......@@ -56,25 +56,28 @@ class ConnectionPool(object):
logging.info('%r not ready', node)
else:
logging.info('Connected %r', node)
# Make sure this node will be considered for the next reads
# even if there was a previous recent failure.
self.node_failure_dict.pop(node.getUUID(), None)
return conn
self.node_failure_dict[node.getUUID()] = time.time() + MAX_FAILURE_AGE
def getCellSortKey(self, cell, random=random.random):
# The use of 'random' shuffles cells to randomise the node to access.
uuid = cell.getUUID()
# First, prefer a connected node.
if uuid in self.connection_dict:
return random()
# Then one that didn't fail recently.
failure = self.node_failure_dict.get(uuid)
# Prefer a node that didn't fail recently.
failure = self.node_failure_dict.get(cell.getUUID())
if failure:
if time.time() < failure:
# At last, order by date of connection failure.
# Or order by date of connection failure.
return failure
# Do not use 'del' statement: we didn't lock, so another
# thread might have removed uuid from node_failure_dict.
self.node_failure_dict.pop(uuid, None)
return 1 + random()
self.node_failure_dict.pop(cell.getUUID(), None)
# A random one, connected or not, is a trivial and quite efficient way
# to distribute the load evenly. On write accesses, a client connects
# to all nodes of touched cells, but before that, or if a client is
# specialized to only do read-only accesses, it should not limit
# itself to only use the first connected nodes.
return random()
def getConnForNode(self, node):
"""Return a locked connection object to a given node
......
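The ``getCellSortKey`` change above drops the old preference for
already-connected nodes: any node without a recent failure now gets a uniform
random key, so reads spread evenly over all candidate cells. A minimal
standalone sketch of the scheme (illustrative names and value for
MAX_FAILURE_AGE, not NEO code)::

    import random, time

    MAX_FAILURE_AGE = 600          # assumption for the sketch
    node_failure_dict = {}         # uuid -> time until which the node is avoided

    def cell_sort_key(uuid):
        failure = node_failure_dict.get(uuid)
        if failure and time.time() < failure:
            return failure         # failed recently: sort last, ordered by expiry
        node_failure_dict.pop(uuid, None)   # failure entry expired
        return random.random()     # uniform in [0, 1): even load balancing

    # A node that just failed is avoided for MAX_FAILURE_AGE seconds:
    node_failure_dict['s2'] = time.time() + MAX_FAILURE_AGE
    print(sorted(['s1', 's2', 's3'], key=cell_sort_key))  # 's2' always last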
......@@ -206,12 +206,21 @@ class BackupApplication(object):
except IndexError:
last_max_tid = prev_tid
if offset in partition_set:
self.tid_list[offset].append(tid)
primary_list = []
node_list = []
for cell in pt.getCellList(offset, readable=True):
cell_list = pt.getCellList(offset, readable=True)
for cell in cell_list:
node = cell.getNode()
assert node.isConnected(), node
if cell.backup_tid == prev_tid:
if prev_tid == tid:
# Connecting to upstream: any node that is
# up-to-date wrt upstream is a candidate for being
# primary.
assert self.ignore_invalidations
if app.isStorageReady(node.getUUID()):
primary_list.append(node)
continue
# Let's consider 4 TIDs t0,t1,t2,t3: if a cell is only
# modified by t0 & t3 and has all data for t0, 4 values
# are possible for its 'backup_tid' until it replicates
......@@ -231,12 +240,19 @@ class BackupApplication(object):
cell.backup_tid, last_max_tid, prev_tid, tid)
if app.isStorageReady(node.getUUID()):
node_list.append(node)
assert node_list
trigger_set.update(node_list)
# Make sure we have a primary storage for this partition.
if offset not in self.primary_partition_dict:
self.primary_partition_dict[offset] = \
random.choice(node_list)
random.choice(primary_list or node_list)
if node_list:
self.tid_list[offset].append(tid)
if primary_list:
# Resume replication to secondary cells.
self._triggerSecondary(
self.primary_partition_dict[offset],
offset, tid, cell_list)
else:
trigger_set.update(node_list)
else:
# Partition not touched, so increase 'backup_tid' of all
# "up-to-date" replicas, without having to replicate.
......@@ -335,15 +351,18 @@ class BackupApplication(object):
if app.getClusterState() == ClusterStates.BACKINGUP:
self.triggerBackup(node)
if primary:
# Notify secondary storages that they can replicate from
# primary ones, even if they are already replicating.
p = Packets.Replicate(tid, '', {offset: node.getAddress()})
for cell in cell_list:
if max(cell.backup_tid, cell.replicating) < tid:
cell.replicating = tid
logging.debug(
"ask %s to replicate partition %u up to %s from %s",
uuid_str(cell.getUUID()), offset,
dump(tid), uuid_str(node.getUUID()))
cell.getNode().send(p)
self._triggerSecondary(node, offset, tid, cell_list)
return result
def _triggerSecondary(self, node, offset, tid, cell_list):
# Notify secondary storages that they can replicate from
# primary ones, even if they are already replicating.
p = Packets.Replicate(tid, '', {offset: node.getAddress()})
for cell in cell_list:
if max(cell.backup_tid, cell.replicating) < tid:
cell.replicating = tid
logging.debug(
"ask %s to replicate partition %u up to %s from %s",
uuid_str(cell.getUUID()), offset,
dump(tid), uuid_str(node.getUUID()))
cell.getNode().send(p)
......@@ -35,7 +35,7 @@ class BackupHandler(EventHandler):
def answerLastTransaction(self, conn, tid):
app = self.app
prev_tid = app.app.getLastTransaction()
if prev_tid < tid:
if prev_tid <= tid:
# Since we don't know which partitions were modified during our
# absence, we must force replication on all storages. As long as
# they haven't done this first check, our backup tid will remain
......@@ -43,8 +43,12 @@ class BackupHandler(EventHandler):
# >= app.app.getLastTransaction()
# < tid
# but passing 'tid' is good enough.
# A special case is when prev_tid == tid: even in this case, we
# must restore the state of the backup app so that any interrupted
# replication (internal or not) is resumed, otherwise the global
# backup_tid could remain stuck to an old tid if upstream is idle.
app.invalidatePartitions(tid, tid, xrange(app.pt.getPartitions()))
elif prev_tid != tid:
else:
raise RuntimeError("upstream DB truncated")
app.ignore_invalidations = False
......
......@@ -147,5 +147,5 @@ class ClientReadOnlyServiceHandler(ClientServiceHandler):
# like in MasterHandler but returns backup_tid instead of last_tid
def askLastTransaction(self, conn):
assert self.app.backup_tid is not None # we are in BACKINGUP mode
backup_tid = self.app.pt.getBackupTid()
backup_tid = self.app.pt.getBackupTid(min)
conn.answer(Packets.AnswerLastTransaction(backup_tid))
......@@ -234,29 +234,16 @@ class TransactionManager(EventQueue):
tid = tidFromTime(time())
min_tid = self._last_tid
if tid <= min_tid:
tid = addTID(min_tid, 1)
# We know we won't have room to adjust by decreasing.
try_decrease = False
else:
try_decrease = True
tid = addTID(min_tid, 1)
if ttid is not None:
assert isinstance(ttid, basestring), repr(ttid)
assert isinstance(divisor, (int, long)), repr(divisor)
ref_remainder = u64(ttid) % divisor
remainder = u64(tid) % divisor
if ref_remainder != remainder:
if try_decrease:
new_tid = addTID(tid, ref_remainder - divisor - remainder)
assert u64(new_tid) % divisor == ref_remainder, (dump(new_tid),
ref_remainder)
if new_tid <= min_tid:
new_tid = addTID(new_tid, divisor)
else:
if ref_remainder > remainder:
ref_remainder += divisor
new_tid = addTID(tid, ref_remainder - remainder)
assert min_tid < new_tid, (dump(min_tid), dump(tid), dump(new_tid))
tid = new_tid
remainder = u64(ttid) % divisor
delta_remainder = remainder - u64(tid) % divisor
if delta_remainder:
tid = addTID(tid, delta_remainder)
if tid <= min_tid:
tid = addTID(tid, divisor)
assert u64(tid) % divisor == remainder, (dump(tid), remainder)
assert min_tid < tid, (dump(min_tid), dump(tid))
self._last_tid = tid
return self._last_tid
......
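The rewritten TID-generation code above replaces the old decrease/increase
special cases with a single adjustment: shift the candidate tid by the
remainder difference, and if that lands at or below the last generated tid,
bump it by a full divisor, so the result is both monotonically increasing and
congruent to the ttid modulo the divisor. A toy integer version (the real code
works on 8-byte TID strings through ``u64``/``addTID``; names here are
illustrative)::

    # Toy model of the remainder alignment (plain ints instead of TID strings).
    def next_tid(clock_tid, last_tid, ttid=None, divisor=None):
        tid = clock_tid
        if tid <= last_tid:
            tid = last_tid + 1
        if ttid is not None:
            remainder = ttid % divisor
            delta_remainder = remainder - tid % divisor
            if delta_remainder:
                tid += delta_remainder
                if tid <= last_tid:
                    tid += divisor
                assert tid % divisor == remainder
            assert last_tid < tid
        return tid

    print(next_tid(1000, 999, ttid=7, divisor=4))   # 1003 (1003 % 4 == 7 % 4)
    print(next_tid(1002, 1001, ttid=4, divisor=4))  # 1004 (1000 would not be > 1001)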
......@@ -406,7 +406,7 @@ class ImporterDatabaseManager(DatabaseManager):
if compression:
data = compressed_data
checksum = util.makeChecksum(data)
data_id = self.holdData(util.makeChecksum(data), data,
data_id = self.holdData(util.makeChecksum(data), oid, data,
compression)
data_id_list.append(data_id)
object_list.append((oid, data_id, data_tid))
......
......@@ -161,11 +161,14 @@ class DatabaseManager(object):
"The database can not be upgraded because you have unfinished"
" transactions. Use an older version of NEO to verify them.")
def _getVersion(self):
def migrate(self, *args, **kw):
version = int(self.getConfiguration("version") or 0)
if self.VERSION < version:
raise DatabaseFailure("The database can not be downgraded.")
return version
while version < self.VERSION:
version += 1
getattr(self, '_migrate%s' % version)(*args, **kw)
self.setConfiguration("version", version)
def doOperation(self, app):
pass
......@@ -485,7 +488,11 @@ class DatabaseManager(object):
existing data is first thrown away.
"""
@requires(_changePartitionTable)
def _getDataLastId(self, partition):
"""
"""
@requires(_changePartitionTable, _getDataLastId)
def changePartitionTable(self, ptid, cell_list, reset=False):
readable_set = self._readable_set
if reset:
......@@ -500,6 +507,10 @@ class DatabaseManager(object):
raise NonReadableCell
self._getPartition = _getPartition
self._getReadablePartition = _getReadablePartition
d = self._data_last_ids = []
for p in xrange(np):
i = self._getDataLastId(p)
d.append(p << 48 if i is None else i + 1)
me = self.getUUID()
for offset, nid, state in cell_list:
if nid == me:
......@@ -567,7 +578,7 @@ class DatabaseManager(object):
"""
@abstract
def storeData(self, checksum, data, compression):
def storeData(self, checksum, oid, data, compression):
"""To be overridden by the backend to store object raw data
If same data was already stored, the storage only has to check there's
......
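The new ``_getDataLastId``/``_data_last_ids`` machinery above is what replaces
AUTO_INCREMENT ids in the ``data`` table: each partition gets its own
48-bit-shifted id range, so the raw data belonging to one partition ends up
with contiguous ids, which is presumably what the "optimize storage layout of
raw data for replication" changelog item refers to. A minimal sketch of the
allocation scheme (illustrative class, not NEO code)::

    # Sketch of the per-partition id allocation seeded by changePartitionTable
    # (via _getDataLastId) and incremented by storeData.
    class DataIdAllocator(object):
        def __init__(self, last_id_by_partition):
            # Mirrors: p << 48 if _getDataLastId(p) is None else last_id + 1
            self.next_id = [p << 48 if i is None else i + 1
                            for p, i in enumerate(last_id_by_partition)]

        def allocate(self, partition):
            i = self.next_id[partition]
            self.next_id[partition] = i + 1
            return i

    alloc = DataIdAllocator([None, (1 << 48) + 7, None])
    print(alloc.allocate(0))   # 0
    print(alloc.allocate(1))   # (1 << 48) + 8
    print(alloc.allocate(0))   # 1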
......@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from binascii import a2b_hex
from collections import OrderedDict
import MySQLdb
from MySQLdb import DataError, IntegrityError, \
OperationalError, ProgrammingError
......@@ -49,7 +50,7 @@ def getPrintableQuery(query, max=70):
class MySQLDatabaseManager(DatabaseManager):
"""This class manages a database on MySQL."""
VERSION = 1
VERSION = 2
ENGINES = "InnoDB", "RocksDB", "TokuDB"
_engine = ENGINES[0] # default engine
......@@ -176,41 +177,48 @@ class MySQLDatabaseManager(DatabaseManager):
if e.args[0] != NO_SUCH_TABLE:
raise
def _migrate1(self, _):
self._checkNoUnfinishedTransactions()
self.query("DROP TABLE IF EXISTS ttrans")
def _migrate2(self, schema_dict):
q = self.query
if self.nonempty('obj') is None:
if self.nonempty('new_obj') is None:
return
else:
q("DROP TABLE IF EXISTS new_obj")
q(schema_dict.pop('obj') % 'new_obj' + " SELECT * FROM obj")
q("DROP TABLE obj")
q("ALTER TABLE new_obj RENAME TO obj")
def _setup(self, dedup=False):
self._config.clear()
q = self.query
p = engine = self._engine
schema_dict = OrderedDict()
if self.nonempty("config") is None:
# The table "config" stores configuration
# parameters which affect the persistent data.
q("""CREATE TABLE config (
name VARBINARY(255) NOT NULL PRIMARY KEY,
value VARBINARY(255) NULL
) ENGINE=""" + engine)
else:
# Automatic migration.
version = self._getVersion()
if version < 1:
self._checkNoUnfinishedTransactions()
q("DROP TABLE IF EXISTS ttrans")
self._setConfiguration("version", self.VERSION)
# The table "config" stores configuration
# parameters which affect the persistent data.
schema_dict['config'] = """CREATE TABLE %s (
name VARBINARY(255) NOT NULL PRIMARY KEY,
value VARBINARY(255) NULL
) ENGINE=""" + engine
# The table "pt" stores a partition table.
q("""CREATE TABLE IF NOT EXISTS pt (
schema_dict['pt'] = """CREATE TABLE %s (
rid INT UNSIGNED NOT NULL,
nid INT NOT NULL,
state TINYINT UNSIGNED NOT NULL,
PRIMARY KEY (rid, nid)
) ENGINE=""" + engine)
) ENGINE=""" + engine
if self._use_partition:
p += """ PARTITION BY LIST (`partition`) (
PARTITION dummy VALUES IN (NULL))"""
# The table "trans" stores information on committed transactions.
q("""CREATE TABLE IF NOT EXISTS trans (
schema_dict['trans'] = """CREATE TABLE %s (
`partition` SMALLINT UNSIGNED NOT NULL,
tid BIGINT UNSIGNED NOT NULL,
packed BOOLEAN NOT NULL,
......@@ -220,19 +228,19 @@ class MySQLDatabaseManager(DatabaseManager):
ext BLOB NOT NULL,
ttid BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (`partition`, tid)
) ENGINE=""" + p)
) ENGINE=""" + p
# The table "obj" stores committed object metadata.
q("""CREATE TABLE IF NOT EXISTS obj (
schema_dict['obj'] = """CREATE TABLE %s (
`partition` SMALLINT UNSIGNED NOT NULL,
oid BIGINT UNSIGNED NOT NULL,
tid BIGINT UNSIGNED NOT NULL,
data_id BIGINT UNSIGNED NULL,
value_tid BIGINT UNSIGNED NULL,
PRIMARY KEY (`partition`, tid, oid),
KEY (`partition`, oid, tid),
PRIMARY KEY (`partition`, oid, tid),
KEY tid (`partition`, tid, oid),
KEY (data_id)
) ENGINE=""" + p)
) ENGINE=""" + p
if engine == "TokuDB":
engine += " compression='tokudb_uncompressed'"
......@@ -240,21 +248,21 @@ class MySQLDatabaseManager(DatabaseManager):
# The table "data" stores object data.
# We'd like to have partial index on 'hash' column (e.g. hash(4))
# but 'UNIQUE' constraint would not work as expected.
q("""CREATE TABLE IF NOT EXISTS data (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
schema_dict['data'] = """CREATE TABLE %%s (
id BIGINT UNSIGNED NOT NULL PRIMARY KEY,
hash BINARY(20) NOT NULL,
compression TINYINT UNSIGNED NULL,
value MEDIUMBLOB NOT NULL%s
) ENGINE=%s""" % (""",
UNIQUE (hash, compression)""" if dedup else "", engine))
UNIQUE (hash, compression)""" if dedup else "", engine)
q("""CREATE TABLE IF NOT EXISTS bigdata (
schema_dict['bigdata'] = """CREATE TABLE %s (
id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
value MEDIUMBLOB NOT NULL
) ENGINE=""" + engine)
) ENGINE=""" + engine
# The table "ttrans" stores information on uncommitted transactions.
q("""CREATE TABLE IF NOT EXISTS ttrans (
schema_dict['ttrans'] = """CREATE TABLE %s (
`partition` SMALLINT UNSIGNED NOT NULL,
tid BIGINT UNSIGNED,
packed BOOLEAN NOT NULL,
......@@ -263,17 +271,26 @@ class MySQLDatabaseManager(DatabaseManager):
description BLOB NOT NULL,
ext BLOB NOT NULL,
ttid BIGINT UNSIGNED NOT NULL
) ENGINE=""" + engine)
) ENGINE=""" + engine
# The table "tobj" stores uncommitted object metadata.
q("""CREATE TABLE IF NOT EXISTS tobj (
schema_dict['tobj'] = """CREATE TABLE %s (
`partition` SMALLINT UNSIGNED NOT NULL,
oid BIGINT UNSIGNED NOT NULL,
tid BIGINT UNSIGNED NOT NULL,
data_id BIGINT UNSIGNED NULL,
value_tid BIGINT UNSIGNED NULL,
PRIMARY KEY (tid, oid)
) ENGINE=""" + engine)
) ENGINE=""" + engine
if self.nonempty('config') is None:
q(schema_dict.pop('config') % 'config')
self._setConfiguration('version', self.VERSION)
else:
self.migrate(schema_dict)
for table, schema in schema_dict.iteritems():
q(schema % ('IF NOT EXISTS ' + table))
self._uncommitted_data.update(q("SELECT data_id, count(*)"
" FROM tobj WHERE data_id IS NOT NULL GROUP BY data_id"))
......@@ -335,18 +352,21 @@ class MySQLDatabaseManager(DatabaseManager):
offset_list = self._getAssignedPartitionList()
p64 = util.p64
q = self.query
sql = ("SELECT MAX(tid) FROM %s FORCE INDEX (PRIMARY)"
" WHERE `partition`=%s")
sql = "SELECT MAX(tid) FROM %s WHERE `partition`=%s"
trans, obj = ({partition: p64(tid)
for partition in offset_list
for tid, in q(sql % (t, partition))
if tid is not None}
for t in ('trans', 'obj'))
for t in ('trans FORCE INDEX (PRIMARY)', 'obj FORCE INDEX (tid)'))
oid = self._sqlmax(
"SELECT MAX(oid) FROM obj FORCE INDEX (`partition`)"
"SELECT MAX(oid) FROM obj FORCE INDEX (PRIMARY)"
" WHERE `partition`=%s", offset_list)
return trans, obj, None if oid is None else p64(oid)
def _getDataLastId(self, partition):
return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
% (partition << 48, (partition + 1) << 48))[0][0]
def _getUnfinishedTIDDict(self):
q = self.query
return q("SELECT ttid, tid FROM ttrans"), (ttid
......@@ -363,7 +383,7 @@ class MySQLDatabaseManager(DatabaseManager):
def getLastObjectTID(self, oid):
oid = util.u64(oid)
r = self.query("SELECT tid FROM obj FORCE INDEX(`partition`)"
r = self.query("SELECT tid FROM obj FORCE INDEX(PRIMARY)"
" WHERE `partition`=%d AND oid=%d"
" ORDER BY tid DESC LIMIT 1"
% (self._getReadablePartition(oid), oid))
......@@ -371,7 +391,7 @@ class MySQLDatabaseManager(DatabaseManager):
def _getNextTID(self, *args): # partition, oid, tid
r = self.query("SELECT tid FROM obj"
" FORCE INDEX(`partition`)"
" FORCE INDEX(PRIMARY)"
" WHERE `partition`=%d AND oid=%d AND tid>%d"
" ORDER BY tid LIMIT 1" % args)
return r[0][0] if r else None
......@@ -380,7 +400,7 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query
partition = self._getReadablePartition(oid)
sql = ('SELECT tid, compression, data.hash, value, value_tid'
' FROM obj FORCE INDEX(`partition`)'
' FROM obj FORCE INDEX(PRIMARY)'
' LEFT JOIN data ON (obj.data_id = data.id)'
' WHERE `partition` = %d AND oid = %d') % (partition, oid)
if before_tid is not None:
......@@ -437,9 +457,8 @@ class MySQLDatabaseManager(DatabaseManager):
for partition in offset_list:
where = " WHERE `partition`=%d" % partition
data_id_list = [x for x, in
q("SELECT DISTINCT data_id FROM obj FORCE INDEX(PRIMARY)"
+ where)
if x]
q("SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)"
"%s AND data_id IS NOT NULL" % where)]
if not self._use_partition:
q("DELETE FROM obj" + where)
q("DELETE FROM trans" + where)
......@@ -455,7 +474,8 @@ class MySQLDatabaseManager(DatabaseManager):
raise
def _getUnfinishedDataIdList(self):
return [x for x, in self.query("SELECT data_id FROM tobj") if x]
return [x for x, in self.query(
"SELECT data_id FROM tobj WHERE data_id IS NOT NULL")]
def dropPartitionsTemporary(self, offset_list=None):
where = "" if offset_list is None else \
......@@ -491,7 +511,9 @@ class MySQLDatabaseManager(DatabaseManager):
else:
value_serial = 'NULL'
value = "(%s,%s,%s,%s,%s)," % (
partition, oid, tid, data_id or 'NULL', value_serial)
partition, oid, tid,
'NULL' if data_id is None else data_id,
value_serial)
values_size += len(value)
# actually: max_values < values_size + EXTRA - len(final comma)
# (test_max_allowed_packet checks that EXTRA == 2)
......@@ -550,7 +572,7 @@ class MySQLDatabaseManager(DatabaseManager):
for i in xrange(bigdata_id,
bigdata_id + (length + 0x7fffff >> 23)))
def storeData(self, checksum, data, compression, _pack=_structLL.pack):
def storeData(self, checksum, oid, data, compression, _pack=_structLL.pack):
e = self.escape
checksum = e(checksum)
if 0x1000000 <= len(data): # 16M (MEDIUMBLOB limit)
......@@ -577,9 +599,11 @@ class MySQLDatabaseManager(DatabaseManager):
i = bigdata_id = self.conn.insert_id()
i += 1
data = _pack(bigdata_id, length)
p = self._getPartition(util.u64(oid))
r = self._data_last_ids[p]
try:
self.query("INSERT INTO data VALUES (NULL, '%s', %d, '%s')" %
(checksum, compression, e(data)))
self.query("INSERT INTO data VALUES (%s, '%s', %d, '%s')" %
(r, checksum, compression, e(data)))
except IntegrityError as e:
if e.args[0] == DUP_ENTRY:
(r, d), = self.query("SELECT id, value FROM data"
......@@ -588,7 +612,8 @@ class MySQLDatabaseManager(DatabaseManager):
if d == data:
return r
raise
return self.conn.insert_id()
self._data_last_ids[p] = r + 1
return r
def loadData(self, data_id):
compression, hash, value = self.query(
......@@ -602,7 +627,7 @@ class MySQLDatabaseManager(DatabaseManager):
del _structLL
def _getDataTID(self, oid, tid=None, before_tid=None):
sql = ('SELECT tid, value_tid FROM obj FORCE INDEX(`partition`)'
sql = ('SELECT tid, value_tid FROM obj FORCE INDEX(PRIMARY)'
' WHERE `partition` = %d AND oid = %d'
) % (self._getReadablePartition(oid), oid)
if tid is not None:
......@@ -627,7 +652,8 @@ class MySQLDatabaseManager(DatabaseManager):
u64 = util.u64
tid = u64(tid)
sql = " FROM tobj WHERE tid=%d" % u64(ttid)
data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
data_id_list = [x for x, in q("SELECT data_id%s AND data_id IS NOT NULL"
% sql)]
q("INSERT INTO obj SELECT `partition`, oid, %d, data_id, value_tid %s"
% (tid, sql))
q("DELETE" + sql)
......@@ -654,7 +680,8 @@ class MySQLDatabaseManager(DatabaseManager):
if serial:
sql += ' AND tid=%d' % u64(serial)
q = self.query
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql)]
q("DELETE" + sql)
self._pruneData(data_id_list)
......@@ -667,7 +694,8 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query
q("DELETE FROM trans" + sql)
sql = " FROM obj" + sql
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql)]
q("DELETE" + sql)
self._pruneData(data_id_list)
......@@ -693,7 +721,7 @@ class MySQLDatabaseManager(DatabaseManager):
p64 = util.p64
r = self.query("SELECT tid, IF(compression < 128, LENGTH(value),"
" CAST(CONV(HEX(SUBSTR(value, 5, 4)), 16, 10) AS INT))"
" FROM obj FORCE INDEX(`partition`)"
" FROM obj FORCE INDEX(PRIMARY)"
" LEFT JOIN data ON (obj.data_id = data.id)"
" WHERE `partition` = %d AND oid = %d AND tid >= %d"
" ORDER BY tid DESC LIMIT %d, %d" %
......@@ -722,7 +750,7 @@ class MySQLDatabaseManager(DatabaseManager):
u64 = util.u64
p64 = util.p64
min_tid = u64(min_tid)
r = self.query('SELECT tid, oid FROM obj FORCE INDEX(PRIMARY)'
r = self.query('SELECT tid, oid FROM obj FORCE INDEX(tid)'
' WHERE `partition` = %d AND tid <= %d'
' AND (tid = %d AND %d <= oid OR %d < tid)'
' ORDER BY tid ASC, oid ASC LIMIT %d' % (
......@@ -787,7 +815,7 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query
self._setPackTID(tid)
for count, oid, max_serial in q("SELECT COUNT(*) - 1, oid, MAX(tid)"
" FROM obj FORCE INDEX(`partition`)"
" FROM obj FORCE INDEX(PRIMARY)"
" WHERE tid <= %d GROUP BY oid"
% tid):
partition = getPartition(oid)
......@@ -838,7 +866,7 @@ class MySQLDatabaseManager(DatabaseManager):
# last grouped value, instead of the greatest one.
r = self.query(
"""SELECT tid, oid
FROM obj FORCE INDEX(PRIMARY)
FROM obj FORCE INDEX(tid)
WHERE `partition` = %(partition)s
AND tid <= %(max_tid)d
AND (tid > %(min_tid)d OR
......
......@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import OrderedDict
import os
import sqlite3
from hashlib import sha1
......@@ -67,7 +68,7 @@ class SQLiteDatabaseManager(DatabaseManager):
never be used for small requests.
"""
VERSION = 1
VERSION = 2
def _parse(self, database):
self.db = os.path.expanduser(database)
......@@ -112,39 +113,51 @@ class SQLiteDatabaseManager(DatabaseManager):
if not e.args[0].startswith("no such table:"):
raise
def _migrate1(self, *_):
self._checkNoUnfinishedTransactions()
self.query("DROP TABLE IF EXISTS ttrans")
def _migrate2(self, schema_dict, index_dict):
# BBB: As explained in _setup, no transactional DDL
# so let's do the same dance as for MySQL.
q = self.query
if self.nonempty('obj') is None:
if self.nonempty('new_obj') is None:
return
else:
q("DROP TABLE IF EXISTS new_obj")
q(schema_dict.pop('obj') % 'new_obj')
q("INSERT INTO new_obj SELECT * FROM obj")
q("DROP TABLE obj")
q("ALTER TABLE new_obj RENAME TO obj")
def _setup(self, dedup=False):
# SQLite does support transactional Data Definition Language statements
# but unfortunately, the built-in Python binding automatically commits
# between such statements. This anti-feature causes this method to be
# relatively slow; unit tests enables the UNSAFE boolean flag.
# BBB: SQLite has transactional DDL but before Python 3.6,
# the binding automatically commits between such statements.
# This anti-feature causes this method to be relatively slow.
# Unit tests enable the UNSAFE boolean flag.
self._config.clear()
q = self.query
schema_dict = OrderedDict()
index_dict = {}
if self.nonempty("config") is None:
# The table "config" stores configuration
# parameters which affect the persistent data.
q("CREATE TABLE IF NOT EXISTS config ("
" name TEXT NOT NULL PRIMARY KEY,"
" value TEXT)")
else:
# Automatic migration.
version = self._getVersion()
if version < 1:
self._checkNoUnfinishedTransactions()
q("DROP TABLE IF EXISTS ttrans")
self._setConfiguration("version", self.VERSION)
# The table "config" stores configuration
# parameters which affect the persistent data.
schema_dict['config'] = """CREATE TABLE %s (
name TEXT NOT NULL PRIMARY KEY,
value TEXT)
"""
# The table "pt" stores a partition table.
q("""CREATE TABLE IF NOT EXISTS pt (
schema_dict['pt'] = """CREATE TABLE %s (
rid INTEGER NOT NULL,
nid INTEGER NOT NULL,
state INTEGER NOT NULL,
PRIMARY KEY (rid, nid))
""")
"""
# The table "trans" stores information on committed transactions.
q("""CREATE TABLE IF NOT EXISTS trans (
schema_dict['trans'] = """CREATE TABLE %s (
partition INTEGER NOT NULL,
tid INTEGER NOT NULL,
packed BOOLEAN NOT NULL,
......@@ -154,38 +167,34 @@ class SQLiteDatabaseManager(DatabaseManager):
ext BLOB NOT NULL,
ttid INTEGER NOT NULL,
PRIMARY KEY (partition, tid))
""")
"""
# The table "obj" stores committed object metadata.
q("""CREATE TABLE IF NOT EXISTS obj (
schema_dict['obj'] = """CREATE TABLE %s (
partition INTEGER NOT NULL,
oid INTEGER NOT NULL,
tid INTEGER NOT NULL,
data_id INTEGER,
value_tid INTEGER,
PRIMARY KEY (partition, tid, oid))
""")
q("""CREATE INDEX IF NOT EXISTS _obj_i1 ON
obj(partition, oid, tid)
""")
q("""CREATE INDEX IF NOT EXISTS _obj_i2 ON
obj(data_id)
""")
PRIMARY KEY (partition, oid, tid))
"""
index_dict['obj'] = (
"CREATE INDEX %s ON %s(partition, tid, oid)",
"CREATE INDEX %s ON %s(data_id)")
# The table "data" stores object data.
q("""CREATE TABLE IF NOT EXISTS data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
schema_dict['data'] = """CREATE TABLE %s (
id INTEGER PRIMARY KEY,
hash BLOB NOT NULL,
compression INTEGER NOT NULL,
value BLOB NOT NULL)
""")
"""
if dedup:
q("""CREATE UNIQUE INDEX IF NOT EXISTS _data_i1 ON
data(hash, compression)
""")
index_dict['data'] = (
"CREATE UNIQUE INDEX %s ON %s(hash, compression)",)
# The table "ttrans" stores information on uncommitted transactions.
q("""CREATE TABLE IF NOT EXISTS ttrans (
schema_dict['ttrans'] = """CREATE TABLE %s (
partition INTEGER NOT NULL,
tid INTEGER,
packed BOOLEAN NOT NULL,
......@@ -194,17 +203,28 @@ class SQLiteDatabaseManager(DatabaseManager):
description BLOB NOT NULL,
ext BLOB NOT NULL,
ttid INTEGER NOT NULL)
""")
"""
# The table "tobj" stores uncommitted object metadata.
q("""CREATE TABLE IF NOT EXISTS tobj (
schema_dict['tobj'] = """CREATE TABLE %s (
partition INTEGER NOT NULL,
oid INTEGER NOT NULL,
tid INTEGER NOT NULL,
data_id INTEGER,
value_tid INTEGER,
PRIMARY KEY (tid, oid))
""")
"""
if self.nonempty('config') is None:
q(schema_dict.pop('config') % 'config')
self._setConfiguration('version', self.VERSION)
else:
self.migrate(schema_dict, index_dict)
for table, schema in schema_dict.iteritems():
q(schema % ('IF NOT EXISTS ' + table))
for i, index in enumerate(index_dict.get(table, ()), 1):
q(index % ('IF NOT EXISTS _%s_i%s' % (table, i), table))
self._uncommitted_data.update(q("SELECT data_id, count(*)"
" FROM tobj WHERE data_id IS NOT NULL GROUP BY data_id"))
......@@ -266,6 +286,10 @@ class SQLiteDatabaseManager(DatabaseManager):
" WHERE nid=? AND rid=partition", args).next()[0]
return trans, obj, None if oid is None else p64(oid)
def _getDataLastId(self, partition):
return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
% (partition << 48, (partition + 1) << 48)).fetchone()[0]
def _getUnfinishedTIDDict(self):
q = self.query
return q("SELECT ttid, tid FROM ttrans"), (ttid
......@@ -338,14 +362,16 @@ class SQLiteDatabaseManager(DatabaseManager):
q = self.query
for partition in offset_list:
args = partition,
data_id_list = [x for x, in
q("SELECT DISTINCT data_id FROM obj" + where, args) if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id FROM obj%s AND data_id IS NOT NULL"
% where, args)]
q("DELETE FROM obj" + where, args)
q("DELETE FROM trans" + where, args)
self._pruneData(data_id_list)
def _getUnfinishedDataIdList(self):
return [x for x, in self.query("SELECT data_id FROM tobj") if x]
return [x for x, in self.query(
"SELECT data_id FROM tobj WHERE data_id IS NOT NULL")]
def dropPartitionsTemporary(self, offset_list=None):
where = "" if offset_list is None else \
......@@ -408,12 +434,14 @@ class SQLiteDatabaseManager(DatabaseManager):
return len(data_id_list)
return 0
def storeData(self, checksum, data, compression,
def storeData(self, checksum, oid, data, compression,
_dup=unique_constraint_message("data", "hash", "compression")):
H = buffer(checksum)
p = self._getPartition(util.u64(oid))
r = self._data_last_ids[p]
try:
return self.query("INSERT INTO data VALUES (NULL,?,?,?)",
(H, compression, buffer(data))).lastrowid
self.query("INSERT INTO data VALUES (?,?,?,?)",
(r, H, compression, buffer(data)))
except sqlite3.IntegrityError, e:
if e.args[0] == _dup:
(r, d), = self.query("SELECT id, value FROM data"
......@@ -422,10 +450,12 @@ class SQLiteDatabaseManager(DatabaseManager):
if str(d) == data:
return r
raise
self._data_last_ids[p] = r + 1
return r
def loadData(self, data_id):
return self.query("SELECT compression, hash, value"
" FROM data where id=?", (data_id,)).fetchone()
" FROM data WHERE id=?", (data_id,)).fetchone()
def _getDataTID(self, oid, tid=None, before_tid=None):
partition = self._getReadablePartition(oid)
......@@ -454,7 +484,8 @@ class SQLiteDatabaseManager(DatabaseManager):
tid = u64(tid)
ttid = u64(ttid)
sql = " FROM tobj WHERE tid=?"
data_id_list = [x for x, in q("SELECT data_id" + sql, (ttid,)) if x]
data_id_list = [x for x, in q("SELECT data_id%s AND data_id IS NOT NULL"
% sql, (ttid,))]
q("INSERT INTO obj SELECT partition, oid, ?, data_id, value_tid" + sql,
(tid, ttid))
q("DELETE" + sql, (ttid,))
......@@ -481,8 +512,8 @@ class SQLiteDatabaseManager(DatabaseManager):
sql += " AND tid=?"
args.append(util.u64(serial))
q = self.query
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql, args)
if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql, args)]
q("DELETE" + sql, args)
self._pruneData(data_id_list)
......@@ -498,8 +529,8 @@ class SQLiteDatabaseManager(DatabaseManager):
q = self.query
q("DELETE FROM trans" + sql, args)
sql = " FROM obj" + sql
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql, args)
if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql, args)]
q("DELETE" + sql, args)
self._pruneData(data_id_list)
......
......@@ -113,7 +113,7 @@ class StorageOperationHandler(EventHandler):
checksum, data, data_serial):
dm = self.app.dm
if data or checksum != ZERO_HASH:
data_id = dm.storeData(checksum, data, compression)
data_id = dm.storeData(checksum, oid, data, compression)
else:
data_id = None
# Directly store the transaction.
......
......@@ -470,7 +470,7 @@ class TransactionManager(EventQueue):
if data is None:
data_id = None
else:
data_id = self._app.dm.holdData(checksum, data, compression)
data_id = self._app.dm.holdData(checksum, oid, data, compression)
transaction.store(oid, data_id, value_serial)
def rebaseObject(self, ttid, oid):
......
......@@ -104,6 +104,7 @@ class StorageDBTests(NeoUnitTestBase):
def test_getPartitionTable(self):
db = self.getDB()
db.setNumPartitions(3)
uuid1, uuid2 = self.getStorageUUID(), self.getStorageUUID()
cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
cell2 = (1, uuid1, CellStates.UP_TO_DATE)
......@@ -124,7 +125,7 @@ class StorageDBTests(NeoUnitTestBase):
self._last_ttid = ttid = add64(self._last_ttid, 1)
transaction = oid_list, 'user', 'desc', 'ext', False, ttid
H = "0" * 20
object_list = [(oid, self.db.holdData(H, '', 1), None)
object_list = [(oid, self.db.holdData(H, oid, '', 1), None)
for oid in oid_list]
return (transaction, object_list)
......@@ -203,6 +204,7 @@ class StorageDBTests(NeoUnitTestBase):
def test_setPartitionTable(self):
db = self.getDB()
db.setNumPartitions(3)
ptid = 1
uuid = self.getStorageUUID()
cell1 = 0, uuid, CellStates.OUT_OF_DATE
......@@ -452,8 +454,8 @@ class StorageDBTests(NeoUnitTestBase):
tid4 = self.getNextTID()
tid5 = self.getNextTID()
oid1 = p64(1)
foo = db.holdData("3" * 20, 'foo', 0)
bar = db.holdData("4" * 20, 'bar', 0)
foo = db.holdData("3" * 20, oid1, 'foo', 0)
bar = db.holdData("4" * 20, oid1, 'bar', 0)
db.releaseData((foo, bar))
db.storeTransaction(
tid1, (
......
......@@ -19,6 +19,7 @@ from MySQLdb import NotSupportedError, OperationalError
from MySQLdb.constants.ER import UNKNOWN_STORAGE_ENGINE
from ..mock import Mock
from neo.lib.exception import DatabaseFailure
from neo.lib.protocol import ZERO_OID
from neo.lib.util import p64
from .. import DB_PREFIX, DB_SOCKET, DB_USER
from .testStorageDBTests import StorageDBTests
......@@ -114,7 +115,7 @@ class StorageMySQLdbTests(StorageDBTests):
self.assertEqual(2, max(len(self.db.escape(chr(x)))
for x in xrange(256)))
self.assertEqual(2, len(self.db.escape('\0')))
self.db.storeData('\0' * 20, '\0' * (2**24-1), 0)
self.db.storeData('\0' * 20, ZERO_OID, '\0' * (2**24-1), 0)
size, = query_list
max_allowed = self.db.__class__._max_allowed_packet
self.assertTrue(max_allowed - 1024 < size <= max_allowed, size)
......@@ -123,7 +124,7 @@ class StorageMySQLdbTests(StorageDBTests):
self.db._max_allowed_packet = max_allowed_packet
del query_list[:]
self.db.storeTransaction(p64(0),
((p64(1<<i),0,None) for i in xrange(10)), None)
((p64(1<<i),1234,None) for i in xrange(10)), None)
self.assertEqual(max(query_list), max_allowed_packet)
self.assertEqual(len(query_list), count)
......
......@@ -57,7 +57,7 @@ class TransactionManagerTests(NeoUnitTestBase):
self.manager.storeObject(locking_serial, ram_serial, oid, 0, "3" * 20,
'bar', None)
holdData = self.app.dm.mockGetNamedCalls('holdData')
self.assertEqual(holdData.pop(0).params, ("3" * 20, 'bar', 0))
self.assertEqual(holdData.pop(0).params, ("3" * 20, oid, 'bar', 0))
orig_object = self.manager.getObjectFromTransaction(locking_serial,
oid)
self.manager.updateObjectDataForPack(oid, orig_serial, None, checksum)
......
......@@ -480,17 +480,18 @@ class Test(NEOThreadedTest):
def test_notifyNodeInformation(self, cluster):
# translated from MasterNotificationsHandlerTests
# (neo.tests.client.testMasterHandler)
good = [1, 0].pop
if 1:
cluster.db # open DB
s0, s1 = cluster.client.nm.getStorageList()
conn = s0.getConnection()
self.assertFalse(conn.isClosed())
getCellSortKey = cluster.client.cp.getCellSortKey
self.assertEqual(getCellSortKey(s0, int), 0)
self.assertEqual(getCellSortKey(s0, good), 0)
cluster.neoctl.dropNode(s0.getUUID())
self.assertEqual([s1], cluster.client.nm.getStorageList())
self.assertTrue(conn.isClosed())
self.assertEqual(getCellSortKey(s0, int), 1)
self.assertEqual(getCellSortKey(s0, good), 1)
# XXX: the test originally checked that 'unregister' method
# was called (even if it's useless in this case),
# but we would need an API to do that easily.
......@@ -1438,7 +1439,7 @@ class Test(NEOThreadedTest):
bad = []
ok = []
def data_args(value):
return makeChecksum(value), value, 0
return makeChecksum(value), ZERO_OID, value, 0
node_list = []
for i, s in enumerate(cluster.storage_list):
node_list.append(s.uuid)
......
......@@ -132,10 +132,14 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(backup.neoctl.getClusterState(),
ClusterStates.RUNNING)
# Restart and switch to BACKINGUP mode again.
backup.stop()
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
self.tic()
# Leave BACKINGUP mode when 1 replica is late. The cluster
# remains in STOPPING_BACKUP state until it catches up.
with backup.master.filterConnection(*backup.storage_list) as f:
f.add(delaySecondary)
while not f.filtered_count:
......@@ -147,6 +151,8 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.last_tid))
# Again but leave BACKINGUP mode when a storage node is
# receiving data from the upstream cluster.
backup.stop()
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
......@@ -162,6 +168,48 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.last_tid))
storage = upstream.getZODBStorage()
# Check that replication from upstream is resumed even if
# upstream is idle.
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
self.tic()
x = backup.master.backup_app.primary_partition_dict
new_oid_storage = x[0]
with upstream.moduloTID(next(p for p, n in x.iteritems()
if n is not new_oid_storage)), \
ConnectionFilter() as f:
f.delayAddObject()
# Transaction that touches 2 primary cells on 2 different
# nodes.
txn = transaction.Transaction()
tid = storage.load(ZERO_OID)[1]
storage.tpc_begin(txn)
storage.store(ZERO_OID, tid, '', '', txn)
storage.tpc_vote(txn)
storage.tpc_finish(txn)
self.tic()
# Stop when exactly 1 of the 2 cells is synced with
# upstream.
backup.stop()
backup.start()
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.last_tid))
# Check that replication to secondary cells is resumed even if
# upstream is idle.
with backup.master.filterConnection(*backup.storage_list) as f:
f.add(delaySecondary)
txn = transaction.Transaction()
storage.tpc_begin(txn)
storage.tpc_finish(txn)
self.tic()
backup.stop()
backup.start()
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.last_tid))
@predictable_random()
def testBackupNodeLost(self):
"""Check backup cluster can recover after random connection loss
......
......@@ -60,7 +60,7 @@ else:
setup(
name = 'neoppod',
version = '1.8.1',
version = '1.9',
description = __doc__.strip(),
author = 'Nexedi SA',
author_email = 'neo-dev@erp5.org',
......