From eeaa93d9eb3f45acf3ca4c14774162069e44d7cf Mon Sep 17 00:00:00 2001 From: Vincent Pelletier <vincent@nexedi.com> Date: Tue, 7 Jul 2009 09:27:49 +0000 Subject: [PATCH] Make MTClientConnection.ask register message to dispatcher. This makes sure registration happens before the message has a chance of being sent. git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@807 71dcc9de-d417-0410-9af5-da40c76e7ee4 --- neo/client/app.py | 36 +++++++++++++++++------------------- neo/connection.py | 10 ++++++++-- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/neo/client/app.py b/neo/client/app.py index 78243156..cc0e891c 100644 --- a/neo/client/app.py +++ b/neo/client/app.py @@ -74,7 +74,8 @@ class ConnectionPool(object): logging.info('trying to connect to %s - %s', node, node.getState()) app.setNodeReady() conn = MTClientConnection(app.em, app.storage_event_handler, addr, - connector_handler=app.connector_handler) + connector_handler=app.connector_handler, + dispatcher=app.dispatcher) conn.lock() try: if conn.getConnector() is None: @@ -84,8 +85,7 @@ class ConnectionPool(object): p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE, app.uuid, addr[0], addr[1], app.name) - msg_id = conn.ask(p) - app.dispatcher.register(conn, msg_id, app.local_var.queue) + msg_id = conn.ask(app.local_var.queue, p) finally: conn.unlock() @@ -341,8 +341,8 @@ class Application(object): def _askStorage(self, conn, packet, timeout=5, additional_timeout=30): """ Send a request to a storage node and process it's answer """ try: - msg_id = conn.ask(packet, timeout, additional_timeout) - self.dispatcher.register(conn, msg_id, self.local_var.queue) + msg_id = conn.ask(self.local_var.queue, packet, timeout, + additional_timeout) finally: # assume that the connection was already locked conn.unlock() @@ -353,8 +353,8 @@ class Application(object): conn = self._getMasterConnection() conn.lock() try: - msg_id = conn.ask(packet, timeout, additional_timeout) - self.dispatcher.register(conn, msg_id, self.local_var.queue) + msg_id = conn.ask(self.local_var.queue, packet, timeout, + additional_timeout) finally: conn.unlock() self._waitMessage(conn, msg_id, self.primary_handler) @@ -407,7 +407,8 @@ class Application(object): # Connect to master conn = MTClientConnection(self.em, self.notifications_handler, addr=self.trying_master_node.getServer(), - connector_handler=self.connector_handler) + connector_handler=self.connector_handler, + dispatcher=self.dispatcher) # Query for primary master node conn.lock() try: @@ -416,8 +417,7 @@ class Application(object): logging.error('Connection to master node %s failed', self.trying_master_node) continue - msg_id = conn.ask(protocol.askPrimaryMaster()) - self.dispatcher.register(conn, msg_id, self.local_var.queue) + msg_id = conn.ask(self.local_var.queue, protocol.askPrimaryMaster()) finally: conn.unlock() try: @@ -440,8 +440,7 @@ class Application(object): break p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE, self.uuid, '0.0.0.0', 0, self.name) - msg_id = conn.ask(p) - self.dispatcher.register(conn, msg_id, self.local_var.queue) + msg_id = conn.ask(self.local_var.queue, p) finally: conn.unlock() try: @@ -467,15 +466,15 @@ class Application(object): # wait on one message at a time conn.lock() try: - msg_id = conn.ask(protocol.askPartitionTable([])) - self.dispatcher.register(conn, msg_id, self.local_var.queue) + msg_id = conn.ask(self.local_var.queue, + protocol.askPartitionTable([])) finally: conn.unlock() self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler) conn.lock() try: - msg_id = conn.ask(protocol.askNodeInformation()) - self.dispatcher.register(conn, msg_id, self.local_var.queue) + msg_id = conn.ask(self.local_var.queue, + protocol.askNodeInformation()) finally: conn.unlock() self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler) @@ -882,9 +881,8 @@ class Application(object): continue try: - p = protocol.askTIDs(first, last, INVALID_PARTITION) - msg_id = conn.ask(p) - self.dispatcher.register(conn, msg_id, self.local_var.queue) + conn.ask(self.local_var.queue, + protocol.askTIDs(first, last, INVALID_PARTITION)) finally: conn.unlock() diff --git a/neo/connection.py b/neo/connection.py index 128d4b09..9434079c 100644 --- a/neo/connection.py +++ b/neo/connection.py @@ -468,6 +468,7 @@ class MTClientConnection(ClientConnection): self._lock = lock = RLock() self.acquire = lock.acquire self.release = lock.release + self.dispatcher = kwargs.pop('dispatcher') self.lock() try: super(MTClientConnection, self).__init__(*args, **kwargs) @@ -501,8 +502,13 @@ class MTClientConnection(ClientConnection): return super(MTClientConnection, self).notify(*args, **kw) @lockCheckWrapper - def ask(self, *args, **kw): - return super(MTClientConnection, self).ask(*args, **kw) + def ask(self, queue, packet, timeout=5, additional_timeout=30): + msg_id = self._getNextId() + packet.setId(msg_id) + self.dispatcher.register(self, msg_id, queue) + self.expectMessage(msg_id) + self._addPacket(packet) + return msg_id @lockCheckWrapper def answer(self, *args, **kw): -- 2.30.9