Commit 3e5feacd authored by Jason Madden's avatar Jason Madden

Reimplement FileObjectPosix on top of the io package under Python2. Fixes #151.

parent a41e16aa
...@@ -46,6 +46,10 @@ Unreleased ...@@ -46,6 +46,10 @@ Unreleased
get the ``repr`` of the main thread, and other CPython platforms to get the ``repr`` of the main thread, and other CPython platforms to
return an unjoinable DummyThread. (Note that this is not return an unjoinable DummyThread. (Note that this is not
recommended.) Reported in :issue:`153`. recommended.) Reported in :issue:`153`.
- Under Python 2, use the ``io`` package to implement
``FileObjectPosix``. This unifies the code with the Python 3
implementation, and fixes problems with using ``seek()``. See
:issue:`151`.
1.1a2 (Jul 8, 2015) 1.1a2 (Jul 8, 2015)
=================== ===================
......
...@@ -4,19 +4,19 @@ import sys ...@@ -4,19 +4,19 @@ import sys
from types import UnboundMethodType from types import UnboundMethodType
from gevent._fileobjectcommon import cancel_wait_ex from gevent._fileobjectcommon import cancel_wait_ex
from gevent._fileobjectcommon import FileObjectClosed
from gevent._socket2 import _fileobject from gevent._socket2 import _fileobject
from gevent._socket2 import _get_memory from gevent._socket2 import _get_memory
from gevent.hub import get_hub, PYPY, integer_types from gevent.hub import get_hub, integer_types
from gevent.os import _read from gevent.os import _read
from gevent.os import _write from gevent.os import _write
from gevent.os import ignored_errors from gevent.os import ignored_errors
from gevent.os import make_nonblocking from gevent.os import make_nonblocking
from gevent.socket import EBADF from gevent.socket import EBADF
from gevent._fileobjectposix import FileObjectPosix
try: try:
from gevent._util import SocketAdapter__del__, noop from gevent._util import SocketAdapter__del__
except ImportError: except ImportError:
SocketAdapter__del__ = None SocketAdapter__del__ = None
noop = None noop = None
...@@ -147,61 +147,3 @@ class SocketAdapter(object): ...@@ -147,61 +147,3 @@ class SocketAdapter(object):
if SocketAdapter__del__: if SocketAdapter__del__:
SocketAdapter.__del__ = UnboundMethodType(SocketAdapter__del__, None, SocketAdapter) SocketAdapter.__del__ = UnboundMethodType(SocketAdapter__del__, None, SocketAdapter)
class FileObjectPosix(_fileobject):
def __init__(self, fobj=None, mode='rb', bufsize=-1, close=True):
if isinstance(fobj, integer_types):
fileno = fobj
fobj = None
else:
fileno = fobj.fileno()
sock = SocketAdapter(fileno, mode, close=close)
self._fobj = fobj
self._closed = False
_fileobject.__init__(self, sock, mode=mode, bufsize=bufsize, close=close)
if PYPY:
sock._drop()
def __repr__(self):
if self._sock is None:
return '<%s closed>' % self.__class__.__name__
elif self._fobj is None:
return '<%s %s>' % (self.__class__.__name__, self._sock)
else:
return '<%s %s _fobj=%r>' % (self.__class__.__name__, self._sock, self._fobj)
def close(self):
if self._closed:
# make sure close() is only ran once when called concurrently
# cannot rely on self._sock for this because we need to keep that until flush() is done
return
self._closed = True
sock = self._sock
if sock is None:
return
try:
self.flush()
finally:
if self._fobj is not None or not self._close:
sock.detach()
else:
sock._drop()
self._sock = None
self._fobj = None
def __getattr__(self, item):
assert item != '_fobj'
if self._fobj is None:
raise FileObjectClosed
return getattr(self._fobj, item)
if not noop:
def __del__(self):
# disable _fileobject's __del__
pass
if noop:
FileObjectPosix.__del__ = UnboundMethodType(FileObjectPosix, None, noop)
import os from gevent._fileobjectposix import FileObjectPosix
import io
from io import BufferedRandom
from io import BufferedReader
from io import BufferedWriter
from io import BytesIO
from io import DEFAULT_BUFFER_SIZE
from io import RawIOBase
from io import TextIOWrapper
from io import UnsupportedOperation
from gevent._fileobjectcommon import cancel_wait_ex
from gevent.hub import get_hub
from gevent.os import _read
from gevent.os import _write
from gevent.os import ignored_errors
from gevent.os import make_nonblocking
class GreenFileDescriptorIO(RawIOBase):
def __init__(self, fileno, mode='r', closefd=True):
RawIOBase.__init__(self)
self._closed = False
self._closefd = closefd
self._fileno = fileno
make_nonblocking(fileno)
self._readable = 'r' in mode
self._writable = 'w' in mode
self.hub = get_hub()
io = self.hub.loop.io
if self._readable:
self._read_event = io(fileno, 1)
else:
self._read_event = None
if self._writable:
self._write_event = io(fileno, 2)
else:
self._write_event = None
def readable(self):
return self._readable
def writable(self):
return self._writable
def fileno(self):
return self._fileno
@property
def closed(self):
return self._closed
def close(self):
if self._closed:
return
self.flush()
self._closed = True
if self._readable:
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
if self._writable:
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
fileno = self._fileno
if self._closefd:
self._fileno = None
os.close(fileno)
def read(self, n=1):
if not self._readable:
raise UnsupportedOperation('readinto')
while True:
try:
return _read(self._fileno, n)
except (IOError, OSError) as ex:
if ex.args[0] not in ignored_errors:
raise
self.hub.wait(self._read_event)
def readall(self):
ret = BytesIO()
while True:
data = self.read(DEFAULT_BUFFER_SIZE)
if not data:
break
ret.write(data)
return ret.getvalue()
def readinto(self, b):
data = self.read(len(b))
n = len(data)
try:
b[:n] = data
except TypeError as err:
import array
if not isinstance(b, array.array):
raise err
b[:n] = array.array(b'b', data)
return n
def write(self, b):
if not self._writable:
raise UnsupportedOperation('write')
while True:
try:
return _write(self._fileno, b)
except (IOError, OSError) as ex:
if ex.args[0] not in ignored_errors:
raise
self.hub.wait(self._write_event)
class FileObjectPosix:
default_bufsize = io.DEFAULT_BUFFER_SIZE
def __init__(self, fobj, mode='rb', bufsize=-1, close=True):
if isinstance(fobj, int):
fileno = fobj
fobj = None
else:
fileno = fobj.fileno()
if not isinstance(fileno, int):
raise TypeError('fileno must be int: %r' % fileno)
mode = (mode or 'rb').replace('b', '')
if 'U' in mode:
self._translate = True
mode = mode.replace('U', '')
else:
self._translate = False
assert len(mode) == 1, 'mode can only be [rb, rU, wb]'
self._fobj = fobj
self._closed = False
self._close = close
self.fileio = GreenFileDescriptorIO(fileno, mode, closefd=close)
if bufsize < 0:
bufsize = self.default_bufsize
if mode == 'r':
if bufsize == 0:
bufsize = 1
elif bufsize == 1:
bufsize = self.default_bufsize
self.io = BufferedReader(self.fileio, bufsize)
elif mode == 'w':
if bufsize == 0:
bufsize = 1
elif bufsize == 1:
bufsize = self.default_bufsize
self.io = BufferedWriter(self.fileio, bufsize)
else:
# QQQ: not used
self.io = BufferedRandom(self.fileio, bufsize)
if self._translate:
self.io = TextIOWrapper(self.io)
@property
def closed(self):
"""True if the file is cloed"""
return self._closed
def close(self):
if self._closed:
# make sure close() is only ran once when called concurrently
return
self._closed = True
try:
self.io.close()
self.fileio.close()
finally:
self._fobj = None
def flush(self):
self.io.flush()
def fileno(self):
return self.io.fileno()
def write(self, data):
self.io.write(data)
def writelines(self, list):
self.io.writelines(list)
def read(self, size=-1):
return self.io.read(size)
def readline(self, size=-1):
return self.io.readline(size)
def readlines(self, sizehint=0):
return self.io.readlines(sizehint)
def __iter__(self):
return self.io
from __future__ import absolute_import
import os
import io
from io import BufferedRandom
from io import BufferedReader
from io import BufferedWriter
from io import BytesIO
from io import DEFAULT_BUFFER_SIZE
from io import RawIOBase
from io import TextIOWrapper
from io import UnsupportedOperation
from gevent._fileobjectcommon import cancel_wait_ex
from gevent.hub import get_hub
from gevent.os import _read
from gevent.os import _write
from gevent.os import ignored_errors
from gevent.os import make_nonblocking
class GreenFileDescriptorIO(RawIOBase):
def __init__(self, fileno, mode='r', closefd=True):
RawIOBase.__init__(self)
self._closed = False
self._closefd = closefd
self._fileno = fileno
make_nonblocking(fileno)
self._readable = 'r' in mode
self._writable = 'w' in mode
self.hub = get_hub()
io = self.hub.loop.io
if self._readable:
self._read_event = io(fileno, 1)
else:
self._read_event = None
if self._writable:
self._write_event = io(fileno, 2)
else:
self._write_event = None
self._seekable = None
def readable(self):
return self._readable
def writable(self):
return self._writable
def seekable(self):
if self._seekable is None:
try:
os.lseek(self._fileno, 0, os.SEEK_CUR)
except OSError:
self._seekable = False
else:
self._seekable = True
return self._seekable
def fileno(self):
return self._fileno
@property
def closed(self):
return self._closed
def close(self):
if self._closed:
return
self.flush()
self._closed = True
if self._readable:
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
if self._writable:
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
fileno = self._fileno
if self._closefd:
self._fileno = None
os.close(fileno)
def read(self, n=1):
if not self._readable:
raise UnsupportedOperation('readinto')
while True:
try:
return _read(self._fileno, n)
except (IOError, OSError) as ex:
if ex.args[0] not in ignored_errors:
raise
self.hub.wait(self._read_event)
def readall(self):
ret = BytesIO()
while True:
data = self.read(DEFAULT_BUFFER_SIZE)
if not data:
break
ret.write(data)
return ret.getvalue()
def readinto(self, b):
data = self.read(len(b))
n = len(data)
try:
b[:n] = data
except TypeError as err:
import array
if not isinstance(b, array.array):
raise err
b[:n] = array.array(b'b', data)
return n
def write(self, b):
if not self._writable:
raise UnsupportedOperation('write')
while True:
try:
return _write(self._fileno, b)
except (IOError, OSError) as ex:
if ex.args[0] not in ignored_errors:
raise
self.hub.wait(self._write_event)
def seek(self, offset, whence=0):
return os.lseek(self._fileno, offset, whence)
class FileObjectPosix(object):
default_bufsize = io.DEFAULT_BUFFER_SIZE
def __init__(self, fobj, mode='rb', bufsize=-1, close=True):
if isinstance(fobj, int):
fileno = fobj
fobj = None
else:
fileno = fobj.fileno()
if not isinstance(fileno, int):
raise TypeError('fileno must be int: %r' % fileno)
mode = (mode or 'rb').replace('b', '')
if 'U' in mode:
self._translate = True
mode = mode.replace('U', '')
else:
self._translate = False
assert len(mode) == 1, 'mode can only be [rb, rU, wb]'
self._fobj = fobj
self._closed = False
self._close = close
self.fileio = GreenFileDescriptorIO(fileno, mode, closefd=close)
if bufsize < 0:
bufsize = self.default_bufsize
if mode == 'r':
if bufsize == 0:
bufsize = 1
elif bufsize == 1:
bufsize = self.default_bufsize
self.io = BufferedReader(self.fileio, bufsize)
elif mode == 'w':
if bufsize == 0:
bufsize = 1
elif bufsize == 1:
bufsize = self.default_bufsize
self.io = BufferedWriter(self.fileio, bufsize)
else:
# QQQ: not used
self.io = BufferedRandom(self.fileio, bufsize)
if self._translate:
self.io = TextIOWrapper(self.io)
@property
def closed(self):
"""True if the file is cloed"""
return self._closed
def close(self):
if self._closed:
# make sure close() is only ran once when called concurrently
return
self._closed = True
try:
self.io.close()
self.fileio.close()
finally:
self._fobj = None
def flush(self):
self.io.flush()
def fileno(self):
return self.io.fileno()
def write(self, data):
self.io.write(data)
def writelines(self, lines):
self.io.writelines(lines)
def read(self, size=-1):
return self.io.read(size)
def readline(self, size=-1):
return self.io.readline(size)
def readlines(self, sizehint=0):
return self.io.readlines(sizehint)
def seek(self, *args, **kwargs):
return self.io.seek(*args, **kwargs)
def seekable(self):
return self.io.seekable()
def tell(self):
return self.io.tell()
def truncate(self, size=None):
return self.io.truncate(size)
def __iter__(self):
return self.io
def __getattr__(self, name):
return getattr(self._fobj, name)
...@@ -39,25 +39,30 @@ class _DummyThread(_DummyThread_): ...@@ -39,25 +39,30 @@ class _DummyThread(_DummyThread_):
# Make sure the MainThread can be found by our current greenlet ID, # Make sure the MainThread can be found by our current greenlet ID,
# otherwise we get a new DummyThread, which cannot be joined. # otherwise we get a new DummyThread, which cannot be joined.
# Fixes tests in test_threading_2 under PyPy, and generally makes things nicer # Fixes tests in test_threading_2 under PyPy, and generally makes things nicer
# when threading is imported before monkey patching # when gevent.threading is imported before monkey patching or not at all
# XXX: This assumes that the import is happening in the "main" greenlet # XXX: This assumes that the import is happening in the "main" greenlet
if _get_ident() not in __threading__._active and len(__threading__._active) == 1: if _get_ident() not in __threading__._active and len(__threading__._active) == 1:
k, v = next(iter(__threading__._active.items())) k, v = next(iter(__threading__._active.items()))
del __threading__._active[k] del __threading__._active[k]
v._Thread__ident = _get_ident() v._Thread__ident = _get_ident()
__threading__._active[_get_ident()] = v __threading__._active[_get_ident()] = v
del k
del v
# Avoid printing an error on shutdown trying to remove the thread entry # Avoid printing an error on shutdown trying to remove the thread entry
# we just replaced if we're not fully monkey patched in # we just replaced if we're not fully monkey patched in
_MAIN_THREAD = __threading__._get_ident() if hasattr(__threading__, '_get_ident') else __threading__.get_ident() # XXX: This causes a hang on PyPy for some unknown reason (as soon as class _active
# defines __delitem__, shutdown hangs. Maybe due to something with the GC?)
class _active(dict): if not PYPY:
def __delitem__(self, k): _MAIN_THREAD = __threading__._get_ident() if hasattr(__threading__, '_get_ident') else __threading__.get_ident()
if k == _MAIN_THREAD and k not in self:
return class _active(dict):
dict.__delitem__(self, k) def __delitem__(self, k):
if k == _MAIN_THREAD and k not in self:
__threading__._active = _active(__threading__._active) return
dict.__delitem__(self, k)
__threading__._active = _active(__threading__._active)
import sys import sys
......
...@@ -33,7 +33,9 @@ NOT_IMPLEMENTED = { ...@@ -33,7 +33,9 @@ NOT_IMPLEMENTED = {
COULD_BE_MISSING = { COULD_BE_MISSING = {
'socket': ['create_connection', 'RAND_add', 'RAND_egd', 'RAND_status']} 'socket': ['create_connection', 'RAND_add', 'RAND_egd', 'RAND_status']}
NO_ALL = ['gevent.threading', 'gevent._util', 'gevent._socketcommon', 'gevent._fileobjectcommon', NO_ALL = ['gevent.threading', 'gevent._util',
'gevent._socketcommon',
'gevent._fileobjectcommon', 'gevent._fileobjectposix',
'gevent._tblib'] 'gevent._tblib']
......
import os import os
import sys import sys
import tempfile
import greentest import greentest
import gevent import gevent
from gevent.fileobject import FileObject, FileObjectThread from gevent.fileobject import FileObject, FileObjectThread
...@@ -18,14 +19,16 @@ class Test(greentest.TestCase): ...@@ -18,14 +19,16 @@ class Test(greentest.TestCase):
if PYPY: if PYPY:
s.close() s.close()
else: else:
del s del s # Deliberately getting ResourceWarning under Py3
try: try:
os.close(w) os.close(w)
except OSError: except OSError:
pass # expected, because SocketAdapter already closed it pass # expected, because SocketAdapter already closed it
else: else:
raise AssertionError('os.close(%r) must not succeed' % w) raise AssertionError('os.close(%r) must not succeed' % w)
self.assertEqual(FileObject(r).read(), b'x') fobj = FileObject(r, 'rb')
self.assertEqual(fobj.read(), b'x')
fobj.close()
def test_del(self): def test_del(self):
self._test_del() self._test_del()
...@@ -52,11 +55,39 @@ class Test(greentest.TestCase): ...@@ -52,11 +55,39 @@ class Test(greentest.TestCase):
lines = [b'line1\n', b'line2\r', b'line3\r\n', b'line4\r\nline5', b'\nline6'] lines = [b'line1\n', b'line2\r', b'line3\r\n', b'line4\r\nline5', b'\nline6']
g = gevent.spawn(writer, FileObject(w, 'wb'), lines) g = gevent.spawn(writer, FileObject(w, 'wb'), lines)
try: try:
result = FileObject(r, 'rU').read() fobj = FileObject(r, 'rU')
result = fobj.read()
fobj.close()
self.assertEqual('line1\nline2\nline3\nline4\nline5\nline6', result) self.assertEqual('line1\nline2\nline3\nline4\nline5\nline6', result)
finally: finally:
g.kill() g.kill()
def test_seek(self):
fileno, path = tempfile.mkstemp()
s = b'a' * 1024
os.write(fileno, b'B' * 15)
os.write(fileno, s)
os.close(fileno)
try:
with open(path, 'rb') as f:
f.seek(15)
native_data = f.read(1024)
with open(path, 'rb') as f_raw:
f = FileObject(f_raw, 'rb')
if hasattr(f, 'seekable'):
# Py3
self.assertTrue(f.seekable())
f.seek(15)
self.assertEqual(15, f.tell())
fileobj_data = f.read(1024)
self.assertEqual(native_data, s)
self.assertEqual(native_data, fileobj_data)
finally:
os.remove(path)
def writer(fobj, line): def writer(fobj, line):
for character in line: for character in line:
......
...@@ -46,7 +46,7 @@ sys.stdout.write("..finishing..") ...@@ -46,7 +46,7 @@ sys.stdout.write("..finishing..")
""" """
class ThreadTrace(unittest.TestCase): class TestTrace(unittest.TestCase):
def test_untraceable_lock(self): def test_untraceable_lock(self):
if hasattr(sys, 'gettrace'): if hasattr(sys, 'gettrace'):
old = sys.gettrace() old = sys.gettrace()
...@@ -66,8 +66,10 @@ class ThreadTrace(unittest.TestCase): ...@@ -66,8 +66,10 @@ class ThreadTrace(unittest.TestCase):
self.failUnless(lst == [], "trace not empty") self.failUnless(lst == [], "trace not empty")
def run_script(self, more_args=[]): def run_script(self, more_args=()):
rc = subprocess.call([sys.executable, "-c", script] + more_args) args = [sys.executable, "-c", script]
args.extend(more_args)
rc = subprocess.call(args)
self.failIf(rc == 2, "interpreter was blocked") self.failIf(rc == 2, "interpreter was blocked")
self.failUnless(rc == 0, "Unexpected error") self.failUnless(rc == 0, "Unexpected error")
...@@ -79,8 +81,5 @@ class ThreadTrace(unittest.TestCase): ...@@ -79,8 +81,5 @@ class ThreadTrace(unittest.TestCase):
if __name__ == "__main__": if __name__ == "__main__":
try: import greentest
from test import support greentest.main()
except ImportError:
from test import test_support as support
support.run_unittest(ThreadTrace)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment