Commit 44c18a7d authored by Łukasz Nowak's avatar Łukasz Nowak

dnsresolver: Introduce new tool

Simple and predictable tool to resolve massively domain names to IPs.
parent 408aec2b
......@@ -54,6 +54,8 @@ setup(name=name,
'pycurl',
'six',
'cryptography',
'click',
'ipaddress; python_version<"3"',
),
extras_require = {
'lampconfigure': ["mysqlclient"], #needed for MySQL Database access
......@@ -104,7 +106,8 @@ setup(name=name,
'slaprunnerteststandalone = slapos.runner.runnertest:runStandaloneUnitTest',
'zodbpack = slapos.zodbpack:run [zodbpack]',
'networkbench = slapos.networkbench:main',
'cachechecker = slapos.cachechecker:web_checker_utility'
'cachechecker = slapos.cachechecker:web_checker_utility',
'dnsresolver = slapos.dnsresolver:cli',
]
},
test_suite='slapos.test',
......
# coding: utf-8
# Copyright (C) 2021 Nexedi SA and Contributors.
# Łukasz Nowak <luke@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from __future__ import print_function
import click
import dns.exception
import dns.resolver
import ipaddress
import json
import os
import time
TIMEOUT = 5
@click.command(short_help="Resolves domains and IPv4s to IPv4s")
@click.option(
"--style",
"-s",
help="Output style of the JSON.",
type=click.Choice(["list", "full"]),
default="list",
show_default=True
)
@click.option(
"--output",
"-o",
help="Output file.",
type=click.Path() # can't use click.File, as file has to be opened an
# closed on each run
)
@click.option(
"--delay",
"-d",
help="Delay in seconds between iterations, use -1 for one time processing.",
default=300,
show_default=True,
type=click.INT
)
@click.argument(
'input_list',
nargs=-1,
type=click.Path(), # can't use click.File, as existence is required and
# that increases usage complexity
)
def cli(
style,
output,
input_list,
delay
):
app(style, output, input_list, delay)
def app(style, output, input_list, delay):
while True:
address_dict = {}
# use default resolver of the machine, as this is expected approach
resolver = dns.resolver.Resolver(configure=True)
resolver.timeout = TIMEOUT
resolver.lifetime = TIMEOUT
resolver.edns = -1
for filename in input_list:
if not os.path.exists(filename):
continue
with open(filename, 'rb') as fh:
for line in fh.readlines():
line = line.decode('utf-8').strip()
if line.startswith('#'):
continue
for entry in line.split():
try:
address_dict[entry] = [
str(ipaddress.ip_address(entry).exploded)]
except Exception:
try:
address_dict[entry] = [
answer.address for answer in resolver.query(entry)]
except (
dns.resolver.NXDOMAIN,
dns.resolver.NoAnswer,
dns.exception.Timeout,
dns.resolver.NoNameservers,
):
address_dict[entry] = []
if style == 'list':
merged_list = []
for q in address_dict.values():
merged_list.extend(q)
dumps = json.dumps(sorted(set(merged_list)), indent=2)
else:
dumps = json.dumps(address_dict, indent=2)
if output is None:
print(dumps)
else:
with open(output, 'wb') as fh:
fh.write(dumps.encode())
if delay == -1:
break
else:
time.sleep(delay)
if __name__ == '__main__':
cli()
# coding: utf-8
# Copyright (C) 2021 Nexedi SA and Contributors.
# Łukasz Nowak <luke@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
import dns.resolver
import json
import mock
import os
import shutil
import tempfile
import unittest
from slapos import dnsresolver
class MockAnswer(object):
def __init__(self, address):
self.address = address
class Answer(object):
def __init__(self, address):
self.address = address
class DNSResolverTestCase(unittest.TestCase):
def _query(self, h):
return self._query_answer.get(h, [])
def setUp(self):
self.working_directory = tempfile.mkdtemp()
self.patcher_list = [
mock.patch.object(dns.resolver.Resolver, "query", new=self._query)
]
[q.start() for q in self.patcher_list]
def tearDown(self):
shutil.rmtree(self.working_directory, True)
[q.stop() for q in self.patcher_list]
def _fillInput(self, value):
with tempfile.NamedTemporaryFile(
dir=self.working_directory,
delete=False) as fh:
fh.write(value.encode())
return fh.name
def test(self):
ip = '123.124.125.126'
input_list = [
self._fillInput('1.example.com non.example.com\n%s' % (ip,)),
self._fillInput('2.example.com %s' % (ip,)),
self._fillInput(''),
self._fillInput('# 1.2.3.4'),
]
output = os.path.join(self.working_directory, 'output')
self._query_answer = {
'1.example.com': [Answer('127.0.0.1'), Answer('127.0.0.3')],
'2.example.com': [Answer('127.0.0.2')]
}
dnsresolver.app("list", output, input_list, -1)
self.assertTrue(os.path.exists(output))
with open(output, 'rb') as fh:
self.assertEqual(
json.load(fh),
['123.124.125.126', '127.0.0.1', '127.0.0.2', '127.0.0.3']
)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment