Commit 68f74a4e authored by Denis Bilenko's avatar Denis Bilenko

add gevent.server module with StreamServer class

parent b8217e45
# Copyright (c) 2009-2010 Denis Bilenko. See LICENSE for details.
import sys
from gevent.greenlet import Greenlet
from gevent.pool import GreenletSet, Pool
from gevent import socket
from gevent import sleep
__all__ = ['StreamServer']
class StreamServer(Greenlet):
backlog = 256
first_delay = 0.1
_allowed_ssl_args = ['keyfile', 'certfile', 'cert_reqs', 'ssl_version', 'ca_certs', 'suppress_ragged_eofs']
def __init__(self, listener, backlog=None, pool=None, log=sys.stderr, **ssl_args):
self.ssl_enabled = False
if hasattr(listener, 'accept'):
self.socket = listener
self.address = listener.getsockname()
self.ssl_enabled = hasattr(listener, 'do_handshake')
else:
if not isinstance(listener, tuple):
raise TypeError('Expected a socket instance or a tuple: %r' % (listener, ))
if backlog is None:
backlog = self.backlog
self.address = listener
if ssl_args:
self.ssl_enabled = True
for key, value in ssl_args:
if key not in self._allowed_ssl_args:
raise TypeError('Unexpected argument: %r' % (key, ))
else:
setattr(self, key, value)
if pool is None:
self.pool = GreenletSet()
elif hasattr(pool, 'spawn'):
self.pool = pool
elif isinstance(pool, int):
self.pool = Pool(pool)
self.log = log
Greenlet.__init__(self)
def __str__(self):
return '<%s on %s>' % (self.__class__.__name__, self.socket)
def log_message(self, message):
self.log.write(message + '\n')
@property
def server_host(self):
return self.address[0]
@property
def server_port(self):
return self.address[1]
def pre_start(self):
if not hasattr(self, 'socket'):
self.socket = socket.tcp_listener(self.address, backlog=self.backlog)
self.address = self.socket.getsockname()
if self.ssl_enabled:
from gevent.ssl import wrap_socket
args = {}
for arg in self._allowed_ssl_args:
try:
value = getattr(self, arg)
except AttributeError:
pass
else:
args[arg] = value
self.socket = wrap_socket(self.socket, **args)
def start(self):
self.pre_start()
Greenlet.start(self)
def _run(self):
try:
self.delay = self.first_delay
while True:
try:
client_socket, address = self.socket.accept()
self.delay = self.first_delay
self.pool.spawn(self.handle, client_socket, address)
except socket.error, e:
self.log_message('WARNING: %s: accept() failed with %s: %s; will sleep %s seconds' % (self, e[0], e, self.delay))
sleep(self.delay)
self.delay *= 2
finally:
try:
self.socket.close()
except Exception:
pass
def stop(self):
self.kill(block=True)
def serve_forever(self):
if not self: # XXX will this work: server.start(); server.serve_forever()
self.start()
self.server.join()
def handle(self, socket, address):
raise NotImplementedError('override in a subclass')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment