Commit 8b91706a authored by Julien Muchembled

Fix race conditions in EventManager between _poll/connection_dict and (un)registration

The following error was reported on a client node:

    #0x0000 Error                   < None (2001:...:2051)
    1 (Retry Later)
    connection closed for <MTClientConnection(uuid=None, address=2001:...:2051, handler=PrimaryNotificationsHandler, closed, client) at 7f1ea7c42f90>
    Event Manager:
    connection started for <MTClientConnection(uuid=None, address=2001:...:2051, handler=PrimaryNotificationsHandler, fd=13, on_close=onConnectionClosed, connecting, client) at 7f1ea7c25c10>
    #0x0000 RequestIdentification          > None (2001:...:2051)
      Readers: []
      Writers: []
      Connections:
        13: <MTClientConnection(uuid=None, address=2001:...:2051, handler=PrimaryNotificationsHandler, fd=13, on_close=onConnectionClosed, connecting, client) at 7f1ea7c25c10> (pending=False)
    Node manager : 1 nodes
    * None |   MASTER | 2001:...:2051 | UNKNOWN
    <ClientCache history_size=0 oid_count=0 size=0 time=0 queue_length=[0] (life_time=10000 max_history_size=100000 max_size=20971520)>
    poll raised, retrying
    Traceback (most recent call last):
      File "neo/lib/threaded_app.py", line 93, in _run
        poll(1)
      File "neo/lib/event.py", line 134, in poll
        self._poll(0)
      File "neo/lib/event.py", line 164, in _poll
        conn = self.connection_dict[fd]
    KeyError: 13

which means that:
- the poll thread gets a (13, EPOLLIN) event because the connection is
  closed (aborted by the master),
- meanwhile, another thread processes the error packet and closes the
  connection in PrimaryBootstrapHandler.notReady,
- the poll thread then resumes EpollEventManager._poll and fails to find
  fd=13 in self.connection_dict.

So here, we have a race condition between epoll_wait and any further use
of connection_dict to map the returned fds.
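The window is easy to reproduce deterministically by inlining, between
epoll_wait() and the fd lookup, what the other thread does. A minimal sketch,
where 'connection_dict' and 'event_list' are mere stand-ins for
EpollEventManager's real attributes:

    # Stand-ins for EpollEventManager.connection_dict and for what
    # epoll_wait() already returned to the poll thread.
    connection_dict = {13: '<MTClientConnection fd=13>'}
    event_list = [(13, 'EPOLLIN')]

    # Meanwhile, another thread processes the error packet and closes the
    # connection, which removes fd 13 from connection_dict.
    del connection_dict[13]

    # The poll thread resumes and maps the fds it got from epoll:
    for fd, event in event_list:
        try:
            conn = connection_dict[fd]
        except KeyError as e:
            print('KeyError: %s' % e)  # the very failure in the traceback above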

However, what commit a4731a0c does to handle
the case of fd reallocation only works for single-threaded applications.
For EPOLLIN events, guarding 'self.connection_dict[fd]' with the
'unregistered' list, the same way as for the other events, is not enough.
For example (see also the sketch after this list):
- case 1:
  - thread 1: epoll returns fd=13
  - thread 2: close(13)
  - thread 2: open(13)
  - thread 1: self.connection_dict[13] does not match the event,
              but this would be handled by the 'unregistered' list
- case 2:
  - thread 1: reset 'unregistered'
  - thread 2: close(13)
  - thread 2: open(13)
  - thread 1: epoll returns fd=13
  - thread 1: self.connection_dict[13] matches,
              but it would be wrongly ignored by 'unregistered'
- case 3:
  - thread 1: about to call readable/writable/onTimeout on a connection
  - thread 2: this connection is closed
  - thread 1: readable/writable/onTimeout is wrongly called on a closed
              connection
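The premise of cases 1 and 2 is that fd numbers are recycled: POSIX requires
open()/socket() to return the lowest available descriptor, so a connection
opened right after another one is closed can get the very fd that epoll just
reported. A minimal demonstration, assuming no other thread creates fds in
between:

    import socket

    s1 = socket.socket()
    fd = s1.fileno()
    s1.close()                    # the fd number becomes free
    s2 = socket.socket()          # e.g. another thread reconnecting
    assert s2.fileno() == fd      # the lowest free number is reused
    s2.close()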

We could protect _poll() with a lock, and have unregister() call wakeup() so
that it gets a chance to acquire that lock, but this causes threaded tests to
deadlock (continuing in this direction seems too complicated).

So we have to accept that race conditions can happen at any time and that
there is no way to make 'connection_dict' match exactly what epoll returns.
We solve this by preventing fd reallocation inside _poll(), which is
fortunately possible with sockets, using 'shutdown': connections are shut
down immediately, but the actual closing of their fds is delayed until
_poll() is done with the fds that epoll returned.
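A minimal sketch of the trick, with socketpair() (Unix only) standing in for
a real connection: shutdown() terminates the connection, so epoll reports the
fd, but the descriptor number stays allocated to our socket until the
deferred close() actually runs, and thus cannot be reallocated in between:

    import socket

    a, b = socket.socketpair()
    fd = a.fileno()
    a.shutdown(socket.SHUT_RDWR)  # the connection is dead; epoll will report it
    assert a.fileno() == fd       # but the fd number is still allocated to us

    deferred_close = a.close      # same idea as 'return self.socket.close' below
    # ... once the poll loop has processed the current batch of events:
    deferred_close()
    b.close()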

For case 3 above, readable/writable/onTimeout of MTClientConnection are also
changed to check, once the lock is acquired, whether the connection is still
open. Just for safety, we do the same for 'process'.
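In isolation, the pattern looks like the following sketch (the connection
class is a stand-in, not NEO's API); the point is that the open-check must
happen after the lock is acquired, otherwise the connection could still be
closed between the check and the call:

    import threading

    class Conn(object):
        def __init__(self):
            self.lock = threading.RLock()
            self._closed = False

        def isClosed(self):
            return self._closed

        def close(self):
            with self.lock:
                self._closed = True

        def readable(self):
            # Only reached with the lock held and the connection open.
            print('readable')

    def lockWrapper(method):
        def wrapper(self):
            with self.lock:
                if not self.isClosed():  # re-check under the lock
                    return method(self)
        return wrapper

    Conn.readable = lockWrapper(Conn.readable)

    conn = Conn()
    conn.readable()   # prints 'readable'
    conn.close()
    conn.readable()   # silently skipped: the connection is closed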

Finally, this commit also fixes another kind of race condition: the use of
itervalues() on EventManager.connection_dict while other threads (un)register
connections.
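In isolation, the failure, and the retry that the new code prefers over a
lock, look like the following sketch (the '@apply' helper in the diff below
captures CPython's exact error message, so that unrelated RuntimeErrors are
not swallowed):

    connection_dict = {1: 'conn'}

    it = iter(connection_dict)
    connection_dict[2] = 'conn'   # another thread registers a connection
    try:
        next(it)
    except RuntimeError as e:
        print(e)                  # dictionary changed size during iteration

    # The fix: retry the whole iteration, since such collisions are rare.
    while 1:
        try:
            conns = list(connection_dict.itervalues())  # Python 2, as in the diff
            break
        except RuntimeError:
            continue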
parent 4a0b936f
@@ -228,8 +228,7 @@ class BaseConnection(object):
 
     def close(self):
         """Close the connection."""
         if self.connector is not None:
-            self.em.unregister(self)
-            self.connector.close()
+            self.em.unregister(self, True)
             self.connector = None
             self.aborted = False
@@ -672,9 +671,11 @@ class MTConnectionType(type):
         if __debug__:
             for name in 'answer',:
                 setattr(cls, name, cls.lockCheckWrapper(name))
-        for name in ('_delayedConnect', 'close', 'notify', 'onTimeout',
-                     'process', 'readable', 'writable'):
+        for name in 'close', 'notify':
             setattr(cls, name, cls.__class__.lockWrapper(cls, name))
+        for name in ('_delayedConnect', 'onTimeout',
+                     'process', 'readable', 'writable'):
+            setattr(cls, name, cls.__class__.lockWrapper(cls, name, True))
 
     def lockCheckWrapper(cls, name):
         def wrapper(self, *args, **kw):
@@ -684,7 +685,15 @@ class MTConnectionType(type):
             return getattr(super(cls, self), name)(*args, **kw)
         return wraps(getattr(cls, name).im_func)(wrapper)
 
-    def lockWrapper(cls, name):
-        def wrapper(self, *args, **kw):
-            with self.lock:
-                return getattr(super(cls, self), name)(*args, **kw)
+    def lockWrapper(cls, name, maybe_closed=False):
+        if maybe_closed:
+            def wrapper(self):
+                with self.lock:
+                    if self.isClosed():
+                        logging.info("%r.%s()", self, name)
+                    else:
+                        return getattr(super(cls, self), name)()
+        else:
+            def wrapper(self, *args, **kw):
+                with self.lock:
+                    return getattr(super(cls, self), name)(*args, **kw)
...
@@ -174,14 +174,19 @@ class SocketConnector(object):
             return True
 
-    def close(self):
+    def shutdown(self):
         self.is_closed = True
         try:
             if self.connect_limit[self.addr] < time():
                 del self.connect_limit[self.addr]
         except KeyError:
             pass
-        return self.socket.close()
+        try:
+            self.socket.shutdown(socket.SHUT_RDWR)
+        except socket.error, e:
+            if e.errno != errno.ENOTCONN:
+                raise
+        return self.socket.close
 
     def setReconnectionNoDelay(self):
         """Mark as successful so that we can reconnect without delay"""
...
@@ -21,6 +21,16 @@ from errno import EAGAIN, EEXIST, EINTR, ENOENT
 from . import logging
 from .locking import Lock
 
+@apply
+def dictionary_changed_size_during_iteration():
+    d = {}; i = iter(d); d[0] = 0
+    try:
+        next(i)
+    except RuntimeError as e:
+        return str(e)
+    raise AssertionError
+
 
 class EpollEventManager(object):
     """This class manages connections and events based on epoll(5)."""
@@ -29,10 +39,6 @@ class EpollEventManager(object):
 
     def __init__(self):
         self.connection_dict = {}
-        # Initialize a dummy 'unregistered' for the very rare case a registered
-        # connection is closed before the first call to poll. We don't care
-        # leaking a few integers for connections closed between 2 polls.
-        self.unregistered = []
         self.reader_set = set()
         self.writer_set = set()
         self.epoll = epoll()
@@ -40,6 +46,18 @@ class EpollEventManager(object):
         self._trigger_fd, w = os.pipe()
         os.close(w)
         self._trigger_lock = Lock()
+        close_list = []
+        self._closeAppend = close_list.append
+        l = Lock()
+        self._closeAcquire = l.acquire
+        _release = l.release
+        def release():
+            try:
+                while close_list:
+                    close_list.pop()()
+            finally:
+                _release()
+        self._closeRelease = release
 
     def close(self):
         os.close(self._trigger_fd)
@@ -49,8 +67,15 @@ class EpollEventManager(object):
 
     def getConnectionList(self):
         # XXX: use index
-        return [x for x in self.connection_dict.itervalues()
-                if not x.isAborted()]
+        while 1:
+            # See _poll() about the use of self.connection_dict.itervalues()
+            try:
+                return [x for x in self.connection_dict.itervalues()
+                        if not x.isAborted()]
+            except RuntimeError, e:
+                if str(e) != dictionary_changed_size_during_iteration:
+                    raise
+                logging.info("%r", e)
 
     def getClientList(self):
         # XXX: use index
@@ -86,17 +111,17 @@ class EpollEventManager(object):
         self.epoll.register(fd)
         self.addReader(conn)
 
-    def unregister(self, conn):
+    def unregister(self, conn, close=False):
         new_pending_processing = [x for x in self._pending_processing
                                   if x is not conn]
         # Check that we removed at most one entry from
         # self._pending_processing .
         assert len(new_pending_processing) > len(self._pending_processing) - 2
         self._pending_processing = new_pending_processing
-        fd = conn.getConnector().getDescriptor()
+        connector = conn.getConnector()
+        fd = connector.getDescriptor()
         try:
             del self.connection_dict[fd]
-            self.unregistered.append(fd)
             self.epoll.unregister(fd)
         except KeyError:
             pass
@@ -106,6 +131,10 @@ class EpollEventManager(object):
         else:
             self.reader_set.discard(fd)
             self.writer_set.discard(fd)
+        if close:
+            self._closeAppend(connector.shutdown())
+            if self._closeAcquire(0):
+                self._closeRelease()
 
     def isIdle(self):
         return not (self._pending_processing or self.writer_set)
@@ -135,7 +164,13 @@ class EpollEventManager(object):
 
     def _poll(self, blocking):
         if blocking:
-            if 1:
-                timeout = self._timeout
-                timeout_object = self
-                for conn in self.connection_dict.itervalues():
+            # self.connection_dict may be changed at any time by another thread,
+            # which may cause itervalues() to fail. But this happens so rarely,
+            # that for performance reasons, we prefer to retry, rather than:
+            # - protect self.connection_dict with a lock
+            # - or iterate over an atomic copy.
+            while 1:
+                try:
+                    timeout = self._timeout
+                    timeout_object = self
+                    for conn in self.connection_dict.itervalues():
@@ -143,10 +178,21 @@ class EpollEventManager(object):
-                    if t and (timeout is None or t < timeout):
-                        timeout = t
-                        timeout_object = conn
+                        if t and (timeout is None or t < timeout):
+                            timeout = t
+                            timeout_object = conn
+                    break
+                except RuntimeError, e:
+                    if str(e) != dictionary_changed_size_during_iteration:
+                        raise
+                    logging.info("%r", e)
             # Make sure epoll_wait does not return too early, because it has a
             # granularity of 1ms and Python 2.7 rounds the timeout towards zero.
             # See also https://bugs.python.org/issue20452 (fixed in Python 3).
             blocking = .001 + max(0, timeout - time()) if timeout else -1
+        # From this point, and until we have processed all fds returned by
+        # epoll, we must prevent any fd from being closed, because they could
+        # be reallocated by new connection, either by this thread or by another.
+        # Sockets to close are queued, and they're really closed in the
+        # 'finally' clause.
+        self._closeAcquire()
         try:
             event_list = self.epoll.poll(blocking)
         except IOError, exc:
@@ -158,12 +204,14 @@ class EpollEventManager(object):
                 return
         else:
             if event_list:
-                self.unregistered = unregistered = []
                 wlist = []
                 elist = []
                 for fd, event in event_list:
                     if event & EPOLLIN:
-                        conn = self.connection_dict[fd]
+                        try:
+                            conn = self.connection_dict[fd]
+                        except KeyError:
+                            continue
                         if conn.readable():
                             self._addPendingConnection(conn)
                     if event & EPOLLOUT:
@@ -171,15 +219,16 @@ class EpollEventManager(object):
                     if event & (EPOLLERR | EPOLLHUP):
                         elist.append(fd)
                 for fd in wlist:
-                    if fd not in unregistered:
-                        self.connection_dict[fd].writable()
+                    try:
+                        conn = self.connection_dict[fd]
+                    except KeyError:
+                        continue
+                    conn.writable()
                 for fd in elist:
-                    if fd in unregistered:
-                        continue
                     try:
                         conn = self.connection_dict[fd]
                     except KeyError:
-                        assert fd == self._trigger_fd, fd
-                        with self._trigger_lock:
-                            self.epoll.unregister(fd)
-                            if self._trigger_exit:
+                        if fd == self._trigger_fd:
+                            with self._trigger_lock:
+                                self.epoll.unregister(fd)
+                                if self._trigger_exit:
@@ -189,6 +238,8 @@ class EpollEventManager(object):
                     if conn.readable():
                         self._addPendingConnection(conn)
             return
+        finally:
+            self._closeRelease()
         if blocking > 0:
             logging.debug('timeout triggered for %r', timeout_object)
             timeout_object.onTimeout()
...