Commit 9a85358e authored by Vincent Pelletier

Make master forget lock waits for lost storage nodes.

If the last storage node gets disconnected, an operation failure will
occur, so there is no risk of sending AnswerTransactionFinished to the
client when some object was not stored.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2150 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 31b3268b
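
In short: when a storage connection is lost, the master drops that node from every pending transaction and finishes any transaction for which it was the last missing lock answer. A minimal, self-contained sketch of that idea, assuming a simplified Transaction whose locked() is all() over the per-node answer flags (names mirror the diff below, but these are illustration stand-ins, not the actual NEO classes):

class Transaction(object):
    """Tracks which storage nodes still owe a lock answer."""
    def __init__(self, uuid_list):
        # False until the corresponding storage node answers the lock
        self._uuid_dict = dict.fromkeys(uuid_list, False)

    def locked(self):
        # Assumption: locked once every involved node has answered
        return all(self._uuid_dict.values())

    def lock(self, uuid):
        # Record a lock answer; True when no answer is missing any more
        self._uuid_dict[uuid] = True
        return self.locked()

    def forget(self, uuid):
        # Stop waiting for a lost node; no-op if it was not involved
        self._uuid_dict.pop(uuid, None)
        return self.locked()

def node_lost(transactions, lost_uuid, finish):
    # What the handler change below does: every pending transaction
    # forgets the lost node, and any transaction for which it was the
    # last missing answer is finished immediately.
    for tid, transaction in list(transactions.items()):
        if transaction.forget(lost_uuid):
            finish(tid)

if __name__ == '__main__':
    t = Transaction(['s1', 's2'])
    assert not t.lock('s2')            # still waiting for s1
    node_lost({'tid1': t}, 's1', lambda tid: None)
    assert t.locked()                  # s1 forgotten, nothing left to wait for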
@@ -41,6 +41,10 @@ class StorageServiceHandler(BaseServiceHandler):
        # this is intentionally placed after the raise because the last cell in
        # a partition must not be outdated, to allow a cluster restart.
        self.app.outdateAndBroadcastPartition()
        uuid = conn.getUUID()
        for tid, transaction in self.app.tm.items():
            if transaction.forget(uuid):
                self._afterLock(tid)

    def askLastIDs(self, conn):
        app = self.app
@@ -65,10 +69,15 @@ class StorageServiceHandler(BaseServiceHandler):
        t = tm[tid]
        if not t.lock(uuid):
            return
        self._afterLock(tid)

    def _afterLock(self, tid):
        # I have received all the lock answers now:
        # - send a Notify Transaction Finished to the initiating client node
        # - Invalidate Objects to the other client nodes
        app = self.app
        tm = app.tm
        t = tm[tid]
        nm = app.nm
        transaction_node = t.getNode()
        invalidate_objects = Packets.InvalidateObjects(tid, t.getOIDList())
...
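
The _afterLock comments above describe a fan-out: answer the initiating client, notify every other client. A hedged sketch of that shape; send_answer and send_notify are hypothetical stand-ins, only the packet names come from this diff and its tests:

def after_lock_fanout(transaction_node, client_nodes, tid, oid_list,
                      send_answer, send_notify):
    # The initiating client learns its transaction is finished; the other
    # clients learn which objects to invalidate from their caches.
    for node in client_nodes:
        if node is transaction_node:
            send_answer(node, 'AnswerTransactionFinished', tid)
        else:
            send_notify(node, 'InvalidateObjects', tid, oid_list)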
@@ -85,6 +85,18 @@ class Transaction(object):
        self._uuid_dict = dict.fromkeys(uuid_list, False)
        self._msg_id = msg_id

    def forget(self, uuid):
        """
        Given storage node was lost while we were waiting for its lock
        answer; stop waiting for it.
        Does nothing if the node was not part of the transaction.
        """
        # XXX: We might lose the information that a storage successfully
        # locked data but was later found to be disconnected. This loss has
        # no impact on current code, but it might be disturbing to a reader
        # or to future code.
        self._uuid_dict.pop(uuid, None)
        return self.locked()

    def lock(self, uuid):
        """
        Define that a node has locked the transaction
@@ -126,6 +138,9 @@ class TransactionManager(object):
        """
        return tid in self._tid_dict

    def items(self):
        return self._tid_dict.items()

    def _nextTID(self):
        """ Compute the next TID based on the current time and check collisions """
        tm = time()
...
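
One edge case worth spelling out, matching the commit message: if the only involved storage node is forgotten, the answer dict becomes empty and, under the locked()-is-all() assumption sketched above, forget() returns True, so the master would finish the transaction. The message argues this is safe because losing the last storage node triggers an operation failure anyway:

>>> flags = dict.fromkeys(['storage1'], False)   # one node, no answer yet
>>> flags.pop('storage1', None)                  # that node is lost
False
>>> all(flags.values())                          # empty dict: vacuously "locked"
True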
@@ -192,6 +192,59 @@ class MasterStorageHandlerTests(NeoTestBase):
        self._testWithMethod(self.service.connectionClosed,
            NodeStates.TEMPORARILY_DOWN)

    def test_nodeLostAfterAskLockInformation(self):
        # 2 storage nodes, one will die
        node1, conn1 = self._getStorage()
        node2, conn2 = self._getStorage()
        # client nodes, to distinguish answers for the sample transactions
        client1, cconn1 = self._getClient()
        client2, cconn2 = self._getClient()
        client3, cconn3 = self._getClient()
        tid1 = self.getNextTID()
        tid2 = self.getNextTID(tid1)
        tid3 = self.getNextTID(tid2)
        oid_list = [self.getOID(), ]
        # Some shortcuts to simplify test code
        self.app.pt = Mock({'operational': True})
        self.app.outdateAndBroadcastPartition = lambda: None
        # Register some transactions
        tm = self.app.tm
        # Transaction 1: 2 storage nodes involved, one will die and the other
        # has already answered the lock
        msg_id_1 = 1
        tm.begin(client1, tid1)
        tm.prepare(tid1, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_1)
        tm.lock(tid1, node2.getUUID())
        # Transaction 2: 2 storage nodes involved, one will die
        msg_id_2 = 2
        tm.begin(client2, tid2)
        tm.prepare(tid2, oid_list, [node1.getUUID(), node2.getUUID()], msg_id_2)
        # Transaction 3: 1 storage node involved, which won't die
        msg_id_3 = 3
        tm.begin(client3, tid3)
        tm.prepare(tid3, oid_list, [node2.getUUID(), ], msg_id_3)
        # Assert initial state
        self.checkNoPacketSent(cconn1)
        self.checkNoPacketSent(cconn2)
        self.checkNoPacketSent(cconn3)
        # Storage 1 dies
        node1.setTemporarilyDown()
        self.service.nodeLost(conn1, node1)
        # Check state after node loss
        # T1: last locking node lost, client receives AnswerTransactionFinished
        self.checkAnswerTransactionFinished(cconn1)
        # ...and notifications are sent to the other clients
        self.checkInvalidateObjects(cconn2)
        self.checkInvalidateObjects(cconn3)
        # T2: a lock answer is still pending, so the client keeps waiting
        self.checkNoPacketSent(cconn2, check_notify=False)
        # T3: the node loss is not significant to this transaction, no response
        self.checkNoPacketSent(cconn3, check_notify=False)

if __name__ == '__main__':
    unittest.main()
...
@@ -119,7 +119,51 @@ class testTransactionManager(NeoTestBase):
        self.assertTrue(tid2 is not None)
        self.assertTrue(tid2 > ntid > tid1)

    def test_forget(self):
        client1 = Mock({'__hash__': 1})
        client2 = Mock({'__hash__': 2})
        client3 = Mock({'__hash__': 3})
        storage_1_uuid = self.makeUUID(1)
        storage_2_uuid = self.makeUUID(2)
        tid1 = self.makeTID(1)
        tid2 = self.makeTID(2)
        tid3 = self.makeTID(3)
        oid_list = [self.makeOID(1), ]
        tm = TransactionManager()
        # Transaction 1: 2 storage nodes involved, one will die and the other
        # has already answered the lock
        msg_id_1 = 1
        tm.begin(client1, tid1)
        tm.prepare(tid1, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_1)
        tm.lock(tid1, storage_2_uuid)
        # Transaction 2: 2 storage nodes involved, one will die
        msg_id_2 = 2
        tm.begin(client2, tid2)
        tm.prepare(tid2, oid_list, [storage_1_uuid, storage_2_uuid], msg_id_2)
        # Transaction 3: 1 storage node involved, which won't die
        msg_id_3 = 3
        tm.begin(client3, tid3)
        tm.prepare(tid3, oid_list, [storage_2_uuid, ], msg_id_3)
        t1 = tm[tid1]
        t2 = tm[tid2]
        t3 = tm[tid3]
        # Assert initial state
        self.assertFalse(t1.locked())
        self.assertFalse(t2.locked())
        self.assertFalse(t3.locked())
        # Storage 1 dies:
        # t1 is over
        self.assertTrue(t1.forget(storage_1_uuid))
        # t2 still waits for storage 2
        self.assertFalse(t2.forget(storage_1_uuid))
        self.assertTrue(t2.lock(storage_2_uuid))
        # t3 doesn't care
        self.assertFalse(t3.forget(storage_1_uuid))
        self.assertTrue(t3.lock(storage_2_uuid))

if __name__ == '__main__':
    unittest.main()
...