From 523675ad1cbae60a53ada8712ccd195cab13a136 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9gory=20Wisniewski?= <gregory@nexedi.com> Date: Wed, 20 Jan 2010 17:17:58 +0000 Subject: [PATCH] Use master's transaction manager and setup some TODO/XXX. git-svn-id: https://svn.erp5.org/repos/neo/trunk@1453 71dcc9de-d417-0410-9af5-da40c76e7ee4 --- neo/master/app.py | 12 +++---- neo/master/handlers/client.py | 64 ++++++---------------------------- neo/master/handlers/storage.py | 19 +++++----- 3 files changed, 24 insertions(+), 71 deletions(-) diff --git a/neo/master/app.py b/neo/master/app.py index 9da1161f..1bb5c5f0 100644 --- a/neo/master/app.py +++ b/neo/master/app.py @@ -32,6 +32,7 @@ from neo.master.handlers import election, identification, secondary, recovery from neo.master.handlers import verification, storage, client, shutdown from neo.master.handlers import administration from neo.master.pt import PartitionTable +from neo.master.transactions import TransactionManager from neo.util import dump, parseMasterList from neo.connector import getConnectorHandler @@ -55,6 +56,7 @@ class Application(object): # Internal attributes. self.em = EventManager() self.nm = NodeManager() + self.tm = TransactionManager() # Partition table replicas, partitions = config.getReplicas(), config.getPartitions() @@ -95,9 +97,6 @@ class Application(object): self.asking_uuid_dict = {} self.object_present = False - # service related data - self.finishing_transaction_dict = {} - def run(self): """Make sure that the status is sane and start a loop.""" @@ -551,13 +550,10 @@ class Application(object): logging.info('provide service') em = self.em nm = self.nm + self.tm.reset() self.changeClusterState(ClusterStates.RUNNING) - # This dictionary is used to hold information on transactions being - # finished. - self.finishing_transaction_dict = {} - # Now everything is passive. while True: try: @@ -717,7 +713,7 @@ class Application(object): # wait for all transaction to be finished while 1: self.em.poll(1) - if len(self.finishing_transaction_dict) == 0: + if not self.tm.hasPending(): if self.cluster_state == ClusterStates.RUNNING: sys.exit("Application has been asked to shut down") else: diff --git a/neo/master/handlers/client.py b/neo/master/handlers/client.py index 6df3498d..cb00d3c5 100644 --- a/neo/master/handlers/client.py +++ b/neo/master/handlers/client.py @@ -22,44 +22,6 @@ from neo.protocol import NodeStates, Packets, UnexpectedPacketError from neo.master.handlers import BaseServiceHandler from neo.util import dump, getNextTID -class FinishingTransaction(object): - """This class describes a finishing transaction.""" - - def __init__(self, conn): - self._conn = conn - self._msg_id = None - self._oid_list = None - self._uuid_set = None - self._locked_uuid_set = set() - - def getConnection(self): - return self._conn - - def setMessageId(self, msg_id): - self._msg_id = msg_id - - def getMessageId(self): - return self._msg_id - - def setOIDList(self, oid_list): - self._oid_list = oid_list - - def getOIDList(self): - return self._oid_list - - def setUUIDSet(self, uuid_set): - self._uuid_set = uuid_set - - def getUUIDSet(self): - return self._uuid_set - - def addLockedUUID(self, uuid): - if uuid in self._uuid_set: - self._locked_uuid_set.add(uuid) - - def allLocked(self): - return self._uuid_set == self._locked_uuid_set - class ClientServiceHandler(BaseServiceHandler): """ Handler dedicated to client during service state """ @@ -68,16 +30,14 @@ class ClientServiceHandler(BaseServiceHandler): pass def nodeLost(self, conn, node): - app = self.app - for tid, t in app.finishing_transaction_dict.items(): - if t.getConnection() is conn: - del app.finishing_transaction_dict[tid] - app.nm.remove(node) + # cancel it's transactions and forgot the node + self.app.tm.abortFor(node) + self.app.nm.remove(node) def abortTransaction(self, conn, packet, tid): - try: - del self.app.finishing_transaction_dict[tid] - except KeyError: + if tid in self.app.tm: + self.app.tm.remove(tid) + else: logging.warn('aborting transaction %s does not exist', dump(tid)) def askBeginTransaction(self, conn, packet, tid): @@ -88,8 +48,10 @@ class ClientServiceHandler(BaseServiceHandler): if tid is None: # give a new transaction ID tid = getNextTID(app.ltid) + # TODO: transaction manager should handle last TID app.ltid = tid - app.finishing_transaction_dict[tid] = FinishingTransaction(conn) + node = app.nm.getByUUID(conn.getUUID()) + app.tm.begin(node, tid) conn.answer(Packets.AnswerBeginTransaction(tid), packet.getId()) def askNewOIDs(self, conn, packet, num_oids): @@ -124,11 +86,5 @@ class ClientServiceHandler(BaseServiceHandler): c.ask(Packets.LockInformation(tid), timeout=60) used_uuid_set.add(c.getUUID()) - try: - t = app.finishing_transaction_dict[tid] - t.setOIDList(oid_list) - t.setUUIDSet(used_uuid_set) - t.setMessageId(packet.getId()) - except KeyError: - logging.warn('finishing transaction %s does not exist', dump(tid)) + app.tm.prepare(tid, oid_list, used_uuid_set, packet.getId()) diff --git a/neo/master/handlers/storage.py b/neo/master/handlers/storage.py index 47653d87..e763207d 100644 --- a/neo/master/handlers/storage.py +++ b/neo/master/handlers/storage.py @@ -48,9 +48,7 @@ class StorageServiceHandler(BaseServiceHandler): packet.getId()) def askUnfinishedTransactions(self, conn, packet): - app = self.app - p = Packets.AnswerUnfinishedTransactions( - app.finishing_transaction_dict.keys()) + p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList()) conn.answer(p, packet.getId()) def notifyInformationLocked(self, conn, packet, tid): @@ -64,9 +62,12 @@ class StorageServiceHandler(BaseServiceHandler): raise UnexpectedPacketError try: - t = app.finishing_transaction_dict[tid] - t.addLockedUUID(uuid) - if t.allLocked(): + t = self.app.tm[tid] + if t.lock(uuid): # all nodes are locked + # XXX: review needed: + # don't iterate over connections but search by uuid + # include client's uuid in Transaction object + # I have received all the answers now. So send a Notify # Transaction Finished to the initiated client node, # Invalidate Objects to the other client nodes, and Unlock @@ -76,7 +77,7 @@ class StorageServiceHandler(BaseServiceHandler): if uuid is not None: node = app.nm.getByUUID(uuid) if node.isClient(): - if c is t.getConnection(): + if node is t.getNode(): p = Packets.NotifyTransactionFinished(tid) c.answer(p, t.getMessageId()) else: @@ -84,10 +85,10 @@ class StorageServiceHandler(BaseServiceHandler): tid) c.notify(p) elif node.isStorage(): - if uuid in t.getUUIDSet(): + if uuid in t.getUUIDList(): p = Packets.UnlockInformation(tid) c.notify(p) - del app.finishing_transaction_dict[tid] + self.app.tm.remove(tid) except KeyError: # What is this? pass -- 2.30.9