Commit b2129230 authored by Denis Bilenko's avatar Denis Bilenko

test__socket_dns.py: simplify and fix some bugs

do not sort the results of getaddrinfo() because the order is actually important here
parent 27b60ba6
......@@ -10,9 +10,48 @@ import gevent
import gevent.socket as gevent_socket
# these are the exceptions that can have different values in gevent.socket compared to original socket
# e.g. in gaierror ares error codes and messages are used
MISMATCH_EXCEPTIONS = (TypeError, socket.gaierror, socket.herror)
# also test: '<broadcast>'
accept_results = [
# Python's socketmodule.c randomly chooses between gaierror and herror when raising an exception
# There are also a number of error code that are mapped to ARES_ENOTFOUND
("gaierror(4, 'ARES_ENOTFOUND: Domain name not found')",
"herror(1, 'Unknown host')"),
("gaierror(4, 'ARES_ENOTFOUND: Domain name not found')",
"gaierror(-2, 'Name or service not known')"),
("gaierror(4, 'ARES_ENOTFOUND: Domain name not found')",
"gaierror(-5, 'No address associated with hostname')"),
("gaierror(1, 'ARES_ENODATA: DNS server returned answer with no data')",
"herror(4, 'No address associated with name')"),
("gaierror(1, 'ARES_ENODATA: DNS server returned answer with no data')",
"gaierror(-5, 'No address associated with hostname')"),
# _socket.gethostbyname_ex('\x00') checks for zeroes and raises TypeError
# but it's not worth the trouble
("gaierror(1, 'ARES_ENODATA: DNS server returned answer with no data')",
"TypeError('must be string without null bytes, not str',)",),
# in some cases gevent error messages are better:
("gaierror(-1, 'Bad value for ai_flags: 0x34fb5e0')",
"gaierror(-1, 'Bad value for ai_flags')"),
("gaierror(-8, 'Invalid value for port: -1')",
"gaierror(-8, 'Servname not supported for ai_socktype')"),
("gaierror(-8, 'Invalid value for port: 65536')",
"gaierror(-8, 'Servname not supported for ai_socktype')"),
# in some cases the error is different, but that's OK
("error('sockaddr resolved to multiple addresses',)",
"TypeError('must be string, not None',)")
]
assert gevent_socket.gaierror is socket.gaierror
assert gevent_socket.error is socket.error
......@@ -42,9 +81,7 @@ def _run(function, *args):
result = function(*args)
assert not isinstance(result, Exception), repr(result)
return result
except MISMATCH_EXCEPTIONS:
return sys.exc_info()[1]
except (socket.error, UnicodeError):
except Exception:
return sys.exc_info()[1]
......@@ -52,8 +89,8 @@ def log_fcall(function, args):
args = repr(args)
if args.endswith(',)'):
args = args[:-2] + ')'
log('\n%s.%s%s',
function.__module__.replace('gevent.socket', ' gevent'),
log('\n%7s.%s%s',
function.__module__.replace('gevent.socket', 'gevent'),
function.__name__,
args,
newline=False)
......@@ -80,42 +117,11 @@ def log_call(result, function, *args):
log_fresult(result)
def sort_lists(result):
if isinstance(result, list):
return sorted(result)
if isinstance(result, tuple):
return tuple(sort_lists(x) for x in result)
return result
class TestCase(greentest.TestCase):
__timeout__ = 15
def _test(self, func, *args, **kwargs):
expected = kwargs.pop('expected', None)
assert_equal = kwargs.pop('assert_equal', None)
assert assert_equal in (False, True, None, "type"), repr(assert_equal)
assert not kwargs, kwargs
if assert_equal is not None:
old_assert_equal = self.assert_equal
self.assert_equal = assert_equal
try:
if expected is None:
return self._test_against_real(func, args)
else:
return self._test_against_expected(expected, func, args)
finally:
if assert_equal is not None:
self.assert_equal = old_assert_equal
def _test_against_expected(self, expected, func, args):
gevent_func = getattr(gevent_socket, func)
result = run(gevent_func, *args)
self.assertEqualResults(expected, result)
return result
def _test_against_real(self, func, args):
def _test(self, func, *args):
gevent_func = getattr(gevent_socket, func)
real_func = getattr(socket, func)
result = run(gevent_func, *args)
......@@ -130,58 +136,25 @@ class TestCase(greentest.TestCase):
self.assertEqualResults(real_result, result)
return result
def _test_all(self, hostname, assert_equal=None):
if assert_equal is not None:
old_assert_equal = self.assert_equal
self.assert_equal = assert_equal
try:
self._test('getaddrinfo', hostname, 'http')
ipaddr = self._test('gethostbyname', hostname)
self._test('gethostbyname_ex', hostname)
if not isinstance(ipaddr, Exception):
self._test('gethostbyaddr', ipaddr)
self._test('gethostbyaddr', hostname)
self._test('getnameinfo', (hostname, 80), 0)
finally:
if assert_equal is not None:
self.assert_equal = old_assert_equal
def _test_all(self, hostname):
self._test('getaddrinfo', hostname, 'http')
ipaddr = self._test('gethostbyname', hostname)
self._test('gethostbyname_ex', hostname)
if not isinstance(ipaddr, Exception):
self._test('gethostbyaddr', ipaddr)
self._test('gethostbyaddr', hostname)
self._test('getnameinfo', (hostname, 80), 0)
def assertEqualResults(self, real_result, gevent_result):
if type(real_result) is socket.herror and type(gevent_result) is socket.gaierror:
# gevent never raises herror while stdlib socket occasionally does
# do not consider that a failure
good = True
elif type(real_result) is type(gevent_result) and type(real_result) in MISMATCH_EXCEPTIONS:
good = True
else:
good = False
try:
real_result = sort_lists(real_result)
gevent_result = sort_lists(gevent_result)
if isinstance(real_result, BaseException) and isinstance(gevent_result, BaseException):
self.assertEqual(repr(real_result), repr(gevent_result))
else:
self.assertEqual(real_result, gevent_result)
except AssertionError:
ex = sys.exc_info()[1]
if good or self.assert_equal is not True:
self.warning("WARNING in %s: %s" % (self.testcasename, ex))
else:
raise
if self.assert_equal == 'type':
self.assertEqual(type(real_result), type(gevent_result))
def assertTypeEqual(self, real_result, gevent_result):
if self.assert_equal:
if type(real_result) != type(gevent_result):
raise AssertionError('%r != %r' % (real_result, gevent_result))
def warning(self, warning, cache=set()):
if warning not in cache:
cache.add(warning)
log(warning)
assert_equal = True
if type(real_result) is TypeError and type(gevent_result) is TypeError:
return
real_result = repr(real_result)
gevent_result = repr(gevent_result)
if real_result == gevent_result:
return
if (gevent_result, real_result) in accept_results:
return
raise AssertionError('%s != %s' % (gevent_result, real_result))
def get_test(ip, host):
......@@ -200,17 +173,15 @@ class TestLocal(TestCase):
def test_hostname(self):
assert socket.gethostname is gevent_socket.gethostname
hostname = socket.gethostname()
self.assert_equal = False # XXX 'types'
self._test_all(hostname)
def test_localhost_getaddrinfo(self):
# certain tests in test_patched_socket.py only work if getaddrinfo('localhost') does not switch
# (e.g. NetworkConnectionAttributesTest.testSourceAddress)
self.switch_expected = False
gevent_socket.getaddrinfo('localhost', 80)
def test_localhost(self):
# socket.gethostbyname_ex returns
# ('localhost.localdomain',
# ['localhost', 'ip6-localhost', 'ip6-loopback', 'localhost'],
# ['127.0.0.1', '127.0.0.1'])
# while gevent returns
# ('localhost.localdomain', ['localhost'], ['127.0.0.1'])
self.assert_equal = 'type'
self._test_all('localhost')
def test_127_0_0_1(self):
......@@ -220,15 +191,16 @@ class TestLocal(TestCase):
self._test_all('1.2.3.4')
def test_notexistent(self):
self.switch_expected = True
self._test_all('notexistent')
# <broadcast>, 127.0.0.1 special-cased in socketmodule.c?
def test_None(self):
self.switch_expected = False
self._test_all(None)
def test_25(self):
self.switch_expected = False
self._test_all(25)
try:
......@@ -261,23 +233,39 @@ class TestFamily(TestCase):
cls._result = getattr(socket, 'getaddrinfo')('gevent.org', None)
return cls._result
def assert_error(self, error, function, *args):
try:
result = function(*args)
raise AssertionError('%s: Expected to raise %s, instead returned %r' % (function, error, result))
except Exception, ex:
if isinstance(error, basestring):
repr_error = error
else:
repr_error = repr(error)
if type(ex) is not type(error):
raise
if repr(ex) == repr_error:
return
raise
def test_inet(self):
self._test('getaddrinfo', 'gevent.org', None, socket.AF_INET, expected=self.getresult())
self.assertEqual(gevent_socket.getaddrinfo('gevent.org', None, socket.AF_INET), self.getresult())
def test_inet6(self):
expected = socket.gaierror(1, 'ARES_ENODATA: DNS server returned answer with no data')
self._test('getaddrinfo', 'gevent.org', None, socket.AF_INET6, expected=expected)
self.assert_error(expected, gevent_socket.getaddrinfo, 'gevent.org', None, socket.AF_INET6)
def test_unspec(self):
self._test('getaddrinfo', 'gevent.org', None, socket.AF_UNSPEC, expected=self.getresult())
self.assertEqual(gevent_socket.getaddrinfo('gevent.org', None, socket.AF_UNSPEC), self.getresult())
def test_badvalue(self):
expected = socket.gaierror(5, 'ARES_ENOTIMP: DNS server does not implement requested operation')
self._test('getaddrinfo', 'gevent.org', None, 255, expected=expected)
self._test('getaddrinfo', 'gevent.org', None, 255000, expected=expected)
self._test('getaddrinfo', 'gevent.org', None, -1, expected=expected)
self.switch_expected = False
self._test('getaddrinfo', 'gevent.org', None, 255)
self._test('getaddrinfo', 'gevent.org', None, 255000)
self._test('getaddrinfo', 'gevent.org', None, -1)
def test_badtype(self):
self.switch_expected = False
self._test('getaddrinfo', 'gevent.org', 'x')
......@@ -291,6 +279,9 @@ class Test_getaddrinfo(TestCase):
def test_80(self):
self._test_getaddrinfo('gevent.org', 80)
def test_int_string(self):
self._test_getaddrinfo('gevent.org', '80')
def test_0(self):
self._test_getaddrinfo('gevent.org', 0)
......@@ -301,7 +292,7 @@ class Test_getaddrinfo(TestCase):
self._test_getaddrinfo('myhost.mytld', 53)
def test_notexistent_dot_com(self):
self._test_getaddrinfo('sdfsdfgu5e66098032453245wfdggd.com')
self._test_getaddrinfo('sdfsdfgu5e66098032453245wfdggd.com', 80)
def test1(self):
return self._test_getaddrinfo('gevent.org', 52, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 0)
......@@ -354,9 +345,11 @@ class TestIPv6(TestCase):
host = 'aaaa.test-ipv6.com'
def test(self):
#self.getaddrinfo_args = [(), (AF_UNSPEC, ), (AF_INET, ), (AF_INET6, )]
self._test_all(self.host)
def test_(self):
self._test('getaddrinfo', self.host, 'http')
def test_inet(self):
self._test('getaddrinfo', self.host, None, socket.AF_INET)
......@@ -373,13 +366,6 @@ class TestIPv6_ds(TestIPv6):
host = 'ds.test-ipv6.com'
class TestBadPort(TestCase):
def test(self):
self.PORTS = ['xxxxxx']
self._test_all('gevent.org')
class TestBadIP(TestCase):
def test_name(self):
......@@ -401,11 +387,9 @@ class Test_getnameinfo(TestCase):
def test_NOFQDN(self):
# I get ('localhost', 'www') with _socket but ('localhost.localdomain', 'www') with gevent.socket
self.assert_equal = 'type'
self._test('getnameinfo', ('127.0.0.1', 80), socket.NI_NOFQDN)
def test_NUMERICHOST(self):
#self.assert_equal = False
self._test('getnameinfo', ('gevent.org', 80), 0)
self._test('getnameinfo', ('gevent.org', 80), socket.NI_NUMERICHOST)
......@@ -438,7 +422,6 @@ class Test_getnameinfo_fail(TestCase):
self._test('getnameinfo', ('www.gevent.org', -1), 0)
self._test('getnameinfo', ('www.gevent.org', None), 0)
self._test('getnameinfo', ('www.gevent.org', 'x'), 0)
self.assert_equal = False
self._test('getnameinfo', ('www.gevent.org', 65536), 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment