Commit e76166f4 authored by Denis Bilenko's avatar Denis Bilenko

implement dns functions in gevent.socket using c-ares

- gevent.socket module adds 3 new methods: gethostbyname_ex, gethostbyaddr, getnameinfo (in addition to getaddrinfo and gethostbyname)
- added new bindings: core.ares_channel class
- Hub gets a new property: 'resolver'
- Added a new gevent.resolver_ares.Resolver class which translates from low-lever c-ares bindings into Python socket compatible functions
parent 1595ebc5
static void gevent_callback(struct ev_loop *, void *, int);
static void gevent_signal_check(struct ev_loop *, void *, int);
struct PyGeventLoopObject;
static void gevent_handle_error(struct PyGeventLoopObject* loop, PyObject* where);
#if defined(_WIN32)
static void gevent_periodic_signal_check(struct ev_loop *, void *, int);
......
cdef extern from "ares.h":
struct ares_options:
void* sock_state_cb
void* sock_state_cb_data
int ARES_OPT_SOCK_STATE_CB
int ARES_LIB_INIT_ALL
int ARES_SOCKET_BAD
int ARES_SUCCESS
int ARES_ENODATA
int ARES_EFORMERR
int ARES_ESERVFAIL
int ARES_ENOTFOUND
int ARES_ENOTIMP
int ARES_EREFUSED
int ARES_EBADQUERY
int ARES_EBADNAME
int ARES_EBADFAMILY
int ARES_EBADRESP
int ARES_ECONNREFUSED
int ARES_ETIMEOUT
int ARES_EOF
int ARES_EFILE
int ARES_ENOMEM
int ARES_EDESTRUCTION
int ARES_EBADSTR
int ARES_EBADFLAGS
int ARES_ENONAME
int ARES_EBADHINTS
int ARES_ENOTINITIALIZED
int ARES_ELOADIPHLPAPI
int ARES_EADDRGETNETWORKPARAMS
int ARES_ECANCELLED
int ARES_NI_NOFQDN
int ARES_NI_NUMERICHOST
int ARES_NI_NAMEREQD
int ARES_NI_NUMERICSERV
int ARES_NI_DGRAM
int ARES_NI_TCP
int ARES_NI_UDP
int ARES_NI_SCTP
int ARES_NI_DCCP
int ARES_NI_NUMERICSCOPE
int ARES_NI_LOOKUPHOST
int ARES_NI_LOOKUPSERVICE
int ares_library_init(int flags)
void ares_library_cleanup()
int ares_init_options(void *channelptr, ares_options *options, int)
int ares_init(void *channelptr)
void ares_destroy(void *channelptr)
void ares_gethostbyname(void* channel, char *name, int family, void* callback, void *arg)
void ares_gethostbyaddr(void* channel, void *addr, int addrlen, int family, void* callback, void *arg)
void ares_process_fd(void* channel, int read_fd, int write_fd)
char* ares_strerror(int code)
void ares_cancel(void* channel)
void ares_getnameinfo(void* channel, void* sa, int salen, int flags, void* callback, void *arg)
cdef extern from "inet_net_pton.h":
int ares_inet_pton(int af, char *src, void *dst)
This diff is collapsed.
......@@ -617,3 +617,6 @@ def set_exc_info(object type, object value):
Py_INCREF(<void*>value)
tstate.exc_value = <void *>value
tstate.exc_traceback = NULL
include "cares.pxi"
#include "Python.h"
#include "ares__close_sockets.c"
#include "ares_data.c"
#include "ares_destroy.c"
#include "ares_expand_name.c"
#include "ares_free_hostent.c"
#include "ares_free_string.c"
#include "ares_gethostbyaddr.c"
#include "ares_gethostbyname.c"
#include "ares__get_hostent.c"
#include "ares_getnameinfo.c"
#include "ares_init.c"
#include "ares_library_init.c"
#include "ares_llist.c"
#include "ares_mkquery.c"
#include "ares_nowarn.c"
#include "ares_options.c"
#include "ares_parse_aaaa_reply.c"
#include "ares_parse_a_reply.c"
#include "ares_parse_ptr_reply.c"
#include "ares_process.c"
#include "ares_query.c"
#include "ares__read_line.c"
#include "ares_search.c"
#include "ares_send.c"
#include "ares_strerror.c"
//#include "ares_timeout.c"
#include "ares__timeval.c"
//#include "ares_version.c"
#include "bitncmp.c"
#include "inet_net_pton.c"
#include "inet_ntop.c"
#ifdef _WIN32
#include "windows_port.c"
#endif
static PyObject* _socket_error = 0;
static PyObject* _socket_gaierror = 0;
static inline PyObject* get_socket_object(PyObject** pobject, const char* name, int incref)
{
if (!*pobject) {
PyObject* _socket;
_socket = PyImport_ImportModuleNoBlock("_socket");
if (_socket) {
*pobject = PyObject_GetAttrString(_socket, name);
if (!*pobject) {
PyErr_WriteUnraisable(Py_None);
}
Py_DECREF(_socket);
}
else {
PyErr_WriteUnraisable(Py_None);
}
if (!*pobject) {
*pobject = PyExc_IOError;
}
}
if (incref)
Py_INCREF(*pobject);
return *pobject;
}
static inline PyObject* get_socket_error() { return get_socket_object(&_socket_error, "error", 1); }
static inline PyObject* get_socket_gaierror() { return get_socket_object(&_socket_gaierror, "gaierror", 1); }
static int gevent_append_addr(PyObject* list, int family, void* src, char* tmpbuf, size_t tmpsize) {
int status = -1;
PyObject* tmp;
if (ares_inet_ntop(family, src, tmpbuf, tmpsize)) {
tmp = PyString_FromString(tmpbuf);
if (tmp) {
status = PyList_Append(list, tmp);
Py_DECREF(tmp);
}
}
return status;
}
static PyObject*
parse_h_aliases(struct hostent *h)
{
char **pch;
PyObject *result = NULL;
PyObject *tmp;
result = PyList_New(0);
if (result && h->h_aliases) {
for (pch = h->h_aliases; *pch != NULL; pch++) {
if (*pch != h->h_name && strcmp(*pch, h->h_name)) {
int status;
tmp = PyString_FromString(*pch);
if (tmp == NULL) {
break;
}
status = PyList_Append(result, tmp);
Py_DECREF(tmp);
if (status) {
break;
}
}
}
}
return result;
}
static PyObject *
parse_h_addr_list(struct hostent *h)
{
char **pch;
PyObject *result = NULL;
PyObject *tmp;
result = PyList_New(0);
if (result) {
switch (h->h_addrtype) {
case AF_INET:
{
char tmpbuf[sizeof "255.255.255.255"];
for (pch = h->h_addr_list; *pch != NULL; pch++) {
if (gevent_append_addr(result, AF_INET, *pch, tmpbuf, sizeof(tmpbuf))) {
break;
}
}
break;
}
case AF_INET6:
{
char tmpbuf[sizeof("ffff:ffff:ffff:ffff:ffff:ffff:255.255.255.255")];
for (pch = h->h_addr_list; *pch != NULL; pch++) {
if (gevent_append_addr(result, AF_INET6, *pch, tmpbuf, sizeof(tmpbuf))) {
break;
}
}
break;
}
default:
PyErr_SetString(get_socket_object(&_socket_error, "error", 0), "unsupported address family");
Py_DECREF(result);
result = NULL;
}
}
return result;
}
static inline int gevent_make_sockaddr(char* hostp, int port, int flowinfo, int scope_id, struct sockaddr_in6* sa6) {
if ( ares_inet_pton(AF_INET, hostp, &((struct sockaddr_in*)sa6)->sin_addr.s_addr) > 0 ) {
((struct sockaddr_in*)sa6)->sin_family = AF_INET;
((struct sockaddr_in*)sa6)->sin_port = htons(port);
return sizeof(struct sockaddr_in);
}
else if ( ares_inet_pton(AF_INET6, hostp, &sa6->sin6_addr.s6_addr) > 0 ) {
sa6->sin6_family = AF_INET6;
sa6->sin6_port = htons(port);
sa6->sin6_flowinfo = flowinfo;
sa6->sin6_scope_id = scope_id;
return sizeof(struct sockaddr_in6);
}
return -1;
}
......@@ -194,6 +194,7 @@ class Hub(greenlet):
SYSTEM_ERROR = (KeyboardInterrupt, SystemExit, SystemError)
loop_class = 'gevent.core.loop'
resolver_class = 'gevent.resolver_ares.Resolver'
pformat = 'pprint.pformat'
def __init__(self, loop=None, default=None):
......@@ -208,6 +209,7 @@ class Hub(greenlet):
loop_class = _import(self.loop_class)
self.loop = loop_class(flags=loop, default=default)
self.loop.error_handler = self
self._resolver = None
self.pformat = _import(self.pformat)
def handle_error(self, where, type, value, tb):
......@@ -285,11 +287,27 @@ class Hub(greenlet):
if _threadlocal.__dict__.get('hub') is self:
_threadlocal.__dict__.pop('hub')
self.run = None
return
try:
self.switch()
except LoopExit:
pass
else:
try:
self.switch()
except LoopExit:
pass
self.loop.error_handler = None # break the ref cycle
def _get_resolver(self):
if self._resolver is None:
if self.resolver_class is not None:
resolver_class = _import(self.resolver_class)
self._resolver = resolver_class(hub=self)
return self._resolver
def _set_resolver(self, value):
self._resolver = value
def _del_resolver(self):
del self._resolver
resolver = property(_get_resolver, _set_resolver, _del_resolver)
class LoopExit(Exception):
......
from _socket import getservbyname, getaddrinfo, gaierror, error
from gevent.hub import Waiter, get_hub, _import
from gevent.socket import AF_UNSPEC, AF_INET, AF_INET6, SOCK_STREAM, SOCK_DGRAM, SOCK_RAW, AI_NUMERICHOST, EAI_SERVICE
__all__ = ['Resolver']
class Resolver(object):
ares_class = 'gevent.core.ares_channel'
def __init__(self, hub=None, ares=None):
if hub is None:
hub = get_hub()
self.hub = hub
if ares is None:
ares_class = _import(self.ares_class)
ares = ares_class(hub.loop)
self.ares = ares
def gethostbyname(self, hostname, family=AF_INET):
return self.gethostbyname_ex(hostname, family)[-1][0]
def gethostbyname_ex(self, hostname, family=AF_INET):
waiter = Waiter(self.hub)
self.ares.gethostbyname(waiter, hostname, family)
return waiter.get()
def getaddrinfo(self, host, port, family=0, socktype=0, proto=0, flags=0):
if isinstance(host, unicode):
host = host.encode('idna')
elif not isinstance(host, str) or (flags & AI_NUMERICHOST):
# this handles cases which do not require network access
# 1) host is None
# 2) host is of an invalid type
# 3) AI_NUMERICHOST flag is set
return getaddrinfo(host, port, family, socktype, proto, flags)
# we also call _socket.getaddrinfo below if family is not one of AF_*
if isinstance(port, basestring):
try:
if socktype == 0:
try:
port = getservbyname(port, 'tcp')
socktype = SOCK_STREAM
except error:
port = getservbyname(port, 'udp')
socktype = SOCK_DGRAM
elif socktype == SOCK_STREAM:
port = getservbyname(port, 'tcp')
elif socktype == SOCK_DGRAM:
port = getservbyname(port, 'udp')
else:
raise gaierror(EAI_SERVICE, 'Servname not supported for ai_socktype')
except error, ex:
if 'not found' in str(ex):
raise gaierror(EAI_SERVICE, 'Servname not supported for ai_socktype')
else:
raise gaierror(str(ex))
elif port is None:
port = 0
socktype_proto = [(SOCK_STREAM, 6), (SOCK_DGRAM, 17), (SOCK_RAW, 0)]
if socktype:
socktype_proto = [(x, y) for (x, y) in socktype_proto if socktype == x]
if proto:
socktype_proto = [(x, y) for (x, y) in socktype_proto if proto == y]
if family == AF_UNSPEC:
values = Values(2)
self.ares.gethostbyname(values, host, AF_INET)
self.ares.gethostbyname(values, host, AF_INET6)
elif family == AF_INET:
values = Values(1)
self.ares.gethostbyname(values, host, AF_INET)
elif family == AF_INET6:
values = Values(1)
self.ares.gethostbyname(values, host, AF_INET6)
else:
# most likely will raise the exception, let the original getaddrinfo do it
return getaddrinfo(host, port, family, socktype, proto, flags)
values = values.get()
if len(values) == 2 and values[0] == values[1]:
values.pop()
result = []
try:
for addrs in values:
if addrs.family == AF_INET:
for addr in addrs[-1]:
sockaddr = (addr, port)
for socktype, proto in socktype_proto:
result.append((AF_INET, socktype, proto, '', sockaddr))
elif addrs.family == AF_INET6:
for addr in addrs[-1]:
sockaddr = (addr, port, 0, 0)
for socktype, proto in socktype_proto:
result.append((AF_INET6, socktype, proto, '', sockaddr))
else:
raise error('Internal error in %s' % self.ares.gethostbyname)
except error:
if not result:
raise
return result
def gethostbyaddr(self, ip_address):
waiter = Waiter(self.hub)
self.ares.gethostbyaddr(waiter, ip_address)
try:
return waiter.get()
except ValueError, ex:
if not str(ex).startswith('illegal IP'):
raise
# socket.gethostbyaddr also accepts domain names; let's do that too
_ip_address = self.gethostbyname(ip_address, 0)
if _ip_address == ip_address:
raise
waiter.clear()
self.ares.gethostbyaddr(waiter, _ip_address)
return waiter.get()
def getnameinfo(self, sockaddr, flags):
waiter = Waiter(self.hub)
self.ares.getnameinfo(waiter, sockaddr, flags)
try:
result = waiter.get()
except ValueError, ex:
if not str(ex).startswith('illegal IP'):
raise
# socket.getnameinfo also accepts domain names; let's do that too
_ip_address = self.gethostbyname(sockaddr[0], 0)
if _ip_address == sockaddr[0]:
raise
waiter.clear()
self.ares.getnameinfo(waiter, (_ip_address, ) + sockaddr[1:], flags)
result = waiter.get()
if result[1] is None:
return (result[0], str(sockaddr[1])) + result[2:]
return result
class Values(object):
# helper to collect multiple values; ignore errors unless nothing has succeeded
# QQQ could probable be moved somewhere - hub.py?
__slots__ = ['count', 'values', 'error', 'waiter']
def __init__(self, count=1):
self.count = count
self.values = []
self.error = None
self.waiter = Waiter()
def __call__(self, source):
self.count -= 1
if source.exception is None:
self.values.append(source.value)
else:
self.error = source.exception
if self.count <= 0:
self.waiter.switch()
def get(self):
self.waiter.get()
if self.values:
return self.values
else:
raise self.error
......@@ -35,6 +35,9 @@ as well as the constants from :mod:`socket` module are imported into this module
__implements__ = ['create_connection',
'getaddrinfo',
'gethostbyname',
'gethostbyname_ex',
'gethostbyaddr',
'getnameinfo',
'socket',
'SocketType',
'fromfd',
......@@ -75,7 +78,6 @@ __imports__ = ['error',
import sys
import time
import re
is_windows = sys.platform == 'win32'
......@@ -122,14 +124,9 @@ for name in __socket__.__all__:
del name, value
# XXX: implement blocking functions that are not yet implemented
from gevent.hub import getcurrent, get_hub, Waiter
from gevent.hub import get_hub
from gevent.timeout import Timeout
_ip4_re = re.compile('^[\d\.]+$')
## XXX wait() no longer used by socket class
def wait(io, timeout=None, timeout_exc=timeout('timed out')):
"""Block the current greenlet until *io* is ready.
......@@ -148,7 +145,6 @@ def wait(io, timeout=None, timeout_exc=timeout('timed out')):
if timeout is not None:
timeout.cancel()
# rename "io" to "watcher" because wait() works with any watcher
# make wait() a Hub's method
def wait_read(fileno, timeout=None, timeout_exc=timeout('timed out')):
......@@ -343,11 +339,12 @@ class socket(object):
setattr(self, method, dummy)
def connect(self, address):
#if isinstance(address, tuple) and len(address) == 2:
# address = gethostbyname(address[0]), address[1]
if self.timeout == 0.0:
return self._sock.connect(address)
sock = self._sock
if isinstance(address, tuple):
r = getaddrinfo(address[0], address[1], sock.family, sock.type, sock.proto)
address = r[0][-1]
if self.timeout is not None:
timer = Timeout.start_new(self.timeout, timeout('timed out'))
else:
......@@ -630,46 +627,24 @@ def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT, source_address=N
raise error("getaddrinfo returns an empty list")
def gethostbyname(host):
return _socket.gethostbyname(host)
# artificial limitations that regular gethostbyname has
if host is None:
raise TypeError('gethostbyname() argument must be string')
host = host.encode('ascii')
res = getaddrinfo(host, 0)
if res:
return res[0][4][0]
def gethostbyname(hostname):
return get_hub().resolver.gethostbyname(hostname)
def _sync_call(function, *args):
waiter = Waiter()
request = function(waiter.switch_args, *args)
try:
result, request_error = waiter.get()
except:
if request is not None:
request.cancel()
raise
if request_error is None:
return result
raise request_error
def gethostbyname_ex(hostname):
return get_hub().resolver.gethostbyname_ex(hostname)
def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0):
return _socket.getaddrinfo(host, port, family, socktype, proto, flags)
dns = get_hub().reactor.dns
if dns is None:
return _socket.getaddrinfo(host, port, family, socktype, proto, flags)
else:
return _sync_call(dns.getaddrinfo, host, port, family, socktype, proto, flags)
return get_hub().resolver.getaddrinfo(host, port, family, socktype, proto, flags)
def resolve_ipv4(name, flags=0):
dns = get_hub().reactor.dns
if dns is None:
raise IOError('dns is not available')
else:
return _sync_call(dns.resolve_ipv4, name, flags)
def gethostbyaddr(ip_address):
return get_hub().resolver.gethostbyaddr(ip_address)
def getnameinfo(sockaddr, flags):
return get_hub().resolver.getnameinfo(sockaddr, flags)
try:
......
......@@ -15,16 +15,40 @@ from distutils.command import build_ext
__version__ = re.search("__version__\s*=\s*'(.*)'", open('gevent/__init__.py').read(), re.M).group(1)
assert __version__
libev_embed = os.path.exists('libev')
cares_embed = os.path.exists('c-ares')
defines = [('EV_STANDALONE', '1'),
('EV_COMMON', '')]
cython_output = 'gevent/core.c'
gcc_options = ['-Wno-unused-variable', '-Wno-unused-result'] # disable warnings from ev.c
defines = []
include_dirs = []
libraries = []
gcc_options = []
cares_configure_command = './configure CONFIG_COMMANDS= CONFIG_FILES='
if libev_embed:
defines += [('EV_STANDALONE', '1'),
('EV_COMMON', ''), # we don't use void* data
# libev watchers that we don't use currently:
('EV_STAT_ENABLE', '0'),
('EV_CHECK_ENABLE', '0'),
('EV_FORK_ENABLE', '0'),
('EV_CLEANUP_ENABLE', '0'),
('EV_EMBED_ENABLE', '0'),
('EV_ASYNC_ENABLE', '0'),
("EV_PERIODIC_ENABLE", '0'),
("EV_CHILD_ENABLE", '0')]
include_dirs += ['libev']
gcc_options += ['-Wno-unused-variable', '-Wno-unused-result'] # disable warnings from ev.c
if cares_embed:
include_dirs += ['c-ares']
defines += [('HAVE_CONFIG_H', '')]
if sys.platform == 'win32':
libraries = ['ws2_32']
defines += [('FD_SETSIZE', '1024'), ('GEVENT_WINDOWS', '1')]
else:
libraries = []
libraries += ['ws2_32']
defines += [('FD_SETSIZE', '1024'), ('_WIN32', '1')]
def has_changed(destination, *source):
......@@ -57,15 +81,21 @@ class my_build_ext(build_ext.build_ext):
def compile_cython(self):
if has_changed('gevent/core.pyx', 'gevent/core_.pyx', 'gevent/libev.pxd'):
system('m4 gevent/core_.pyx > core.pyx && mv core.pyx gevent/')
if has_changed(cython_output, 'gevent/*.p*x*', 'gevent/*.h', 'gevent/*.c'):
if 0 == system('%s gevent/core.pyx -o core.c && mv core.c gevent/' % (self.cython, )):
if has_changed(cython_output, 'gevent/*.p*x*', 'gevent/*.h', 'gevent/*.c', 'libev/*.c', 'c-ares/*.c'):
if 0 == system('%s gevent/core.pyx -o core.c && mv core.* gevent/' % (self.cython, )):
data = open(cython_output).read()
data = data.replace('\n\n#endif /* Py_PYTHON_H */', '\n#include "callbacks.c"\n#endif /* Py_PYTHON_H */')
open(cython_output, 'w').write(data)
def configure_cares(self):
if sys.platform != 'win32' and not os.path.exists('c-ares/ares_config.h'):
os.system('cd c-ares && %s' % cares_configure_command)
def build_extension(self, ext):
if self.cython:
self.compile_cython()
if cares_embed:
self.configure_cares()
try:
if self.compiler.compiler[0] == 'gcc' and '-Wall' in self.compiler.compiler and not gevent_core.extra_compile_args:
gevent_core.extra_compile_args = gcc_options
......@@ -102,7 +132,7 @@ class my_build_ext(build_ext.build_ext):
gevent_core = Extension(name='gevent.core',
sources=[cython_output],
include_dirs=['libev'],
include_dirs=include_dirs,
libraries=libraries,
define_macros=defines)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment