Commit a0412e9a authored by Romain Courteaud's avatar Romain Courteaud

whois: support of domain expiration

TODO:
* support more TLDs
parent 92bdae55
...@@ -43,6 +43,7 @@ setup( ...@@ -43,6 +43,7 @@ setup(
"peewee>2.10.1", "peewee>2.10.1",
"click>=7.0", "click>=7.0",
"dnspython", "dnspython",
"python-whois",
"miniupnpc", "miniupnpc",
], ],
extras_require={ extras_require={
......
...@@ -28,6 +28,11 @@ from .dns import ( ...@@ -28,6 +28,11 @@ from .dns import (
reportDnsQuery, reportDnsQuery,
packDns, packDns,
) )
from .domain import (
queryWhois,
reportWhoisQuery,
packDomain,
)
from .http import ( from .http import (
getRootUrl, getRootUrl,
getUrlHostname, getUrlHostname,
...@@ -73,6 +78,41 @@ def filterWarningStatus(status_dict, interval, not_critical_url_list): ...@@ -73,6 +78,41 @@ def filterWarningStatus(status_dict, interval, not_critical_url_list):
if not status_dict["bot_status"]: if not status_dict["bot_status"]:
del status_dict["bot_status"] del status_dict["bot_status"]
for i in range(len(status_dict["whois"]) - 1, -1, -1):
expiration_date = status_dict["whois"][i]["expiration_date"]
if (expiration_date is None) or (
(expiration_date is not None)
and (
(60 * 60 * 24 * 14)
< (
parsedate_to_datetime(expiration_date) - now
).total_seconds()
)
):
# Not handled whois entry hidden. Only check DNS warning in such case
# Warn 2 weeks before expiration
del status_dict["whois"][i]
else:
# Drop columns with too much info
del status_dict["whois"][i]["registrar"]
del status_dict["whois"][i]["whois_server"]
del status_dict["whois"][i]["creation_date"]
del status_dict["whois"][i]["updated_date"]
del status_dict["whois"][i]["name_servers"]
del status_dict["whois"][i]["whois_status"]
del status_dict["whois"][i]["emails"]
del status_dict["whois"][i]["dnssec"]
del status_dict["whois"][i]["name"]
del status_dict["whois"][i]["org"]
del status_dict["whois"][i]["address"]
del status_dict["whois"][i]["city"]
del status_dict["whois"][i]["state"]
del status_dict["whois"][i]["zipcode"]
del status_dict["whois"][i]["country"]
if not status_dict["whois"]:
del status_dict["whois"]
for i in range(len(status_dict["dns_server"]) - 1, -1, -1): for i in range(len(status_dict["dns_server"]) - 1, -1, -1):
state = status_dict["dns_server"][i]["state"] state = status_dict["dns_server"][i]["state"]
if state == "open": if state == "open":
...@@ -181,6 +221,24 @@ class WebBot: ...@@ -181,6 +221,24 @@ class WebBot:
public_suffix_list=self.config["PUBLIC_SUFFIX"].split(), public_suffix_list=self.config["PUBLIC_SUFFIX"].split(),
) )
def calculateWhoisDomainList(self, domain_list):
# Calculate the top domain for whois
domain_list = domain_list.copy()
domain_list.sort(key=lambda x: x.count("."))
i = 0
while i < len(domain_list):
base_domain = ".%s" % domain_list[i]
j = i + 1
while j < len(domain_list):
sub_domain = domain_list[j]
if sub_domain.endswith(base_domain):
domain_list.pop(j)
else:
j += 1
i += 1
return domain_list
def calculateNotCriticalUrlList(self): def calculateNotCriticalUrlList(self):
domain_list = self.config["DOMAIN"].split() domain_list = self.config["DOMAIN"].split()
url_list = self.config["URL"].split() url_list = self.config["URL"].split()
...@@ -208,6 +266,13 @@ class WebBot: ...@@ -208,6 +266,13 @@ class WebBot:
elapsed_moderate = float(self.config["ELAPSED_moderate"]) elapsed_moderate = float(self.config["ELAPSED_moderate"])
# logPlatform(self._db, __version__, status_id) # logPlatform(self._db, __version__, status_id)
# Get list of all domains
domain_list = self.calculateFullDomainList()
whois_domain_list = self.calculateWhoisDomainList(domain_list)
for whois_domain in whois_domain_list:
queryWhois(self._db, status_id, whois_domain)
# Calculate the resolver list # Calculate the resolver list
resolver_ip_list = getReachableResolverList( resolver_ip_list = getReachableResolverList(
self._db, status_id, self.config["NAMESERVER"].split(), timeout self._db, status_id, self.config["NAMESERVER"].split(), timeout
...@@ -215,9 +280,6 @@ class WebBot: ...@@ -215,9 +280,6 @@ class WebBot:
if not resolver_ip_list: if not resolver_ip_list:
return return
# Get list of all domains
domain_list = self.calculateFullDomainList()
# Get the list of server to check # Get the list of server to check
# XXX Check DNS expiration # XXX Check DNS expiration
server_ip_dict = getDomainIpDict( server_ip_dict = getDomainIpDict(
...@@ -293,6 +355,42 @@ class WebBot: ...@@ -293,6 +355,42 @@ class WebBot:
{"text": bot_status.text, "date": rfc822(bot_status.timestamp)} {"text": bot_status.text, "date": rfc822(bot_status.timestamp)}
) )
domain_list = self.calculateFullDomainList()
whois_domain_list = self.calculateWhoisDomainList(domain_list)
# Report list of Whois query
query = reportWhoisQuery(self._db, domain=whois_domain_list)
result_dict["whois"] = []
for domain_change in query.dicts().iterator():
result_dict["whois"].append(
{
"domain": domain_change["domain"],
"date": rfc822(domain_change["status"]),
"registrar": domain_change["registrar"],
"whois_server": domain_change["whois_server"],
"creation_date": rfc822(domain_change["creation_date"])
if (domain_change["creation_date"] is not None)
else None,
"updated_date": rfc822(domain_change["updated_date"])
if (domain_change["updated_date"] is not None)
else None,
"expiration_date": rfc822(domain_change["expiration_date"])
if (domain_change["expiration_date"] is not None)
else None,
"name_servers": domain_change["name_servers"],
"whois_status": domain_change["whois_status"],
"emails": domain_change["emails"],
"dnssec": domain_change["dnssec"],
"name": domain_change["name"],
"org": domain_change["org"],
"address": domain_change["address"],
"city": domain_change["city"],
"state": domain_change["state"],
"zipcode": domain_change["zipcode"],
"country": domain_change["country"],
}
)
# Report the list of DNS server status # Report the list of DNS server status
checked_resolver_ip_dict = {} checked_resolver_ip_dict = {}
query = reportNetwork( query = reportNetwork(
...@@ -315,7 +413,6 @@ class WebBot: ...@@ -315,7 +413,6 @@ class WebBot:
} }
) )
domain_list = self.calculateFullDomainList()
checked_domain_dict = {} checked_domain_dict = {}
# Report list of DNS query # Report list of DNS query
query = reportDnsQuery( query = reportDnsQuery(
...@@ -487,6 +584,7 @@ class WebBot: ...@@ -487,6 +584,7 @@ class WebBot:
def pack(self): def pack(self):
logStatus(self._db, "packing") logStatus(self._db, "packing")
packDns(self._db) packDns(self._db)
packDomain(self._db)
packHttp(self._db) packHttp(self._db)
packNetwork(self._db) packNetwork(self._db)
packSslCertificate(self._db) packSslCertificate(self._db)
......
...@@ -106,6 +106,30 @@ class LogDB: ...@@ -106,6 +106,30 @@ class LogDB:
(("resolver_ip", "domain", "rdtype", "status"), True), (("resolver_ip", "domain", "rdtype", "status"), True),
) )
class DomainChange(BaseModel):
status = peewee.ForeignKeyField(Status)
domain = peewee.TextField()
registrar = peewee.TextField(null=True)
whois_server = peewee.TextField(null=True)
creation_date = peewee.DateTimeField(null=True)
updated_date = peewee.DateTimeField(null=True)
expiration_date = peewee.DateTimeField(null=True)
name_servers = peewee.TextField(null=True)
whois_status = peewee.TextField(null=True)
emails = peewee.TextField(null=True)
dnssec = peewee.TextField(null=True)
name = peewee.TextField(null=True)
org = peewee.TextField(null=True)
address = peewee.TextField(null=True)
city = peewee.TextField(null=True)
state = peewee.TextField(null=True)
zipcode = peewee.TextField(null=True)
country = peewee.TextField(null=True)
class Meta:
primary_key = peewee.CompositeKey("status", "domain")
indexes = ((("domain", "status"), True),)
class SslChange(BaseModel): class SslChange(BaseModel):
status = peewee.ForeignKeyField(Status) status = peewee.ForeignKeyField(Status)
ip = peewee.TextField() ip = peewee.TextField()
...@@ -140,13 +164,14 @@ class LogDB: ...@@ -140,13 +164,14 @@ class LogDB:
self.PlatformChange = PlatformChange self.PlatformChange = PlatformChange
self.NetworkChange = NetworkChange self.NetworkChange = NetworkChange
self.DnsChange = DnsChange self.DnsChange = DnsChange
self.DomainChange = DomainChange
self.HttpCodeChange = HttpCodeChange self.HttpCodeChange = HttpCodeChange
self.SslChange = SslChange self.SslChange = SslChange
def createTables(self): def createTables(self):
# http://www.sqlite.org/pragma.html#pragma_user_version # http://www.sqlite.org/pragma.html#pragma_user_version
db_version = self._db.pragma("user_version") db_version = self._db.pragma("user_version")
expected_version = 5 expected_version = 6
if db_version != expected_version: if db_version != expected_version:
with self._db.transaction(): with self._db.transaction():
...@@ -167,6 +192,9 @@ class LogDB: ...@@ -167,6 +192,9 @@ class LogDB:
# version 1 without SSL support # version 1 without SSL support
self._db.create_tables([self.SslChange]) self._db.create_tables([self.SslChange])
if db_version <= 5:
self._db.create_tables([self.DomainChange])
migrator = SqliteMigrator(self._db) migrator = SqliteMigrator(self._db)
migration_list = [] migration_list = []
......
# Copyright (C) 2021 Nexedi SA and Contributors.
# Romain Courteaud <romain@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
import sys
import os
import whois
from peewee import fn
def reportWhoisQuery(db, domain=None):
query = (
db.DomainChange.select(db.DomainChange)
.group_by(db.DomainChange.domain)
.having(db.DomainChange.status_id == fn.MAX(db.DomainChange.status_id))
)
if domain is not None:
if type(domain) == list:
query = query.where(db.DomainChange.domain << domain)
else:
query = query.where(db.DomainChange.domain == domain)
return query
def packDomain(db):
with db._db.atomic():
result = [x for x in reportWhoisQuery(db)]
for dns_change in result:
db.DomainChange.delete().where(
db.DomainChange.status_id != dns_change.status_id,
db.DomainChange.domain == dns_change.domain,
).execute()
def logWhoisQuery(
db,
status_id,
domain_text,
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
):
with db._db.atomic():
try:
# Check previous parameter value
previous_entry = reportWhoisQuery(db, domain=domain_text).get()
except db.DomainChange.DoesNotExist:
previous_entry = None
if (
(previous_entry is None)
or (previous_entry.registrar != registrar)
or (previous_entry.whois_server != whois_server)
or (previous_entry.creation_date != creation_date)
or (previous_entry.updated_date != updated_date)
or (previous_entry.expiration_date != expiration_date)
or (previous_entry.name_servers != name_servers)
or (previous_entry.whois_status != whois_status)
or (previous_entry.emails != emails)
or (previous_entry.dnssec != dnssec)
or (previous_entry.name != name)
or (previous_entry.org != org)
or (previous_entry.address != address)
or (previous_entry.city != city)
or (previous_entry.state != state)
or (previous_entry.zipcode != zipcode)
or (previous_entry.country != country)
):
previous_entry = db.DomainChange.create(
domain=domain_text,
registrar=registrar,
whois_server=whois_server,
creation_date=creation_date,
updated_date=updated_date,
expiration_date=expiration_date,
name_servers=name_servers,
whois_status=whois_status,
emails=emails,
dnssec=dnssec,
name=name,
org=org,
address=address,
city=city,
state=state,
zipcode=zipcode,
country=country,
status=status_id,
)
return previous_entry.status_id
def queryWhois(db, status_id, domain_text):
# Hide lib message:
# Error trying to connect to socket: closing socket
_stdout = sys.stdout
sys.stdout = open(os.devnull, "w")
whois_dict = whois.whois(domain_text)
sys.stdout = _stdout
arg_list = []
for arg in [
whois_dict.registrar,
whois_dict.whois_server,
whois_dict.creation_date,
whois_dict.updated_date,
whois_dict.expiration_date,
whois_dict.name_servers,
whois_dict.status,
whois_dict.emails,
whois_dict.dnssec,
whois_dict.name,
whois_dict.org,
whois_dict.address,
whois_dict.city,
whois_dict.state,
whois_dict.zipcode,
whois_dict.country,
]:
if type(arg) == list:
arg = arg[0]
arg_list.append(arg)
logWhoisQuery(db, status_id, domain_text, *arg_list)
return whois_dict
...@@ -21,6 +21,7 @@ import unittest ...@@ -21,6 +21,7 @@ import unittest
from surykatka.bot import WebBot from surykatka.bot import WebBot
import mock import mock
from test_dns import MockAnswer from test_dns import MockAnswer
from test_domain import MockAnswer as MockWhoisAnswer
import surykatka.dns import surykatka.dns
...@@ -68,6 +69,17 @@ def checkDnsChange(bot, result_list): ...@@ -68,6 +69,17 @@ def checkDnsChange(bot, result_list):
assert db_result_list == result_list assert db_result_list == result_list
def checkDomainChange(bot, result_list):
select_list = bot._db.DomainChange.select().order_by(
bot._db.DomainChange.domain.asc()
)
db_result_list = [x.domain for x in select_list]
assert bot._db.DomainChange.select().count() == len(
result_list
), db_result_list
assert db_result_list == result_list
class SurykatkaBotTestCase(unittest.TestCase): class SurykatkaBotTestCase(unittest.TestCase):
def test_emptyConfiguration(self): def test_emptyConfiguration(self):
resolver_ip = "192.168.0.254" resolver_ip = "192.168.0.254"
...@@ -100,6 +112,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -100,6 +112,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkHttpCodeChange(bot, []) checkHttpCodeChange(bot, [])
checkDomainChange(bot, [])
def test_oneNameserverOneDomainOneIp(self): def test_oneNameserverOneDomainOneIp(self):
resolver_ip = "127.0.0.1" resolver_ip = "127.0.0.1"
...@@ -107,6 +121,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -107,6 +121,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
resolver.nameservers.append(resolver_ip) resolver.nameservers.append(resolver_ip)
with mock.patch( with mock.patch(
"surykatka.domain.whois.whois"
) as mock_whois, mock.patch(
"surykatka.configuration.get_default_resolver" "surykatka.configuration.get_default_resolver"
) as mock_get_default_resolver, mock.patch( ) as mock_get_default_resolver, mock.patch(
"surykatka.dns.dns.resolver.Resolver.query" "surykatka.dns.dns.resolver.Resolver.query"
...@@ -119,6 +135,24 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -119,6 +135,24 @@ class SurykatkaBotTestCase(unittest.TestCase):
) as mock_request: ) as mock_request:
mock_request.return_value.headers = {"Etag": "foobar"} mock_request.return_value.headers = {"Etag": "foobar"}
mock_whois.return_value = MockWhoisAnswer(
"registrar",
"whois_server",
"creation_date",
"updated_date",
"expiration_date",
"name_servers",
"whois_status",
"emails",
"dnssec",
"name",
"org",
"address",
"city",
"state",
"zipcode",
"country",
)
mock_get_default_resolver.return_value = resolver mock_get_default_resolver.return_value = resolver
mock_query.return_value = [MockAnswer("1.2.3.4")] mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [ mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
...@@ -138,6 +172,7 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -138,6 +172,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop() bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 2 assert mock_query.call_count == 2
assert mock_socket.call_count == 3 assert mock_socket.call_count == 3
assert mock_create_default_context.call_count == 1 assert mock_create_default_context.call_count == 1
...@@ -149,6 +184,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -149,6 +184,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
checkDnsChange(bot, [(resolver_ip, "example.org")]) checkDnsChange(bot, [(resolver_ip, "example.org")])
checkDomainChange(bot, ["example.org"])
checkSslChange(bot, [("1.2.3.4", 443, "example.org")]) checkSslChange(bot, [("1.2.3.4", 443, "example.org")])
checkHttpCodeChange( checkHttpCodeChange(
...@@ -173,6 +210,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -173,6 +210,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.initDB() bot.initDB()
with mock.patch( with mock.patch(
"surykatka.domain.whois.whois"
) as mock_whois, mock.patch(
"surykatka.dns.dns.resolver.Resolver.query" "surykatka.dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch( ) as mock_query, mock.patch(
"surykatka.network.socket.socket" "surykatka.network.socket.socket"
...@@ -182,6 +221,24 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -182,6 +221,24 @@ class SurykatkaBotTestCase(unittest.TestCase):
"surykatka.http.request" "surykatka.http.request"
) as mock_request: ) as mock_request:
mock_whois.return_value = MockWhoisAnswer(
"registrar",
"whois_server",
"creation_date",
"updated_date",
"expiration_date",
"name_servers",
"whois_status",
"emails",
"dnssec",
"name",
"org",
"address",
"city",
"state",
"zipcode",
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"} mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.return_value = [MockAnswer("1.2.3.4")] mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [ mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
...@@ -196,6 +253,7 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -196,6 +253,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop() bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 4 assert mock_query.call_count == 4
assert mock_socket.call_count == 3 assert mock_socket.call_count == 3
assert mock_create_default_context.call_count == 1 assert mock_create_default_context.call_count == 1
...@@ -211,6 +269,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -211,6 +269,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
], ],
) )
checkDomainChange(bot, ["example.org"])
checkDnsChange( checkDnsChange(
bot, [(resolver_ip, "example.org"), (resolver_ip_2, "example.org")] bot, [(resolver_ip, "example.org"), (resolver_ip_2, "example.org")]
) )
...@@ -240,6 +300,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -240,6 +300,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.initDB() bot.initDB()
with mock.patch( with mock.patch(
"surykatka.domain.whois.whois"
) as mock_whois, mock.patch(
"surykatka.dns.dns.resolver.Resolver.query" "surykatka.dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch( ) as mock_query, mock.patch(
"surykatka.network.socket.socket" "surykatka.network.socket.socket"
...@@ -249,6 +311,24 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -249,6 +311,24 @@ class SurykatkaBotTestCase(unittest.TestCase):
"surykatka.http.request" "surykatka.http.request"
) as mock_request: ) as mock_request:
mock_whois.return_value = MockWhoisAnswer(
"registrar",
"whois_server",
"creation_date",
"updated_date",
"expiration_date",
"name_servers",
"whois_status",
"emails",
"dnssec",
"name",
"org",
"address",
"city",
"state",
"zipcode",
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"} mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.return_value = [MockAnswer("1.2.3.4")] mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [ mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
...@@ -270,6 +350,7 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -270,6 +350,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop() bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 3 assert mock_query.call_count == 3
assert mock_socket.call_count == 4 assert mock_socket.call_count == 4
assert mock_create_default_context.call_count == 2 assert mock_create_default_context.call_count == 2
...@@ -279,6 +360,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -279,6 +360,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)] bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
) )
checkDomainChange(bot, ["example.org"])
checkDnsChange(bot, [(resolver_ip, domain_1), (resolver_ip, domain_2)]) checkDnsChange(bot, [(resolver_ip, domain_1), (resolver_ip, domain_2)])
checkSslChange( checkSslChange(
...@@ -309,6 +392,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -309,6 +392,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.initDB() bot.initDB()
with mock.patch( with mock.patch(
"surykatka.domain.whois.whois"
) as mock_whois, mock.patch(
"surykatka.dns.dns.resolver.Resolver.query" "surykatka.dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch( ) as mock_query, mock.patch(
"surykatka.network.socket.socket" "surykatka.network.socket.socket"
...@@ -318,6 +403,24 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -318,6 +403,24 @@ class SurykatkaBotTestCase(unittest.TestCase):
"surykatka.http.request" "surykatka.http.request"
) as mock_request: ) as mock_request:
mock_whois.return_value = MockWhoisAnswer(
"registrar",
"whois_server",
"creation_date",
"updated_date",
"expiration_date",
"name_servers",
"whois_status",
"emails",
"dnssec",
"name",
"org",
"address",
"city",
"state",
"zipcode",
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"} mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.return_value = [ mock_query.return_value = [
MockAnswer("1.2.3.4"), MockAnswer("1.2.3.4"),
...@@ -342,6 +445,7 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -342,6 +445,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop() bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 2 assert mock_query.call_count == 2
assert mock_socket.call_count == 6 assert mock_socket.call_count == 6
assert mock_create_default_context.call_count == 2 assert mock_create_default_context.call_count == 2
...@@ -358,6 +462,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -358,6 +462,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
], ],
) )
checkDomainChange(bot, ["example.org"])
checkDnsChange(bot, [(resolver_ip, domain)]) checkDnsChange(bot, [(resolver_ip, domain)])
checkSslChange( checkSslChange(
...@@ -390,6 +496,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -390,6 +496,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.initDB() bot.initDB()
with mock.patch( with mock.patch(
"surykatka.domain.whois.whois"
) as mock_whois, mock.patch(
"surykatka.dns.dns.resolver.Resolver.query" "surykatka.dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch( ) as mock_query, mock.patch(
"surykatka.network.socket.socket" "surykatka.network.socket.socket"
...@@ -399,6 +507,24 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -399,6 +507,24 @@ class SurykatkaBotTestCase(unittest.TestCase):
"surykatka.http.request" "surykatka.http.request"
) as mock_request: ) as mock_request:
mock_whois.return_value = MockWhoisAnswer(
"registrar",
"whois_server",
"creation_date",
"updated_date",
"expiration_date",
"name_servers",
"whois_status",
"emails",
"dnssec",
"name",
"org",
"address",
"city",
"state",
"zipcode",
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"} mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.return_value = [MockAnswer("1.2.3.4")] mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [ mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
...@@ -420,6 +546,7 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -420,6 +546,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop() bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 3 assert mock_query.call_count == 3
assert mock_socket.call_count == 4 assert mock_socket.call_count == 4
assert mock_create_default_context.call_count == 2 assert mock_create_default_context.call_count == 2
...@@ -429,6 +556,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -429,6 +556,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)] bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
) )
checkDomainChange(bot, ["example.org"])
checkDnsChange(bot, [(resolver_ip, domain), (resolver_ip, sub_domain)]) checkDnsChange(bot, [(resolver_ip, domain), (resolver_ip, sub_domain)])
checkSslChange( checkSslChange(
...@@ -461,6 +590,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -461,6 +590,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.initDB() bot.initDB()
with mock.patch( with mock.patch(
"surykatka.domain.whois.whois"
) as mock_whois, mock.patch(
"surykatka.dns.dns.resolver.Resolver.query" "surykatka.dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch( ) as mock_query, mock.patch(
"surykatka.network.socket.socket" "surykatka.network.socket.socket"
...@@ -470,6 +601,24 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -470,6 +601,24 @@ class SurykatkaBotTestCase(unittest.TestCase):
"surykatka.http.request" "surykatka.http.request"
) as mock_request: ) as mock_request:
mock_whois.return_value = MockWhoisAnswer(
"registrar",
"whois_server",
"creation_date",
"updated_date",
"expiration_date",
"name_servers",
"whois_status",
"emails",
"dnssec",
"name",
"org",
"address",
"city",
"state",
"zipcode",
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"} mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.return_value = [MockAnswer("1.2.3.4")] mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [ mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
...@@ -484,6 +633,7 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -484,6 +633,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.iterateLoop() bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 2 assert mock_query.call_count == 2
assert mock_socket.call_count == 3 assert mock_socket.call_count == 3
assert mock_create_default_context.call_count == 1 assert mock_create_default_context.call_count == 1
...@@ -493,6 +643,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -493,6 +643,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)] bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
) )
checkDomainChange(bot, ["foo.example.com"])
checkDnsChange( checkDnsChange(
bot, [(resolver_ip, "example.org"), (resolver_ip, sub_domain)] bot, [(resolver_ip, "example.org"), (resolver_ip, sub_domain)]
) )
...@@ -521,6 +673,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -521,6 +673,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.initDB() bot.initDB()
with mock.patch( with mock.patch(
"surykatka.domain.whois.whois"
) as mock_whois, mock.patch(
"surykatka.dns.dns.resolver.Resolver.query" "surykatka.dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch( ) as mock_query, mock.patch(
"surykatka.network.socket.socket" "surykatka.network.socket.socket"
...@@ -530,6 +684,24 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -530,6 +684,24 @@ class SurykatkaBotTestCase(unittest.TestCase):
"surykatka.http.request" "surykatka.http.request"
) as mock_request: ) as mock_request:
mock_whois.return_value = MockWhoisAnswer(
"registrar",
"whois_server",
"creation_date",
"updated_date",
"expiration_date",
"name_servers",
"whois_status",
"emails",
"dnssec",
"name",
"org",
"address",
"city",
"state",
"zipcode",
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"} mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.return_value = [MockAnswer("1.2.3.4")] mock_query.return_value = [MockAnswer("1.2.3.4")]
mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [ mock_create_default_context.return_value.wrap_socket.return_value.getpeercert.side_effect = [
...@@ -543,6 +715,7 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -543,6 +715,7 @@ class SurykatkaBotTestCase(unittest.TestCase):
] ]
bot.iterateLoop() bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 2 assert mock_query.call_count == 2
assert mock_socket.call_count == 3 assert mock_socket.call_count == 3
assert mock_create_default_context.call_count == 1 assert mock_create_default_context.call_count == 1
...@@ -552,6 +725,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -552,6 +725,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)] bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
) )
checkDomainChange(bot, ["example.org"])
checkDnsChange(bot, [(resolver_ip, domain)]) checkDnsChange(bot, [(resolver_ip, domain)])
checkSslChange(bot, [("1.2.3.4", 443, domain)]) checkSslChange(bot, [("1.2.3.4", 443, domain)])
...@@ -579,6 +754,8 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -579,6 +754,8 @@ class SurykatkaBotTestCase(unittest.TestCase):
bot.initDB() bot.initDB()
with mock.patch( with mock.patch(
"surykatka.domain.whois.whois"
) as mock_whois, mock.patch(
"surykatka.dns.dns.resolver.Resolver.query" "surykatka.dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch( ) as mock_query, mock.patch(
"surykatka.network.socket.socket" "surykatka.network.socket.socket"
...@@ -586,16 +763,37 @@ class SurykatkaBotTestCase(unittest.TestCase): ...@@ -586,16 +763,37 @@ class SurykatkaBotTestCase(unittest.TestCase):
"surykatka.http.request" "surykatka.http.request"
) as mock_request: ) as mock_request:
mock_whois.return_value = MockWhoisAnswer(
"registrar",
"whois_server",
"creation_date",
"updated_date",
"expiration_date",
"name_servers",
"whois_status",
"emails",
"dnssec",
"name",
"org",
"address",
"city",
"state",
"zipcode",
"country",
)
mock_request.return_value.headers = {"Etag": "foobar"} mock_request.return_value.headers = {"Etag": "foobar"}
mock_query.side_effect = [[MockAnswer("1.2.3.4")], []] mock_query.side_effect = [[MockAnswer("1.2.3.4")], []]
bot.iterateLoop() bot.iterateLoop()
assert mock_whois.call_count == 1
assert mock_query.call_count == 2 assert mock_query.call_count == 2
assert mock_socket.call_count == 0 assert mock_socket.call_count == 0
assert mock_request.call_count == 0 assert mock_request.call_count == 0
checkNetworkChange(bot, [(resolver_ip, 53)]) checkNetworkChange(bot, [(resolver_ip, 53)])
checkDomainChange(bot, ["example2.org"])
checkDnsChange( checkDnsChange(
bot, [(resolver_ip, domain), (resolver_ip, "example2.org")] bot, [(resolver_ip, domain), (resolver_ip, "example2.org")]
) )
...@@ -630,6 +828,7 @@ class SurykatkaBotStatusTestCase(unittest.TestCase): ...@@ -630,6 +828,7 @@ class SurykatkaBotStatusTestCase(unittest.TestCase):
assert result["bot_status"][0]["text"] == "" assert result["bot_status"][0]["text"] == ""
assert result == { assert result == {
"bot_status": result["bot_status"], "bot_status": result["bot_status"],
"whois": [],
"dns_server": [], "dns_server": [],
"dns_query": [], "dns_query": [],
"missing_data": [ "missing_data": [
...@@ -668,6 +867,7 @@ class SurykatkaBotStatusTestCase(unittest.TestCase): ...@@ -668,6 +867,7 @@ class SurykatkaBotStatusTestCase(unittest.TestCase):
assert len(result["bot_status"]) == 1 assert len(result["bot_status"]) == 1
assert result["bot_status"][0]["text"] == "" assert result["bot_status"][0]["text"] == ""
assert len(result["dns_server"]) == 0 assert len(result["dns_server"]) == 0
assert len(result["whois"]) == 0
assert len(result["dns_query"]) == 0 assert len(result["dns_query"]) == 0
assert len(result["http_server"]) == 0 assert len(result["http_server"]) == 0
assert len(result["ssl_certificate"]) == 0 assert len(result["ssl_certificate"]) == 0
......
...@@ -92,10 +92,8 @@ class SurykatkaDBTestCase(unittest.TestCase): ...@@ -92,10 +92,8 @@ class SurykatkaDBTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.db = LogDB(":memory:") self.db = LogDB(":memory:")
def test_createTable(self): def checkDatabaseStatus(self):
assert self.db._db.pragma("user_version") == 0 assert self.db._db.pragma("user_version") == 6
self.db.createTables()
assert self.db._db.pragma("user_version") == 5
assert validate_schema(self.db.HttpCodeChange).valid, validate_schema( assert validate_schema(self.db.HttpCodeChange).valid, validate_schema(
self.db.HttpCodeChange self.db.HttpCodeChange
) )
...@@ -105,9 +103,26 @@ class SurykatkaDBTestCase(unittest.TestCase): ...@@ -105,9 +103,26 @@ class SurykatkaDBTestCase(unittest.TestCase):
assert validate_schema(self.db.DnsChange).valid, validate_schema( assert validate_schema(self.db.DnsChange).valid, validate_schema(
self.db.DnsChange self.db.DnsChange
) )
assert validate_schema(self.db.DomainChange).valid, validate_schema(
self.db.DomainChange
)
assert validate_schema(self.db.NetworkChange).valid, validate_schema( assert validate_schema(self.db.NetworkChange).valid, validate_schema(
self.db.NetworkChange self.db.NetworkChange
) )
assert validate_schema(self.db.PlatformChange).valid, validate_schema(
self.db.PlatformChange
)
assert validate_schema(
self.db.ConfigurationChange
).valid, validate_schema(self.db.ConfigurationChange)
assert validate_schema(self.db.Status).valid, validate_schema(
self.db.Status
)
def test_createTable(self):
assert self.db._db.pragma("user_version") == 0
self.db.createTables()
self.checkDatabaseStatus()
def test_downgrade(self): def test_downgrade(self):
assert self.db._db.pragma("user_version") == 0 assert self.db._db.pragma("user_version") == 0
...@@ -165,19 +180,7 @@ class SurykatkaDBTestCase(unittest.TestCase): ...@@ -165,19 +180,7 @@ class SurykatkaDBTestCase(unittest.TestCase):
self.db._db.pragma("user_version", 1) self.db._db.pragma("user_version", 1)
self.db.createTables() self.db.createTables()
assert self.db._db.pragma("user_version") == 5 self.checkDatabaseStatus()
assert validate_schema(self.db.HttpCodeChange).valid, validate_schema(
self.db.HttpCodeChange
)
assert validate_schema(self.db.SslChange).valid, validate_schema(
self.db.SslChange
)
assert validate_schema(self.db.DnsChange).valid, validate_schema(
self.db.DnsChange
)
assert validate_schema(self.db.NetworkChange).valid, validate_schema(
self.db.NetworkChange
)
def test_migrationFromVersion2(self): def test_migrationFromVersion2(self):
assert self.db._db.pragma("user_version") == 0 assert self.db._db.pragma("user_version") == 0
...@@ -230,19 +233,7 @@ class SurykatkaDBTestCase(unittest.TestCase): ...@@ -230,19 +233,7 @@ class SurykatkaDBTestCase(unittest.TestCase):
self.db._db.pragma("user_version", 2) self.db._db.pragma("user_version", 2)
self.db.createTables() self.db.createTables()
assert self.db._db.pragma("user_version") == 5 self.checkDatabaseStatus()
assert validate_schema(self.db.HttpCodeChange).valid, validate_schema(
self.db.HttpCodeChange
)
assert validate_schema(self.db.SslChange).valid, validate_schema(
self.db.SslChange
)
assert validate_schema(self.db.DnsChange).valid, validate_schema(
self.db.DnsChange
)
assert validate_schema(self.db.NetworkChange).valid, validate_schema(
self.db.NetworkChange
)
def test_migrationFromVersion3(self): def test_migrationFromVersion3(self):
assert self.db._db.pragma("user_version") == 0 assert self.db._db.pragma("user_version") == 0
...@@ -290,19 +281,7 @@ class SurykatkaDBTestCase(unittest.TestCase): ...@@ -290,19 +281,7 @@ class SurykatkaDBTestCase(unittest.TestCase):
self.db._db.pragma("user_version", 3) self.db._db.pragma("user_version", 3)
self.db.createTables() self.db.createTables()
assert self.db._db.pragma("user_version") == 5 self.checkDatabaseStatus()
assert validate_schema(self.db.HttpCodeChange).valid, validate_schema(
self.db.HttpCodeChange
)
assert validate_schema(self.db.SslChange).valid, validate_schema(
self.db.SslChange
)
assert validate_schema(self.db.DnsChange).valid, validate_schema(
self.db.DnsChange
)
assert validate_schema(self.db.NetworkChange).valid, validate_schema(
self.db.NetworkChange
)
def test_migrationFromVersion4(self): def test_migrationFromVersion4(self):
assert self.db._db.pragma("user_version") == 0 assert self.db._db.pragma("user_version") == 0
...@@ -345,19 +324,28 @@ class SurykatkaDBTestCase(unittest.TestCase): ...@@ -345,19 +324,28 @@ class SurykatkaDBTestCase(unittest.TestCase):
self.db._db.pragma("user_version", 4) self.db._db.pragma("user_version", 4)
self.db.createTables() self.db.createTables()
assert self.db._db.pragma("user_version") == 5 self.checkDatabaseStatus()
assert validate_schema(self.db.HttpCodeChange).valid, validate_schema(
self.db.HttpCodeChange def test_migrationFromVersion5(self):
) assert self.db._db.pragma("user_version") == 0
assert validate_schema(self.db.SslChange).valid, validate_schema( # Recreate version 5
self.db.SslChange with self.db._db.transaction():
)
assert validate_schema(self.db.DnsChange).valid, validate_schema( self.db._db.create_tables(
self.db.DnsChange [
) self.db.Status,
assert validate_schema(self.db.NetworkChange).valid, validate_schema( self.db.ConfigurationChange,
self.db.NetworkChange self.db.HttpCodeChange,
) self.db.NetworkChange,
self.db.PlatformChange,
self.db.DnsChange,
self.db.SslChange,
]
)
self.db._db.pragma("user_version", 5)
self.db.createTables()
self.checkDatabaseStatus()
def suite(): def suite():
......
# Copyright (C) 2021 Nexedi SA and Contributors.
# Romain Courteaud <romain@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
import unittest
from surykatka.db import LogDB
import peewee
import datetime
from surykatka.domain import logWhoisQuery, packDomain, queryWhois
from surykatka.status import logStatus
import mock
class MockAnswer(object):
def __init__(
self,
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
):
self.registrar = registrar
self.whois_server = whois_server
self.creation_date = creation_date
self.updated_date = updated_date
self.expiration_date = expiration_date
self.name_servers = name_servers
self.status = status
self.emails = emails
self.dnssec = dnssec
self.name = name
self.org = org
self.address = address
self.city = city
self.state = state
self.zipcode = zipcode
self.country = country
class SurykatkaDomainTestCase(unittest.TestCase):
def setUp(self):
self.db = LogDB(":memory:")
self.db.createTables()
################################################
# logWhoisQuery
################################################
def test_logWhoisQuery_insertFirst(self):
domain = "http://example.org"
registrar = "REGISTRAR"
whois_server = "WHOISSERVER"
creation_date = datetime.datetime.utcnow()
updated_date = datetime.datetime.utcnow()
expiration_date = datetime.datetime.utcnow()
name_servers = "NAMESERVER"
whois_status = "WHOISSTATUS"
emails = "EMAIL"
dnssec = "DNSSEC"
name = "NAME"
org = "ORG"
address = "ADDRESS"
city = "CITY"
state = "STATE"
zipcode = "ZIPCODE"
country = "COUNTRY"
status_id = logStatus(self.db, "foo")
result = logWhoisQuery(
self.db,
status_id,
domain,
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 1
assert self.db.DomainChange.get().domain == domain
assert self.db.DomainChange.get().registrar == registrar
assert self.db.DomainChange.get().whois_server == whois_server
assert self.db.DomainChange.get().creation_date == creation_date
assert self.db.DomainChange.get().updated_date == updated_date
assert self.db.DomainChange.get().expiration_date == expiration_date
assert self.db.DomainChange.get().name_servers == name_servers
assert self.db.DomainChange.get().whois_status == whois_status
assert self.db.DomainChange.get().emails == emails
assert self.db.DomainChange.get().dnssec == dnssec
assert self.db.DomainChange.get().name == name
assert self.db.DomainChange.get().org == org
assert self.db.DomainChange.get().address == address
assert self.db.DomainChange.get().city == city
assert self.db.DomainChange.get().state == state
assert self.db.DomainChange.get().zipcode == zipcode
assert self.db.DomainChange.get().country == country
assert self.db.DomainChange.get().status_id == status_id
assert result == status_id
def test_logWhoisQuery_insertOnlyOnePerStatusIdDomain(self):
domain = "http://example.org"
registrar = "REGISTRAR"
whois_server = "WHOISSERVER"
creation_date = datetime.datetime.utcnow()
updated_date = datetime.datetime.utcnow()
expiration_date = datetime.datetime.utcnow()
name_servers = "NAMESERVER"
whois_status = "WHOISSTATUS"
emails = "EMAIL"
dnssec = "DNSSEC"
name = "NAME"
org = "ORG"
address = "ADDRESS"
city = "CITY"
state = "STATE"
zipcode = "ZIPCODE"
country = "COUNTRY"
status_id = logStatus(self.db, "foo")
logWhoisQuery(
self.db,
status_id,
domain,
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
country += "FOO"
try:
logWhoisQuery(
self.db,
status_id,
domain,
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
except peewee.IntegrityError:
assert self.db.DomainChange.select().count() == 1
assert self.db.DomainChange.get().status_id == status_id
else:
raise NotImplementedError("Expected IntegrityError")
def test_logDomainQuery_skipIdenticalPreviousValues(self):
domain = "http://example.org"
registrar = "REGISTRAR"
whois_server = "WHOISSERVER"
creation_date = datetime.datetime.utcnow()
updated_date = datetime.datetime.utcnow()
expiration_date = datetime.datetime.utcnow()
name_servers = "NAMESERVER"
whois_status = "WHOISSTATUS"
emails = "EMAIL"
dnssec = "DNSSEC"
name = "NAME"
org = "ORG"
address = "ADDRESS"
city = "CITY"
state = "STATE"
zipcode = "ZIPCODE"
country = "COUNTRY"
status_id = logStatus(self.db, "foo")
result = logWhoisQuery(
self.db,
status_id,
domain,
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
status_id_2 = logStatus(self.db, "foo")
result_2 = logWhoisQuery(
self.db,
status_id_2,
domain,
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 1
assert self.db.DomainChange.get().domain == domain
assert self.db.DomainChange.get().registrar == registrar
assert self.db.DomainChange.get().whois_server == whois_server
assert self.db.DomainChange.get().creation_date == creation_date
assert self.db.DomainChange.get().updated_date == updated_date
assert self.db.DomainChange.get().expiration_date == expiration_date
assert self.db.DomainChange.get().name_servers == name_servers
assert self.db.DomainChange.get().whois_status == whois_status
assert self.db.DomainChange.get().emails == emails
assert self.db.DomainChange.get().dnssec == dnssec
assert self.db.DomainChange.get().name == name
assert self.db.DomainChange.get().org == org
assert self.db.DomainChange.get().address == address
assert self.db.DomainChange.get().city == city
assert self.db.DomainChange.get().state == state
assert self.db.DomainChange.get().zipcode == zipcode
assert self.db.DomainChange.get().country == country
assert self.db.DomainChange.get().status_id == status_id
assert result == status_id
assert result_2 == status_id
def test_logDomainQuery_insertWhenDifferentAnswer(self):
domain = "http://example.org"
registrar = "REGISTRAR"
registrar_2 = registrar + "foo"
whois_server = "WHOISSERVER"
whois_server_2 = whois_server + "foo"
creation_date = datetime.datetime(2000, 1, 1)
creation_date_2 = datetime.datetime(2000, 1, 2)
updated_date = datetime.datetime(2000, 1, 3)
updated_date_2 = datetime.datetime(2000, 1, 4)
expiration_date = datetime.datetime(2000, 1, 5)
expiration_date_2 = datetime.datetime(2000, 1, 6)
name_servers = "NAMESERVER"
name_servers_2 = name_servers + "foo"
whois_status = "WHOISSTATUS"
whois_status_2 = whois_status + "foo"
emails = "EMAIL"
emails_2 = emails + "foo"
dnssec = "DNSSEC"
dnssec_2 = dnssec + "foo"
name = "NAME"
name_2 = name + "foo"
org = "ORG"
org_2 = org + "foo"
address = "ADDRESS"
address_2 = address + "foo"
city = "CITY"
city_2 = city + "foo"
state = "STATE"
state_2 = state + "foo"
zipcode = "ZIPCODE"
zipcode_2 = zipcode + "foo"
country = "COUNTRY"
country_2 = country + "foo"
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 1
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 2
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 3
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 4
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date_2,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 5
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date_2,
expiration_date_2,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 6
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date_2,
expiration_date_2,
name_servers_2,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 7
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date_2,
expiration_date_2,
name_servers_2,
whois_status_2,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 8
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date_2,
expiration_date_2,
name_servers_2,
whois_status_2,
emails_2,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 9
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date_2,
expiration_date_2,
name_servers_2,
whois_status_2,
emails_2,
dnssec_2,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 10
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date_2,
expiration_date_2,
name_servers_2,
whois_status_2,
emails_2,
dnssec_2,
name_2,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 11
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date_2,
expiration_date_2,
name_servers_2,
whois_status_2,
emails_2,
dnssec_2,
name_2,
org_2,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 12
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date_2,
expiration_date_2,
name_servers_2,
whois_status_2,
emails_2,
dnssec_2,
name_2,
org_2,
address_2,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 13
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date_2,
expiration_date_2,
name_servers_2,
whois_status_2,
emails_2,
dnssec_2,
name_2,
org_2,
address_2,
city_2,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 14
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date_2,
expiration_date_2,
name_servers_2,
whois_status_2,
emails_2,
dnssec_2,
name_2,
org_2,
address_2,
city_2,
state_2,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 15
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date_2,
expiration_date_2,
name_servers_2,
whois_status_2,
emails_2,
dnssec_2,
name_2,
org_2,
address_2,
city_2,
state_2,
zipcode_2,
country,
)
assert self.db.DomainChange.select().count() == 16
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar_2,
whois_server_2,
creation_date_2,
updated_date_2,
expiration_date_2,
name_servers_2,
whois_status_2,
emails_2,
dnssec_2,
name_2,
org_2,
address_2,
city_2,
state_2,
zipcode_2,
country_2,
)
assert self.db.DomainChange.select().count() == 17
def test_logWhoisQuery_insertDifferentDomain(self):
domain = "http://example.org"
domain_2 = domain + "foo"
registrar = "REGISTRAR"
whois_server = "WHOISSERVER"
creation_date = datetime.datetime.utcnow()
updated_date = datetime.datetime.utcnow()
expiration_date = datetime.datetime.utcnow()
name_servers = "NAMESERVER"
whois_status = "WHOISSTATUS"
emails = "EMAIL"
dnssec = "DNSSEC"
name = "NAME"
org = "ORG"
address = "ADDRESS"
city = "CITY"
state = "STATE"
zipcode = "ZIPCODE"
country = "COUNTRY"
status_id = logStatus(self.db, "foo")
logWhoisQuery(
self.db,
status_id,
domain,
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
logWhoisQuery(
self.db,
status_id,
domain_2,
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 2
################################################
# queryWhois
################################################
def test_queryWhois_default(self):
domain = "http://example.org"
registrar = "REGISTRAR"
whois_server = "WHOISSERVER"
creation_date = datetime.datetime.utcnow()
updated_date = datetime.datetime.utcnow()
expiration_date = datetime.datetime.utcnow()
name_servers = "NAMESERVER"
whois_status = "WHOISSTATUS"
emails = "EMAIL"
dnssec = "DNSSEC"
name = "NAME"
org = "ORG"
address = "ADDRESS"
city = "CITY"
state = "STATE"
zipcode = "ZIPCODE"
country = "COUNTRY"
status_id = logStatus(self.db, "foo")
mock_answer = MockAnswer(
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
with mock.patch("surykatka.domain.whois.whois") as mock_whois:
mock_whois.return_value = mock_answer
result = queryWhois(self.db, status_id, domain)
assert mock_whois.call_count == 1
mock_whois.assert_called_with(domain)
assert self.db.DomainChange.select().count() == 1
assert self.db.DomainChange.get().domain == domain
assert self.db.DomainChange.get().registrar == registrar
assert self.db.DomainChange.get().whois_server == whois_server
assert self.db.DomainChange.get().creation_date == creation_date
assert self.db.DomainChange.get().updated_date == updated_date
assert self.db.DomainChange.get().expiration_date == expiration_date
assert self.db.DomainChange.get().name_servers == name_servers
assert self.db.DomainChange.get().whois_status == whois_status
assert self.db.DomainChange.get().emails == emails
assert self.db.DomainChange.get().dnssec == dnssec
assert self.db.DomainChange.get().name == name
assert self.db.DomainChange.get().org == org
assert self.db.DomainChange.get().address == address
assert self.db.DomainChange.get().city == city
assert self.db.DomainChange.get().state == state
assert self.db.DomainChange.get().zipcode == zipcode
assert self.db.DomainChange.get().country == country
assert self.db.DomainChange.get().status_id == status_id
assert result == mock_answer
################################################
# packDns
################################################
def test_packDomain_oldLog(self):
domain = "http://example.org"
registrar = "REGISTRAR"
registrar_2 = registrar + "foo"
whois_server = "WHOISSERVER"
creation_date = datetime.datetime(2000, 1, 1)
updated_date = datetime.datetime(2000, 1, 3)
expiration_date = datetime.datetime(2000, 1, 5)
name_servers = "NAMESERVER"
whois_status = "WHOISSTATUS"
emails = "EMAIL"
dnssec = "DNSSEC"
name = "NAME"
org = "ORG"
address = "ADDRESS"
city = "CITY"
state = "STATE"
zipcode = "ZIPCODE"
country = "COUNTRY"
logWhoisQuery(
self.db,
logStatus(self.db, "foo"),
domain,
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
status_id_2 = logStatus(self.db, "foo")
logWhoisQuery(
self.db,
status_id_2,
domain,
registrar_2,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 2
result = packDomain(self.db)
assert self.db.DomainChange.select().count() == 1
assert self.db.DomainChange.get().status_id == status_id_2
assert result == None
def test_packDns_keepDifferentUrl(self):
domain = "http://example.org"
domain_2 = domain + "foo"
registrar = "REGISTRAR"
whois_server = "WHOISSERVER"
creation_date = datetime.datetime.utcnow()
updated_date = datetime.datetime.utcnow()
expiration_date = datetime.datetime.utcnow()
name_servers = "NAMESERVER"
whois_status = "WHOISSTATUS"
emails = "EMAIL"
dnssec = "DNSSEC"
name = "NAME"
org = "ORG"
address = "ADDRESS"
city = "CITY"
state = "STATE"
zipcode = "ZIPCODE"
country = "COUNTRY"
status_id = logStatus(self.db, "foo")
logWhoisQuery(
self.db,
status_id,
domain,
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
logWhoisQuery(
self.db,
status_id,
domain_2,
registrar,
whois_server,
creation_date,
updated_date,
expiration_date,
name_servers,
whois_status,
emails,
dnssec,
name,
org,
address,
city,
state,
zipcode,
country,
)
assert self.db.DomainChange.select().count() == 2
result = packDomain(self.db)
assert self.db.DomainChange.select().count() == 2
assert result == None
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(SurykatkaDomainTestCase))
return suite
if __name__ == "__main__":
unittest.main(defaultTest="suite")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment