Commit 766f5f5a authored by Jason Madden's avatar Jason Madden Committed by GitHub

More coverage (#1085)

* Massively simplify the internal _threading.py by removing everything we don't use.

Enable coverage testing for Python 3 and some for PyPy.

* PyPy can't handle coverage :(

* Account for a racy test and tweak the makefile to ignore errors uploading to coveralls. Python 3.7 was seen to generate that: https://travis-ci.org/gevent/gevent/jobs/334831358

* Manually exclude .so files. They caused coveralls to fail: https://travis-ci.org/gevent/gevent/jobs/334839765#L1755

* Enable coverage on pypy too.

* coverage pragmas

* Skip a test that fails under coverage sometimes
parent f76c65cc
......@@ -10,7 +10,7 @@ env:
matrix:
# These are ordered to get as much diversity in the
# first group of parallel runs (4) as possible
- TASK=lint-py27
- TASK=test-py27
- TASK=test-pypy
- TASK=test-py36
- TASK=test-py37
......
......@@ -23,6 +23,9 @@
They have always had the same default as Python 3, namely an empty
tuple and false, but now are accessible to Python 2.
- The internal, undocumented module ``gevent._threading`` has been
simplified.
1.3a1 (2018-01-27)
==================
......
......@@ -17,9 +17,9 @@ clean:
rm -f src/gevent/ares.c src/gevent/ares.h
rm -f src/gevent/_semaphore.c src/gevent/_semaphore.h
rm -f src/gevent/local.c src/gevent/local.h
rm -f src/gevent/*.so src/gevent/libev/*.so src/gevent/libuv/*.so
rm -f src/gevent/*.so src/gevent/*.pyd src/gevent/libev/*.so src/gevent/libuv/*.so src/gevent/libev/*.pyd src/gevent/libuv/*.pyd
rm -rf src/gevent/libev/*.o src/gevent/libuv/*.o src/gevent/*.o
rm -rf src/gevent/__pycache__ src/greentest/__pycache__ src/gevent/libev/__pycache__
rm -rf src/gevent/__pycache__ src/greentest/__pycache__ src/greentest/greentest/__pycache__ src/gevent/libev/__pycache__
rm -rf src/gevent/*.pyc src/greentest/*.pyc src/gevent/libev/*.pyc
rm -rf src/greentest/htmlcov src/greentest/.coverage
rm -rf build
......@@ -80,13 +80,17 @@ threadfiletest:
allbackendtest:
${PYTHON} scripts/travis.py fold_start default "Testing default backend"
GEVENT_CORE_CFFI_ONLY= make alltest
GEVENT_CORE_CFFI_ONLY= GEVENTTEST_COVERAGE=1 make alltest
${PYTHON} scripts/travis.py fold_end default
make cffibackendtest
GEVENTTEST_COVERAGE=1 make cffibackendtest
# because we set parallel=true, each run produces new and different coverage files; they all need
# to be combined
make coverage_combine
cffibackendtest:
${PYTHON} scripts/travis.py fold_start libuv "Testing libuv backend"
GEVENT_CORE_CFFI_ONLY=libuv make alltest
GEVENT_CORE_CFFI_ONLY=libuv GEVENTTEST_COVERAGE=1 make alltest
${PYTHON} scripts/travis.py fold_end libuv
${PYTHON} scripts/travis.py fold_start libev "Testing libev CFFI backend"
GEVENT_CORE_CFFI_ONLY=libev make alltest
......@@ -100,16 +104,15 @@ leaktest: test_prelim
bench:
${PYTHON} src/greentest/bench_sendall.py
travis_test_linters:
make lint
GEVENTTEST_COVERAGE=1 make leaktest
GEVENTTEST_COVERAGE=1 make cffibackendtest
# because we set parallel=true, each run produces new and different coverage files; they all need
# to be combined
make leaktest
make cffibackendtest
coverage_combine:
coverage combine . src/greentest/
coveralls --rcfile=src/greentest/.coveragerc
-coveralls --rcfile=src/greentest/.coveragerc
.PHONY: clean doc prospector lint travistest travis
......@@ -176,11 +179,8 @@ develop:
GEVENTSETUP_EV_VERIFY=3 python -m pip install -U -r dev-requirements.txt
${PYTHON} scripts/travis.py fold_end install
lint-py27: $(PY27)
PYTHON=python2.7.14 PATH=$(BUILD_RUNTIMES)/versions/python2.7.14/bin:$(PATH) make develop travis_test_linters
test-py27: $(PY27)
PYTHON=python2.7.14 PATH=$(BUILD_RUNTIMES)/versions/python2.7.14/bin:$(PATH) make develop allbackendtest
PYTHON=python2.7.14 PATH=$(BUILD_RUNTIMES)/versions/python2.7.14/bin:$(PATH) make develop lint leaktest allbackendtest
test-py34: $(PY34)
PYTHON=python3.4.7 PATH=$(BUILD_RUNTIMES)/versions/python3.4.7/bin:$(PATH) make develop allbackendtest
......@@ -195,7 +195,7 @@ test-py37: $(PY37)
PYTHON=python3.7.0a3 PATH=$(BUILD_RUNTIMES)/versions/python3.7.0a3/bin:$(PATH) make develop allbackendtest
test-pypy: $(PYPY)
PYTHON=$(PYPY) PATH=$(BUILD_RUNTIMES)/versions/pypy590/bin:$(PATH) make develop cffibackendtest
PYTHON=$(PYPY) PATH=$(BUILD_RUNTIMES)/versions/pypy590/bin:$(PATH) make develop cffibackendtest coverage_combine
test-pypy3: $(PYPY3)
PYTHON=$(PYPY3) PATH=$(BUILD_RUNTIMES)/versions/pypy3.5_590/bin:$(PATH) make develop basictest
......
......@@ -103,13 +103,6 @@ def only_if_watcher(func):
return _NoWatcherResult
return if_w
def error_if_no_watcher(func):
    """Decorator for watcher methods: raise ``ValueError`` when the
    instance has no underlying ``self._watcher``, otherwise delegate to
    *func*.

    The wrapped function's return value is now propagated to the caller
    (the original wrapper called ``func(self)`` and silently dropped its
    result, so every decorated method returned ``None``).
    """
    @functools.wraps(func)
    def no_w(self):
        if not self._watcher:
            raise ValueError("No watcher present", self)
        return func(self)
    return no_w
class LazyOnClass(object):
......@@ -122,7 +115,7 @@ class LazyOnClass(object):
self.func = func
def __get__(self, inst, klass):
if inst is None:
if inst is None: # pragma: no cover
return self
val = self.func(inst)
......@@ -147,7 +140,7 @@ class AbstractWatcherType(type):
def __new__(cls, name, bases, cls_dict):
if name != 'watcher' and not cls_dict.get('_watcher_skip_ffi'):
cls._fill_watcher(name, bases, cls_dict)
if '__del__' in cls_dict and not ALLOW_WATCHER_DEL:
if '__del__' in cls_dict and not ALLOW_WATCHER_DEL: # pragma: no cover
raise TypeError("CFFI watchers are not allowed to have __del__")
return type.__new__(cls, name, bases, cls_dict)
......@@ -166,7 +159,7 @@ class AbstractWatcherType(type):
return getattr(b, attr)
except AttributeError:
continue
if error:
if error: # pragma: no cover
raise AttributeError(attr)
_watcher_prefix = cls_dict.get('_watcher_prefix') or _mro_get('_watcher_prefix', bases)
......
......@@ -665,7 +665,7 @@ if hasattr(_socket, "socketpair"):
b = socket(family, type, proto, b.detach())
return a, b
else:
else: # pragma: no cover
# Origin: https://gist.github.com/4325783, by Geert Jansen. Public domain.
# gevent: taken from 3.6 release. Expected to be used only on Win. Added to Win/3.5
......@@ -1035,8 +1035,10 @@ class _basefileobject(object):
try:
from gevent.fileobject import FileObjectPosix
except ImportError:
# Manual implementation
except ImportError: # pragma: no cover
# Manual implementation, only on Windows
# XXX: I think we could simplify this using FileObjectCommon
# and just implementing the IOBase interface?
_fileobject = _basefileobject
else:
class _fileobject(FileObjectPosix):
......
......@@ -6,96 +6,31 @@ or not).
This module is missing the 'Thread' class, but includes the 'Queue' class.
"""
from __future__ import absolute_import
try:
from Queue import Full, Empty
except ImportError:
from queue import Full, Empty # pylint:disable=import-error
from collections import deque
import heapq
from time import time as _time, sleep as _sleep
from itertools import islice as _islice
from gevent import monkey
from gevent._compat import PY3
__all__ = ['Condition',
'Event',
'Lock',
'RLock',
'Semaphore',
'BoundedSemaphore',
'Queue',
'local',
'stack_size']
__all__ = [
'Condition',
'Lock',
'Queue',
]
thread_name = '_thread' if PY3 else 'thread'
start_new_thread, Lock, get_ident, local, stack_size = monkey.get_original(thread_name, [
'start_new_thread', 'allocate_lock', 'get_ident', '_local', 'stack_size'])
class RLock(object):
    """A reentrant lock built on a primitive (non-reentrant) ``Lock``.

    The owning thread may call :meth:`acquire` repeatedly without
    blocking; the lock is only surrendered once :meth:`release` has been
    called the same number of times.  Also provides the private helpers
    condition variables expect (``_acquire_restore``, ``_release_save``,
    ``_is_owned``).
    """

    def __init__(self):
        self.__block = Lock()   # underlying one-shot lock
        self.__owner = None     # ident of the owning thread, or None
        self.__count = 0        # recursion depth held by the owner

    def __repr__(self):
        owner = self.__owner
        return "<%s owner=%r count=%d>" % (
            self.__class__.__name__, owner, self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock; reentrant for the owning thread.

        Returns a true value on success, and a false value only when
        *blocking* is false and another thread holds the lock.
        """
        current = get_ident()
        if self.__owner == current:
            # Already ours: just deepen the recursion.
            self.__count += 1
            return 1
        acquired = self.__block.acquire(blocking)
        if acquired:
            self.__owner = current
            self.__count = 1
        return acquired

    __enter__ = acquire

    def release(self):
        """Undo one :meth:`acquire`; fully release at depth zero.

        Raises ``RuntimeError`` when the caller does not own the lock.
        """
        if self.__owner != get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self.__count -= 1
        if not self.__count:
            self.__owner = None
            self.__block.release()

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, count_owner):
        """Re-acquire the lock and restore the (count, owner) pair
        previously captured by :meth:`_release_save`."""
        self.__block.acquire()
        self.__count, self.__owner = count_owner

    def _release_save(self):
        """Release the lock completely, whatever the recursion depth,
        returning (count, owner) so it can be restored later."""
        saved = (self.__count, self.__owner)
        self.__count = 0
        self.__owner = None
        self.__block.release()
        return saved

    def _is_owned(self):
        """True when the calling thread currently owns the lock."""
        return get_ident() == self.__owner
start_new_thread, Lock, = monkey.get_original(thread_name, [
'start_new_thread', 'allocate_lock',
])
class Condition(object):
# pylint:disable=method-hidden
def __init__(self, lock=None):
if lock is None:
lock = RLock()
def __init__(self, lock):
self.__lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
......@@ -140,7 +75,7 @@ class Condition(object):
return False
return True
def wait(self, timeout=None):
def wait(self):
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = Lock()
......@@ -148,146 +83,34 @@ class Condition(object):
self.__waiters.append(waiter)
saved_state = self._release_save()
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
else:
# Balancing act: We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive. The scheme here sleeps very
# little at first, longer as time goes on, but never longer
# than 20 times per second (or the timeout time remaining).
endtime = _time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
gotit = waiter.acquire(0)
if gotit:
break
remaining = endtime - _time()
if remaining <= 0:
break
delay = min(delay * 2, remaining, .05)
_sleep(delay)
if not gotit:
try:
self.__waiters.remove(waiter)
except ValueError:
pass
waiter.acquire()
finally:
self._acquire_restore(saved_state)
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
__waiters = self.__waiters
waiters = __waiters[:n]
if not waiters:
all_waiters = self.__waiters
waiters_to_notify = deque(_islice(all_waiters, n))
if not waiters_to_notify:
return
for waiter in waiters:
for waiter in waiters_to_notify:
waiter.release()
try:
__waiters.remove(waiter)
all_waiters.remove(waiter)
except ValueError:
pass
def notify_all(self):
self.notify(len(self.__waiters))
class Semaphore(object):
    """A counting semaphore with no upper bound.

    After Tim Peters' semaphore class, but not quite the same (no
    maximum).
    """

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        # The condition guards __value; acquirers block on it while the
        # counter is exhausted.  (Attribute names are kept: subclasses
        # reach the counter through its mangled name.)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Decrement the counter, waiting for it to become positive.

        Returns True on success; returns False only when *blocking* is
        false and the counter is currently zero.
        """
        acquired = False
        self.__cond.acquire()
        while self.__value == 0:
            if not blocking:
                break
            self.__cond.wait()
        else:
            # Loop exited normally, so a unit is available: take it.
            self.__value -= 1
            acquired = True
        self.__cond.release()
        return acquired

    __enter__ = acquire

    def release(self):
        """Increment the counter and wake one blocked acquirer."""
        self.__cond.acquire()
        self.__value += 1
        self.__cond.notify()
        self.__cond.release()

    def __exit__(self, t, v, tb):
        self.release()
class BoundedSemaphore(Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

    def __init__(self, value=1):
        Semaphore.__init__(self, value)
        # Remember the starting counter so release() can detect overflow.
        self._initial_value = value

    def release(self):
        """Release the semaphore, raising ``ValueError`` if it would be
        released more times than it was acquired."""
        # Read the base class's private counter through its name-mangled
        # attribute.  The original spelled this ``self.Semaphore__value``,
        # which is not the mangled name and always raises AttributeError;
        # Python mangles ``__value`` in class Semaphore to ``_Semaphore__value``.
        if self._Semaphore__value >= self._initial_value:  # pylint:disable=no-member
            raise ValueError("Semaphore released too many times")
        return Semaphore.release(self)
class Queue(object):
"""Create a queue object.
class Event(object):
The queue is always infinite size.
"""
# After Tim Peters' event class (without is_posted())
def __init__(self):
self.__cond = Condition(Lock())
self.__flag = False
def _reset_internal_locks(self):
# private! called by Thread._reset_internal_locks by _after_fork()
self.__cond.__init__()
def is_set(self):
return self.__flag
def set(self):
self.__cond.acquire()
try:
self.__flag = True
self.__cond.notify_all()
finally:
self.__cond.release()
def clear(self):
self.__cond.acquire()
try:
self.__flag = False
finally:
self.__cond.release()
def wait(self, timeout=None):
self.__cond.acquire()
try:
if not self.__flag:
self.__cond.wait(timeout)
return self.__flag
finally:
self.__cond.release()
class Queue: # pylint:disable=old-style-class
"""Create a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
"""
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
self.queue = deque()
# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
......@@ -296,12 +119,7 @@ class Queue: # pylint:disable=old-style-class
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = Condition(self.mutex)
# Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
self.not_full = Condition(self.mutex)
# Notify all_tasks_done whenever the number of unfinished tasks
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = Condition(self.mutex)
self.unfinished_tasks = 0
def task_done(self):
......@@ -318,198 +136,39 @@ class Queue: # pylint:disable=old-style-class
Raises a ValueError if called more times than there were items
placed in the queue.
"""
self.all_tasks_done.acquire()
try:
with self.mutex:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished
finally:
self.all_tasks_done.release()
def join(self):
"""Blocks until all items in the Queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the
queue. The count goes down whenever a consumer thread calls task_done()
to indicate the item was retrieved and all work on it is complete.
When the count of unfinished tasks drops to zero, join() unblocks.
"""
self.all_tasks_done.acquire()
try:
while self.unfinished_tasks:
self.all_tasks_done.wait()
finally:
self.all_tasks_done.release()
def qsize(self):
def qsize(self, len=len):
"""Return the approximate size of the queue (not reliable!)."""
self.mutex.acquire()
try:
return self._qsize()
finally:
self.mutex.release()
with self.mutex:
return len(self.queue)
def empty(self):
"""Return True if the queue is empty, False otherwise (not reliable!)."""
self.mutex.acquire()
try:
return not self._qsize()
finally:
self.mutex.release()
return not self.qsize()
def full(self):
"""Return True if the queue is full, False otherwise (not reliable!)."""
self.mutex.acquire()
try:
if self.maxsize <= 0:
return False
if self.maxsize >= self._qsize():
return True
finally:
self.mutex.release()
return False
def put(self, item, block=True, timeout=None):
def put(self, item):
"""Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
"""
self.not_full.acquire()
try:
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
raise Full
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
while self._qsize() >= self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
with self.mutex:
self.queue.append(item)
self.unfinished_tasks += 1
self.not_empty.notify()
finally:
self.not_full.release()
def put_nowait(self, item):
"""Put an item into the queue without blocking.
Only enqueue the item if a free slot is immediately available.
Otherwise raise the Full exception.
"""
return self.put(item, False)
def get(self, block=True, timeout=None):
def get(self):
"""Remove and return an item from the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
"""
self.not_empty.acquire()
try:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
with self.mutex:
while not self.queue:
self.not_empty.wait()
item = self.queue.popleft()
return item
finally:
self.not_empty.release()
def get_nowait(self):
"""Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise
raise the Empty exception.
"""
return self.get(False)
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
# Initialize the queue representation
def _init(self, maxsize):
# pylint:disable=unused-argument
self.queue = deque()
def _qsize(self, len=len):
return len(self.queue)
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
# Get an item from the queue
def _get(self):
return self.queue.popleft()
class PriorityQueue(Queue):
'''Variant of Queue that retrieves open entries in priority order (lowest first).
Entries are typically tuples of the form: (priority number, data).
'''
def _init(self, maxsize):
self.queue = []
def _qsize(self, len=len):
return len(self.queue)
def _put(self, item, heappush=heapq.heappush):
# pylint:disable=arguments-differ
heappush(self.queue, item)
def _get(self, heappop=heapq.heappop):
# pylint:disable=arguments-differ
return heappop(self.queue)
class LifoQueue(Queue):
'''Variant of Queue that retrieves most recently added entries first.'''
def _init(self, maxsize):
self.queue = []
def _qsize(self, len=len):
return len(self.queue)
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.pop()
from __future__ import absolute_import, print_function
__all__ = [
]
import gevent.libuv._corecffi as _corecffi # pylint:disable=no-name-in-module,import-error
ffi = _corecffi.ffi # pylint:disable=no-member
libuv = _corecffi.lib # pylint:disable=no-member
......@@ -135,10 +135,6 @@ class watcher(_base.watcher):
_dbg("Creating", type(self), "with ref", ref)
self.ref = ref
def _watcher_ffi_set_priority(self, priority):
# libuv has no concept of priority
pass
def _watcher_ffi_init(self, args):
# TODO: we could do a better job chokepointing this
return self._watcher_init(self.loop.ptr,
......
......@@ -462,7 +462,7 @@ try:
local.__new__ = __new__
else:
local.__new__ = classmethod(__new__)
except TypeError:
except TypeError: # pragma: no cover
pass
finally:
del sys
......@@ -36,8 +36,7 @@ class _FakeTimer(object):
def stop(self):
return
def cancel(self):
return
cancel = stop
stop = close = cancel
......
......@@ -3,15 +3,28 @@
# concurrency=greenlet, except it causes coverage itself to import
# gevent. That messes up our coverage numbers for top-level
# statements, so we use greenlet instead. See https://github.com/gevent/gevent/pull/655#issuecomment-141198002
# See also .coveragerc-pypy
concurrency = greenlet
parallel = True
source = gevent
omit = test_*
omit =
# This is for <= 2.7.8, which we don't test
src/gevent/_ssl2.py
src/gevent/libev/_corecffi_build.py
src/gevent/libuv/_corecffi_build.py
src/gevent/win32util.py
# having concurrency=greenlet means that the Queue class
# which is used from multiple real threads doesn't
# properly get covered.
src/gevent/_threading.py
test_*
# local.so sometimes gets included, and it can't be parsed
# as source, so it fails the whole process.
*.so
[report]
# Coverage is run on Linux under cPython 2, so
# exclude branches that are windows specific or pypy/python3
# specific
# Coverage is run on Linux under cPython 2/3 and pypy
exclude_lines =
pragma: no cover
def __repr__
......@@ -19,9 +32,6 @@ exclude_lines =
raise NotImplementedError
except ImportError:
if __name__ == .__main__.:
if PYPY:
if PY3:
if sys.platform == 'win32':
if mswindows:
if is_windows:
if sys.version_info.*>=.*3
[run]
# This is just like .coveragerc, but
# used for PyPy running. pypy doesn't support concurrency=greenlet
parallel = True
source = gevent
omit =
# This is for <= 2.7.8, which we don't test
src/gevent/_ssl2.py
src/gevent/libev/_corecffi_build.py
src/gevent/libuv/_corecffi_build.py
src/gevent/win32util.py
# having concurrency=greenlet means that the Queue class
# which is used from multiple real threads doesn't
# properly get covered.
src/gevent/_threading.py
test_*
# local.so sometimes gets included, and it can't be parsed
# as source, so it fails the whole process.
*.so
[report]
# Coverage is run on Linux under cPython 2/3 and pypy, so
# exclude branches that are windows specific or pypy
# specific
exclude_lines =
pragma: no cover
def __repr__
raise AssertionError
raise NotImplementedError
except ImportError:
if __name__ == .__main__.:
if sys.platform == 'win32':
if mswindows:
if is_windows:
......@@ -2,4 +2,16 @@
# on the path as per https://coverage.readthedocs.io/en/coverage-4.0b3/subprocess.html.
# Note that this disables other sitecustomize.py files.
import coverage
coverage.process_startup()
# Start collecting coverage in every Python subprocess that imports this
# sitecustomize (activated via COVERAGE_PROCESS_START).
try:
    coverage.process_startup()
except coverage.CoverageException as e:
    # On interpreters without the C tracer (PyPy — see the note in
    # .coveragerc-pypy), coverage refuses concurrency=greenlet with this
    # exact message; that failure is expected and deliberately ignored.
    if str(e) == "Can't support concurrency=greenlet with PyTracer, only threads are supported":
        pass
    else:
        # Any other CoverageException is unexpected: surface it in the
        # test logs before re-raising.
        import traceback
        traceback.print_exc()
        raise
except:
    # Non-coverage startup failures: print the traceback (subprocess
    # stderr may otherwise be lost) and re-raise — nothing is swallowed.
    import traceback
    traceback.print_exc()
    raise
......@@ -66,6 +66,8 @@ if sysinfo.PYPY3:
else:
skipOnPyPy3 = _do_not_skip
skipUnderCoverage = unittest.skip if sysinfo.RUN_COVERAGE else _do_not_skip
skipIf = unittest.skipIf
......
......@@ -13,6 +13,7 @@ from multiprocessing import cpu_count
from greentest import util
from greentest.util import log
from greentest.sysinfo import RUNNING_ON_CI
from greentest.sysinfo import PYPY
from greentest import six
......@@ -37,11 +38,22 @@ RUN_ALONE = [
IGNORE_COVERAGE = [
# Hangs forever
'test__threading_vs_settrace.py',
# times out
'test_socket.py',
# Doesn't get the exceptions it expects
'test_selectors.py',
# XXX ?
'test__issue302monkey.py',
"test_subprocess.py",
]
if PYPY:
IGNORE_COVERAGE += [
# Tends to timeout
'test__refcount.py',
'test__greenletset.py'
]
def run_many(tests, expected=(), failfast=False, quiet=False):
# pylint:disable=too-many-locals
......@@ -283,6 +295,8 @@ def main():
coverage = True
# NOTE: This must be run from the greentest directory
os.environ['COVERAGE_PROCESS_START'] = os.path.abspath(".coveragerc")
if PYPY:
os.environ['COVERAGE_PROCESS_START'] = os.path.abspath(".coveragerc-pypy")
os.environ['PYTHONPATH'] = os.path.abspath("coveragesite") + os.pathsep + os.environ.get("PYTHONPATH", "")
# We change directory often, use an absolute path to keep all the
# coverage files (which will have distinct suffixes because of parallel=true in .coveragerc
......
from __future__ import absolute_import, print_function, division
import greentest
import gevent
from gevent.event import Event, AsyncResult
import greentest
from greentest.skipping import skipUnderCoverage
from greentest.six import xrange
DELAY = 0.01
......@@ -100,28 +103,31 @@ class TestAsyncResult(greentest.TestCase):
gevent.sleep(0)
self.assertEqual(log, [('caught', obj)])
@skipUnderCoverage("This test is racy and sometimes fails")
def test_set(self):
event1 = AsyncResult()
event2 = AsyncResult()
timer_exc = MyException('interrupted')
g = gevent.spawn_later(DELAY / 2.0, event1.set, 'hello event1')
# Notice that this test is racy
g = gevent.spawn_later(DELAY, event1.set, 'hello event1')
t = gevent.Timeout.start_new(0, timer_exc)
try:
with self.assertRaises(MyException) as exc:
event1.get()
self.assertEqual(timer_exc, exc.exception)
X = object()
result = gevent.with_timeout(DELAY, event2.get, timeout_value=X)
self.assertIs(
result, X,
'Nobody sent anything to event2 yet it received %r' % (result, ))
finally:
t.close()
g.kill()
def test_set_with_timeout(self):
event2 = AsyncResult()
X = object()
result = gevent.with_timeout(DELAY, event2.get, timeout_value=X)
self.assertIs(
result, X,
'Nobody sent anything to event2 yet it received %r' % (result, ))
def test_nonblocking_get(self):
ar = AsyncResult()
self.assertRaises(gevent.Timeout, ar.get, block=False)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment