Commit f3e1eb97 authored by Denis Bilenko's avatar Denis Bilenko

fix #178: do not monkey patch os.read/os.write

- rename os.threadpool_read/write -> os.tp_read/write
- replace os.posix_read/write with os.nb_read/write must be only used on non-blocking file descriptors
- add os.make_nonblocking function to make fd non-blocking
- ignore test_thread.TestForkInThread.test_forkinthread
parent 748c156d
......@@ -17,8 +17,8 @@ try:
except ImportError:
fcntl = None
__implements__ = ['read', 'write', 'fork']
__all__ = __implements__
__implements__ = ['fork']
__extensions__ = ['tp_read', 'tp_write']
_read = os.read
_write = os.write
......@@ -27,100 +27,69 @@ _write = os.write
ignored_errors = [EAGAIN, errno.EINTR]
def _map_errors(func, *args):
"""Map IOError to OSError."""
try:
return func(*args)
except IOError, e:
# IOError is structured like OSError in that it has two args: an error
# number and a error string. So we can just re-raise OSError passing it
# the IOError args. If at some point we want to catch other errors and
# map those to OSError as well, we need to make sure that it follows
# the OSError convention and it gets passed a valid error number and
# error string.
raise OSError(*e.args), None, sys.exc_info()[2]
def posix_read(fd, n):
if fcntl:
__extensions__ += ['make_nonblocking', 'nb_read', 'nb_write']
def make_nonblocking(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
if not bool(flags & os.O_NONBLOCK):
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
return True
def nb_read(fd, n):
"""Read up to `n` bytes from file descriptor `fd`. Return a string
containing the bytes read. If end-of-file is reached, an empty string
is returned."""
is returned.
The descriptor must be in non-blocking mode.
"""
hub, event = None, None
while True:
flags = _map_errors(fcntl.fcntl, fd, fcntl.F_GETFL, 0)
blocking_fd = not bool(flags & os.O_NONBLOCK)
if blocking_fd:
_map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
try:
return _read(fd, n)
except OSError, e:
if e.errno not in ignored_errors:
raise
if e.errno == EAGAIN and not blocking_fd:
raise
sys.exc_clear()
finally:
# Be sure to restore the fcntl flags before we switch into the hub.
# Sometimes multiple file descriptors share the same fcntl flags
# (e.g. when using ttys/ptys). Those other file descriptors are
# impacted by our change of flags, so we should restore them
# before any other code can possibly run.
if blocking_fd:
_map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags)
if hub is None:
hub = get_hub()
event = hub.loop.io(fd, 1)
hub.wait(event)
def posix_write(fd, buf):
def nb_write(fd, buf):
"""Write bytes from buffer `buf` to file descriptor `fd`. Return the
number of bytes written."""
number of bytes written.
The file descriptor must be in non-blocking mode.
"""
hub, event = None, None
while True:
flags = _map_errors(fcntl.fcntl, fd, fcntl.F_GETFL, 0)
blocking_fd = not bool(flags & os.O_NONBLOCK)
if blocking_fd:
_map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
try:
return _write(fd, buf)
except OSError, e:
if e.errno not in ignored_errors:
raise
if e.errno == EAGAIN and not blocking_fd:
raise
sys.exc_clear()
finally:
# See note in posix_read().
if blocking_fd:
_map_errors(fcntl.fcntl, fd, fcntl.F_SETFL, flags)
if hub is None:
hub = get_hub()
event = hub.loop.io(fd, 2)
hub.wait(event)
def threadpool_read(fd, n):
def tp_read(fd, n):
"""Read up to `n` bytes from file descriptor `fd`. Return a string
containing the bytes read. If end-of-file is reached, an empty string
is returned."""
return get_hub().threadpool.apply(_read, (fd, n))
def threadpool_write(fd, buf):
def tp_write(fd, buf):
"""Write bytes from buffer `buf` to file descriptor `fd`. Return the
number of bytes written."""
return get_hub().threadpool.apply(_write, (fd, buf))
if fcntl is None:
read = threadpool_read
write = threadpool_write
else:
read = posix_read
write = posix_write
if hasattr(os, 'fork'):
_fork = os.fork
......@@ -131,4 +100,7 @@ if hasattr(os, 'fork'):
return result
else:
__all__.remove('fork')
__implements__.remove('fork')
__all__ = __implements__ + __extensions__
......@@ -131,14 +131,8 @@ disabled_tests = \
# this uses cookielib which we don't care about
, 'test_thread.ThreadRunningTests.test__count'
# XXX
# XXX remove these:
#, 'test_threading.ThreadedTests.test_ident_of_no_threading_threads'
#, 'test_threading.ThreadedTests.test_foreign_thread'
# this assert that dummy thread id is in threading._active
# however, we clean that up
, 'test_thread.TestForkInThread.test_forkinthread'
# XXX needs investigating
, 'test_urllib2net.OtherNetworkTests.test_urlwithfrag'
# fails dues to some changes on python.org
......
......@@ -15,7 +15,6 @@ from gevent import socket as gevent_socket
assert socket.create_connection is gevent_socket.create_connection
import os
assert 'built-in' not in repr(os.read), repr(os.read)
assert 'built-in' not in repr(os.write), repr(os.write)
assert 'built-in' not in repr(os.fork), repr(os.fork)
assert monkey.saved
from gevent import monkey; monkey.patch_all()
import os
from os import pipe
from gevent import os
from greentest import TestCase, main
from gevent import Greenlet, joinall
try:
......@@ -14,12 +13,21 @@ except ImportError:
errno = None
class TestOS(TestCase):
class TestOS_tp(TestCase):
__timeout__ = 5
def pipe(self):
return pipe()
def read(self, *args):
return os.tp_read(*args)
def write(self, *args):
return os.tp_write(*args)
def test_if_pipe_blocks(self):
r, w = os.pipe()
r, w = self.pipe()
# set nbytes such that for sure it is > maximum pipe buffer
nbytes = 1000000
block = 'x' * 4096
......@@ -31,12 +39,12 @@ class TestOS(TestCase):
def produce():
while byteswritten[0] != nbytes:
bytesleft = nbytes - byteswritten[0]
byteswritten[0] += os.write(w, buf[:min(bytesleft, 4096)])
byteswritten[0] += self.write(w, buf[:min(bytesleft, 4096)])
def consume():
while bytesread[0] != nbytes:
bytesleft = nbytes - bytesread[0]
bytesread[0] += len(os.read(r, min(bytesleft, 4096)))
bytesread[0] += len(self.read(r, min(bytesleft, 4096)))
producer = Greenlet(produce)
producer.start()
......@@ -49,74 +57,22 @@ class TestOS(TestCase):
assert bytesread[0] == nbytes
assert bytesread[0] == byteswritten[0]
def test_fd_flags_restored(self):
if fcntl is None:
return
r, w = os.pipe()
flags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
assert not flags & os.O_NONBLOCK
flags = fcntl.fcntl(w, fcntl.F_GETFL, 0)
assert not flags & os.O_NONBLOCK
os.write(w, 'foo')
buf = os.read(r, 3)
assert buf == 'foo'
flags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
assert not flags & os.O_NONBLOCK
flags = fcntl.fcntl(w, fcntl.F_GETFL, 0)
assert not flags & os.O_NONBLOCK
def test_o_nonblock_read(self):
if fcntl is None or errno is None:
return
r, w = os.pipe()
rflags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
fcntl.fcntl(r, fcntl.F_SETFL, rflags | os.O_NONBLOCK)
gotEAGAIN = False
try:
os.read(r, 3)
except OSError, e:
if e.errno != errno.EAGAIN:
raise
gotEAGAIN = True
assert gotEAGAIN
os.write(w, "foo")
data = os.read(r, 3)
assert data == "foo"
gotEAGAIN = False
try:
os.read(r, 3)
except OSError, e:
if e.errno != errno.EAGAIN:
raise
gotEAGAIN = True
assert gotEAGAIN
def test_o_nonblock_write(self):
if fcntl is None or errno is None:
return
r, w = os.pipe()
wflags = fcntl.fcntl(r, fcntl.F_GETFL, 0)
fcntl.fcntl(w, fcntl.F_SETFL, wflags | os.O_NONBLOCK)
data = "d" * 1000000
# fill output buffer to force EAGAIN in next os.write() call
os.write(w, data)
gotEAGAIN = False
try:
os.write(w, data)
except OSError, e:
if e.errno != errno.EAGAIN:
raise
gotEAGAIN = True
assert gotEAGAIN
if hasattr(os, 'make_nonblocking'):
class TestOS_nb(TestOS_tp):
def pipe(self):
r, w = pipe()
os.make_nonblocking(r)
os.make_nonblocking(w)
return r, w
def read(self, *args):
return os.nb_read(*args)
def write(self, *args):
return os.nb_write(*args)
if __name__ == '__main__':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment