Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
slapos
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
zhifan huang
slapos
Commits
1aaaa03c
Commit
1aaaa03c
authored
Mar 26, 2019
by
Jérome Perrin
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
seleniumserver/test: use slapos.testing
parent
6ab16083
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
80 additions
and
404 deletions
+80
-404
software/seleniumserver/test/setup.py
software/seleniumserver/test/setup.py
+17
-15
software/seleniumserver/test/test.py
software/seleniumserver/test/test.py
+63
-67
software/seleniumserver/test/utils.py
software/seleniumserver/test/utils.py
+0
-322
No files found.
software/seleniumserver/test/setup.py
View file @
1aaaa03c
...
...
@@ -28,18 +28,20 @@ from setuptools import setup, find_packages
version
=
'0.0.1.dev0'
name
=
'slapos.test.seleniumserver'
long_description
=
open
(
"README.md"
).
read
()
with
open
(
"README.md"
)
as
f
:
long_description
=
f
.
read
()
setup
(
name
=
name
,
version
=
version
,
description
=
"Test for SlapOS' Selenium Server"
,
long_description
=
long_description
,
long_description_content_type
=
'text/markdown'
,
maintainer
=
"Nexedi"
,
maintainer_email
=
"info@nexedi.com"
,
url
=
"https://lab.nexedi.com/nexedi/slapos"
,
packages
=
find_packages
(),
install_requires
=
[
setup
(
name
=
name
,
version
=
version
,
description
=
"Test for SlapOS' Selenium Server"
,
long_description
=
long_description
,
long_description_content_type
=
'text/markdown'
,
maintainer
=
"Nexedi"
,
maintainer_email
=
"info@nexedi.com"
,
url
=
"https://lab.nexedi.com/nexedi/slapos"
,
packages
=
find_packages
(),
install_requires
=
[
'slapos.core'
,
'supervisor'
,
'slapos.libnetworkcache'
,
...
...
@@ -49,7 +51,7 @@ setup(name=name,
'image'
,
'requests'
,
'paramiko'
,
],
zip_safe
=
True
,
test_suite
=
'test'
,
)
],
zip_safe
=
True
,
test_suite
=
'test'
,
)
software/seleniumserver/test/test.py
View file @
1aaaa03c
...
...
@@ -47,14 +47,12 @@ from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from
selenium.webdriver.support
import
expected_conditions
as
EC
from
selenium.webdriver.support.ui
import
WebDriverWait
from
utils
import
SlapOSInstanceTestCase
,
findFreeTCPPort
from
slapos.testing.testcase
import
makeModuleSetUpAndTestCaseClass
from
slapos.testing.utils
import
findFreeTCPPort
debug_mode
=
os
.
environ
.
get
(
'SLAPOS_TEST_DEBUG'
)
# for development: debugging logs and install Ctrl+C handler
if
debug_mode
:
import
logging
logging
.
basicConfig
(
level
=
logging
.
DEBUG
)
unittest
.
installHandler
()
setUpModule
,
SeleniumServerTestCase
=
makeModuleSetUpAndTestCaseClass
(
os
.
path
.
abspath
(
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'..'
,
'software.cfg'
)))
class
WebServerMixin
(
object
):
...
...
@@ -71,12 +69,14 @@ class WebServerMixin(object):
- upload a file and the file content will be displayed in div.uploadedfile
"""
def
log_message
(
self
,
*
args
,
**
kw
):
if
debug_mode
:
if
SeleniumServerTestCase
.
_debug
:
BaseHTTPRequestHandler
.
log_message
(
self
,
*
args
,
**
kw
)
def
do_GET
(
self
):
self
.
send_response
(
200
)
self
.
end_headers
()
self
.
wfile
.
write
(
'''
self
.
wfile
.
write
(
'''
<html>
<title>Test page</title>
<body>
...
...
@@ -87,18 +87,22 @@ class WebServerMixin(object):
</form>
</body>
</html>'''
)
def
do_POST
(
self
):
form
=
cgi
.
FieldStorage
(
fp
=
self
.
rfile
,
headers
=
self
.
headers
,
environ
=
{
'REQUEST_METHOD'
:
'POST'
,
'CONTENT_TYPE'
:
self
.
headers
[
'Content-Type'
],})
environ
=
{
'REQUEST_METHOD'
:
'POST'
,
'CONTENT_TYPE'
:
self
.
headers
[
'Content-Type'
],
})
self
.
send_response
(
200
)
self
.
end_headers
()
file_data
=
'no file'
if
form
.
has_key
(
'f'
):
file_data
=
form
[
'f'
].
file
.
read
()
self
.
wfile
.
write
(
'''
self
.
wfile
.
write
(
'''
<html>
<title>%s</title>
<div>%s</div>
...
...
@@ -128,8 +132,9 @@ class BrowserCompatibilityMixin(WebServerMixin):
def
setUp
(
self
):
super
(
BrowserCompatibilityMixin
,
self
).
setUp
()
self
.
driver
=
webdriver
.
Remote
(
command_executor
=
self
.
computer_partition
.
getConnectionParameterDict
()[
'backend-url'
],
desired_capabilities
=
self
.
desired_capabilities
)
command_executor
=
self
.
computer_partition
.
getConnectionParameterDict
()
[
'backend-url'
],
desired_capabilities
=
self
.
desired_capabilities
)
def
tearDown
(
self
):
self
.
driver
.
quit
()
...
...
@@ -142,7 +147,7 @@ class BrowserCompatibilityMixin(WebServerMixin):
def
test_simple_submit_scenario
(
self
):
self
.
driver
.
get
(
self
.
server_url
)
input_element
=
WebDriverWait
(
self
.
driver
,
3
).
until
(
input_element
=
WebDriverWait
(
self
.
driver
,
3
).
until
(
EC
.
visibility_of_element_located
((
By
.
NAME
,
'q'
)))
input_element
.
send_keys
(
self
.
id
())
input_element
.
submit
()
...
...
@@ -158,9 +163,7 @@ class BrowserCompatibilityMixin(WebServerMixin):
self
.
driver
.
find_element_by_xpath
(
'//input[@name="f"]'
).
send_keys
(
f
.
name
)
self
.
driver
.
find_element_by_xpath
(
'//input[@type="submit"]'
).
click
()
self
.
assertEqual
(
self
.
id
(),
self
.
driver
.
find_element_by_xpath
(
'//div'
).
text
)
self
.
assertEqual
(
self
.
id
(),
self
.
driver
.
find_element_by_xpath
(
'//div'
).
text
)
def
test_screenshot
(
self
):
self
.
driver
.
get
(
self
.
server_url
)
...
...
@@ -169,7 +172,9 @@ class BrowserCompatibilityMixin(WebServerMixin):
self
.
assertGreater
(
len
(
screenshot
.
getcolors
(
maxcolors
=
512
)),
2
)
def
test_window_and_screen_size
(
self
):
size
=
json
.
loads
(
self
.
driver
.
execute_script
(
'''
size
=
json
.
loads
(
self
.
driver
.
execute_script
(
'''
return JSON.stringify({
'screen.width': window.screen.width,
'screen.height': window.screen.height,
...
...
@@ -188,7 +193,9 @@ class BrowserCompatibilityMixin(WebServerMixin):
def
test_resize_window
(
self
):
self
.
driver
.
set_window_size
(
800
,
900
)
size
=
json
.
loads
(
self
.
driver
.
execute_script
(
'''
size
=
json
.
loads
(
self
.
driver
.
execute_script
(
'''
return JSON.stringify({
'outerWidth': window.outerWidth,
'outerHeight': window.outerHeight
...
...
@@ -201,10 +208,11 @@ class BrowserCompatibilityMixin(WebServerMixin):
webdriver_url
=
parameter_dict
[
'backend-url'
]
queue
=
multiprocessing
.
Queue
()
def
_test
(
q
,
server_url
):
driver
=
webdriver
.
Remote
(
command_executor
=
webdriver_url
,
desired_capabilities
=
self
.
desired_capabilities
)
command_executor
=
webdriver_url
,
desired_capabilities
=
self
.
desired_capabilities
)
try
:
driver
.
get
(
server_url
)
q
.
put
(
driver
.
title
==
'Test page'
)
...
...
@@ -213,32 +221,22 @@ class BrowserCompatibilityMixin(WebServerMixin):
nb_workers
=
10
workers
=
[]
for
i
in
range
(
nb_workers
):
for
_
in
range
(
nb_workers
):
worker
=
multiprocessing
.
Process
(
target
=
_test
,
args
=
(
queue
,
self
.
server_url
))
target
=
_test
,
args
=
(
queue
,
self
.
server_url
))
worker
.
start
()
workers
.
append
(
worker
)
del
worker
# pylint
del
worker
# pylint
_
=
[
worker
.
join
(
timeout
=
30
)
for
worker
in
workers
]
# terminate workers if they are still alive after 30 seconds
_
=
[
worker
.
terminate
()
for
worker
in
workers
if
worker
.
is_alive
()]
_
=
[
worker
.
join
()
for
worker
in
workers
]
del
_
# pylint
del
_
# pylint
self
.
assertEqual
(
[
True
]
*
nb_workers
,
[
queue
.
get
()
for
_
in
range
(
nb_workers
)])
class
SeleniumServerTestCase
(
SlapOSInstanceTestCase
):
"""Test the remote driver on a minimal web server.
"""
@
classmethod
def
getSoftwareURLList
(
cls
):
return
(
os
.
path
.
abspath
(
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'..'
,
'software.cfg'
)),
)
[
True
]
*
nb_workers
,
[
queue
.
get
()
for
_
in
range
(
nb_workers
)])
class
TestBrowserSelection
(
WebServerMixin
,
SeleniumServerTestCase
):
...
...
@@ -249,18 +247,15 @@ class TestBrowserSelection(WebServerMixin, SeleniumServerTestCase):
webdriver_url
=
parameter_dict
[
'backend-url'
]
driver
=
webdriver
.
Remote
(
command_executor
=
webdriver_url
,
desired_capabilities
=
DesiredCapabilities
.
CHROME
)
command_executor
=
webdriver_url
,
desired_capabilities
=
DesiredCapabilities
.
CHROME
)
driver
.
get
(
self
.
server_url
)
self
.
assertEqual
(
'Test page'
,
driver
.
title
)
self
.
assertIn
(
'Chrome'
,
driver
.
execute_script
(
'return navigator.userAgent'
))
self
.
assertIn
(
'Chrome'
,
driver
.
execute_script
(
'return navigator.userAgent'
))
self
.
assertNotIn
(
'Firefox'
,
driver
.
execute_script
(
'return navigator.userAgent'
))
'Firefox'
,
driver
.
execute_script
(
'return navigator.userAgent'
))
driver
.
quit
()
def
test_firefox
(
self
):
...
...
@@ -268,15 +263,14 @@ class TestBrowserSelection(WebServerMixin, SeleniumServerTestCase):
webdriver_url
=
parameter_dict
[
'backend-url'
]
driver
=
webdriver
.
Remote
(
command_executor
=
webdriver_url
,
desired_capabilities
=
DesiredCapabilities
.
FIREFOX
)
command_executor
=
webdriver_url
,
desired_capabilities
=
DesiredCapabilities
.
FIREFOX
)
driver
.
get
(
self
.
server_url
)
self
.
assertEqual
(
'Test page'
,
driver
.
title
)
self
.
assertIn
(
'Firefox'
,
driver
.
execute_script
(
'return navigator.userAgent'
))
'Firefox'
,
driver
.
execute_script
(
'return navigator.userAgent'
))
driver
.
quit
()
def
test_firefox_desired_version
(
self
):
...
...
@@ -286,16 +280,16 @@ class TestBrowserSelection(WebServerMixin, SeleniumServerTestCase):
desired_capabilities
=
DesiredCapabilities
.
FIREFOX
.
copy
()
desired_capabilities
[
'version'
]
=
'60.0.2esr'
driver
=
webdriver
.
Remote
(
command_executor
=
webdriver_url
,
desired_capabilities
=
desired_capabilities
)
command_executor
=
webdriver_url
,
desired_capabilities
=
desired_capabilities
)
self
.
assertIn
(
'Gecko/20100101 Firefox/60.0'
,
driver
.
execute_script
(
'return navigator.userAgent'
))
driver
.
quit
()
desired_capabilities
[
'version'
]
=
'52.9.0esr'
driver
=
webdriver
.
Remote
(
command_executor
=
webdriver_url
,
desired_capabilities
=
desired_capabilities
)
command_executor
=
webdriver_url
,
desired_capabilities
=
desired_capabilities
)
self
.
assertIn
(
'Gecko/20100101 Firefox/52.0'
,
driver
.
execute_script
(
'return navigator.userAgent'
))
...
...
@@ -313,9 +307,7 @@ class TestFrontend(WebServerMixin, SeleniumServerTestCase):
self
.
assertEqual
(
'admin'
,
parsed
.
username
)
self
.
assertTrue
(
parsed
.
password
)
self
.
assertIn
(
'Grid Console'
,
requests
.
get
(
admin_url
,
verify
=
False
).
text
)
self
.
assertIn
(
'Grid Console'
,
requests
.
get
(
admin_url
,
verify
=
False
).
text
)
def
test_browser_use_hub
(
self
):
parameter_dict
=
self
.
computer_partition
.
getConnectionParameterDict
()
...
...
@@ -325,8 +317,8 @@ class TestFrontend(WebServerMixin, SeleniumServerTestCase):
self
.
assertTrue
(
parsed
.
password
)
driver
=
webdriver
.
Remote
(
command_executor
=
webdriver_url
,
desired_capabilities
=
DesiredCapabilities
.
CHROME
)
command_executor
=
webdriver_url
,
desired_capabilities
=
DesiredCapabilities
.
CHROME
)
driver
.
get
(
self
.
server_url
)
self
.
assertEqual
(
'Test page'
,
driver
.
title
)
...
...
@@ -346,27 +338,30 @@ class TestSSHServer(SeleniumServerTestCase):
self
.
assertEqual
(
'ssh'
,
parsed
.
scheme
)
client
=
paramiko
.
SSHClient
()
class
TestKeyPolicy
(
object
):
"""Accept server key and keep it in self.key for inspection
"""
def
missing_host_key
(
self
,
client
,
hostname
,
key
):
self
.
key
=
key
key_policy
=
TestKeyPolicy
()
client
.
set_missing_host_key_policy
(
key_policy
)
with
contextlib
.
closing
(
client
):
client
.
connect
(
username
=
urlparse
.
urlparse
(
ssh_url
).
username
,
hostname
=
urlparse
.
urlparse
(
ssh_url
).
hostname
,
port
=
urlparse
.
urlparse
(
ssh_url
).
port
,
pkey
=
self
.
ssh_key
,
username
=
urlparse
.
urlparse
(
ssh_url
).
username
,
hostname
=
urlparse
.
urlparse
(
ssh_url
).
hostname
,
port
=
urlparse
.
urlparse
(
ssh_url
).
port
,
pkey
=
self
.
ssh_key
,
)
# Check fingerprint from server matches the published one.
# The publish format is the raw output of ssh-keygen and is something like this:
# 521 SHA256:9aZruv3LmFizzueIFdkd78eGtzghDoPSCBXFkkrHqXE user@hostname (ECDSA)
# we only want to parse SHA256:9aZruv3LmFizzueIFdkd78eGtzghDoPSCBXFkkrHqXE
_
,
fingerprint_string
,
_
,
key_type
=
parameter_dict
[
'ssh-fingerprint'
].
split
()
_
,
fingerprint_string
,
_
,
key_type
=
parameter_dict
[
'ssh-fingerprint'
].
split
()
self
.
assertEqual
(
key_type
,
'(ECDSA)'
)
fingerprint_algorithm
,
fingerprint
=
fingerprint_string
.
split
(
':'
,
1
)
...
...
@@ -374,10 +369,11 @@ class TestSSHServer(SeleniumServerTestCase):
# Paramiko does not allow to get the fingerprint as SHA256 easily yet
# https://github.com/paramiko/paramiko/pull/1103
self
.
assertEqual
(
fingerprint
,
# XXX with sha256, we need to remove that trailing =
base64
.
b64encode
(
hashlib
.
new
(
fingerprint_algorithm
,
key_policy
.
key
.
asbytes
()).
digest
())[:
-
1
]
)
fingerprint
,
# XXX with sha256, we need to remove that trailing =
base64
.
b64encode
(
hashlib
.
new
(
fingerprint_algorithm
,
key_policy
.
key
.
asbytes
()).
digest
())[:
-
1
])
channel
=
client
.
invoke_shell
()
channel
.
settimeout
(
30
)
...
...
@@ -397,7 +393,7 @@ class TestFirefox52(BrowserCompatibilityMixin, SeleniumServerTestCase):
user_agent
=
'Gecko/20100101 Firefox/52.0'
# resizing window is not supported on firefox 52 geckodriver
test_resize_window
=
unittest
.
expectedFailure
(
BrowserCompatibilityMixin
.
test_resize_window
)
BrowserCompatibilityMixin
.
test_resize_window
)
class
TestFirefox60
(
BrowserCompatibilityMixin
,
SeleniumServerTestCase
):
...
...
software/seleniumserver/test/utils.py
deleted
100644 → 0
View file @
6ab16083
##############################################################################
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import
unittest
import
os
import
socket
from
contextlib
import
closing
import
logging
import
StringIO
import
xmlrpclib
import
supervisor.xmlrpc
from
erp5.util.testnode.SlapOSControler
import
SlapOSControler
from
erp5.util.testnode.ProcessManager
import
ProcessManager
# Utility functions
def
findFreeTCPPort
(
ip
=
''
):
"""Find a free TCP port to listen to.
"""
family
=
socket
.
AF_INET6
if
':'
in
ip
else
socket
.
AF_INET
with
closing
(
socket
.
socket
(
family
,
socket
.
SOCK_STREAM
))
as
s
:
s
.
bind
((
ip
,
0
))
return
s
.
getsockname
()[
1
]
# TODO:
# - allow requesting multiple instances ?
class
SlapOSInstanceTestCase
(
unittest
.
TestCase
):
"""Install one slapos instance.
This test case install software(s) and request one instance during `setUpClass`
and destroy the instance during `tearDownClass`.
Software Release URL, Instance Software Type and Instance Parameters can be defined
on the class.
All tests from the test class will run with the same instance.
The following class attributes are available:
* `computer_partition`: the `slapos.core.XXX` computer partition instance.
* `computer_partition_root_path`: the path of the instance root directory,
"""
# Methods to be defined by subclasses.
@
classmethod
def
getSoftwareURLList
(
cls
):
"""Return URL of software releases to install.
To be defined by subclasses.
"""
raise
NotImplementedError
()
@
classmethod
def
getInstanceParameterDict
(
cls
):
"""Return instance parameters
To be defined by subclasses if they need to request instance with specific
parameters.
"""
return
{}
@
classmethod
def
getInstanceSoftwareType
(
cls
):
"""Return software type for instance, default "default"
To be defined by subclasses if they need to request instance with specific
software type.
"""
return
"default"
# Utility methods.
def
getSupervisorRPCServer
(
self
):
"""Returns a XML-RPC connection to the supervisor used by slapos node
Refer to http://supervisord.org/api.html for details of available methods.
"""
# xmlrpc over unix socket https://stackoverflow.com/a/11746051/7294664
return
xmlrpclib
.
ServerProxy
(
'http://slapos-supervisor'
,
transport
=
supervisor
.
xmlrpc
.
SupervisorTransport
(
None
,
None
,
# XXX hardcoded socket path
serverurl
=
"unix://{working_directory}/inst/supervisord.socket"
.
format
(
**
self
.
config
)))
# Unittest methods
@
classmethod
def
setUpClass
(
cls
):
"""Setup the class, build software and request an instance.
If you have to override this method, do not forget to call this method on
parent class.
"""
try
:
cls
.
setUpWorkingDirectory
()
cls
.
setUpConfig
()
cls
.
setUpSlapOSController
()
cls
.
runSoftwareRelease
()
# XXX instead of "runSoftwareRelease", it would be better to be closer to slapos usage:
# cls.supplySoftwares()
# cls.installSoftwares()
cls
.
runComputerPartition
()
# XXX instead of "runComputerPartition", it would be better to be closer to slapos usage:
# cls.requestInstances()
# cls.createInstances()
# cls.requestInstances()
except
BaseException
:
cls
.
stopSlapOSProcesses
()
cls
.
setUp
=
lambda
self
:
self
.
fail
(
'Setup Class failed.'
)
raise
@
classmethod
def
tearDownClass
(
cls
):
"""Tear down class, stop the processes and destroy instance.
"""
cls
.
stopSlapOSProcesses
()
# Implementation
@
classmethod
def
stopSlapOSProcesses
(
cls
):
if
hasattr
(
cls
,
'_process_manager'
):
cls
.
_process_manager
.
killPreviousRun
()
@
classmethod
def
setUpWorkingDirectory
(
cls
):
"""Initialise the directories"""
cls
.
working_directory
=
os
.
environ
.
get
(
'SLAPOS_TEST_WORKING_DIR'
,
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'.slapos'
))
# To prevent error: Cannot open an HTTP server: socket.error reported
# AF_UNIX path too long This `working_directory` should not be too deep.
# Socket path is 108 char max on linux
# https://github.com/torvalds/linux/blob/3848ec5/net/unix/af_unix.c#L234-L238
# Supervisord socket name contains the pid number, which is why we add
# .xxxxxxx in this check.
if
len
(
cls
.
working_directory
+
'/inst/supervisord.socket.xxxxxxx'
)
>
108
:
raise
RuntimeError
(
'working directory ( {} ) is too deep, try setting '
'SLAPOS_TEST_WORKING_DIR'
.
format
(
cls
.
working_directory
))
if
not
os
.
path
.
exists
(
cls
.
working_directory
):
os
.
mkdir
(
cls
.
working_directory
)
@
classmethod
def
setUpConfig
(
cls
):
"""Create slapos configuration"""
cls
.
config
=
{
"working_directory"
:
cls
.
working_directory
,
"slapos_directory"
:
cls
.
working_directory
,
"log_directory"
:
cls
.
working_directory
,
"computer_id"
:
'slapos.test'
,
# XXX
'proxy_database'
:
os
.
path
.
join
(
cls
.
working_directory
,
'proxy.db'
),
'partition_reference'
:
cls
.
__name__
,
# "proper" slapos command must be in $PATH
'slapos_binary'
:
'slapos'
,
'node_quantity'
:
'3'
,
}
# Some tests are expecting that local IP is not set to 127.0.0.1
ipv4_address
=
os
.
environ
.
get
(
'SLAPOS_TEST_IPV4'
,
'127.0.1.1'
)
ipv6_address
=
os
.
environ
[
'SLAPOS_TEST_IPV6'
]
cls
.
config
[
'proxy_host'
]
=
cls
.
config
[
'ipv4_address'
]
=
ipv4_address
cls
.
config
[
'ipv6_address'
]
=
ipv6_address
cls
.
config
[
'proxy_port'
]
=
findFreeTCPPort
(
ipv4_address
)
cls
.
config
[
'master_url'
]
=
'http://{proxy_host}:{proxy_port}'
.
format
(
**
cls
.
config
)
@
classmethod
def
setUpSlapOSController
(
cls
):
"""Create the a "slapos controller" and supply softwares from `getSoftwareURLList`.
This is equivalent to:
slapos proxy start
for sr in getSoftwareURLList; do
slapos supply $SR $COMP
done
"""
cls
.
_process_manager
=
ProcessManager
()
# XXX this code is copied from testnode code
cls
.
slapos_controler
=
SlapOSControler
(
cls
.
working_directory
,
cls
.
config
)
slapproxy_log
=
os
.
path
.
join
(
cls
.
config
[
'log_directory'
],
'slapproxy.log'
)
logger
=
logging
.
getLogger
(
__name__
)
logger
.
debug
(
'Configured slapproxy log to %r'
,
slapproxy_log
)
cls
.
software_url_list
=
cls
.
getSoftwareURLList
()
cls
.
slapos_controler
.
initializeSlapOSControler
(
slapproxy_log
=
slapproxy_log
,
process_manager
=
cls
.
_process_manager
,
reset_software
=
False
,
software_path_list
=
cls
.
software_url_list
)
# XXX we should check *earlier* if that pidfile exist and if supervisord
# process still running, because if developer started supervisord (or bugs?)
# then another supervisord will start and starting services a second time
# will fail.
cls
.
_process_manager
.
supervisord_pid_file
=
os
.
path
.
join
(
cls
.
slapos_controler
.
instance_root
,
'var'
,
'run'
,
'supervisord.pid'
)
@
classmethod
def
runSoftwareRelease
(
cls
):
"""Run all the software releases that were supplied before.
This is the equivalent of `slapos node software`.
The tests will be marked file if software building fail.
"""
logger
=
logging
.
getLogger
()
logger
.
level
=
logging
.
DEBUG
stream
=
StringIO
.
StringIO
()
stream_handler
=
logging
.
StreamHandler
(
stream
)
logger
.
addHandler
(
stream_handler
)
try
:
cls
.
software_status_dict
=
cls
.
slapos_controler
.
runSoftwareRelease
(
cls
.
config
,
environment
=
os
.
environ
)
stream
.
seek
(
0
)
stream
.
flush
()
message
=
''
.
join
(
stream
.
readlines
()[
-
100
:])
assert
cls
.
software_status_dict
[
'status_code'
]
==
0
,
message
finally
:
logger
.
removeHandler
(
stream_handler
)
del
stream
@
classmethod
def
runComputerPartition
(
cls
):
"""Instanciate the software.
This is the equivalent of doing:
slapos request --type=getInstanceSoftwareType --parameters=getInstanceParameterDict
slapos node instance
and return the slapos request instance parameters.
This can be called by tests to simulate re-request with different parameters.
"""
logger
=
logging
.
getLogger
()
logger
.
level
=
logging
.
DEBUG
stream
=
StringIO
.
StringIO
()
stream_handler
=
logging
.
StreamHandler
(
stream
)
logger
.
addHandler
(
stream_handler
)
if
cls
.
getInstanceSoftwareType
()
!=
'default'
:
raise
NotImplementedError
instance_parameter_dict
=
cls
.
getInstanceParameterDict
()
try
:
cls
.
instance_status_dict
=
cls
.
slapos_controler
.
runComputerPartition
(
cls
.
config
,
cluster_configuration
=
instance_parameter_dict
,
environment
=
os
.
environ
)
stream
.
seek
(
0
)
stream
.
flush
()
message
=
''
.
join
(
stream
.
readlines
()[
-
100
:])
assert
cls
.
instance_status_dict
[
'status_code'
]
==
0
,
message
finally
:
logger
.
removeHandler
(
stream_handler
)
del
stream
# FIXME: similar to test node, only one (root) partition is really
# supported for now.
computer_partition_list
=
[]
for
i
in
range
(
len
(
cls
.
software_url_list
)):
computer_partition_list
.
append
(
cls
.
slapos_controler
.
slap
.
registerOpenOrder
().
request
(
cls
.
software_url_list
[
i
],
# This is how testnode's SlapOSControler name created partitions
partition_reference
=
'testing partition {i}'
.
format
(
i
=
i
,
**
cls
.
config
),
partition_parameter_kw
=
instance_parameter_dict
))
# expose some class attributes so that tests can use them:
# the ComputerPartition instances, to getInstanceParameterDict
cls
.
computer_partition
=
computer_partition_list
[
0
]
# the path of the instance on the filesystem, for low level inspection
cls
.
computer_partition_root_path
=
os
.
path
.
join
(
cls
.
config
[
'working_directory'
],
'inst'
,
cls
.
computer_partition
.
getId
())
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment