Commit 56570945 authored by Julien Muchembled's avatar Julien Muchembled

storage: rename 'obj.*serial' columns to '*tid' for consistency with 'trans' table

parent 8e3c7b01
...@@ -9,10 +9,10 @@ SQL commands to migrate each storage from NEO 0.10.x:: ...@@ -9,10 +9,10 @@ SQL commands to migrate each storage from NEO 0.10.x::
CREATE TABLE new_data (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, hash BINARY(20) NOT NULL UNIQUE, compression TINYINT UNSIGNED NULL, value LONGBLOB NULL) ENGINE = InnoDB SELECT DISTINCT obj.hash as hash, compression, value FROM obj, data WHERE obj.hash=data.hash ORDER BY serial; CREATE TABLE new_data (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, hash BINARY(20) NOT NULL UNIQUE, compression TINYINT UNSIGNED NULL, value LONGBLOB NULL) ENGINE = InnoDB SELECT DISTINCT obj.hash as hash, compression, value FROM obj, data WHERE obj.hash=data.hash ORDER BY serial;
DROP TABLE data; DROP TABLE data;
RENAME TABLE new_data TO data; RENAME TABLE new_data TO data;
CREATE TABLE new_obj (partition SMALLINT UNSIGNED NOT NULL, oid BIGINT UNSIGNED NOT NULL, serial BIGINT UNSIGNED NOT NULL, data_id BIGINT UNSIGNED NULL, value_serial BIGINT UNSIGNED NULL, PRIMARY KEY (partition, serial, oid), KEY (partition, oid, serial), KEY (data_id)) ENGINE = InnoDB SELECT partition, oid, serial, data.id as data_id, value_serial FROM obj LEFT JOIN data ON (obj.hash=data.hash); CREATE TABLE new_obj (partition SMALLINT UNSIGNED NOT NULL, oid BIGINT UNSIGNED NOT NULL, tid BIGINT UNSIGNED NOT NULL, data_id BIGINT UNSIGNED NULL, value_tid BIGINT UNSIGNED NULL, PRIMARY KEY (partition, tid, oid), KEY (partition, oid, tid), KEY (data_id)) ENGINE = InnoDB SELECT partition, oid, serial as tid, data.id as data_id, value_serial as value_tid FROM obj LEFT JOIN data ON (obj.hash=data.hash);
DROP TABLE obj; DROP TABLE obj;
RENAME TABLE new_obj TO obj; RENAME TABLE new_obj TO obj;
ALTER TABLE tobj CHANGE hash data_id BIGINT UNSIGNED NULL; ALTER TABLE tobj CHANGE serial tid BIGINT UNSIGNED NOT NULL, CHANGE hash data_id BIGINT UNSIGNED NULL, CHANGE value_serial value_tid BIGINT UNSIGNED NULL;
NEO 0.10 NEO 0.10
======== ========
......
...@@ -193,11 +193,11 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -193,11 +193,11 @@ class MySQLDatabaseManager(DatabaseManager):
q("""CREATE TABLE IF NOT EXISTS obj ( q("""CREATE TABLE IF NOT EXISTS obj (
partition SMALLINT UNSIGNED NOT NULL, partition SMALLINT UNSIGNED NOT NULL,
oid BIGINT UNSIGNED NOT NULL, oid BIGINT UNSIGNED NOT NULL,
serial BIGINT UNSIGNED NOT NULL, tid BIGINT UNSIGNED NOT NULL,
data_id BIGINT UNSIGNED NULL, data_id BIGINT UNSIGNED NULL,
value_serial BIGINT UNSIGNED NULL, value_tid BIGINT UNSIGNED NULL,
PRIMARY KEY (partition, serial, oid), PRIMARY KEY (partition, tid, oid),
KEY (partition, oid, serial), KEY (partition, oid, tid),
KEY (data_id) KEY (data_id)
) ENGINE = InnoDB""" + p) ) ENGINE = InnoDB""" + p)
...@@ -226,10 +226,10 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -226,10 +226,10 @@ class MySQLDatabaseManager(DatabaseManager):
q("""CREATE TABLE IF NOT EXISTS tobj ( q("""CREATE TABLE IF NOT EXISTS tobj (
partition SMALLINT UNSIGNED NOT NULL, partition SMALLINT UNSIGNED NOT NULL,
oid BIGINT UNSIGNED NOT NULL, oid BIGINT UNSIGNED NOT NULL,
serial BIGINT UNSIGNED NOT NULL, tid BIGINT UNSIGNED NOT NULL,
data_id BIGINT UNSIGNED NULL, data_id BIGINT UNSIGNED NULL,
value_serial BIGINT UNSIGNED NULL, value_tid BIGINT UNSIGNED NULL,
PRIMARY KEY (serial, oid) PRIMARY KEY (tid, oid)
) ENGINE = InnoDB""") ) ENGINE = InnoDB""")
self._uncommitted_data = dict(q("SELECT data_id, count(*)" self._uncommitted_data = dict(q("SELECT data_id, count(*)"
...@@ -284,13 +284,13 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -284,13 +284,13 @@ class MySQLDatabaseManager(DatabaseManager):
for partition, tid in q("SELECT partition, MAX(tid)" for partition, tid in q("SELECT partition, MAX(tid)"
" FROM trans GROUP BY partition")) " FROM trans GROUP BY partition"))
obj = dict((partition, p64(tid)) obj = dict((partition, p64(tid))
for partition, tid in q("SELECT partition, MAX(serial)" for partition, tid in q("SELECT partition, MAX(tid)"
" FROM obj GROUP BY partition")) " FROM obj GROUP BY partition"))
if all: if all:
tid = q("SELECT MAX(tid) FROM ttrans")[0][0] tid = q("SELECT MAX(tid) FROM ttrans")[0][0]
if tid is not None: if tid is not None:
trans[None] = p64(tid) trans[None] = p64(tid)
tid = q("SELECT MAX(serial) FROM tobj")[0][0] tid = q("SELECT MAX(tid) FROM tobj")[0][0]
if tid is not None: if tid is not None:
obj[None] = p64(tid) obj[None] = p64(tid)
return trans, obj return trans, obj
...@@ -300,7 +300,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -300,7 +300,7 @@ class MySQLDatabaseManager(DatabaseManager):
with self as q: with self as q:
r = q("""SELECT tid FROM ttrans""") r = q("""SELECT tid FROM ttrans""")
tid_set.update((util.p64(t[0]) for t in r)) tid_set.update((util.p64(t[0]) for t in r))
r = q("""SELECT serial FROM tobj""") r = q("""SELECT tid FROM tobj""")
tid_set.update((util.p64(t[0]) for t in r)) tid_set.update((util.p64(t[0]) for t in r))
return list(tid_set) return list(tid_set)
...@@ -310,32 +310,31 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -310,32 +310,31 @@ class MySQLDatabaseManager(DatabaseManager):
partition = self._getPartition(oid) partition = self._getPartition(oid)
with self as q: with self as q:
return q("SELECT oid FROM obj WHERE partition=%d AND oid=%d AND " return q("SELECT oid FROM obj WHERE partition=%d AND oid=%d AND "
"serial=%d" % (partition, oid, tid)) or all and \ "tid=%d" % (partition, oid, tid)) or all and \
q("SELECT oid FROM tobj WHERE serial=%d AND oid=%d" q("SELECT oid FROM tobj WHERE tid=%d AND oid=%d"
% (tid, oid)) % (tid, oid))
def _getObject(self, oid, tid=None, before_tid=None): def _getObject(self, oid, tid=None, before_tid=None):
q = self.query q = self.query
partition = self._getPartition(oid) partition = self._getPartition(oid)
sql = ('SELECT serial, compression, data.hash, value, value_serial' sql = ('SELECT tid, compression, data.hash, value, value_tid'
' FROM obj LEFT JOIN data ON (obj.data_id = data.id)' ' FROM obj LEFT JOIN data ON (obj.data_id = data.id)'
' WHERE partition = %d AND oid = %d') % (partition, oid) ' WHERE partition = %d AND oid = %d') % (partition, oid)
if tid is not None: if tid is not None:
sql += ' AND serial = %d' % tid sql += ' AND tid = %d' % tid
elif before_tid is not None: elif before_tid is not None:
sql += ' AND serial < %d ORDER BY serial DESC LIMIT 1' % before_tid sql += ' AND tid < %d ORDER BY tid DESC LIMIT 1' % before_tid
else: else:
# XXX I want to express "HAVING serial = MAX(serial)", but # XXX I want to express "HAVING tid = MAX(tid)", but
# MySQL does not use an index for a HAVING clause! # MySQL does not use an index for a HAVING clause!
sql += ' ORDER BY serial DESC LIMIT 1' sql += ' ORDER BY tid DESC LIMIT 1'
r = q(sql) r = q(sql)
try: try:
serial, compression, checksum, data, value_serial = r[0] serial, compression, checksum, data, value_serial = r[0]
except IndexError: except IndexError:
return None return None
r = q("""SELECT serial FROM obj r = q("SELECT tid FROM obj WHERE partition=%d AND oid=%d AND tid>%d"
WHERE partition = %d AND oid = %d AND serial > %d " ORDER BY tid LIMIT 1" % (partition, oid, serial))
ORDER BY serial LIMIT 1""" % (partition, oid, serial))
try: try:
next_serial = r[0][0] next_serial = r[0][0]
except IndexError: except IndexError:
...@@ -427,7 +426,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -427,7 +426,7 @@ class MySQLDatabaseManager(DatabaseManager):
if value_serial: if value_serial:
value_serial = u64(value_serial) value_serial = u64(value_serial)
(data_id,), = q("SELECT data_id FROM obj" (data_id,), = q("SELECT data_id FROM obj"
" WHERE partition=%d AND oid=%d AND serial=%d" " WHERE partition=%d AND oid=%d AND tid=%d"
% (partition, oid, value_serial)) % (partition, oid, value_serial))
if temporary: if temporary:
self.storeData(data_id) self.storeData(data_id)
...@@ -475,17 +474,17 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -475,17 +474,17 @@ class MySQLDatabaseManager(DatabaseManager):
return r return r
def _getDataTID(self, oid, tid=None, before_tid=None): def _getDataTID(self, oid, tid=None, before_tid=None):
sql = ('SELECT serial, data_id, value_serial FROM obj' sql = ('SELECT tid, data_id, value_tid FROM obj'
' WHERE partition = %d AND oid = %d' ' WHERE partition = %d AND oid = %d'
) % (self._getPartition(oid), oid) ) % (self._getPartition(oid), oid)
if tid is not None: if tid is not None:
sql += ' AND serial = %d' % tid sql += ' AND tid = %d' % tid
elif before_tid is not None: elif before_tid is not None:
sql += ' AND serial < %d ORDER BY serial DESC LIMIT 1' % before_tid sql += ' AND tid < %d ORDER BY tid DESC LIMIT 1' % before_tid
else: else:
# XXX I want to express "HAVING serial = MAX(serial)", but # XXX I want to express "HAVING tid = MAX(tid)", but
# MySQL does not use an index for a HAVING clause! # MySQL does not use an index for a HAVING clause!
sql += ' ORDER BY serial DESC LIMIT 1' sql += ' ORDER BY tid DESC LIMIT 1'
r = self.query(sql) r = self.query(sql)
if r: if r:
(serial, data_id, value_serial), = r (serial, data_id, value_serial), = r
...@@ -498,10 +497,10 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -498,10 +497,10 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query q = self.query
tid = util.u64(tid) tid = util.u64(tid)
with self as q: with self as q:
sql = " FROM tobj WHERE serial=%d" % tid sql = " FROM tobj WHERE tid=%d" % tid
data_id_list = [x for x, in q("SELECT data_id" + sql) if x] data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
q("INSERT INTO obj SELECT *" + sql) q("INSERT INTO obj SELECT *" + sql)
q("DELETE FROM tobj WHERE serial=%d" % tid) q("DELETE FROM tobj WHERE tid=%d" % tid)
q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=%d" % tid) q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=%d" % tid)
q("DELETE FROM ttrans WHERE tid=%d" % tid) q("DELETE FROM ttrans WHERE tid=%d" % tid)
self.unlockData(data_id_list) self.unlockData(data_id_list)
...@@ -511,7 +510,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -511,7 +510,7 @@ class MySQLDatabaseManager(DatabaseManager):
tid = u64(tid) tid = u64(tid)
getPartition = self._getPartition getPartition = self._getPartition
with self as q: with self as q:
sql = " FROM tobj WHERE serial=%d" % tid sql = " FROM tobj WHERE tid=%d" % tid
data_id_list = [x for x, in q("SELECT data_id" + sql) if x] data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
self.unlockData(data_id_list) self.unlockData(data_id_list)
q("DELETE" + sql) q("DELETE" + sql)
...@@ -522,7 +521,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -522,7 +521,7 @@ class MySQLDatabaseManager(DatabaseManager):
data_id_set = set() data_id_set = set()
for oid in oid_list: for oid in oid_list:
oid = u64(oid) oid = u64(oid)
sql = " FROM obj WHERE partition=%d AND oid=%d AND serial=%d" \ sql = " FROM obj WHERE partition=%d AND oid=%d AND tid=%d" \
% (getPartition(oid), oid, tid) % (getPartition(oid), oid, tid)
data_id_set.update(*q("SELECT data_id" + sql)) data_id_set.update(*q("SELECT data_id" + sql))
q("DELETE" + sql) q("DELETE" + sql)
...@@ -535,7 +534,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -535,7 +534,7 @@ class MySQLDatabaseManager(DatabaseManager):
sql = " FROM obj WHERE partition=%d AND oid=%d" \ sql = " FROM obj WHERE partition=%d AND oid=%d" \
% (self._getPartition(oid), oid) % (self._getPartition(oid), oid)
if serial: if serial:
sql += ' AND serial=%d' % u64(serial) sql += ' AND tid=%d' % u64(serial)
with self as q: with self as q:
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x] data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x]
q("DELETE" + sql) q("DELETE" + sql)
...@@ -549,7 +548,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -549,7 +548,7 @@ class MySQLDatabaseManager(DatabaseManager):
sql += " AND tid <= %d" % util.u64(max_tid) sql += " AND tid <= %d" % util.u64(max_tid)
q = self.query q = self.query
q("DELETE FROM trans" + sql) q("DELETE FROM trans" + sql)
sql = " FROM obj" + sql.replace('tid', 'serial') sql = " FROM obj" + sql
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x] data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x]
q("DELETE" + sql) q("DELETE" + sql)
self._pruneData(data_id_list) self._pruneData(data_id_list)
...@@ -571,9 +570,9 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -571,9 +570,9 @@ class MySQLDatabaseManager(DatabaseManager):
def _getObjectLength(self, oid, value_serial): def _getObjectLength(self, oid, value_serial):
if value_serial is None: if value_serial is None:
raise CreationUndone raise CreationUndone
r = self.query("""SELECT LENGTH(value), value_serial r = self.query("""SELECT LENGTH(value), value_tid
FROM obj LEFT JOIN data ON (obj.data_id = data.id) FROM obj LEFT JOIN data ON (obj.data_id = data.id)
WHERE partition = %d AND oid = %d AND serial = %d""" % WHERE partition = %d AND oid = %d AND tid = %d""" %
(self._getPartition(oid), oid, value_serial)) (self._getPartition(oid), oid, value_serial))
length, value_serial = r[0] length, value_serial = r[0]
if length is None: if length is None:
...@@ -590,10 +589,10 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -590,10 +589,10 @@ class MySQLDatabaseManager(DatabaseManager):
oid = util.u64(oid) oid = util.u64(oid)
p64 = util.p64 p64 = util.p64
pack_tid = self._getPackTID() pack_tid = self._getPackTID()
r = self.query("""SELECT serial, LENGTH(value), value_serial r = self.query("""SELECT tid, LENGTH(value), value_tid
FROM obj LEFT JOIN data ON (obj.data_id = data.id) FROM obj LEFT JOIN data ON (obj.data_id = data.id)
WHERE partition = %d AND oid = %d AND serial >= %d WHERE partition = %d AND oid = %d AND tid >= %d
ORDER BY serial DESC LIMIT %d, %d""" \ ORDER BY tid DESC LIMIT %d, %d""" \
% (self._getPartition(oid), oid, pack_tid, offset, length)) % (self._getPartition(oid), oid, pack_tid, offset, length))
if r: if r:
result = [] result = []
...@@ -613,10 +612,10 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -613,10 +612,10 @@ class MySQLDatabaseManager(DatabaseManager):
u64 = util.u64 u64 = util.u64
p64 = util.p64 p64 = util.p64
min_tid = u64(min_tid) min_tid = u64(min_tid)
r = self.query('SELECT serial, oid FROM obj' r = self.query('SELECT tid, oid FROM obj'
' WHERE partition = %d AND serial <= %d' ' WHERE partition = %d AND tid <= %d'
' AND (serial = %d AND %d <= oid OR %d < serial)' ' AND (tid = %d AND %d <= oid OR %d < tid)'
' ORDER BY serial ASC, oid ASC LIMIT %d' % ( ' ORDER BY tid ASC, oid ASC LIMIT %d' % (
partition, u64(max_tid), min_tid, u64(min_oid), min_tid, length)) partition, u64(max_tid), min_tid, u64(min_oid), min_tid, length))
return [(p64(serial), p64(oid)) for serial, oid in r] return [(p64(serial), p64(oid)) for serial, oid in r]
...@@ -653,21 +652,21 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -653,21 +652,21 @@ class MySQLDatabaseManager(DatabaseManager):
kw = { kw = {
'partition': self._getPartition(oid), 'partition': self._getPartition(oid),
'oid': oid, 'oid': oid,
'orig_serial': orig_serial, 'orig_tid': orig_serial,
'max_serial': max_serial, 'max_tid': max_serial,
'new_serial': 'NULL', 'new_tid': 'NULL',
} }
for kw['table'] in 'obj', 'tobj': for kw['table'] in 'obj', 'tobj':
for kw['serial'], in q('SELECT serial FROM %(table)s' for kw['tid'], in q('SELECT tid FROM %(table)s'
' WHERE partition=%(partition)d AND oid=%(oid)d' ' WHERE partition=%(partition)d AND oid=%(oid)d'
' AND serial>=%(max_serial)d AND value_serial=%(orig_serial)d' ' AND tid>=%(max_tid)d AND value_tid=%(orig_tid)d'
' ORDER BY serial ASC' % kw): ' ORDER BY tid ASC' % kw):
q('UPDATE %(table)s SET value_serial=%(new_serial)s' q('UPDATE %(table)s SET value_tid=%(new_tid)s'
' WHERE partition=%(partition)d AND oid=%(oid)d' ' WHERE partition=%(partition)d AND oid=%(oid)d'
' AND serial=%(serial)d' % kw) ' AND tid=%(tid)d' % kw)
if value_serial is None: if value_serial is None:
# First found, mark its serial for future reference. # First found, mark its serial for future reference.
kw['new_serial'] = value_serial = kw['serial'] kw['new_tid'] = value_serial = kw['tid']
return value_serial return value_serial
def pack(self, tid, updateObjectDataForPack): def pack(self, tid, updateObjectDataForPack):
...@@ -679,11 +678,11 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -679,11 +678,11 @@ class MySQLDatabaseManager(DatabaseManager):
with self as q: with self as q:
self._setPackTID(tid) self._setPackTID(tid)
for count, oid, max_serial in q('SELECT COUNT(*) - 1, oid, ' for count, oid, max_serial in q('SELECT COUNT(*) - 1, oid, '
'MAX(serial) FROM obj WHERE serial <= %d GROUP BY oid' 'MAX(tid) FROM obj WHERE tid <= %d GROUP BY oid'
% tid): % tid):
partition = getPartition(oid) partition = getPartition(oid)
if q("SELECT 1 FROM obj WHERE partition = %d" if q("SELECT 1 FROM obj WHERE partition = %d"
" AND oid = %d AND serial = %d AND data_id IS NULL" " AND oid = %d AND tid = %d AND data_id IS NULL"
% (partition, oid, max_serial)): % (partition, oid, max_serial)):
max_serial += 1 max_serial += 1
elif not count: elif not count:
...@@ -691,8 +690,8 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -691,8 +690,8 @@ class MySQLDatabaseManager(DatabaseManager):
# There are things to delete for this object # There are things to delete for this object
data_id_set = set() data_id_set = set()
sql = ' FROM obj WHERE partition=%d AND oid=%d' \ sql = ' FROM obj WHERE partition=%d AND oid=%d' \
' AND serial<%d' % (partition, oid, max_serial) ' AND tid<%d' % (partition, oid, max_serial)
for serial, data_id in q('SELECT serial, data_id' + sql): for serial, data_id in q('SELECT tid, data_id' + sql):
data_id_set.add(data_id) data_id_set.add(data_id)
new_serial = updatePackFuture(oid, serial, max_serial) new_serial = updatePackFuture(oid, serial, max_serial)
if new_serial: if new_serial:
...@@ -727,15 +726,15 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -727,15 +726,15 @@ class MySQLDatabaseManager(DatabaseManager):
# We would need a function (that could be named 'LAST') that returns the # We would need a function (that could be named 'LAST') that returns the
# last grouped value, instead of the greatest one. # last grouped value, instead of the greatest one.
r = self.query( r = self.query(
"""SELECT oid, serial """SELECT oid, tid
FROM obj FROM obj
WHERE partition = %(partition)s WHERE partition = %(partition)s
AND serial <= %(max_tid)d AND tid <= %(max_tid)d
AND (oid > %(min_oid)d OR AND (oid > %(min_oid)d OR
oid = %(min_oid)d AND serial >= %(min_serial)d) oid = %(min_oid)d AND tid >= %(min_tid)d)
ORDER BY oid ASC, serial ASC %(limit)s""" % { ORDER BY oid ASC, tid ASC %(limit)s""" % {
'min_oid': u64(min_oid), 'min_oid': u64(min_oid),
'min_serial': u64(min_serial), 'min_tid': u64(min_serial),
'max_tid': u64(max_tid), 'max_tid': u64(max_tid),
'limit': '' if length is None else 'LIMIT %(length)d' % length, 'limit': '' if length is None else 'LIMIT %(length)d' % length,
'partition': partition, 'partition': partition,
......
...@@ -130,13 +130,13 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -130,13 +130,13 @@ class SQLiteDatabaseManager(DatabaseManager):
q("""CREATE TABLE IF NOT EXISTS obj ( q("""CREATE TABLE IF NOT EXISTS obj (
partition INTEGER NOT NULL, partition INTEGER NOT NULL,
oid INTEGER NOT NULL, oid INTEGER NOT NULL,
serial INTEGER NOT NULL, tid INTEGER NOT NULL,
data_id INTEGER, data_id INTEGER,
value_serial INTEGER, value_tid INTEGER,
PRIMARY KEY (partition, serial, oid)) PRIMARY KEY (partition, tid, oid))
""") """)
q("""CREATE INDEX IF NOT EXISTS _obj_i1 ON q("""CREATE INDEX IF NOT EXISTS _obj_i1 ON
obj(partition, oid, serial) obj(partition, oid, tid)
""") """)
q("""CREATE INDEX IF NOT EXISTS _obj_i2 ON q("""CREATE INDEX IF NOT EXISTS _obj_i2 ON
obj(data_id) obj(data_id)
...@@ -165,10 +165,10 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -165,10 +165,10 @@ class SQLiteDatabaseManager(DatabaseManager):
q("""CREATE TABLE IF NOT EXISTS tobj ( q("""CREATE TABLE IF NOT EXISTS tobj (
partition INTEGER NOT NULL, partition INTEGER NOT NULL,
oid INTEGER NOT NULL, oid INTEGER NOT NULL,
serial INTEGER NOT NULL, tid INTEGER NOT NULL,
data_id INTEGER, data_id INTEGER,
value_serial INTEGER, value_tid INTEGER,
PRIMARY KEY (serial, oid)) PRIMARY KEY (tid, oid))
""") """)
self._uncommitted_data = dict(q("SELECT data_id, count(*)" self._uncommitted_data = dict(q("SELECT data_id, count(*)"
...@@ -215,13 +215,13 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -215,13 +215,13 @@ class SQLiteDatabaseManager(DatabaseManager):
for partition, tid in q("SELECT partition, MAX(tid)" for partition, tid in q("SELECT partition, MAX(tid)"
" FROM trans GROUP BY partition")) " FROM trans GROUP BY partition"))
obj = dict((partition, p64(tid)) obj = dict((partition, p64(tid))
for partition, tid in q("SELECT partition, MAX(serial)" for partition, tid in q("SELECT partition, MAX(tid)"
" FROM obj GROUP BY partition")) " FROM obj GROUP BY partition"))
if all: if all:
tid = q("SELECT MAX(tid) FROM ttrans").fetchone()[0] tid = q("SELECT MAX(tid) FROM ttrans").fetchone()[0]
if tid is not None: if tid is not None:
trans[None] = p64(tid) trans[None] = p64(tid)
tid = q("SELECT MAX(serial) FROM tobj").fetchone()[0] tid = q("SELECT MAX(tid) FROM tobj").fetchone()[0]
if tid is not None: if tid is not None:
obj[None] = p64(tid) obj[None] = p64(tid)
return trans, obj return trans, obj
...@@ -231,40 +231,40 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -231,40 +231,40 @@ class SQLiteDatabaseManager(DatabaseManager):
tid_set = set() tid_set = set()
with self as q: with self as q:
tid_set.update((p64(t[0]) for t in q("SELECT tid FROM ttrans"))) tid_set.update((p64(t[0]) for t in q("SELECT tid FROM ttrans")))
tid_set.update((p64(t[0]) for t in q("SELECT serial FROM tobj"))) tid_set.update((p64(t[0]) for t in q("SELECT tid FROM tobj")))
return list(tid_set) return list(tid_set)
def objectPresent(self, oid, tid, all=True): def objectPresent(self, oid, tid, all=True):
oid = util.u64(oid) oid = util.u64(oid)
tid = util.u64(tid) tid = util.u64(tid)
with self as q: with self as q:
r = q("SELECT 1 FROM obj WHERE partition=? AND oid=? AND serial=?", r = q("SELECT 1 FROM obj WHERE partition=? AND oid=? AND tid=?",
(self._getPartition(oid), oid, tid)).fetchone() (self._getPartition(oid), oid, tid)).fetchone()
if not r and all: if not r and all:
r = q("SELECT 1 FROM tobj WHERE serial=? AND oid=?", r = q("SELECT 1 FROM tobj WHERE tid=? AND oid=?",
(tid, oid)).fetchone() (tid, oid)).fetchone()
return bool(r) return bool(r)
def _getObject(self, oid, tid=None, before_tid=None): def _getObject(self, oid, tid=None, before_tid=None):
q = self.query q = self.query
partition = self._getPartition(oid) partition = self._getPartition(oid)
sql = ('SELECT serial, compression, data.hash, value, value_serial' sql = ('SELECT tid, compression, data.hash, value, value_tid'
' FROM obj LEFT JOIN data ON obj.data_id = data.id' ' FROM obj LEFT JOIN data ON obj.data_id = data.id'
' WHERE partition=? AND oid=?') ' WHERE partition=? AND oid=?')
if tid is not None: if tid is not None:
r = q(sql + ' AND serial=?', (partition, oid, tid)) r = q(sql + ' AND tid=?', (partition, oid, tid))
elif before_tid is not None: elif before_tid is not None:
r = q(sql + ' AND serial<? ORDER BY serial DESC LIMIT 1', r = q(sql + ' AND tid<? ORDER BY tid DESC LIMIT 1',
(partition, oid, before_tid)) (partition, oid, before_tid))
else: else:
r = q(sql + ' ORDER BY serial DESC LIMIT 1', (partition, oid)) r = q(sql + ' ORDER BY tid DESC LIMIT 1', (partition, oid))
try: try:
serial, compression, checksum, data, value_serial = r.fetchone() serial, compression, checksum, data, value_serial = r.fetchone()
except TypeError: except TypeError:
return None return None
r = q("""SELECT serial FROM obj r = q("""SELECT tid FROM obj
WHERE partition=? AND oid=? AND serial>? WHERE partition=? AND oid=? AND tid>?
ORDER BY serial LIMIT 1""", ORDER BY tid LIMIT 1""",
(partition, oid, serial)).fetchone() (partition, oid, serial)).fetchone()
if checksum: if checksum:
checksum = str(checksum) checksum = str(checksum)
...@@ -325,7 +325,7 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -325,7 +325,7 @@ class SQLiteDatabaseManager(DatabaseManager):
if value_serial: if value_serial:
value_serial = u64(value_serial) value_serial = u64(value_serial)
(data_id,), = q("SELECT data_id FROM obj" (data_id,), = q("SELECT data_id FROM obj"
" WHERE partition=? AND oid=? AND serial=?", " WHERE partition=? AND oid=? AND tid=?",
(partition, oid, value_serial)) (partition, oid, value_serial))
if temporary: if temporary:
self.storeData(data_id) self.storeData(data_id)
...@@ -335,8 +335,8 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -335,8 +335,8 @@ class SQLiteDatabaseManager(DatabaseManager):
# This may happen if a previous replication of 'obj' was # This may happen if a previous replication of 'obj' was
# interrupted. # interrupted.
if not T: if not T:
r, = q("SELECT data_id, value_serial FROM obj" r, = q("SELECT data_id, value_tid FROM obj"
" WHERE partition=? AND oid=? AND serial=?", " WHERE partition=? AND oid=? AND tid=?",
(partition, oid, tid)) (partition, oid, tid))
if r == (data_id, value_serial): if r == (data_id, value_serial):
continue continue
...@@ -376,15 +376,15 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -376,15 +376,15 @@ class SQLiteDatabaseManager(DatabaseManager):
def _getDataTID(self, oid, tid=None, before_tid=None): def _getDataTID(self, oid, tid=None, before_tid=None):
partition = self._getPartition(oid) partition = self._getPartition(oid)
sql = 'SELECT serial, data_id, value_serial FROM obj' \ sql = 'SELECT tid, data_id, value_tid FROM obj' \
' WHERE partition=? AND oid=?' ' WHERE partition=? AND oid=?'
if tid is not None: if tid is not None:
r = self.query(sql + ' AND serial=?', (partition, oid, tid)) r = self.query(sql + ' AND tid=?', (partition, oid, tid))
elif before_tid is not None: elif before_tid is not None:
r = self.query(sql + ' AND serial<? ORDER BY serial DESC LIMIT 1', r = self.query(sql + ' AND tid<? ORDER BY tid DESC LIMIT 1',
(partition, oid, before_tid)) (partition, oid, before_tid))
else: else:
r = self.query(sql + ' ORDER BY serial DESC LIMIT 1', r = self.query(sql + ' ORDER BY tid DESC LIMIT 1',
(partition, oid)) (partition, oid))
r = r.fetchone() r = r.fetchone()
if r: if r:
...@@ -397,10 +397,10 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -397,10 +397,10 @@ class SQLiteDatabaseManager(DatabaseManager):
def finishTransaction(self, tid): def finishTransaction(self, tid):
args = util.u64(tid), args = util.u64(tid),
with self as q: with self as q:
sql = " FROM tobj WHERE serial=?" sql = " FROM tobj WHERE tid=?"
data_id_list = [x for x, in q("SELECT data_id" + sql, args) if x] data_id_list = [x for x, in q("SELECT data_id" + sql, args) if x]
q("INSERT OR FAIL INTO obj SELECT *" + sql, args) q("INSERT OR FAIL INTO obj SELECT *" + sql, args)
q("DELETE FROM tobj WHERE serial=?", args) q("DELETE FROM tobj WHERE tid=?", args)
q("INSERT OR FAIL INTO trans SELECT * FROM ttrans WHERE tid=?", q("INSERT OR FAIL INTO trans SELECT * FROM ttrans WHERE tid=?",
args) args)
q("DELETE FROM ttrans WHERE tid=?", args) q("DELETE FROM ttrans WHERE tid=?", args)
...@@ -411,7 +411,7 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -411,7 +411,7 @@ class SQLiteDatabaseManager(DatabaseManager):
tid = u64(tid) tid = u64(tid)
getPartition = self._getPartition getPartition = self._getPartition
with self as q: with self as q:
sql = " FROM tobj WHERE serial=?" sql = " FROM tobj WHERE tid=?"
data_id_list = [x for x, in q("SELECT data_id" + sql, (tid,)) if x] data_id_list = [x for x, in q("SELECT data_id" + sql, (tid,)) if x]
self.unlockData(data_id_list) self.unlockData(data_id_list)
q("DELETE" + sql, (tid,)) q("DELETE" + sql, (tid,))
...@@ -422,7 +422,7 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -422,7 +422,7 @@ class SQLiteDatabaseManager(DatabaseManager):
data_id_set = set() data_id_set = set()
for oid in oid_list: for oid in oid_list:
oid = u64(oid) oid = u64(oid)
sql = " FROM obj WHERE partition=? AND oid=? AND serial=?" sql = " FROM obj WHERE partition=? AND oid=? AND tid=?"
args = getPartition(oid), oid, tid args = getPartition(oid), oid, tid
data_id_set.update(*q("SELECT data_id" + sql, args)) data_id_set.update(*q("SELECT data_id" + sql, args))
q("DELETE" + sql, args) q("DELETE" + sql, args)
...@@ -434,7 +434,7 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -434,7 +434,7 @@ class SQLiteDatabaseManager(DatabaseManager):
sql = " FROM obj WHERE partition=? AND oid=?" sql = " FROM obj WHERE partition=? AND oid=?"
args = [self._getPartition(oid), oid] args = [self._getPartition(oid), oid]
if serial: if serial:
sql += " AND serial=?" sql += " AND tid=?"
args.append(util.u64(serial)) args.append(util.u64(serial))
with self as q: with self as q:
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql, args) data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql, args)
...@@ -453,7 +453,7 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -453,7 +453,7 @@ class SQLiteDatabaseManager(DatabaseManager):
args.append(util.u64(max_tid)) args.append(util.u64(max_tid))
q = self.query q = self.query
q("DELETE FROM trans" + sql, args) q("DELETE FROM trans" + sql, args)
sql = " FROM obj" + sql.replace('tid', 'serial') sql = " FROM obj" + sql
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql, args) data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql, args)
if x] if x]
q("DELETE" + sql, args) q("DELETE" + sql, args)
...@@ -476,9 +476,9 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -476,9 +476,9 @@ class SQLiteDatabaseManager(DatabaseManager):
def _getObjectLength(self, oid, value_serial): def _getObjectLength(self, oid, value_serial):
if value_serial is None: if value_serial is None:
raise CreationUndone raise CreationUndone
length, value_serial = self.query("""SELECT LENGTH(value), value_serial length, value_serial = self.query("""SELECT LENGTH(value), value_tid
FROM obj LEFT JOIN data ON obj.data_id=data.id FROM obj LEFT JOIN data ON obj.data_id=data.id
WHERE partition=? AND oid=? AND serial=?""", WHERE partition=? AND oid=? AND tid=?""",
(self._getPartition(oid), oid, value_serial)).fetchone() (self._getPartition(oid), oid, value_serial)).fetchone()
if length is None: if length is None:
neo.lib.logging.info("Multiple levels of indirection" neo.lib.logging.info("Multiple levels of indirection"
...@@ -498,10 +498,10 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -498,10 +498,10 @@ class SQLiteDatabaseManager(DatabaseManager):
append = result.append append = result.append
with self as q: with self as q:
for serial, length, value_serial in q("""\ for serial, length, value_serial in q("""\
SELECT serial, LENGTH(value), value_serial SELECT tid, LENGTH(value), value_tid
FROM obj LEFT JOIN data ON obj.data_id = data.id FROM obj LEFT JOIN data ON obj.data_id = data.id
WHERE partition=? AND oid=? AND serial>=? WHERE partition=? AND oid=? AND tid>=?
ORDER BY serial DESC LIMIT ?,?""", ORDER BY tid DESC LIMIT ?,?""",
(self._getPartition(oid), oid, pack_tid, offset, length)): (self._getPartition(oid), oid, pack_tid, offset, length)):
if length is None: if length is None:
try: try:
...@@ -517,10 +517,10 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -517,10 +517,10 @@ class SQLiteDatabaseManager(DatabaseManager):
p64 = util.p64 p64 = util.p64
min_tid = u64(min_tid) min_tid = u64(min_tid)
return [(p64(serial), p64(oid)) for serial, oid in self.query("""\ return [(p64(serial), p64(oid)) for serial, oid in self.query("""\
SELECT serial, oid FROM obj SELECT tid, oid FROM obj
WHERE partition=? AND serial<=? WHERE partition=? AND tid<=?
AND (serial=? AND ?<=oid OR ?<serial) AND (tid=? AND ?<=oid OR ?<tid)
ORDER BY serial ASC, oid ASC LIMIT ?""", ORDER BY tid ASC, oid ASC LIMIT ?""",
(partition, u64(max_tid), min_tid, u64(min_oid), min_tid, length))] (partition, u64(max_tid), min_tid, u64(min_oid), min_tid, length))]
def getTIDList(self, offset, length, partition_list): def getTIDList(self, offset, length, partition_list):
...@@ -550,11 +550,11 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -550,11 +550,11 @@ class SQLiteDatabaseManager(DatabaseManager):
value_serial = None value_serial = None
q = self.query q = self.query
for T in '', 't': for T in '', 't':
update = """UPDATE OR FAIL %sobj SET value_serial=? update = """UPDATE OR FAIL %sobj SET value_tid=?
WHERE partition=? AND oid=? AND serial=?""" % T WHERE partition=? AND oid=? AND tid=?""" % T
for serial, in q("""SELECT serial FROM %sobj for serial, in q("""SELECT tid FROM %sobj
WHERE partition=? AND oid=? AND serial>=? AND value_serial=? WHERE partition=? AND oid=? AND tid>=? AND value_tid=?
ORDER BY serial ASC""" % T, ORDER BY tid ASC""" % T,
(partition, oid, max_serial, orig_serial)): (partition, oid, max_serial, orig_serial)):
q(update, (value_serial, partition, oid, serial)) q(update, (value_serial, partition, oid, serial))
if value_serial is None: if value_serial is None:
...@@ -571,20 +571,20 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -571,20 +571,20 @@ class SQLiteDatabaseManager(DatabaseManager):
with self as q: with self as q:
self._setPackTID(tid) self._setPackTID(tid)
for count, oid, max_serial in q("SELECT COUNT(*) - 1, oid," for count, oid, max_serial in q("SELECT COUNT(*) - 1, oid,"
" MAX(serial) FROM obj WHERE serial<=? GROUP BY oid", " MAX(tid) FROM obj WHERE tid<=? GROUP BY oid",
(tid,)): (tid,)):
partition = getPartition(oid) partition = getPartition(oid)
if q("SELECT 1 FROM obj WHERE partition=?" if q("SELECT 1 FROM obj WHERE partition=?"
" AND oid=? AND serial=? AND data_id IS NULL", " AND oid=? AND tid=? AND data_id IS NULL",
(partition, oid, max_serial)).fetchone(): (partition, oid, max_serial)).fetchone():
max_serial += 1 max_serial += 1
elif not count: elif not count:
continue continue
# There are things to delete for this object # There are things to delete for this object
data_id_set = set() data_id_set = set()
sql = " FROM obj WHERE partition=? AND oid=? AND serial<?" sql = " FROM obj WHERE partition=? AND oid=? AND tid<?"
args = partition, oid, max_serial args = partition, oid, max_serial
for serial, data_id in q("SELECT serial, data_id" + sql, args): for serial, data_id in q("SELECT tid, data_id" + sql, args):
data_id_set.add(data_id) data_id_set.add(data_id)
new_serial = updatePackFuture(oid, serial, max_serial) new_serial = updatePackFuture(oid, serial, max_serial)
if new_serial: if new_serial:
...@@ -615,11 +615,11 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -615,11 +615,11 @@ class SQLiteDatabaseManager(DatabaseManager):
# last grouped value, instead of the greatest one. # last grouped value, instead of the greatest one.
min_oid = u64(min_oid) min_oid = u64(min_oid)
r = self.query("""\ r = self.query("""\
SELECT oid, serial SELECT oid, tid
FROM obj FROM obj
WHERE partition=? AND serial<=? WHERE partition=? AND tid<=?
AND (oid>? OR oid=? AND serial>=?) AND (oid>? OR oid=? AND tid>=?)
ORDER BY oid ASC, serial ASC LIMIT ?""", ORDER BY oid ASC, tid ASC LIMIT ?""",
(partition, u64(max_tid), min_oid, min_oid, u64(min_serial), (partition, u64(max_tid), min_oid, min_oid, u64(min_serial),
-1 if length is None else length)).fetchall() -1 if length is None else length)).fetchall()
if r: if r:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment