Commit 4b864b82 authored by Łukasz Nowak's avatar Łukasz Nowak

check_surykatka_json: Implement whois checks

Also improve tests to minimize assertions and make them more readable,
including separating specific tests from global ones, which will make
it much easier to improve the coverage.
parent 48726608
Pipeline #32824 passed with stage
in 0 seconds
......@@ -29,7 +29,8 @@ class RunPromise(GenericPromise):
self.getConfig('failure-amount', self.getConfig('failure_amount', 1)))
self.enabled_sense_list = self.getConfig(
'enabled-sense-list',
'dns_query tcp_server http_query ssl_certificate elapsed_time').split()
'dns_query whois tcp_server http_query ssl_certificate'
' elapsed_time').split()
self.result_count = self.failure_amount
self.error = False
self.message_list = []
......@@ -260,6 +261,63 @@ class RunPromise(GenericPromise):
else:
self.appendError('IP %s:%s' % (ip, port))
def senseWhois(self):
key = 'whois'
self.appendMessage('%s:' % (key, ))
url = self.getConfig('url')
parsed_url = urlparse(url)
hostname = parsed_url.netloc
if not hostname:
self.appendError('url is incorrect')
return
domain_expiration_days = self.getConfig(
'domain-expiration-days', '30')
try:
domain_expiration_days = int(domain_expiration_days)
except ValueError:
self.appendError(
'domain-expiration-days %r is incorrect' % (
self.getConfig('domain-expiration-days')))
return
if key not in self.surykatka_json:
self.appendError("%r not in %r" % (key, self.json_file))
return
def checkHostnameDomain(hostname, domain):
if hostname == domain:
return True
elif hostname.endswith('.' + domain):
return True
return False
entry_list = [
q for q in self.surykatka_json[key]
if checkHostnameDomain(hostname, q['domain'])]
if len(entry_list) == 0:
self.appendError('No data')
return
if len(entry_list) > 1:
self.appendError('Bad data')
return
entry = entry_list[0]
expiration_date = entry['expiration_date']
if expiration_date is None:
self.appendError('Expiration date not avaliable')
timetuple = email.utils.parsedate(expiration_date)
if timetuple is None:
self.appendError("Can't parse date %s" % (expiration_date,))
domain_expiration_time = datetime.datetime.fromtimestamp(
time.mktime(timetuple))
if domain_expiration_time - datetime.timedelta(
days=domain_expiration_days) < self.utcnow:
self.appendError(
'%s expires in < %s days' % (entry['domain'], domain_expiration_days,))
else:
self.appendOk(
'%s expires in > %s days' % (entry['domain'], domain_expiration_days,))
def senseElapsedTime(self):
key = 'elapsed_time'
self.appendMessage('%s:' % (key, ))
......@@ -322,6 +380,7 @@ class RunPromise(GenericPromise):
elif report == 'http_query':
for check_name, check_method in [
('dns_query', self.senseDnsQuery),
('whois', self.senseWhois),
('tcp_server', self.senseTcpServer),
('http_query', self.senseHttpQuery),
('ssl_certificate', self.senseSslCertificate),
......
......@@ -24,11 +24,13 @@ class CheckSurykatkaJSONMixin(TestPromisePluginMixin):
day = 24 * 3600
create_date = email.utils.formatdate
self.time_past14d = create_date(now - 14 * day)
self.time_past29d = create_date(now - 29 * day)
self.time_past20m = create_date(now - 20 * minute)
self.time_past2m = create_date(now - 2 * minute)
self.time_future20m = create_date(now + 20 * minute)
self.time_future3d = create_date(now + 3 * day)
self.time_future14d = create_date(now + 14 * day)
self.time_future29d = create_date(now + 29 * day)
self.time_future60d = create_date(now + 60 * day)
def writeSurykatkaPromise(self, d=None):
......@@ -62,6 +64,23 @@ class CheckSurykatkaJSONMixin(TestPromisePluginMixin):
result['result']['message'],
message)
def runAndAssertPassedMessage(self, message):
self.configureLauncher(enable_anomaly=True)
self.launcher.run()
self.assertPassedMessage(
self.getPromiseResult(self.promise_name),
message
)
def runAndAssertFailedMessage(self, message):
self.configureLauncher(enable_anomaly=True)
with self.assertRaises(PromiseError):
self.launcher.run()
self.assertFailedMessage(
self.getPromiseResult(self.promise_name),
message
)
class TestCheckSurykatkaJSONBase(CheckSurykatkaJSONMixin):
def test_no_config(self):
......@@ -221,6 +240,13 @@ class TestCheckSurykatkaJSONBotStatus(CheckSurykatkaJSONMixin):
class TestCheckSurykatkaJSONHttpQuery(CheckSurykatkaJSONMixin):
def writeSurykatkaPromise(self, d):
d.update(**{
'report': 'http_query',
'json-file': self.json_file,
})
super().writeSurykatkaPromise(d)
def setUp(self):
super().setUp()
self.writeSurykatkaJson({
......@@ -249,62 +275,6 @@ class TestCheckSurykatkaJSONHttpQuery(CheckSurykatkaJSONMixin):
"url": "http://www.httpallok.com/",
"total_seconds": 4
},
{
"ip": "127.0.0.1",
"status_code": 302,
"url": "https://www.elapsedtoolong.com/",
"total_seconds": 6
},
{
"ip": "127.0.0.1",
"status_code": 302,
"url": "https://www.elapsednototal.com/",
},
{
"ip": "127.0.0.1",
"status_code": 200,
"url": "http://www.httpheader.com/",
"http_header_dict": {
"Vary": "Accept-Encoding", "Cache-Control": "max-age=300, public"},
},
{
"ip": "127.0.0.1",
"status_code": 302,
"url": "https://www.cert3.com/",
"total_seconds": 4
},
{
"ip": "127.0.0.1",
"status_code": 302,
"url": "https://www.cert14.com/",
"total_seconds": 4
},
{
"ip": "127.0.0.1",
"status_code": 302,
"url": "https://www.certminus14.com/",
"total_seconds": 4
},
{
"ip": "127.0.0.1",
"status_code": 302,
"url": "https://www.nosslcertificatedata.com/",
},
{
"ip": "127.0.0.1",
"status_code": 302,
"url": "http://www.badip.com/",
},
{
"ip": "127.0.0.4",
"status_code": 302,
"url": "http://www.badip.com/",
},
{
"ip": "127.0.0.1",
"status_code": 301,
"url": "https://www.sslcertnoinfo.com/",
},
],
"ssl_certificate": [
{
......@@ -317,26 +287,6 @@ class TestCheckSurykatkaJSONHttpQuery(CheckSurykatkaJSONMixin):
"ip": "127.0.0.2",
"not_after": self.time_future60d
},
{
"hostname": "www.cert3.com",
"ip": "127.0.0.1",
"not_after": self.time_future3d
},
{
"hostname": "www.cert14.com",
"ip": "127.0.0.1",
"not_after": self.time_future14d
},
{
"hostname": "www.certminus14.com",
"ip": "127.0.0.1",
"not_after": self.time_past14d
},
{
"hostname": "www.sslcertnoinfo.com",
"ip": "127.0.0.1",
"not_after": None
},
],
"dns_query": [
{
......@@ -351,18 +301,6 @@ class TestCheckSurykatkaJSONHttpQuery(CheckSurykatkaJSONMixin):
"resolver_ip": "1.2.3.4",
"response": "127.0.0.1, 127.0.0.2"
},
{
"domain": "www.badip.com",
"rdtype": "A",
"resolver_ip": "1.2.3.4",
"response": "127.0.0.1, 127.0.0.4"
},
{
"domain": "www.dnsquerynoreply.com",
"rdtype": "A",
"resolver_ip": "1.2.3.4",
"response": ""
},
],
"tcp_server": [
{
......@@ -389,61 +327,22 @@ class TestCheckSurykatkaJSONHttpQuery(CheckSurykatkaJSONMixin):
"port": 80,
"domain": "www.httpallok.com"
},
],
"whois": [
{
"ip": "127.0.0.1",
"state": "open",
"port": 80,
"domain": "www.httpheader.com"
},
{
"ip": "127.0.0.1",
"state": "open",
"port": 80,
"domain": "www.badip.com"
},
{
"ip": "127.0.0.4",
"state": "open",
"port": 80,
"domain": "www.badip.com"
},
{
"ip": "127.0.0.2",
"state": "open",
"port": 80,
"domain": "www.tcpservernoip.com"
"domain": "allok.com",
"expiration_date": self.time_future60d,
},
{
"ip": "127.0.0.1",
"state": "filtered",
"port": 80,
"domain": "www.tcpserverfiltered.com"
"domain": "httpallok.com",
"expiration_date": self.time_future60d,
},
]
})
def runAndAssertPassedMessage(self, message):
self.configureLauncher(enable_anomaly=True)
self.launcher.run()
self.assertPassedMessage(
self.getPromiseResult(self.promise_name),
message
)
def runAndAssertFailedMessage(self, message):
self.configureLauncher(enable_anomaly=True)
with self.assertRaises(PromiseError):
self.launcher.run()
self.assertFailedMessage(
self.getPromiseResult(self.promise_name),
message
)
def test_all_ok(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.allok.com/',
'status-code': '302',
'ip-list': '127.0.0.1 127.0.0.2',
......@@ -453,6 +352,7 @@ class TestCheckSurykatkaJSONHttpQuery(CheckSurykatkaJSONMixin):
self.runAndAssertPassedMessage(
"https://www.allok.com/ : "
"dns_query: OK resolver's 1.2.3.4: 127.0.0.1 127.0.0.2 "
"whois: OK allok.com expires in > 30 days "
"tcp_server: OK IP 127.0.0.1:443 OK IP 127.0.0.2:443 "
"http_query: OK IP 127.0.0.1 status_code 302 OK IP 127.0.0.2 "
"status_code 302 "
......@@ -462,47 +362,6 @@ class TestCheckSurykatkaJSONHttpQuery(CheckSurykatkaJSONMixin):
"< 5.00s"
)
def test_maximum_elapsed_time_too_long(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.elapsedtoolong.com/',
'status-code': '302',
'ip-list': '127.0.0.1',
'maximum-elapsed-time': '5',
}
)
self.runAndAssertFailedMessage(
"https://www.elapsedtoolong.com/ : "
"dns_query: ERROR No data "
"tcp_server: ERROR No data "
"http_query: OK IP 127.0.0.1 status_code 302 "
"ssl_certificate: ERROR No data "
"elapsed_time: ERROR IP 127.0.0.1 replied > 5.00s"
)
def test_maximum_elapsed_no_match(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.elapsednototal.com/',
'status-code': '302',
'ip-list': '127.0.0.1',
'maximum-elapsed-time': '5',
}
)
self.runAndAssertFailedMessage(
"https://www.elapsednototal.com/ : "
"dns_query: ERROR No data "
"tcp_server: ERROR No data "
"http_query: OK IP 127.0.0.1 status_code 302 "
"ssl_certificate: ERROR No data "
"elapsed_time: ERROR No entry with total_seconds found. If the error "
"persist, please update surykatka"
)
def test_http_all_ok(self):
self.writeSurykatkaPromise(
{
......@@ -517,6 +376,7 @@ class TestCheckSurykatkaJSONHttpQuery(CheckSurykatkaJSONMixin):
self.runAndAssertPassedMessage(
"http://www.httpallok.com/ : "
"dns_query: OK resolver's 1.2.3.4: 127.0.0.1 127.0.0.2 "
"whois: OK httpallok.com expires in > 30 days "
"tcp_server: OK IP 127.0.0.1:80 OK IP 127.0.0.2:80 "
"http_query: OK IP 127.0.0.1 status_code 302 OK IP 127.0.0.2 "
"status_code 302 "
......@@ -525,56 +385,9 @@ class TestCheckSurykatkaJSONHttpQuery(CheckSurykatkaJSONMixin):
"< 5.00s"
)
def test_http_with_header_dict(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'http://www.httpheader.com/',
'status-code': '200',
'http-header-dict': '{"Vary": "Accept-Encoding", "Cache-Control": '
'"max-age=300, public"}',
}
)
self.runAndAssertPassedMessage(
'http://www.httpheader.com/ : '
'dns_query: OK No check configured '
'tcp_server: OK No check configured '
'http_query: OK IP 127.0.0.1 status_code 200 OK IP 127.0.0.1 HTTP '
'Header {"Cache-Control": "max-age=300, public", "Vary": '
'"Accept-Encoding"} '
'ssl_certificate: OK No check needed '
'elapsed_time: OK No check configured'
)
def test_http_with_header_dict_mismatch(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'http://www.httpheader.com/',
'status-code': '200',
'http-header-dict': '{"Vary": "Accept-Encoding", "Cache-Control": '
'"max-age=300"}',
}
)
self.runAndAssertFailedMessage(
'http://www.httpheader.com/ : '
'dns_query: OK No check configured '
'tcp_server: OK No check configured '
'http_query: OK IP 127.0.0.1 status_code 200 ERROR IP 127.0.0.1 '
'HTTP Header {"Cache-Control": "max-age=300", "Vary": '
'"Accept-Encoding"} != {"Cache-Control": "max-age=300, public", "Vary": '
'"Accept-Encoding"} '
'ssl_certificate: OK No check needed '
'elapsed_time: OK No check configured'
)
def test_configuration_no_ip_list(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.allok.com/',
'status-code': '302',
}
......@@ -582,6 +395,7 @@ class TestCheckSurykatkaJSONHttpQuery(CheckSurykatkaJSONMixin):
self.runAndAssertPassedMessage(
"https://www.allok.com/ : "
"dns_query: OK No check configured "
"whois: OK allok.com expires in > 30 days "
"tcp_server: OK No check configured "
"http_query: OK IP 127.0.0.1 status_code 302 OK IP 127.0.0.2 "
"status_code 302 "
......@@ -590,196 +404,395 @@ class TestCheckSurykatkaJSONHttpQuery(CheckSurykatkaJSONMixin):
"elapsed_time: OK No check configured"
)
def test_good_certificate_2_day(self):
def test_all_ok_nothing_enabled(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.cert3.com/',
'url': 'https://www.allok.com/',
'status-code': '302',
'certificate-expiration-days': '2'
'ip-list': '127.0.0.1 127.0.0.2',
'maximum-elapsed-time': '5',
'enabled-sense-list': '',
}
)
self.runAndAssertPassedMessage(
"https://www.cert3.com/ : "
"dns_query: OK No check configured "
"tcp_server: OK No check configured "
"http_query: OK IP 127.0.0.1 status_code 302 "
"ssl_certificate: OK IP 127.0.0.1 expires in > 2 days "
"elapsed_time: OK No check configured"
"https://www.allok.com/ :"
)
def test_expired_certificate_4_day(self):
def test_all_ok_no_ssl_certificate(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.cert3.com/',
'url': 'https://www.allok.com/',
'status-code': '302',
'certificate-expiration-days': '4'
'ip-list': '127.0.0.1 127.0.0.2',
'maximum-elapsed-time': '5',
'enabled-sense-list': 'dns_query whois tcp_server http_query '
'elapsed_time',
}
)
self.runAndAssertFailedMessage(
"https://www.cert3.com/ : "
"dns_query: OK No check configured "
"tcp_server: OK No check configured "
"http_query: OK IP 127.0.0.1 status_code 302 "
"ssl_certificate: ERROR IP 127.0.0.1 expires in < 4 days "
"elapsed_time: OK No check configured"
self.runAndAssertPassedMessage(
"https://www.allok.com/ : "
"dns_query: OK resolver's 1.2.3.4: 127.0.0.1 127.0.0.2 "
"whois: OK allok.com expires in > 30 days "
"tcp_server: OK IP 127.0.0.1:443 OK IP 127.0.0.2:443 "
"http_query: OK IP 127.0.0.1 status_code 302 OK IP 127.0.0.2 "
"status_code 302 "
"elapsed_time: OK IP 127.0.0.1 replied < 5.00s OK IP 127.0.0.2 replied "
"< 5.00s"
)
def test_expired_certificate(self):
def test_all_ok_only_ssl_certificate(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.cert14.com/',
'url': 'https://www.allok.com/',
'status-code': '302',
'ip-list': '127.0.0.1 127.0.0.2',
'maximum-elapsed-time': '5',
'enabled-sense-list': 'ssl_certificate',
}
)
self.runAndAssertPassedMessage(
"https://www.allok.com/ : "
"ssl_certificate: OK IP 127.0.0.1 expires in > 15 days OK IP "
"127.0.0.2 expires in > 15 days"
)
class TestCheckSurykatkaJSONHttpQueryDnsQuery(CheckSurykatkaJSONMixin):
def writeSurykatkaPromise(self, d):
d.update(**{
'report': 'http_query',
'json-file': self.json_file,
'enabled-sense-list': 'dns_query',
})
super().writeSurykatkaPromise(d)
def setUp(self):
super().setUp()
self.writeSurykatkaJson({
"dns_query": [
{
"domain": "www.httpallok.com",
"rdtype": "A",
"resolver_ip": "1.2.3.4",
"response": "127.0.0.1, 127.0.0.2"
},
{
"domain": "www.badip.com",
"rdtype": "A",
"resolver_ip": "1.2.3.4",
"response": "127.0.0.1, 127.0.0.4"
},
{
"domain": "www.dnsquerynoreply.com",
"rdtype": "A",
"resolver_ip": "1.2.3.4",
"response": ""
},
],
})
def test_bad_ip(self):
self.writeSurykatkaPromise(
{
'url': 'http://www.badip.com/',
'ip-list': '127.0.0.1 127.0.0.2',
}
)
self.configureLauncher(enable_anomaly=True)
with self.assertRaises(PromiseError):
self.launcher.run()
self.assertFailedMessage(
self.getPromiseResult(self.promise_name),
"http://www.badip.com/ : "
"dns_query: ERROR resolver's 1.2.3.4: 127.0.0.1 127.0.0.2 != "
"127.0.0.1 127.0.0.4"
)
def test_no_entry(self):
self.writeSurykatkaPromise(
{
'url': 'http://www.dnsquerynoentry.com/',
'ip-list': '127.0.0.1',
}
)
self.runAndAssertFailedMessage(
"https://www.cert14.com/ : "
"dns_query: OK No check configured "
"tcp_server: OK No check configured "
"http_query: OK IP 127.0.0.1 status_code 302 "
"ssl_certificate: ERROR IP 127.0.0.1 expires in < 15 days "
"elapsed_time: OK No check configured"
"http://www.dnsquerynoentry.com/ : "
"dns_query: ERROR No data"
)
def test_expired_certificate_before_today(self):
def test_query_no_key(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.certminus14.com/',
'status-code': '302',
'url': 'http://www.dnsquerynokey.com/',
'ip-list': '127.0.0.1',
}
)
self.writeSurykatkaJson({})
self.runAndAssertFailedMessage(
"https://www.certminus14.com/ : "
"dns_query: OK No check configured "
"tcp_server: OK No check configured "
"http_query: OK IP 127.0.0.1 status_code 302 "
"ssl_certificate: ERROR IP 127.0.0.1 expires in < 15 days "
"elapsed_time: OK No check configured"
"http://www.dnsquerynokey.com/ : "
"dns_query: ERROR 'dns_query' not in %(json_file)r" % {
'json_file': self.json_file}
)
def test_no_http_query_data(self):
def test_mismatch(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'http://www.httpquerynodata.com/',
'status-code': '302',
'url': 'http://www.httpallok.com/',
'ip-list': '127.0.0.1 127.0.0.9',
}
)
self.runAndAssertFailedMessage(
"http://www.httpquerynodata.com/ : "
"dns_query: OK No check configured "
"tcp_server: OK No check configured "
"http_query: ERROR No data "
"ssl_certificate: OK No check needed "
"elapsed_time: ERROR No data"
"http://www.httpallok.com/ : "
"dns_query: ERROR resolver's 1.2.3.4: 127.0.0.1 127.0.0.9 != "
"127.0.0.1 127.0.0.2"
)
def test_no_http_query_present(self):
def test_no_reply(self):
self.writeSurykatkaPromise(
{
'url': 'http://www.dnsquerynoreply.com/',
'ip-list': '127.0.0.1',
}
)
self.runAndAssertFailedMessage(
"http://www.dnsquerynoreply.com/ : "
"dns_query: ERROR resolver's 1.2.3.4: 127.0.0.1 != empty-reply"
)
class TestCheckSurykatkaJSONHttpQueryWhois(CheckSurykatkaJSONMixin):
def writeSurykatkaPromise(self, d):
d.update(**{
'report': 'http_query',
'json-file': self.json_file,
'url': 'http://www.httpquerynopresent.com/',
'status-code': '302',
'enabled-sense-list': 'whois',
})
super().writeSurykatkaPromise(d)
def setUp(self):
super().setUp()
self.writeSurykatkaJson({
"whois": [
{
"domain": "whois3.com",
"expiration_date": self.time_future3d,
},
{
"domain": "whois29.com",
"expiration_date": self.time_future29d
},
{
"domain": "whoisminus29.com",
"expiration_date": self.time_past29d
},
]
})
def test_no_entry(self):
self.writeSurykatkaPromise(
{
'url': 'http://www.whoisnoentry.com/',
'enabled-sense-list': 'whois',
}
)
self.runAndAssertFailedMessage(
"http://www.whoisnoentry.com/ : "
"whois: ERROR No data"
)
def test_no_key(self):
self.writeSurykatkaPromise(
{
'url': 'http://www.whoisnokey.com/',
}
)
self.writeSurykatkaJson({
"ssl_certificate": [],
"dns_query": [],
"tcp_server": [],
"dns_query": [
],
})
self.runAndAssertFailedMessage(
"http://www.httpquerynopresent.com/ : "
"dns_query: OK No check configured "
"tcp_server: OK No check configured "
"http_query: ERROR 'http_query' not in %(json_file)r "
"ssl_certificate: OK No check needed "
"elapsed_time: ERROR 'http_query' not in %(json_file)r" % {
"http://www.whoisnokey.com/ : "
"whois: ERROR 'whois' not in %(json_file)r" % {
'json_file': self.json_file}
)
def test_no_ssl_certificate_data(self):
def test_expires_2_day(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.nosslcertificatedata.com/',
'status-code': '302',
'url': 'https://www.whois3.com/',
'domain-expiration-days': '2',
}
)
self.runAndAssertPassedMessage(
"https://www.whois3.com/ : "
"whois: OK whois3.com expires in > 2 days"
)
def test_expired_expires_2_day(self):
self.writeSurykatkaPromise(
{
'url': 'https://www.whois3.com/',
'domain-expiration-days': '4',
}
)
self.runAndAssertFailedMessage(
"https://www.nosslcertificatedata.com/ : "
"dns_query: OK No check configured "
"tcp_server: OK No check configured "
"http_query: OK IP 127.0.0.1 "
"status_code 302 "
"ssl_certificate: ERROR No data "
"elapsed_time: OK No check configured"
"https://www.whois3.com/ : "
"whois: ERROR whois3.com expires in < 4 days"
)
def test_no_ssl_certificate(self):
def test_expired(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.nosslcertificate.com/',
'status-code': '302',
'url': 'https://www.whois29.com/',
}
)
self.runAndAssertFailedMessage(
"https://www.whois29.com/ : "
"whois: ERROR whois29.com expires in < 30 days"
)
def test_expired_before_today(self):
self.writeSurykatkaPromise(
{
'url': 'https://www.whoisminus29.com/',
}
)
self.runAndAssertFailedMessage(
"https://www.whoisminus29.com/ : "
"whois: ERROR whoisminus29.com expires in < 30 days"
)
class TestCheckSurykatkaJSONHttpQueryTcpServer(CheckSurykatkaJSONMixin):
def setUp(self):
super().setUp()
self.writeSurykatkaJson({
"http_query": [
"tcp_server": [
{
"ip": "127.0.0.1",
"status_code": 302,
"url": "https://www.nosslcertificate.com/"
"ip": "127.0.0.2",
"state": "open",
"port": 80,
"domain": "www.tcpservernoip.com"
},
],
"dns_query": [],
"tcp_server": []
{
"ip": "127.0.0.1",
"state": "filtered",
"port": 80,
"domain": "www.tcpserverfiltered.com"
},
]
})
def writeSurykatkaPromise(self, d):
d.update(**{
'report': 'http_query',
'json-file': self.json_file,
'enabled-sense-list': 'tcp_server',
})
super().writeSurykatkaPromise(d)
def test_tcp_server_no_ip(self):
self.writeSurykatkaPromise(
{
'url': 'http://www.tcpservernoip.com/',
'ip-list': '127.0.0.1',
}
)
self.runAndAssertFailedMessage(
"https://www.nosslcertificate.com/ : "
"dns_query: OK No check configured "
"tcp_server: OK No check configured "
"http_query: OK IP 127.0.0.1 status_code 302 "
"ssl_certificate: ERROR 'ssl_certificate' not in %(json_file)r "
"elapsed_time: OK No check configured" % {'json_file': self.json_file}
"http://www.tcpservernoip.com/ : "
"tcp_server: ERROR IP 127.0.0.1:80"
)
def test_bad_code(self):
def test_tcp_server_filtered(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.allok.com/',
'status-code': '301',
'url': 'http://www.tcpserverfiltered.com/',
'ip-list': '127.0.0.1',
}
)
self.runAndAssertFailedMessage(
"https://www.allok.com/ : "
"dns_query: OK No check configured "
"tcp_server: OK No check configured "
"http_query: ERROR IP 127.0.0.1 status_code 302 != 301 ERROR "
"IP 127.0.0.2 status_code 302 != 301 "
"ssl_certificate: OK IP 127.0.0.1 expires in > 15 days OK IP "
"127.0.0.2 expires in > 15 days "
"elapsed_time: OK No check configured"
"http://www.tcpserverfiltered.com/ : "
"tcp_server: ERROR IP 127.0.0.1:80"
)
def _test_bad_code_explanation(self, status_code, explanation):
def test_tcp_server_no_entry(self):
self.writeSurykatkaPromise(
{
'url': 'http://www.tcpservernoentry.com/',
'ip-list': '127.0.0.1',
}
)
self.runAndAssertFailedMessage(
"http://www.tcpservernoentry.com/ : "
"tcp_server: ERROR No data"
)
def test_tcp_server_no_key(self):
self.writeSurykatkaPromise(
{
'url': 'http://www.tcpservernokey.com/',
'ip-list': '127.0.0.1',
}
)
self.writeSurykatkaJson({
"dns_query": [
],
})
self.runAndAssertFailedMessage(
"http://www.tcpservernokey.com/ : "
"tcp_server: ERROR 'tcp_server' not in %(json_file)r" % {
'json_file': self.json_file}
)
class TestCheckSurykatkaJSONHttpQueryHttpQuery(CheckSurykatkaJSONMixin):
def setUp(self):
super().setUp()
self.writeSurykatkaJson({
"http_query": [
{
"ip": "127.0.0.1",
"status_code": 302,
"url": "http://www.httpallok.com/",
"total_seconds": 4
},
{
"ip": "127.0.0.2",
"status_code": 302,
"url": "http://www.httpallok.com/",
"total_seconds": 4
},
{
"ip": "127.0.0.1",
"status_code": 200,
"url": "http://www.httpheader.com/",
"http_header_dict": {
"Vary": "Accept-Encoding", "Cache-Control": "max-age=300, public"},
},
{
"ip": "127.0.0.1",
"status_code": 302,
"url": "http://www.badip.com/",
},
{
"ip": "127.0.0.4",
"status_code": 302,
"url": "http://www.badip.com/",
},
],
})
def writeSurykatkaPromise(self, d):
d.update(**{
'report': 'http_query',
'json-file': self.json_file,
'enabled-sense-list': 'http_query',
})
super().writeSurykatkaPromise(d)
def _test_bad_code_explanation(self, status_code, explanation):
self.writeSurykatkaPromise(
{
'url': 'http://www.statuscode.com/',
'status-code': '301',
}
......@@ -792,17 +805,10 @@ class TestCheckSurykatkaJSONHttpQuery(CheckSurykatkaJSONMixin):
"url": "http://www.statuscode.com/"
}
],
"ssl_certificate": [],
"dns_query": [],
"tcp_server": [],
})
self.runAndAssertFailedMessage(
"http://www.statuscode.com/ : "
"dns_query: OK No check configured "
"tcp_server: OK No check configured "
"http_query: ERROR IP 127.0.0.1 status_code %s != 301 "
"ssl_certificate: OK No check needed "
"elapsed_time: OK No check configured" % (explanation,)
"http_query: ERROR IP 127.0.0.1 status_code %s != 301" % (explanation,)
)
def test_bad_code_explanation_520(self):
......@@ -817,270 +823,252 @@ class TestCheckSurykatkaJSONHttpQuery(CheckSurykatkaJSONMixin):
def test_bad_code_explanation_526(self):
self._test_bad_code_explanation(526, '526 (SSL Error)')
def test_bad_ip(self):
def test_bad_code(self):
self.writeSurykatkaPromise(
{
'url': 'http://www.httpallok.com/',
'status-code': '301',
}
)
self.runAndAssertFailedMessage(
"http://www.httpallok.com/ : "
"http_query: ERROR IP 127.0.0.1 status_code 302 != 301 ERROR "
"IP 127.0.0.2 status_code 302 != 301"
)
def test_not_present(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'http://www.badip.com/',
'url': 'http://www.httpquerynopresent.com/',
'status-code': '302',
'ip-list': '127.0.0.1 127.0.0.2',
}
)
self.configureLauncher(enable_anomaly=True)
with self.assertRaises(PromiseError):
self.launcher.run()
self.assertFailedMessage(
self.getPromiseResult(self.promise_name),
"http://www.badip.com/ : "
"dns_query: ERROR resolver's 1.2.3.4: 127.0.0.1 127.0.0.2 != "
"127.0.0.1 127.0.0.4 "
"tcp_server: OK IP 127.0.0.1:80 ERROR IP 127.0.0.2:80 "
"http_query: OK IP 127.0.0.1 status_code 302 OK IP 127.0.0.4 "
"status_code 302 "
"ssl_certificate: OK No check needed "
"elapsed_time: OK No check configured"
self.writeSurykatkaJson({
"ssl_certificate": [],
"dns_query": [],
"tcp_server": [],
})
self.runAndAssertFailedMessage(
"http://www.httpquerynopresent.com/ : "
"http_query: ERROR 'http_query' not in %(json_file)r" % {
'json_file': self.json_file}
)
def test_https_no_cert(self):
def test_no_data(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.sslcertnoinfo.com/',
'status-code': '301',
'url': 'http://www.httpquerynodata.com/',
'status-code': '302',
}
)
self.runAndAssertFailedMessage(
"https://www.sslcertnoinfo.com/ : "
"dns_query: OK No check configured "
"tcp_server: OK No check configured "
"http_query: OK IP 127.0.0.1 status_code 301 "
"ssl_certificate: ERROR IP 127.0.0.1 no information "
"elapsed_time: OK No check configured"
"http://www.httpquerynodata.com/ : "
"http_query: ERROR No data"
)
def test_dns_query_no_entry(self):
def test_header_dict_mismatch(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'http://www.dnsquerynoentry.com/',
'status-code': '301',
'ip-list': '127.0.0.1'
'url': 'http://www.httpheader.com/',
'status-code': '200',
'http-header-dict': '{"Vary": "Accept-Encoding", "Cache-Control": '
'"max-age=300"}',
}
)
self.runAndAssertFailedMessage(
"http://www.dnsquerynoentry.com/ : "
"dns_query: ERROR No data "
"tcp_server: ERROR No data "
"http_query: ERROR No data "
"ssl_certificate: OK No check needed "
"elapsed_time: ERROR No data"
'http://www.httpheader.com/ : '
'http_query: OK IP 127.0.0.1 status_code 200 ERROR IP 127.0.0.1 '
'HTTP Header {"Cache-Control": "max-age=300", "Vary": '
'"Accept-Encoding"} != {"Cache-Control": "max-age=300, public", "Vary": '
'"Accept-Encoding"}'
)
def test_dns_query_no_key(self):
def test_header_dict(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'http://www.dnsquerynokey.com/',
'status-code': '301',
'ip-list': '127.0.0.1',
'url': 'http://www.httpheader.com/',
'status-code': '200',
'http-header-dict': '{"Vary": "Accept-Encoding", "Cache-Control": '
'"max-age=300, public"}',
}
)
self.runAndAssertPassedMessage(
'http://www.httpheader.com/ : '
'http_query: OK IP 127.0.0.1 status_code 200 OK IP 127.0.0.1 HTTP '
'Header {"Cache-Control": "max-age=300, public", "Vary": '
'"Accept-Encoding"}'
)
class TestCheckSurykatkaJSONHttpQuerySslCertificate(CheckSurykatkaJSONMixin):
def setUp(self):
super().setUp()
self.writeSurykatkaJson({
"http_query": [
],
"ssl_certificate": [
{
"hostname": "www.cert3.com",
"ip": "127.0.0.1",
"not_after": self.time_future3d
},
{
"hostname": "www.cert14.com",
"ip": "127.0.0.1",
"not_after": self.time_future14d
},
{
"hostname": "www.certminus14.com",
"ip": "127.0.0.1",
"not_after": self.time_past14d
},
{
"hostname": "www.sslcertnoinfo.com",
"ip": "127.0.0.1",
"not_after": None
},
],
"tcp_server": []
})
self.runAndAssertFailedMessage(
"http://www.dnsquerynokey.com/ : "
"dns_query: ERROR 'dns_query' not in %(json_file)r "
"tcp_server: ERROR No data "
"http_query: ERROR No data "
"ssl_certificate: OK No check needed "
"elapsed_time: ERROR No data" % {'json_file': self.json_file}
)
def test_dns_query_mismatch(self):
self.writeSurykatkaPromise(
{
def writeSurykatkaPromise(self, d):
d.update(**{
'report': 'http_query',
'json-file': self.json_file,
'url': 'http://www.httpallok.com/',
'status-code': '302',
'ip-list': '127.0.0.1 127.0.0.9',
'enabled-sense-list': 'ssl_certificate',
})
super().writeSurykatkaPromise(d)
def test_good_certificate_2_day(self):
self.writeSurykatkaPromise(
{
'url': 'https://www.cert3.com/',
'certificate-expiration-days': '2',
}
)
self.runAndAssertFailedMessage(
"http://www.httpallok.com/ : "
"dns_query: ERROR resolver's 1.2.3.4: 127.0.0.1 127.0.0.9 != "
"127.0.0.1 127.0.0.2 "
"tcp_server: OK IP 127.0.0.1:80 ERROR IP 127.0.0.9:80 "
"http_query: OK IP 127.0.0.1 status_code 302 OK IP 127.0.0.2 "
"status_code 302 "
"ssl_certificate: OK No check needed "
"elapsed_time: OK No check configured"
self.runAndAssertPassedMessage(
"https://www.cert3.com/ : "
"ssl_certificate: OK IP 127.0.0.1 expires in > 2 days"
)
def test_dns_query_no_reply(self):
def test_expired_certificate_4_day(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'http://www.dnsquerynoreply.com/',
'status-code': '301',
'ip-list': '127.0.0.1',
'url': 'https://www.cert3.com/',
'certificate-expiration-days': '4',
}
)
self.runAndAssertFailedMessage(
"http://www.dnsquerynoreply.com/ : "
"dns_query: ERROR resolver's 1.2.3.4: 127.0.0.1 != empty-reply "
"tcp_server: ERROR No data "
"http_query: ERROR No data "
"ssl_certificate: OK No check needed "
"elapsed_time: ERROR No data"
"https://www.cert3.com/ : "
"ssl_certificate: ERROR IP 127.0.0.1 expires in < 4 days"
)
def test_tcp_server_no_ip(self):
def test_expired_certificate(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'http://www.tcpservernoip.com/',
'status-code': '301',
'ip-list': '127.0.0.1',
'url': 'https://www.cert14.com/',
}
)
self.runAndAssertFailedMessage(
"http://www.tcpservernoip.com/ : "
"dns_query: ERROR No data "
"tcp_server: ERROR IP 127.0.0.1:80 "
"http_query: ERROR No data "
"ssl_certificate: OK No check needed "
"elapsed_time: ERROR No data"
"https://www.cert14.com/ : "
"ssl_certificate: ERROR IP 127.0.0.1 expires in < 15 days"
)
def test_tcp_server_filtered(self):
def test_expired_certificate_before_today(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'http://www.tcpserverfiltered.com/',
'status-code': '301',
'ip-list': '127.0.0.1',
'url': 'https://www.certminus14.com/',
}
)
self.runAndAssertFailedMessage(
"http://www.tcpserverfiltered.com/ : "
"dns_query: ERROR No data "
"tcp_server: ERROR IP 127.0.0.1:80 "
"http_query: ERROR No data "
"ssl_certificate: OK No check needed "
"elapsed_time: ERROR No data"
"https://www.certminus14.com/ : "
"ssl_certificate: ERROR IP 127.0.0.1 expires in < 15 days"
)
def test_tcp_server_no_entry(self):
def test_https_no_cert(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'http://www.tcpservernoentry.com/',
'status-code': '301',
'ip-list': '127.0.0.1',
'url': 'https://www.sslcertnoinfo.com/',
}
)
self.runAndAssertFailedMessage(
"http://www.tcpservernoentry.com/ : "
"dns_query: ERROR No data "
"tcp_server: ERROR No data "
"http_query: ERROR No data "
"ssl_certificate: OK No check needed "
"elapsed_time: ERROR No data"
"https://www.sslcertnoinfo.com/ : "
"ssl_certificate: ERROR IP 127.0.0.1 no information"
)
def test_tcp_server_no_key(self):
def test_no_ssl_certificate(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'http://www.tcpservernokey.com/',
'status-code': '301',
'ip-list': '127.0.0.1',
'url': 'https://www.nosslcertificate.com/',
}
)
self.writeSurykatkaJson({
"http_query": [
{
"ip": "127.0.0.1",
"status_code": 302,
"url": "https://www.nosslcertificate.com/"
},
],
"ssl_certificate": [
],
"dns_query": [
],
"dns_query": [],
"tcp_server": [],
"whois": []
})
self.runAndAssertFailedMessage(
"http://www.tcpservernokey.com/ : "
"dns_query: ERROR No data "
"tcp_server: ERROR 'tcp_server' not in %(json_file)r "
"http_query: ERROR No data "
"ssl_certificate: OK No check needed "
"elapsed_time: ERROR No data" % {'json_file': self.json_file}
"https://www.nosslcertificate.com/ : "
"ssl_certificate: ERROR 'ssl_certificate' not in %(json_file)r" % {
'json_file': self.json_file}
)
def test_all_ok_nothing_enabled(self):
self.writeSurykatkaPromise(
{
class TestCheckSurykatkaJSONHttpQueryElapsedTime(CheckSurykatkaJSONMixin):
def writeSurykatkaPromise(self, d):
d.update(**{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.allok.com/',
'status-code': '302',
'ip-list': '127.0.0.1 127.0.0.2',
'maximum-elapsed-time': '5',
'enabled-sense-list': '',
}
)
self.runAndAssertPassedMessage(
"https://www.allok.com/ :"
)
'enabled-sense-list': 'elapsed_time',
})
super().writeSurykatkaPromise(d)
def test_all_ok_no_ssl_certificate(self):
def setUp(self):
super().setUp()
self.writeSurykatkaJson({
"http_query": [
{
"ip": "127.0.0.1",
"status_code": 302,
"url": "https://www.elapsedtoolong.com/",
"total_seconds": 6
},
{
"ip": "127.0.0.1",
"status_code": 302,
"url": "https://www.elapsednototal.com/",
},
]
})
def test_too_long(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.allok.com/',
'status-code': '302',
'ip-list': '127.0.0.1 127.0.0.2',
'url': 'https://www.elapsedtoolong.com/',
'ip-list': '127.0.0.1',
'maximum-elapsed-time': '5',
'enabled-sense-list': 'dns_query tcp_server http_query elapsed_time',
}
)
self.runAndAssertPassedMessage(
"https://www.allok.com/ : "
"dns_query: OK resolver's 1.2.3.4: 127.0.0.1 127.0.0.2 "
"tcp_server: OK IP 127.0.0.1:443 OK IP 127.0.0.2:443 "
"http_query: OK IP 127.0.0.1 status_code 302 OK IP 127.0.0.2 "
"status_code 302 "
"elapsed_time: OK IP 127.0.0.1 replied < 5.00s OK IP 127.0.0.2 replied "
"< 5.00s"
self.runAndAssertFailedMessage(
"https://www.elapsedtoolong.com/ : "
"elapsed_time: ERROR IP 127.0.0.1 replied > 5.00s"
)
def test_all_ok_only_ssl_certificate(self):
def test_no_match(self):
self.writeSurykatkaPromise(
{
'report': 'http_query',
'json-file': self.json_file,
'url': 'https://www.allok.com/',
'status-code': '302',
'ip-list': '127.0.0.1 127.0.0.2',
'url': 'https://www.elapsednototal.com/',
'ip-list': '127.0.0.1',
'maximum-elapsed-time': '5',
'enabled-sense-list': 'ssl_certificate',
}
)
self.runAndAssertPassedMessage(
"https://www.allok.com/ : "
"ssl_certificate: OK IP 127.0.0.1 expires in > 15 days OK IP "
"127.0.0.2 expires in > 15 days"
self.runAndAssertFailedMessage(
"https://www.elapsednototal.com/ : "
"elapsed_time: ERROR No entry with total_seconds found. If the error "
"persist, please update surykatka"
)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment