Commit 788b65f9 authored by Julien Muchembled's avatar Julien Muchembled

wip

parent ae7e18a2
......@@ -282,7 +282,7 @@ class ImporterDatabaseManager(DatabaseManager):
def __init__(self, *args, **kw):
super(ImporterDatabaseManager, self).__init__(*args, **kw)
implements(self, """_getNextTID checkSerialRange checkTIDRange
deleteObject deleteTransaction dropPartitions getLastTID
deleteObject deleteTransaction _dropPartition getLastTID
getReplicationObjectList _getTIDList nonempty""".split())
_getPartition = property(lambda self: self.db._getPartition)
......
......@@ -18,6 +18,7 @@ import os, errno, socket, struct, sys, threading
from collections import defaultdict
from contextlib import contextmanager
from functools import wraps
from time import time
from neo.lib import logging, util
from neo.lib.exception import DatabaseFailure
from neo.lib.interfaces import abstract, requires
......@@ -60,6 +61,8 @@ class DatabaseManager(object):
LOCKED = "error: database is locked"
_deferred = 0
_drop_stats = 0, 0
_dropping = None
_duplicating = _repairing = None
def __init__(self, database, engine=None, wait=None):
......@@ -168,7 +171,8 @@ class DatabaseManager(object):
return version
def doOperation(self, app):
pass
if self._dropping:
self._dropPartitions(app)
def _close(self):
"""Backend-specific code to close the database"""
......@@ -453,11 +457,13 @@ class DatabaseManager(object):
"""
@requires(_changePartitionTable)
def changePartitionTable(self, ptid, cell_list, reset=False):
def changePartitionTable(self, app, ptid, cell_list, reset=False):
dropping = self._dropping or set()
readable_set = self._readable_set
if reset:
readable_set.clear()
np = self.getNumPartitions()
dropping.update(xrange(np))
def _getPartition(x, np=np):
return x % np
def _getReadablePartition(x, np=np, r=readable_set):
......@@ -470,6 +476,10 @@ class DatabaseManager(object):
me = self.getUUID()
for offset, nid, state in cell_list:
if nid == me:
if state == CellStates.DISCARDED:
dropping.add(offset)
else:
dropping.discard(offset)
if CellStates.UP_TO_DATE != state != CellStates.FEEDING:
readable_set.discard(offset)
else:
......@@ -477,10 +487,57 @@ class DatabaseManager(object):
self._changePartitionTable(cell_list, reset)
assert isinstance(ptid, (int, long)), ptid
self._setConfiguration('ptid', str(ptid))
if dropping and not self._dropping:
self._dropping = dropping
if app.operational:
self._dropPartitions(app)
def _dropPartitions(self, app):
if app.disable_drop_partitions:
logging.info("don't drop data for partitions %r", self._dropping)
return
def dropPartitions():
dropping = self._dropping
before = drop_count, drop_time = self._drop_stats
dropped = 0
while dropping:
offset = next(iter(dropping))
log = dropped
while True:
yield 1
if offset not in dropping:
break
start = time()
data_id_list = self._dropPartition(offset,
# The efficiency drops when the number of lines to
# delete is too small so do not delete too few.
max(100, int(.1 * drop_count / drop_time))
if drop_time else 1000)
if data_id_list is None:
dropping.remove(offset)
break
if log == dropped:
dropped += 1
logging.info("dropping partition %s...", offset)
drop_count += self._pruneData(data_id_list)
drop_time += time() - start
self.commit()
self._drop_stats = drop_count, drop_time
if dropped:
logging.info("%s partition(s) dropped"
" (stats: count: %s/%s, time: %.4s/%.4s)",
dropped, drop_count - before[0], drop_count,
round(drop_time - before[1], 3), round(drop_time, 3))
app.newTask(dropPartitions())
@abstract
def dropPartitions(self, offset_list):
"""Delete all data for specified partitions"""
def _dropPartition(self, offset, count):
"""Delete rows for given partition
Delete at most 'count' rows of from obj:
- if there's no line to delete, purge trans and return None
- else return data ids of deleted rows
"""
def _getUnfinishedDataIdList(self):
"""Drop any unfinished data from a database."""
......
......@@ -405,19 +405,15 @@ class MySQLDatabaseManager(DatabaseManager):
" ON DUPLICATE KEY UPDATE state = %d"
% (offset, nid, state, state))
def dropPartitions(self, offset_list):
def _dropPartition(self, offset, count):
q = self.query
# XXX: these queries are inefficient (execution time increase with
# row count, although we use indexes) when there are rows to
# delete. It should be done as an idle task, by chunks.
for partition in offset_list:
where = " WHERE `partition`=%d" % partition
data_id_list = [x for x, in
q("SELECT DISTINCT data_id FROM obj USE INDEX(PRIMARY)" + where)
if x]
where = " WHERE `partition`=%s ORDER BY tid, oid LIMIT %s" % (
offset, count)
x = q("SELECT data_id FROM obj USE INDEX(PRIMARY)" + where)
if x:
q("DELETE FROM obj" + where)
q("DELETE FROM trans" + where)
self._pruneData(data_id_list)
return [x for x, in x if x]
q("DELETE FROM trans WHERE `partition`=%s" % offset)
def _getUnfinishedDataIdList(self):
return [x for x, in self.query("SELECT data_id FROM tobj") if x]
......
......@@ -332,16 +332,14 @@ class SQLiteDatabaseManager(DatabaseManager):
q("INSERT OR FAIL INTO pt VALUES (?,?,?)",
(offset, nid, int(state)))
def dropPartitions(self, offset_list):
where = " WHERE partition=?"
def _dropPartition(self, *args):
q = self.query
for partition in offset_list:
args = partition,
data_id_list = [x for x, in
q("SELECT DISTINCT data_id FROM obj" + where, args) if x]
q("DELETE FROM obj" + where, args)
q("DELETE FROM trans" + where, args)
self._pruneData(data_id_list)
where = " FROM obj WHERE partition=? ORDER BY tid, oid LIMIT ?"
x = q("SELECT data_id" + where, args).fetchall()
if x:
q("DELETE" + where, args)
return [x for x, in x if x]
q("DELETE FROM trans WHERE partition=?", args[:1])
def _getUnfinishedDataIdList(self):
return [x for x, in self.query("SELECT data_id FROM tobj") if x]
......
......@@ -71,7 +71,7 @@ class BaseMasterHandler(BaseHandler):
if ptid != 1 + app.pt.getID():
raise ProtocolError('wrong partition table id')
app.pt.update(ptid, cell_list, app.nm)
app.dm.changePartitionTable(ptid, cell_list)
app.dm.changePartitionTable(app, ptid, cell_list)
if app.operational:
app.replicator.notifyPartitionChanges(cell_list)
app.dm.commit()
......
......@@ -26,25 +26,11 @@ class InitializationHandler(BaseMasterHandler):
pt.load(ptid, row_list, app.nm)
if not pt.filled():
raise ProtocolError('Partial partition table received')
# Install the partition table into the database for persistence.
cell_list = []
offset_list = xrange(pt.getPartitions())
unassigned_set = set(offset_list)
for offset in offset_list:
for cell in pt.getCellList(offset):
cell_list.append((offset, cell.getUUID(), cell.getState()))
if cell.getUUID() == app.uuid:
unassigned_set.remove(offset)
# delete objects database
cell_list = [(offset, cell.getUUID(), cell.getState())
for offset in xrange(pt.getPartitions())
for cell in pt.getCellList(offset)]
dm = app.dm
if unassigned_set:
if app.disable_drop_partitions:
logging.info("don't drop data for partitions %r", unassigned_set)
else:
logging.debug('drop data for partitions %r', unassigned_set)
dm.dropPartitions(unassigned_set)
dm.changePartitionTable(ptid, cell_list, reset=True)
dm.changePartitionTable(app, ptid, cell_list, reset=True)
dm.commit()
def truncate(self, conn, tid):
......
......@@ -90,7 +90,7 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
# dm call
calls = self.app.dm.mockGetNamedCalls('changePartitionTable')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(ptid, cells)
calls[0].checkArgs(app, ptid, cells)
if __name__ == "__main__":
unittest.main()
......@@ -15,13 +15,23 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from binascii import a2b_hex
from collections import deque
from contextlib import contextmanager
import unittest
from neo.lib.util import add64, p64, u64
from neo.lib.protocol import CellStates, ZERO_HASH, ZERO_OID, ZERO_TID, MAX_TID
from neo.storage.app import Application
from .. import NeoUnitTestBase
class DummyApplication(deque):
newTask = Application.newTask.__func__
disable_drop_partitions = False
operational = True
task_queue = property(lambda self: self)
class StorageDBTests(NeoUnitTestBase):
_last_ttid = ZERO_TID
......@@ -69,7 +79,7 @@ class StorageDBTests(NeoUnitTestBase):
uuid = self.getStorageUUID()
db.setUUID(uuid)
self.assertEqual(uuid, db.getUUID())
db.changePartitionTable(1,
db.changePartitionTable(None, 1,
[(i, uuid, CellStates.UP_TO_DATE) for i in xrange(num_partitions)],
reset=True)
db.commit()
......@@ -102,15 +112,6 @@ class StorageDBTests(NeoUnitTestBase):
db = self.getDB()
self.checkConfigEntry(db.getName, db.setName, 'TEST_NAME')
def test_getPartitionTable(self):
db = self.getDB()
uuid1, uuid2 = self.getStorageUUID(), self.getStorageUUID()
cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
cell2 = (1, uuid1, CellStates.UP_TO_DATE)
db.changePartitionTable(1, [cell1, cell2], 1)
result = db.getPartitionTable()
self.assertEqual(set(result), {cell1, cell2})
def getOIDs(self, count):
return map(p64, xrange(count))
......@@ -202,7 +203,9 @@ class StorageDBTests(NeoUnitTestBase):
OBJECT_T1_NEXT)
def test_setPartitionTable(self):
app = DummyApplication()
db = self.getDB()
db.setNumPartitions(1)
ptid = 1
uuid = self.getStorageUUID()
cell1 = 0, uuid, CellStates.OUT_OF_DATE
......@@ -211,17 +214,20 @@ class StorageDBTests(NeoUnitTestBase):
# no partition table
self.assertEqual(list(db.getPartitionTable()), [])
# set one
db.changePartitionTable(ptid, [cell1], 1)
db.changePartitionTable(app, ptid, [cell1], 1)
result = db.getPartitionTable()
self.assertEqual(list(result), [cell1])
# then another
db.changePartitionTable(ptid, [cell2], 1)
db.changePartitionTable(None, ptid, [cell2], 1)
result = db.getPartitionTable()
self.assertEqual(list(result), [cell2])
# drop discarded cells
db.changePartitionTable(ptid, [cell2, cell3], 1)
db.changePartitionTable(None, ptid, [cell2, cell3], 1)
result = db.getPartitionTable()
self.assertEqual(list(result), [])
self.assertTrue(db._dropping)
task, = app
self.assertEqual(list(task), [])
def test_changePartitionTable(self):
db = self.getDB()
......@@ -233,15 +239,15 @@ class StorageDBTests(NeoUnitTestBase):
# no partition table
self.assertEqual(list(db.getPartitionTable()), [])
# set one
db.changePartitionTable(ptid, [cell1])
db.changePartitionTable(None, ptid, [cell1])
result = db.getPartitionTable()
self.assertEqual(list(result), [cell1])
# add more entries
db.changePartitionTable(ptid, [cell2])
db.changePartitionTable(None, ptid, [cell2])
result = db.getPartitionTable()
self.assertEqual(set(result), {cell1, cell2})
# drop discarded cells
db.changePartitionTable(ptid, [cell2, cell3])
db.changePartitionTable(None, ptid, [cell2, cell3])
result = db.getPartitionTable()
self.assertEqual(list(result), [cell1])
......
......@@ -345,13 +345,10 @@ class ReplicationTests(NEOThreadedTest):
return isinstance(packet, delayed) and \
packet.decode()[0] == offset and \
conn in s1.getConnectionList(s0)
def changePartitionTable(orig, ptid, cell_list):
def changePartitionTable(orig, app, ptid, cell_list):
if (offset, s0.uuid, CellStates.DISCARDED) in cell_list:
connection_filter.remove(delayAskFetch)
# XXX: this is currently not done by
# default for performance reason
orig.im_self.dropPartitions((offset,))
return orig(ptid, cell_list)
return orig(app, ptid, cell_list)
np = cluster.num_partitions
s0, s1, s2 = cluster.storage_list
for delayed in Packets.AskFetchTransactions, Packets.AskFetchObjects:
......@@ -550,16 +547,7 @@ class ReplicationTests(NEOThreadedTest):
cluster.neoctl.tweakPartitionTable()
self.tic()
self.assertEqual(1, s1.sqlCount('obj'))
# Deletion should start as soon as the cell is discarded, as a
# background task, instead of doing it during initialization.
count = s0.sqlCount('obj')
s0.stop()
cluster.join((s0,))
s0.resetNode()
s0.start()
self.tic()
self.assertEqual(2, s0.sqlCount('obj'))
expectedFailure(self.assertEqual)(2, count)
@with_cluster(start_cluster=0, replicas=1)
def testResumingReplication(self, cluster):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment