Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
slapos
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Alain Takoudjou
slapos
Commits
48db30d9
Commit
48db30d9
authored
Sep 14, 2023
by
Lu Xu
👀
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
software/ors-amarisoft: add firmware auto-upgrade over netconf
parent
aff6a8d9
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
424 additions
and
197 deletions
+424
-197
software/ors-amarisoft/lopcomm-rrh-config.jinja2.py
software/ors-amarisoft/lopcomm-rrh-config.jinja2.py
+15
-74
software/ors-amarisoft/lopcomm-rrh-reset.jinja2.py
software/ors-amarisoft/lopcomm-rrh-reset.jinja2.py
+19
-0
software/ors-amarisoft/lopcomm-rrh-software.jinja2.py
software/ors-amarisoft/lopcomm-rrh-software.jinja2.py
+103
-0
software/ors-amarisoft/lopcomm-rrh-stats.jinja2.py
software/ors-amarisoft/lopcomm-rrh-stats.jinja2.py
+23
-123
software/ors-amarisoft/lopcomm-rrh-supervision.jinja2.py
software/ors-amarisoft/lopcomm-rrh-supervision.jinja2.py
+61
-0
software/ors-amarisoft/ncclient_common.py
software/ors-amarisoft/ncclient_common.py
+203
-0
No files found.
software/ors-amarisoft/lopcomm-rrh-config.jinja2.py
View file @
48db30d9
#!{{ python_path }}
import
logging
import
time
import
xmltodict
from
logging.handlers
import
RotatingFileHandler
from
ncclient
import
manager
from
ncclient.operations
import
RPCError
from
ncclient.xml_
import
*
from
ncclient.devices.default
import
DefaultDeviceHandler
class
LopcommNetconfClient
:
def
__init__
(
self
):
log_file
=
"{{ log_file }}"
self
.
logger
=
logging
.
getLogger
(
'logger'
)
self
.
logger
.
setLevel
(
logging
.
DEBUG
)
handler
=
RotatingFileHandler
(
log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
self
.
logger
.
addHandler
(
handler
)
formatter
=
logging
.
Formatter
(
"%(asctime)s [%(levelname)s] %(message)s"
)
handler
.
setFormatter
(
formatter
)
if
{{
testing
}}:
return
def
connect
(
self
,
host
,
port
,
user
,
password
):
if
{{
testing
}}:
return
self
.
address
=
(
host
,
port
)
self
.
logger
.
info
(
'Connecting to %s, user %s...'
%
(
self
.
address
,
user
))
self
.
conn
=
manager
.
connect
(
host
=
host
,
port
=
port
,
username
=
user
,
password
=
password
,
timeout
=
1800
,
device_params
=
{
'name'
:
'default'
},
hostkey_verify
=
False
)
self
.
logger
.
info
(
'Connection to %s successful'
%
(
self
.
address
,))
def
edit_config
(
self
,
config_files
):
for
config_file
in
config_files
:
with
open
(
config_file
)
as
f
:
config_xml
=
f
.
read
()
try
:
self
.
logger
.
info
(
'Sending edit-config RPC request...'
)
self
.
conn
.
edit_config
(
target
=
'running'
,
config
=
config_xml
)
self
.
logger
.
info
(
'Edit-config RPC request sent successfully'
)
except
RPCError
as
e
:
self
.
logger
.
error
(
'Error sending edit-config RPC request: %s'
%
e
)
def
close
(
self
):
# Close not compatible between ncclient and netconf server
#self.conn.close()
pass
import
sys
sys
.
path
.
append
({{
repr
(
buildout_directory_path
)
}})
from
ncclient_common
import
LopcommNetconfClient
if
__name__
==
'__main__'
:
nc
=
LopcommNetconfClient
(
)
while
True
:
try
:
nc
.
connect
(
"{{ netaddr.IPAddress(slap_configuration.get('tap-ipv6-gateway', '')) }}"
,
830
,
"oranuser"
,
"oranpassword"
)
nc
.
edit_config
([
"{{ CreateProcessingEle_template }}"
,
"{{ cu_config_template }}"
])
break
except
Exception
as
e
:
nc
.
logger
.
debug
(
'Got exception, waiting 10 seconds before reconnecting...'
)
nc
.
logger
.
debug
(
e
)
time
.
sleep
(
10
)
finally
:
nc
.
close
()
nc
=
LopcommNetconfClient
(
log_file
=
"{{ log_file }}"
)
while
True
:
try
:
nc
.
connect
(
"{{ netaddr.IPAddress(slap_configuration.get('tap-ipv6-gateway', '')) }}"
,
830
,
"oranuser"
,
"oranpassword"
)
nc
.
edit_config
([
"{{ CreateProcessingEle_template }}"
,
"{{ cu_config_template }}"
])
break
except
Exception
as
e
:
nc
.
logger
.
debug
(
'Got exception, waiting 10 seconds before reconnecting...'
)
nc
.
logger
.
debug
(
e
)
time
.
sleep
(
10
)
finally
:
nc
.
close
()
software/ors-amarisoft/lopcomm-rrh-reset.jinja2.py
0 → 100644
View file @
48db30d9
#!{{ python_path }}
import
time
import
sys
sys
.
path
.
append
({{
repr
(
buildout_directory_path
)
}})
from
ncclient_common
import
LopcommNetconfClient
if
__name__
==
'__main__'
:
nc
=
LopcommNetconfClient
(
log_file
=
"{{ log_file }}"
)
while
True
:
try
:
nc
.
connect
(
"{{ netaddr.IPAddress(slap_configuration.get('tap-ipv6-gateway', '')) }}"
,
830
,
"oranuser"
,
"oranpassword"
)
nc
.
nc
.
reset_device
()
break
except
Exception
as
e
:
nc
.
logger
.
debug
(
'Got exception, waiting 10 seconds before reconnecting...'
)
nc
.
logger
.
debug
(
e
)
time
.
sleep
(
10
)
finally
:
nc
.
close
()
software/ors-amarisoft/lopcomm-rrh-software.jinja2.py
0 → 100644
View file @
48db30d9
#!{{ python_path }}
import
time
import
xmltodict
import
sys
import
re
import
os
sys
.
path
.
append
({{
repr
(
buildout_directory_path
)
}})
from
ncclient_common
import
LopcommNetconfClient
if
__name__
==
'__main__'
:
nc
=
LopcommNetconfClient
(
log_file
=
"{{ log_file }}"
,
software_reply_json_log_file
=
"{{ software_reply_json_log_file }}"
)
while
True
:
try
:
firmware_check_file
=
os
.
path
.
join
(
'{{etc_path}}'
,
'is_firmware_updated'
)
nc
.
connect
(
"{{ netaddr.IPAddress(slap_configuration.get('tap-ipv6-gateway', '')) }}"
,
830
,
"oranuser"
,
"oranpassword"
)
# Fetch software inventory
inventory_vars
=
nc
.
get_inventory
()
nonrunning_slot_name
=
inventory_vars
[
"nonrunning_slot_name"
]
running_slot_name
=
inventory_vars
[
"running_slot_name"
]
active_nonrunning_slot_name
=
inventory_vars
[
"active_nonrunning_slot_name"
]
nonrunning_slot_name_build_version
=
inventory_vars
[
"nonrunning_slot_name_build_version"
]
running_slot_name_build_version
=
inventory_vars
[
"running_slot_name_build_version"
]
if
running_slot_name
and
nonrunning_slot_name
:
if
running_slot_name
:
nc
.
logger
.
info
(
"One slot is running and one is non-running. Proceeding..."
)
if
running_slot_name_build_version
in
"{{firmware_name}}"
:
if
not
os
.
path
.
exists
(
firmware_check_file
):
open
(
firmware_check_file
,
"w"
).
write
(
'True'
)
nc
.
logger
.
info
(
"Running slot's build-version %s is already updated. Skipping install."
%
running_slot_name_build_version
)
else
:
if
os
.
path
.
exists
(
firmware_check_file
):
os
.
remove
(
firmware_check_file
)
nc
.
logger
.
info
(
"Current build version: %s"
%
running_slot_name_build_version
)
user_authorized_key
=
"""{{ slapparameter_dict.get('user-authorized-key', '') }}"""
match
=
re
.
match
(
r'ssh-rsa ([^\
s]+)
', user_authorized_key)
if match:
extracted_key = match.group(1)
else:
nc.logger.info("No valid key found in user authorized key.")
download_rpc_xml = f"""
<software-download xmlns="urn:o-ran:software-management:1.0">
<remote-file-path>{{remote_file_path}}</remote-file-path>
<server>
<keys>
<algorithm xmlns:ct="urn:ietf:params:xml:ns:yang:ietf-crypto-types">1024</algorithm>
<public-key>{extracted_key}</public-key>
</keys>
</server>
</software-download>
"""
download_reply_xml = nc.custom_rpc_request(download_rpc_xml)
nc.logger.info("Downloading software...")
time.sleep(60)
if download_reply_xml:
nc.logger.info("Download proceed.")
download_data = xmltodict.parse(download_reply_xml)
nc.software_reply_json_logger.info('', extra={'
data
': download_data})
install_rpc_xml = f"""
<software-install xmlns="urn:o-ran:software-management:1.0">
<slot-name>{nonrunning_slot_name}</slot-name>
<file-names>{{firmware_name}}</file-names>
</software-install>
"""
install_reply_xml = nc.custom_rpc_request(install_rpc_xml)
nc.logger.info("Installing software...")
time.sleep(60)
if install_reply_xml:
nc.logger.info("Installation proceed.")
install_data = xmltodict.parse(install_reply_xml)
nc.software_reply_json_logger.info('', extra={'
data
': install_data})
if nonrunning_slot_name_build_version in "{{firmware_name}}":
activate_rpc_xml = f"""
<software-activate xmlns="urn:o-ran:software-management:1.0">
<slot-name>{nonrunning_slot_name}</slot-name>
</software-activate>
"""
activate_reply_xml = nc.custom_rpc_request(activate_rpc_xml)
nc.logger.info("Activating software...")
time.sleep(60)
if activate_reply_xml:
nc.logger.info("Activation proceed.")
activate_data = xmltodict.parse(activate_reply_xml)
nc.software_reply_json_logger.info('', extra={'
data
': activate_data})
nc.get_inventory()
if nonrunning_slot_name_build_version in "{{firmware_name}}" and active_nonrunning_slot_name:
nc.logger.info("Active non-running slot has the updated build version. Resetting device.")
nc.reset_device()
break
except Exception as e:
nc.logger.debug('
Got
exception
,
waiting
10
seconds
before
reconnecting
...
')
nc.logger.debug(str(e))
time.sleep(10)
finally:
nc.close()
software/ors-amarisoft/lopcomm-rrh-stats.jinja2.py
View file @
48db30d9
#!{{ python_path }}
import
json
import
logging
import
time
import
xmltodict
from
logging.handlers
import
RotatingFileHandler
from
ncclient
import
manager
from
ncclient.xml_
import
*
from
ncclient.devices.default
import
DefaultDeviceHandler
class
LopcommNetconfClient
:
def
__init__
(
self
):
log_file
=
"{{ log_file }}"
json_log_file
=
"{{ json_log_file }}"
cfg_json_log_file
=
"{{ cfg_json_log_file }}"
supervision_json_log_file
=
"{{ supervision_json_log_file }}"
ncsession_json_log_file
=
"{{ ncsession_json_log_file }}"
self
.
logger
=
logging
.
getLogger
(
'logger'
)
self
.
json_logger
=
logging
.
getLogger
(
'json_logger'
)
self
.
cfg_json_logger
=
logging
.
getLogger
(
'cfg_json_logger'
)
self
.
supervision_json_logger
=
logging
.
getLogger
(
'supervision_json_logger'
)
self
.
ncsession_json_logger
=
logging
.
getLogger
(
'ncsession_json_logger'
)
self
.
logger
.
setLevel
(
logging
.
DEBUG
)
self
.
json_logger
.
setLevel
(
logging
.
DEBUG
)
self
.
cfg_json_logger
.
setLevel
(
logging
.
DEBUG
)
self
.
supervision_json_logger
.
setLevel
(
logging
.
DEBUG
)
self
.
ncsession_json_logger
.
setLevel
(
logging
.
DEBUG
)
json_handler
=
RotatingFileHandler
(
json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
json_handler
.
setFormatter
(
json_formatter
)
self
.
json_logger
.
addHandler
(
json_handler
)
cfg_json_handler
=
RotatingFileHandler
(
cfg_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
cfg_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
cfg_json_handler
.
setFormatter
(
cfg_json_formatter
)
self
.
cfg_json_logger
.
addHandler
(
cfg_json_handler
)
supervision_json_handler
=
RotatingFileHandler
(
supervision_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
supervision_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
supervision_json_handler
.
setFormatter
(
supervision_json_formatter
)
self
.
supervision_json_logger
.
addHandler
(
supervision_json_handler
)
ncsession_json_handler
=
RotatingFileHandler
(
ncsession_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
ncsession_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
ncsession_json_handler
.
setFormatter
(
ncsession_json_formatter
)
self
.
ncsession_json_logger
.
addHandler
(
ncsession_json_handler
)
handler
=
RotatingFileHandler
(
log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
self
.
logger
.
addHandler
(
handler
)
formatter
=
logging
.
Formatter
(
"%(asctime)s [%(levelname)s] %(message)s"
)
handler
.
setFormatter
(
formatter
)
if
{{
testing
}}:
return
def
connect
(
self
,
host
,
port
,
user
,
password
):
if
{{
testing
}}:
return
self
.
address
=
(
host
,
port
)
self
.
logger
.
info
(
'Connecting to %s, user %s...'
%
(
self
.
address
,
user
))
self
.
conn
=
manager
.
connect
(
host
=
host
,
port
=
port
,
username
=
user
,
password
=
password
,
timeout
=
1800
,
device_params
=
{
'name'
:
'default'
},
hostkey_verify
=
False
)
self
.
logger
.
info
(
'Connection to %s successful'
%
(
self
.
address
,))
def
subscribe
(
self
):
# Filter not compatible between ncclient and netconf server
#result = self.conn.create_subscription(filter=('xpath', '/o-ran-fm:*'))
sub
=
self
.
conn
.
create_subscription
()
self
.
logger
.
info
(
'Subscription to %s successful'
%
(
self
.
address
,))
def
get_notification
(
self
):
result
=
None
while
result
==
None
:
self
.
logger
.
debug
(
'Waiting for notification from %s...'
%
(
self
.
address
,))
result
=
self
.
conn
.
take_notification
(
block
=
True
)
if
result
:
self
.
logger
.
debug
(
'Got new notification from %s...'
%
(
self
.
address
,))
result_in_xml
=
result
.
_raw
data_dict
=
xmltodict
.
parse
(
result_in_xml
)
if
'alarm-notif'
in
data_dict
[
'notification'
]:
self
.
json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
elif
'supervision-notification'
in
data_dict
[
'notification'
]:
self
.
supervision_json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
elif
'netconf-session-start'
or
'netconf-session-end'
in
data_dict
[
'notification'
]:
self
.
ncsession_json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
else
:
self
.
cfg_json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
def
close
(
self
):
# Close not compatible between ncclient and netconf server
#self.conn.close()
pass
import
sys
sys
.
path
.
append
({{
repr
(
buildout_directory_path
)
}})
from
ncclient_common
import
LopcommNetconfClient
if
__name__
==
'__main__'
:
nc
=
LopcommNetconfClient
()
while
True
:
try
:
nc
.
connect
(
"{{ netaddr.IPAddress(slap_configuration.get('tap-ipv6-gateway', '')) }}"
,
830
,
"oranuser"
,
"oranpassword"
)
nc
.
subscribe
()
while
True
:
nc
.
get_notification
()
except
Exception
as
e
:
nc
.
logger
.
debug
(
'Got exception, waiting 10 seconds before reconnecting...'
)
nc
.
logger
.
debug
(
e
)
time
.
sleep
(
10
)
finally
:
nc
.
close
()
nc
=
LopcommNetconfClient
(
log_file
=
"{{ log_file }}"
,
json_log_file
=
"{{ json_log_file }}"
,
cfg_json_log_file
=
"{{ cfg_json_log_file }}"
,
supervision_json_log_file
=
"{{ supervision_json_log_file }}"
,
ncsession_json_log_file
=
"{{ ncsession_json_log_file }}"
,
software_json_log_file
=
"{{ software_json_log_file }}"
)
while
True
:
try
:
nc
.
connect
(
"{{ netaddr.IPAddress(slap_configuration.get('tap-ipv6-gateway', '')) }}"
,
830
,
"oranuser"
,
"oranpassword"
)
nc
.
subscribe
()
while
True
:
nc
.
get_notification
()
except
Exception
as
e
:
nc
.
logger
.
debug
(
'Got exception, waiting 10 seconds before reconnecting...'
)
nc
.
logger
.
debug
(
e
)
time
.
sleep
(
10
)
finally
:
nc
.
close
()
software/ors-amarisoft/lopcomm-rrh-supervision.jinja2.py
0 → 100644
View file @
48db30d9
#!{{ python_path }}
import
time
import
xmltodict
import
sys
import
re
import
os
sys
.
path
.
append
({{
repr
(
buildout_directory_path
)
}})
from
ncclient_common
import
LopcommNetconfClient
if
__name__
==
'__main__'
:
nc
=
LopcommNetconfClient
(
log_file
=
"{{ log_file }}"
,
supervision_reply_json_log_file
=
"{{ supervision_reply_json_log_file }}"
)
try
:
netconf_check_file
=
os
.
path
.
join
(
'{{etc_path}}'
,
'is_netconf_connected'
)
nc
.
connect
(
"{{ netaddr.IPAddress(slap_configuration.get('tap-ipv6-gateway', '')) }}"
,
830
,
"oranuser"
,
"oranpassword"
)
supervision_subscription_rpc_xml
=
"""
<create-subscription xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">
<stream>o-ran-supervision</stream>
</create-subscription>
"""
nc
.
logger
.
info
(
"Subscription creating..."
)
supervision_subscription_reply_xml
=
nc
.
custom_rpc_request
(
supervision_subscription_rpc_xml
)
if
supervision_subscription_reply_xml
:
nc
.
logger
.
info
(
"Subscription created"
)
supervision_subscription_data
=
xmltodict
.
parse
(
supervision_subscription_reply_xml
)
nc
.
supervision_reply_json_logger
.
info
(
''
,
extra
=
{
'data'
:
supervision_subscription_data
})
while
True
:
supervision_watchdog_rpc_xml
=
"""
<supervision-watchdog-reset xmlns="urn:o-ran:supervision:1.0">
<supervision-notification-interval>60</supervision-notification-interval>
<guard-timer-overhead>10</guard-timer-overhead>
</supervision-watchdog-reset>
"""
nc
.
logger
.
info
(
"NETCONF server replying..."
)
supervision_watchdog_reply_xml
=
nc
.
custom_rpc_request
(
supervision_watchdog_rpc_xml
)
if
supervision_watchdog_reply_xml
:
if
not
os
.
path
.
exists
(
netconf_check_file
):
open
(
netconf_check_file
,
"w"
).
write
(
'True'
)
nc
.
logger
.
info
(
"NETCONF server replied"
)
supervision_watchdog_data
=
xmltodict
.
parse
(
supervision_watchdog_reply_xml
)
nc
.
supervision_reply_json_logger
.
info
(
''
,
extra
=
{
'data'
:
supervision_watchdog_data
})
# It must be the same interval as <supervision-notification-interval>
time
.
sleep
(
60
)
else
:
if
os
.
path
.
exists
(
netconf_check_file
):
os
.
remove
(
netconf_check_file
)
else
:
nc
.
logger
.
debug
(
"Subscription failed."
)
except
Exception
as
e
:
nc
.
logger
.
debug
(
'Got exception, waiting 10 seconds before reconnecting...'
)
nc
.
logger
.
debug
(
str
(
e
))
time
.
sleep
(
10
)
finally
:
nc
.
close
()
\ No newline at end of file
software/ors-amarisoft/ncclient_common.py
0 → 100644
View file @
48db30d9
import
time
import
logging
import
xmltodict
from
logging.handlers
import
RotatingFileHandler
from
ncclient
import
manager
from
ncclient.operations
import
RPCError
from
ncclient.xml_
import
*
from
ncclient.devices.default
import
DefaultDeviceHandler
class
LopcommNetconfClient
:
def
__init__
(
self
,
log_file
,
json_log_file
=
None
,
cfg_json_log_file
=
None
,
supervision_json_log_file
=
None
,
ncsession_json_log_file
=
None
,
software_json_log_file
=
None
,
software_reply_json_log_file
=
None
,
supervision_reply_json_log_file
=
None
,
testing
=
False
):
self
.
logger
=
logging
.
getLogger
(
'logger'
)
self
.
logger
.
setLevel
(
logging
.
DEBUG
)
handler
=
RotatingFileHandler
(
log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
self
.
logger
.
addHandler
(
handler
)
formatter
=
logging
.
Formatter
(
"%(asctime)s [%(levelname)s] %(message)s"
)
handler
.
setFormatter
(
formatter
)
if
json_log_file
:
self
.
json_logger
=
logging
.
getLogger
(
'json_logger'
)
self
.
json_logger
.
setLevel
(
logging
.
DEBUG
)
json_handler
=
RotatingFileHandler
(
json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
json_handler
.
setFormatter
(
json_formatter
)
self
.
json_logger
.
addHandler
(
json_handler
)
self
.
cfg_json_logger
=
logging
.
getLogger
(
'cfg_json_logger'
)
self
.
cfg_json_logger
.
setLevel
(
logging
.
DEBUG
)
cfg_json_handler
=
RotatingFileHandler
(
cfg_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
cfg_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
cfg_json_handler
.
setFormatter
(
cfg_json_formatter
)
self
.
cfg_json_logger
.
addHandler
(
cfg_json_handler
)
self
.
supervision_json_logger
=
logging
.
getLogger
(
'supervision_json_logger'
)
self
.
supervision_json_logger
.
setLevel
(
logging
.
DEBUG
)
supervision_json_handler
=
RotatingFileHandler
(
supervision_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
supervision_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
supervision_json_handler
.
setFormatter
(
supervision_json_formatter
)
self
.
supervision_json_logger
.
addHandler
(
supervision_json_handler
)
self
.
ncsession_json_logger
=
logging
.
getLogger
(
'ncsession_json_logger'
)
self
.
ncsession_json_logger
.
setLevel
(
logging
.
DEBUG
)
ncsession_json_handler
=
RotatingFileHandler
(
ncsession_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
ncsession_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
ncsession_json_handler
.
setFormatter
(
ncsession_json_formatter
)
self
.
ncsession_json_logger
.
addHandler
(
ncsession_json_handler
)
self
.
software_json_logger
=
logging
.
getLogger
(
'software_json_logger'
)
self
.
software_json_logger
.
setLevel
(
logging
.
DEBUG
)
software_json_handler
=
RotatingFileHandler
(
software_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
software_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
software_json_handler
.
setFormatter
(
software_json_formatter
)
self
.
software_json_logger
.
addHandler
(
software_json_handler
)
else
:
self
.
json_logger
=
None
self
.
cfg_json_logger
=
None
self
.
supervision_json_logger
=
None
self
.
ncsession_json_logger
=
None
self
.
software_json_logger
=
None
if
supervision_reply_json_log_file
:
self
.
supervision_reply_json_logger
=
logging
.
getLogger
(
'supervision_reply_json_logger'
)
self
.
supervision_reply_json_logger
.
setLevel
(
logging
.
DEBUG
)
supervision_reply_json_handler
=
RotatingFileHandler
(
supervision_reply_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
supervision_reply_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
supervision_reply_json_handler
.
setFormatter
(
supervision_reply_json_formatter
)
self
.
supervision_reply_json_logger
.
addHandler
(
supervision_reply_json_handler
)
else
:
self
.
supervision_reply_json_logger
=
None
if
software_reply_json_log_file
:
self
.
software_reply_json_logger
=
logging
.
getLogger
(
'software_reply_json_logger'
)
self
.
software_reply_json_logger
.
setLevel
(
logging
.
DEBUG
)
software_reply_json_handler
=
RotatingFileHandler
(
software_reply_json_log_file
,
maxBytes
=
100000
,
backupCount
=
5
)
software_reply_json_formatter
=
logging
.
Formatter
(
'{"time": "%(asctime)s", "log_level": "%(levelname)s", "message": "%(message)s", "data": %(data)s}'
)
software_reply_json_handler
.
setFormatter
(
software_reply_json_formatter
)
self
.
software_reply_json_logger
.
addHandler
(
software_reply_json_handler
)
else
:
self
.
software_reply_json_logger
=
None
if
testing
:
return
def
connect
(
self
,
host
,
port
,
user
,
password
):
self
.
address
=
(
host
,
port
)
self
.
logger
.
info
(
'Connecting to %s, user %s...'
%
(
self
.
address
,
user
))
self
.
conn
=
manager
.
connect
(
host
=
host
,
port
=
port
,
username
=
user
,
password
=
password
,
timeout
=
1800
,
device_params
=
{
'name'
:
'default'
},
hostkey_verify
=
False
)
self
.
logger
.
info
(
'Connection to %s successful'
%
(
self
.
address
,))
def
subscribe
(
self
):
sub
=
self
.
conn
.
create_subscription
()
self
.
logger
.
info
(
'Subscription to %s successful'
%
(
self
.
address
,))
def
get_notification
(
self
):
result
=
None
while
result
==
None
:
self
.
logger
.
debug
(
'Waiting for notification from %s...'
%
(
self
.
address
,))
result
=
self
.
conn
.
take_notification
(
block
=
True
)
if
result
:
self
.
logger
.
debug
(
'Got new notification from %s...'
%
(
self
.
address
,))
result_in_xml
=
result
.
_raw
data_dict
=
xmltodict
.
parse
(
result_in_xml
)
if
'alarm-notif'
in
data_dict
[
'notification'
]:
self
.
json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
elif
'supervision-notification'
in
data_dict
[
'notification'
]:
self
.
supervision_json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
elif
'netconf-session-start'
in
data_dict
[
'notification'
]
or
'netconf-session-end'
in
data_dict
[
'notification'
]:
self
.
ncsession_json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
elif
any
(
event
in
data_dict
[
'notification'
]
for
event
in
[
'install-event'
,
'activation-event'
,
'download-event'
]):
self
.
software_json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
else
:
self
.
cfg_json_logger
.
info
(
''
,
extra
=
{
'data'
:
data_dict
})
def
edit_config
(
self
,
config_files
):
for
config_file
in
config_files
:
with
open
(
config_file
)
as
f
:
config_xml
=
f
.
read
()
try
:
self
.
logger
.
info
(
'Sending edit-config RPC request...'
)
self
.
conn
.
edit_config
(
target
=
'running'
,
config
=
config_xml
)
self
.
logger
.
info
(
'Edit-config RPC request sent successfully'
)
except
RPCError
as
e
:
self
.
logger
.
error
(
'Error sending edit-config RPC request: %s'
%
e
)
def
custom_rpc_request
(
self
,
rpc_xml
):
try
:
self
.
logger
.
info
(
'Sending custom RPC request...'
)
response
=
self
.
conn
.
dispatch
(
to_ele
(
rpc_xml
))
if
response
.
ok
:
self
.
logger
.
info
(
'Custom RPC request sent successfully'
)
return
response
.
xml
else
:
self
.
logger
.
error
(
'Error sending custom RPC request: %s'
%
response
.
error
)
except
RPCError
as
e
:
self
.
logger
.
error
(
'Error sending custom RPC request: %s'
%
e
)
def
reset_device
(
self
):
self
.
logger
.
info
(
'Resetting...'
)
reset_rpc_xml
=
"""
<reset xmlns="urn:o-ran:operations:1.0">
</reset>
"""
reset_reply_xml
=
self
.
custom_rpc_request
(
reset_rpc_xml
)
if
reset_reply_xml
:
reset_data
=
xmltodict
.
parse
(
reset_reply_xml
)
self
.
software_reply_json_logger
.
info
(
''
,
extra
=
{
'data'
:
reset_data
})
self
.
logger
.
info
(
'Wait 60 second then reboot!'
)
time
.
sleep
(
60
)
def
get_inventory
(
self
):
self
.
logger
.
info
(
'Fetching software inventory...'
)
inventory_rpc_xml
=
"""
<get xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<filter type="subtree">
<software-inventory xmlns="urn:o-ran:software-management:1.0" />
</filter>
</get>
"""
inventory_reply_xml
=
self
.
custom_rpc_request
(
inventory_rpc_xml
)
if
inventory_reply_xml
:
self
.
logger
.
info
(
'Finish fetching software inventory.'
)
inventory_data
=
xmltodict
.
parse
(
inventory_reply_xml
)
self
.
software_reply_json_logger
.
info
(
''
,
extra
=
{
'data'
:
inventory_data
})
nonrunning_slot_name
=
None
running_slot_name
=
None
active_nonrunning_slot_name
=
None
nonrunning_slot_name_build_version
=
None
running_slot_name_build_version
=
None
software_slots
=
inventory_data
[
'nc:rpc-reply'
][
'data'
][
'software-inventory'
][
'software-slot'
]
for
slot
in
software_slots
:
if
slot
[
'running'
]
==
'false'
:
nonrunning_slot_name
=
slot
[
'name'
]
nonrunning_slot_name_build_version
=
slot
[
'build-version'
]
if
slot
[
'running'
]
==
'true'
:
running_slot_name
=
slot
[
'name'
]
running_slot_name_build_version
=
slot
[
'build-version'
]
elif
slot
[
'active'
]
==
'true'
and
slot
[
'running'
]
==
'false'
:
active_nonrunning_slot_name
=
slot
[
'name'
]
return
{
"nonrunning_slot_name"
:
nonrunning_slot_name
,
"running_slot_name"
:
running_slot_name
,
"active_nonrunning_slot_name"
:
active_nonrunning_slot_name
,
"nonrunning_slot_name_build_version"
:
nonrunning_slot_name_build_version
,
"running_slot_name_build_version"
:
running_slot_name_build_version
}
def
close
(
self
):
# Close not compatible between ncclient and netconf server
#self.conn.close()
pass
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment