Commit c2c97752 authored by Julien Muchembled's avatar Julien Muchembled

Rename parameter of polling methods now that _poll computes the timeout itself

parent eef52c27
...@@ -104,10 +104,10 @@ class EpollEventManager(object): ...@@ -104,10 +104,10 @@ class EpollEventManager(object):
if conn not in pending_processing: if conn not in pending_processing:
pending_processing.append(conn) pending_processing.append(conn)
def poll(self, timeout=1): def poll(self, blocking=1):
if not self._pending_processing: if not self._pending_processing:
# Fetch messages from polled file descriptors # Fetch messages from polled file descriptors
self._poll(timeout=timeout) self._poll(blocking)
if not self._pending_processing: if not self._pending_processing:
return return
to_process = self._pending_processing.pop(0) to_process = self._pending_processing.pop(0)
...@@ -120,10 +120,10 @@ class EpollEventManager(object): ...@@ -120,10 +120,10 @@ class EpollEventManager(object):
# Non-blocking call: as we handled a packet, we should just offer # Non-blocking call: as we handled a packet, we should just offer
# poll a chance to fetch & send already-available data, but it must # poll a chance to fetch & send already-available data, but it must
# not delay us. # not delay us.
self._poll(timeout=0) self._poll(0)
def _poll(self, timeout=1): def _poll(self, blocking):
if timeout: if blocking:
timeout = None timeout = None
for conn in self.connection_dict.itervalues(): for conn in self.connection_dict.itervalues():
t = conn.getTimeout() t = conn.getTimeout()
...@@ -133,9 +133,9 @@ class EpollEventManager(object): ...@@ -133,9 +133,9 @@ class EpollEventManager(object):
# Make sure epoll_wait does not return too early, because it has a # Make sure epoll_wait does not return too early, because it has a
# granularity of 1ms and Python 2.7 rounds the timeout towards zero. # granularity of 1ms and Python 2.7 rounds the timeout towards zero.
# See also https://bugs.python.org/issue20452 (fixed in Python 3). # See also https://bugs.python.org/issue20452 (fixed in Python 3).
timeout = .001 + max(0, timeout - time()) if timeout else -1 blocking = .001 + max(0, timeout - time()) if timeout else -1
try: try:
event_list = self.epoll.poll(timeout) event_list = self.epoll.poll(blocking)
except IOError, exc: except IOError, exc:
if exc.errno in (0, EAGAIN): if exc.errno in (0, EAGAIN):
logging.info('epoll.poll triggered undocumented error %r', logging.info('epoll.poll triggered undocumented error %r',
...@@ -144,7 +144,7 @@ class EpollEventManager(object): ...@@ -144,7 +144,7 @@ class EpollEventManager(object):
raise raise
return return
if not event_list: if not event_list:
if timeout > 0: if blocking > 0:
timeout_conn.onTimeout() timeout_conn.onTimeout()
return return
wlist = [] wlist = []
......
...@@ -96,7 +96,7 @@ class EventTests(NeoUnitTestBase): ...@@ -96,7 +96,7 @@ class EventTests(NeoUnitTestBase):
(r_connector.getDescriptor(), EPOLLIN), (r_connector.getDescriptor(), EPOLLIN),
(w_connector.getDescriptor(), EPOLLOUT), (w_connector.getDescriptor(), EPOLLOUT),
)}) )})
em.poll(timeout=1) em.poll(1)
# check it called poll on epoll # check it called poll on epoll
self.assertEqual(len(em.epoll.mockGetNamedCalls("poll")), 1) self.assertEqual(len(em.epoll.mockGetNamedCalls("poll")), 1)
call = em.epoll.mockGetNamedCalls("poll")[0] call = em.epoll.mockGetNamedCalls("poll")[0]
......
...@@ -113,7 +113,7 @@ class Serialized(object): ...@@ -113,7 +113,7 @@ class Serialized(object):
class SerializedEventManager(EventManager): class SerializedEventManager(EventManager):
_lock = None _lock = None
_timeout = 0 _blocking = 0
@classmethod @classmethod
def decorate(cls, func): def decorate(cls, func):
...@@ -135,10 +135,10 @@ class SerializedEventManager(EventManager): ...@@ -135,10 +135,10 @@ class SerializedEventManager(EventManager):
self.__class__ = SerializedEventManager self.__class__ = SerializedEventManager
self._super__init__() self._super__init__()
def _poll(self, timeout=1): def _poll(self, blocking):
if self._pending_processing: if self._pending_processing:
assert timeout == 0, timeout assert blocking == 0, blocking
elif 0 == self._timeout == timeout == Serialized.pending == len( elif 0 == self._blocking == blocking == Serialized.pending == len(
self.writer_set): self.writer_set):
return return
else: else:
...@@ -151,11 +151,11 @@ class SerializedEventManager(EventManager): ...@@ -151,11 +151,11 @@ class SerializedEventManager(EventManager):
# TODO: Detect where a message is sent to jump immediately to nodes # TODO: Detect where a message is sent to jump immediately to nodes
# that will do something. # that will do something.
Serialized.tic(self._lock) Serialized.tic(self._lock)
if timeout != 0: if blocking != 0:
timeout = self._timeout blocking = self._blocking
if timeout != 0 and Serialized.pending == 1: if blocking != 0 and Serialized.pending == 1:
Serialized.pending = timeout = 0 Serialized.pending = blocking = 0
EventManager._poll(self, timeout) EventManager._poll(self, blocking)
def addReader(self, conn): def addReader(self, conn):
EventManager.addReader(self, conn) EventManager.addReader(self, conn)
...@@ -336,12 +336,12 @@ class ClientApplication(Node, neo.client.app.Application): ...@@ -336,12 +336,12 @@ class ClientApplication(Node, neo.client.app.Application):
processe packets upon NEOCluster.tic() calls. processe packets upon NEOCluster.tic() calls.
""" """
if master: if master:
self.em._timeout = 1 self.em._blocking = 1
if not self.em._lock.acquire(0): if not self.em._lock.acquire(0):
Serialized.background() Serialized.background()
else: else:
Serialized.release(wake_other=0); Serialized.acquire() Serialized.release(wake_other=0); Serialized.acquire()
self.em._timeout = 0 self.em._blocking = 0
def __del__(self): def __del__(self):
try: try:
...@@ -365,7 +365,7 @@ class NeoCTL(neo.neoctl.app.NeoCTL): ...@@ -365,7 +365,7 @@ class NeoCTL(neo.neoctl.app.NeoCTL):
@SerializedEventManager.decorate @SerializedEventManager.decorate
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
super(NeoCTL, self).__init__(*args, **kw) super(NeoCTL, self).__init__(*args, **kw)
self.em._timeout = 1 self.em._blocking = 1
class LoggerThreadName(str): class LoggerThreadName(str):
...@@ -672,7 +672,7 @@ class NEOCluster(object): ...@@ -672,7 +672,7 @@ class NEOCluster(object):
return db return db
def stop(self): def stop(self):
if hasattr(self, '_db') and self.client.em._timeout == 0: if hasattr(self, '_db') and self.client.em._blocking == 0:
self.client.setPoll(True) self.client.setPoll(True)
self.__dict__.pop('_db', self.client).close() self.__dict__.pop('_db', self.client).close()
try: try:
...@@ -714,7 +714,7 @@ class NEOCluster(object): ...@@ -714,7 +714,7 @@ class NEOCluster(object):
def getZODBStorage(self, **kw): def getZODBStorage(self, **kw):
# automatically put client in master mode # automatically put client in master mode
if self.client.em._timeout == 0: if self.client.em._blocking == 0:
self.client.setPoll(True) self.client.setPoll(True)
return Storage.Storage(None, self.name, _app=self.client, **kw) return Storage.Storage(None, self.name, _app=self.client, **kw)
......
...@@ -256,12 +256,12 @@ class ReplicationTests(NEOThreadedTest): ...@@ -256,12 +256,12 @@ class ReplicationTests(NEOThreadedTest):
# force ping to have expired # force ping to have expired
# connection will be closed before upstream master has time # connection will be closed before upstream master has time
# to answer # to answer
def _poll(orig, self, timeout): def _poll(orig, self, blocking):
if backup.master.em is self: if backup.master.em is self:
p.revert() p.revert()
conn.onTimeout() conn.onTimeout()
else: else:
orig(self, timeout) orig(self, blocking)
with Patch(EventManager, _poll=_poll) as p: with Patch(EventManager, _poll=_poll) as p:
backup.tic(force=1) backup.tic(force=1)
new_conn, = backup.master.getConnectionList(backup.upstream.master) new_conn, = backup.master.getConnectionList(backup.upstream.master)
...@@ -285,7 +285,7 @@ class ReplicationTests(NEOThreadedTest): ...@@ -285,7 +285,7 @@ class ReplicationTests(NEOThreadedTest):
Serialized.tic(); count[0] or Serialized.tic() Serialized.tic(); count[0] or Serialized.tic()
t = time.time() t = time.time()
# XXX: review API for checking timeouts # XXX: review API for checking timeouts
backup.storage.em._timeout = 1 backup.storage.em._blocking = 1
Serialized.tic(); self.assertEqual(count[0], 2) Serialized.tic(); self.assertEqual(count[0], 2)
Serialized.tic(); self.assertEqual(count[0], 3) Serialized.tic(); self.assertEqual(count[0], 3)
self.assertTrue(t + 1 <= time.time()) self.assertTrue(t + 1 <= time.time())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment