Commit dde9546b authored by Jason Madden's avatar Jason Madden

Move the core of socket._wait into Cython. This speeds up bench_sendall by about 3%

parent 92b1a6b6
...@@ -10,6 +10,8 @@ cdef InvalidSwitchError ...@@ -10,6 +10,8 @@ cdef InvalidSwitchError
cdef _waiter cdef _waiter
cdef _greenlet_primitives cdef _greenlet_primitives
cdef traceback cdef traceback
cdef _timeout_error
cdef Timeout
cdef extern from "greenlet/greenlet.h": cdef extern from "greenlet/greenlet.h":
...@@ -56,5 +58,12 @@ cdef class _WaitIterator: ...@@ -56,5 +58,12 @@ cdef class _WaitIterator:
cdef _cleanup(self) cdef _cleanup(self)
cpdef iwait(objects, timeout=*, count=*) cpdef iwait_on_objects(objects, timeout=*, count=*)
cpdef wait(objects=*, timeout=*, count=*) cpdef wait_on_objects(objects=*, timeout=*, count=*)
cdef _primitive_wait(watcher, timeout, timeout_exc, WaitOperationsGreenlet hub)
cpdef wait_on_watcher(watcher, timeout=*, timeout_exc=*, WaitOperationsGreenlet hub=*)
cpdef wait_read(fileno, timeout=*, timeout_exc=*)
cpdef wait_write(fileno, timeout=*, timeout_exc=*, event=*)
cpdef wait_readwrite(fileno, timeout=*, timeout_exc=*, event=*)
cpdef wait_on_socket(socket, watcher, timeout_exc=*)
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# copyright (c) 2018 gevent. See LICENSE. # copyright (c) 2018 gevent. See LICENSE.
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False # cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False,binding=True
""" """
A collection of primitives used by the hub, and suitable for A collection of primitives used by the hub, and suitable for
compilation with Cython because of their frequency of use. compilation with Cython because of their frequency of use.
...@@ -14,10 +14,13 @@ from __future__ import print_function ...@@ -14,10 +14,13 @@ from __future__ import print_function
import traceback import traceback
from gevent.exceptions import InvalidSwitchError from gevent.exceptions import InvalidSwitchError
from gevent.exceptions import ConcurrentObjectUseError
from gevent import _greenlet_primitives from gevent import _greenlet_primitives
from gevent import _waiter from gevent import _waiter
from gevent._util import _NONE
from gevent._hub_local import get_hub_noargs as get_hub from gevent._hub_local import get_hub_noargs as get_hub
from gevent.timeout import Timeout
# In Cython, we define these as 'cdef inline' functions. The # In Cython, we define these as 'cdef inline' functions. The
# compilation unit cannot have a direct assignment to them (import # compilation unit cannot have a direct assignment to them (import
...@@ -31,8 +34,11 @@ locals()['SwitchOutGreenletWithLoop'] = _greenlet_primitives.SwitchOutGreenletWi ...@@ -31,8 +34,11 @@ locals()['SwitchOutGreenletWithLoop'] = _greenlet_primitives.SwitchOutGreenletWi
__all__ = [ __all__ = [
'WaitOperationsGreenlet', 'WaitOperationsGreenlet',
'iwait', 'iwait_on_objects',
'wait', 'wait_on_objects',
'wait_read',
'wait_write',
'wait_readwrite',
] ]
class WaitOperationsGreenlet(SwitchOutGreenletWithLoop): # pylint:disable=undefined-variable class WaitOperationsGreenlet(SwitchOutGreenletWithLoop): # pylint:disable=undefined-variable
...@@ -160,7 +166,7 @@ class _WaitIterator(object): ...@@ -160,7 +166,7 @@ class _WaitIterator(object):
traceback.print_exc() traceback.print_exc()
def iwait(objects, timeout=None, count=None): def iwait_on_objects(objects, timeout=None, count=None):
""" """
Iteratively yield *objects* as they are ready, until all (or *count*) are ready Iteratively yield *objects* as they are ready, until all (or *count*) are ready
or *timeout* expired. or *timeout* expired.
...@@ -189,7 +195,7 @@ def iwait(objects, timeout=None, count=None): ...@@ -189,7 +195,7 @@ def iwait(objects, timeout=None, count=None):
return _WaitIterator(objects, hub, timeout, count) return _WaitIterator(objects, hub, timeout, count)
def wait(objects=None, timeout=None, count=None): def wait_on_objects(objects=None, timeout=None, count=None):
""" """
Wait for ``objects`` to become ready or for event loop to finish. Wait for ``objects`` to become ready or for event loop to finish.
...@@ -226,8 +232,131 @@ def wait(objects=None, timeout=None, count=None): ...@@ -226,8 +232,131 @@ def wait(objects=None, timeout=None, count=None):
if objects is None: if objects is None:
hub = get_hub() hub = get_hub()
return hub.join(timeout=timeout) # pylint:disable= return hub.join(timeout=timeout) # pylint:disable=
return list(iwait(objects, timeout, count)) return list(iwait_on_objects(objects, timeout, count))
_timeout_error = Exception
def set_default_timeout_error(e):
global _timeout_error
_timeout_error = e
def _primitive_wait(watcher, timeout, timeout_exc, hub):
if watcher.callback is not None:
raise ConcurrentObjectUseError('This socket is already used by another greenlet: %r'
% (watcher.callback, ))
if hub is None:
hub = get_hub()
if timeout is None:
hub.wait(watcher)
return
timeout = Timeout._start_new_or_dummy(
timeout,
(timeout_exc
if timeout_exc is not _NONE or timeout is None
else _timeout_error('timed out')))
with timeout:
hub.wait(watcher)
# Suitable to be bound as an instance method
def wait_on_socket(socket, watcher, timeout_exc=None):
_primitive_wait(watcher, socket.timeout,
timeout_exc if timeout_exc is not None else _NONE,
socket.hub)
def wait_on_watcher(watcher, timeout=None, timeout_exc=_NONE, hub=None):
"""
wait(watcher, timeout=None, [timeout_exc=None]) -> None
Block the current greenlet until *watcher* is ready.
If *timeout* is non-negative, then *timeout_exc* is raised after
*timeout* second has passed.
If :func:`cancel_wait` is called on *io* by another greenlet,
raise an exception in this blocking greenlet
(``socket.error(EBADF, 'File descriptor was closed in another
greenlet')`` by default).
:param io: A libev watcher, most commonly an IO watcher obtained from
:meth:`gevent.core.loop.io`
:keyword timeout_exc: The exception to raise if the timeout expires.
By default, a :class:`socket.timeout` exception is raised.
If you pass a value for this keyword, it is interpreted as for
:class:`gevent.timeout.Timeout`.
"""
_primitive_wait(watcher, timeout, timeout_exc, hub)
def wait_read(fileno, timeout=None, timeout_exc=_NONE):
"""
wait_read(fileno, timeout=None, [timeout_exc=None]) -> None
Block the current greenlet until *fileno* is ready to read.
For the meaning of the other parameters and possible exceptions,
see :func:`wait`.
.. seealso:: :func:`cancel_wait`
"""
hub = get_hub()
io = hub.loop.io(fileno, 1)
try:
return wait_on_watcher(io, timeout, timeout_exc, hub)
finally:
io.close()
def wait_write(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
"""
wait_write(fileno, timeout=None, [timeout_exc=None]) -> None
Block the current greenlet until *fileno* is ready to write.
For the meaning of the other parameters and possible exceptions,
see :func:`wait`.
.. deprecated:: 1.1
The keyword argument *event* is ignored. Applications should not pass this parameter.
In the future, doing so will become an error.
.. seealso:: :func:`cancel_wait`
"""
# pylint:disable=unused-argument
hub = get_hub()
io = hub.loop.io(fileno, 2)
try:
return wait_on_watcher(io, timeout, timeout_exc, hub)
finally:
io.close()
def wait_readwrite(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
"""
wait_readwrite(fileno, timeout=None, [timeout_exc=None]) -> None
Block the current greenlet until *fileno* is ready to read or
write.
For the meaning of the other parameters and possible exceptions,
see :func:`wait`.
.. deprecated:: 1.1
The keyword argument *event* is ignored. Applications should not pass this parameter.
In the future, doing so will become an error.
.. seealso:: :func:`cancel_wait`
"""
# pylint:disable=unused-argument
hub = get_hub()
io = hub.loop.io(fileno, 3)
try:
return wait_on_watcher(io, timeout, timeout_exc, hub)
finally:
io.close()
def _init(): def _init():
......
...@@ -103,6 +103,7 @@ class _closedsocket(object): ...@@ -103,6 +103,7 @@ class _closedsocket(object):
timeout_default = object() timeout_default = object()
from gevent._hub_primitives import wait_on_socket as _wait_on_socket
class socket(object): class socket(object):
""" """
...@@ -180,22 +181,7 @@ class socket(object): ...@@ -180,22 +181,7 @@ class socket(object):
ref = property(_get_ref, _set_ref) ref = property(_get_ref, _set_ref)
def _wait(self, watcher, timeout_exc=timeout('timed out')): _wait = _wait_on_socket
"""Block the current greenlet until *watcher* has pending events.
If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed.
By default *timeout_exc* is ``socket.timeout('timed out')``.
If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``.
"""
if watcher.callback is not None:
raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
timeout = Timeout._start_new_or_dummy(self.timeout, timeout_exc, ref=False)
try:
self.hub.wait(watcher)
finally:
timeout.close()
def accept(self): def accept(self):
sock = self._sock sock = self._sock
......
...@@ -67,6 +67,7 @@ class _wrefsocket(_socket.socket): ...@@ -67,6 +67,7 @@ class _wrefsocket(_socket.socket):
timeout = property(lambda s: s.gettimeout(), timeout = property(lambda s: s.gettimeout(),
lambda s, nv: s.settimeout(nv)) lambda s, nv: s.settimeout(nv))
from gevent._hub_primitives import wait_on_socket as _wait_on_socket
class socket(object): class socket(object):
""" """
...@@ -181,22 +182,7 @@ class socket(object): ...@@ -181,22 +182,7 @@ class socket(object):
ref = property(_get_ref, _set_ref) ref = property(_get_ref, _set_ref)
def _wait(self, watcher, timeout_exc=timeout('timed out')): _wait = _wait_on_socket
"""Block the current greenlet until *watcher* has pending events.
If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed.
By default *timeout_exc* is ``socket.timeout('timed out')``.
If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``.
"""
if watcher.callback is not None:
raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
timer = Timeout._start_new_or_dummy(self.timeout, timeout_exc, ref=False)
try:
self.hub.wait(watcher)
finally:
timer.close()
def dup(self): def dup(self):
"""dup() -> socket object """dup() -> socket object
......
...@@ -134,96 +134,13 @@ del _name, _value ...@@ -134,96 +134,13 @@ del _name, _value
_timeout_error = timeout # pylint: disable=undefined-variable _timeout_error = timeout # pylint: disable=undefined-variable
from gevent import _hub_primitives
_hub_primitives.set_default_timeout_error(_timeout_error)
def wait(io, timeout=None, timeout_exc=_NONE): wait = _hub_primitives.wait_on_watcher
""" wait_read = _hub_primitives.wait_read
Block the current greenlet until *io* is ready. wait_write = _hub_primitives.wait_write
wait_readwrite = _hub_primitives.wait_readwrite
If *timeout* is non-negative, then *timeout_exc* is raised after
*timeout* second has passed. By default *timeout_exc* is
``socket.timeout('timed out')``.
If :func:`cancel_wait` is called on *io* by another greenlet,
raise an exception in this blocking greenlet
(``socket.error(EBADF, 'File descriptor was closed in another
greenlet')`` by default).
:param io: A libev watcher, most commonly an IO watcher obtained from
:meth:`gevent.core.loop.io`
:keyword timeout_exc: The exception to raise if the timeout expires.
By default, a :class:`socket.timeout` exception is raised.
If you pass a value for this keyword, it is interpreted as for
:class:`gevent.timeout.Timeout`.
"""
if io.callback is not None:
raise ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (io.callback, ))
timeout = Timeout._start_new_or_dummy(
timeout,
(timeout_exc
if timeout_exc is not _NONE or timeout is None
else _timeout_error('timed out')))
with timeout:
return get_hub().wait(io)
# rename "io" to "watcher" because wait() works with any watcher
def wait_read(fileno, timeout=None, timeout_exc=_NONE):
"""
Block the current greenlet until *fileno* is ready to read.
For the meaning of the other parameters and possible exceptions,
see :func:`wait`.
.. seealso:: :func:`cancel_wait`
"""
io = get_hub().loop.io(fileno, 1)
try:
return wait(io, timeout, timeout_exc)
finally:
io.close()
def wait_write(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
"""
Block the current greenlet until *fileno* is ready to write.
For the meaning of the other parameters and possible exceptions,
see :func:`wait`.
:keyword event: Ignored. Applications should not pass this parameter.
In the future, it may become an error.
.. seealso:: :func:`cancel_wait`
"""
# pylint:disable=unused-argument
io = get_hub().loop.io(fileno, 2)
try:
return wait(io, timeout, timeout_exc)
finally:
io.close()
def wait_readwrite(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
"""
Block the current greenlet until *fileno* is ready to read or
write.
For the meaning of the other parameters and possible exceptions,
see :func:`wait`.
:keyword event: Ignored. Applications should not pass this parameter.
In the future, it may become an error.
.. seealso:: :func:`cancel_wait`
"""
# pylint:disable=unused-argument
io = get_hub().loop.io(fileno, 3)
try:
return wait(io, timeout, timeout_exc)
finally:
io.close()
#: The exception raised by default on a call to :func:`cancel_wait` #: The exception raised by default on a call to :func:`cancel_wait`
class cancel_wait_ex(error): # pylint: disable=undefined-variable class cancel_wait_ex(error): # pylint: disable=undefined-variable
...@@ -238,8 +155,6 @@ def cancel_wait(watcher, error=cancel_wait_ex): ...@@ -238,8 +155,6 @@ def cancel_wait(watcher, error=cancel_wait_ex):
get_hub().cancel_wait(watcher, error) get_hub().cancel_wait(watcher, error)
def gethostbyname(hostname): def gethostbyname(hostname):
""" """
gethostbyname(host) -> address gethostbyname(host) -> address
......
...@@ -50,8 +50,8 @@ from gevent._hub_primitives import WaitOperationsGreenlet ...@@ -50,8 +50,8 @@ from gevent._hub_primitives import WaitOperationsGreenlet
# Export # Export
from gevent import _hub_primitives from gevent import _hub_primitives
wait = _hub_primitives.wait wait = _hub_primitives.wait_on_objects
iwait = _hub_primitives.iwait iwait = _hub_primitives.iwait_on_objects
from gevent.monkey import get_original from gevent.monkey import get_original
......
...@@ -347,7 +347,7 @@ class Values(object): ...@@ -347,7 +347,7 @@ class Values(object):
else: else:
self.error = source.exception self.error = source.exception
if self.count <= 0: if self.count <= 0:
self.waiter.switch() self.waiter.switch(None)
def get(self): def get(self):
self.waiter.get() self.waiter.get()
......
...@@ -18,8 +18,8 @@ from __future__ import absolute_import, print_function, division ...@@ -18,8 +18,8 @@ from __future__ import absolute_import, print_function, division
from gevent._compat import string_types from gevent._compat import string_types
from gevent._util import _NONE from gevent._util import _NONE
from gevent.hub import getcurrent from greenlet import getcurrent
from gevent.hub import _get_hub_noargs as get_hub from gevent._hub_local import get_hub_noargs as get_hub
__all__ = [ __all__ = [
'Timeout', 'Timeout',
......
...@@ -408,25 +408,15 @@ class TestFunctions(greentest.TestCase): ...@@ -408,25 +408,15 @@ class TestFunctions(greentest.TestCase):
# Issue #635 # Issue #635
import gevent.socket import gevent.socket
import gevent._socketcommon import gevent._socketcommon
orig_get_hub = gevent.socket.get_hub
class get_hub(object):
def wait(self, _io):
gevent.sleep(10)
class io(object): class io(object):
callback = None callback = None
gevent._socketcommon.get_hub = get_hub def start(self, *_args):
try: gevent.sleep(10)
try:
gevent.socket.wait(io(), timeout=0.01) with self.assertRaises(gevent.socket.timeout):
except gevent.socket.timeout: gevent.socket.wait(io(), timeout=0.01)
pass
else:
self.fail("Should raise timeout error")
finally:
gevent._socketcommon.get_hub = orig_get_hub
def test_signatures(self): def test_signatures(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment