Commit b44f1bab authored by Joanne Hugé's avatar Joanne Hugé

Add websocket client

parent 3e1f0a39
import time
import json
from websocket import create_connection
url = "ws://[2a11:9ac0:d::1]:9002"
class UeError(Exception):
pass
def send(ws, msg):
ws.send(json.dumps(msg))
def recv(ws):
return json.loads(ws.recv())
def ue_get(ws):
send(ws, {
"message": "ue_get",
})
result = recv(ws)
if result['message'] != 'ue_get':
raise UeError("Unexpected response")
if not result['ue_list']:
raise UeError("No UE")
return result['ue_list'][0]
def power_on(ws, ue_id):
if ue_get(ws)['power_on']:
raise UeError("UE already powered on")
send(ws, {
"message": "power_on",
"ue_id": ue_id,
})
recv(ws)
def power_off(ws, ue_id):
if not ue_get(ws)['power_on']:
raise UeError("UE already powered off")
send(ws, {
"message": "power_off",
"ue_id": ue_id,
})
recv(ws)
try:
try:
ws = create_connection(url)
except ConnectionRefusedError:
raise UeError("Couldn't connect to remote API")
try:
result = recv(ws)
if result['message'] != 'ready':
raise UeError("lteue not ready")
result = ue_get(ws)
ue_id = result['ue_id']
try:
power_on(ws, ue_id)
time.sleep(2)
result = ue_get(ws)
if 'pdn_list' not in result:
raise UeError("UE didn't connect")
print(result['pdn_list'][0]['ipv4'])
finally:
power_off(ws, ue_id)
finally:
ws.close()
except UeError as e:
print(e)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment