Commit 42c7aa1e authored by Joanne Hugé's avatar Joanne Hugé

Initial commit

parents
ltemock/__pycache__
# Amarisoft LTE mock
Mock different Amarisoft LTE softwares for testing purposes, to avoid relying on proprietary software and SDR hardware to launch the simpleran SR
ltemock/lteenb.py
\ No newline at end of file
ltemock/lteims.py
\ No newline at end of file
ltemock/ltemme.py
\ No newline at end of file
#!/usr/bin/env python3
import asyncio
import datetime
import json
import logging
import io, os, sys
import pcpp
import time
import yaml
import websockets
START = time.time()
def time_from_start():
return int((time.time() - START) * 1000) / 1000
def utc():
return int(time.time() * 1000) / 1000
def yamlpp_load(path):
with open(path, 'r') as f:
data = f.read() # original input
p = pcpp.Preprocessor()
p.parse(data)
f = io.StringIO()
p.write(f)
data_ = f.getvalue() # preprocessed input
return yaml.load(data_, Loader=yaml.Loader)
def ready(name):
return json.dumps({
"message" : "ready",
"challenge" : "773bc1c2cf2df900a2d4f60c58b5ddb7",
"type" : name,
"name" : name,
"version" : "2024-11-21",
"time" : time_from_start(),
"utc" : utc(),
})
def main(server, name):
conf = yamlpp_load(sys.argv[1])
with open(conf['log_filename'], 'w+') as log:
log.write("# lte{} mock version 2024-11-21 (x86-64)\n".format(name))
log.write("# Started on {}\n".format(
datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
if 'com_addr' in conf:
ip, port = conf['com_addr'].split(':')
asyncio.run(server(ip, port))
while True:
time.sleep(1)
if __name__ == "__main__":
main()
#!/usr/bin/env python3
import asyncio
import json
import websockets
from ltemock import common
async def echo(websocket):
try:
await websocket.send(json.dumps({
"message" : "authenticate",
"challenge" : "773bc1c2cf2df900a2d4f60c58b5ddb7",
"type" : "ENB",
"name" : "ENB",
"version" : "2024-11-21",
"time" : common.time_from_start(),
"utc" : common.utc(),
}))
async for message in websocket:
data = json.loads(message)
if data["message"] == "authenticate":
await websocket.send(json.dumps({
"message" : "authenticate",
"ready" : True,
"time" : common.time_from_start(),
"utc" : common.utc(),
}))
elif data["message"] == "stats":
await websocket.send(json.dumps({
"message": "stats", "instance_id": "6746fc88", "cpu": {"global": 32.2}, "gtp_rx_bitrate": 0, "gtp_tx_bitrate": 0, "rf": {"rx_sample_rate": 23.039985, "tx_sample_rate": 23.039985, "rx_cpu_time": 0.5, "tx_cpu_time": 3, "rx_count": 1376294400, "tx_count": 1376294400, "rxtx_delay_min": 193562551.498, "rxtx_delay_avg": 193592421.401, "rxtx_delay_max": 193622286.486, "rxtx_delay_sd": 17259.742}, "samples": {"tx": [{"rms": -62.07354736328125, "max": -12.665970802307129, "sat": 0, "count": 26557524, "rms_dbm": -68.20354461669922}, {"rms": -62.07354736328125, "max": -12.665970802307129, "sat": 0, "count": 26557524, "rms_dbm": -68.20354461669922}], "rx": [{"rms": -36.66127395629883, "max": -14.014264106750488, "sat": 0, "count": 58927536, "rms_dbm": -55.90127563476562}, {"rms": -38.40644454956055, "max": -15.395360946655273, "sat": 0, "count": 58927536, "rms_dbm": -57.646446228027344}]}, "cells": {"1": {"dl_bitrate": 0, "ul_bitrate": 0, "dl_tx": 0, "ul_tx": 0, "dl_err": 0, "ul_err": 0, "dl_retx": 0, "ul_retx": 0, "dl_use_min": 0, "dl_use_max": 0.549, "dl_use_avg": 0.017, "ul_use_min": 0.078, "ul_use_max": 0.314, "ul_use_avg": 0.137, "dl_sched_users_min": 0, "dl_sched_users_max": 0, "dl_sched_users_avg": 0, "ul_sched_users_min": 0, "ul_sched_users_max": 0, "ul_sched_users_avg": 0, "ue_count_min": 0, "ue_count_max": 0, "ue_count_avg": 0, "ue_active_count_min": 0, "ue_active_count_max": 0, "ue_active_count_avg": 0, "ue_inactive_count_min": 0, "ue_inactive_count_max": 0, "ue_inactive_count_avg": 0, "drb_count_min": 0, "drb_count_max": 0, "drb_count_avg": 0, "dl_gbr_use_min": 0, "dl_gbr_use_max": 0, "dl_gbr_use_avg": 0, "ul_gbr_use_min": 0, "ul_gbr_use_max": 0, "ul_gbr_use_avg": 0, "counters": {"messages": {"prach": 1669}, "errors": {}}}}, "rf_ports": {"0": {"rxtx_delay": {"min": -2147483.648, "max": -2147483.648, "avg": -2147483.648, "sd": 0}}}, "counters": {"messages": {"ng_setup_request": 1, "ng_setup_response": 1}, "errors": {}}, "duration": 60.013, "time": common.time_from_start(), "utc": common.utc()
}))
elif data["message"] == "rf":
await websocket.send(json.dumps({
"message":"rf","tx_gain":[77,77],"rx_gain":[20,20],"rf_info":"TRX SDR driver 2024-11-20, API v15\nPCIe RFIC /dev/sdr0:\n Hardware ID: 0x4b01\n DNA: [0x0070b5443f4a2854]\n Serial: ''\n FPGA revision: 2021-10-08 15:38:12\n FPGA vccint: 1.01 V\n FPGA vccaux: 1.78 V\n FPGA vccbram: 1.01 V\n FPGA temperature: 55.5 °C\n AD9361 temperature: 37 °C\n AGC: Off\n Sync: gps (locked)\n Clock: internal (locked)\n Clock tune: -0.2 ppm\n NUMA: -1\n Caps: \n TX channels: 2; RX channels: 2\n DMA: 1 ch, 32 bits, SMem index: Off, RX Headers: Off\n DMA0: TX fifo: 33.33us Usage=8/12288 (0%)\n DMA0: RX fifo: 33.33us Usage=8/12288 (0%)\n DMA0: TX_Underflows: 0 RX_Overflows: 0\n BUFS: bps=32 TX idx=9293176/52600.113 (33.3us) RX: idx=9293176/52600.111 (33.3us)\n\nGPS info:\n UTC: 2024-11-29 10:21:54\n pos: lat=50.64428° long=3.07739°\n height: 39.1m nb_sats: 12\n","warning":"<input>:1: unused property 'rf_info'","time":common.time_from_start(),"utc":common.utc()
}))
elif data["message"] == "config_get":
await websocket.send(json.dumps({
"message":"config_get","message_id":1,"version":"2024-11-21","type":"ENB","name":"ENB","logs":{"layers":{"PHY":{"level":"info","max_size":0,"key":False,"crypto":False,"payload":False,"signal":False,"cch":False,"rep":False,"dci_size":False,"csi":False,"cell_meas":False,"ntn":False,"icic":False},"MAC":{"level":"error","max_size":0,"key":False,"crypto":False,"payload":False,"sched":False},"RLC":{"level":"error","max_size":0,"key":False,"crypto":False,"payload":False},"PDCP":{"level":"error","max_size":0,"key":False,"crypto":False,"payload":False},"RRC":{"level":"debug","max_size":1,"key":False,"crypto":False,"payload":False,"cell_meas":False},"NAS":{"level":"debug","max_size":1,"key":False,"crypto":False,"payload":False,"plmn":False},"S72":{"level":"error","max_size":0,"key":False,"crypto":False,"payload":False},"S1AP":{"level":"debug","max_size":1,"key":False,"crypto":False,"payload":False},"NGAP":{"level":"debug","max_size":1,"key":False,"crypto":False,"payload":False},"GTPU":{"level":"error","max_size":0,"key":False,"crypto":False,"payload":False},"X2AP":{"level":"debug","max_size":1,"key":False,"crypto":False,"payload":False},"XnAP":{"level":"debug","max_size":1,"key":False,"crypto":False,"payload":False},"M2AP":{"level":"error","max_size":0,"key":False,"crypto":False,"payload":False},"LPPa":{"level":"error","max_size":0,"key":False,"crypto":False,"payload":False},"NRPPa":{"level":"error","max_size":0,"key":False,"crypto":False,"payload":False},"COM":{"level":"error","max_size":0,"key":False,"crypto":False,"payload":False},"TRX":{"level":"error","max_size":0,"key":False,"crypto":False,"payload":False}},"bcch":False,"mib":False,"signal":False,"cch":False,"rep":False,"dci_size":False,"csi":False,"cell_meas":False,"ntn":False,"count":8192,"rotate":1000000000,"path":"/dev/null"},"tai":1732875924.823,"global_enb_id":{"plmn":"00101","enb_id_type":"macro","enb_id":131075,"enb_name":"enb20003"},"cells":{"35":{"n_antenna_dl":2,"n_antenna_ul":2,"n_layer_dl":2,"n_layer_ul":1,"gain":0,"ul_disabled":False,"rf_port":0,"dl_qam":256,"ul_qam":64,"ecgi":{"plmn":"00101","eci":33555235},"cell_barred":False,"n_id_cell":203,"n_rb_dl":100,"n_rb_ul":100,"dl_earfcn":38550,"ul_earfcn":38550,"band":39,"dl_freq":1910000000,"ul_freq":1910000000,"mode":"TDD","uldl_config":6,"sp_config":7,"prach_sequence_index":303,"dl_cyclic_prefix":"normal","ul_cyclic_prefix":"normal","prach_config_index":4,"prach_freq_offset":4,"delta_pucch_shift":2,"n_rb_cqi":1,"n_cs_an":0,"pucch_allocation":[{"type":"2/2a/2b","rbs":1,"n":6}],"pucch_ack_nack_start":11,"pucch_reserved_rbs":[0,0,7,7,7,0,0,7,7,0],"sr_resource_count":110,"cqi_resource_count":120,"srs_resources":{"offsets":20,"freqs":20,"total":800},"gbr":{"dl_limit":4243840,"ul_limit":5267520},"connected_mobility":{"scell_config_a4_a2":False,"scell_config_a6":False,"eutra_handover_intra":True,"eutra_handover_inter":True,"eutra_cell_redirect_intra":False,"eutra_cell_redirect_inter":False,"nr_handover":True,"nr_cell_redirect":False,"en_dc_setup":False},"ncell_list":[{"rat":"nr","ssb_nr_arfcn":629088,"n_id_nrcell":200,"ncgi":{"plmn":"00101","nci":33554464},"handover_target":True,"cell_redirect_target":True},{"rat":"eutra","dl_earfcn":38550,"n_id_cell":202,"ecgi":{"plmn":"00101","eci":33554978},"handover_target":True,"cell_redirect_target":True,"eps_fallback_target":False,"emergency_fallback_target":False},{"rat":"eutra","dl_earfcn":38550,"n_id_cell":201,"ecgi":{"plmn":"00101","eci":33554721},"handover_target":True,"cell_redirect_target":True,"eps_fallback_target":False,"emergency_fallback_target":False}],"manual_ref_signal_power":True,"ref_signal_power":42,"tac":1,"plmn_list":[{"plmn":"00101","reserved":False}]}},"rx_channels":[{"gain":20,"freq":1910,"port":0},{"gain":20,"freq":1910,"port":0}],"tx_channels":[{"gain":77,"freq":1910,"port":0},{"gain":77,"freq":1910,"port":0}],"rf_ports":[{"sample_rate":23040000,"ul_freq_shift":0}],"time":common.time_from_start(),"utc":common.utc()
}))
except (asyncio.exceptions.IncompleteReadError, asyncio.exceptions.CancelledError, websockets.exceptions.ConnectionClosedError) as e:
pass
async def server(ip, port):
async with websockets.serve(echo, ip, port):
await asyncio.get_running_loop().create_future() # run forever
def main():
common.main(server, "lte")
if __name__ == "__main__":
main()
#!/usr/bin/env python3
import asyncio
import websockets
from ltemock import common
async def echo(websocket):
try:
await websocket.send(common.ready("IMS"))
async for message in websocket:
pass
except (asyncio.exceptions.IncompleteReadError, asyncio.exceptions.CancelledError, websockets.exceptions.ConnectionClosedError) as e:
pass
async def server(ip, port):
async with websockets.serve(echo, ip, port):
await asyncio.get_running_loop().create_future() # run forever
def main():
common.main(server, "ims")
if __name__ == "__main__":
main()
#!/usr/bin/env python3
import asyncio
import websockets
from ltemock import common
async def echo(websocket):
try:
await websocket.send(common.ready("MME"))
async for message in websocket:
pass
except (asyncio.exceptions.IncompleteReadError, asyncio.exceptions.CancelledError, websockets.exceptions.ConnectionClosedError) as e:
pass
async def server(ip, port):
async with websockets.serve(echo, ip, port):
await asyncio.get_running_loop().create_future() # run forever
def main():
common.main(server, "mme")
if __name__ == "__main__":
main()
#!/usr/bin/env python3
import asyncio
import websockets
from ltemock import common
async def echo(websocket):
try:
await websocket.send(common.ready("UE"))
async for message in websocket:
pass
except (asyncio.exceptions.IncompleteReadError, asyncio.exceptions.CancelledError, websockets.exceptions.ConnectionClosedError) as e:
pass
async def server(ip, port):
async with websockets.serve(echo, ip, port):
await asyncio.get_running_loop().create_future() # run forever
def main():
common.main(server, "ue")
if __name__ == "__main__":
main()
ltemock/lteue.py
\ No newline at end of file
from setuptools import setup, find_packages
setup(
name='amarisoft-lte-mock',
version='0.1.0',
author = 'Nexedi',
license = 'GPL 2+',
packages=find_packages(),
entry_points = {
'console_scripts': [
'lteenb=ltemock.lteenb:main',
'ltemme=ltemock.ltemme:main',
'lteims=ltemock.lteims:main',
],
},
install_requires = ['pcpp', 'PyYAML', 'websockets'],
)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment