Commit a6dd40a9 authored by Aurel's avatar Aurel

redirect message in handler and not in dispatcher


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@159 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 70fb83f7
......@@ -33,32 +33,6 @@ class Dispatcher(Thread):
# This happen when there is no connection
logging.error('Dispatcher, run, poll returned a KeyError')
while 1:
try:
conn, packet = self.message.get_nowait()
except Empty:
break
# Send message to waiting thread
key = (conn.getUUID(), packet.getId())
#logging.info('dispatcher got packet %s' %(key,))
if key in self.message_table:
queue = self.message_table.pop(key)
queue.put((conn, packet))
else:
#conn, packet = self.message
method_type = packet.getType()
if method_type == PING:
# must answer with no delay
conn.lock()
try:
conn.addPacket(Packet().pong(packet.getId()))
finally:
conn.unlock()
else:
# put message in request queue
self._request_queue.put((conn, packet))
def register(self, conn, msg_id, queue):
"""Register an expectation for a reply. Thanks to GIL, it is
safe not to use a lock here."""
......@@ -106,6 +80,7 @@ class Dispatcher(Thread):
# Send message
conn.addPacket(p)
conn.expectMessage(msg_id)
self.register(conn, msg_id, app.getQueue())
finally:
conn.unlock()
......@@ -118,24 +93,7 @@ class Dispatcher(Thread):
while time() < t + 1:
pass
break
# Check if we got a reply
try:
conn, packet = self.message.get_nowait()
method_type = packet.getType()
conn.lock()
try:
if method_type == PING:
# Must answer with no delay
conn.addPacket(Packet().pong(packet.getId()))
break
else:
# Process message by handler
conn.handler.dispatch(conn, packet)
finally:
conn.unlock()
except Empty:
pass
app._waitMessage()
# Now check result
if app.primary_master_node is not None:
if app.primary_master_node == -1:
......
......@@ -4,7 +4,8 @@ from neo.handler import EventHandler
from neo.connection import MTClientConnection
from neo.protocol import Packet, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
INVALID_UUID, RUNNING_STATE, TEMPORARILY_DOWN_STATE, BROKEN_STATE
INVALID_UUID, RUNNING_STATE, TEMPORARILY_DOWN_STATE, \
BROKEN_STATE, PING
from neo.node import MasterNode, StorageNode, ClientNode
from neo.pt import PartitionTable
from neo.client.NEOStorage import NEOStorageError
......@@ -25,7 +26,24 @@ class ClientEventHandler(EventHandler):
def packetReceived(self, conn, packet):
"""Redirect all received packet to dispatcher thread."""
self.dispatcher.message.put((conn, packet), True)
dispatcher = self.dispatcher
# Send message to waiting thread
key = (conn.getUUID(), packet.getId())
if key in dispatcher.message_table:
queue = dispatcher.message_table.pop(key)
queue.put((conn, packet))
else:
method_type = packet.getType()
if method_type == PING:
# must answer with no delay
conn.lock()
try:
conn.addPacket(Packet().pong(packet.getId()))
finally:
conn.unlock()
else:
# put message in request queue
dispatcher._request_queue.put((conn, packet))
def connectionFailed(self, conn):
app = self.app
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment