Commit 0dc161ef authored by Jason Madden

Rewrite fileobject core to always create a FileIO if needed, not just on Py2.

Keeping control of the raw file object this way simplifies and unifies the implementations.

- Fix a concurrency bug in FileObjectPosix that we were hiding with buffering and a RuntimeError.
- Add all the error checking the stdlib does, with a few exceptions that make sense.

See #1441.
parent 83726b6d
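A minimal usage sketch of the unified argument handling described above (the file name is illustrative and not part of the commit). The new ``closefd``/``buffering`` spellings are preferred; ``close``/``bufsize`` remain accepted for backwards compatibility.

    from gevent.fileobject import FileObjectThread

    f = FileObjectThread('example.txt', 'w', buffering=-1, closefd=True,
                         encoding='utf-8')
    f.write(u'hello\n')
    f.close()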
...@@ -24,6 +24,10 @@ ...@@ -24,6 +24,10 @@
``r``, for consistency with the other file objects and the standard ``r``, for consistency with the other file objects and the standard
``open`` and ``io.open`` functions. ``open`` and ``io.open`` functions.
- Fix ``FileObjectPosix`` being improperly used from multiple
  greenlets. Previously this was hidden by forced buffering, which
  raised ``RuntimeError``.
1.5a2 (2019-10-21) 1.5a2 (2019-10-21)
================== ==================
......
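A hedged sketch of the ``FileObjectPosix`` concurrency fix noted in the changelog above, assuming a POSIX pipe. Previously the forced buffering surfaced concurrent use as a ``RuntimeError``; with a truly unbuffered object, concurrent reads may instead be reported as ``ConcurrentObjectUseError`` (as the ``gevent.subprocess`` hunk below also notes).

    import os
    import gevent
    from gevent.exceptions import ConcurrentObjectUseError
    from gevent.fileobject import FileObjectPosix

    r, w = os.pipe()
    reader = FileObjectPosix(r, 'rb', buffering=0)   # truly unbuffered now
    blocked = gevent.spawn(reader.read, 1)           # blocks waiting for data
    gevent.sleep(0.01)                               # let it start waiting
    try:
        reader.read(1)                               # concurrent use of the same object
    except ConcurrentObjectUseError as ex:
        print('detected:', ex)
    blocked.kill()
    os.close(w)
    reader.close()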
"""
gevent internals.
"""
from __future__ import absolute_import, print_function, division from __future__ import absolute_import, print_function, division
try: try:
...@@ -8,7 +11,7 @@ except ImportError: ...@@ -8,7 +11,7 @@ except ImportError:
import io import io
import functools import functools
import sys import sys
import os
from gevent.hub import _get_hub_noargs as get_hub from gevent.hub import _get_hub_noargs as get_hub
from gevent._compat import PY2 from gevent._compat import PY2
...@@ -23,35 +26,252 @@ class cancel_wait_ex(IOError): ...@@ -23,35 +26,252 @@ class cancel_wait_ex(IOError):
super(cancel_wait_ex, self).__init__( super(cancel_wait_ex, self).__init__(
EBADF, 'File descriptor was closed in another greenlet') EBADF, 'File descriptor was closed in another greenlet')
class FileObjectClosed(IOError): class FileObjectClosed(IOError):
def __init__(self): def __init__(self):
super(FileObjectClosed, self).__init__( super(FileObjectClosed, self).__init__(
EBADF, 'Bad file descriptor (FileObject was closed)') EBADF, 'Bad file descriptor (FileObject was closed)')
class _UniversalNewlineBytesWrapper(io.TextIOWrapper): class UniversalNewlineBytesWrapper(io.TextIOWrapper):
""" """
Uses TextWrapper to decode universal newlines, but returns the Uses TextWrapper to decode universal newlines, but returns the
results as bytes. results as bytes.
This is for Python 2 where the 'rU' mode did that. This is for Python 2 where the 'rU' mode did that.
""" """
mode = None
def __init__(self, fobj): def __init__(self, fobj, line_buffering):
io.TextIOWrapper.__init__(self, fobj, encoding='latin-1', newline=None) # latin-1 has the ability to round-trip arbitrary bytes.
io.TextIOWrapper.__init__(self, fobj, encoding='latin-1',
newline=None,
line_buffering=line_buffering)
def read(self, *args, **kwargs): def read(self, *args, **kwargs):
result = io.TextIOWrapper.read(self, *args, **kwargs) result = io.TextIOWrapper.read(self, *args, **kwargs)
return result.encode('latin-1') return result.encode('latin-1')
def readline(self, *args, **kwargs): def readline(self, limit=-1):
result = io.TextIOWrapper.readline(self, *args, **kwargs) result = io.TextIOWrapper.readline(self, limit)
return result.encode('latin-1') return result.encode('latin-1')
def readlines(self, *args, **kwargs): def __iter__(self):
result = io.TextIOWrapper.readlines(self, *args, **kwargs) # readlines() is implemented in terms of __iter__
return [x.encode('latin-1') for x in result] # and TextIOWrapper.__iter__ checks that readline returns
# a unicode object, which we don't, so we override
return self
def __next__(self):
line = self.readline()
if not line:
raise StopIteration
return line
next = __next__
class FlushingBufferedWriter(io.BufferedWriter):
def write(self, b):
ret = io.BufferedWriter.write(self, b)
self.flush()
return ret
class OpenDescriptor(object): # pylint:disable=too-many-instance-attributes
"""
Interprets the arguments to `open`.
Originally based on code in the stdlib's _pyio.py (Python implementation of
the :mod:`io` module), but modified for gevent:
- Native strings are returned on Python 2 when neither
'b' nor 't' are in the mode string and no encoding is specified.
- Universal newlines work in that mode.
- Allows unbuffered text IO.
"""
@staticmethod
def _collapse_arg(preferred_val, old_val, default):
if preferred_val is not None and old_val is not None:
raise TypeError
if preferred_val is None and old_val is None:
return default
return preferred_val if preferred_val is not None else old_val
def __init__(self, fobj, mode='r', bufsize=None, close=None,
encoding=None, errors=None, newline=None,
buffering=None, closefd=None):
# Based on code in the stdlib's _pyio.py from 3.8.
# pylint:disable=too-many-locals,too-many-branches,too-many-statements
closefd = self._collapse_arg(closefd, close, True)
del close
buffering = self._collapse_arg(buffering, bufsize, -1)
del bufsize
if not hasattr(fobj, 'fileno'):
if not isinstance(fobj, integer_types):
# Not a fd. Support PathLike on Python 2 and Python <= 3.5.
fobj = fspath(fobj)
if not isinstance(fobj, (str, bytes) + integer_types): # pragma: no cover
raise TypeError("invalid file: %r" % fobj)
if isinstance(fobj, (str, bytes)):
closefd = True
if not isinstance(mode, str):
raise TypeError("invalid mode: %r" % mode)
if not isinstance(buffering, integer_types):
raise TypeError("invalid buffering: %r" % buffering)
if encoding is not None and not isinstance(encoding, str):
raise TypeError("invalid encoding: %r" % encoding)
if errors is not None and not isinstance(errors, str):
raise TypeError("invalid errors: %r" % errors)
modes = set(mode)
if modes - set("axrwb+tU") or len(mode) > len(modes):
raise ValueError("invalid mode: %r" % mode)
creating = "x" in modes
reading = "r" in modes
writing = "w" in modes
appending = "a" in modes
updating = "+" in modes
text = "t" in modes
binary = "b" in modes
universal = 'U' in modes
if universal:
if creating or writing or appending or updating:
raise ValueError("mode U cannot be combined with 'x', 'w', 'a', or '+'")
import warnings
warnings.warn("'U' mode is deprecated",
DeprecationWarning, 4)
reading = True
if text and binary:
raise ValueError("can't have text and binary mode at once")
if creating + reading + writing + appending > 1:
raise ValueError("can't have read/write/append mode at once")
if not (creating or reading or writing or appending):
raise ValueError("must have exactly one of read/write/append mode")
if binary and encoding is not None:
raise ValueError("binary mode doesn't take an encoding argument")
if binary and errors is not None:
raise ValueError("binary mode doesn't take an errors argument")
if binary and newline is not None:
raise ValueError("binary mode doesn't take a newline argument")
if binary and buffering == 1:
import warnings
warnings.warn("line buffering (buffering=1) isn't supported in binary "
"mode, the default buffer size will be used",
RuntimeWarning, 4)
self.fobj = fobj
self.fileio_mode = (
(creating and "x" or "")
+ (reading and "r" or "")
+ (writing and "w" or "")
+ (appending and "a" or "")
+ (updating and "+" or "")
)
self.mode = self.fileio_mode + ('t' if text else '') + ('b' if binary else '')
self.creating = creating
self.reading = reading
self.writing = writing
self.appending = appending
self.updating = updating
self.text = text
self.binary = binary
self.native = (
not self.text and not self.binary # Neither t nor b given.
and not encoding and not errors # And no encoding or error handling either.
)
self.universal = universal
self.buffering = buffering
self.encoding = encoding
self.errors = errors
self.newline = newline
self.closefd = closefd
default_buffer_size = io.DEFAULT_BUFFER_SIZE
def is_fd(self):
return isinstance(self.fobj, integer_types)
def open(self):
return self.open_raw_and_wrapped()[1]
def open_raw_and_wrapped(self):
raw = self.open_raw()
try:
return raw, self.wrapped(raw)
except:
raw.close()
raise
def open_raw(self):
if hasattr(self.fobj, 'fileno'):
return self.fobj
return io.FileIO(self.fobj, self.fileio_mode, self.closefd)
def wrapped(self, raw):
"""
Wraps the raw IO object (`RawIOBase`) in buffers, text decoding,
and newline handling.
"""
# pylint:disable=too-many-branches
result = raw
buffering = self.buffering
line_buffering = False
if buffering == 1 or buffering < 0 and raw.isatty():
buffering = -1
line_buffering = True
if buffering < 0:
buffering = self.default_buffer_size
try:
bs = os.fstat(raw.fileno()).st_blksize
except (OSError, AttributeError):
pass
else:
if bs > 1:
buffering = bs
if buffering < 0: # pragma: no cover
raise ValueError("invalid buffering size")
if buffering != 0:
if self.updating:
Buffer = io.BufferedRandom
elif self.creating or self.writing or self.appending:
Buffer = io.BufferedWriter
elif self.reading:
Buffer = io.BufferedReader
            else: # pragma: no cover
raise ValueError("unknown mode: %r" % self.mode)
try:
result = Buffer(raw, buffering)
except AttributeError:
# Python 2 file() objects don't have the readable/writable
# attributes. But they handle their own buffering.
result = raw
if self.binary:
return result
if PY2 and self.native:
# Neither text mode nor binary mode specified.
if self.universal:
# universal was requested, e.g., 'rU'
result = UniversalNewlineBytesWrapper(result, line_buffering)
else:
result = io.TextIOWrapper(result, self.encoding, self.errors, self.newline,
line_buffering)
try:
result.mode = self.mode
except (AttributeError, TypeError):
# AttributeError: No such attribute
# TypeError: Readonly attribute (py2)
pass
return result
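# Illustrative sketch (not part of the diff): how OpenDescriptor is driven.
# It is an internal helper; the public entry points remain the FileObject
# classes. The path below is an assumption and must already exist.
from gevent._fileobjectcommon import OpenDescriptor

desc = OpenDescriptor('example.txt', 'r', buffering=-1, closefd=True)
assert desc.fileio_mode == 'r' and not desc.binary
f = desc.open()   # FileIO -> BufferedReader -> TextIOWrapper on Python 3
try:
    data = f.read()
finally:
    f.close()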
class FileObjectBase(object): class FileObjectBase(object):
...@@ -86,13 +306,7 @@ class FileObjectBase(object): ...@@ -86,13 +306,7 @@ class FileObjectBase(object):
) )
# Whether we should apply a TextWrapper (the names are historical). _io = None
# Subclasses should set these before calling our constructor.
_translate = False
_translate_mode = None
_translate_encoding = None
_translate_errors = None
_translate_newline = None # None means universal
def __init__(self, fobj, closefd): def __init__(self, fobj, closefd):
""" """
...@@ -103,13 +317,6 @@ class FileObjectBase(object): ...@@ -103,13 +317,6 @@ class FileObjectBase(object):
# pass it along) for compatibility. # pass it along) for compatibility.
self._close = closefd self._close = closefd
if self._translate:
# This automatically handles delegation by assigning to
# self.io
self.translate_newlines(self._translate_mode,
self._translate_encoding,
self._translate_errors)
else:
self._do_delegate_methods() self._do_delegate_methods()
...@@ -137,17 +344,6 @@ class FileObjectBase(object): ...@@ -137,17 +344,6 @@ class FileObjectBase(object):
""" """
return method return method
def translate_newlines(self, mode, *text_args, **text_kwargs):
if mode == 'byte_newlines':
wrapper = _UniversalNewlineBytesWrapper(self._io)
mode = None
else:
wrapper = io.TextIOWrapper(self._io, *text_args, **text_kwargs)
if mode:
wrapper.mode = mode # pylint:disable=attribute-defined-outside-init
self.io = wrapper
self._translate = True
@property @property
def closed(self): def closed(self):
"""True if the file is closed""" """True if the file is closed"""
...@@ -181,78 +377,28 @@ class FileObjectBase(object): ...@@ -181,78 +377,28 @@ class FileObjectBase(object):
def __exit__(self, *args): def __exit__(self, *args):
self.close() self.close()
# Modes that work with native strings on Python 2 def __iter__(self):
_NATIVE_PY2_MODES = ('r', 'r+', 'w', 'w+', 'a', 'a+') return self
if PY2:
@classmethod
def _use_FileIO(cls, mode, encoding, errors):
return mode in cls._NATIVE_PY2_MODES \
and encoding is None and errors is None
else:
@classmethod
def _use_FileIO(cls, mode, encoding, errors): # pylint:disable=unused-argument
return False
@classmethod
def _open_raw(cls, fobj, mode='r', buffering=-1,
encoding=None, errors=None, newline=None, closefd=True):
"""
Uses :func:`io.open` on *fobj* and returns the IO object.
This is a compatibility wrapper for Python 2 and Python 3. It
supports PathLike objects for *fobj* on all versions.
If the object is already an object with a ``fileno`` method, def __next__(self):
it is returned unchanged. line = self.readline()
if not line:
raise StopIteration
return line
On Python 2, if the mode only specifies read, write or append, next = __next__
and encoding and errors are not specified, then
:obj:`io.FileIO` is used to get the IO object. This ensures we
return native strings unless explicitly requested.
.. versionchanged: 1.5 def __bool__(self):
Support closefd for Python 2 native string readers. return True
"""
if hasattr(fobj, 'fileno'):
return fobj
if not isinstance(fobj, integer_types): __nonzero__ = __bool__
# Not an integer. Support PathLike on Python 2 and Python <= 3.5.
fobj = fspath(fobj)
closefd = True
if cls._use_FileIO(mode, encoding, errors):
# Python 2, default open. Return native str type, not unicode, which
# is what would happen with io.open('r'), but we don't want to open the file
# in binary mode since that skips newline conversion.
fobj = io.FileIO(fobj, mode, closefd=closefd)
if '+' in mode:
BufFactory = io.BufferedRandom
elif mode[0] == 'r':
BufFactory = io.BufferedReader
else:
BufFactory = io.BufferedWriter
if buffering == -1:
fobj = BufFactory(fobj)
elif buffering != 0:
fobj = BufFactory(fobj, buffering)
else:
# Python 3, or we have a mode that Python 2's os.fdopen/open can't handle
# (x) or they explicitly asked for binary or text mode.
fobj = io.open(fobj, mode, buffering, encoding, errors, newline, closefd)
return fobj
class FileObjectBlock(FileObjectBase): class FileObjectBlock(FileObjectBase):
def __init__(self, fobj, *args, **kwargs): def __init__(self, fobj, *args, **kwargs):
closefd = kwargs['closefd'] = kwargs.pop('close', True) descriptor = OpenDescriptor(fobj, *args, **kwargs)
if 'bufsize' in kwargs: # compat with other constructors super(FileObjectBlock, self).__init__(descriptor.open(), descriptor.closefd)
kwargs['buffering'] = kwargs.pop('bufsize')
fobj = self._open_raw(fobj, *args, **kwargs)
super(FileObjectBlock, self).__init__(fobj, closefd)
def _do_close(self, fobj, closefd): def _do_close(self, fobj, closefd):
fobj.close() fobj.close()
...@@ -275,24 +421,30 @@ class FileObjectThread(FileObjectBase): ...@@ -275,24 +421,30 @@ class FileObjectThread(FileObjectBase):
Accept str and ``PathLike`` objects for *fobj* on all versions of Python. Accept str and ``PathLike`` objects for *fobj* on all versions of Python.
.. versionchanged:: 1.5 .. versionchanged:: 1.5
Add *encoding*, *errors* and *newline* arguments. Add *encoding*, *errors* and *newline* arguments.
.. versionchanged:: 1.5
Accept *closefd* and *buffering* instead of *close* and *bufsize* arguments.
The latter remain for backwards compatibility.
""" """
def __init__(self, fobj, mode='r', bufsize=-1, close=True, threadpool=None, lock=True,
encoding=None, errors=None, newline=None): def __init__(self, *args, **kwargs):
""" """
:param fobj: The underlying file-like object to wrap, or something :param fobj: The underlying file-like object to wrap, or something
acceptable to :func:`io.open` (along with *mode* and *bufsize*, which is translated acceptable to :func:`io.open` (along with *mode* and *buffering*)
to *buffering*).
:keyword bool lock: If True (the default) then all operations will :keyword bool lock: If True (the default) then all operations will
be performed one-by-one. Note that this does not guarantee that, if using be performed one-by-one. Note that this does not guarantee that, if using
this file object from multiple threads/greenlets, operations will be performed this file object from multiple threads/greenlets, operations will be performed
in any particular order, only that no two operations will be attempted at the in any particular order, only that no two operations will be attempted at the
same time. You can also pass your own :class:`gevent.lock.Semaphore` to synchronize same time. You can also pass your own :class:`gevent.lock.Semaphore` to synchronize
file operations with an external resource. file operations with an external resource.
:keyword bool close: If True (the default) then when this object is closed, :keyword bool closefd: If True (the default) then when this object is closed,
the underlying object is closed as well. the underlying object is closed as well. If *fobj* is a path, then
*closefd* must be True.
""" """
closefd = close lock = kwargs.pop('lock', True)
threadpool = kwargs.pop('threadpool', None)
descriptor = OpenDescriptor(*args, **kwargs)
self.threadpool = threadpool or get_hub().threadpool self.threadpool = threadpool or get_hub().threadpool
self.lock = lock self.lock = lock
if self.lock is True: if self.lock is True:
...@@ -301,18 +453,9 @@ class FileObjectThread(FileObjectBase): ...@@ -301,18 +453,9 @@ class FileObjectThread(FileObjectBase):
self.lock = DummySemaphore() self.lock = DummySemaphore()
if not hasattr(self.lock, '__enter__'): if not hasattr(self.lock, '__enter__'):
raise TypeError('Expected a Semaphore or boolean, got %r' % type(self.lock)) raise TypeError('Expected a Semaphore or boolean, got %r' % type(self.lock))
using_fileio = self._use_FileIO(mode.replace('U', ''), encoding, errors)
universal_newline = 'U' in mode or (not using_fileio and newline is None) self.__io_holder = [descriptor.open()] # signal for _wrap_method
mode = mode.replace('U', '') super(FileObjectThread, self).__init__(self.__io_holder[0], descriptor.closefd)
fobj = self._open_raw(fobj, mode, bufsize,
encoding=encoding, errors=errors, newline=newline,
closefd=close)
if self._use_FileIO(mode, encoding, errors) and universal_newline:
self._translate_mode = 'byte_newlines'
self._translate = True
self.__io_holder = [fobj] # signal for _wrap_method
super(FileObjectThread, self).__init__(fobj, closefd)
def _do_close(self, fobj, closefd): def _do_close(self, fobj, closefd):
self.__io_holder[0] = None # for _wrap_method self.__io_holder[0] = None # for _wrap_method
......
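A sketch of the ``lock`` keyword documented above for ``FileObjectThread``: a caller-supplied semaphore serializes operations on the wrapped file (the path is illustrative).

    from gevent.fileobject import FileObjectThread
    from gevent.lock import Semaphore

    file_lock = Semaphore()
    log = FileObjectThread('example.log', 'a', lock=file_lock)
    log.write('one line\n')
    log.close()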
from __future__ import absolute_import from __future__ import absolute_import
from __future__ import print_function
import os import os
import sys import sys
import io
from io import BufferedReader
from io import BufferedWriter
from io import BytesIO from io import BytesIO
from io import DEFAULT_BUFFER_SIZE from io import DEFAULT_BUFFER_SIZE
from io import FileIO
from io import RawIOBase from io import RawIOBase
from io import UnsupportedOperation from io import UnsupportedOperation
from gevent._compat import reraise from gevent._compat import reraise
from gevent._compat import PY2
from gevent._compat import PY3
from gevent._fileobjectcommon import cancel_wait_ex from gevent._fileobjectcommon import cancel_wait_ex
from gevent._fileobjectcommon import FileObjectBase from gevent._fileobjectcommon import FileObjectBase
from gevent._fileobjectcommon import OpenDescriptor
from gevent._hub_primitives import wait_on_watcher
from gevent.hub import get_hub from gevent.hub import get_hub
from gevent.os import _read from gevent.os import _read
from gevent.os import _write from gevent.os import _write
...@@ -22,34 +23,35 @@ from gevent.os import make_nonblocking ...@@ -22,34 +23,35 @@ from gevent.os import make_nonblocking
class GreenFileDescriptorIO(RawIOBase): class GreenFileDescriptorIO(RawIOBase):
# Note that RawIOBase has a __del__ method that calls # Note that RawIOBase has a __del__ method that calls
# self.close(). (In C implementations like CPython, this is # self.close(). (In C implementations like CPython, this is
# the type's tp_dealloc slot; prior to Python 3, the object doesn't # the type's tp_dealloc slot; prior to Python 3, the object doesn't
# appear to have a __del__ method, even though it functionally does) # appear to have a __del__ method, even though it functionally does)
_read_event = None _read_watcher = None
_write_event = None _write_watcher = None
_closed = False _closed = False
_seekable = None _seekable = None
_keep_alive = None # An object that needs to live as long as we do.
def __init__(self, fileno, mode='r', closefd=True): def __init__(self, fileno, mode='r', closefd=True):
RawIOBase.__init__(self) # Python 2: pylint:disable=no-member,non-parent-init-called RawIOBase.__init__(self)
self._closefd = closefd self._closefd = closefd
self._fileno = fileno self._fileno = fileno
self.mode = mode
make_nonblocking(fileno) make_nonblocking(fileno)
readable = 'r' in mode readable = 'r' in mode or '+' in mode
writable = 'w' in mode writable = 'w' in mode or '+' in mode
self.hub = get_hub() self.hub = get_hub()
io_watcher = self.hub.loop.io io_watcher = self.hub.loop.io
try: try:
if readable: if readable:
self._read_event = io_watcher(fileno, 1) self._read_watcher = io_watcher(fileno, 1)
if writable: if writable:
self._write_event = io_watcher(fileno, 2) self._write_watcher = io_watcher(fileno, 2)
except: except:
# If anything goes wrong, it's important to go ahead and # If anything goes wrong, it's important to go ahead and
# close these watchers *now*, especially under libuv, so # close these watchers *now*, especially under libuv, so
...@@ -67,11 +69,18 @@ class GreenFileDescriptorIO(RawIOBase): ...@@ -67,11 +69,18 @@ class GreenFileDescriptorIO(RawIOBase):
self.close() self.close()
raise raise
def isatty(self):
f = FileIO(self._fileno, 'r', False)
try:
return f.isatty()
finally:
f.close()
def readable(self): def readable(self):
return self._read_event is not None return self._read_watcher is not None
def writable(self): def writable(self):
return self._write_event is not None return self._write_watcher is not None
def seekable(self): def seekable(self):
if self._seekable is None: if self._seekable is None:
...@@ -91,10 +100,10 @@ class GreenFileDescriptorIO(RawIOBase): ...@@ -91,10 +100,10 @@ class GreenFileDescriptorIO(RawIOBase):
return self._closed return self._closed
def __destroy_events(self): def __destroy_events(self):
read_event = self._read_event read_event = self._read_watcher
write_event = self._write_event write_event = self._write_watcher
hub = self.hub hub = self.hub
self.hub = self._read_event = self._write_event = None self.hub = self._read_watcher = self._write_watcher = None
if read_event is not None: if read_event is not None:
hub.cancel_wait(read_event, cancel_wait_ex, True) hub.cancel_wait(read_event, cancel_wait_ex, True)
...@@ -110,9 +119,14 @@ class GreenFileDescriptorIO(RawIOBase): ...@@ -110,9 +119,14 @@ class GreenFileDescriptorIO(RawIOBase):
self._closed = True self._closed = True
self.__destroy_events() self.__destroy_events()
fileno = self._fileno fileno = self._fileno
keep_alive = self._keep_alive
self._fileno = self._keep_alive = None
try:
if self._closefd: if self._closefd:
self._fileno = None
os.close(fileno) os.close(fileno)
finally:
if hasattr(keep_alive, 'close'):
keep_alive.close()
# RawIOBase provides a 'read' method that will call readall() if # RawIOBase provides a 'read' method that will call readall() if
# the `size` was missing or -1 and otherwise call readinto(). We # the `size` was missing or -1 and otherwise call readinto(). We
...@@ -122,20 +136,26 @@ class GreenFileDescriptorIO(RawIOBase): ...@@ -122,20 +136,26 @@ class GreenFileDescriptorIO(RawIOBase):
# this was fixed in Python 3.3, but we still need our workaround for 2.7. See # this was fixed in Python 3.3, but we still need our workaround for 2.7. See
# https://github.com/gevent/gevent/issues/675) # https://github.com/gevent/gevent/issues/675)
def __read(self, n): def __read(self, n):
if self._read_event is None: if self._read_watcher is None:
raise UnsupportedOperation('read') raise UnsupportedOperation('read')
while True: while 1:
try: try:
return _read(self._fileno, n) return _read(self._fileno, n)
except (IOError, OSError) as ex: except (IOError, OSError) as ex:
if ex.args[0] not in ignored_errors: if ex.args[0] not in ignored_errors:
raise raise
self.hub.wait(self._read_event) wait_on_watcher(self._read_watcher, None, None, self.hub)
def readall(self): def readall(self):
ret = BytesIO() ret = BytesIO()
while True: while True:
try:
data = self.__read(DEFAULT_BUFFER_SIZE) data = self.__read(DEFAULT_BUFFER_SIZE)
except cancel_wait_ex:
# We were closed while reading. A buffered reader
# just returns what it has handy at that point,
                # so we do too.
data = None
if not data: if not data:
break break
ret.write(data) ret.write(data)
...@@ -154,7 +174,7 @@ class GreenFileDescriptorIO(RawIOBase): ...@@ -154,7 +174,7 @@ class GreenFileDescriptorIO(RawIOBase):
return n return n
def write(self, b): def write(self, b):
if self._write_event is None: if self._write_watcher is None:
raise UnsupportedOperation('write') raise UnsupportedOperation('write')
while True: while True:
try: try:
...@@ -162,7 +182,7 @@ class GreenFileDescriptorIO(RawIOBase): ...@@ -162,7 +182,7 @@ class GreenFileDescriptorIO(RawIOBase):
except (IOError, OSError) as ex: except (IOError, OSError) as ex:
if ex.args[0] not in ignored_errors: if ex.args[0] not in ignored_errors:
raise raise
self.hub.wait(self._write_event) wait_on_watcher(self._write_watcher, None, None, self.hub)
def seek(self, offset, whence=0): def seek(self, offset, whence=0):
try: try:
...@@ -176,16 +196,33 @@ class GreenFileDescriptorIO(RawIOBase): ...@@ -176,16 +196,33 @@ class GreenFileDescriptorIO(RawIOBase):
# See https://github.com/gevent/gevent/issues/1323 # See https://github.com/gevent/gevent/issues/1323
reraise(IOError, IOError(*ex.args), sys.exc_info()[2]) reraise(IOError, IOError(*ex.args), sys.exc_info()[2])
def __repr__(self):
return "<%s at 0x%x fileno=%s mode=%r>" % (
type(self).__name__, id(self), self._fileno, self.mode
)
class FlushingBufferedWriter(BufferedWriter):
def write(self, b): class GreenOpenDescriptor(OpenDescriptor):
ret = BufferedWriter.write(self, b)
self.flush() def open_raw(self):
return ret if self.is_fd():
fileio = GreenFileDescriptorIO(self.fobj, self.fileio_mode, closefd=self.closefd)
else:
# Either an existing file object or a path string (which
# we open to get a file object). In either case, the other object
# owns the descriptor and we must not close it.
closefd = False
if hasattr(self.fobj, 'fileno'):
raw = self.fobj
else:
raw = OpenDescriptor.open_raw(self)
fileno = raw.fileno()
fileio = GreenFileDescriptorIO(fileno, self.fileio_mode, closefd=closefd)
fileio._keep_alive = raw
return fileio
_marker = object()
class FileObjectPosix(FileObjectBase): class FileObjectPosix(FileObjectBase):
""" """
...@@ -239,19 +276,28 @@ class FileObjectPosix(FileObjectBase): ...@@ -239,19 +276,28 @@ class FileObjectPosix(FileObjectBase):
.. versionchanged:: 1.2a1 .. versionchanged:: 1.2a1
Document the ``fileio`` attribute for non-blocking reads. Document the ``fileio`` attribute for non-blocking reads.
.. versionchanged:: 1.5 .. versionchanged:: 1.5
The default value for *mode* was changed from ``rb`` to ``r``. The default value for *mode* was changed from ``rb`` to ``r``. This is consistent
with :func:`open`, :func:`io.open`, and :class:`~.FileObjectThread`, which is the
default ``FileObject`` on some platforms.
.. versionchanged:: 1.5 .. versionchanged:: 1.5
Support strings and ``PathLike`` objects for ``fobj`` on all versions Support strings and ``PathLike`` objects for ``fobj`` on all versions
of Python. Note that caution above. of Python. Note that caution above.
.. versionchanged:: 1.5 .. versionchanged:: 1.5
Add *encoding*, *errors* and *newline* argument. Add *encoding*, *errors* and *newline* argument.
.. versionchanged:: 1.5
Accept *closefd* and *buffering* instead of *close* and *bufsize* arguments.
The latter remain for backwards compatibility.
.. versionchanged:: 1.5
Stop forcing buffering. Previously, given a ``buffering=0`` argument,
      *buffering* would be set to 1, and ``buffering=1`` would be forced to
the default buffer size. This was a workaround for a long-standing concurrency
issue. Now the *buffering* argument is interpreted as intended.
""" """
#: platform specific default for the *bufsize* parameter #: platform specific default for the *bufsize* parameter
default_bufsize = io.DEFAULT_BUFFER_SIZE default_bufsize = DEFAULT_BUFFER_SIZE
def __init__(self, fobj, mode='r', bufsize=-1, close=True, def __init__(self, *args, **kwargs):
encoding=None, errors=None, newline=_marker):
# pylint:disable=too-many-locals # pylint:disable=too-many-locals
""" """
:param fobj: Either an integer fileno, or an object supporting the :param fobj: Either an integer fileno, or an object supporting the
...@@ -263,7 +309,7 @@ class FileObjectPosix(FileObjectBase): ...@@ -263,7 +309,7 @@ class FileObjectPosix(FileObjectBase):
if 't' is not in the mode, this will result in returning byte (native) strings; if 't' is not in the mode, this will result in returning byte (native) strings;
putting 't' in the mode will return text (unicode) strings. This may cause putting 't' in the mode will return text (unicode) strings. This may cause
:exc:`UnicodeDecodeError` to be raised. :exc:`UnicodeDecodeError` to be raised.
:keyword int bufsize: If given, the size of the buffer to use. The default :keyword int buffering: If given, the size of the buffer to use. The default
value means to use a platform-specific default value means to use a platform-specific default
Other values are interpreted as for the :mod:`io` package. Other values are interpreted as for the :mod:`io` package.
Buffering is ignored in text mode. Buffering is ignored in text mode.
...@@ -282,72 +328,11 @@ class FileObjectPosix(FileObjectBase): ...@@ -282,72 +328,11 @@ class FileObjectPosix(FileObjectBase):
in gevent 1.1 it was flushed when more than one byte was in gevent 1.1 it was flushed when more than one byte was
written. Note that this may have performance impacts. written. Note that this may have performance impacts.
""" """
descriptor = GreenOpenDescriptor(*args, **kwargs)
if isinstance(fobj, int):
fileno = fobj
fobj = None
else:
# The underlying object, if we have to open it, should be
# in binary mode. We'll take care of any coding needed.
raw_mode = mode.replace('t', 'b')
raw_mode = raw_mode + 'b' if 'b' not in raw_mode else raw_mode
new_fobj = self._open_raw(fobj, raw_mode, bufsize, closefd=False)
if new_fobj is not fobj:
close = True
fobj = new_fobj
fileno = fobj.fileno()
self.__fobj = fobj
assert isinstance(fileno, int)
# We want text mode if:
# - We're on Python 3, and no 'b' is in the mode.
# - A 't' is in the mode on any version.
# - We're on Python 2 and no 'b' is in the mode, and an encoding or errors is
# given.
text_mode = (
't' in mode
or (PY3 and 'b' not in mode)
or (PY2 and 'b' not in mode and (encoding, errors) != (None, None))
)
if text_mode:
self._translate = True
self._translate_newline = os.linesep if newline is _marker else newline
self._translate_encoding = encoding
self._transalate_errors = errors
if 'U' in mode:
self._translate = True
self._translate_newline = None
if PY2 and not text_mode:
self._translate_mode = 'byte_newlines'
self._orig_bufsize = bufsize
if bufsize < 0 or bufsize == 1:
bufsize = self.default_bufsize
elif bufsize == 0:
bufsize = 1
if mode[0] == 'r':
IOFamily = BufferedReader
else:
assert mode[0] == 'w'
IOFamily = BufferedWriter
if self._orig_bufsize == 0:
# We could also simply pass self.fileio as *io*, but this way
# we at least consistently expose a BufferedWriter in our *io*
# attribute.
IOFamily = FlushingBufferedWriter
# This attribute is documented as available for non-blocking reads. # This attribute is documented as available for non-blocking reads.
self.fileio = GreenFileDescriptorIO(fileno, mode, closefd=close) self.fileio, buffered_fobj = descriptor.open_raw_and_wrapped()
super(FileObjectPosix, self).__init__(buffered_fobj, descriptor.closefd)
buffered_fobj = IOFamily(self.fileio, bufsize)
super(FileObjectPosix, self).__init__(buffered_fobj, close)
def _do_close(self, fobj, closefd): def _do_close(self, fobj, closefd):
try: try:
...@@ -355,14 +340,5 @@ class FileObjectPosix(FileObjectBase): ...@@ -355,14 +340,5 @@ class FileObjectPosix(FileObjectBase):
# self.fileio already knows whether or not to close the # self.fileio already knows whether or not to close the
# file descriptor # file descriptor
self.fileio.close() self.fileio.close()
if closefd and self.__fobj is not None:
try:
self.__fobj.close()
except IOError:
pass
finally: finally:
self.__fobj = None
self.fileio = None self.fileio = None
def __iter__(self):
return self._io
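A sketch of the buffering change described in the ``FileObjectPosix`` docstring above, assuming a POSIX pipe: ``buffering=0`` is now honored, and the raw ``GreenFileDescriptorIO`` remains available as the documented ``fileio`` attribute.

    import os
    from gevent.fileobject import FileObjectPosix

    r, w = os.pipe()
    reader = FileObjectPosix(r, 'rb', buffering=0)   # reader.fileio is the raw object
    writer = FileObjectPosix(w, 'wb')
    writer.write(b'ping\n')
    writer.flush()
    print(reader.readline())                         # b'ping\n'
    writer.close()
    reader.close()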
...@@ -37,7 +37,9 @@ import os ...@@ -37,7 +37,9 @@ import os
import signal import signal
import sys import sys
import traceback import traceback
from gevent.event import AsyncResult from gevent.event import AsyncResult
from gevent.exceptions import ConcurrentObjectUseError
from gevent.hub import _get_hub_noargs as get_hub from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import linkproxy from gevent.hub import linkproxy
from gevent.hub import sleep from gevent.hub import sleep
...@@ -428,7 +430,7 @@ else: ...@@ -428,7 +430,7 @@ else:
_set_inheritable = lambda i, v: True _set_inheritable = lambda i, v: True
def FileObject(*args): def FileObject(*args, **kwargs):
# Defer importing FileObject until we need it # Defer importing FileObject until we need it
# to allow it to be configured more easily. # to allow it to be configured more easily.
from gevent.fileobject import FileObject as _FileObject from gevent.fileobject import FileObject as _FileObject
...@@ -611,26 +613,20 @@ class Popen(object): ...@@ -611,26 +613,20 @@ class Popen(object):
if PY3 and text_mode: if PY3 and text_mode:
# Under Python 3, if we left on the 'b' we'd get different results # Under Python 3, if we left on the 'b' we'd get different results
# depending on whether we used FileObjectPosix or FileObjectThread # depending on whether we used FileObjectPosix or FileObjectThread
self.stdin = FileObject(p2cwrite, 'wb', bufsize) self.stdin = FileObject(p2cwrite, 'w', bufsize,
self.stdin.translate_newlines(None,
write_through=True,
line_buffering=(bufsize == 1),
encoding=self.encoding, errors=self.errors) encoding=self.encoding, errors=self.errors)
else: else:
self.stdin = FileObject(p2cwrite, 'wb', bufsize) self.stdin = FileObject(p2cwrite, 'wb', bufsize)
if c2pread != -1: if c2pread != -1:
if universal_newlines or text_mode: if universal_newlines or text_mode:
if PY3: if PY3:
# FileObjectThread doesn't support the 'U' qualifier self.stdout = FileObject(c2pread, 'r', bufsize,
# with a bufsize of 0 encoding=self.encoding, errors=self.errors)
self.stdout = FileObject(c2pread, 'rb', bufsize)
# NOTE: Universal Newlines are broken on Windows/Py3, at least # NOTE: Universal Newlines are broken on Windows/Py3, at least
# in some cases. This is true in the stdlib subprocess module # in some cases. This is true in the stdlib subprocess module
# as well; the following line would fix the test cases in # as well; the following line would fix the test cases in
# test__subprocess.py that depend on python_universal_newlines, # test__subprocess.py that depend on python_universal_newlines,
# but would be inconsistent with the stdlib: # but would be inconsistent with the stdlib:
#msvcrt.setmode(self.stdout.fileno(), os.O_TEXT)
self.stdout.translate_newlines('r', encoding=self.encoding, errors=self.errors)
else: else:
self.stdout = FileObject(c2pread, 'rU', bufsize) self.stdout = FileObject(c2pread, 'rU', bufsize)
else: else:
...@@ -638,8 +634,8 @@ class Popen(object): ...@@ -638,8 +634,8 @@ class Popen(object):
if errread != -1: if errread != -1:
if universal_newlines or text_mode: if universal_newlines or text_mode:
if PY3: if PY3:
self.stderr = FileObject(errread, 'rb', bufsize) self.stderr = FileObject(errread, 'r', bufsize,
self.stderr.translate_newlines(None, encoding=encoding, errors=errors) encoding=encoding, errors=errors)
else: else:
self.stderr = FileObject(errread, 'rU', bufsize) self.stderr = FileObject(errread, 'rU', bufsize)
else: else:
...@@ -756,7 +752,13 @@ class Popen(object): ...@@ -756,7 +752,13 @@ class Popen(object):
def _read(): def _read():
try: try:
data = pipe.read() data = pipe.read()
except RuntimeError: except (
# io.Buffered* can raise RuntimeError: 'reentrant call'
RuntimeError,
# unbuffered Posix IO that we're already waiting on
# can raise this. Closing the pipe will free those greenlets up.
ConcurrentObjectUseError
):
return return
if not data: if not data:
return return
......
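A sketch of the effect of the ``gevent.subprocess`` change above, assuming a POSIX system with ``echo`` on the path: text-mode pipes are now created directly as ``FileObject(fd, 'r', bufsize, encoding=..., errors=...)`` rather than a binary ``FileObject`` followed by ``translate_newlines()``.

    from gevent import subprocess

    p = subprocess.Popen(['echo', 'hi'], stdout=subprocess.PIPE,
                         universal_newlines=True)
    out, _ = p.communicate()
    assert out == u'hi\n'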
...@@ -24,9 +24,13 @@ import sys ...@@ -24,9 +24,13 @@ import sys
import gevent.core import gevent.core
from gevent import _compat as gsysinfo from gevent import _compat as gsysinfo
VERBOSE = sys.argv.count('-v') > 1
# Python implementations
PYPY = gsysinfo.PYPY PYPY = gsysinfo.PYPY
CPYTHON = not PYPY CPYTHON = not PYPY
VERBOSE = sys.argv.count('-v') > 1
# Platform/operating system
WIN = gsysinfo.WIN WIN = gsysinfo.WIN
LINUX = gsysinfo.LINUX LINUX = gsysinfo.LINUX
OSX = gsysinfo.OSX OSX = gsysinfo.OSX
......
from __future__ import print_function from __future__ import print_function
import functools
import gc
import io
import os import os
import sys import sys
import tempfile import tempfile
import gc
import unittest import unittest
import gevent import gevent
from gevent import fileobject from gevent import fileobject
from gevent._fileobjectcommon import OpenDescriptor
from gevent._compat import PY2 from gevent._compat import PY2
from gevent._compat import PY3
from gevent._compat import text_type
import gevent.testing as greentest import gevent.testing as greentest
from gevent.testing.sysinfo import PY3 from gevent.testing import sysinfo
from gevent.testing.flaky import reraiseFlakyTestRaceConditionLibuv
from gevent.testing.skipping import skipOnLibuvOnCIOnPyPy
from gevent.testing.skipping import skipOnLibuv
try: try:
ResourceWarning ResourceWarning
...@@ -35,8 +39,18 @@ def close_fd_quietly(fd): ...@@ -35,8 +39,18 @@ def close_fd_quietly(fd):
except (IOError, OSError): except (IOError, OSError):
pass pass
def skipUnlessWorksWithRegularFiles(func):
@functools.wraps(func)
def f(self):
if not self.WORKS_WITH_REGULAR_FILES:
self.skipTest("Doesn't work with regular files")
func(self)
return f
class TestFileObjectBlock(greentest.TestCase): # serves as a base for the concurrent tests too class TestFileObjectBlock(greentest.TestCase): # serves as a base for the concurrent tests too
WORKS_WITH_REGULAR_FILES = True
def _getTargetClass(self): def _getTargetClass(self):
return fileobject.FileObjectBlock return fileobject.FileObjectBlock
...@@ -86,8 +100,7 @@ class TestFileObjectBlock(greentest.TestCase): # serves as a base for the concur ...@@ -86,8 +100,7 @@ class TestFileObjectBlock(greentest.TestCase): # serves as a base for the concur
def test_del_close(self): def test_del_close(self):
self._test_del(close=True) self._test_del(close=True)
@skipOnLibuvOnCIOnPyPy("This appears to crash on libuv/pypy/travis.") @skipUnlessWorksWithRegularFiles
# No idea why, can't duplicate locally.
def test_seek(self): def test_seek(self):
fileno, path = tempfile.mkstemp('.gevent.test__fileobject.test_seek') fileno, path = tempfile.mkstemp('.gevent.test__fileobject.test_seek')
self.addCleanup(os.remove, path) self.addCleanup(os.remove, path)
...@@ -102,16 +115,7 @@ class TestFileObjectBlock(greentest.TestCase): # serves as a base for the concur ...@@ -102,16 +115,7 @@ class TestFileObjectBlock(greentest.TestCase): # serves as a base for the concur
native_data = f.read(1024) native_data = f.read(1024)
with open(path, 'rb') as f_raw: with open(path, 'rb') as f_raw:
try:
f = self._makeOne(f_raw, 'rb', close=False) f = self._makeOne(f_raw, 'rb', close=False)
except ValueError:
# libuv on Travis can raise EPERM
# from FileObjectPosix. I can't produce it on mac os locally,
# don't know what the issue is. This started happening on Jan 19,
# in the branch that caused all watchers to be explicitly closed.
# That shouldn't have any effect on io watchers, though, which were
# already being explicitly closed.
reraiseFlakyTestRaceConditionLibuv()
if PY3 or hasattr(f, 'seekable'): if PY3 or hasattr(f, 'seekable'):
# On Python 3, all objects should have seekable. # On Python 3, all objects should have seekable.
...@@ -128,33 +132,66 @@ class TestFileObjectBlock(greentest.TestCase): # serves as a base for the concur ...@@ -128,33 +132,66 @@ class TestFileObjectBlock(greentest.TestCase): # serves as a base for the concur
self.assertEqual(native_data, s) self.assertEqual(native_data, s)
self.assertEqual(native_data, fileobj_data) self.assertEqual(native_data, fileobj_data)
def test_str_default_to_native(self): def __check_native_matches(self, byte_data, open_mode,
# With no 'b' or 't' given, read and write native str. meth='read', **open_kwargs):
fileno, path = tempfile.mkstemp('.gevent_test_str_default') fileno, path = tempfile.mkstemp('.gevent_test_' + open_mode)
self.addCleanup(os.remove, path) self.addCleanup(os.remove, path)
os.write(fileno, b'abcdefg') os.write(fileno, byte_data)
os.close(fileno) os.close(fileno)
with open(path, 'r') as f: with io.open(path, open_mode, **open_kwargs) as f:
native_data = f.read() native_data = getattr(f, meth)()
with self._makeOne(path, 'r') as f: with self._makeOne(path, open_mode, **open_kwargs) as f:
gevent_data = f.read() gevent_data = getattr(f, meth)()
self.assertEqual(native_data, gevent_data) self.assertEqual(native_data, gevent_data)
return gevent_data
def test_text_encoding(self): @skipUnlessWorksWithRegularFiles
fileno, path = tempfile.mkstemp('.gevent_test_str_default') def test_str_default_to_native(self):
self.addCleanup(os.remove, path) # With no 'b' or 't' given, read and write native str.
gevent_data = self.__check_native_matches(b'abcdefg', 'r')
self.assertIsInstance(gevent_data, str)
os.write(fileno, u'\N{SNOWMAN}'.encode('utf-8')) @skipUnlessWorksWithRegularFiles
os.close(fileno) def test_text_encoding(self):
gevent_data = self.__check_native_matches(
u'\N{SNOWMAN}'.encode('utf-8'),
'r+',
buffering=5, encoding='utf-8'
)
self.assertIsInstance(gevent_data, text_type)
@skipUnlessWorksWithRegularFiles
def test_does_not_leak_on_exception(self):
# If an exception occurs during opening,
# everything still gets cleaned up.
pass
with self._makeOne(path, 'r+', bufsize=5, encoding='utf-8') as f: @skipUnlessWorksWithRegularFiles
gevent_data = f.read() def test_rbU_produces_bytes(self):
# Including U in rb still produces bytes.
# Note that the universal newline behaviour is
# essentially ignored in explicit bytes mode.
gevent_data = self.__check_native_matches(
b'line1\nline2\r\nline3\rlastline\n\n',
'rbU',
meth='readlines',
)
self.assertIsInstance(gevent_data[0], bytes)
self.assertEqual(len(gevent_data), 4)
@skipUnlessWorksWithRegularFiles
def test_rU_produces_native(self):
gevent_data = self.__check_native_matches(
b'line1\nline2\r\nline3\rlastline\n\n',
'rU',
meth='readlines',
)
self.assertIsInstance(gevent_data[0], str)
self.assertEqual(u'\N{SNOWMAN}', gevent_data)
def test_close_pipe(self): def test_close_pipe(self):
# Issue #190, 203 # Issue #190, 203
...@@ -264,6 +301,12 @@ class TestFileObjectThread(ConcurrentFileObjectMixin, ...@@ -264,6 +301,12 @@ class TestFileObjectThread(ConcurrentFileObjectMixin,
class TestFileObjectPosix(ConcurrentFileObjectMixin, class TestFileObjectPosix(ConcurrentFileObjectMixin,
TestFileObjectBlock): TestFileObjectBlock):
if sysinfo.LIBUV and sysinfo.LINUX:
# On Linux, initializing the watcher for a regular
# file results in libuv raising EPERM. But that works
# fine on other platforms.
WORKS_WITH_REGULAR_FILES = False
def _getTargetClass(self): def _getTargetClass(self):
return fileobject.FileObjectPosix return fileobject.FileObjectPosix
...@@ -293,14 +336,6 @@ class TestFileObjectPosix(ConcurrentFileObjectMixin, ...@@ -293,14 +336,6 @@ class TestFileObjectPosix(ConcurrentFileObjectMixin,
self.assertEqual(io_ex.args, os_ex.args) self.assertEqual(io_ex.args, os_ex.args)
self.assertEqual(str(io_ex), str(os_ex)) self.assertEqual(str(io_ex), str(os_ex))
@skipOnLibuv("libuv on linux raises EPERM ") # but works fine on macOS
def test_str_default_to_native(self):
TestFileObjectBlock.test_str_default_to_native(self)
@skipOnLibuv("libuv in linux raises EPERM")
def test_text_encoding(self):
TestFileObjectBlock.test_text_encoding(self)
class TestTextMode(unittest.TestCase): class TestTextMode(unittest.TestCase):
def test_default_mode_writes_linesep(self): def test_default_mode_writes_linesep(self):
...@@ -323,6 +358,39 @@ class TestTextMode(unittest.TestCase): ...@@ -323,6 +358,39 @@ class TestTextMode(unittest.TestCase):
self.assertEqual(data, os.linesep.encode('ascii')) self.assertEqual(data, os.linesep.encode('ascii'))
class TestOpenDescriptor(greentest.TestCase):
def _makeOne(self, *args, **kwargs):
return OpenDescriptor(*args, **kwargs)
def _check(self, regex, kind, *args, **kwargs):
with self.assertRaisesRegex(kind, regex):
self._makeOne(*args, **kwargs)
case = lambda re, **kwargs: (re, TypeError, kwargs)
vase = lambda re, **kwargs: (re, ValueError, kwargs)
CASES = (
case('mode', mode=42),
case('buffering', buffering='nope'),
case('encoding', encoding=42),
case('errors', errors=42),
vase('mode', mode='aoeug'),
vase('mode U cannot be combined', mode='wU'),
vase('text and binary', mode='rtb'),
vase('append mode at once', mode='rw'),
vase('exactly one', mode='+'),
vase('take an encoding', mode='rb', encoding='ascii'),
vase('take an errors', mode='rb', errors='strict'),
vase('take a newline', mode='rb', newline='\n'),
)
def pop():
for regex, kind, kwargs in TestOpenDescriptor.CASES:
setattr(
TestOpenDescriptor, 'test_' + regex,
lambda self, _re=regex, _kind=kind, _kw=kwargs: self._check(_re, _kind, 1, **_kw)
)
pop()
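# For illustration (not part of the diff): each method generated by pop()
# above is equivalent to, e.g.,
#   lambda self: self._check('text and binary', ValueError, 1, mode='rtb')
# i.e. constructing OpenDescriptor(1, mode='rtb') must raise a ValueError
# whose message matches 'text and binary'.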
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -57,7 +57,7 @@ class BlockingTestMixin(object): ...@@ -57,7 +57,7 @@ class BlockingTestMixin(object):
self.fail("blocking function '%r' appeared not to block" % self.fail("blocking function '%r' appeared not to block" %
block_func) block_func)
self.t.join(10) # make sure the thread terminates self.t.join(10) # make sure the thread terminates
if self.t.isAlive(): if self.t.is_alive():
self.fail("trigger function '%r' appeared to not return" % self.fail("trigger function '%r' appeared to not return" %
trigger_func) trigger_func)
return self.result return self.result
...@@ -72,7 +72,7 @@ class BlockingTestMixin(object): ...@@ -72,7 +72,7 @@ class BlockingTestMixin(object):
block_func(*block_args) block_func(*block_args)
finally: finally:
self.t.join(10) # make sure the thread terminates self.t.join(10) # make sure the thread terminates
if self.t.isAlive(): if self.t.is_alive():
self.fail("trigger function '%r' appeared to not return" % self.fail("trigger function '%r' appeared to not return" %
trigger_func) trigger_func)
if not self.t.startedEvent.isSet(): if not self.t.startedEvent.isSet():
......