Commit 04c1ddc3 authored by Denis Bilenko's avatar Denis Bilenko

add max_accept parameter to StreamServer

documentation is taken from http://haproxy.1wt.eu/download/1.5/doc/configuration.txt
parent 89cd7a1e
...@@ -33,6 +33,10 @@ class StreamServer(BaseServer): ...@@ -33,6 +33,10 @@ class StreamServer(BaseServer):
The delay starts with :attr:`min_delay` and doubles with each successive error until it reaches :attr:`max_delay`. The delay starts with :attr:`min_delay` and doubles with each successive error until it reaches :attr:`max_delay`.
A successful :func:`accept` resets the delay to :attr:`min_delay` again. A successful :func:`accept` resets the delay to :attr:`min_delay` again.
""" """
# Sets the maximum number of consecutive accepts that a process may perform on
# a single wake up. High values give higher priority to high connection rates,
# while lower values give higher priority to already established connections.
max_accept = 100
# the number of seconds to sleep in case there was an error in accept() call # the number of seconds to sleep in case there was an error in accept() call
# for consecutive errors the delay will double until it reaches max_delay # for consecutive errors the delay will double until it reaches max_delay
...@@ -102,37 +106,38 @@ class StreamServer(BaseServer): ...@@ -102,37 +106,38 @@ class StreamServer(BaseServer):
self._start_accepting_timer = None self._start_accepting_timer = None
def _do_accept(self): def _do_accept(self):
address = None for _ in xrange(self.max_accept):
try: address = None
if self.full():
self.stop_accepting()
return
try: try:
client_socket, address = self.socket.accept() if self.full():
except socket.error, err: self.stop_accepting()
if err[0] == errno.EAGAIN: return
try:
client_socket, address = self.socket.accept()
except socket.error, err:
if err[0] == errno.EAGAIN:
return
raise
self.delay = self.min_delay
client_socket = socket.socket(_sock=client_socket)
spawn = self._spawn
if spawn is None:
self._handle(client_socket, address)
else:
spawn(self._handle, client_socket, address)
except:
self.loop.handle_error((address, self), *sys.exc_info())
ex = sys.exc_info()[1]
if self.is_fatal_error(ex):
self.kill()
sys.stderr.write('ERROR: %s failed with %s\n' % (self, str(ex) or repr(ex)))
return return
raise if self.delay >= 0:
self.delay = self.min_delay self.stop_accepting()
client_socket = socket.socket(_sock=client_socket) self._start_accepting_timer = self.loop.timer(self.delay)
spawn = self._spawn self._start_accepting_timer.start(self._start_accepting_if_started)
if spawn is None: self.delay = min(self.max_delay, self.delay * 2)
self._handle(client_socket, address) break
else:
spawn(self._handle, client_socket, address)
return
except:
self.loop.handle_error((address, self), *sys.exc_info())
ex = sys.exc_info()[1]
if self.is_fatal_error(ex):
self.kill()
sys.stderr.write('ERROR: %s failed with %s\n' % (self, str(ex) or repr(ex)))
return
if self.delay >= 0:
self.stop_accepting()
self._start_accepting_timer = self.loop.timer(self.delay)
self._start_accepting_timer.start(self._start_accepting_if_started)
self.delay = min(self.max_delay, self.delay * 2)
def is_fatal_error(self, ex): def is_fatal_error(self, ex):
return isinstance(ex, socket.error) and ex[0] in (errno.EBADF, errno.EINVAL, errno.ENOTSOCK) return isinstance(ex, socket.error) and ex[0] in (errno.EBADF, errno.EINVAL, errno.ENOTSOCK)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment