event.py 12.5 KB
Newer Older
Aurel's avatar
Aurel committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

18
from neo import logging
Yoshinori Okuji's avatar
Yoshinori Okuji committed
19 20 21
from select import select
from time import time

22
from neo.epoll import Epoll
Yoshinori Okuji's avatar
Yoshinori Okuji committed
23

Yoshinori Okuji's avatar
Yoshinori Okuji committed
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
class IdleEvent(object):
    """This class represents an event called when a connection is waiting for
    a message too long.

    Two deadlines are computed when the event is created:
    - the soft deadline (creation time + timeout): once passed, waiting for
      the message is re-armed on the connection, with a ping when enough
      additional time is left;
    - the critical deadline (soft deadline + additional_timeout): once
      passed, the connection is closed.
    """

    # Length, in seconds, of each waiting slice taken from the additional
    # timeout when a ping is sent.
    _PING_TIMEOUT = 5

    def __init__(self, conn, msg_id, timeout, additional_timeout):
        """Remember the connection and message id, and compute deadlines.

        conn -- connection waiting for the answer
        msg_id -- identifier of the expected message
        timeout -- delay (seconds) before the soft deadline
        additional_timeout -- extra delay (seconds) between the soft and the
                              critical deadlines
        """
        self._conn = conn
        self._id = msg_id
        t = time()
        self._time = t + timeout
        self._critical_time = t + timeout + additional_timeout
        self._additional_timeout = additional_timeout

    def getId(self):
        """Return the identifier of the expected message."""
        return self._id

    def getTime(self):
        """Return the soft deadline."""
        return self._time

    def getCriticalTime(self):
        """Return the critical deadline."""
        return self._critical_time

    def __call__(self, t):
        """Check this event against time t.

        Return True when the event has been handled and must be discarded
        by the caller, False when no deadline has been reached yet.
        """
        conn = self._conn
        if t > self._critical_time:
            # No answer after _critical_time, close connection.
            # This means that remote peer is processing the request for too
            # long, although being responsive at network level.
            conn.lock()
            try:
                logging.info('timeout for %r with %s:%d',
                             self._id, *conn.getAddress())
                conn.close()
                conn.getHandler().timeoutExpired(conn)
            finally:
                conn.unlock()
            return True
        if t > self._time:
            # Still no answer after _time, send a ping to see if connection is
            # broken.
            # XXX: Here, we return True, which causes the current IdleEvent
            # instance to be discarded, while expectMessage is asked to keep
            # waiting with a reduced additional timeout. It must be possible
            # to avoid going through expectMessage again just to keep waiting
            # for the same response.
            # XXX: This code has no meaning if the remote peer is single-
            # threaded. Nevertheless, it should be kept in case it gets
            # multithreaded, someday (master & storage are the only candidates
            # for using this code, as other don't receive requests).
            conn.lock()
            try:
                remaining = self._additional_timeout
                slice_length = self._PING_TIMEOUT
                if remaining > slice_length:
                    # This instance is about to be discarded, so the reduced
                    # value is only handed over, not stored on self.
                    conn.expectMessage(self._id, slice_length,
                                       remaining - slice_length)
                    conn.ping()
                else:
                    # Not enough time left for another ping: wait for what
                    # remains, with nothing after it.
                    conn.expectMessage(self._id, remaining, 0)
            finally:
                conn.unlock()
            return True
        return False

90 91
class SelectEventManager(object):
    """This class manages connections and events based on select(2).

    Connections are indexed by their connector object, which is also what
    is handed to select() for readiness detection.
    """

    def __init__(self):
        # connector -> connection
        self.connection_dict = {}
        # Connectors watched for reading / writing.
        self.reader_set = set()
        self.writer_set = set()
        self.exc_list = []
        # Registered idle events (see addIdleEvent).
        self.event_list = []
        # Last time idle events were checked.
        self.prev_time = time()
        # Connections with received messages waiting to be processed, in
        # FIFO order.
        self._pending_processing = []

    def getConnectionList(self):
        """Return all registered connections."""
        return self.connection_dict.values()

    def getConnectionByUUID(self, uuid):
        """ Return the connection associated to the UUID, None if the UUID is
        None, invalid or not found"""
        if uuid is None:
            return None
        for conn in self.connection_dict.values():
            if conn.getUUID() == uuid:
                return conn
        return None

    def register(self, conn):
        """Start managing conn, indexed by its connector."""
        self.connection_dict[conn.getConnector()] = conn

    def unregister(self, conn):
        """Stop managing conn and forget any processing pending for it.

        Raises KeyError if the connector of conn is not registered.
        """
        pending = self._pending_processing
        remaining = [x for x in pending if x is not conn]
        # Check that we removed at most one entry from
        # self._pending_processing .
        assert len(pending) - len(remaining) <= 1
        self._pending_processing = remaining
        del self.connection_dict[conn.getConnector()]

    def _getPendingConnection(self):
        """Pop and return the oldest pending connection, None if none."""
        if self._pending_processing:
            return self._pending_processing.pop(0)
        return None

    def _addPendingConnection(self, conn):
        """Queue conn for a later call to its process() method."""
        self._pending_processing.append(conn)

    def poll(self, timeout = 1):
        """Process one pending connection, polling descriptors first when
        nothing is pending.

        timeout -- maximum time (seconds) to wait in select()
        """
        to_process = self._getPendingConnection()
        if to_process is None:
            # Fetch messages from polled file descriptors
            self._poll(timeout=timeout)
            # See if there is anything to process
            to_process = self._getPendingConnection()
        if to_process is not None:
            try:
                # Process
                to_process.process()
            finally:
                # ...and requeue if there are pending messages, even when
                # processing raised, so that they are not left behind.
                if to_process.hasPendingMessages():
                    self._addPendingConnection(to_process)

    def _poll(self, timeout = 1):
        """Wait for readiness, dispatch it to connections, then check idle
        events."""
        rlist, wlist, _ = select(self.reader_set, self.writer_set,
                                 self.exc_list, timeout)
        for s in rlist:
            # This can fail, if a connection is closed while handling a
            # previous one.
            try:
                conn = self.connection_dict[s]
            except KeyError:
                continue
            conn.lock()
            try:
                conn.readable()
            finally:
                conn.unlock()
            if conn.hasPendingMessages():
                self._addPendingConnection(conn)

        for s in wlist:
            # This can fail, if a connection is closed in readable().
            try:
                conn = self.connection_dict[s]
            except KeyError:
                continue
            conn.lock()
            try:
                conn.writable()
            finally:
                conn.unlock()

        self._checkIdleEvents()

    def _checkIdleEvents(self):
        """Call due idle events, at most once per second.

        Do not check them out too often, because this is somehow heavy.
        Events are examined by increasing getTime() value: an event
        returning a true value is dropped, and the first one returning a
        false value stops the scan while staying registered.
        """
        event_list = self.event_list
        if not event_list:
            return
        t = time()
        if t - self.prev_time < 1:
            return
        self.prev_time = t
        event_list.sort(key = lambda event: event.getTime())
        while event_list:
            # Only peek at the event: it must stay in the list unless it
            # asks to be discarded. Events appended while calling this one
            # land at the end of the list and are left alone.
            event = event_list[0]
            if not event(t):
                break
            try:
                event_list.remove(event)
            except ValueError:
                # The event may already have been removed (for example
                # through removeIdleEvent) while it was being called.
                pass

    def addIdleEvent(self, event):
        """Register an idle event."""
        self.event_list.append(event)

    def removeIdleEvent(self, event):
        """Unregister an idle event, ignoring unknown ones."""
        try:
            self.event_list.remove(event)
        except ValueError:
            pass

    def addReader(self, conn):
        """Watch the connector of conn for reading."""
        connector = conn.getConnector()
        # A missing connector cannot be waited on: select() rejects None.
        if connector is not None:
            self.reader_set.add(connector)

    def removeReader(self, conn):
        """Stop watching the connector of conn for reading."""
        self.reader_set.discard(conn.getConnector())

    def addWriter(self, conn):
        """Watch the connector of conn for writing."""
        connector = conn.getConnector()
        # A missing connector cannot be waited on: select() rejects None.
        if connector is not None:
            self.writer_set.add(connector)

    def removeWriter(self, conn):
        """Stop watching the connector of conn for writing."""
        self.writer_set.discard(conn.getConnector())
Yoshinori Okuji's avatar
Yoshinori Okuji committed
206

207 208 209 210 211 212 213 214 215 216
class EpollEventManager(object):
    """This class manages connections and events based on epoll.

    Connections are indexed by the file descriptor of their connector.
    """

    def __init__(self):
        # file descriptor -> connection
        self.connection_dict = {}
        # File descriptors currently watched for reading / writing.
        self.reader_set = set()
        self.writer_set = set()
        # Registered idle events (see addIdleEvent).
        self.event_list = []
        # Last time idle events were checked.
        self.prev_time = time()
        self.epoll = Epoll()
        # Connections with received messages waiting to be processed, in
        # FIFO order.
        self._pending_processing = []

    def getConnectionList(self):
        """Return all registered connections."""
        return self.connection_dict.values()

    def getConnectionByUUID(self, uuid):
        """ Return the connection associated to the UUID, None if the UUID is
        None, invalid or not found"""
        if uuid is None:
            return None
        for conn in self.connection_dict.values():
            if conn.getUUID() == uuid:
                return conn
        return None

    def register(self, conn):
        """Start managing conn and register its descriptor with epoll."""
        fd = conn.getConnector().getDescriptor()
        self.connection_dict[fd] = conn
        self.epoll.register(fd)

    def unregister(self, conn):
        """Stop managing conn and forget any processing pending for it.

        Raises KeyError if the descriptor of conn is not registered.
        """
        pending = self._pending_processing
        remaining = [x for x in pending if x is not conn]
        # Check that we removed at most one entry from
        # self._pending_processing .
        assert len(pending) - len(remaining) <= 1
        self._pending_processing = remaining
        fd = conn.getConnector().getDescriptor()
        self.epoll.unregister(fd)
        del self.connection_dict[fd]
        # Forget the watch state too: if the same descriptor number is
        # registered again later, a stale entry here would make
        # addReader/addWriter skip the call to epoll.modify.
        self.reader_set.discard(fd)
        self.writer_set.discard(fd)

    def _getPendingConnection(self):
        """Pop and return the oldest pending connection, None if none."""
        if self._pending_processing:
            return self._pending_processing.pop(0)
        return None

    def _addPendingConnection(self, conn):
        """Queue conn for a later call to its process() method."""
        self._pending_processing.append(conn)

    def poll(self, timeout = 1):
        """Process one pending connection, polling descriptors first when
        nothing is pending.

        timeout -- value handed over to epoll.poll
        """
        to_process = self._getPendingConnection()
        if to_process is None:
            # Fetch messages from polled file descriptors
            self._poll(timeout=timeout)
            # See if there is anything to process
            to_process = self._getPendingConnection()
        if to_process is not None:
            try:
                # Process
                to_process.process()
            finally:
                # ...and requeue if there are pending messages
                if to_process.hasPendingMessages():
                    self._addPendingConnection(to_process)

    def _poll(self, timeout = 1):
        """Wait for readiness, dispatch it to connections, then check idle
        events."""
        rlist, wlist = self.epoll.poll(timeout)
        for fd in rlist:
            # This can fail, if a connection is closed while handling a
            # previous one.
            try:
                conn = self.connection_dict[fd]
            except KeyError:
                continue
            conn.lock()
            try:
                conn.readable()
            finally:
                conn.unlock()
            if conn.hasPendingMessages():
                self._addPendingConnection(conn)

        for fd in wlist:
            # This can fail, if a connection is closed in readable().
            try:
                conn = self.connection_dict[fd]
            except KeyError:
                continue
            conn.lock()
            try:
                conn.writable()
            finally:
                conn.unlock()

        self._checkIdleEvents()

    def _checkIdleEvents(self):
        """Call due idle events, at most once per second.

        Do not check them out too often, because this is somehow heavy.
        Events are examined by increasing getTime() value: an event
        returning a true value is dropped, and the first one returning a
        false value stops the scan while staying registered.
        """
        event_list = self.event_list
        if not event_list:
            return
        t = time()
        if t - self.prev_time < 1:
            return
        self.prev_time = t
        event_list.sort(key = lambda event: event.getTime())
        while event_list:
            event = event_list[0]
            if not event(t):
                break
            try:
                event_list.remove(event)
            except ValueError:
                # The event may already have been removed (for example
                # through removeIdleEvent) while it was being called.
                pass

    def addIdleEvent(self, event):
        """Register an idle event."""
        self.event_list.append(event)

    def removeIdleEvent(self, event):
        """Unregister an idle event, ignoring unknown ones."""
        try:
            self.event_list.remove(event)
        except ValueError:
            pass

    def _getDescriptor(self, conn):
        """Return the file descriptor of conn, None when it cannot be
        obtained (typically because conn has no connector anymore)."""
        try:
            return conn.getConnector().getDescriptor()
        except AttributeError:
            return None

    def addReader(self, conn):
        """Watch conn for reading; no-op if it has no descriptor or is
        already watched."""
        fd = self._getDescriptor(conn)
        if fd is not None and fd not in self.reader_set:
            self.reader_set.add(fd)
            self.epoll.modify(fd, 1, fd in self.writer_set)

    def removeReader(self, conn):
        """Stop watching conn for reading; no-op if it has no descriptor
        or is not watched."""
        fd = self._getDescriptor(conn)
        if fd is not None and fd in self.reader_set:
            self.reader_set.remove(fd)
            self.epoll.modify(fd, 0, fd in self.writer_set)

    def addWriter(self, conn):
        """Watch conn for writing; no-op if it has no descriptor or is
        already watched."""
        fd = self._getDescriptor(conn)
        if fd is not None and fd not in self.writer_set:
            self.writer_set.add(fd)
            self.epoll.modify(fd, fd in self.reader_set, 1)

    def removeWriter(self, conn):
        """Stop watching conn for writing; no-op if it has no descriptor
        or is not watched."""
        fd = self._getDescriptor(conn)
        if fd is not None and fd in self.writer_set:
            self.writer_set.remove(fd)
            self.epoll.modify(fd, fd in self.reader_set, 0)
365 366 367

# Default to EpollEventManager: code referring to the generic EventManager
# name gets the epoll-based implementation.
EventManager = EpollEventManager