Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
S
slapos
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Kirill Smelkov
slapos
Commits
cd5cfb49
Commit
cd5cfb49
authored
Sep 23, 2023
by
Kirill Smelkov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
.
parent
5a98a2d7
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
386 additions
and
0 deletions
+386
-0
software/ors-amarisoft/lopcomm-rrh-reset.jinja2.py
software/ors-amarisoft/lopcomm-rrh-reset.jinja2.py
+19
-0
software/ors-amarisoft/lopcomm-rrh-software.jinja2.py
software/ors-amarisoft/lopcomm-rrh-software.jinja2.py
+103
-0
software/ors-amarisoft/lopcomm-rrh-supervision.jinja2.py
software/ors-amarisoft/lopcomm-rrh-supervision.jinja2.py
+61
-0
software/ors-amarisoft/ncclient_common.py
software/ors-amarisoft/ncclient_common.py
+203
-0
No files found.
software/ors-amarisoft/lopcomm-rrh-reset.jinja2.py
0 → 100644
View file @
cd5cfb49
#!{{ python_path }}
import
time
import
sys
sys
.
path
.
append
({{
repr
(
buildout_directory_path
)
}})
from
ncclient_common
import
LopcommNetconfClient
if
__name__
==
'__main__'
:
nc
=
LopcommNetconfClient
(
log_file
=
"{{ log_file }}"
)
while
True
:
try
:
nc
.
connect
(
"{{ netaddr.IPAddress(slap_configuration.get('tap-ipv6-gateway', '')) }}"
,
830
,
"oranuser"
,
"oranpassword"
)
nc
.
nc
.
reset_device
()
break
except
Exception
as
e
:
nc
.
logger
.
debug
(
'Got exception, waiting 10 seconds before reconnecting...'
)
nc
.
logger
.
debug
(
e
)
time
.
sleep
(
10
)
finally
:
nc
.
close
()
software/ors-amarisoft/lopcomm-rrh-software.jinja2.py
0 → 100644
View file @
cd5cfb49
#!{{ python_path }}
import
time
import
xmltodict
import
sys
import
re
import
os
sys
.
path
.
append
({{
repr
(
buildout_directory_path
)
}})
from
ncclient_common
import
LopcommNetconfClient
if
__name__
==
'__main__'
:
nc
=
LopcommNetconfClient
(
log_file
=
"{{ log_file }}"
,
software_reply_json_log_file
=
"{{ software_reply_json_log_file }}"
)
while
True
:
try
:
firmware_check_file
=
os
.
path
.
join
(
'{{etc_path}}'
,
'is_firmware_updated'
)
nc
.
connect
(
"{{ netaddr.IPAddress(slap_configuration.get('tap-ipv6-gateway', '')) }}"
,
830
,
"oranuser"
,
"oranpassword"
)
# Fetch software inventory
inventory_vars
=
nc
.
get_inventory
()
nonrunning_slot_name
=
inventory_vars
[
"nonrunning_slot_name"
]
running_slot_name
=
inventory_vars
[
"running_slot_name"
]
active_nonrunning_slot_name
=
inventory_vars
[
"active_nonrunning_slot_name"
]
nonrunning_slot_name_build_version
=
inventory_vars
[
"nonrunning_slot_name_build_version"
]
running_slot_name_build_version
=
inventory_vars
[
"running_slot_name_build_version"
]
if
running_slot_name
and
nonrunning_slot_name
:
if
running_slot_name
:
nc
.
logger
.
info
(
"One slot is running and one is non-running. Proceeding..."
)
if
running_slot_name_build_version
in
"{{firmware_name}}"
:
if
not
os
.
path
.
exists
(
firmware_check_file
):
open
(
firmware_check_file
,
"w"
).
write
(
'True'
)
nc
.
logger
.
info
(
"Running slot's build-version %s is already updated. Skipping install."
%
running_slot_name_build_version
)
else
:
if
os
.
path
.
exists
(
firmware_check_file
):
os
.
remove
(
firmware_check_file
)
nc
.
logger
.
info
(
"Current build version: %s"
%
running_slot_name_build_version
)
user_authorized_key
=
"""{{ slapparameter_dict.get('user-authorized-key', '') }}"""
match
=
re
.
match
(
r'ssh-rsa ([^\
s]+)
', user_authorized_key)
if match:
extracted_key = match.group(1)
else:
nc.logger.info("No valid key found in user authorized key.")
download_rpc_xml = f"""
<software-download xmlns="urn:o-ran:software-management:1.0">
<remote-file-path>{{remote_file_path}}</remote-file-path>
<server>
<keys>
<algorithm xmlns:ct="urn:ietf:params:xml:ns:yang:ietf-crypto-types">1024</algorithm>
<public-key>{extracted_key}</public-key>
</keys>
</server>
</software-download>
"""
download_reply_xml = nc.custom_rpc_request(download_rpc_xml)
nc.logger.info("Downloading software...")
time.sleep(60)
if download_reply_xml:
nc.logger.info("Download proceed.")
download_data = xmltodict.parse(download_reply_xml)
nc.software_reply_json_logger.info('', extra={'
data
': download_data})
install_rpc_xml = f"""
<software-install xmlns="urn:o-ran:software-management:1.0">
<slot-name>{nonrunning_slot_name}</slot-name>
<file-names>{{firmware_name}}</file-names>
</software-install>
"""
install_reply_xml = nc.custom_rpc_request(install_rpc_xml)
nc.logger.info("Installing software...")
time.sleep(60)
if install_reply_xml:
nc.logger.info("Installation proceed.")
install_data = xmltodict.parse(install_reply_xml)
nc.software_reply_json_logger.info('', extra={'
data
': install_data})
if nonrunning_slot_name_build_version in "{{firmware_name}}":
activate_rpc_xml = f"""
<software-activate xmlns="urn:o-ran:software-management:1.0">
<slot-name>{nonrunning_slot_name}</slot-name>
</software-activate>
"""
activate_reply_xml = nc.custom_rpc_request(activate_rpc_xml)
nc.logger.info("Activating software...")
time.sleep(60)
if activate_reply_xml:
nc.logger.info("Activation proceed.")
activate_data = xmltodict.parse(activate_reply_xml)
nc.software_reply_json_logger.info('', extra={'
data
': activate_data})
nc.get_inventory()
if nonrunning_slot_name_build_version in "{{firmware_name}}" and active_nonrunning_slot_name:
nc.logger.info("Active non-running slot has the updated build version. Resetting device.")
nc.reset_device()
break
except Exception as e:
nc.logger.debug('
Got
exception
,
waiting
10
seconds
before
reconnecting
...
')
nc.logger.debug(str(e))
time.sleep(10)
finally:
nc.close()
software/ors-amarisoft/lopcomm-rrh-supervision.jinja2.py
0 → 100644
View file @
cd5cfb49
#!{{ python_path }}
import
time
import
xmltodict
import
sys
import
re
import
os
sys
.
path
.
append
({{
repr
(
buildout_directory_path
)
}})
from
ncclient_common
import
LopcommNetconfClient
if
__name__
==
'__main__'
:
nc
=
LopcommNetconfClient
(
log_file
=
"{{ log_file }}"
,
supervision_reply_json_log_file
=
"{{ supervision_reply_json_log_file }}"
)
try
:
netconf_check_file
=
os
.
path
.
join
(
'{{etc_path}}'
,
'is_netconf_connected'
)
nc
.
connect
(
"{{ netaddr.IPAddress(slap_configuration.get('tap-ipv6-gateway', '')) }}"
,
830
,
"oranuser"
,
"oranpassword"
)
supervision_subscription_rpc_xml
=
"""
<create-subscription xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">
<stream>o-ran-supervision</stream>
</create-subscription>
"""
nc
.
logger
.
info
(
"Subscription creating..."
)
supervision_subscription_reply_xml
=
nc
.
custom_rpc_request
(
supervision_subscription_rpc_xml
)
if
supervision_subscription_reply_xml
:
nc
.
logger
.
info
(
"Subscription created"
)
supervision_subscription_data
=
xmltodict
.
parse
(
supervision_subscription_reply_xml
)
nc
.
supervision_reply_json_logger
.
info
(
''
,
extra
=
{
'data'
:
supervision_subscription_data
})
while
True
:
supervision_watchdog_rpc_xml
=
"""
<supervision-watchdog-reset xmlns="urn:o-ran:supervision:1.0">
<supervision-notification-interval>60</supervision-notification-interval>
<guard-timer-overhead>10</guard-timer-overhead>
</supervision-watchdog-reset>
"""
nc
.
logger
.
info
(
"NETCONF server replying..."
)
supervision_watchdog_reply_xml
=
nc
.
custom_rpc_request
(
supervision_watchdog_rpc_xml
)
if
supervision_watchdog_reply_xml
:
if
not
os
.
path
.
exists
(
netconf_check_file
):
open
(
netconf_check_file
,
"w"
).
write
(
'True'
)
nc
.
logger
.
info
(
"NETCONF server replied"
)
supervision_watchdog_data
=
xmltodict
.
parse
(
supervision_watchdog_reply_xml
)
nc
.
supervision_reply_json_logger
.
info
(
''
,
extra
=
{
'data'
:
supervision_watchdog_data
})
# It must be the same interval as <supervision-notification-interval>
time
.
sleep
(
60
)
else
:
if
os
.
path
.
exists
(
netconf_check_file
):
os
.
remove
(
netconf_check_file
)
else
:
nc
.
logger
.
debug
(
"Subscription failed."
)
except
Exception
as
e
:
nc
.
logger
.
debug
(
'Got exception, waiting 10 seconds before reconnecting...'
)
nc
.
logger
.
debug
(
str
(
e
))
time
.
sleep
(
10
)
finally
:
nc
.
close
()
\ No newline at end of file
software/ors-amarisoft/ncclient_common.py
0 → 100644
View file @
cd5cfb49
import
time
import
logging
import
xmltodict
from
logging.handlers
import
RotatingFileHandler
from
ncclient
import
manager
from
ncclient.operations
import
RPCError
from
ncclient.xml_
import
*
from
ncclient.devices.default
import
DefaultDeviceHandler
class
LopcommNetconfClient
:
def
__init__
(
self
,
log_file
,
json_log_file
=
None
,
cfg_json_log_file
=
None
,
supervision_json_log_file
=
None
,
ncsession_json_log_file
=
None
,
software_json_log_file
=
None
,
software_reply_json_log_file
=
None
,
supervision_reply_json_log_file
=
None
,
testing
=
False
):
self
.
logger
=
logging
.
getLogger
(
'logger'
)
self
.
logger
.
setLevel
(
logging
.
DEBUG
)
handler
=
RotatingFileHandler
(
log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
self
.
logger
.
addHandler
(
handler
)
formatter
=
logging
.
Formatter
(
"%(asctime)s [%(levelname)s] %(message)s"
)
handler
.
setFormatter
(
formatter
)
if
json_log_file
:
self
.
json_logger
=
logging
.
getLogger
(
'json_logger'
)
self
.
json_logger
.
setLevel
(
logging
.
DEBUG
)
json_handler
=
RotatingFileHandler
(
json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
json_handler
.
setFormatter
(
json_formatter
)
self
.
json_logger
.
addHandler
(
json_handler
)
self
.
cfg_json_logger
=
logging
.
getLogger
(
'cfg_json_logger'
)
self
.
cfg_json_logger
.
setLevel
(
logging
.
DEBUG
)
cfg_json_handler
=
RotatingFileHandler
(
cfg_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
cfg_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
cfg_json_handler
.
setFormatter
(
cfg_json_formatter
)
self
.
cfg_json_logger
.
addHandler
(
cfg_json_handler
)
self
.
supervision_json_logger
=
logging
.
getLogger
(
'supervision_json_logger'
)
self
.
supervision_json_logger
.
setLevel
(
logging
.
DEBUG
)
supervision_json_handler
=
RotatingFileHandler
(
supervision_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
supervision_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
supervision_json_handler
.
setFormatter
(
supervision_json_formatter
)
self
.
supervision_json_logger
.
addHandler
(
supervision_json_handler
)
self
.
ncsession_json_logger
=
logging
.
getLogger
(
'ncsession_json_logger'
)
self
.
ncsession_json_logger
.
setLevel
(
logging
.
DEBUG
)
ncsession_json_handler
=
RotatingFileHandler
(
ncsession_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
ncsession_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
ncsession_json_handler
.
setFormatter
(
ncsession_json_formatter
)
self
.
ncsession_json_logger
.
addHandler
(
ncsession_json_handler
)
self
.
software_json_logger
=
logging
.
getLogger
(
'software_json_logger'
)
self
.
software_json_logger
.
setLevel
(
logging
.
DEBUG
)
software_json_handler
=
RotatingFileHandler
(
software_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
software_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
software_json_handler
.
setFormatter
(
software_json_formatter
)
self
.
software_json_logger
.
addHandler
(
software_json_handler
)
else
:
self
.
json_logger
=
None
self
.
cfg_json_logger
=
None
self
.
supervision_json_logger
=
None
self
.
ncsession_json_logger
=
None
self
.
software_json_logger
=
None
if
supervision_reply_json_log_file
:
self
.
supervision_reply_json_logger
=
logging
.
getLogger
(
'supervision_reply_json_logger'
)
self
.
supervision_reply_json_logger
.
setLevel
(
logging
.
DEBUG
)
supervision_reply_json_handler
=
RotatingFileHandler
(
supervision_reply_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
supervision_reply_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
supervision_reply_json_handler
.
setFormatter
(
supervision_reply_json_formatter
)
self
.
supervision_reply_json_logger
.
addHandler
(
supervision_reply_json_handler
)
else
:
self
.
supervision_reply_json_logger
=
None
if
software_reply_json_log_file
:
self
.
software_reply_json_logger
=
logging
.
getLogger
(
'software_reply_json_logger'
)
self
.
software_reply_json_logger
.
setLevel
(
logging
.
DEBUG
)
software_reply_json_handler
=
RotatingFileHandler
(
software_reply_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
software_reply_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
software_reply_json_handler
.
setFormatter
(
software_reply_json_formatter
)
self
.
software_reply_json_logger
.
addHandler
(
software_reply_json_handler
)
else
:
self
.
software_reply_json_logger
=
None
if
testing
:
return
def
connect
(
self
,
host
,
port
,
user
,
password
):
self
.
address
=
(
host
,
port
)
self
.
logger
.
info
(
'Connecting to %s, user %s...'
%
(
self
.
address
,
user
))
self
.
conn
=
manager
.
connect
(
host
=
host
,
port
=
port
,
username
=
user
,
password
=
password
,
timeout
=
1800
,
device_params
=
{
'name'
:
'default'
},
hostkey_verify
=
False
)
self
.
logger
.
info
(
'Connection to %s successful'
%
(
self
.
address
,))
def
subscribe
(
self
):
sub
=
self
.
conn
.
create_subscription
()
self
.
logger
.
info
(
'Subscription to %s successful'
%
(
self
.
address
,))
def
get_notification
(
self
):
result
=
None
while
result
==
None
:
self
.
logger
.
debug
(
'Waiting for notification from %s...'
%
(
self
.
address
,))
result
=
self
.
conn
.
take_notification
(
block
=
True
)
if
result
:
self
.
logger
.
debug
(
'Got new notification from %s...'
%
(
self
.
address
,))
result_in_xml
=
result
.
_raw
data_dict
=
xmltodict
.
parse
(
result_in_xml
)
if
'alarm-notif'
in
data_dict
[
'notification'
]:
self
.
json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
elif
'supervision-notification'
in
data_dict
[
'notification'
]:
self
.
supervision_json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
elif
'netconf-session-start'
in
data_dict
[
'notification'
]
or
'netconf-session-end'
in
data_dict
[
'notification'
]:
self
.
ncsession_json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
elif
any
(
event
in
data_dict
[
'notification'
]
for
event
in
[
'install-event'
,
'activation-event'
,
'download-event'
]):
self
.
software_json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
else
:
self
.
cfg_json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
def
edit_config
(
self
,
config_files
):
for
config_file
in
config_files
:
with
open
(
config_file
)
as
f
:
config_xml
=
f
.
read
()
try
:
self
.
logger
.
info
(
'Sending edit-config RPC request...'
)
self
.
conn
.
edit_config
(
target
=
'running'
,
config
=
config_xml
)
self
.
logger
.
info
(
'Edit-config RPC request sent successfully'
)
except
RPCError
as
e
:
self
.
logger
.
error
(
'Error sending edit-config RPC request: %s'
%
e
)
def
custom_rpc_request
(
self
,
rpc_xml
):
try
:
self
.
logger
.
info
(
'Sending custom RPC request...'
)
response
=
self
.
conn
.
dispatch
(
to_ele
(
rpc_xml
))
if
response
.
ok
:
self
.
logger
.
info
(
'Custom RPC request sent successfully'
)
return
response
.
xml
else
:
self
.
logger
.
error
(
'Error sending custom RPC request: %s'
%
response
.
error
)
except
RPCError
as
e
:
self
.
logger
.
error
(
'Error sending custom RPC request: %s'
%
e
)
def
reset_device
(
self
):
self
.
logger
.
info
(
'Resetting...'
)
reset_rpc_xml
=
"""
<reset xmlns="urn:o-ran:operations:1.0">
</reset>
"""
reset_reply_xml
=
self
.
custom_rpc_request
(
reset_rpc_xml
)
if
reset_reply_xml
:
reset_data
=
xmltodict
.
parse
(
reset_reply_xml
)
self
.
software_reply_json_logger
.
info
(
''
,
extra
=
{
'data'
:
reset_data
})
self
.
logger
.
info
(
'Wait 60 second then reboot!'
)
time
.
sleep
(
60
)
def
get_inventory
(
self
):
self
.
logger
.
info
(
'Fetching software inventory...'
)
inventory_rpc_xml
=
"""
<get xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<filter type="subtree">
<software-inventory xmlns="urn:o-ran:software-management:1.0" />
</filter>
</get>
"""
inventory_reply_xml
=
self
.
custom_rpc_request
(
inventory_rpc_xml
)
if
inventory_reply_xml
:
self
.
logger
.
info
(
'Finish fetching software inventory.'
)
inventory_data
=
xmltodict
.
parse
(
inventory_reply_xml
)
self
.
software_reply_json_logger
.
info
(
''
,
extra
=
{
'data'
:
inventory_data
})
nonrunning_slot_name
=
None
running_slot_name
=
None
active_nonrunning_slot_name
=
None
nonrunning_slot_name_build_version
=
None
running_slot_name_build_version
=
None
software_slots
=
inventory_data
[
'nc:rpc-reply'
][
'data'
][
'software-inventory'
][
'software-slot'
]
for
slot
in
software_slots
:
if
slot
[
'running'
]
==
'false'
:
nonrunning_slot_name
=
slot
[
'name'
]
nonrunning_slot_name_build_version
=
slot
[
'build-version'
]
if
slot
[
'running'
]
==
'true'
:
running_slot_name
=
slot
[
'name'
]
running_slot_name_build_version
=
slot
[
'build-version'
]
elif
slot
[
'active'
]
==
'true'
and
slot
[
'running'
]
==
'false'
:
active_nonrunning_slot_name
=
slot
[
'name'
]
return
{
"nonrunning_slot_name"
:
nonrunning_slot_name
,
"running_slot_name"
:
running_slot_name
,
"active_nonrunning_slot_name"
:
active_nonrunning_slot_name
,
"nonrunning_slot_name_build_version"
:
nonrunning_slot_name_build_version
,
"running_slot_name_build_version"
:
running_slot_name_build_version
}
def
close
(
self
):
# Close not compatible between ncclient and netconf server
#self.conn.close()
pass
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment