Commit 0e43dd1f authored by Julien Muchembled's avatar Julien Muchembled

Fix signals not always being processed as soon as possible

parent 39ae4a2f
...@@ -14,8 +14,9 @@ ...@@ -14,8 +14,9 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import os import fcntl, os
from collections import deque from collections import deque
from signal import set_wakeup_fd
from time import time from time import time
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from errno import EAGAIN, EEXIST, EINTR, ENOENT from errno import EAGAIN, EEXIST, EINTR, ENOENT
...@@ -31,6 +32,15 @@ def dictionary_changed_size_during_iteration(): ...@@ -31,6 +32,15 @@ def dictionary_changed_size_during_iteration():
return str(e) return str(e)
raise AssertionError raise AssertionError
def nonblock(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# We use set_wakeup_fd to handle the case of a signal that happens between
# Python checks for signals and epoll_wait is called. Otherwise, the signal
# would not be processed as long as epoll_wait sleeps.
# If a process has several instances of EpollEventManager like in threaded
# tests, it does not matter which one is woke up by signals.
class EpollEventManager(object): class EpollEventManager(object):
"""This class manages connections and events based on epoll(5).""" """This class manages connections and events based on epoll(5)."""
...@@ -44,8 +54,14 @@ class EpollEventManager(object): ...@@ -44,8 +54,14 @@ class EpollEventManager(object):
self.epoll = epoll() self.epoll = epoll()
self._pending_processing = deque() self._pending_processing = deque()
self._trigger_list = [] self._trigger_list = []
self._trigger_fd, w = os.pipe() r, w = os.pipe()
os.close(w) self._wakeup_rfd = r
self._wakeup_wfd = w
nonblock(r)
nonblock(w)
fd = set_wakeup_fd(w)
assert fd == -1, fd
self.epoll.register(r, EPOLLIN)
self._trigger_lock = Lock() self._trigger_lock = Lock()
close_list = [] close_list = []
self._closeAppend = close_list.append self._closeAppend = close_list.append
...@@ -61,9 +77,12 @@ class EpollEventManager(object): ...@@ -61,9 +77,12 @@ class EpollEventManager(object):
self._closeRelease = release self._closeRelease = release
def close(self): def close(self):
os.close(self._trigger_fd) set_wakeup_fd(-1)
os.close(self._wakeup_wfd)
os.close(self._wakeup_rfd)
for c in self.connection_dict.values(): for c in self.connection_dict.values():
c.close() c.close()
self.epoll.close()
del self.__dict__ del self.__dict__
def getConnectionList(self): def getConnectionList(self):
...@@ -213,6 +232,15 @@ class EpollEventManager(object): ...@@ -213,6 +232,15 @@ class EpollEventManager(object):
try: try:
conn = self.connection_dict[fd] conn = self.connection_dict[fd]
except KeyError: except KeyError:
if fd == self._wakeup_rfd:
os.read(fd, 8)
with self._trigger_lock:
action_list = self._trigger_list
try:
while action_list:
action_list.pop(0)()
finally:
del action_list[:]
continue continue
if conn.readable(): if conn.readable():
pending_processing.append(conn) pending_processing.append(conn)
...@@ -230,15 +258,6 @@ class EpollEventManager(object): ...@@ -230,15 +258,6 @@ class EpollEventManager(object):
try: try:
conn = self.connection_dict[fd] conn = self.connection_dict[fd]
except KeyError: except KeyError:
if fd == self._trigger_fd:
with self._trigger_lock:
self.epoll.unregister(fd)
action_list = self._trigger_list
try:
while action_list:
action_list.pop(0)()
finally:
del action_list[:]
continue continue
if conn.readable(): if conn.readable():
pending_processing.append(conn) pending_processing.append(conn)
...@@ -262,10 +281,10 @@ class EpollEventManager(object): ...@@ -262,10 +281,10 @@ class EpollEventManager(object):
with self._trigger_lock: with self._trigger_lock:
self._trigger_list += actions self._trigger_list += actions
try: try:
self.epoll.register(self._trigger_fd) os.write(self._wakeup_wfd, '\0')
except IOError, e: except OSError, e:
# Ignore if 'wakeup' is called several times in a row. # Ignore if wakeup fd is triggered many times in a row.
if e.errno != EEXIST: if e.errno != EAGAIN:
raise raise
def addReader(self, conn): def addReader(self, conn):
......
...@@ -20,6 +20,7 @@ import functools ...@@ -20,6 +20,7 @@ import functools
import gc import gc
import os import os
import random import random
import signal
import socket import socket
import subprocess import subprocess
import sys import sys
...@@ -38,7 +39,7 @@ except ImportError: ...@@ -38,7 +39,7 @@ except ImportError:
from functools import wraps from functools import wraps
from inspect import isclass from inspect import isclass
from .mock import Mock from .mock import Mock
from neo.lib import debug, logging from neo.lib import debug, event, logging
from neo.lib.protocol import NodeTypes, Packet, Packets, UUID_NAMESPACES from neo.lib.protocol import NodeTypes, Packet, Packets, UUID_NAMESPACES
from neo.lib.util import cached_property from neo.lib.util import cached_property
from time import time, sleep from time import time, sleep
...@@ -96,6 +97,12 @@ logging.default_root_handler.handle = lambda record: None ...@@ -96,6 +97,12 @@ logging.default_root_handler.handle = lambda record: None
debug.register() debug.register()
# XXX: Not so important and complicated to make it work in the test process
# because there may be several EpollEventManager and threads.
# We only need it in child processes so that functional tests can stop.
event.set_wakeup_fd = lambda fd, pid=os.getpid(): (
-1 if pid == os.getpid() else signal.set_wakeup_fd(fd))
def mockDefaultValue(name, function): def mockDefaultValue(name, function):
def method(self, *args, **kw): def method(self, *args, **kw):
if name in self.mockReturnValues: if name in self.mockReturnValues:
...@@ -622,7 +629,6 @@ class Patch(object): ...@@ -622,7 +629,6 @@ class Patch(object):
def __exit__(self, t, v, tb): def __exit__(self, t, v, tb):
self.__del__() self.__del__()
def unpickle_state(data): def unpickle_state(data):
unpickler = Unpickler(StringIO(data)) unpickler = Unpickler(StringIO(data))
unpickler.persistent_load = PersistentReferenceFactory().persistent_load unpickler.persistent_load = PersistentReferenceFactory().persistent_load
......
...@@ -201,14 +201,6 @@ class Process(object): ...@@ -201,14 +201,6 @@ class Process(object):
logging._max_size, logging._max_packet, logging._max_size, logging._max_packet,
command), command),
*args) *args)
# XXX: Sometimes, the handler is not called immediately.
# The process is stuck at an unknown place and the test
# never ends. strace unlocks:
# strace: Process 5520 attached
# close(25) = 0
# getpid() = 5520
# kill(5520, SIGSTOP) = 0
# ...
signal.signal(signal.SIGUSR2, save_coverage) signal.signal(signal.SIGUSR2, save_coverage)
os.close(self._coverage_fd) os.close(self._coverage_fd)
os.write(w, '\0') os.write(w, '\0')
......
...@@ -21,15 +21,17 @@ from .. import Patch, SSL ...@@ -21,15 +21,17 @@ from .. import Patch, SSL
from . import NEOCluster, test, testReplication from . import NEOCluster, test, testReplication
class SSLMixin: class SSLMixin(object):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
super(SSLMixin, cls).setUpClass()
NEOCluster.SSL = SSL NEOCluster.SSL = SSL
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
NEOCluster.SSL = None NEOCluster.SSL = None
super(SSLMixin, cls).tearDownClass()
class SSLTests(SSLMixin, test.Test): class SSLTests(SSLMixin, test.Test):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment