Commit 8bd2b9cc authored by Romain Courteaud's avatar Romain Courteaud

MORE TEST

parent b1989d93
import unittest
from urlchecker_bot import WebBot
import mock
from test_urlchecker_dns import MockAnswer
import urlchecker_dns
def checkNetworkChange(bot, result_list):
assert bot._db.NetworkChange.select().count() == len(result_list)
select_list = (
bot._db.NetworkChange.select()
.order_by(bot._db.NetworkChange.ip.asc())
.order_by(bot._db.NetworkChange.port.asc())
)
assert [(x.ip, x.port) for x in select_list] == result_list
def checkHttpCodeChange(bot, result_list):
assert bot._db.HttpCodeChange.select().count() == len(result_list)
select_list = (
bot._db.HttpCodeChange.select()
.order_by(bot._db.HttpCodeChange.ip.asc())
.order_by(bot._db.HttpCodeChange.url.asc())
)
assert [(x.ip, x.url) for x in select_list] == result_list
def checkDnsChange(bot, result_list):
assert bot._db.DnsChange.select().count() == len(result_list)
select_list = (
bot._db.DnsChange.select()
.order_by(bot._db.DnsChange.resolver_ip.asc())
.order_by(bot._db.DnsChange.domain.asc())
)
assert [(x.resolver_ip, x.domain) for x in select_list] == result_list
class UrlCheckerBotTestCase(unittest.TestCase):
def test_emptyConfiguration(self):
resolver_ip = "192.168.0.254"
bot = WebBot(mapping={"SQLITE": ":memory:"})
bot.initDB()
resolver = urlchecker_dns.dns.resolver.Resolver(configure=False)
resolver.nameservers.append(resolver_ip)
with mock.patch(
"urlchecker_dns.get_default_resolver"
) as mock_get_default_resolver:
mock_get_default_resolver.return_value = resolver
with mock.patch(
"urlchecker_dns.dns.resolver.Resolver.query"
) as mock_query:
mock_query.return_value = [MockAnswer("1.2.3.4")]
bot.iterateLoop()
assert bot._db.Status.select().count() == 1
assert bot._db.Status.get().text == "loop"
checkNetworkChange(bot, [(resolver_ip, 53)])
checkDnsChange(bot, [(resolver_ip, "example.org")])
checkHttpCodeChange(bot, [])
def test_oneNameserverOneDomainOneIp(self):
resolver_ip = "127.0.0.1"
bot = WebBot(mapping={"SQLITE": ":memory:", "DOMAIN": "example.org"})
bot.initDB()
resolver = urlchecker_dns.dns.resolver.Resolver(configure=False)
resolver.nameservers.append(resolver_ip)
with mock.patch(
"urlchecker_dns.get_default_resolver"
) as mock_get_default_resolver, mock.patch(
"urlchecker_dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch(
"urlchecker_network.socket.socket"
), mock.patch(
"urlchecker_http.request"
):
mock_get_default_resolver.return_value = resolver
mock_query.return_value = [MockAnswer("1.2.3.4")]
bot.iterateLoop()
checkNetworkChange(
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
)
checkDnsChange(bot, [(resolver_ip, "example.org")])
checkHttpCodeChange(
bot,
[
("1.2.3.4", "http://example.org"),
("1.2.3.4", "https://example.org"),
],
)
def test_twoSimilarNameserverOneDomainOneIp(self):
resolver_ip = "127.0.0.1"
resolver_ip_2 = "127.0.0.2"
bot = WebBot(
mapping={
"SQLITE": ":memory:",
"DOMAIN": "example.org",
"DNS": "\n".join([resolver_ip, resolver_ip_2]),
}
)
bot.initDB()
with mock.patch(
"urlchecker_dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch(
"urlchecker_network.socket.socket"
), mock.patch(
"urlchecker_http.request"
):
mock_query.return_value = [MockAnswer("1.2.3.4")]
bot.iterateLoop()
checkNetworkChange(
bot,
[
(resolver_ip, 53),
(resolver_ip_2, 53),
("1.2.3.4", 80),
("1.2.3.4", 443),
],
)
checkDnsChange(
bot, [(resolver_ip, "example.org"), (resolver_ip_2, "example.org")]
)
checkHttpCodeChange(
bot,
[
("1.2.3.4", "http://example.org"),
("1.2.3.4", "https://example.org"),
],
)
def test_oneNameserverTwoDomainOneIp(self):
resolver_ip = "127.0.0.1"
domain_1 = "example.org"
domain_2 = "foo.example.org"
bot = WebBot(
mapping={
"SQLITE": ":memory:",
"DOMAIN": "\n".join([domain_1, domain_2]),
"DNS": resolver_ip,
}
)
bot.initDB()
with mock.patch(
"urlchecker_dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch(
"urlchecker_network.socket.socket"
), mock.patch(
"urlchecker_http.request"
):
mock_query.return_value = [MockAnswer("1.2.3.4")]
bot.iterateLoop()
checkNetworkChange(
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
)
checkDnsChange(bot, [(resolver_ip, domain_1), (resolver_ip, domain_2)])
checkHttpCodeChange(
bot,
[
("1.2.3.4", "http://%s" % domain_1),
("1.2.3.4", "http://%s" % domain_2),
("1.2.3.4", "https://%s" % domain_1),
("1.2.3.4", "https://%s" % domain_2),
],
)
def test_oneNameserverOneDomainTwoIp(self):
resolver_ip = "127.0.0.1"
domain = "example.org"
bot = WebBot(
mapping={
"SQLITE": ":memory:",
"DOMAIN": domain,
"DNS": resolver_ip,
}
)
bot.initDB()
with mock.patch(
"urlchecker_dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch(
"urlchecker_network.socket.socket"
), mock.patch(
"urlchecker_http.request"
):
mock_query.return_value = [
MockAnswer("1.2.3.4"),
MockAnswer("1.2.3.5"),
]
bot.iterateLoop()
checkNetworkChange(
bot,
[
(resolver_ip, 53),
("1.2.3.4", 80),
("1.2.3.5", 80),
("1.2.3.4", 443),
("1.2.3.5", 443),
],
)
checkDnsChange(bot, [(resolver_ip, domain)])
checkHttpCodeChange(
bot,
[
("1.2.3.4", "http://%s" % domain),
("1.2.3.5", "http://%s" % domain),
("1.2.3.4", "https://%s" % domain),
("1.2.3.5", "https://%s" % domain),
],
)
def test_oneNameserverOneSubDomainOneIp(self):
resolver_ip = "127.0.0.1"
domain = "example.org"
sub_domain = "foo.%s" % domain
bot = WebBot(
mapping={
"SQLITE": ":memory:",
"DOMAIN": sub_domain,
"DNS": resolver_ip,
}
)
bot.initDB()
with mock.patch(
"urlchecker_dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch(
"urlchecker_network.socket.socket"
), mock.patch(
"urlchecker_http.request"
):
mock_query.return_value = [MockAnswer("1.2.3.4")]
bot.iterateLoop()
checkNetworkChange(
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
)
checkDnsChange(bot, [(resolver_ip, domain), (resolver_ip, sub_domain)])
checkHttpCodeChange(
bot,
[
("1.2.3.4", "http://%s" % domain),
("1.2.3.4", "http://%s" % sub_domain),
("1.2.3.4", "https://%s" % domain),
("1.2.3.4", "https://%s" % sub_domain),
],
)
def test_oneNameserverOneUrlOneIp(self):
resolver_ip = "127.0.0.1"
domain = "example.org"
bot = WebBot(
mapping={
"SQLITE": ":memory:",
"URL": "https://example.org/foo",
"DNS": resolver_ip,
}
)
bot.initDB()
with mock.patch(
"urlchecker_dns.dns.resolver.Resolver.query"
) as mock_query, mock.patch(
"urlchecker_network.socket.socket"
), mock.patch(
"urlchecker_http.request"
):
mock_query.return_value = [MockAnswer("1.2.3.4")]
bot.iterateLoop()
checkNetworkChange(
bot, [(resolver_ip, 53), ("1.2.3.4", 80), ("1.2.3.4", 443)]
)
checkDnsChange(bot, [(resolver_ip, domain)])
checkHttpCodeChange(
bot,
[
("1.2.3.4", "http://%s" % domain),
("1.2.3.4", "https://%s" % domain),
("1.2.3.4", "https://example.org/foo"),
],
)
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(UrlCheckerBotTestCase))
return suite
if __name__ == "__main__":
unittest.main(defaultTest="suite")
...@@ -3,6 +3,7 @@ from urlchecker_db import LogDB ...@@ -3,6 +3,7 @@ from urlchecker_db import LogDB
import urlchecker_http import urlchecker_http
from urlchecker_http import ( from urlchecker_http import (
getUrlHostname, getUrlHostname,
getRootUrl,
getUserAgent, getUserAgent,
request, request,
logHttpStatus, logHttpStatus,
...@@ -26,6 +27,13 @@ class UrlCheckerHttpTestCase(unittest.TestCase): ...@@ -26,6 +27,13 @@ class UrlCheckerHttpTestCase(unittest.TestCase):
result = getUrlHostname("http://example.org/foo?bar=1") result = getUrlHostname("http://example.org/foo?bar=1")
assert result == "example.org" assert result == "example.org"
################################################
# getRootUrl
################################################
def test_getRootUrl(self):
result = getRootUrl("https://example.org/foo?bar=1")
assert result == "https://example.org"
################################################ ################################################
# getUserAgent # getUserAgent
################################################ ################################################
......
...@@ -18,6 +18,10 @@ class UrlCheckerStatusTestCase(unittest.TestCase): ...@@ -18,6 +18,10 @@ class UrlCheckerStatusTestCase(unittest.TestCase):
result2 = logStatus(self.db, "foo") result2 = logStatus(self.db, "foo")
assert self.db.Status.select().count() == 2 assert self.db.Status.select().count() == 2
assert result1 < result2 assert result1 < result2
assert (
self.db.Status.get(self.db.Status.id == result1).timestamp
<= self.db.Status.get(self.db.Status.id == result2).timestamp
)
def suite(): def suite():
......
import time import time
from urlchecker_db import LogDB from urlchecker_db import LogDB
from urlchecker_configuration import createConfiguration, logConfiguration from urlchecker_configuration import createConfiguration, logConfiguration
from urlchecker_platform import logPlatform
from urlchecker_status import logStatus from urlchecker_status import logStatus
from urlchecker_dns import ( from urlchecker_dns import (
getReachableResolverList, getReachableResolverList,
expandDomainList, expandDomainList,
getDomainIpDict, getDomainIpDict,
) )
from urlchecker_http import getUrlHostname, checkHttpStatus from urlchecker_http import getRootUrl, getUrlHostname, checkHttpStatus
from urlchecker_network import isTcpPortOpen from urlchecker_network import isTcpPortOpen
...@@ -23,13 +22,13 @@ class WebBot: ...@@ -23,13 +22,13 @@ class WebBot:
def __init__(self, **kw): def __init__(self, **kw):
self.config = createConfiguration(**kw) self.config = createConfiguration(**kw)
def initDB(self, sqlite_path): def initDB(self):
self._db = LogDB(sqlite_path) self._db = LogDB(self.config["SQLITE"])
self._db.createTables() self._db.createTables()
def iterateLoop(self): def iterateLoop(self):
status_id = logStatus(self._db, "loop") status_id = logStatus(self._db, "loop")
logPlatform(self._db, __version__, status_id) # logPlatform(self._db, __version__, status_id)
# Calculate the resolver list # Calculate the resolver list
resolver_ip_list = getReachableResolverList( resolver_ip_list = getReachableResolverList(
...@@ -37,7 +36,6 @@ class WebBot: ...@@ -37,7 +36,6 @@ class WebBot:
) )
if not resolver_ip_list: if not resolver_ip_list:
return return
# Calculate the full list of domain to check # Calculate the full list of domain to check
domain_list = self.config["DOMAIN"].split() domain_list = self.config["DOMAIN"].split()
...@@ -71,6 +69,9 @@ class WebBot: ...@@ -71,6 +69,9 @@ class WebBot:
url_dict[url].append(server_ip) url_dict[url].append(server_ip)
# XXX put back orignal url list # XXX put back orignal url list
for url in url_list:
if url not in url_dict:
url_dict[url] = url_dict[getRootUrl(url)]
# Check HTTP Status # Check HTTP Status
for url in url_dict: for url in url_dict:
...@@ -87,7 +88,7 @@ class WebBot: ...@@ -87,7 +88,7 @@ class WebBot:
self._db.close() self._db.close()
def run(self): def run(self):
self.initDB(self.config["SQLITE"]) self.initDB()
status_id = logStatus(self._db, "start") status_id = logStatus(self._db, "start")
logConfiguration(self._db, status_id, self.config) logConfiguration(self._db, status_id, self.config)
......
...@@ -17,9 +17,9 @@ class LogDB: ...@@ -17,9 +17,9 @@ class LogDB:
# This store the start, stop, loop time of the bot # This store the start, stop, loop time of the bot
# All other tables point to it to be able to group some info # All other tables point to it to be able to group some info
class Status(BaseModel): class Status(BaseModel):
text = peewee.TextField(index=True) text = peewee.TextField()
timestamp = peewee.TimestampField( timestamp = peewee.TimestampField(
index=True, constraints=[peewee.SQL("DEFAULT now")] constraints=[peewee.SQL("DEFAULT now")]
) )
# Store the configuration modification # Store the configuration modification
......
...@@ -11,6 +11,11 @@ def getUrlHostname(url): ...@@ -11,6 +11,11 @@ def getUrlHostname(url):
return urlparse(url).hostname return urlparse(url).hostname
def getRootUrl(url):
parsed_url = urlparse(url)
return "%s://%s" % (parsed_url.scheme, parsed_url.hostname)
def getUserAgent(version): def getUserAgent(version):
return "%s/%s (+%s)" % ( return "%s/%s (+%s)" % (
"URLCHECKER", "URLCHECKER",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment