Commit 20964f4f authored by Vincent Pelletier's avatar Vincent Pelletier

Use python's epoll wrapper

Custom ctypes wrapper for epoll is not needed with python 2.6+.
Also, do not retry upon temporary error: it is valid to handle no packet,
and retrying the poll call would increase overall timeout without easy fix.
Also, remove meaningless frozensets (leftovers from an old bug's symptoms).
parent 200a63c1
#
# Copyright (C) 2006-2010 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""This is an epoll(4) interface available in Linux 2.6."""
from ctypes import CDLL, get_errno, Union, Structure
from ctypes import c_void_p, c_int, byref, c_uint32, c_uint64
from os import close
from errno import EINTR, EAGAIN
libc = CDLL('libc.so.6', use_errno=True)
epoll_create = libc.epoll_create
epoll_wait = libc.epoll_wait
epoll_ctl = libc.epoll_ctl
EPOLLIN = 0x001
EPOLLPRI = 0x002
EPOLLOUT = 0x004
EPOLLRDNORM = 0x040
EPOLLRDBAND = 0x080
EPOLLWRNORM = 0x100
EPOLLWRBAND = 0x200
EPOLLMSG = 0x400
EPOLLERR = 0x008
EPOLLHUP = 0x010
EPOLLONESHOT = (1 << 30)
EPOLLET = (1 << 31)
EPOLL_CTL_ADD = 1
EPOLL_CTL_DEL = 2
EPOLL_CTL_MOD = 3
class EpollData(Union):
_fields_ = [("ptr", c_void_p),
("fd", c_int),
("u32", c_uint32),
("u64", c_uint64)]
class EpollEvent(Structure):
_fields_ = [("events", c_uint32),
("data", EpollData)]
_pack_ = 1
class Epoll(object):
efd = -1
def __init__(self):
self.efd = epoll_create(10)
if self.efd == -1:
raise OSError(get_errno(), 'epoll_create failed')
self.maxevents = 1024 # XXX arbitrary
epoll_event_array = EpollEvent * self.maxevents
self.events = epoll_event_array()
def poll(self, timeout=1):
if timeout is None:
timeout = -1
else:
timeout *= 1000
timeout = int(timeout)
while True:
n = epoll_wait(self.efd, byref(self.events), self.maxevents,
timeout)
if n == -1:
e = get_errno()
# XXX: Why 0 ?
if e in (0, EINTR, EAGAIN):
continue
else:
raise OSError(e, 'epoll_wait failed')
else:
readable_fd_list = []
writable_fd_list = []
error_fd_list = []
for i in xrange(n):
ev = self.events[i]
fd = int(ev.data.fd)
if ev.events & EPOLLIN:
readable_fd_list.append(fd)
if ev.events & EPOLLOUT:
writable_fd_list.append(fd)
if ev.events & (EPOLLERR | EPOLLHUP):
error_fd_list.append(fd)
return readable_fd_list, writable_fd_list, error_fd_list
def register(self, fd):
ev = EpollEvent()
ev.data.fd = fd
ret = epoll_ctl(self.efd, EPOLL_CTL_ADD, fd, byref(ev))
if ret == -1:
raise OSError(get_errno(), 'epoll_ctl failed')
def modify(self, fd, readable, writable):
ev = EpollEvent()
ev.data.fd = fd
events = 0
if readable:
events |= EPOLLIN
if writable:
events |= EPOLLOUT
ev.events = events
ret = epoll_ctl(self.efd, EPOLL_CTL_MOD, fd, byref(ev))
if ret == -1:
raise OSError(get_errno(), 'epoll_ctl failed')
def unregister(self, fd):
ev = EpollEvent()
ret = epoll_ctl(self.efd, EPOLL_CTL_DEL, fd, byref(ev))
if ret == -1:
raise OSError(get_errno(), 'epoll_ctl failed')
def __del__(self):
efd = self.efd
if efd >= 0:
del self.efd
close(efd)
close = __del__
...@@ -17,7 +17,8 @@ ...@@ -17,7 +17,8 @@
from time import time from time import time
import neo.lib import neo.lib
from .epoll import Epoll from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from errno import EINTR, EAGAIN
from .profiling import profiler_decorator from .profiling import profiler_decorator
class EpollEventManager(object): class EpollEventManager(object):
...@@ -27,7 +28,7 @@ class EpollEventManager(object): ...@@ -27,7 +28,7 @@ class EpollEventManager(object):
self.connection_dict = {} self.connection_dict = {}
self.reader_set = set([]) self.reader_set = set([])
self.writer_set = set([]) self.writer_set = set([])
self.epoll = Epoll() self.epoll = epoll()
self._pending_processing = [] self._pending_processing = []
def close(self): def close(self):
...@@ -111,8 +112,19 @@ class EpollEventManager(object): ...@@ -111,8 +112,19 @@ class EpollEventManager(object):
self._poll(timeout=0) self._poll(timeout=0)
def _poll(self, timeout=1): def _poll(self, timeout=1):
rlist, wlist, elist = self.epoll.poll(timeout) try:
for fd in frozenset(rlist): event_list = self.epoll.poll(timeout)
except IOError, exc:
if exc.errno in (0, EAGAIN):
neo.lib.logging.info('epoll.poll triggered undocumented '
'error %r', exc.errno)
elif exc.errno != EINTR:
raise
event_list = ()
wlist = []
elist = []
for fd, event in event_list:
if event & EPOLLIN:
conn = self.connection_dict[fd] conn = self.connection_dict[fd]
conn.lock() conn.lock()
try: try:
...@@ -121,8 +133,12 @@ class EpollEventManager(object): ...@@ -121,8 +133,12 @@ class EpollEventManager(object):
conn.unlock() conn.unlock()
if conn.hasPendingMessages(): if conn.hasPendingMessages():
self._addPendingConnection(conn) self._addPendingConnection(conn)
if event & EPOLLOUT:
wlist.append(fd)
if event & (EPOLLERR | EPOLLHUP):
elist.append(fd)
for fd in frozenset(wlist): for fd in wlist:
# This can fail, if a connection is closed in readable(). # This can fail, if a connection is closed in readable().
try: try:
conn = self.connection_dict[fd] conn = self.connection_dict[fd]
...@@ -135,7 +151,7 @@ class EpollEventManager(object): ...@@ -135,7 +151,7 @@ class EpollEventManager(object):
finally: finally:
conn.unlock() conn.unlock()
for fd in frozenset(elist): for fd in elist:
# This can fail, if a connection is closed in previous calls to # This can fail, if a connection is closed in previous calls to
# readable() or writable(). # readable() or writable().
try: try:
...@@ -165,7 +181,8 @@ class EpollEventManager(object): ...@@ -165,7 +181,8 @@ class EpollEventManager(object):
fd = connector.getDescriptor() fd = connector.getDescriptor()
if fd not in self.reader_set: if fd not in self.reader_set:
self.reader_set.add(fd) self.reader_set.add(fd)
self.epoll.modify(fd, 1, fd in self.writer_set) self.epoll.modify(fd, EPOLLIN | (
fd in self.writer_set and EPOLLOUT))
def removeReader(self, conn): def removeReader(self, conn):
connector = conn.getConnector() connector = conn.getConnector()
...@@ -173,7 +190,7 @@ class EpollEventManager(object): ...@@ -173,7 +190,7 @@ class EpollEventManager(object):
fd = connector.getDescriptor() fd = connector.getDescriptor()
if fd in self.reader_set: if fd in self.reader_set:
self.reader_set.remove(fd) self.reader_set.remove(fd)
self.epoll.modify(fd, 0, fd in self.writer_set) self.epoll.modify(fd, fd in self.writer_set and EPOLLOUT)
@profiler_decorator @profiler_decorator
def addWriter(self, conn): def addWriter(self, conn):
...@@ -182,7 +199,8 @@ class EpollEventManager(object): ...@@ -182,7 +199,8 @@ class EpollEventManager(object):
fd = connector.getDescriptor() fd = connector.getDescriptor()
if fd not in self.writer_set: if fd not in self.writer_set:
self.writer_set.add(fd) self.writer_set.add(fd)
self.epoll.modify(fd, fd in self.reader_set, 1) self.epoll.modify(fd, EPOLLOUT | (
fd in self.reader_set and EPOLLIN))
def removeWriter(self, conn): def removeWriter(self, conn):
connector = conn.getConnector() connector = conn.getConnector()
...@@ -190,7 +208,7 @@ class EpollEventManager(object): ...@@ -190,7 +208,7 @@ class EpollEventManager(object):
fd = connector.getDescriptor() fd = connector.getDescriptor()
if fd in self.writer_set: if fd in self.writer_set:
self.writer_set.remove(fd) self.writer_set.remove(fd)
self.epoll.modify(fd, fd in self.reader_set, 0) self.epoll.modify(fd, fd in self.reader_set and EPOLLIN)
def log(self): def log(self):
neo.lib.logging.info('Event Manager:') neo.lib.logging.info('Event Manager:')
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
import unittest import unittest
from mock import Mock from mock import Mock
from . import NeoUnitTestBase from . import NeoUnitTestBase
from neo.lib.epoll import Epoll from select import epoll, EPOLLIN, EPOLLOUT
from neo.lib.event import EpollEventManager from neo.lib.event import EpollEventManager
class EventTests(NeoUnitTestBase): class EventTests(NeoUnitTestBase):
...@@ -28,7 +28,7 @@ class EventTests(NeoUnitTestBase): ...@@ -28,7 +28,7 @@ class EventTests(NeoUnitTestBase):
self.assertEqual(len(em.connection_dict), 0) self.assertEqual(len(em.connection_dict), 0)
self.assertEqual(len(em.reader_set), 0) self.assertEqual(len(em.reader_set), 0)
self.assertEqual(len(em.writer_set), 0) self.assertEqual(len(em.writer_set), 0)
self.assertTrue(isinstance(em.epoll, Epoll)) self.assertTrue(isinstance(em.epoll, epoll))
# use a mock object instead of epoll # use a mock object instead of epoll
em.epoll = Mock() em.epoll = Mock()
connector = self.getFakeConnector(descriptor=1014) connector = self.getFakeConnector(descriptor=1014)
...@@ -94,9 +94,8 @@ class EventTests(NeoUnitTestBase): ...@@ -94,9 +94,8 @@ class EventTests(NeoUnitTestBase):
w_conn = self.getFakeConnection(connector=w_connector) w_conn = self.getFakeConnection(connector=w_connector)
em.register(w_conn) em.register(w_conn)
em.epoll = Mock({"poll":( em.epoll = Mock({"poll":(
(r_connector.getDescriptor(),), (r_connector.getDescriptor(), EPOLLIN),
(w_connector.getDescriptor(),), (w_connector.getDescriptor(), EPOLLOUT),
(),
)}) )})
em.poll(timeout=10) em.poll(timeout=10)
# check it called poll on epoll # check it called poll on epoll
......
...@@ -138,7 +138,7 @@ class SerializedEventManager(EventManager): ...@@ -138,7 +138,7 @@ class SerializedEventManager(EventManager):
def _poll(self, timeout=1): def _poll(self, timeout=1):
if self._pending_processing: if self._pending_processing:
assert not timeout assert timeout <= 0
elif 0 == self._timeout == timeout == Serialized.pending == len( elif 0 == self._timeout == timeout == Serialized.pending == len(
self.writer_set): self.writer_set):
return return
...@@ -324,7 +324,7 @@ class NeoCTL(neo.neoctl.app.NeoCTL): ...@@ -324,7 +324,7 @@ class NeoCTL(neo.neoctl.app.NeoCTL):
def __init__(self, cluster, address=(getVirtualIp('admin'), 0)): def __init__(self, cluster, address=(getVirtualIp('admin'), 0)):
self._cluster = cluster self._cluster = cluster
super(NeoCTL, self).__init__(address) super(NeoCTL, self).__init__(address)
self.em._timeout = None self.em._timeout = -1
server = property(lambda self: self._cluster.resolv(self._server), server = property(lambda self: self._cluster.resolv(self._server),
lambda self, address: setattr(self, '_server', address)) lambda self, address: setattr(self, '_server', address))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment