Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
slapos
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Léo-Paul Géneau
slapos
Commits
1e3d0810
Commit
1e3d0810
authored
Jul 29, 2021
by
Jérome Perrin
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
software/nginx-push-stream: add a software release test
parent
8f0a8b10
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
131 additions
and
0 deletions
+131
-0
software/nginx-push-stream/test/README.md
software/nginx-push-stream/test/README.md
+1
-0
software/nginx-push-stream/test/setup.py
software/nginx-push-stream/test/setup.py
+50
-0
software/nginx-push-stream/test/test.py
software/nginx-push-stream/test/test.py
+80
-0
No files found.
software/nginx-push-stream/test/README.md
0 → 100644
View file @
1e3d0810
Tests for Nginx Push Stream software release
software/nginx-push-stream/test/setup.py
0 → 100644
View file @
1e3d0810
##############################################################################
#
# Copyright (c) 2021 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from
setuptools
import
setup
,
find_packages
version
=
'0.0.1.dev0'
name
=
'slapos.test.nginx_push_stream'
long_description
=
open
(
"README.md"
).
read
()
setup
(
name
=
name
,
version
=
version
,
description
=
"Test for SlapOS' Nginx Push Stream"
,
long_description
=
long_description
,
long_description_content_type
=
'text/markdown'
,
maintainer
=
"Nexedi"
,
maintainer_email
=
"info@nexedi.com"
,
url
=
"https://lab.nexedi.com/nexedi/slapos"
,
packages
=
find_packages
(),
install_requires
=
[
'slapos.core'
,
'slapos.libnetworkcache'
,
'requests'
,
],
zip_safe
=
True
,
test_suite
=
'test'
,
)
software/nginx-push-stream/test/test.py
0 → 100644
View file @
1e3d0810
##############################################################################
# coding: utf-8
# Copyright (c) 2021 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import
os
import
multiprocessing
import
requests
from
slapos.testing.testcase
import
makeModuleSetUpAndTestCaseClass
setUpModule
,
SlapOSInstanceTestCase
=
makeModuleSetUpAndTestCaseClass
(
os
.
path
.
abspath
(
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'..'
,
'software.cfg'
)))
class
TestNginxPushStream
(
SlapOSInstanceTestCase
):
def
setUp
(
self
):
self
.
connection_parameters
=
\
self
.
computer_partition
.
getConnectionParameterDict
()
def
test_push_stream_scenario
(
self
):
def
process_messages
(
q
):
# type:(multiprocessing.Queue[bytes]) -> None
req
=
requests
.
get
(
self
.
connection_parameters
[
'subscriber-url'
]
+
'/channel_id'
,
verify
=
False
,
stream
=
True
,
)
if
not
req
.
ok
:
q
.
put
((
'error: wrong status code %s'
%
req
.
status_code
).
encode
())
q
.
put
(
b'ready'
)
for
_
,
line
in
zip
(
range
(
2
),
req
.
iter_lines
()):
q
.
put
(
line
)
q
=
multiprocessing
.
Queue
()
# type: multiprocessing.Queue[bytes]
subscriber
=
multiprocessing
.
Process
(
target
=
process_messages
,
args
=
(
q
,
))
subscriber
.
start
()
self
.
assertEqual
(
q
.
get
(
timeout
=
30
),
b'ready'
)
resp
=
requests
.
post
(
self
.
connection_parameters
[
'publisher-url'
]
+
'?id=channel_id'
,
verify
=
False
,
data
=
'Hello'
,
timeout
=
2
,
)
resp
.
raise_for_status
()
try
:
subscriber
.
join
(
timeout
=
30
)
except
multiprocessing
.
TimeoutError
:
subscriber
.
terminate
()
subscriber
.
join
(
timeout
=
30
)
self
.
fail
(
'Process did not terminate'
)
self
.
assertEqual
(
q
.
get_nowait
(),
b': '
)
self
.
assertEqual
(
q
.
get_nowait
(),
b'data: Hello'
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment