Commit 75e0d3ca authored by Jason Madden, committed by GitHub

Merge pull request #843 from gevent/fo-consistency

Unify lots of implementation details between FileObjectPosix/Thread, including respecting a *bufsize* of 0.
parents 12cfd26d d080e4f8
......@@ -83,6 +83,8 @@ Stdlib Compatibility
with a timeout.
- ``FileObjectPosix`` exposes the ``read1`` method when in read mode,
and generally only exposes methods appropriate to the mode it is in.
- ``FileObjectPosix`` supports a *bufsize* of 0 in binary write modes.
Reported in :issue:`840` by Mike Lang.
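  (As an illustration of the new behavior, a minimal sketch that mirrors the
  ``test_bufsize_0`` test added later in this commit; assumes a POSIX pipe)::

      import os
      from gevent.fileobject import FileObject

      r, w = os.pipe()
      reader = FileObject(r, 'rb', bufsize=0)
      writer = FileObject(w, 'wb', bufsize=0)
      writer.write(b'a')             # flushed immediately, no buffering delay
      assert reader.read(1) == b'a'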
Other Changes
-------------
......@@ -124,6 +126,11 @@ Other Changes
- If ``sys.stderr`` has been monkey-patched (not recommended),
exceptions that the hub reports aren't lost and can still be caught.
Reported in :issue:`825` by Jelle Smet.
- The various ``FileObject`` implementations are more consistent with
each other.
.. note:: Assigning to the *io* property of a FileObject after it is
constructed should be considered deprecated.
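  (A sketch of the supported alternative to assigning *io* directly is
  ``translate_newlines``; it mirrors the subprocess.py change later in this
  commit, and *write_through* is a Python 3 ``TextIOWrapper`` argument)::

      import os
      from gevent.fileobject import FileObject

      r, w = os.pipe()
      f = FileObject(w, 'wb')
      # instead of: f.io = io.TextIOWrapper(f.io, ...); f._translate = True
      f.translate_newlines(None, write_through=True)
      f.write(u'line\n')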
1.1.2 (Jul 21, 2016)
====================
......
......@@ -7,6 +7,9 @@ coverage>=4.0
coveralls>=1.0
cffi
futures
# Makes tests faster, but causes issues on the old
# Linux version Travis CI uses. We have a workaround.
psutil
# For viewing README.rst (restview --long-description),
# CONTRIBUTING.rst, etc.
# https://github.com/mgedmin/restview
......
......@@ -4,6 +4,115 @@ try:
except ImportError:
EBADF = 9
from io import TextIOWrapper
cancel_wait_ex = IOError(EBADF, 'File descriptor was closed in another greenlet')
FileObjectClosed = IOError(EBADF, 'Bad file descriptor (FileObject was closed)')
class FileObjectBase(object):
"""
Internal base class to ensure a level of consistency
between FileObjectPosix and FileObjectThread
"""
# List of methods we delegate to the wrapping IO object, if they
# implement them and we do not.
_delegate_methods = (
# General methods
'flush',
'fileno',
'writable',
'readable',
'seek',
'seekable',
'tell',
# Read
'read',
'readline',
'readlines',
'read1',
# Write
'write',
'writelines',
'truncate',
)
# Whether we are translating universal newlines or not.
_translate = False
def __init__(self, io, closefd):
"""
:param io: An io.IOBase-like object.
"""
self._io = io
# We don't actually use this property ourself, but we save it (and
# pass it along) for compatibility.
self._close = closefd
if self._translate:
# This automatically handles delegation.
self.translate_newlines(None)
else:
self._do_delegate_methods()
io = property(lambda s: s._io,
# Historically we either hand-wrote all the delegation methods
# to use self.io, or we simply used __getattr__ to look them up at
# runtime. This meant people could change the io attribute on the fly
# and it would mostly work (subprocess.py used to do that). We don't recommend
# that, but we still support it.
lambda s, nv: setattr(s, '_io', nv) or s._do_delegate_methods())
def _do_delegate_methods(self):
for meth_name in self._delegate_methods:
meth = getattr(self._io, meth_name, None)
implemented_by_class = hasattr(type(self), meth_name)
if meth and not implemented_by_class:
setattr(self, meth_name, self._wrap_method(meth))
elif hasattr(self, meth_name) and not implemented_by_class:
delattr(self, meth_name)
def _wrap_method(self, method):
"""
Wrap a method we're copying into our dictionary from the underlying
io object to do something special or different, if necessary.
"""
return method
def translate_newlines(self, mode, *text_args, **text_kwargs):
wrapper = TextIOWrapper(self._io, *text_args, **text_kwargs)
if mode:
wrapper.mode = mode
self.io = wrapper
self._translate = True
@property
def closed(self):
"""True if the file is closed"""
return self._io is None
def close(self):
if self._io is None:
return
io = self._io
self._io = None
self._do_close(io, self._close)
def _do_close(self, io, closefd):
raise NotImplementedError()
def __getattr__(self, name):
if self._io is None:
raise FileObjectClosed
return getattr(self._io, name)
def __repr__(self):
return '<%s _fobj=%r%s>' % (self.__class__.__name__, self.io, self._extra_repr())
def _extra_repr(self):
return ''
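(The delegation scheme above, in isolation: bound methods of the wrapped
object are copied onto the wrapper instance, so common calls bypass
__getattr__ entirely. A standard-library-only sketch; the Wrapper name is
illustrative, not part of gevent:)

    import io

    class Wrapper(object):
        _delegate_methods = ('read', 'write', 'seek', 'tell')

        def __init__(self, raw):
            self._io = raw
            for name in self._delegate_methods:
                meth = getattr(raw, name, None)
                # copy the bound method unless the class defines its own
                if meth is not None and not hasattr(type(self), name):
                    setattr(self, name, meth)

    w = Wrapper(io.BytesIO(b'abc'))
    print(w.read())  # b'abc' -- dispatches straight to BytesIO.read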
......@@ -6,10 +6,10 @@ from io import BufferedWriter
from io import BytesIO
from io import DEFAULT_BUFFER_SIZE
from io import RawIOBase
from io import TextIOWrapper
from io import UnsupportedOperation
from gevent._fileobjectcommon import cancel_wait_ex
from gevent._fileobjectcommon import FileObjectBase
from gevent.hub import get_hub
from gevent.os import _read
from gevent.os import _write
......@@ -136,8 +136,14 @@ class GreenFileDescriptorIO(RawIOBase):
def seek(self, offset, whence=0):
return os.lseek(self._fileno, offset, whence)
class FlushingBufferedWriter(BufferedWriter):
def write(self, b):
ret = BufferedWriter.write(self, b)
self.flush()
return ret
class FileObjectPosix(object):
class FileObjectPosix(FileObjectBase):
"""
A file-like object that operates on non-blocking files but
provides a synchronous, cooperative interface.
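(What FlushingBufferedWriter above buys: flushing after every write gives
bufsize-0 semantics while still exposing the BufferedWriter interface. A
standard-library-only sketch of the effect:)

    import io

    raw = io.BytesIO()
    w = io.BufferedWriter(raw, buffer_size=8192)
    w.write(b'x')
    print(raw.getvalue())  # b'' -- the byte is still in the buffer
    w.flush()
    print(raw.getvalue())  # b'x' -- a flush after each write simulates bufsize=0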
......@@ -183,30 +189,6 @@ class FileObjectPosix(object):
#: platform specific default for the *bufsize* parameter
default_bufsize = io.DEFAULT_BUFFER_SIZE
# List of methods we delegate to the wrapping IO object, if they
# implement them.
_delegate_methods = (
# General methods
'flush',
'fileno',
'writable',
'readable',
'seek',
'seekable',
'tell',
# Read
'read',
'readline',
'readlines',
'read1',
# Write
'write',
'writelines',
'truncate',
)
def __init__(self, fobj, mode='rb', bufsize=-1, close=True):
"""
:keyword fobj: Either an integer fileno, or an object supporting the
......@@ -216,10 +198,20 @@ class FileObjectPosix(object):
(where the "b" or "U" can be omitted).
If "U" is part of the mode, IO will be done on text, otherwise bytes.
:keyword int bufsize: If given, the size of the buffer to use. The default
value means to use a platform-specific default, and a value of 0 is translated
to a value of 1. Other values are interpreted as for the :mod:`io` package.
value means to use a platform-specific default.
Other values are interpreted as for the :mod:`io` package.
Buffering is ignored in text mode.
.. versionchanged:: 1.2a1
A bufsize of 0 in write mode is no longer forced to be 1.
Instead, the underlying buffer is flushed after every write
operation to simulate a bufsize of 0. In gevent 1.0, a
bufsize of 0 was flushed when a newline was written, while
in gevent 1.1 it was flushed when more than one byte was
written. Note that this may have performance impacts.
"""
if isinstance(fobj, int):
fileno = fobj
fobj = None
......@@ -246,11 +238,10 @@ class FileObjectPosix(object):
raise ValueError('mode can only be [rb, rU, wb], not %r' % (orig_mode,))
self._fobj = fobj
self._closed = False
self._close = close
self.fileio = GreenFileDescriptorIO(fileno, mode, closefd=close)
self._orig_bufsize = bufsize
if bufsize < 0 or bufsize == 1:
bufsize = self.default_bufsize
elif bufsize == 0:
......@@ -261,49 +252,28 @@ class FileObjectPosix(object):
else:
assert mode == 'w'
IOFamily = BufferedWriter
if self._orig_bufsize == 0:
# We could also simply pass self.fileio as *io*, but this way
# we at least consistently expose a BufferedWriter in our *io*
# attribute.
IOFamily = FlushingBufferedWriter
self._io = IOFamily(self.fileio, bufsize)
io = IOFamily(self.fileio, bufsize)
#else: # QQQ: not used, not reachable
#
# self.io = BufferedRandom(self.fileio, bufsize)
if self._translate:
self._io = TextIOWrapper(self._io)
self.__delegate_methods()
def __delegate_methods(self):
for meth_name in self._delegate_methods:
meth = getattr(self._io, meth_name, None)
if meth:
setattr(self, meth_name, meth)
elif hasattr(self, meth_name):
delattr(self, meth_name)
super(FileObjectPosix, self).__init__(io, close)
io = property(lambda s: s._io,
# subprocess.py likes to swizzle our io object for universal_newlines
lambda s, nv: setattr(s, '_io', nv) or s.__delegate_methods())
@property
def closed(self):
"""True if the file is closed"""
return self._closed
def close(self):
if self._closed:
# make sure close() is only run once when called concurrently
return
self._closed = True
def _do_close(self, io, closefd):
try:
self._io.close()
io.close()
# self.fileio already knows whether or not to close the
# file descriptor
self.fileio.close()
finally:
self._fobj = None
self.fileio = None
def __iter__(self):
return self._io
def __getattr__(self, name):
# XXX: Should this really be _fobj, or self.io?
# _fobj can easily be None but io never is
return getattr(self._fobj, name)
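(The base class now owns the idempotent close logic; subclasses only
implement _do_close. A hypothetical usage sketch of the resulting behavior
on a POSIX pipe:)

    import os
    from gevent.fileobject import FileObjectPosix

    r, w = os.pipe()
    f = FileObjectPosix(r, 'rb')
    f.close()
    f.close()          # safe: _io is already None, so this returns at once
    print(f.closed)    # True
    os.close(w)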
......@@ -34,9 +34,13 @@ Classes
=======
"""
from __future__ import absolute_import
import functools
import sys
import os
from gevent._fileobjectcommon import FileObjectClosed
from gevent._fileobjectcommon import FileObjectBase
from gevent.hub import get_hub
from gevent._compat import integer_types
from gevent._compat import reraise
......@@ -66,16 +70,25 @@ else:
from gevent._fileobjectposix import FileObjectPosix
class FileObjectThread(object):
class FileObjectThread(FileObjectBase):
"""
A file-like object wrapping another file-like object, performing all blocking
operations on that object in a background thread.
.. caution::
Attempting to change the threadpool or lock of an existing FileObjectThread
has undefined consequences.
.. versionchanged:: 1.1b1
The file object is closed using the threadpool. Whether this
action is synchronous or asynchronous is not documented.
"""
def __init__(self, fobj, *args, **kwargs):
def __init__(self, fobj, mode=None, bufsize=-1, close=True, threadpool=None, lock=True):
"""
:param fobj: The underlying file-like object to wrap, or an integer fileno
that will be passed to :func:`os.fdopen` along with everything in *args*.
that will be passed to :func:`os.fdopen` along with *mode* and *bufsize*.
:keyword bool lock: If True (the default) then all operations will
be performed one-by-one. Note that this does not guarantee that, if using
this file object from multiple threads/greenlets, operations will be performed
......@@ -85,11 +98,9 @@ class FileObjectThread(object):
:keyword bool close: If True (the default) then when this object is closed,
the underlying object is closed as well.
"""
self._close = kwargs.pop('close', True)
self.threadpool = kwargs.pop('threadpool', None)
self.lock = kwargs.pop('lock', True)
if kwargs:
raise TypeError('Unexpected arguments: %r' % kwargs.keys())
closefd = close
self.threadpool = threadpool or get_hub().threadpool
self.lock = lock
if self.lock is True:
self.lock = Semaphore()
elif not self.lock:
......@@ -97,39 +108,32 @@ class FileObjectThread(object):
if not hasattr(self.lock, '__enter__'):
raise TypeError('Expected a Semaphore or boolean, got %r' % type(self.lock))
if isinstance(fobj, integer_types):
if not self._close:
if not closefd:
# we cannot do this, since the fdopen()'d object will close the descriptor
raise TypeError('FileObjectThread does not support close=False')
fobj = os.fdopen(fobj, *args)
self.io = fobj
if self.threadpool is None:
self.threadpool = get_hub().threadpool
def _apply(self, func, args=None, kwargs=None):
with self.lock:
return self.threadpool.apply(func, args, kwargs)
def close(self):
"""
.. versionchanged:: 1.1b1
The file object is closed using the threadpool. Note that whether or
not this action is synchronous or asynchronous is not documented.
"""
fobj = self.io
if fobj is None:
return
self.io = None
raise TypeError('FileObjectThread does not support close=False on an fd.')
if mode is None:
assert bufsize == -1, "If you use the default mode, you can't choose a bufsize"
fobj = os.fdopen(fobj)
else:
fobj = os.fdopen(fobj, mode, bufsize)
self.__io_holder = [fobj] # signal for _wrap_method
super(FileObjectThread, self).__init__(fobj, closefd)
def _do_close(self, fobj, closefd):
self.__io_holder[0] = None # for _wrap_method
try:
self.flush(_fobj=fobj)
with self.lock:
self.threadpool.apply(fobj.flush)
finally:
if self._close:
# Note that we're not using self._apply; older code
if closefd:
# Note that we're not taking the lock; older code
# did fobj.close() without going through the threadpool at all,
# so acquiring the lock could potentially introduce deadlocks
# that weren't present before. Avoiding the lock doesn't make
# the existing race condition any worse.
# We wrap the close in an exception handler and re-raise directly
# to avoid the (common, expected) IOError from being logged
# to avoid the (common, expected) IOError from being logged by the pool
def close():
try:
fobj.close()
......@@ -139,24 +143,14 @@ class FileObjectThread(object):
if exc_info:
reraise(*exc_info)
def flush(self, _fobj=None):
if _fobj is not None:
fobj = _fobj
else:
fobj = self.io
if fobj is None:
raise FileObjectClosed
return self._apply(fobj.flush)
def __repr__(self):
return '<%s _fobj=%r threadpool=%r>' % (self.__class__.__name__, self.io, self.threadpool)
def __getattr__(self, item):
if self.io is None:
if item == 'closed':
return True
raise FileObjectClosed
return getattr(self.io, item)
def _do_delegate_methods(self):
super(FileObjectThread, self)._do_delegate_methods()
if not hasattr(self, 'read1') and 'r' in getattr(self._io, 'mode', ''):
self.read1 = self.read
self.__io_holder[0] = self._io
def _extra_repr(self):
return ' threadpool=%r' % (self.threadpool,)
def __iter__(self):
return self
......@@ -168,21 +162,24 @@ class FileObjectThread(object):
raise StopIteration
__next__ = next
def _wrap_method(self, method):
# NOTE: We are careful to avoid introducing a refcycle
# within self. Our wrapper cannot refer to self.
io_holder = self.__io_holder
lock = self.lock
threadpool = self.threadpool
def _wraps(method):
def x(self, *args, **kwargs):
fobj = self.io
if fobj is None:
raise FileObjectClosed
return self._apply(getattr(fobj, method), args, kwargs)
x.__name__ = method
return x
@functools.wraps(method)
def thread_method(*args, **kwargs):
if io_holder[0] is None:
# This is different from FileObjectPosix, etc,
# because we want to save the expensive trip through
# the threadpool.
raise FileObjectClosed
with lock:
return threadpool.apply(method, args, kwargs)
_method = None
for _method in ('read', 'readinto', 'readline', 'readlines', 'write', 'writelines', 'xreadlines'):
setattr(FileObjectThread, _method, _wraps(_method))
del _method
del _wraps
return thread_method
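(Why thread_method closes over io_holder, lock, and threadpool rather than
self: a wrapper stored on the instance that referenced self would create a
reference cycle, self -> wrapper -> self. A minimal sketch of the pattern;
names are illustrative:)

    import functools

    def make_wrapper(method, io_holder):
        # the closure captures only what it needs, never the owning object
        @functools.wraps(method)
        def wrapper(*args, **kwargs):
            if io_holder[0] is None:
                raise IOError('I/O operation on closed file')
            return method(*args, **kwargs)
        return wrapper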
try:
......@@ -191,28 +188,18 @@ except NameError:
FileObject = FileObjectThread
class FileObjectBlock(object):
class FileObjectBlock(FileObjectBase):
def __init__(self, fobj, *args, **kwargs):
self._close = kwargs.pop('close', True)
closefd = kwargs.pop('close', True)
if kwargs:
raise TypeError('Unexpected arguments: %r' % kwargs.keys())
if isinstance(fobj, integer_types):
if not self._close:
if not closefd:
# we cannot do this, since the fdopen()'d object will close the descriptor
raise TypeError('FileObjectBlock does not support close=False')
raise TypeError('FileObjectBlock does not support close=False on an fd.')
fobj = os.fdopen(fobj, *args)
self.io = fobj
def __repr__(self):
return '<%s %r>' % (self.__class__.__name__, self.io, )
def __getattr__(self, item):
assert item != '_fobj'
if self.io is None:
raise FileObjectClosed
return getattr(self.io, item)
super(FileObjectBlock, self).__init__(fobj, closefd)
config = os.environ.get('GEVENT_FILE')
if config:
......
......@@ -468,9 +468,9 @@ class Popen(object):
# Under Python 3, if we left on the 'b' we'd get different results
# depending on whether we used FileObjectPosix or FileObjectThread
self.stdin = FileObject(p2cwrite, 'wb', bufsize)
self.stdin._translate = True
self.stdin.io = io.TextIOWrapper(self.stdin.io, write_through=True,
line_buffering=(bufsize == 1))
self.stdin.translate_newlines(None,
write_through=True,
line_buffering=(bufsize == 1))
else:
self.stdin = FileObject(p2cwrite, 'wb', bufsize)
if c2pread is not None:
......@@ -485,9 +485,7 @@ class Popen(object):
# test__subprocess.py that depend on python_universal_newlines,
# but would be inconsistent with the stdlib:
#msvcrt.setmode(self.stdout.fileno(), os.O_TEXT)
self.stdout.io = io.TextIOWrapper(self.stdout.io)
self.stdout.io.mode = 'r'
self.stdout._translate = True
self.stdout.translate_newlines('r')
else:
self.stdout = FileObject(c2pread, 'rU', bufsize)
else:
......@@ -496,8 +494,7 @@ class Popen(object):
if universal_newlines:
if PY3:
self.stderr = FileObject(errread, 'rb', bufsize)
self.stderr.io = io.TextIOWrapper(self.stderr.io)
self.stderr._translate = True
self.stderr.translate_newlines(None)
else:
self.stderr = FileObject(errread, 'rU', bufsize)
else:
......
......@@ -633,11 +633,90 @@ def disabled_gc():
gc.enable()
import re
# Linux/OS X/BSD platforms can implement this by calling out to lsof
def _run_lsof():
import tempfile
pid = os.getpid()
fd, tmpname = tempfile.mkstemp('get_open_files')
os.close(fd)
lsof_command = 'lsof -p %s > %s' % (pid, tmpname)
if os.system(lsof_command):
raise OSError("lsof failed")
with open(tmpname) as fobj:
data = fobj.read().strip()
os.remove(tmpname)
return data
def get_open_files(pipes=False):
data = _run_lsof()
results = {}
for line in data.split('\n'):
line = line.strip()
if not line or line.startswith("COMMAND"):
# Skip header and blank lines
continue
split = re.split(r'\s+', line)
command, pid, user, fd = split[:4]
# Pipes (on OS X, at least) get an fd like "3" while normal files get an fd like "1u"
if fd[:-1].isdigit() or fd.isdigit():
if not pipes and fd[-1].isdigit():
continue
fd = int(fd[:-1]) if not fd[-1].isdigit() else int(fd)
if fd in results:
params = (fd, line, split, results.get(fd), data)
raise AssertionError('error when parsing lsof output: duplicate fd=%r\nline=%r\nsplit=%r\nprevious=%r\ndata:\n%s' % params)
results[fd] = line
if not results:
raise AssertionError('failed to parse lsof:\n%s' % (data, ))
results['data'] = data
return results
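(The fd-column parsing above, in isolation: lsof prints bare integers for
pipes and integer-plus-mode for regular files. Sample values here are
illustrative:)

    for fd in ('3', '1u', 'cwd'):
        if fd[:-1].isdigit() or fd.isdigit():
            n = int(fd[:-1]) if not fd[-1].isdigit() else int(fd)
            print(fd, '->', n)    # '3' -> 3 (a pipe), '1u' -> 1 (a file)
        else:
            print(fd, 'skipped')  # non-numeric entries like "cwd" or "txt"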
def get_number_open_files():
if os.path.exists('/proc/'):
# Linux only
fd_directory = '/proc/%d/fd' % os.getpid()
return len(os.listdir(fd_directory))
else:
try:
return len(get_open_files(pipes=True)) - 1
except (OSError, AssertionError):
return 0
lsof_get_open_files = get_open_files
try:
import psutil
except ImportError:
pass
else:
# If psutil is available (it is cross-platform) use that.
# It is *much* faster than shelling out to lsof each time
# (Running 14 tests takes 3.964s with lsof and 0.046 with psutil)
# However, it still doesn't completely solve the issue on Windows: fds are reported
# as -1 there, so we can't fully check those.
def get_open_files():
results = dict()
process = psutil.Process()
results['data'] = process.open_files() + process.connections('all')
for x in results['data']:
results[x.fd] = x
results['data'] += ['From psutil', process]
return results
def get_number_open_files():
process = psutil.Process()
try:
return process.num_fds()
except AttributeError:
# num_fds is unix only. Is num_handles close enough on Windows?
return 0
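(For reference, the psutil calls used above; Process() with no argument
targets the current process, and num_fds() exists only on POSIX:)

    import psutil

    p = psutil.Process()
    print(p.num_fds())           # count of open file descriptors (POSIX only)
    print(p.open_files())        # [popenfile(path=..., fd=...), ...]
    print(p.connections('all'))  # sockets of every family and kind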
if RUNNING_ON_TRAVIS:
# XXX: Note: installing psutil on the travis linux vm caused failures in test__makefile_refs.
get_open_files = lsof_get_open_files
if PYPY:
......
......@@ -73,7 +73,7 @@ class Test(greentest.TestCase):
self._test_del(close=False)
self.fail("Shouldn't be able to create a FileObjectThread with close=False")
except TypeError as e:
self.assertEqual(str(e), 'FileObjectThread does not support close=False')
self.assertEqual(str(e), 'FileObjectThread does not support close=False on an fd.')
def test_newlines(self):
r, w = os.pipe()
......@@ -130,6 +130,20 @@ class Test(greentest.TestCase):
x.close()
y.close()
#if FileObject is not FileObjectThread:
def test_bufsize_0(self):
# Issue #840
r, w = os.pipe()
x = FileObject(r, 'rb', bufsize=0)
y = FileObject(w, 'wb', bufsize=0)
y.write(b'a')
b = x.read(1)
self.assertEqual(b, b'a')
y.writelines([b'2'])
b = x.read(1)
self.assertEqual(b, b'2')
def writer(fobj, line):
for character in line:
fobj.write(character)
......
......@@ -19,51 +19,11 @@ if PY3:
fd_types = (int, long)
WIN = sys.platform.startswith("win")
from greentest import get_open_files
try:
import psutil
except ImportError:
psutil = None
# Linux/OS X/BSD platforms can implement this by calling out to lsof
tmpname = '/tmp/test__makefile_ref.lsof.%s' % pid
lsof_command = 'lsof -p %s > %s' % (pid, tmpname)
def get_open_files():
if os.system(lsof_command):
raise OSError('lsof failed')
with open(tmpname) as fobj:
data = fobj.read().strip()
results = {}
for line in data.split('\n'):
line = line.strip()
if not line:
continue
split = re.split(r'\s+', line)
command, pid, user, fd = split[:4]
if fd[:-1].isdigit() and not fd[-1].isdigit():
fd = int(fd[:-1])
if fd in results:
params = (fd, line, split, results.get(fd), data)
raise AssertionError('error when parsing lsof output: duplicate fd=%r\nline=%r\nsplit=%r\nprevious=%r\ndata:\n%s' % params)
results[fd] = line
if not results:
raise AssertionError('failed to parse lsof:\n%s' % (data, ))
results['data'] = data
return results
else:
# If psutil is available (it is cross-platform) use that.
# It is *much* faster than shelling out to lsof each time
# (Running 14 tests takes 3.964s with lsof and 0.046 with psutil)
# However, it still doesn't completely solve the issue on Windows: fds are reported
# as -1 there, so we can't fully check those.
# XXX: Note: installing psutil on the travis linux vm caused failures.
process = psutil.Process()
def get_open_files():
results = dict()
results['data'] = process.open_files() + process.connections('all')
for x in results['data']:
results[x.fd] = x
return results
class Test(unittest.TestCase):
......
......@@ -60,9 +60,11 @@ class Test(greentest.TestCase):
if PYPY:
gc.collect()
gc.collect()
num_after = greentest.get_number_open_files()
self.assertEqual(num_before, num_after)
def test_communicate(self):
p = subprocess.Popen([sys.executable, "-c",
'import sys,os;'
......