Commit 2889c11a authored by Denis Bilenko's avatar Denis Bilenko

Merged in geertj/gevent (pull request #20)

parents 249d7521 58ded201
...@@ -6,6 +6,8 @@ to provide a high-level synchronous API on top of libev event loop. ...@@ -6,6 +6,8 @@ to provide a high-level synchronous API on top of libev event loop.
See http://www.gevent.org/ for the documentation. See http://www.gevent.org/ for the documentation.
""" """
from __future__ import absolute_import
version_info = (1, 0, 0, 'dev', None) version_info = (1, 0, 0, 'dev', None)
__version__ = '1.0dev' __version__ = '1.0dev'
...@@ -32,7 +34,7 @@ __all__ = ['get_hub', ...@@ -32,7 +34,7 @@ __all__ = ['get_hub',
import sys import sys
if sys.platform == 'win32': if sys.platform == 'win32':
__import__('socket') # trigger WSAStartup call import socket # trigger WSAStartup call
del sys del sys
...@@ -43,7 +45,7 @@ spawn_later = Greenlet.spawn_later ...@@ -43,7 +45,7 @@ spawn_later = Greenlet.spawn_later
from gevent.timeout import Timeout, with_timeout from gevent.timeout import Timeout, with_timeout
from gevent.hub import getcurrent, GreenletExit, spawn_raw, sleep, idle, kill, signal, reinit from gevent.hub import getcurrent, GreenletExit, spawn_raw, sleep, idle, kill, signal, reinit
try: try:
from gevent.hub import fork from gevent.os import fork
except ImportError: except ImportError:
__all__.remove('fork') __all__.remove('fork')
......
from python cimport * from python cimport *
import os # Work around lack of absolute_import in Cython.
os = __import__('os', level=0)
__all__ = ['set_exc_info'] __all__ = ['set_exc_info']
......
...@@ -3,7 +3,8 @@ cimport cython ...@@ -3,7 +3,8 @@ cimport cython
cimport libev cimport libev
from python cimport * from python cimport *
import sys import sys
import os # Work around lack of absolute_import in Cython
os = __import__('os', level=0)
import traceback import traceback
import signal as signalmodule import signal as signalmodule
......
from __future__ import absolute_import
import sys import sys
import os import os
from gevent.hub import get_hub from gevent.hub import get_hub
from gevent.socket import EBADF from gevent.socket import EBADF
from gevent.os import os_read, os_write
try: try:
...@@ -104,7 +106,7 @@ else: ...@@ -104,7 +106,7 @@ else:
bytes_written = 0 bytes_written = 0
while bytes_written < bytes_total: while bytes_written < bytes_total:
try: try:
bytes_written += os.write(fileno, _get_memory(data, bytes_written)) bytes_written += os_write(fileno, _get_memory(data, bytes_written))
except (IOError, OSError): except (IOError, OSError):
code = sys.exc_info()[1].args[0] code = sys.exc_info()[1].args[0]
if code not in ignore_errors: if code not in ignore_errors:
...@@ -115,7 +117,7 @@ else: ...@@ -115,7 +117,7 @@ else:
def recv(self, size): def recv(self, size):
while True: while True:
try: try:
data = os.read(self.fileno(), size) data = os_read(self.fileno(), size)
except (IOError, OSError): except (IOError, OSError):
code = sys.exc_info()[1].args[0] code = sys.exc_info()[1].args[0]
if code in ignore_errors: if code in ignore_errors:
......
# Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details. # Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details.
from __future__ import absolute_import
import sys import sys
import os import os
import traceback import traceback
...@@ -19,7 +20,6 @@ __all__ = ['getcurrent', ...@@ -19,7 +20,6 @@ __all__ = ['getcurrent',
'kill', 'kill',
'signal', 'signal',
'reinit', 'reinit',
'fork',
'get_hub', 'get_hub',
'Hub', 'Hub',
'Waiter'] 'Waiter']
...@@ -44,7 +44,7 @@ def __import_py_magic_greenlet(): ...@@ -44,7 +44,7 @@ def __import_py_magic_greenlet():
pass pass
try: try:
greenlet = __import__('greenlet').greenlet from greenlet import greenlet
except ImportError: except ImportError:
greenlet = __import_py_magic_greenlet() greenlet = __import_py_magic_greenlet()
if greenlet is None: if greenlet is None:
...@@ -63,17 +63,12 @@ if GreenletExit.__bases__[0] is Exception: ...@@ -63,17 +63,12 @@ if GreenletExit.__bases__[0] is Exception:
GreenletExit.__bases__ = (BaseException, ) GreenletExit.__bases__ = (BaseException, )
if sys.version_info[0] <= 2: if sys.version_info[0] <= 2:
thread = __import__('thread') import thread
else: else:
thread = __import__('_thread') import _thread as thread
threadlocal = thread._local threadlocal = thread._local
_threadlocal = threadlocal() _threadlocal = threadlocal()
_threadlocal.Hub = None _threadlocal.Hub = None
try:
_original_fork = os.fork
except AttributeError:
_original_fork = None
__all__.remove('fork')
get_ident = thread.get_ident get_ident = thread.get_ident
MAIN_THREAD = get_ident() MAIN_THREAD = get_ident()
...@@ -183,15 +178,6 @@ def reinit(): ...@@ -183,15 +178,6 @@ def reinit():
hub.loop.reinit() hub.loop.reinit()
if _original_fork is not None:
def fork():
result = _original_fork()
if not result:
reinit()
return result
def get_hub_class(): def get_hub_class():
"""Return the type of hub to use for the current thread. """Return the type of hub to use for the current thread.
......
...@@ -56,6 +56,7 @@ Monkey patches: ...@@ -56,6 +56,7 @@ Monkey patches:
- thread-local storage becomes greenlet-local storage - thread-local storage becomes greenlet-local storage
""" """
from __future__ import absolute_import
import sys import sys
__all__ = ['patch_all', __all__ = ['patch_all',
...@@ -124,12 +125,7 @@ def patch_module(name, items=None): ...@@ -124,12 +125,7 @@ def patch_module(name, items=None):
def patch_os(): def patch_os():
"""Replace :func:`os.fork` with :func:`gevent.fork`. Does nothing if fork is not available.""" """Replace :func:`os.fork` with :func:`gevent.fork`. Does nothing if fork is not available."""
try: patch_module('os')
from gevent.hub import fork
except ImportError:
return
import os
patch_item(os, 'fork', fork)
def patch_time(): def patch_time():
......
# Wrapper module for stdlib os. Written by Geert Jansen.
"""
This module provides cooperative versions of os.read() and os.write().
On Posix platforms this uses non-blocking IO, on Windows a threadpool
is used.
"""
from __future__ import absolute_import
import os
import sys
from gevent.hub import get_hub, reinit
from gevent.socket import EBADF, EAGAIN
try:
import fcntl
except ImportError:
fcntl = None
__implements__ = ['read', 'write', 'fork']
__all__ = __implements__
os_read = os.read
os_write = os.write
os_fork = os.fork
def _map_errors(func, *args):
"""Map IOError to OSError."""
try:
return func(*args)
except IOError, e:
# IOError is structered like OSError in that it has two args: an error
# number and a error string. So we can just re-raise OSError passing it
# the IOError args. If at some point we want to catch other errors and
# map those to OSError as well, we need to make sure that it follows
# the OSError convention and it gets passed a valid error number and
# error string.
raise OSError(*e.args), None, sys.exc_info()[2]
def posix_read(fd, n):
"""Read up to `n` bytes from file descriptor `fd`. Return a string
containing the bytes read. If end-of-file is reached, an empty string
is returned."""
while True:
flags = _map_errors(fcntl.fcntl, fd, fcntl.F_GETFL, 0)
if not flags & os.O_NONBLOCK:
_map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags|os.O_NONBLOCK)
try:
return os_read(fd, n)
except OSError, e:
if e.errno != EAGAIN:
raise
sys.exc_clear()
finally:
# Be sure to restore the fcntl flags before we switch into the hub.
# Sometimes multiple file descriptors share the same fcntl flags
# (e.g. when using ttys/ptys). Those other file descriptors are
# impacted by our change of flags, so we should restore them
# before any other code can possibly run.
if not flags & os.O_NONBLOCK:
_map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags)
hub = get_hub()
event = hub.loop.io(fd, 1)
_map_errors(hub.wait, event)
def posix_write(fd, buf):
"""Write bytes from buffer `buf` to file descriptor `fd`. Return the
number of bytes written."""
while True:
flags = _map_errors(fcntl.fcntl, fd, fcntl.F_GETFL, 0)
if not flags & os.O_NONBLOCK:
_map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags|os.O_NONBLOCK)
try:
return os_write(fd, buf)
except OSError, e:
if e.errno != EAGAIN:
raise
sys.exc_clear()
finally:
# See note in posix_read().
if not flags & os.O_NONBLOCK:
_map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags)
hub = get_hub()
event = hub.loop.io(fd, 2)
_map_errors(hub.wait, event)
def threadpool_read(fd, n):
"""Read up to `n` bytes from file descriptor `fd`. Return a string
containing the bytes read. If end-of-file is reached, an empty string
is returned."""
threadpool = get_hub().threadpool
return _map_errors(threadpool.apply, os_read, (fd, n))
def threadpool_write(fd, buf):
"""Write bytes from buffer `buf` to file descriptor `fd`. Return the
number of bytes written."""
threadpool = get_hub().threadpool
return _map_errors(threadpool.apply, os_write, (fd, buf))
if fcntl is None:
read = threadpool_read
write = threadpool_write
else:
read = posix_read
write = posix_write
if hasattr(os, 'fork'):
def fork():
result = os_fork()
if not result:
_map_errors(reinit)
return result
else:
__all__.remove('fork')
...@@ -20,14 +20,15 @@ means repeatedly calling :meth:`get <Queue.get>` until :meth:`get <Queue.get>` r ...@@ -20,14 +20,15 @@ means repeatedly calling :meth:`get <Queue.get>` until :meth:`get <Queue.get>` r
2 2
""" """
from __future__ import absolute_import
import sys import sys
import heapq import heapq
import collections import collections
if sys.version_info[0] == 2: if sys.version_info[0] == 2:
__queue__ = __import__('Queue') import Queue as __queue__
else: else:
__queue__ = __import__('queue') import queue as __queue__
Full = __queue__.Full Full = __queue__.Full
Empty = __queue__.Empty Empty = __queue__.Empty
......
# Copyright (c) 2011 Denis Bilenko. See LICENSE for details. # Copyright (c) 2011 Denis Bilenko. See LICENSE for details.
from __future__ import absolute_import
import os import os
import sys import sys
from _socket import getservbyname, getaddrinfo, gaierror, error from _socket import getservbyname, getaddrinfo, gaierror, error
......
# Copyright (c) 2009-2011 Denis Bilenko. See LICENSE for details. # Copyright (c) 2009-2011 Denis Bilenko. See LICENSE for details.
from __future__ import absolute_import
import sys import sys
from gevent.timeout import Timeout from gevent.timeout import Timeout
from gevent.event import Event from gevent.event import Event
...@@ -7,7 +8,7 @@ from gevent.hub import get_hub ...@@ -7,7 +8,7 @@ from gevent.hub import get_hub
__implements__ = ['select'] __implements__ = ['select']
__all__ = ['error'] + __implements__ __all__ = ['error'] + __implements__
__select__ = __import__('select') import select as __select__
error = __select__.error error = __select__.error
......
...@@ -31,6 +31,8 @@ For convenience, exceptions (like :class:`error <socket.error>` and :class:`time ...@@ -31,6 +31,8 @@ For convenience, exceptions (like :class:`error <socket.error>` and :class:`time
as well as the constants from :mod:`socket` module are imported into this module. as well as the constants from :mod:`socket` module are imported into this module.
""" """
from __future__ import absolute_import
# standard functions and classes that this module re-implements in a gevent-aware way: # standard functions and classes that this module re-implements in a gevent-aware way:
__implements__ = ['create_connection', __implements__ = ['create_connection',
'socket', 'socket',
...@@ -111,7 +113,7 @@ except ImportError: ...@@ -111,7 +113,7 @@ except ImportError:
import _socket import _socket
_realsocket = _socket.socket _realsocket = _socket.socket
__socket__ = __import__('socket') import socket as __socket__
_fileobject = __socket__._fileobject _fileobject = __socket__._fileobject
for name in __imports__[:]: for name in __imports__[:]:
......
...@@ -11,7 +11,8 @@ it requires `ssl package`_ to be installed. ...@@ -11,7 +11,8 @@ it requires `ssl package`_ to be installed.
.. _`ssl package`: http://pypi.python.org/pypi/ssl .. _`ssl package`: http://pypi.python.org/pypi/ssl
""" """
__ssl__ = __import__('ssl') from __future__ import absolute_import
import ssl as __ssl__
try: try:
_ssl = __ssl__._ssl _ssl = __ssl__._ssl
......
from __future__ import absolute_import
import sys import sys
import os import os
import errno import errno
...@@ -10,7 +11,7 @@ from gevent.hub import get_hub ...@@ -10,7 +11,7 @@ from gevent.hub import get_hub
from gevent.fileobject import FileObject from gevent.fileobject import FileObject
from gevent.greenlet import Greenlet, joinall from gevent.greenlet import Greenlet, joinall
spawn = Greenlet.spawn spawn = Greenlet.spawn
__subprocess__ = __import__('subprocess') import subprocess as __subprocess__
# Standard functions and classes that this module re-implements in a gevent-aware way. # Standard functions and classes that this module re-implements in a gevent-aware way.
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
used directly. For spawning greenlets in your applications, prefer used directly. For spawning greenlets in your applications, prefer
:class:`Greenlet` class. :class:`Greenlet` class.
""" """
from __future__ import absolute_import
import sys import sys
__implements__ = ['allocate_lock', __implements__ = ['allocate_lock',
...@@ -18,10 +19,9 @@ __implements__ = ['allocate_lock', ...@@ -18,10 +19,9 @@ __implements__ = ['allocate_lock',
__imports__ = ['error'] __imports__ = ['error']
if sys.version_info[0] <= 2: if sys.version_info[0] <= 2:
__target__ = 'thread' import thread as __thread__
else: else:
__target__ = '_thread' import _thread as __thread__
__thread__ = __import__(__target__)
error = __thread__.error error = __thread__.error
from gevent.hub import getcurrent, GreenletExit from gevent.hub import getcurrent, GreenletExit
from gevent.greenlet import Greenlet from gevent.greenlet import Greenlet
......
# Copyright (c) 2012 Denis Bilenko. See LICENSE for details. # Copyright (c) 2012 Denis Bilenko. See LICENSE for details.
from __future__ import with_statement from __future__ import with_statement, absolute_import
import sys import sys
import os import os
from gevent.hub import get_hub, getcurrent, sleep, integer_types from gevent.hub import get_hub, getcurrent, sleep, integer_types
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
The code is taken from twisted.python.win32 module. The code is taken from twisted.python.win32 module.
""" """
from __future__ import absolute_import
import os import os
......
...@@ -10,7 +10,8 @@ MAPPING = {'gevent.local': '_threading_local', ...@@ -10,7 +10,8 @@ MAPPING = {'gevent.local': '_threading_local',
'gevent.select': 'select', 'gevent.select': 'select',
'gevent.ssl': 'ssl', 'gevent.ssl': 'ssl',
'gevent.thread': 'thread', 'gevent.thread': 'thread',
'gevent.subprocess': 'subprocess'} 'gevent.subprocess': 'subprocess',
'gevent.os': 'os'}
class ANY(object): class ANY(object):
...@@ -22,7 +23,8 @@ ANY = ANY() ...@@ -22,7 +23,8 @@ ANY = ANY()
NOT_IMPLEMENTED = { NOT_IMPLEMENTED = {
'socket': ['CAPI'], 'socket': ['CAPI'],
'thread': ['allocate', 'exit_thread', 'interrupt_main', 'start_new'], 'thread': ['allocate', 'exit_thread', 'interrupt_main', 'start_new'],
'select': ANY} 'select': ANY,
'os': ANY}
COULD_BE_MISSING = { COULD_BE_MISSING = {
'socket': ['create_connection', 'RAND_add', 'RAND_egd', 'RAND_status']} 'socket': ['create_connection', 'RAND_add', 'RAND_egd', 'RAND_status']}
......
from gevent import monkey; monkey.patch_all()
import os
from select import PIPE_BUF
from greentest import TestCase, main
from gevent import Greenlet, joinall
from gevent.socket import EAGAIN
from errno import EINTR
try:
import fcntl
except ImportError:
fcntl = None
class TestOS(TestCase):
__timeout__ = 5
def test_if_pipe_blocks(self):
r, w = os.pipe()
# set nbytes such that for sure it is > maximum pipe buffer
nbytes = 1000000
block = 'x' * 4096
buf = buffer(block)
# Lack of "nonlocal" keyword in Python 2.x:
bytesread = [0]
byteswritten = [0]
def produce():
while byteswritten[0] != nbytes:
bytesleft = nbytes - byteswritten[0]
try:
byteswritten[0] += os.write(w, buf[:min(bytesleft, 4096)])
except OSError:
code = sys.exc_info()[1].args[0]
assert code != EAGAIN
if code == EINTR:
continue
raise
def consume():
while bytesread[0] != nbytes:
bytesleft = nbytes - bytesread[0]
try:
bytesread[0] += len(os.read(r, min(bytesleft, 4096)))
except OSError:
code = sys.exc_info()[1].args[0]
assert code != EAGAIN
if code == EINTR:
continue
raise
producer = Greenlet(produce)
producer.start()
consumer = Greenlet(consume)
consumer.start_later(1)
# If patching was not succesful, the producer will have filled
# the pipe before the consumer starts, and would block the entire
# process. Therefore the next line would never finish.
joinall([producer, consumer])
assert bytesread[0] == nbytes
assert bytesread[0] == byteswritten[0]
def test_fd_flags_restored(self):
if fcntl is None:
return
r, w = os.pipe()
flags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
assert not flags & os.O_NONBLOCK
flags = fcntl.fcntl(w, fcntl.F_GETFL, 0)
assert not flags & os.O_NONBLOCK
os.write(w, 'foo')
buf = os.read(r, 3)
assert buf == 'foo'
flags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
assert not flags & os.O_NONBLOCK
flags = fcntl.fcntl(w, fcntl.F_GETFL, 0)
assert not flags & os.O_NONBLOCK
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment