Commit 00e41393 authored by Romain Courteaud

First report version

parent 04a267cf
import time
from urlchecker_db import LogDB
from urlchecker_configuration import createConfiguration, logConfiguration
from urlchecker_status import logStatus
from urlchecker_status import logStatus, reportStatus
from urlchecker_dns import (
getReachableResolverList,
expandDomainList,
getDomainIpDict,
reportDnsQuery,
)
from urlchecker_http import getRootUrl, getUrlHostname, checkHttpStatus
from urlchecker_network import isTcpPortOpen
from urlchecker_http import (
getRootUrl,
getUrlHostname,
checkHttpStatus,
reportHttp,
)
from urlchecker_network import isTcpPortOpen, reportNetwork
__version__ = "0.0.3"
......@@ -26,6 +32,22 @@ class WebBot:
self._db = LogDB(self.config["SQLITE"])
self._db.createTables()
def calculateUrlList(self):
    """Return the configured URLs as a list.

    The ``URL`` configuration entry is split on any run of whitespace, so
    an empty entry yields an empty list.
    """
    raw_url_text = self.config["URL"]
    return raw_url_text.split()
def calculateFullDomainList(self):
    """Return the full list of domains to check.

    The list is built from the whitespace-separated ``DOMAIN``
    configuration entry plus the hostname of every configured URL.
    Duplicates are removed, then the result is passed to
    ``expandDomainList`` to add all parent domains.
    """
    # Start from the explicitly configured domains
    domain_list = self.config["DOMAIN"].split()
    # Add the hostname of every configured URL
    domain_list.extend(
        getUrlHostname(url) for url in self.calculateUrlList()
    )
    # Drop duplicates while keeping first-seen order: going through a set
    # would make the order change from one run to the next.
    unique_domain_list = list(dict.fromkeys(domain_list))
    # Expand with all parent domains
    return expandDomainList(unique_domain_list)
def iterateLoop(self):
status_id = logStatus(self._db, "loop")
# logPlatform(self._db, __version__, status_id)
......@@ -36,17 +58,9 @@ class WebBot:
)
if not resolver_ip_list:
return
# Calculate the full list of domain to check
domain_list = self.config["DOMAIN"].split()
# Extract the list of URL domains
url_list = self.config["URL"].split()
for url in url_list:
domain_list.append(getUrlHostname(url))
domain_list = list(set(domain_list))
# Expand with all parent domains
domain_list = expandDomainList(domain_list)
# Get list of all domains
domain_list = self.calculateFullDomainList()
# Get the list of server to check
# XXX Check DNS expiration
......@@ -69,7 +83,7 @@ class WebBot:
url_dict[url].append(server_ip)
# XXX put back original url list
for url in url_list:
for url in self.calculateUrlList():
if url not in url_dict:
root_url = getRootUrl(url)
if root_url in url_dict:
......@@ -83,6 +97,111 @@ class WebBot:
# XXX Parse HTML, fetch found link, css, js, image
# XXX Check HTTP Cache
def status(self):
    """Print a textual report built from the ``report*`` queries.

    Five sections are printed in order: ``# STATUS``, ``# DNS SERVER``,
    ``# DNS STATUS``, ``# HTTP SERVER`` and ``# HTTP``. Each section uses
    the rows of the previous one to decide what to query next. The method
    returns right after ``# DNS SERVER`` when no resolver is reported as
    open.
    """
    # XXX
    self.initDB()

    # Report the bot status
    print("# STATUS")
    status = reportStatus(self._db).get()
    print(" ", status.text, status.timestamp)

    # Report the list of DNS server status
    query = reportNetwork(
        self._db, port="53", transport="UDP", ip=self.config["DNS"].split()
    )
    print("# DNS SERVER")
    resolver_ip_list = []
    for network_change in query.dicts().iterator():
        if network_change["state"] == "open":
            resolver_ip_list.append(network_change["ip"])
        print(
            " ",
            network_change["ip"],
            network_change["state"],
            network_change["timestamp"],
        )

    if not resolver_ip_list:
        # Without a reachable resolver there is nothing else to report
        return

    domain_list = self.calculateFullDomainList()

    # Report list of DNS query
    query = reportDnsQuery(
        self._db,
        domain=domain_list,
        resolver_ip=resolver_ip_list,
        rdtype="A",
    )
    print("# DNS STATUS")
    # Maps each resolved server IP to the domains that resolve to it
    server_ip_dict = {}
    for dns_change in query.dicts().iterator():
        print(
            " ",
            dns_change["domain"],
            dns_change["resolver_ip"],
            dns_change["timestamp"],
            dns_change["response"],
        )
        for server_ip in dns_change["response"].split(", "):
            if not server_ip:
                # drop empty response
                continue
            served_domain_list = server_ip_dict.setdefault(server_ip, [])
            # Several resolvers can return the same IP for the same
            # domain: record the pair once so that hostnames and URLs
            # are not repeated further down.
            if dns_change["domain"] not in served_domain_list:
                served_domain_list.append(dns_change["domain"])

    # Report the list of CDN status
    query = reportNetwork(
        self._db,
        port=["80", "443"],
        transport="TCP",
        ip=list(server_ip_dict),
    )
    print("# HTTP SERVER")
    # Maps each URL to the server IPs on which it can be reached
    url_dict = {}
    for network_change in query.dicts().iterator():
        server_ip = network_change["ip"]
        hostname_list = server_ip_dict[server_ip]
        print(
            " ",
            server_ip,
            network_change["state"],
            network_change["port"],
            network_change["timestamp"],
            ", ".join(hostname_list),
        )
        if network_change["state"] != "open":
            continue
        # NOTE(review): ports are queried as strings above but were
        # compared to the int 80 here; normalise so that either
        # representation selects http. Confirm the column type.
        protocol = "http" if str(network_change["port"]) == "80" else "https"
        for hostname in hostname_list:
            url = "%s://%s" % (protocol, hostname)
            ip_list = url_dict.setdefault(url, [])
            if server_ip not in ip_list:
                ip_list.append(server_ip)

    # XXX put back original url list
    for url in self.calculateUrlList():
        if url not in url_dict:
            root_url = getRootUrl(url)
            if root_url in url_dict:
                # A configured URL shares the servers of its root URL
                url_dict[url] = url_dict[root_url]

    # Get the list of HTTP servers to check
    query = reportHttp(
        self._db,
        ip=list(server_ip_dict),
        url=list(url_dict),
    )
    print("# HTTP")
    for http_change in query.dicts().iterator():
        print(
            " ",
            http_change["status_code"],
            http_change["url"],
            http_change["ip"],
            http_change["timestamp"],
        )
def stop(self):
self._running = False
logStatus(self._db, "stop")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment