Commit bc9c252b authored by Julien Muchembled's avatar Julien Muchembled

mysql: implement horizontal partitioning but keep it disabled

This makes dropping of partitions very fast, and should also speed up all
other queries to the 'trans', 'obj' & 'obj_short' tables.

However, it is not enabled due to a bug in MySQL 5.1 & 5.5.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2828 71dcc9de-d417-0410-9af5-da40c76e7ee4
git-svn-id: https://svn.erp5.org/repos/neo/trunk@2829 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent f4b0d4f7
...@@ -45,6 +45,10 @@ def splitOIDField(tid, oids): ...@@ -45,6 +45,10 @@ def splitOIDField(tid, oids):
class MySQLDatabaseManager(DatabaseManager): class MySQLDatabaseManager(DatabaseManager):
"""This class manages a database on MySQL.""" """This class manages a database on MySQL."""
# Disabled even on MySQL 5.1 & 5.5 because 'select count(*) from obj'
# sometimes returns incorrect values.
_use_partition = False
def __init__(self, database): def __init__(self, database):
super(MySQLDatabaseManager, self).__init__() super(MySQLDatabaseManager, self).__init__()
self.user, self.passwd, self.db = self._parse(database) self.user, self.passwd, self.db = self._parse(database)
...@@ -151,6 +155,9 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -151,6 +155,9 @@ class MySQLDatabaseManager(DatabaseManager):
PRIMARY KEY (rid, uuid) PRIMARY KEY (rid, uuid)
) ENGINE = InnoDB""") ) ENGINE = InnoDB""")
p = self._use_partition and """ PARTITION BY LIST (partition) (
PARTITION dummy VALUES IN (NULL))""" or ''
# The table "trans" stores information on committed transactions. # The table "trans" stores information on committed transactions.
q("""CREATE TABLE IF NOT EXISTS trans ( q("""CREATE TABLE IF NOT EXISTS trans (
partition SMALLINT UNSIGNED NOT NULL, partition SMALLINT UNSIGNED NOT NULL,
...@@ -161,7 +168,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -161,7 +168,7 @@ class MySQLDatabaseManager(DatabaseManager):
description BLOB NOT NULL, description BLOB NOT NULL,
ext BLOB NOT NULL, ext BLOB NOT NULL,
PRIMARY KEY (partition, tid) PRIMARY KEY (partition, tid)
) ENGINE = InnoDB""") ) ENGINE = InnoDB""" + p)
# The table "obj" stores committed object data. # The table "obj" stores committed object data.
q("""CREATE TABLE IF NOT EXISTS obj ( q("""CREATE TABLE IF NOT EXISTS obj (
...@@ -173,7 +180,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -173,7 +180,7 @@ class MySQLDatabaseManager(DatabaseManager):
value LONGBLOB NULL, value LONGBLOB NULL,
value_serial BIGINT UNSIGNED NULL, value_serial BIGINT UNSIGNED NULL,
PRIMARY KEY (partition, oid, serial) PRIMARY KEY (partition, oid, serial)
) ENGINE = InnoDB""") ) ENGINE = InnoDB""" + p)
# The table "obj_short" contains columns which are accessed in queries # The table "obj_short" contains columns which are accessed in queries
# which don't need to access object data. This is needed because InnoDB # which don't need to access object data. This is needed because InnoDB
...@@ -183,7 +190,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -183,7 +190,7 @@ class MySQLDatabaseManager(DatabaseManager):
'oid BIGINT UNSIGNED NOT NULL,' 'oid BIGINT UNSIGNED NOT NULL,'
'serial BIGINT UNSIGNED NOT NULL,' 'serial BIGINT UNSIGNED NOT NULL,'
'PRIMARY KEY (partition, oid, serial)' 'PRIMARY KEY (partition, oid, serial)'
') ENGINE = InnoDB') ') ENGINE = InnoDB' + p)
# The table "ttrans" stores information on uncommitted transactions. # The table "ttrans" stores information on uncommitted transactions.
q("""CREATE TABLE IF NOT EXISTS ttrans ( q("""CREATE TABLE IF NOT EXISTS ttrans (
...@@ -367,6 +374,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -367,6 +374,7 @@ class MySQLDatabaseManager(DatabaseManager):
def doSetPartitionTable(self, ptid, cell_list, reset): def doSetPartitionTable(self, ptid, cell_list, reset):
q = self.query q = self.query
e = self.escape e = self.escape
offset_list = []
self.begin() self.begin()
try: try:
if reset: if reset:
...@@ -379,6 +387,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -379,6 +387,7 @@ class MySQLDatabaseManager(DatabaseManager):
q("""DELETE FROM pt WHERE rid = %d AND uuid = '%s'""" \ q("""DELETE FROM pt WHERE rid = %d AND uuid = '%s'""" \
% (offset, uuid)) % (offset, uuid))
else: else:
offset_list.append(offset)
q("""INSERT INTO pt VALUES (%d, '%s', %d) q("""INSERT INTO pt VALUES (%d, '%s', %d)
ON DUPLICATE KEY UPDATE state = %d""" \ ON DUPLICATE KEY UPDATE state = %d""" \
% (offset, uuid, state, state)) % (offset, uuid, state, state))
...@@ -387,6 +396,16 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -387,6 +396,16 @@ class MySQLDatabaseManager(DatabaseManager):
self.rollback() self.rollback()
raise raise
self.commit() self.commit()
if self._use_partition:
for offset in offset_list:
add = """ALTER TABLE %%s ADD PARTITION (
PARTITION p%u VALUES IN (%u))""" % (offset, offset)
for table in 'trans', 'obj', 'obj_short':
try:
self.conn.query(add % table)
except OperationalError, (code, _):
if code != 1517: # duplicate partition name
raise
def changePartitionTable(self, ptid, cell_list): def changePartitionTable(self, ptid, cell_list):
self.doSetPartitionTable(ptid, cell_list, False) self.doSetPartitionTable(ptid, cell_list, False)
...@@ -396,6 +415,16 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -396,6 +415,16 @@ class MySQLDatabaseManager(DatabaseManager):
def dropPartitions(self, num_partitions, offset_list): def dropPartitions(self, num_partitions, offset_list):
q = self.query q = self.query
if self._use_partition:
drop = "ALTER TABLE %s DROP PARTITION" + \
','.join(' p%u' % i for i in offset_list)
for table in 'trans', 'obj', 'obj_short':
try:
self.conn.query(drop % table)
except OperationalError, (code, _):
if code != 1508: # already dropped
raise
return
e = self.escape e = self.escape
offset_list = ', '.join((str(i) for i in offset_list)) offset_list = ', '.join((str(i) for i in offset_list))
self.begin() self.begin()
......
...@@ -22,10 +22,10 @@ from neo.storage.database.btree import BTreeDatabaseManager ...@@ -22,10 +22,10 @@ from neo.storage.database.btree import BTreeDatabaseManager
class StorageBTreeTests(StorageDBTests): class StorageBTreeTests(StorageDBTests):
def getDB(self): def getDB(self, reset=0):
# db manager # db manager
db = BTreeDatabaseManager('') db = BTreeDatabaseManager('')
db.setup() db.setup(reset)
return db return db
del StorageDBTests del StorageDBTests
......
...@@ -29,23 +29,49 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -29,23 +29,49 @@ class StorageDBTests(NeoUnitTestBase):
def setUp(self): def setUp(self):
NeoUnitTestBase.setUp(self) NeoUnitTestBase.setUp(self)
self.db = self.getDB()
@property
def db(self):
try:
return self._db
except AttributeError:
self.setNumPartitions(1)
return self._db
def tearDown(self): def tearDown(self):
self.closeDB() try:
del self._db
except AttributeError:
pass
NeoUnitTestBase.tearDown(self) NeoUnitTestBase.tearDown(self)
def getDB(self): def getDB(self):
raise NotImplementedError raise NotImplementedError
def closeDB(self): def setNumPartitions(self, num_partitions, reset=0):
pass try:
db = self._db
def test_configuration(self): except AttributeError:
# check if a configuration entry is well written self._db = db = self.getDB(reset)
self.db.setConfiguration('a', 'c') else:
result = self.db.getConfiguration('a') if reset:
self.assertEqual(result, 'c') db.setup(reset)
else:
try:
n = db.getNumPartitions()
except KeyError:
n = 0
if num_partitions == n:
return
if num_partitions < n:
db.dropPartitions(n, range(num_partitions, n))
db.setNumPartitions(num_partitions)
self.assertEqual(num_partitions, db.getNumPartitions())
uuid = self.getNewUUID()
db.setUUID(uuid)
self.assertEqual(uuid, db.getUUID())
db.setPartitionTable(1,
[(i, uuid, CellStates.UP_TO_DATE) for i in xrange(num_partitions)])
def checkConfigEntry(self, get_call, set_call, value): def checkConfigEntry(self, get_call, set_call, value):
# generic test for all configuration entries accessors # generic test for all configuration entries accessors
...@@ -56,39 +82,36 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -56,39 +82,36 @@ class StorageDBTests(NeoUnitTestBase):
self.assertEqual(get_call(), value * 2) self.assertEqual(get_call(), value * 2)
def test_UUID(self): def test_UUID(self):
self.checkConfigEntry(self.db.getUUID, self.db.setUUID, 'TEST_VALUE') db = self.getDB()
self.checkConfigEntry(db.getUUID, db.setUUID, 'TEST_VALUE')
def test_NumPartitions(self):
self.db.setup(reset=True)
self.checkConfigEntry(self.db.getNumPartitions,
self.db.setNumPartitions, 10)
def test_Name(self): def test_Name(self):
self.checkConfigEntry(self.db.getName, self.db.setName, 'TEST_NAME') db = self.getDB()
self.checkConfigEntry(db.getName, db.setName, 'TEST_NAME')
def test_15_PTID(self): def test_15_PTID(self):
self.checkConfigEntry( db = self.getDB()
get_call=self.db.getPTID, self.checkConfigEntry(db.getPTID, db.setPTID, self.getPTID(1))
set_call=self.db.setPTID,
value=self.getPTID(1))
def test_getPartitionTable(self): def test_getPartitionTable(self):
db = self.getDB()
ptid = self.getPTID(1) ptid = self.getPTID(1)
uuid1, uuid2 = self.getNewUUID(), self.getNewUUID() uuid1, uuid2 = self.getNewUUID(), self.getNewUUID()
cell1 = (0, uuid1, CellStates.OUT_OF_DATE) cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
cell2 = (1, uuid1, CellStates.UP_TO_DATE) cell2 = (1, uuid1, CellStates.UP_TO_DATE)
self.db.setPartitionTable(ptid, [cell1, cell2]) db.setPartitionTable(ptid, [cell1, cell2])
result = self.db.getPartitionTable() result = db.getPartitionTable()
self.assertEqual(set(result), set([cell1, cell2])) self.assertEqual(set(result), set([cell1, cell2]))
def test_getLastOID(self): def test_getLastOID(self):
db = self.getDB()
oid1 = self.getOID(1) oid1 = self.getOID(1)
self.db.setLastOID(oid1) db.setLastOID(oid1)
result1 = self.db.getLastOID() result1 = db.getLastOID()
self.assertEqual(result1, oid1) self.assertEqual(result1, oid1)
def getOIDs(self, count): def getOIDs(self, count):
return [self.getOID(i) for i in xrange(count)] return map(self.getOID, xrange(count))
def getTIDs(self, count): def getTIDs(self, count):
tid_list = [self.getNextTID()] tid_list = [self.getNextTID()]
...@@ -197,45 +220,47 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -197,45 +220,47 @@ class StorageDBTests(NeoUnitTestBase):
OBJECT_T1_NEXT) OBJECT_T1_NEXT)
def test_setPartitionTable(self): def test_setPartitionTable(self):
db = self.getDB()
ptid = self.getPTID(1) ptid = self.getPTID(1)
uuid1, uuid2 = self.getNewUUID(), self.getNewUUID() uuid1, uuid2 = self.getNewUUID(), self.getNewUUID()
cell1 = (0, uuid1, CellStates.OUT_OF_DATE) cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
cell2 = (1, uuid1, CellStates.UP_TO_DATE) cell2 = (1, uuid1, CellStates.UP_TO_DATE)
cell3 = (1, uuid1, CellStates.DISCARDED) cell3 = (1, uuid1, CellStates.DISCARDED)
# no partition table # no partition table
self.assertEqual(self.db.getPartitionTable(), []) self.assertEqual(db.getPartitionTable(), [])
# set one # set one
self.db.setPartitionTable(ptid, [cell1]) db.setPartitionTable(ptid, [cell1])
result = self.db.getPartitionTable() result = db.getPartitionTable()
self.assertEqual(result, [cell1]) self.assertEqual(result, [cell1])
# then another # then another
self.db.setPartitionTable(ptid, [cell2]) db.setPartitionTable(ptid, [cell2])
result = self.db.getPartitionTable() result = db.getPartitionTable()
self.assertEqual(result, [cell2]) self.assertEqual(result, [cell2])
# drop discarded cells # drop discarded cells
self.db.setPartitionTable(ptid, [cell2, cell3]) db.setPartitionTable(ptid, [cell2, cell3])
result = self.db.getPartitionTable() result = db.getPartitionTable()
self.assertEqual(result, []) self.assertEqual(result, [])
def test_changePartitionTable(self): def test_changePartitionTable(self):
db = self.getDB()
ptid = self.getPTID(1) ptid = self.getPTID(1)
uuid1, uuid2 = self.getNewUUID(), self.getNewUUID() uuid1, uuid2 = self.getNewUUID(), self.getNewUUID()
cell1 = (0, uuid1, CellStates.OUT_OF_DATE) cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
cell2 = (1, uuid1, CellStates.UP_TO_DATE) cell2 = (1, uuid1, CellStates.UP_TO_DATE)
cell3 = (1, uuid1, CellStates.DISCARDED) cell3 = (1, uuid1, CellStates.DISCARDED)
# no partition table # no partition table
self.assertEqual(self.db.getPartitionTable(), []) self.assertEqual(db.getPartitionTable(), [])
# set one # set one
self.db.changePartitionTable(ptid, [cell1]) db.changePartitionTable(ptid, [cell1])
result = self.db.getPartitionTable() result = db.getPartitionTable()
self.assertEqual(result, [cell1]) self.assertEqual(result, [cell1])
# add more entries # add more entries
self.db.changePartitionTable(ptid, [cell2]) db.changePartitionTable(ptid, [cell2])
result = self.db.getPartitionTable() result = db.getPartitionTable()
self.assertEqual(set(result), set([cell1, cell2])) self.assertEqual(set(result), set([cell1, cell2]))
# drop discarded cells # drop discarded cells
self.db.changePartitionTable(ptid, [cell2, cell3]) db.changePartitionTable(ptid, [cell2, cell3])
result = self.db.getPartitionTable() result = db.getPartitionTable()
self.assertEqual(result, [cell1]) self.assertEqual(result, [cell1])
def test_dropUnfinishedData(self): def test_dropUnfinishedData(self):
...@@ -337,7 +362,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -337,7 +362,7 @@ class StorageDBTests(NeoUnitTestBase):
self.assertEqual(self.db.getTransaction(tid2, True), None) self.assertEqual(self.db.getTransaction(tid2, True), None)
def test_deleteTransactionsAbove(self): def test_deleteTransactionsAbove(self):
self.db.setNumPartitions(2) self.setNumPartitions(2)
tid1 = self.getOID(0) tid1 = self.getOID(0)
tid2 = self.getOID(1) tid2 = self.getOID(1)
tid3 = self.getOID(2) tid3 = self.getOID(2)
...@@ -372,7 +397,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -372,7 +397,7 @@ class StorageDBTests(NeoUnitTestBase):
objs2[1][1:]) objs2[1][1:])
def test_deleteObjectsAbove(self): def test_deleteObjectsAbove(self):
self.db.setNumPartitions(2) self.setNumPartitions(2)
tid1 = self.getOID(1) tid1 = self.getOID(1)
tid2 = self.getOID(2) tid2 = self.getOID(2)
tid3 = self.getOID(3) tid3 = self.getOID(3)
...@@ -442,8 +467,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -442,8 +467,7 @@ class StorageDBTests(NeoUnitTestBase):
self.assertEqual(result, None) self.assertEqual(result, None)
def test_getObjectHistoryFrom(self): def test_getObjectHistoryFrom(self):
self.db.setup() self.setNumPartitions(2)
self.db.setNumPartitions(2)
oid1 = self.getOID(0) oid1 = self.getOID(0)
oid2 = self.getOID(2) oid2 = self.getOID(2)
oid3 = self.getOID(1) oid3 = self.getOID(1)
...@@ -508,8 +532,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -508,8 +532,7 @@ class StorageDBTests(NeoUnitTestBase):
return tid_list return tid_list
def test_getTIDList(self): def test_getTIDList(self):
self.db.setup(True) self.setNumPartitions(2, True)
self.db.setNumPartitions(2)
tid1, tid2, tid3, tid4 = self._storeTransactions(4) tid1, tid2, tid3, tid4 = self._storeTransactions(4)
# get tids # get tids
# - all partitions # - all partitions
...@@ -529,8 +552,7 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -529,8 +552,7 @@ class StorageDBTests(NeoUnitTestBase):
self.checkSet(result, []) self.checkSet(result, [])
def test_getReplicationTIDList(self): def test_getReplicationTIDList(self):
self.db.setup(True) self.setNumPartitions(2, True)
self.db.setNumPartitions(2)
tid1, tid2, tid3, tid4 = self._storeTransactions(4) tid1, tid2, tid3, tid4 = self._storeTransactions(4)
# get tids # get tids
# - all # - all
...@@ -553,9 +575,8 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -553,9 +575,8 @@ class StorageDBTests(NeoUnitTestBase):
self.checkSet(result, [tid1]) self.checkSet(result, [tid1])
def test__getObjectData(self): def test__getObjectData(self):
self.setNumPartitions(4, True)
db = self.db db = self.db
db.setup(reset=True)
self.db.setNumPartitions(4)
tid0 = self.getNextTID() tid0 = self.getNextTID()
tid1 = self.getNextTID() tid1 = self.getNextTID()
tid2 = self.getNextTID() tid2 = self.getNextTID()
...@@ -640,9 +661,8 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -640,9 +661,8 @@ class StorageDBTests(NeoUnitTestBase):
self.assertEqual(sum(call_counter), 1) self.assertEqual(sum(call_counter), 1)
def test__getDataTIDFromData(self): def test__getDataTIDFromData(self):
self.setNumPartitions(4, True)
db = self.db db = self.db
db.setup(reset=True)
self.db.setNumPartitions(4)
tid1 = self.getNextTID() tid1 = self.getNextTID()
tid2 = self.getNextTID() tid2 = self.getNextTID()
oid1 = self.getOID(1) oid1 = self.getOID(1)
...@@ -665,9 +685,8 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -665,9 +685,8 @@ class StorageDBTests(NeoUnitTestBase):
(u64(tid2), u64(tid1))) (u64(tid2), u64(tid1)))
def test__getDataTID(self): def test__getDataTID(self):
self.setNumPartitions(4, True)
db = self.db db = self.db
db.setup(reset=True)
self.db.setNumPartitions(4)
tid1 = self.getNextTID() tid1 = self.getNextTID()
tid2 = self.getNextTID() tid2 = self.getNextTID()
oid1 = self.getOID(1) oid1 = self.getOID(1)
...@@ -688,9 +707,8 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -688,9 +707,8 @@ class StorageDBTests(NeoUnitTestBase):
(u64(tid2), u64(tid1))) (u64(tid2), u64(tid1)))
def test_findUndoTID(self): def test_findUndoTID(self):
self.setNumPartitions(4, True)
db = self.db db = self.db
db.setup(reset=True)
self.db.setNumPartitions(4)
tid1 = self.getNextTID() tid1 = self.getNextTID()
tid2 = self.getNextTID() tid2 = self.getNextTID()
tid3 = self.getNextTID() tid3 = self.getNextTID()
......
...@@ -27,18 +27,14 @@ NEO_SQL_USER = 'test' ...@@ -27,18 +27,14 @@ NEO_SQL_USER = 'test'
class StorageMySQSLdbTests(StorageDBTests): class StorageMySQSLdbTests(StorageDBTests):
def getDB(self): def getDB(self, reset=0):
self.prepareDatabase(number=1, prefix=NEO_SQL_DATABASE[:-1]) self.prepareDatabase(number=1, prefix=NEO_SQL_DATABASE[:-1])
# db manager # db manager
database = '%s@%s' % (NEO_SQL_USER, NEO_SQL_DATABASE) database = '%s@%s' % (NEO_SQL_USER, NEO_SQL_DATABASE)
db = MySQLDatabaseManager(database) db = MySQLDatabaseManager(database)
db.setup() db.setup(reset)
db.setNumPartitions(1)
return db return db
def closeDB(self):
self.db.close()
def checkCalledQuery(self, query=None, call=0): def checkCalledQuery(self, query=None, call=0):
self.assertTrue(len(self.db.conn.mockGetNamedCalls('query')) > call) self.assertTrue(len(self.db.conn.mockGetNamedCalls('query')) > call)
call = self.db.conn.mockGetNamedCalls('query')[call] call = self.db.conn.mockGetNamedCalls('query')[call]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment