Commit 743026d5 authored by Julien Muchembled's avatar Julien Muchembled

client: fix race condition in refcounting dispatched answer packets

This was found when stress-testing a big cluster. 1 client node was stuck:

  (Pdb) pp app.dispatcher.__dict__
  {'lock_acquire': <built-in method acquire of thread.lock object at 0x7f788c6e4250>,
  'lock_release': <built-in method release of thread.lock object at 0x7f788c6e4250>,
  'message_table': {140155667614608: {},
                    140155668875280: {},
                    140155671145872: {},
                    140155672381008: {},
                    140155672381136: {},
                    140155672381456: {},
                    140155673002448: {},
                    140155673449680: {},
                    140155676093648: {170: <neo.lib.locking.SimpleQueue object at 0x7f788a109c58>},
                    140155677536464: {},
                    140155679224336: {},
                    140155679876496: {},
                    140155680702992: {},
                    140155681851920: {},
                    140155681852624: {},
                    140155682773584: {},
                    140155685988880: {},
                    140155693061328: {},
                    140155693062224: {},
                    140155693074960: {},
                    140155696334736: {278: <neo.lib.locking.SimpleQueue object at 0x7f788a109c58>},
                    140155696411408: {},
                    140155696414160: {},
                    140155696576208: {},
                    140155722373904: {}},
  'queue_dict': {140155673622936: 1, 140155689147480: 2}}

140155673622936 should not be queue_dict
parent 7e456329
...@@ -87,13 +87,11 @@ class Dispatcher: ...@@ -87,13 +87,11 @@ class Dispatcher:
def unregister(self, conn): def unregister(self, conn):
""" Unregister a connection and put fake packet in queues to unlock """ Unregister a connection and put fake packet in queues to unlock
threads expecting responses from that connection """ threads expecting responses from that connection """
notified_set = set()
_decrefQueue = self._decrefQueue
self.lock_acquire() self.lock_acquire()
try: try:
message_table = self.message_table.pop(id(conn), EMPTY) message_table = self.message_table.pop(id(conn), EMPTY)
finally:
self.lock_release()
notified_set = set()
_decrefQueue = self._decrefQueue
for queue in message_table.itervalues(): for queue in message_table.itervalues():
if queue is NOBODY: if queue is NOBODY:
continue continue
...@@ -102,6 +100,8 @@ class Dispatcher: ...@@ -102,6 +100,8 @@ class Dispatcher:
queue.put((conn, _ConnectionClosed, EMPTY)) queue.put((conn, _ConnectionClosed, EMPTY))
notified_set.add(queue_id) notified_set.add(queue_id)
_decrefQueue(queue) _decrefQueue(queue)
finally:
self.lock_release()
@giant_lock @giant_lock
def forget_queue(self, queue, flush_queue=True): def forget_queue(self, queue, flush_queue=True):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment