Commit b1989d93 authored by Romain Courteaud's avatar Romain Courteaud

MORE TESTS

parent da87eced
...@@ -8,6 +8,7 @@ from urlchecker_dns import ( ...@@ -8,6 +8,7 @@ from urlchecker_dns import (
buildResolver, buildResolver,
queryDNS, queryDNS,
getReachableResolverList, getReachableResolverList,
getDomainIpDict,
) )
from urlchecker_status import logStatus from urlchecker_status import logStatus
import mock import mock
...@@ -499,6 +500,104 @@ class UrlCheckerDNSTestCase(unittest.TestCase): ...@@ -499,6 +500,104 @@ class UrlCheckerDNSTestCase(unittest.TestCase):
assert self.db.NetworkChange.get().state == "open" assert self.db.NetworkChange.get().state == "open"
assert self.db.NetworkChange.get().status_id == status_id assert self.db.NetworkChange.get().status_id == status_id
################################################
# getDomainIpDict
################################################
def test_getDomainIpDict_mutipleDomainIp(self):
resolver_ip_list = ["127.0.0.1"]
domain_list = ["example.org"]
rdtype = "A"
status_id = logStatus(self.db, "foo")
with mock.patch(
"urlchecker_dns.dns.resolver.Resolver.query"
) as mock_query:
mock_query.return_value = [
MockAnswer("4.3.2.1"),
MockAnswer("1.2.3.4"),
]
result = getDomainIpDict(
self.db, status_id, resolver_ip_list, domain_list, rdtype
)
assert mock_query.call_count == 1
mock_query.assert_called_with(
domain_list[0], rdtype, raise_on_no_answer=False
)
assert result == {
"1.2.3.4": ["example.org"],
"4.3.2.1": ["example.org"],
}
assert self.db.DnsChange.select().count() == 1
assert self.db.NetworkChange.select().count() == 0
def test_getDomainIpDict_mutipleDNSIp(self):
resolver_ip_list = ["127.0.0.1", "127.0.0.2"]
domain_list = ["example.org"]
rdtype = "A"
status_id = logStatus(self.db, "foo")
with mock.patch(
"urlchecker_dns.dns.resolver.Resolver.query"
) as mock_query:
mock_query.side_effect = [
[MockAnswer("4.3.2.1"), MockAnswer("1.2.3.4")],
[MockAnswer("4.3.2.1"), MockAnswer("1.2.3.5")],
]
result = getDomainIpDict(
self.db, status_id, resolver_ip_list, domain_list, rdtype
)
assert mock_query.call_count == 2
assert mock_query.call_args_list[0] == mock.call(
domain_list[0], rdtype, raise_on_no_answer=False
)
assert mock_query.call_args_list[1] == mock.call(
domain_list[0], rdtype, raise_on_no_answer=False
)
assert result == {
"1.2.3.4": ["example.org"],
"4.3.2.1": ["example.org"],
"1.2.3.5": ["example.org"],
}
assert self.db.DnsChange.select().count() == 2
assert self.db.NetworkChange.select().count() == 0
def test_getDomainIpDict_mutipleDomain(self):
resolver_ip_list = ["127.0.0.1"]
domain_list = ["example.org", "foo.example.org"]
rdtype = "A"
status_id = logStatus(self.db, "foo")
with mock.patch(
"urlchecker_dns.dns.resolver.Resolver.query"
) as mock_query:
mock_query.side_effect = [
[MockAnswer("4.3.2.1"), MockAnswer("1.2.3.4")],
[MockAnswer("4.3.2.1"), MockAnswer("1.2.3.5")],
]
result = getDomainIpDict(
self.db, status_id, resolver_ip_list, domain_list, rdtype
)
assert mock_query.call_count == 2
assert mock_query.call_args_list[0] == mock.call(
domain_list[0], rdtype, raise_on_no_answer=False
)
assert mock_query.call_args_list[1] == mock.call(
domain_list[1], rdtype, raise_on_no_answer=False
)
assert result == {
"1.2.3.4": ["example.org"],
"4.3.2.1": ["example.org", "foo.example.org"],
"1.2.3.5": ["foo.example.org"],
}
assert self.db.DnsChange.select().count() == 2
assert self.db.NetworkChange.select().count() == 0
def suite(): def suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
......
...@@ -6,7 +6,7 @@ from urlchecker_status import logStatus ...@@ -6,7 +6,7 @@ from urlchecker_status import logStatus
from urlchecker_dns import ( from urlchecker_dns import (
getReachableResolverList, getReachableResolverList,
expandDomainList, expandDomainList,
getServerIpDict, getDomainIpDict,
) )
from urlchecker_http import getUrlHostname, checkHttpStatus from urlchecker_http import getUrlHostname, checkHttpStatus
from urlchecker_network import isTcpPortOpen from urlchecker_network import isTcpPortOpen
...@@ -52,7 +52,7 @@ class WebBot: ...@@ -52,7 +52,7 @@ class WebBot:
# Get the list of server to check # Get the list of server to check
# XXX Check DNS expiration # XXX Check DNS expiration
server_ip_dict = getServerIpDict( server_ip_dict = getDomainIpDict(
self._db, status_id, resolver_ip_list, domain_list, "A" self._db, status_id, resolver_ip_list, domain_list, "A"
) )
......
...@@ -107,7 +107,7 @@ def expandDomainList(domain_list): ...@@ -107,7 +107,7 @@ def expandDomainList(domain_list):
return domain_list return domain_list
def getServerIpDict(db, status_id, resolver_ip_list, domain_list, rdtype): def getDomainIpDict(db, status_id, resolver_ip_list, domain_list, rdtype):
server_ip_dict = {} server_ip_dict = {}
for domain_text in domain_list: for domain_text in domain_list:
for resolver_ip in resolver_ip_list: for resolver_ip in resolver_ip_list:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment