Commit 9a7f8c7f authored by Vincent Pelletier's avatar Vincent Pelletier

Turn critical_tid_dict into a list.

This requires explicitly collecting garbage if we get disconnected from
primary master.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2436 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 987ffc47
...@@ -201,6 +201,7 @@ class Application(object): ...@@ -201,6 +201,7 @@ class Application(object):
except OperationFailure, msg: except OperationFailure, msg:
neo.logging.error('operation stopped: %s', msg) neo.logging.error('operation stopped: %s', msg)
except PrimaryFailure, msg: except PrimaryFailure, msg:
self.replicator.masterLost()
neo.logging.error('primary master is down: %s', msg) neo.logging.error('primary master is down: %s', msg)
self.master_node = None self.master_node = None
......
...@@ -25,7 +25,7 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -25,7 +25,7 @@ class MasterOperationHandler(BaseMasterHandler):
""" This handler is used for the primary master """ """ This handler is used for the primary master """
def answerLastIDs(self, conn, loid, ltid, lptid): def answerLastIDs(self, conn, loid, ltid, lptid):
self.app.replicator.setCriticalTID(conn.getUUID(), ltid) self.app.replicator.setCriticalTID(ltid)
def answerUnfinishedTransactions(self, conn, tid_list): def answerUnfinishedTransactions(self, conn, tid_list):
self.app.replicator.setUnfinishedTIDList(tid_list) self.app.replicator.setUnfinishedTIDList(tid_list)
......
...@@ -120,7 +120,7 @@ class Replicator(object): ...@@ -120,7 +120,7 @@ class Replicator(object):
# new_partition_dict # new_partition_dict
# outdated partitions for which no critical tid was asked to primary # outdated partitions for which no critical tid was asked to primary
# master yet # master yet
# critical_tid_dict # critical_tid_list
# outdated partitions for which a critical tid was asked to primary # outdated partitions for which a critical tid was asked to primary
# master, but not answered so far # master, but not answered so far
# partition_dict # partition_dict
...@@ -149,11 +149,19 @@ class Replicator(object): ...@@ -149,11 +149,19 @@ class Replicator(object):
def __init__(self, app): def __init__(self, app):
self.app = app self.app = app
self.new_partition_dict = {} self.new_partition_dict = {}
self.critical_tid_dict = {} self.critical_tid_list = []
self.partition_dict = {} self.partition_dict = {}
self.task_list = [] self.task_list = []
self.task_dict = {} self.task_dict = {}
def masterLost(self):
"""
When connection to primary master is lost, stop waiting for unfinished
transactions.
"""
self.critical_tid_list = []
self.waiting_for_unfinished_tids = False
def populate(self): def populate(self):
""" """
Populate partitions to replicate. Must be called when partition Populate partitions to replicate. Must be called when partition
...@@ -161,7 +169,6 @@ class Replicator(object): ...@@ -161,7 +169,6 @@ class Replicator(object):
Implies a reset. Implies a reset.
""" """
self.new_partition_dict = self._getOutdatedPartitionList() self.new_partition_dict = self._getOutdatedPartitionList()
self.critical_tid_dict = {}
self.partition_dict = {} self.partition_dict = {}
self.reset() self.reset()
...@@ -171,7 +178,6 @@ class Replicator(object): ...@@ -171,7 +178,6 @@ class Replicator(object):
self.task_dict = {} self.task_dict = {}
self.current_partition = None self.current_partition = None
self.current_connection = None self.current_connection = None
self.waiting_for_unfinished_tids = False
self.unfinished_tid_list = None self.unfinished_tid_list = None
self.replication_done = True self.replication_done = True
...@@ -203,24 +209,17 @@ class Replicator(object): ...@@ -203,24 +209,17 @@ class Replicator(object):
def isCurrentConnection(self, conn): def isCurrentConnection(self, conn):
return self.current_connection is conn return self.current_connection is conn
def setCriticalTID(self, uuid, tid): def setCriticalTID(self, tid):
"""This is a callback from MasterOperationHandler.""" """This is a callback from MasterOperationHandler."""
try:
partition_list = self.critical_tid_dict.pop(uuid)
except KeyError:
neo.logging.debug("setCriticalTID raised KeyError for %s" %
(dump(uuid), ))
else:
neo.logging.debug('setting critical TID %s to %s', dump(tid), neo.logging.debug('setting critical TID %s to %s', dump(tid),
', '.join([str(p.getRID()) for p in partition_list])) ', '.join([str(p.getRID()) for p in self.critical_tid_list]))
for partition in partition_list: for partition in self.critical_tid_list:
partition.setCriticalTID(tid) partition.setCriticalTID(tid)
self.critical_tid_list = []
def _askCriticalTID(self): def _askCriticalTID(self):
conn = self.app.master_conn self.app.master_conn.ask(Packets.AskLastIDs())
conn.ask(Packets.AskLastIDs()) self.critical_tid_list.extend(self.new_partition_dict.values())
uuid = conn.getUUID()
self.critical_tid_dict[uuid] = self.new_partition_dict.values()
self.partition_dict.update(self.new_partition_dict) self.partition_dict.update(self.new_partition_dict)
self.new_partition_dict = {} self.new_partition_dict = {}
......
...@@ -180,7 +180,7 @@ class StorageMasterHandlerTests(NeoUnitTestBase): ...@@ -180,7 +180,7 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
) )
calls = self.app.replicator.mockGetNamedCalls('setCriticalTID') calls = self.app.replicator.mockGetNamedCalls('setCriticalTID')
self.assertEquals(len(calls), 1) self.assertEquals(len(calls), 1)
calls[0].checkArgs(conn.getUUID(), INVALID_TID) calls[0].checkArgs(INVALID_TID)
def test_31_answerUnfinishedTransactions(self): def test_31_answerUnfinishedTransactions(self):
# set unfinished TID on replicator # set unfinished TID on replicator
......
...@@ -73,14 +73,13 @@ class StorageReplicatorTests(NeoUnitTestBase): ...@@ -73,14 +73,13 @@ class StorageReplicatorTests(NeoUnitTestBase):
def test_setCriticalTID(self): def test_setCriticalTID(self):
replicator = Replicator(None) replicator = Replicator(None)
master_uuid = self.getNewUUID()
partition_list = [Partition(0), Partition(5)] partition_list = [Partition(0), Partition(5)]
replicator.critical_tid_dict = {master_uuid: partition_list} replicator.critical_tid_list = partition_list[:]
critical_tid = self.getNextTID() critical_tid = self.getNextTID()
for partition in partition_list: for partition in partition_list:
self.assertEqual(partition.getCriticalTID(), None) self.assertEqual(partition.getCriticalTID(), None)
replicator.setCriticalTID(master_uuid, critical_tid) replicator.setCriticalTID(critical_tid)
self.assertEqual(replicator.critical_tid_dict, {}) self.assertEqual(replicator.critical_tid_list, [])
for partition in partition_list: for partition in partition_list:
self.assertEqual(partition.getCriticalTID(), critical_tid) self.assertEqual(partition.getCriticalTID(), critical_tid)
...@@ -154,7 +153,7 @@ class StorageReplicatorTests(NeoUnitTestBase): ...@@ -154,7 +153,7 @@ class StorageReplicatorTests(NeoUnitTestBase):
self.checkNoPacketSent(app.master_conn) self.checkNoPacketSent(app.master_conn)
self.assertTrue(replicator.waiting_for_unfinished_tids) self.assertTrue(replicator.waiting_for_unfinished_tids)
# Send answers (garanteed to happen in this order) # Send answers (garanteed to happen in this order)
replicator.setCriticalTID(master_uuid, critical_tid) replicator.setCriticalTID(critical_tid)
act() act()
self.checkNoPacketSent(app.master_conn) self.checkNoPacketSent(app.master_conn)
self.assertTrue(replicator.waiting_for_unfinished_tids) self.assertTrue(replicator.waiting_for_unfinished_tids)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment