Commit b8bd2d34 authored by Romain Courteaud's avatar Romain Courteaud

Lint

parent 072a67f9
...@@ -23,28 +23,34 @@ class UrlCheckerDNSTestCase(unittest.TestCase): ...@@ -23,28 +23,34 @@ class UrlCheckerDNSTestCase(unittest.TestCase):
def test_logDnsQuery_insertFirst(self): def test_logDnsQuery_insertFirst(self):
domain = "http://example.org" domain = "http://example.org"
resolver_ip = "127.0.0.1" resolver_ip = "127.0.0.1"
rdtype = 'foo' rdtype = "foo"
answer_list = ['4.3.2.1', '1.2.3.4'] answer_list = ["4.3.2.1", "1.2.3.4"]
status_id = logStatus(self.db, "foo") status_id = logStatus(self.db, "foo")
result = logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype, answer_list) result = logDnsQuery(
self.db, status_id, resolver_ip, domain, rdtype, answer_list
)
assert self.db.DnsChange.select().count() == 1 assert self.db.DnsChange.select().count() == 1
assert self.db.DnsChange.get().resolver_ip == resolver_ip assert self.db.DnsChange.get().resolver_ip == resolver_ip
assert self.db.DnsChange.get().domain == domain assert self.db.DnsChange.get().domain == domain
assert self.db.DnsChange.get().rdtype == rdtype assert self.db.DnsChange.get().rdtype == rdtype
assert self.db.DnsChange.get().response == '1.2.3.4, 4.3.2.1' assert self.db.DnsChange.get().response == "1.2.3.4, 4.3.2.1"
assert self.db.DnsChange.get().status_id == status_id assert self.db.DnsChange.get().status_id == status_id
assert result == status_id assert result == status_id
def test_logDnsQuery_insertOnlyOnePerStatusIdIPUrl(self): def test_logDnsQuery_insertOnlyOnePerStatusIdIPUrl(self):
domain = "http://example.org" domain = "http://example.org"
resolver_ip = "127.0.0.1" resolver_ip = "127.0.0.1"
rdtype = 'foo' rdtype = "foo"
answer_list = ['4.3.2.1', '1.2.3.4'] answer_list = ["4.3.2.1", "1.2.3.4"]
status_id = logStatus(self.db, "foo") status_id = logStatus(self.db, "foo")
result = logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype, answer_list) result = logDnsQuery(
answer_list.extend('127.0.0.1') self.db, status_id, resolver_ip, domain, rdtype, answer_list
)
answer_list.extend("127.0.0.1")
try: try:
logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype, answer_list) logDnsQuery(
self.db, status_id, resolver_ip, domain, rdtype, answer_list
)
except peewee.IntegrityError: except peewee.IntegrityError:
assert self.db.DnsChange.select().count() == 1 assert self.db.DnsChange.select().count() == 1
assert self.db.DnsChange.get().status_id == status_id assert self.db.DnsChange.get().status_id == status_id
...@@ -54,18 +60,22 @@ class UrlCheckerDNSTestCase(unittest.TestCase): ...@@ -54,18 +60,22 @@ class UrlCheckerDNSTestCase(unittest.TestCase):
def test_logDnsQuery_skipIdenticalPreviousValues(self): def test_logDnsQuery_skipIdenticalPreviousValues(self):
domain = "http://example.org" domain = "http://example.org"
resolver_ip = "127.0.0.1" resolver_ip = "127.0.0.1"
rdtype = 'foo' rdtype = "foo"
answer_list = ['4.3.2.1', '1.2.3.4'] answer_list = ["4.3.2.1", "1.2.3.4"]
status_id = logStatus(self.db, "foo") status_id = logStatus(self.db, "foo")
result = logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype, answer_list) result = logDnsQuery(
self.db, status_id, resolver_ip, domain, rdtype, answer_list
)
status_id_2 = logStatus(self.db, "foo") status_id_2 = logStatus(self.db, "foo")
result_2 = logDnsQuery(self.db, status_id_2, resolver_ip, domain, rdtype, answer_list) result_2 = logDnsQuery(
self.db, status_id_2, resolver_ip, domain, rdtype, answer_list
)
assert self.db.DnsChange.select().count() == 1 assert self.db.DnsChange.select().count() == 1
assert self.db.DnsChange.get().resolver_ip == resolver_ip assert self.db.DnsChange.get().resolver_ip == resolver_ip
assert self.db.DnsChange.get().domain == domain assert self.db.DnsChange.get().domain == domain
assert self.db.DnsChange.get().rdtype == rdtype assert self.db.DnsChange.get().rdtype == rdtype
assert self.db.DnsChange.get().response == '1.2.3.4, 4.3.2.1' assert self.db.DnsChange.get().response == "1.2.3.4, 4.3.2.1"
assert self.db.DnsChange.get().status_id == status_id assert self.db.DnsChange.get().status_id == status_id
assert result == status_id assert result == status_id
assert result == result_2 assert result == result_2
...@@ -73,46 +83,112 @@ class UrlCheckerDNSTestCase(unittest.TestCase): ...@@ -73,46 +83,112 @@ class UrlCheckerDNSTestCase(unittest.TestCase):
def test_logDnsQuery_insertWhenDifferentAnswer(self): def test_logDnsQuery_insertWhenDifferentAnswer(self):
domain = "http://example.org" domain = "http://example.org"
resolver_ip = "127.0.0.1" resolver_ip = "127.0.0.1"
rdtype = 'foo' rdtype = "foo"
answer_list = ['4.3.2.1', '1.2.3.4'] answer_list = ["4.3.2.1", "1.2.3.4"]
status_id = logStatus(self.db, "foo") status_id = logStatus(self.db, "foo")
result = logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype, answer_list) result = logDnsQuery(
answer_list_2 = ['4.3.2.1', '1.2.3.4', '0.0.0.0'] self.db, status_id, resolver_ip, domain, rdtype, answer_list
)
answer_list_2 = ["4.3.2.1", "1.2.3.4", "0.0.0.0"]
status_id_2 = logStatus(self.db, "foo") status_id_2 = logStatus(self.db, "foo")
result_2 = logDnsQuery(self.db, status_id_2, resolver_ip, domain, rdtype, answer_list_2) result_2 = logDnsQuery(
self.db, status_id_2, resolver_ip, domain, rdtype, answer_list_2
)
assert result_2 != result assert result_2 != result
assert self.db.DnsChange.select().count() == 2 assert self.db.DnsChange.select().count() == 2
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id).resolver_ip == resolver_ip assert (
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id).domain == domain self.db.DnsChange.get(
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id).rdtype == rdtype self.db.DnsChange.status == status_id
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id).response == '1.2.3.4, 4.3.2.1' ).resolver_ip
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id).status_id == status_id == resolver_ip
)
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id_2).resolver_ip == resolver_ip assert (
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id_2).domain == domain self.db.DnsChange.get(self.db.DnsChange.status == status_id).domain
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id_2).rdtype == rdtype == domain
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id_2).response == '0.0.0.0, 1.2.3.4, 4.3.2.1' )
assert self.db.DnsChange.get(self.db.DnsChange.status == status_id_2).status_id == status_id_2 assert (
self.db.DnsChange.get(self.db.DnsChange.status == status_id).rdtype
== rdtype
)
assert (
self.db.DnsChange.get(
self.db.DnsChange.status == status_id
).response
== "1.2.3.4, 4.3.2.1"
)
assert (
self.db.DnsChange.get(
self.db.DnsChange.status == status_id
).status_id
== status_id
)
assert (
self.db.DnsChange.get(
self.db.DnsChange.status == status_id_2
).resolver_ip
== resolver_ip
)
assert (
self.db.DnsChange.get(
self.db.DnsChange.status == status_id_2
).domain
== domain
)
assert (
self.db.DnsChange.get(
self.db.DnsChange.status == status_id_2
).rdtype
== rdtype
)
assert (
self.db.DnsChange.get(
self.db.DnsChange.status == status_id_2
).response
== "0.0.0.0, 1.2.3.4, 4.3.2.1"
)
assert (
self.db.DnsChange.get(
self.db.DnsChange.status == status_id_2
).status_id
== status_id_2
)
def test_logDnsQuery_insertDifferentUrl(self): def test_logDnsQuery_insertDifferentUrl(self):
domain = "http://example.org" domain = "http://example.org"
domain_2 = domain + '.' domain_2 = domain + "."
resolver_ip = "127.0.0.1" resolver_ip = "127.0.0.1"
resolver_ip_2 = resolver_ip + '1' resolver_ip_2 = resolver_ip + "1"
rdtype = 'foo' rdtype = "foo"
rdtype_2 = rdtype + 'bar' rdtype_2 = rdtype + "bar"
answer_list = ['4.3.2.1', '1.2.3.4'] answer_list = ["4.3.2.1", "1.2.3.4"]
status_id = logStatus(self.db, "foo") status_id = logStatus(self.db, "foo")
result = logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype, answer_list) result = logDnsQuery(
logDnsQuery(self.db, status_id, resolver_ip_2, domain, rdtype, answer_list) self.db, status_id, resolver_ip, domain, rdtype, answer_list
logDnsQuery(self.db, status_id, resolver_ip, domain_2, rdtype, answer_list) )
logDnsQuery(self.db, status_id, resolver_ip, domain, rdtype_2, answer_list) logDnsQuery(
logDnsQuery(self.db, status_id, resolver_ip_2, domain_2, rdtype, answer_list) self.db, status_id, resolver_ip_2, domain, rdtype, answer_list
logDnsQuery(self.db, status_id, resolver_ip_2, domain, rdtype_2, answer_list) )
logDnsQuery(self.db, status_id, resolver_ip, domain_2, rdtype_2, answer_list) logDnsQuery(
logDnsQuery(self.db, status_id, resolver_ip_2, domain_2, rdtype_2, answer_list) self.db, status_id, resolver_ip, domain_2, rdtype, answer_list
)
logDnsQuery(
self.db, status_id, resolver_ip, domain, rdtype_2, answer_list
)
logDnsQuery(
self.db, status_id, resolver_ip_2, domain_2, rdtype, answer_list
)
logDnsQuery(
self.db, status_id, resolver_ip_2, domain, rdtype_2, answer_list
)
logDnsQuery(
self.db, status_id, resolver_ip, domain_2, rdtype_2, answer_list
)
logDnsQuery(
self.db, status_id, resolver_ip_2, domain_2, rdtype_2, answer_list
)
assert self.db.DnsChange.select().count() == 8 assert self.db.DnsChange.select().count() == 8
......
...@@ -82,9 +82,7 @@ def getResolverDict(db, status_id, resolver_ip_list): ...@@ -82,9 +82,7 @@ def getResolverDict(db, status_id, resolver_ip_list):
resolver_tuple_list = [x for x in resolver_dict.items()] resolver_tuple_list = [x for x in resolver_dict.items()]
for ip, resolver in resolver_tuple_list: for ip, resolver in resolver_tuple_list:
resolver_state = "open" resolver_state = "open"
answer_list = queryDNS( answer_list = queryDNS(db, status_id, ip, resolver, URL_TO_CHECK, "A")
db, status_id, ip, resolver, URL_TO_CHECK, "A"
)
if len(answer_list) == 0: if len(answer_list) == 0:
# We expect a valid response # We expect a valid response
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment