Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
S
slapos.toolbox
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Justin
slapos.toolbox
Commits
325f7993
Commit
325f7993
authored
Mar 29, 2021
by
Łukasz Nowak
Browse files
Options
Browse Files
Download
Plain Diff
Feature/domain2ip
See merge request
nexedi/slapos.toolbox!94
parents
408aec2b
44c18a7d
Changes
3
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
214 additions
and
1 deletion
+214
-1
setup.py
setup.py
+4
-1
slapos/dnsresolver.py
slapos/dnsresolver.py
+123
-0
slapos/test/test_dnsresolver.py
slapos/test/test_dnsresolver.py
+87
-0
No files found.
setup.py
View file @
325f7993
...
...
@@ -54,6 +54,8 @@ setup(name=name,
'pycurl'
,
'six'
,
'cryptography'
,
'click'
,
'ipaddress; python_version<"3"'
,
),
extras_require
=
{
'lampconfigure'
:
[
"mysqlclient"
],
#needed for MySQL Database access
...
...
@@ -104,7 +106,8 @@ setup(name=name,
'slaprunnerteststandalone = slapos.runner.runnertest:runStandaloneUnitTest'
,
'zodbpack = slapos.zodbpack:run [zodbpack]'
,
'networkbench = slapos.networkbench:main'
,
'cachechecker = slapos.cachechecker:web_checker_utility'
'cachechecker = slapos.cachechecker:web_checker_utility'
,
'dnsresolver = slapos.dnsresolver:cli'
,
]
},
test_suite
=
'slapos.test'
,
...
...
slapos/dnsresolver.py
0 → 100644
View file @
325f7993
# coding: utf-8
# Copyright (C) 2021 Nexedi SA and Contributors.
# Łukasz Nowak <luke@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from
__future__
import
print_function
import
click
import
dns.exception
import
dns.resolver
import
ipaddress
import
json
import
os
import
time
TIMEOUT
=
5
@
click
.
command
(
short_help
=
"Resolves domains and IPv4s to IPv4s"
)
@
click
.
option
(
"--style"
,
"-s"
,
help
=
"Output style of the JSON."
,
type
=
click
.
Choice
([
"list"
,
"full"
]),
default
=
"list"
,
show_default
=
True
)
@
click
.
option
(
"--output"
,
"-o"
,
help
=
"Output file."
,
type
=
click
.
Path
()
# can't use click.File, as file has to be opened an
# closed on each run
)
@
click
.
option
(
"--delay"
,
"-d"
,
help
=
"Delay in seconds between iterations, use -1 for one time processing."
,
default
=
300
,
show_default
=
True
,
type
=
click
.
INT
)
@
click
.
argument
(
'input_list'
,
nargs
=-
1
,
type
=
click
.
Path
(),
# can't use click.File, as existence is required and
# that increases usage complexity
)
def
cli
(
style
,
output
,
input_list
,
delay
):
app
(
style
,
output
,
input_list
,
delay
)
def
app
(
style
,
output
,
input_list
,
delay
):
while
True
:
address_dict
=
{}
# use default resolver of the machine, as this is expected approach
resolver
=
dns
.
resolver
.
Resolver
(
configure
=
True
)
resolver
.
timeout
=
TIMEOUT
resolver
.
lifetime
=
TIMEOUT
resolver
.
edns
=
-
1
for
filename
in
input_list
:
if
not
os
.
path
.
exists
(
filename
):
continue
with
open
(
filename
,
'rb'
)
as
fh
:
for
line
in
fh
.
readlines
():
line
=
line
.
decode
(
'utf-8'
).
strip
()
if
line
.
startswith
(
'#'
):
continue
for
entry
in
line
.
split
():
try
:
address_dict
[
entry
]
=
[
str
(
ipaddress
.
ip_address
(
entry
).
exploded
)]
except
Exception
:
try
:
address_dict
[
entry
]
=
[
answer
.
address
for
answer
in
resolver
.
query
(
entry
)]
except
(
dns
.
resolver
.
NXDOMAIN
,
dns
.
resolver
.
NoAnswer
,
dns
.
exception
.
Timeout
,
dns
.
resolver
.
NoNameservers
,
):
address_dict
[
entry
]
=
[]
if
style
==
'list'
:
merged_list
=
[]
for
q
in
address_dict
.
values
():
merged_list
.
extend
(
q
)
dumps
=
json
.
dumps
(
sorted
(
set
(
merged_list
)),
indent
=
2
)
else
:
dumps
=
json
.
dumps
(
address_dict
,
indent
=
2
)
if
output
is
None
:
print
(
dumps
)
else
:
with
open
(
output
,
'wb'
)
as
fh
:
fh
.
write
(
dumps
.
encode
())
if
delay
==
-
1
:
break
else
:
time
.
sleep
(
delay
)
if
__name__
==
'__main__'
:
cli
()
slapos/test/test_dnsresolver.py
0 → 100644
View file @
325f7993
# coding: utf-8
# Copyright (C) 2021 Nexedi SA and Contributors.
# Łukasz Nowak <luke@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
import
dns.resolver
import
json
import
mock
import
os
import
shutil
import
tempfile
import
unittest
from
slapos
import
dnsresolver
class
MockAnswer
(
object
):
def
__init__
(
self
,
address
):
self
.
address
=
address
class
Answer
(
object
):
def
__init__
(
self
,
address
):
self
.
address
=
address
class
DNSResolverTestCase
(
unittest
.
TestCase
):
def
_query
(
self
,
h
):
return
self
.
_query_answer
.
get
(
h
,
[])
def
setUp
(
self
):
self
.
working_directory
=
tempfile
.
mkdtemp
()
self
.
patcher_list
=
[
mock
.
patch
.
object
(
dns
.
resolver
.
Resolver
,
"query"
,
new
=
self
.
_query
)
]
[
q
.
start
()
for
q
in
self
.
patcher_list
]
def
tearDown
(
self
):
shutil
.
rmtree
(
self
.
working_directory
,
True
)
[
q
.
stop
()
for
q
in
self
.
patcher_list
]
def
_fillInput
(
self
,
value
):
with
tempfile
.
NamedTemporaryFile
(
dir
=
self
.
working_directory
,
delete
=
False
)
as
fh
:
fh
.
write
(
value
.
encode
())
return
fh
.
name
def
test
(
self
):
ip
=
'123.124.125.126'
input_list
=
[
self
.
_fillInput
(
'1.example.com non.example.com
\
n
%s'
%
(
ip
,)),
self
.
_fillInput
(
'2.example.com %s'
%
(
ip
,)),
self
.
_fillInput
(
''
),
self
.
_fillInput
(
'# 1.2.3.4'
),
]
output
=
os
.
path
.
join
(
self
.
working_directory
,
'output'
)
self
.
_query_answer
=
{
'1.example.com'
:
[
Answer
(
'127.0.0.1'
),
Answer
(
'127.0.0.3'
)],
'2.example.com'
:
[
Answer
(
'127.0.0.2'
)]
}
dnsresolver
.
app
(
"list"
,
output
,
input_list
,
-
1
)
self
.
assertTrue
(
os
.
path
.
exists
(
output
))
with
open
(
output
,
'rb'
)
as
fh
:
self
.
assertEqual
(
json
.
load
(
fh
),
[
'123.124.125.126'
,
'127.0.0.1'
,
'127.0.0.2'
,
'127.0.0.3'
]
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment