Commit ee3f9a1d authored by Grégory Wisniewski's avatar Grégory Wisniewski

Revert commit #864 after a discussion about isolation between the connection and

thread context.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@869 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 88980cbb
......@@ -73,8 +73,9 @@ class ConnectionPool(object):
while True:
logging.info('trying to connect to %s - %s', node, node.getState())
app.setNodeReady()
conn = MTClientConnection(self.app.local_var, app.em, app.storage_event_handler,
addr, connector_handler=app.connector_handler, dispatcher=app.dispatcher)
conn = MTClientConnection(app.em, app.storage_event_handler, addr,
connector_handler=app.connector_handler,
dispatcher=app.dispatcher)
conn.lock()
try:
if conn.getConnector() is None:
......@@ -84,7 +85,7 @@ class ConnectionPool(object):
p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
app.uuid, '0.0.0.0', 0, app.name)
msg_id = conn.ask(p)
msg_id = conn.ask(app.local_var.queue, p)
finally:
conn.unlock()
......@@ -343,7 +344,8 @@ class Application(object):
def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
""" Send a request to a storage node and process it's answer """
try:
msg_id = conn.ask(packet, timeout, additional_timeout)
msg_id = conn.ask(self.local_var.queue, packet, timeout,
additional_timeout)
finally:
# assume that the connection was already locked
conn.unlock()
......@@ -354,7 +356,8 @@ class Application(object):
conn = self._getMasterConnection()
conn.lock()
try:
msg_id = conn.ask(packet, timeout, additional_timeout)
msg_id = conn.ask(self.local_var.queue, packet, timeout,
additional_timeout)
finally:
conn.unlock()
self._waitMessage(conn, msg_id, self.primary_handler)
......@@ -405,7 +408,7 @@ class Application(object):
self.trying_master_node = master_list[0]
index += 1
# Connect to master
conn = MTClientConnection(self.local_var, self.em, self.notifications_handler,
conn = MTClientConnection(self.em, self.notifications_handler,
addr=self.trying_master_node.getServer(),
connector_handler=self.connector_handler,
dispatcher=self.dispatcher)
......@@ -417,7 +420,7 @@ class Application(object):
logging.error('Connection to master node %s failed',
self.trying_master_node)
continue
msg_id = conn.ask(protocol.askPrimaryMaster())
msg_id = conn.ask(self.local_var.queue, protocol.askPrimaryMaster())
finally:
conn.unlock()
try:
......@@ -440,7 +443,7 @@ class Application(object):
break
p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
self.uuid, '0.0.0.0', 0, self.name)
msg_id = conn.ask(p)
msg_id = conn.ask(self.local_var.queue, p)
finally:
conn.unlock()
try:
......@@ -466,13 +469,15 @@ class Application(object):
# wait on one message at a time
conn.lock()
try:
msg_id = conn.ask(protocol.askPartitionTable([]))
msg_id = conn.ask(self.local_var.queue,
protocol.askPartitionTable([]))
finally:
conn.unlock()
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
conn.lock()
try:
msg_id = conn.ask(protocol.askNodeInformation())
msg_id = conn.ask(self.local_var.queue,
protocol.askNodeInformation())
finally:
conn.unlock()
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
......@@ -878,7 +883,8 @@ class Application(object):
continue
try:
conn.ask(protocol.askTIDs(first, last, INVALID_PARTITION))
conn.ask(self.local_var.queue,
protocol.askTIDs(first, last, INVALID_PARTITION))
finally:
conn.unlock()
......
......@@ -462,14 +462,12 @@ class ServerConnection(Connection):
class MTClientConnection(ClientConnection):
"""A Multithread-safe version of ClientConnection."""
def __init__(self, local_var, *args, **kwargs):
def __init__(self, *args, **kwargs):
# _lock is only here for lock debugging purposes. Do not use.
self._lock = lock = RLock()
self.acquire = lock.acquire
self.release = lock.release
self.dispatcher = kwargs.pop('dispatcher')
self.local_var = local_var
self.lock()
try:
super(MTClientConnection, self).__init__(*args, **kwargs)
......@@ -503,10 +501,10 @@ class MTClientConnection(ClientConnection):
return super(MTClientConnection, self).notify(*args, **kw)
@lockCheckWrapper
def ask(self, packet, timeout=5, additional_timeout=30):
def ask(self, queue, packet, timeout=5, additional_timeout=30):
msg_id = self._getNextId()
packet.setId(msg_id)
self.dispatcher.register(self, msg_id, self.local_var.queue)
self.dispatcher.register(self, msg_id, queue)
self.expectMessage(msg_id)
self._addPacket(packet)
return msg_id
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment