Commit 2a016d48 authored by Kirill Smelkov

Draft support for E-UTRAN IP Throughput KPI

The most interesting patches are

- d102ffaa (drb: Start of the package)
- 5bf7dc1c (amari.{drb,xlog}: Provide aggregated DRB statistics in the form of synthetic x.drb_stats message)
- 499a7c1b (amari.kpi: Teach LogMeasure to handle x.drb_stats messages)
- 2824f50d (kpi: Calc: Add support for E-UTRAN IP Throughput KPI)
- 4b2c8c21 (demo/kpidemo.*: Add support for E-UTRAN IP Throughput KPI + demonstrate it in the notebook)

The other patches introduce or adjust the needed infrastructure. A byproduct
of particular note is that kpi.Measurement now supports QCI.

A demo can be seen in the last part of
https://nbviewer.org/urls/lab.nexedi.com/kirr/xlte/raw/43aac33e/demo/kpidemo.ipynb

Below we provide an overall overview of the implementation.

Overview of E-UTRAN IP Throughput computation
---------------------------------------------

Before we begin explaining how IP Throughput is computed, let's first refresh
what it is and have a look at what is required to compute it reasonably.

This KPI is defined in TS 32.450[1] and aggregates transmission volume and
time over bursts of transmissions from an average UE point of view. It should be
particularly noted that only the time during which transmission is actually
going on should be accounted for. For example, if a UE receives 10KB over a 4ms
burst and there is no transmission to it during the rest of, say, 1 minute, the
downlink IP Throughput for that UE over the minute is 20Mbit/s (= 8·10KB/4ms),
not 1.3Kbit/s (= 8·10KB/60s). This KPI basically shows what the speed would be
to e.g. download a response to an HTTP request issued from a mobile.

[1] https://www.etsi.org/deliver/etsi_ts/132400_132499/132450/16.00.00_60/ts_132450v160000p.pdf#page=13
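
To make the arithmetic concrete, here is the above example redone as plain
Python arithmetic (illustrative only, not code from the patches):

    tx_bytes = 10*1000          # 10KB burst
    t_burst  = 4e-3             # transmitted during 4ms
    t_window = 60.0             # observed over 1 minute

    ipthp = 8*tx_bytes / t_burst     # per-burst accounting as TS 32.450 requires
    naive = 8*tx_bytes / t_window    # naive averaging over the whole window

    print('%.0f Mbit/s' % (ipthp / 1e6))    # -> 20 Mbit/s
    print('%.1f Kbit/s' % (naive / 1e3))    # -> 1.3 Kbit/s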

To compute IP Throughput we thus need to know the Σ of transmitted
bytes, and the Σ of the time of all transmission bursts.

The Σ of bytes is relatively easy to get. eNB already provides close values in
the overall `stats` and in per-UE `ue_get[stats]` messages. However there is
nothing readily available out-of-the-box for the Σ of burst transmission time.
Thus we need to measure the time of transmission bursts ourselves somehow.

It turns out that, with the current state of things, the only practical way to
measure it to some degree is to poll eNB frequently with `ue_get[stats]` and
estimate transmission time based on the δ of `ue_get` timestamps.

Let's see how frequently we need to poll to get reasonable accuracy for the resulting throughput.

A common situation for HTTP requests issued via LTE is that downloading the
response content takes only a few milliseconds. For example, I used the
Chromium network profiler to access various sites via internet tethered from my
phone and saw that for many requests the response content download time was
e.g. 4ms, 5ms, 3.2ms, etc. The accuracy of measuring transmission time should
thus be on the order of a millisecond to cover that properly. It makes a real
difference for the reported throughput whether, say, a 10KB download sample
took 4ms, or it took e.g. "something under 100ms". In the first case we know
that for that sample the downlink throughput is 2500KB/s, while in the second
case all we know is that the downlink throughput is "higher than 100KB/s" - a
25 times difference, and not certain. Similarly, if we poll at a 10ms rate, we
would only get that the throughput is "higher than 1000KB/s" - a 2.5 times
difference from the actual value. An accuracy of 1 millisecond coincides with
the TTI duration and with how downlink/uplink transmissions generally work in
LTE.
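
A quick back-of-the-envelope check of the above numbers (again plain Python,
illustrative only):

    tx_bytes = 10*1000      # a 10KB download sample
    tx_time  = 4e-3         # that actually took 4ms

    print('%.0f KB/s' % (tx_bytes/tx_time/1e3))     # actual: 2500 KB/s

    # if all we know is "it took something under the polling period":
    for period in (100e-3, 10e-3):
        print('poll %3.0fms: "higher than %4.0f KB/s"' %
              (period*1e3, tx_bytes/period/1e3))
    # poll 100ms: "higher than  100 KB/s"   - 25x away from the actual value
    # poll  10ms: "higher than 1000 KB/s"   - 2.5x away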

With the above, the scheme to compute IP Throughput looks to be as
follows: poll eNB at a 1000Hz rate for `ue_get[stats]`, process the retrieved
information into per-UE and per-QCI streams, detect bursts on each UE/QCI pair,
and aggregate `tx_bytes` and `tx_time` from every burst.

This looks straightforward, but 1000Hz polling would likely create
non-negligible additional load on the system and disturb eNB itself,
introducing much jitter and harming its latency requirements. That's probably
why eNB actually rate-limits WebSocket requests to not go higher than 100Hz -
a frequency 10 times lower than what we need to reach reasonable accuracy for
IP Throughput.

Fortunately there is additional information that provides a way to improve the
accuracy of the measured `tx_time` even when polling every 10ms at a 100Hz rate:
that additional information is the number of transport blocks transmitted
to/from a UE. If we know that during a 10ms frame e.g. 4 transport blocks were
transmitted to the UE, that there were no retransmissions *and* that eNB is not
congested, we can reasonably estimate that it was actually a 4ms transmission.
And if eNB is congested, we can still say that the transmission time is
somewhere in the `[4ms, 10ms]` interval, because transmitting each transport
block takes 1 TTI. Even if imprecise, that still provides some information that
could be useful.
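
A minimal sketch of this heuristic in Python (hypothetical helper for
illustration; the real code in amari/drb.py below additionally handles
retransmissions, rank and sharing of transport blocks between QCIs):

    tti = 1e-3  # 1 TTI = 1ms

    # estimate_tx_time returns (lo, hi) bounds for the time the transmission
    # took inside one observation window δt, given that ntx transport blocks
    # were seen in that window.
    def estimate_tx_time(ntx, δt, congested):  # -> (lo, hi) in seconds
        lo = hi = ntx * tti     # each transport block takes 1 TTI
        if congested:
            hi = δt             # under congestion the transmission could have
                                # been stretched over the whole window
        return (lo, hi)

    print(estimate_tx_time(4, 10e-3, congested=False))   # (0.004, 0.004)
    print(estimate_tx_time(4, 10e-3, congested=True))    # (0.004, 0.01)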

Also, 100Hz polling turns out to be acceptable from a performance point of view
and does not disturb the system much. For example, on the callbox machine the
process that issues the polls takes only about 3% of CPU load on a single core,
while the CPU usage of eNB practically does not change and its reported tx/rx
latency does not change either. For sure, there is some disturbance, but it
appears to be small. To get a better idea of what rate of polling is possible,
I made an experiment with the poller accessing my own WebSocket echo server
quickly implemented in Python. Neither the poller nor the echo server is
optimized, but without rate-limiting they could reach an 8000Hz frequency at
100% CPU usage of one core. That 8000Hz is 80 times more than the 100Hz
frequency actually allowed by eNB. This shows what kind of polling frequency
the system can handle if absolutely needed, and that 100Hz turns out to be not
so high a frequency. Also, the Linux 5.6 kernel installed on the callbox from
Fedora 32 is configured with `CONFIG_HZ=1000`, which is likely helping here.

Implementation overview
~~~~~~~~~~~~~~~~~~~~~~~

The scheme to compute E-UTRAN IP Throughput is thus as follows: poll eNB at
100Hz frequency for `ue_get[stats]` and retrieve information about per-UE/QCI
streams and the number of transport blocks dl/ul-ed to/from the UE in question
during that 10ms frame. Estimate `tx_time` taking into account the number of
transmitted transport blocks, and estimate whether eNB is congested or not
based on `dl_use_avg`/`ul_use_avg` taken from `stats`. For the latter we also
need to poll for `stats` at 100Hz frequency and synchronize the `ue_get[stats]`
and `stats` requests in time so that they both cover the same time interval of
a particular frame.
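
The lower and upper bounds are then folded into a midpoint estimate with a
symmetric error, which is how `tx_time` is represented throughout the code
(cf. `_QCI_Flow._update` below):

    tti = 1e-3

    # fold converts (tx_lo, tx_hi) bounds, in TTIs, into (tx_time, tx_time_err)
    def fold(tx_lo, tx_hi):
        tx_time     = (tx_lo + tx_hi) / 2 * tti
        tx_time_err = (tx_hi - tx_lo) / 2 * tti
        return tx_time, tx_time_err

    print(fold(4, 4))    # (0.004, 0.0)     not congested: 4ms exactly
    print(fold(4, 10))   # (0.007, 0.003)   congested: 7ms ± 3ms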

Then organize the polling process to provide aggregated statistics in the form
of a new `x.drb_stats` message, and teach `xamari xlog` to save those messages
to `enb.xlog` together with `stats`. Then further adjust `amari.kpi.LogMeasure`
and the generic `kpi.Measurement` and `kpi.Calc` to handle DRB-related data.

That is how it is implemented.

The main part, which performs the 100Hz polling and flow aggregation, is in
amari/drb.py. There `Sampler` extracts bursts of data transmissions from the
stream of `ue_get[stats]` observations, and `x_stats_srv` organizes the whole
100Hz sampling process and provides aggregated `x.drb_stats` messages to
`amari.xlog`.
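
In terms of the API, `Sampler` is used roughly as follows (a simplified
sketch; `ue_stats0`, `stats0` and `observations` stand for pre-retrieved
server replies, and `_x_stats_srv` below implements the real polling loop):

    from xlte.amari import drb

    samples = {}                            # qci -> []Sample
    S = drb.Sampler(ue_stats0, stats0)      # initial ue_get[stats]/stats replies
    for ue_stats, stats in observations:    # later synchronized replies at ~100Hz
        dl, ul = S.add(ue_stats, stats)     # {} qci -> []Sample finalized so far
        for qci, v in dl.items():
            samples.setdefault(qci, []).extend(v)
    dl, ul = S.finish()                     # wrap up in-progress flows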

Even though the main idea is relatively straightforward, several aspects
deserve to be noted:

1. information about transmitted bytes and the corresponding transmitted transport
   blocks is emitted by eNB without being synchronized in time. The reason here is that,
   for example, for DL a block is transmitted via PDCCH+PDSCH during one TTI, and
   then the base station awaits HARQ ACK/NACK. That ACK/NACK comes later via
   PUCCH or PUSCH. The time window in between original transmission and
   reception of the ACK/NACK is 4 TTIs for FDD and 4-13 TTIs for TDD(*).
   And Amarisoft LTEENB updates counters for dl_total_bytes and dl_tx at
   different times:

       ue.erab.dl_total_bytes      - right after sending data on  PDCCH+PDSCH
       ue.cell.{dl_tx,dl_retx}     - after receiving ACK/NACK via PUCCH|PUSCH

   This way an update to dl_total_bytes might be seen in one frame (= 10·TTI),
   while the corresponding update to dl_tx/dl_retx might be seen in either the
   same, the next, or the next-next frame.

   `Sampler` brings δ(tx_bytes) and #tx_tb in sync itself via `BitSync`
   (a worked example of this synchronization follows after the list).

2. when we see multiple transmissions related to a UE on different QCIs, we
   cannot directly use the corresponding global number of transport blocks to
   estimate transmission times, because we do not know how the eNB scheduler
   placed those transmissions onto the resource map. So without additional
   information we can only estimate corresponding lower and upper bounds.

3. for output stability, and to avoid the throughput being affected by the
   partial fill of the tail TTI of a burst, E-UTRAN IP Throughput is required
   to be computed without taking into account the last TTI of every sample. We
   don't have that level of detail, since all we have is the total amount of
   bytes transmitted in a burst and an estimate of how long the burst is. Thus,
   once again, we can only provide an estimate such that the resulting E-UTRAN
   IP Throughput uncertainty window covers the right value required by the 3GPP
   standard.
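
To illustrate point 1 with the real-life case from `amari/drb_test.py`:
frame₁ shows 6168 bytes transmitted with #tx=0 (all ACKs arrive later), and
frame₂ shows 14392 bytes with #tx=8. BitSync redistributes the 8 transport
blocks proportionally to the transmitted bytes:

    # Σt' = Σt and t'_i = α·b_i  (cf. _BitSync._rebalance below)
    b = [6168, 14392]           # tx_bytes per frame
    t = [0, 8]                  # #tx per frame as reported by eNB

    α = sum(t) / sum(b)
    t_adj = [α * b_i for b_i in b]

    print(['%.1f' % _ for _ in t_adj])      # ['2.4', '5.6']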

A curious reader might be interested in looking at the tests in `amari/drb_test.py`,
and at the whole set of changes that brought E-UTRAN IP Throughput alive.

Limitations
~~~~~~~~~~~

Current implementation has the following limitations:

- we account for whole PDCP traffic instead of only IP traffic.
- the KPI is computed with an uncertainty window instead of being precise, even
  when the connection to eNB is alive all the time. The shorter the bursts,
  the larger the uncertainty.
- the implementation works correctly for FDD, but not for TDD. That's because
  BitSync currently supports only the "next frame" case; support for the
  "next-next frame" case is marked as TODO.
- the eNB `t` monitor command practically stops working and now only reports
  ``Warning, remote API ue_get (stats = true) pending...`` instead of reporting
  useful information. This is because, contrary to `stats`, for `ue_get` eNB
  does not maintain per-connection state and uses global singleton counters.
- the performance overhead might be more noticeable on machines less powerful
  than the callbox.

To address these limitations I plan to talk to Amarisoft about eNB improvements
so that E-UTRAN IP Throughput can be computed precisely from DRB statistics
directly provided by eNB itself.

However it is still useful to have the current implementation, even with all
its limitations, because it already works today with existing eNB versions.

Kirill
parents e1a5ceea 43aac33e
......@@ -5,6 +5,7 @@
XLTE repository provides assorted tools and packages with functionality related to LTE:
- `kpi` - process measurements and compute KPIs from them.
- `amari.drb` - infrastructure to process flows on data radio bearers.
- `amari.kpi` - driver for Amarisoft LTE stack to retrieve KPI-related measurements from logs.
- `amari.xlog` - extra logging facilities for Amarisoft LTE stack.
- `xamari` - supplementary tool for managing Amarisoft LTE services.
# -*- coding: utf-8 -*-
# Copyright (C) 2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
# Copyright (C) 2022-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
......@@ -45,21 +45,23 @@ class ConnClosedError(ConnError):
# connect connects to a service via WebSocket.
def connect(wsuri): # -> Conn
def connect(ctx, wsuri): # -> Conn
#websocket.enableTrace(True) # TODO on $XLTE_AMARI_WS_DEBUG=y ?
ws = websocket.WebSocket()
ws.settimeout(5) # reasonable default
try:
# FIXME handle ctx cancel (but it won't stuck forever due to ._ws own timeout)
ws.connect(wsuri)
except Exception as ex:
raise ConnError("connect") from ex
return Conn(ws)
return Conn(ws, wsuri)
# Conn represents WebSocket connection to a service.
#
# It provides functionality to issue requests, and (TODO) to receive notifications.
# Conn should be created via connect.
class Conn:
# .wsuri websocket uri of the service
# ._ws websocket connection to service
# ._srv_ready_msg message we got for "ready"
......@@ -71,7 +73,7 @@ class Conn:
# ._rx_wg sync.WorkGroup for spawned _serve_recv
# ._down_once sync.Once
def __init__(conn, ws):
def __init__(conn, ws, wsuri):
try:
msg0_raw = ws.recv()
msg0 = json.loads(msg0_raw)
......@@ -82,6 +84,7 @@ class Conn:
ws.close()
raise ConnError("handshake") from ex
conn.wsuri = wsuri
conn._ws = ws
conn._srv_ready_msg = msg0
......@@ -167,13 +170,13 @@ class Conn:
# req sends request and waits for response.
def req(conn, msg, args_dict): # -> response
rx, _ = conn.req_(msg, args_dict)
def req(conn, ctx, msg, args_dict): # -> response
rx, _ = conn.req_(ctx, msg, args_dict)
return rx
@func
def req_(conn, msg, args_dict): # -> response, raw_response
rxq = conn._send_msg(msg, args_dict)
def req_(conn, ctx, msg, args_dict): # -> response, raw_response
rxq = conn._send_msg(ctx, msg, args_dict)
# handle rx timeout ourselves. We cannot rely on global rx timeout
# since e.g. other replies might be coming in again and again.
......@@ -185,10 +188,13 @@ class Conn:
rxt = _.c
_, _rx = select(
rxt.recv, # 0
rxq.recv_, # 1
ctx.done().recv, # 0
rxt.recv, # 1
rxq.recv_, # 2
)
if _ == 0:
raise ctx.err()
if _ == 1:
raise websocket.WebSocketTimeoutException("timed out waiting for response")
_, ok = _rx
......@@ -201,7 +207,7 @@ class Conn:
# _send_msg sends message to the service.
def _send_msg(conn, msg, args_dict): # -> rxq
def _send_msg(conn, ctx, msg, args_dict): # -> rxq
assert isinstance(args_dict, dict)
assert 'message' not in args_dict
assert 'message_id' not in args_dict
......@@ -217,6 +223,7 @@ class Conn:
d.update(args_dict)
jmsg = json.dumps(d)
try:
# FIXME handle ctx cancel (but it won't stuck forever due to ._ws own timeout)
conn._ws.send(jmsg)
except Exception as ex:
raise ConnError("send") from ex
......
# -*- coding: utf-8 -*-
# Copyright (C) 2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Package drb provides infrastructure to process flows on data radio bearers.
- Sampler converts information about data flows obtained via ue_get[stats] into
Samples that represent bursts of continuous transmissions.
- _x_stats_srv uses Sampler to process data flows at 100Hz rate and aggregates
results into information needed to compute E-UTRAN IP Throughput KPI. The
information is emitted in the form of synthetic x.drb_stats message whose
generation is integrated into amari.xlog package.
Please see amari.kpi and xlte.kpi packages that turn x.drb_stats data into
final E-UTRAN IP Throughput KPI value.
See also the following related 3GPP standards references:
- TS 32.450 6.3.1 "E-UTRAN IP Throughput"
- TS 32.425 4.4.6 "IP Throughput measurements"
"""
from xlte import amari
from golang import chan, select, default, nilchan, func, defer
from golang import sync, time
import math
import sys
tti = 1*time.millisecond # = 1·subframe
# Ts = 1/(2048·15000)·s ≈ 32.6 ns
# Tsymb = 2048·Ts ≈ 66.7 μs
# Slot = 7.5·Tsymb = 15360·Ts = 0.5 ms
# Subframe = 2·Slot = 1 ms
# Frame = 10·Subframe = 10 ms
# Sampler collects information about DRB usage and converts that to per-QCI UE Samples.
#
# - use .add to append ue_stats/stats as input information and get finalized Samples.
# - use .finish to wrap-up and retrieve rest of the Samples and reset the sampler.
class Sampler:
__slots__ = (
'_dl_sampler', # _Sampler('dl')
'_ul_sampler', # _Sampler('ul')
)
# Sample represents one burst of continuous transmission to/from particular UE on particular QCI.
#
# A transmission is continuous if during its time the corresponding transmission
# buffer is not empty. For example a transmission where something is sent during
# 5 consecutive TTIs is continuous. A transmission where something is sent not
# every TTI, but the buffer stays non-empty during the pauses (e.g. because of
# congestion), is also said to be continuous:
#
# | |x|x|x|x|x| |
#
# | |x|x| |x| | |x|x| |
# ↑ ↑ ↑
# buffer is not empty - the transmission sample continues
class Sample:
__slots__ = (
'tx_bytes', # amount of bytes transmitted
'tx_time', # time interval during which sample transmission was made
'tx_time_err', # accuracy of tx_time measurement
)
# _Sampler serves Sampler for one of 'dl' or 'ul' direction.
class _Sampler:
__slots__ = (
'dir', # 'dl' or 'ul'
'use_bitsync', # whether to use _BitSync
'use_ri', # whether to pay attention to rank indicator
't', # timestamp of last ue_stats
'ues', # {} ue -> _UE current state of all tracked UEs
)
# _UE represents tracking of data transmission of particular UE.
class _UE:
__slots__ = (
'erab_flows', # {} erab_id -> _ERAB_Flow current state of all erabs related to UE
'qci_flows', # {} qci -> _QCI_Flow in-progress collection of UE-related samples
'bitsync', # None | _BitSync to synchronize δtx_bytes with #tx on updates
)
# _ERAB_Flow tracks data transmission on particular ERAB of particular UE.
class _ERAB_Flow:
__slots__ = (
'qci', # qci as last reported by ue_get
'tx_total_bytes', # total amount transmitted as last reported by ue_get
)
# _QCI_Flow represents in-progress collection to make up a Sample.
#
# .update(δt, tx_bytes, #tx, ...) updates flow with information about next
# transmission period and potentially yields some finalized Samples.
# .finish() completes Sample collection.
class _QCI_Flow:
__slots__ = (
'tx_bytes', # already accumulated bytes
'tx_time', # already accumulated time
'tx_time_err', # accuracy of ^^^
)
# _BitSync helps _Sampler to match δtx_bytes and #tx in transmission updates.
#
# For example for DL a block is transmitted via PDCCH+PDSCH during one TTI, and
# then the base station awaits HARQ ACK/NACK. That ACK/NACK comes later via
# PUCCH or PUSCH. The time window in between original transmission and
# reception of the ACK/NACK is 4 TTIs for FDD and 4-13 TTIs for TDD(*).
# And Amarisoft LTEENB updates counters for dl_total_bytes and dl_tx at
# different times:
#
# ue.erab.dl_total_bytes - right after sending data on PDCCH+PDSCH
# ue.cell.{dl_tx,dl_retx} - after receiving ACK/NACK via PUCCH|PUSCH
#
# this way an update to dl_total_bytes might be seen in one frame (= 10·TTI),
# while corresponding update to dl_tx/dl_retx might be seen in either same, or
# next, or next-next frame.
#
# What _BitSync does is that it processes stream of tx_bytes/#tx and emits
# adjusted stream with #tx corresponding to tx_bytes coming together
# synchronized in time.
#
# .next(δt, tx_bytes, #tx, X) -> [](δt', tx_bytes', #tx', X')
# .finish() -> [](δt', tx_bytes', #tx', X')
#
# (*) see e.g. Figure 8.1 in "An introduction to LTE, 2nd ed."
class _BitSync:
__slots__ = (
'txq', # [](δt,tx_bytes,#tx,X) not-yet fully processed tail of whole txv
'i_txq', # txq represents txv[i_txq:]
'i_lshift', # next left shift will be done on txv[i_lshift] <- txv[i_lshift+1]
)
# Sampler() creates new sampler that will start sampling from ue_stats0/stats0 state.
@func(Sampler)
def __init__(s, ue_stats0, stats0):
s._dl_sampler = _Sampler('dl', ue_stats0, stats0, use_bitsync=True, use_ri=True)
s._ul_sampler = _Sampler('ul', ue_stats0, stats0,
use_bitsync=False, # for ul tx_bytes and #tx come, it seems, synchronized out of the box
use_ri=False) # no rank indication for ul - assume siso
# TODO also use upcoming ul_rank+ul_n_layer
@func(_Sampler)
def __init__(s, dir, ue_stats0, stats0, use_bitsync, use_ri):
s.dir = dir
s.t = -1 # so that add(t=0, init) works
s.use_bitsync = use_bitsync
s.use_ri = use_ri
s.ues = {}
_ = s.add(ue_stats0, stats0, init=True)
assert _ == {}
for ue in s.ues.values():
assert ue.qci_flows == {}
# _UE() creates new empty UE-tracking entry.
@func(_UE)
def __init__(ue, use_bitsync):
ue.erab_flows = {}
ue.qci_flows = {}
ue.bitsync = _BitSync() if use_bitsync else None
# finish wraps up all in-progress flows.
#
# and returns all remaining samples.
# The sampler is reset after retrieval.
@func(Sampler)
def finish(s): # dl/ul samples ; dl/ul = {} qci -> []Sample
dl = s._dl_sampler.finish()
ul = s._ul_sampler.finish()
return (dl, ul)
@func(_Sampler)
def finish(s):
qci_samples = {}
for ue in s.ues.values():
# wrap-up in-progress bitsync
if ue.bitsync is not None:
bitnext = ue.bitsync.finish()
ue._update_qci_flows(bitnext, qci_samples)
# wrap-up all in-progress flows
for qci, flow in ue.qci_flows.items():
_ = flow.finish()
for sample in _:
qci_samples.setdefault(qci, []).append(sample)
ue.qci_flows = {}
# preserve .erab_flows as if we were initialized with corresponding ue_stats0.
return qci_samples
# add feeds next ue_get[stats] + stats reports to the sampler.
#
# and returns samples that become finalized during this addition.
@func(Sampler)
def add(s, ue_stats, stats): # -> dl/ul samples ; dl/ul = {} qci -> []Sample
dl = s._dl_sampler.add(ue_stats, stats)
ul = s._ul_sampler.add(ue_stats, stats)
return dl, ul
class _Utx: # transmission state passed through bitsync
__slots__ = (
'qtx_bytes',
'rank',
'xl_use_avg',
)
@func(_Sampler)
def add(s, ue_stats, stats, init=False):
t = ue_stats['utc']
δt = t - s.t
s.t = t
assert δt > 0
qci_samples = {} # qci -> []Sample samples finalized during this add
ue_live = set() # of ue ue that are present in ue_stats
# go through all UEs and update/finalize flows from information on per-UE erabs.
for ju in ue_stats['ue_list']:
ue_id = ju['enb_ue_id'] # TODO 5G: -> ran_ue_id + qos_flow_list + sst?
ue_live.add(ue_id)
if len(ju['cells']) != 1:
raise RuntimeError(("ue #%s belongs to %d cells; "+
"but only single-cell configurations are supported") % (ue_id, len(ju(['cells']))))
cell = ju['cells'][0]
tx = cell['%s_tx' % s.dir] # in transport blocks
retx = cell['%s_retx' % s.dir] # ----//----
assert tx >= 0, tx
assert retx >= 0, retx
cell_id = cell['cell_id'] # int
scell = stats['cells'][str(cell_id)]
u = _Utx()
u.qtx_bytes = {} # qci -> Σδerab_qci=qci
u.rank = cell['ri'] if s.use_ri else 1
u.xl_use_avg = scell['%s_use_avg' % s.dir]
ue = s.ues.get(ue_id)
if ue is None:
ue = s.ues[ue_id] = _UE(s.use_bitsync)
# erabs: δ(tx_total_bytes) -> tx_bytes ; prepare per-qci tx_bytes
tx_bytes = 0 # Σδerab
eflows_live = set() # of erab erabs that are present in ue_stats for this ue
for erab in ju['erab_list']:
erab_id = erab['erab_id']
qci = erab['qci']
eflows_live.add(erab_id)
ef = ue.erab_flows.get(erab_id)
if ef is None:
ef = ue.erab_flows[erab_id] = _ERAB_Flow()
ef.qci = qci
ef.tx_total_bytes = 0
etx_total_bytes = erab['%s_total_bytes' % s.dir]
if not (ef.qci == qci and ef.tx_total_bytes <= etx_total_bytes):
# restart erab flow on change of qci or tx_total_bytes↓
ef.qci = qci
ef.tx_total_bytes = 0
etx_bytes = etx_total_bytes - ef.tx_total_bytes
ef.tx_total_bytes = etx_total_bytes
tx_bytes += etx_bytes
if etx_bytes != 0:
u.qtx_bytes[qci] = u.qtx_bytes.get(qci,0) + etx_bytes
# debug
if 0 and s.dir == 'dl' and (etx_bytes != 0 or tx != 0 or retx != 0) and qci==9:
sfnx = ((t // tti) / 10) % 1024 # = SFN.subframe
_debug('% 4.1f ue%s %s .%d: etx_total_bytes: %d +%5d tx: %2d retx: %d ri: %d bitrate: %d' % \
(sfnx, ue_id, s.dir, qci, etx_total_bytes, etx_bytes, tx, retx, u.rank, cell['%s_bitrate' % s.dir]))
# gc non-live erabs
for erab_id in set(ue.erab_flows.keys()):
if erab_id not in eflows_live:
del ue.erab_flows[erab_id]
# bitsync <- (δt, tx_bytes, #tx, u)
tx += retx # both transmission and retransmission take time
if ue.bitsync is not None:
bitnext = ue.bitsync.next(δt, tx_bytes, tx, u)
else:
bitnext = [(δt, tx_bytes, tx, u)]
# update qci flows
if init:
continue
ue._update_qci_flows(bitnext, qci_samples)
# finish non-live ue
for ue_id in set(s.ues.keys()):
if ue_id not in ue_live:
ue = s.ues.pop(ue_id)
if ue.bitsync is not None:
bitnext = ue.bitsync.finish()
ue._update_qci_flows(bitnext, qci_samples)
return qci_samples
# _update_qci_flows updates .qci_flows for ue with (δt, tx_bytes, #tx, _Utx) yielded from bitsync.
#
# yielded samples are appended to qci_samples ({} qci -> []Sample).
@func(_UE)
def _update_qci_flows(ue, bitnext, qci_samples):
for (δt, tx_bytes, tx, u) in bitnext:
qflows_live = set() # of qci qci flows that get updated from current utx entry
# it might happen that even with correct bitsync we could end up with receiving tx=0 here.
# for example it happens if finish interrupts proper bitsync workflow e.g. as follows:
#
# 1000 0
# <-- finish
# 0 10
#
# if we see #tx = 0 we say that it might be anything in between 1 and δt.
tx_lo = tx_hi = tx
if tx == 0:
tx_hi = δt/tti
tx_lo = min(1, tx_hi)
for qci, tx_bytes_qci in u.qtx_bytes.items():
qflows_live.add(qci)
qf = ue.qci_flows.get(qci)
if qf is None:
qf = ue.qci_flows[qci] = _QCI_Flow()
# share/distribute #tx transport blocks over all QCIs.
#
# Consider two streams "x" and "o" and how LTE scheduler might
# place them into resource map: if the streams have the same
# priority they might be scheduled e.g. as shown in case "a".
# However if "x" has higher priority compared to "o" the
# streams might be scheduled as shown in case "b":
#
# ^ ^
# RB│ x x o o RB│ x x o o
# │ o o x x │ x x o o
# │ x x o o │ x x o o
# │ o o x x │ x x o o
#
# ───────> ───────>
# time time
#
# case "a" case "b"
# same priority pri(x) > pri(o)
#
#
# Here overall #tx=4, but #tx(x) = 4 for case "a" and = 2 for case "b".
#
# -> without knowing QCI priorities and actual behaviour of LTE
# scheduler we can only estimate #tx(x) to be:
#
# tx_bytes(x)
# ───────────·#tx ≤ #tx(x) ≤ #tx
# Σtx_bytes
qtx_lo = tx_bytes_qci * tx_lo / tx_bytes
if qtx_lo > tx_hi: # e.g. 6.6 * 11308 / 11308 = 6.6 + ~1e-15
qtx_lo -= 1e-4
assert 0 < qtx_lo <= tx_hi, (qtx_lo, tx_hi, tx_bytes_qci, tx_bytes)
_ = qf.update(δt, tx_bytes_qci, qtx_lo, tx_hi, u.rank, u.xl_use_avg)
for sample in _:
qci_samples.setdefault(qci, []).append(sample)
# finish flows that did not get an update
for qci in set(ue.qci_flows.keys()):
if qci not in qflows_live:
qf = ue.qci_flows.pop(qci)
_ = qf.finish()
for sample in _:
qci_samples.setdefault(qci, []).append(sample)
# _QCI_Flow() creates new empty flow.
@func(_QCI_Flow)
def __init__(qf):
qf.tx_bytes = 0
qf.tx_time = 0
qf.tx_time_err = 0
# update updates flow with information that so many bytes were transmitted during
# δt with using #tx transport blocks somewhere in [tx_lo,tx_hi] and with
# specified rank. It is also known that overall average usage of resource
# blocks corresponding to tx direction in the resource map is xl_use_avg.
@func(_QCI_Flow)
def update(qf, δt, tx_bytes, tx_lo, tx_hi, rank, xl_use_avg): # -> []Sample
#_debug('QF.update %.2ftti %5db %.1f-%.1ftx %drank %.2fuse' % (δt/tti, tx_bytes, tx_lo, tx_hi, rank, xl_use_avg))
tx_lo /= rank # normalize TB to TTI (if it is e.g. 2x2 mimo, we have 2x more transport blocks)
tx_hi /= rank
vout = []
s = qf._update(δt, tx_bytes, tx_lo, tx_hi, xl_use_avg)
if s is not None:
vout.append(s)
return vout
@func(_QCI_Flow)
def _update(qf, δt, tx_bytes, tx_lo, tx_hi, xl_use_avg): # -> ?Sample
assert tx_bytes > 0
δt_tti = δt / tti
tx_lo = min(tx_lo, δt_tti) # protection (should not happen)
tx_hi = min(tx_hi, δt_tti) # protection (should not happen)
# tx time is somewhere in [tx, δt_tti]
if xl_use_avg < 0.9:
# not congested: it likely took the time to transmit ≈ #tx
pass
else:
# potentially congested: we don't know how much congested it is and
# which QCIs are affected more and which less
# -> all we can say tx_time is only somewhere in between limits
tx_hi = δt_tti
tx_time = (tx_lo + tx_hi) / 2 * tti
tx_time_err = (tx_hi - tx_lo) / 2 * tti
cont = (qf.tx_time != 0) # if this update is continuing current sample
qf.tx_bytes += tx_bytes
qf.tx_time += tx_time
qf.tx_time_err += tx_time_err
# if we are continuing the sample, it might be that current update is either small or big.
# - if it is big - the sample continues.
# - if it is not big - it coalesces and ends the sample.
# NOTE: without throwing away the last tti the overall throughput statistics
# stay the same regardless of whether we coalesce small txes or not.
if cont and tx_hi < 0.9*δt_tti:
s = qf._sample()
qf.tx_bytes = 0
qf.tx_time = 0
qf.tx_time_err = 0
return s
return None
# finish tells the flow that no updates will be coming anymore.
@func(_QCI_Flow)
def finish(qf): # -> []Sample
#_debug('QF.finish')
vout = []
if qf.tx_time != 0:
s = qf._sample()
qf.tx_bytes = 0
qf.tx_time = 0
qf.tx_time_err = 0
vout.append(s)
return vout
# _sample creates new Sample from what accumulated in the flow.
@func(_QCI_Flow)
def _sample(qf):
s = Sample()
s.tx_bytes = qf.tx_bytes
s.tx_time = qf.tx_time
s.tx_time_err = qf.tx_time_err
assert s.tx_bytes > 0 and \
s.tx_time > 0 and \
s.tx_time_err >= 0 and \
s.tx_time - s.tx_time_err > 0 \
, s
#_debug(" ", s)
return s
# _BitSync creates new empty bitsync.
@func(_BitSync)
def __init__(s):
s.txq = []
s.i_txq = 0
s.i_lshift = 0
# next feeds next (δt, tx_bytes, tx) into bitsync.
#
# and returns ready parts of adjusted stream.
@func(_BitSync)
def next(s, δt, tx_bytes, tx, X): # -> [](δt', tx_bytes', tx', X')
s.txq.append((δt, tx_bytes, tx, X))
# XXX for simplicity we currently handle sync in between only current and
# next frames. That is enough to support FDD. TODO handle next-next case to support TDD
#
# XXX for simplicity we also assume all δt are ~ 10·tti and do not generally handle them
# TODO handle arbitrary δt
# shift #tx to the left:
#
# in previous frame₁ we saw that transmitting tx_bytes₁ resulted in tx₁
# transport blocks in that frame. In the next frame we saw tx_bytes₂
# transmission and tx₂ transport blocks. That tx₂ is the sum of transport
# blocks a) acknowledged in frame₂, but originally transmitted in frame₁,
# and b) transmitted in frame₂ and acknowledged in that same frame₂:
#
# tx_bytes₁ tx₁
# tx_bytes₂ tx₂ = t₂(1) + t₂(2)
#
# we can estimate t₂(2) by assuming that tx_bytes transmission results in
# proportional #tx in that frame. i.e.
#
# tx₁ t₂(2)
# ───────── = ─────────
# tx_bytes₁ tx_bytes₂
#
# and then having t₂(2) we can know t₂(1) = tx₂-t₂(2).
#
# The result of transport blocks associated with frame₁ is tx₁+t₂(1).
def lshift(i):
#print(' > lshift', i, s.txq)
assert s.i_txq <= i < s.i_txq + len(s.txq)
i -= s.i_txq
δt1, b1, t1, X1 = s.txq[i]
δt2, b2, t2, X2 = s.txq[i+1]
if b1 != 0:
t22 = b2*t1/b1
else:
t22 = t2
t21 = t2-t22
if t21 > 0:
# e.g. b₁=1000 t₁=10, b₂=1000, t₂=0 yields t21=-10
t1 += t21 # move t21 from frame₂ -> frame₁
t2 -= t21
assert t1 >= 0, t1
assert t2 >= 0, t2
s.txq[i] = (δt1, b1, t1, X1)
s.txq[i+1] = (δt2, b2, t2, X2)
#print(' < lshift ', s.txq)
while s.i_lshift+1 < s.i_txq + len(s.txq):
lshift(s.i_lshift)
s.i_lshift += 1
# we are close to be ready to yield txq[0].
# yield it, after balancing #tx again a bit, since ^^^ procedure can yield
# t=0 for b!=0 e.g. for
#
# 1000 0
# 1000 10
# 0 0
vout = []
while len(s.txq) >= 3:
s._rebalance(2)
_ = s.txq.pop(0)
s.i_txq += 1
vout.append(_)
return vout
# finish tells bitsync to flush its output queue.
#
# the bitsync becomes reset.
@func(_BitSync)
def finish(s): # -> [](δt', tx_bytes', tx', X')
assert len(s.txq) < 3
s._rebalance(len(s.txq))
vout = s.txq
s.txq = []
return vout
# _rebalance redistributes tx_i in .txq[:l] proportional to tx_bytes_i:
#
# We adjust #tx as follows: consider 3 transmission entries that each sent
# b_i bytes and yielded t_i for #tx. We want to adjust t_i -> t'_i so that
# t'_i correlates with b_i and that whole transmission time stays the same:
#
# b₁ t₁ t'₁
# b₂ t₂ -> t'₂ t'_i = α·b_i Σt' = Σt
# b₃ t₃ t'₃
#
# that gives
#
# Σt
# α = ──
# Σb
#
# and has the effect of moving #tx from periods with tx_bytes=0, to periods
# where transmission actually happened (tx_bytes > 0).
@func(_BitSync)
def _rebalance(s, l):
#print(' > rebalance', s.txq[:l])
assert l <= len(s.txq)
assert l <= 3
Σb = sum(_[1] for _ in s.txq[:l])
Σt = sum(_[2] for _ in s.txq[:l])
if Σb != 0:
for i in range(l):
δt_i, b_i, t_i, X_i = s.txq[i]
t_i = b_i * Σt / Σb
assert t_i >= 0, t_i
s.txq[i] = (δt_i, b_i, t_i, X_i)
#print(' < rebalance', s.txq[:l])
# __repr__ returns human-readable representation of Sample.
@func(Sample)
def __repr__(s):
def div(a,b):
if b != 0:
return a/b
return float('inf') if a != 0 else \
float('nan')
t_lo = s.tx_time - s.tx_time_err
t_hi = s.tx_time + s.tx_time_err
b_lo = div(s.tx_bytes*8, t_hi)
b_hi = div(s.tx_bytes*8, t_lo)
return "Sample(%db, %.1f ±%.1ftti)\t# %.0f ±%.0f bit/s" % \
(s.tx_bytes, s.tx_time/tti, s.tx_time_err/tti, div(s.tx_bytes*8, s.tx_time), (b_hi - b_lo)/2)
# ----------------------------------------
# _x_stats_srv provides server for x.drb_stats queries.
#
# To do so it polls eNB every 10ms at 100Hz frequency with `ue_get[stats]`
# and tries to further improve accuracy of retrieved DL/UL samples timing
# towards 1ms via a heuristic on how many transport blocks were tx/rx'ed
# during each observation.
#
# This heuristic can be used unless eNB is congested. To detect congestion
# _x_stats_srv also polls eNB with `stats` at the same 100Hz frequency and
# synchronized in time with `ue_get[stats]`. The congestion is detected by
# dl_use_avg / ul_use_avg being close to 1.
#
# Since we can detect only the fact of likely congestion, but not the level
# of congestion, nor other details related to QCI priorities, for the congested
# case the heuristic is not used and throughput is reported via rough, but
# relatively true, interval estimates.
#
# NOTE we cannot go polling at higher than 100Hz frequency, since eNB
# rate-limits websocket requests to execute no faster than one per 10ms.
@func
def _x_stats_srv(ctx, reqch: chan, conn: amari.Conn):
δt_rate = 10*tti
# rx_ue_get_stats sends `ue_get[stats]` request and returns server response.
rtt_ue_stats = _IncStats() # time it takes to send ue_get and to receive response
δt_ue_stats = _IncStats() # δ(ue_stats.timestamp)
t_ue_stats = None # last ue_stats.timestamp
def rx_ue_get_stats(ctx): # -> ue_stats
nonlocal t_ue_stats
t_tx = time.now()
ue_stats = conn.req(ctx, 'ue_get', {'stats': True})
t_rx = time.now()
rtt_ue_stats.add(t_rx-t_tx)
t = ue_stats['utc']
if t_ue_stats is not None:
δt_ue_stats.add(t-t_ue_stats)
t_ue_stats = t
return ue_stats
# rx_stats sends `stats` request and returns server response.
# we need to query stats to get dl_use/ul_use.
# Establish separate connection for that since if we use the same conn for
# both ue_get and stats queries, due to overall 100Hz rate-limiting, ue_get
# would be retrieved at only 50Hz rate. With separate connection for stats
# we can retrieve both ue_get and stats each at 100Hz simultaneously.
conn_stats = amari.connect(ctx, conn.wsuri)
defer(conn_stats.close)
rtt_stats = _IncStats() # like rtt_ue_stats but for stat instead of ue_get
δt_stats = _IncStats() # δ(stats.timestamp)
t_stats = None # last stats.timestamp
def rx_stats(ctx): # -> stats
nonlocal t_stats
t_tx = time.now()
stats = conn_stats.req(ctx, 'stats', {})
t_rx = time.now()
rtt_stats.add(t_rx-t_tx)
t = stats['utc']
if t_stats is not None:
δt_stats.add(t-t_stats)
t_stats = t
return stats
# issue first dummy stats. It won't report most of statistics due to
# initial_delay=0, but it will make the next stats query avoid pausing for 0.4s.
conn_stats.req(ctx, 'stats', {'initial_delay': 0})
# rx_all simultaneously issues `ue_get[stats]` and `stats` requests and returns server responses.
# the requests are issued synchronized in time.
δ_ue_stats = _IncStats() # ue_stats.timestamp - stats.timestamp
def rx_all(ctx): # -> ue_stats, stats
uq = chan(1)
sq = chan(1)
_, _rx = select(
ctx.done().recv, # 0
(ueget_reqch.send, uq), # 1
)
if _ == 0:
raise ctx.err()
_, _rx = select(
ctx.done().recv, # 0
(stats_reqch.send, sq), # 1
)
if _ == 0:
raise ctx.err()
ue_stats = stats = None
while ue_stats is None or stats is None:
_, _rx = select(
ctx.done().recv, # 0
uq.recv, # 1
sq.recv, # 2
)
if _ == 0:
raise ctx.err()
if _ == 1:
ue_stats = _rx
uq = nilchan
if _ == 2:
stats = _rx
sq = nilchan
δ_ue_stats.add(ue_stats['utc'] - stats['utc'])
return ue_stats, stats
ueget_reqch = chan()
def Trx_ue_get(ctx):
while 1:
_, _rx = select(
ctx.done().recv, # 0
ueget_reqch.recv, # 1
)
if _ == 0:
raise ctx.err()
retq = _rx
ue_stats = rx_ue_get_stats(ctx)
retq.send(ue_stats) # cap = 1
stats_reqch = chan()
def Trx_stats(ctx):
while 1:
_, _rx = select(
ctx.done().recv, # 0
stats_reqch.recv, # 1
)
if _ == 0:
raise ctx.err()
retq = _rx
stats = rx_stats(ctx)
retq.send(stats) # cap = 1
# Tmain is the main thread that drives the process overall
def Tmain(ctx):
nonlocal rtt_ue_stats, δt_ue_stats
nonlocal rtt_stats, δt_stats
nonlocal δ_ue_stats
t_req = time.now()
ue_stats, stats = rx_all(ctx)
S = Sampler(ue_stats, stats)
qci_Σdl = {} # qci -> _Σ for dl
qci_Σul = {} # ----//---- for ul
class _Σ:
__slots__ = (
'tx_bytes',
'tx_time',
'tx_time_err',
'tx_time_notailtti',
'tx_time_notailtti_err',
'tx_nsamples',
)
def __init__(Σ):
for x in Σ.__slots__:
setattr(Σ, x, 0)
# account accounts samples into Σtx_time/Σtx_bytes in qci_Σ.
def account(qci_Σ, qci_samples):
for qci, samplev in qci_samples.items():
Σ = qci_Σ.get(qci)
if Σ is None:
Σ = qci_Σ[qci] = _Σ()
for s in samplev:
# do not account short transmissions
# ( tx with 1 tti should be ignored per standard, but it is
# also that small ICMP messages span 2 transport blocks sometimes )
t_lo = s.tx_time - s.tx_time_err
t_hi = s.tx_time + s.tx_time_err
if t_hi <= 1*tti or (t_hi <= 2*tti and s.tx_bytes < 1000):
continue
Σ.tx_nsamples += 1
Σ.tx_bytes += s.tx_bytes
Σ.tx_time += s.tx_time
Σ.tx_time_err += s.tx_time_err
# also aggregate .tx_time without tail tti (IP Throughput KPI needs this)
tt_hi = math.ceil(t_hi/tti - 1) # in tti
tt_lo = t_lo / tti # in tti
if tt_lo > 1:
tt_lo = math.ceil(tt_lo - 1)
tt = (tt_lo + tt_hi) / 2
tt_err = (tt_hi - tt_lo) / 2
Σ.tx_time_notailtti += tt * tti
Σ.tx_time_notailtti_err += tt_err * tti
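# e.g. a sample with tx_time ∈ [3.0, 4.2]·tti gives
# tt_hi = ceil(4.2 - 1) = 4 and tt_lo = ceil(3.0 - 1) = 2,
# i.e. tx_time_notailtti ∈ [2, 4]·tti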
while 1:
# TODO explicitly detect underrun?
_, _rx = select(
ctx.done().recv, # 0
reqch.recv, # 1
default, # 2
)
if _ == 0:
raise ctx.err()
if _ == 1:
# client requests to retrieve message for accumulated data
opts, respch = _rx
# TODO verify/handle opts?
# wrap-up flows and account finalized samples
qci_dl, qci_ul = S.finish()
account(qci_Σdl, qci_dl)
account(qci_Σul, qci_ul)
_debug()
_debug('rtt_ue: %s ms' % rtt_ue_stats .str('%.2f', time.millisecond))
_debug('δt_ue: %s ms' % δt_ue_stats .str('%.2f', time.millisecond))
_debug('rtt_stats: %s ms' % rtt_stats .str('%.2f', time.millisecond))
_debug('δt_stats: %s ms' % δt_stats .str('%.2f', time.millisecond))
_debug('δ(ue,stat): %s ms' % δ_ue_stats .str('%.2f', time.millisecond))
qci_dict = {}
Σ0 = _Σ()
for qci in set(qci_Σdl.keys()) .union(qci_Σul.keys()):
Σdl = qci_Σdl.get(qci, Σ0)
Σul = qci_Σul.get(qci, Σ0)
qci_dict[qci] = {
'dl_tx_bytes': Σdl.tx_bytes,
'dl_tx_time': Σdl.tx_time,
'dl_tx_time_err': Σdl.tx_time_err,
'dl_tx_time_notailtti': Σdl.tx_time_notailtti,
'dl_tx_time_notailtti_err': Σdl.tx_time_notailtti_err,
'dl_tx_nsamples': Σdl.tx_nsamples,
'ul_tx_bytes': Σul.tx_bytes,
'ul_tx_time': Σul.tx_time,
'ul_tx_time_err': Σul.tx_time_err,
'ul_tx_time_notailtti': Σul.tx_time_notailtti,
'ul_tx_time_notailtti_err': Σul.tx_time_notailtti_err,
'ul_tx_nsamples': Σul.tx_nsamples,
}
r = {'time': ue_stats['time'],
'utc': ue_stats['utc'],
'qci_dict': qci_dict,
'δt_ueget': {
'min': δt_ue_stats.min,
'avg': δt_ue_stats.avg(),
'max': δt_ue_stats.max,
'std': δt_ue_stats.std(),
},
'δ_ueget_vs_stats': {
'min': δ_ue_stats.min,
'avg': δ_ue_stats.avg(),
'max': δ_ue_stats.max,
'std': δ_ue_stats.std(),
},
}
respch.send(r)
# reset
qci_Σdl = {}
qci_Σul = {}
rtt_ue_stats = _IncStats()
δt_ue_stats = _IncStats()
rtt_stats = _IncStats()
δt_stats = _IncStats()
δ_ue_stats = _IncStats()
# sync time to keep t_req' - t_req ≈ δt_rate
# this should automatically translate to δt(ue_stats) ≈ δt_rate
t = time.now()
δtsleep = δt_rate - (t - t_req)
if δtsleep > 0:
time.sleep(δtsleep)
# retrieve ue_get[stats] and stats data for next frame from enb
t_req = time.now()
ue_stats, stats = rx_all(ctx)
# pass data to sampler and account already detected samples
qci_dl, qci_ul = S.add(ue_stats, stats)
account(qci_Σdl, qci_dl)
account(qci_Σul, qci_ul)
# run everything
wg = sync.WorkGroup(ctx)
wg.go(Trx_ue_get)
wg.go(Trx_stats)
wg.go(Tmain)
wg.wait()
# _IncStats incrementally computes statistics on provided values.
#
# Provide values via .add().
# Retrieve statistical properties via .avg/.std/.var/.min/.max .
class _IncStats:
__slots__ = (
'n', # number of samples seen so far
'μ', # current mean
'σ2', # ~ current variance
'min', # current min / max
'max',
)
def __init__(s):
s.n = 0
s.μ = 0.
s.σ2 = 0.
s.min = +float('inf')
s.max = -float('inf')
def add(s, x):
# https://www.johndcook.com/blog/standard_deviation/
s.n += 1
μ_ = s.μ # μ_{n-1}
s.μ += (x - μ_)/s.n
s.σ2 += (x - μ_)*(x - s.μ)
s.min = min(s.min, x)
s.max = max(s.max, x)
def avg(s):
if s.n == 0:
return float('nan')
return s.μ
def var(s):
if s.n == 0:
return float('nan')
return s.σ2 / s.n # note johndcook uses / (s.n-1) to unbias
def std(s):
return math.sqrt(s.var())
def __str__(s):
return s.str('%s', 1)
def str(s, fmt, scale):
t = "min/avg/max/σ "
if s.n == 0:
t += "?/?/? ±?"
else:
μ = s.avg() / scale
σ = s.std() / scale
min = s.min / scale
max = s.max / scale
f = "%s/%s/%s ±%s" % ((fmt,)*4)
t += f % (min, μ, max, σ)
return t
# ----------------------------------------
__debug = False
def _debug(*argv):
if __debug:
print(*argv, file=sys.stderr)
# -*- coding: utf-8 -*-
# Copyright (C) 2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from xlte.amari.drb import _Sampler, Sample, _BitSync, tti, _IncStats
import numpy as np
from golang import func
# tSampler, UE, Etx and S provide infrastructure for testing _Sampler:
# Etx represents transmission on erab with qci of tx_bytes.
class Etx:
def __init__(etx, erab_id, qci, tx_bytes, tx_total=False):
etx.erab_id = erab_id
etx.qci = qci
etx.tx_bytes = tx_bytes
etx.tx_total = tx_total
# UE represents one entry about an UE in ue_get[stats].ue_list .
class UE:
def __init__(ue, ue_id, tx, retx, *etxv, ri=1):
for _ in etxv:
assert isinstance(_, Etx)
ue.ue_id = ue_id
ue.tx = tx
ue.retx = retx
ue.etxv = etxv
ue.ri = ri
# tSampler provides testing environment for _Sampler.
#
# For easier testing, and contrary to _Sampler, collected samples are returned
# as a whole from the final get, not incrementally.
class tSampler:
def __init__(t, *uev, use_bitsync=False, use_ri=False):
t.tstats = _tUEstats()
ue_stats0, stats0 = t.tstats.next(0, *uev)
t.sampler = _Sampler('zz', ue_stats0, stats0, use_bitsync=use_bitsync, use_ri=use_ri)
t.qci_samples = {} # in-progress collection until final get
def add(t, δt_tti, *uev):
ue_stats, stats = t.tstats.next(δt_tti, *uev)
qci_samples = t.sampler.add(ue_stats, stats)
t._update_qci_samples(qci_samples)
def get(t):
qci_samples = t.sampler.finish()
t._update_qci_samples(qci_samples)
qci_samples = t.qci_samples
t.qci_samples = {}
return qci_samples
def _update_qci_samples(t, qci_samples):
for (qci, samplev) in qci_samples.items():
t.qci_samples.setdefault(qci, []).extend(samplev)
# _tUEstats provides environment to generate test ue_get[stats].
class _tUEstats:
def __init__(t):
t.τ = 0
t.tx_total = {} # (ue,erab) -> tx_total_bytes
# next returns next (ue_stats, stats) with specified ue transmissions
def next(t, δτ_tti, *uev):
for _ in uev:
assert isinstance(_, UE)
t.τ += δτ_tti * tti
tx_total = t.tx_total
t.tx_total = {} # if ue/erab is missing in ue_stats, its tx_total is reset
ue_list = []
ue_stats = {
'time': t.τ,
'utc': 100 + t.τ,
'ue_list': ue_list
}
for ue in uev:
erab_list = []
ue_list.append({
'enb_ue_id': ue.ue_id, # TODO test both 4G and 5G flavours
'cells': [
{
'cell_id': 1,
'ri': ue.ri,
'zz_tx': ue.tx,
'zz_retx': ue.retx,
}
],
'erab_list': erab_list,
})
for etx in ue.etxv:
efkey = (ue.ue_id, etx.erab_id)
etx_total = etx.tx_bytes
if not etx.tx_total:
etx_total += tx_total.get(efkey, 0)
t.tx_total[efkey] = etx_total
erab_list.append({
'erab_id': etx.erab_id,
'qci': etx.qci,
'zz_total_bytes': etx_total,
})
stats = {
'time': ue_stats['time'],
'utc': ue_stats['utc'],
'cells': {
'1': {
'zz_use_avg': 0.1 # TODO add test for congested case
}
}
}
return ue_stats, stats
# S is shortcut to create Sample.
def S(tx_bytes, tx_time_tti):
if isinstance(tx_time_tti, tuple):
τ_lo, τ_hi = tx_time_tti
else:
τ_lo = τ_hi = tx_time_tti
s = Sample()
s.tx_bytes = tx_bytes
s.tx_time = (τ_lo + τ_hi) / 2 * tti
s.tx_time_err = (τ_hi - τ_lo) / 2 * tti
return s
# -------- tests --------
# test_Sampler1 verifies Sampler on single erab/qci flows.
def test_Sampler1():
# _ constructs tSampler, feeds tx stats into it and returns yielded Samples.
#
# tx_statsv = [](δt_tti, tx_bytes, #tx, #retx)
#
# only 1 ue, 1 qci and 1 erab are used in this test to verify the tricky
# parts of the Sampler in how single flow is divided into samples. The other
# tests verify how Sampler handles other aspects - e.g. multiple erabs,
# multiple qci, etc...
def _(*tx_statsv, bitsync=None): # -> []Sample
def b(bitsync):
t = tSampler(use_bitsync=bitsync)
for (δt_tti, tx_bytes, tx, retx) in tx_statsv:
t.add(δt_tti, UE(17, tx, retx, Etx(23, 4, tx_bytes)))
qci_samplev = t.get()
if len(qci_samplev) == 0:
return []
assert set(qci_samplev.keys()) == {4}
return qci_samplev[4]
boff = None # verify with both bitsync=off/on if bitsync=None
bon = None
if bitsync is None or (not bitsync):
boff = b(False)
if bitsync is None or bitsync:
bon = b(True)
if bitsync is None:
assert boff == bon
return bon if bitsync else boff
# δt_tti tx_bytes #tx #retx
assert _() == []
assert _((10, 1000, 1, 0)) == [S(1000, 1)]
assert _((10, 1000, 2, 0)) == [S(1000, 2)]
assert _((10, 1000, 3, 0)) == [S(1000, 3)]
for tx in range(2,10+1):
assert _((10,1000, tx, 0)) == [S(1000, tx)]
assert _((10, 1000, 1, 1)) == [S(1000, 2)] # 1 tx + 1 retx = 2 TTI
assert _((10, 1000, 1, 2)) == [S(1000, 3)] # tx_time is estimated via (tx+retx)
for tx in range(1,10+1):
for retx in range(1,10-tx+1):
assert _((10,1000, tx, retx)) == [S(1000, tx+retx)]
assert _((10, 1000, 77, 88)) == [S(1000, 10)] # tx_time ≤ δt (bug in #tx / #retx)
# coalesce/wrap-up 2 frames
def _2tx(tx1, tx2): return _((10, 100*tx1, tx1, 0),
(10, 100*tx2, tx2, 0))
assert _2tx(4, 3) == [S(700,7)] # small tx1 and tx2: coalesce as if tx1 comes in the end of frame₁
assert _2tx(4, 4) == [S(800,8)] # and tx2 in the beginning of frame₂
assert _2tx(4, 5) == [S(900,9)] # ----//----
assert _2tx(3, 5) == [S(800,8)] # ...
assert _2tx(2, 5) == [S(700,7)]
assert _2tx(5, 4) == [S(900,9)]
assert _2tx(5, 3) == [S(800,8)]
assert _2tx(5, 2) == [S(700,7)]
assert _2tx(10, 0) == [S(1000,10)] # full + no tx
assert _2tx(10, 1) == [S(1100,11)] # full + 1 tti tx
assert _2tx(10, 2) == [S(1200,12)] # full + 2 ttis
for tx2 in range(2,10+1):
assert _2tx(10, tx2) == [S((10+tx2)*100, 10+tx2)]
# coalesce/wrap-up 3 frames: small tx + med-full + small tx
def _3tx(tx1, tx2, tx3): return _((10, 100*tx1, tx1, 0),
(10, 100*tx2, tx2, 0),
(10, 100*tx3, tx3, 0))
assert _3tx(4, 0, 3) == [S(400,4), S(300,3)] # empty middle
assert _3tx(4, 1, 3) == [S(500,5), S(300,3)] # middle only 1 tti - coalesced to left
assert _3tx(4, 2, 3) == [S(600,6), S(300,3)] # middle small - coalesced to left
assert _3tx(4, 3, 3) == [S(700,7), S(300,3)] # ----//----
assert _3tx(4, 4, 3) == [S(800,8), S(300,3)] # ----//----
assert _3tx(4, 8, 3) == [S(1200,12), S(300,3)] # ----//----
assert _3tx(4, 9, 3) == [S(1600,16)] # middle big - coalesced to left and right
assert _3tx(4,10, 3) == [S(1700,17)] # ----//----
# coalesce/wrap-up 4 frames: small tx + med-full + med-full + small tx
def _4tx(tx1, tx2, tx3, tx4): return _((10, 100*tx1, tx1, 0),
(10, 100*tx2, tx2, 0),
(10, 100*tx3, tx3, 0),
(10, 100*tx4, tx4, 0))
assert _4tx(4, 0, 0, 3) == [S(400,4), S(300,3)] # empty m1, m2
assert _4tx(4, 1, 0, 3) == [S(500,5), S(300,3)] # m1 - only 1 tti - coalesces to left
assert _4tx(4, 0, 1, 3) == [S(400,4), S(400,4)] # m2 - only 1 tti - coalesces to right
assert _4tx(4, 2, 0, 3) == [S(600,6), S(300,3)] # m1 small - coalesces to left
assert _4tx(4, 0, 2, 3) == [S(400,4), S(500,5)] # m2 small - coalesces to right
assert _4tx(4, 3, 4, 3) == [S(700,7), S(700,7)] # m1 and m2 small - m1 coalesces to left, m2 to right
assert _4tx(4, 9, 4, 3) == [S(400+900+400,4+9+4), S(300,3)] # m1 big - coalesces s1 and m2
assert _4tx(4, 3, 9, 3) == [S(700,7), S(1200,12)] # m2 big - it only starts new sample and coalesces to right
assert _4tx(4, 9,10, 3) == [S(400+900+1000+300,4+9+10+3)] # m1 and m2 big - all coalesces
# zero #tx
# this might happen even with bitsync if finish divides the stream at an
# unfortunate moment e.g. as follows:
#
# 1000 0
# <-- finish
# 0 10
assert _((10, 1000, 0, 0)) == [S(1000, (1,10))]
# bitsync lightly (BitSync itself is verified in details in test_BitSync)
def b(*btx_statsv):
tx_statsv = []
for (tx_bytes, tx) in btx_statsv: # note: no δt_tti, #retx
tx_statsv.append((10, tx_bytes, tx, 0))
return _(*tx_statsv, bitsync=True)
# tx_bytes #tx
assert b() == []
assert b((1000, 0)) == [S(1000, (1,10))]
assert b((1000, 0),
(0, 10)) == [S(1000, 10)]
assert b((1000, 4), # 4
( 500, 8), # 6 2
(1000, 7), # 3 4
( 0, 6), # 6
( 0, 0)) == [S(1000+500,10+5), S(1000,10)]
# sampler starts from non-scratch - correctly detects δ for erabs.
def test_Sampler_start_from_nonscratch():
t = tSampler(UE(17, 0,0, Etx(23, 4, 10000, tx_total=True)))
t.add(10, UE(17, 10,0, Etx(23, 4, 123)))
assert t.get() == {4: [S(123,10)]}
# erab disappears and appears again -> tx_total_bytes is reset
def test_Sampler_erab_reestablish():
def ue(tx, *etxv): return UE(17, tx, 0, *etxv)
def etx(tx_bytes): return Etx(23, 4, tx_bytes, tx_total=True)
t = tSampler()
t.add(10, ue(2, etx(1000)))
t.add(10, ue(0, )) # erab disappears due to release
t.add(10, ue(10,etx(5000))) # erab reappears - tx_total_bytes handling restarted from scratch
assert t.get() == {4: [S(1000,2), S(5000,10)]}
# erab changes qci on the fly -> erab is considered to be reestablished
def test_Sampler_erab_change_qci():
def ue(tx, *etxv): return UE(17, tx, 0, *etxv)
def etx(qci, tx_bytes, **kw): return Etx(23, qci, tx_bytes, **kw)
t = tSampler()
t.add(10, ue(10, etx(9, 2000, tx_total=True))) # tx with qci=9
t.add(10, ue(10, etx(5, 3000, tx_total=True))) # tx with qci=5
assert t.get() == {9: [S(2000,10)], 5: [S(3000,10)]} # would be S(3000,20) if δqci was not handled
# erab is considered to be reestablished on decreased tx_total_bytes
def test_Sampler_tx_total_down():
def ue(tx, *etxv): return UE(17, tx, 0, *etxv)
def etx(tx_bytes, **kw): return Etx(23, 4, tx_bytes, **kw)
t = tSampler()
t.add(10, ue(10, etx(4000, tx_total=True)))
t.add(10, ue(10, etx(3000, tx_total=True)))
assert t.get() == {4: [S(7000,20)]} # would be e.g. S(4000,10) if tx_total_bytes↓ not handled
# N tx transport blocks is shared/distributed between multiple QCIs
#
# tx_lo ∼ tx_bytes / Σtx_bytes
# tx_hi = whole #tx even if tx_bytes are different
def test_Sampler_txtb_shared_between_qci():
def ue(tx, *etxv): return UE(17, tx, 0, *etxv)
t = tSampler()
t.add(10, ue(10, Etx(1, 9, 4000),
Etx(2, 5, 1000)))
assert t.get() == {9: [S(4000, (8,10))], 5: [S(1000, (2,10))]}
# multiple UE are correctly taken into account
def test_Sampler_multiple_ue():
def ue(ue_id, tx, *etxv): return UE(ue_id, tx, 0, *etxv)
def etx(tx_bytes): return Etx(23, 4, tx_bytes)
t = tSampler()
t.add(10, ue(17, 4, etx(1000)),
ue(18, 5, etx(2000)))
assert t.get() == {4: [S(1000,4), S(2000,5)]}
# rank affects DL max #TB/TTI (ul: no info)
def test_Sampler_rank():
def ue(tx, *etxv): return UE(17, tx, 0, *etxv, ri=2)
def etx(tx_bytes): return Etx(23, 4, tx_bytes)
t = tSampler(use_ri=True)
t.add(10, ue(3, etx(1000)))
assert t.get() == {4: [S(1000, 1.5)]} # tx_time=1.5, not 3
t.add(10, ue(10, etx(1000)))
assert t.get() == {4: [S(1000, 5)]} # tx_time=5, not 10
t.add(10, ue(10*2, etx(1000)))
assert t.get() == {4: [S(1000,10)]} # now tx_time=10
# verify that use_ri=False does not take ue.ri into account
t = tSampler(use_ri=False)
t.add(10, ue(3, etx(1000)))
assert t.get() == {4: [S(1000,3)]} # tx_time=3, not 1.5
# verify _BitSync works ok.
def test_BitSync():
# _ passes txv_in into _BitSync and returns output stream.
#
# txv_in = [](tx_bytes, #tx) ; δt=10·tti
def _(*txv_in):
def do_bitsync(*txv_in):
txv_out = []
xv_out = ''
bitsync = _BitSync()
for x, (tx_bytes, tx) in enumerate(txv_in):
_ = bitsync.next(10*tti, tx_bytes, tx,
chr(ord('a')+x))
for (δt, tx_bytes, tx, x_) in _:
assert δt == 10*tti
txv_out.append((tx_bytes, tx))
xv_out += x_
_ = bitsync.finish()
for (δt, tx_bytes, tx, x_) in _:
assert δt == 10*tti
txv_out.append((tx_bytes, tx))
xv_out += x_
assert xv_out == 'abcdefghijklmnopqrstuvwxyz'[:len(txv_in)]
return txv_out
txv_out = do_bitsync(*txv_in)
# also check with 0-tail -> it should give the same
txv_out_ = do_bitsync(*(txv_in + ((0,0),)*10))
assert txv_out_ == txv_out + [(0,0)]*10
return txv_out
# tx_bytes tx
assert _((1000, 10), # all ACK in the same frame
( 0, 0),
( 0, 0)) == [(1000, 10),
( 0, 0),
( 0, 0)]
assert _((1000, 0), # all ACK in next frame
( 0, 10),
( 0, 0)) == [(1000, 10),
( 0, 0),
( 0, 0)]
#assert _((1000, 0), # all ACK in next-next frame
# ( 0, 0),
# ( 0, 10)) == [(1000, 10),
# ( 0, 0),
# ( 0, 0)]
assert _((1000, 2), # some ACK in the same frame, some in next
( 0, 8),
( 0, 0)) == [(1000, 10),
( 0, 0),
( 0, 0)]
#assert _((1000, 2), # some ACK in the same frame, some in next, some in next-next
# ( 0, 5),
# ( 0, 3)) == [(1000, 10),
# ( 0, 0),
# ( 0, 0)]
# 1000 1000
assert _((1000, 10), # consecutive transmission (ack in same)
(1000, 10),
( 500, 5),
( 0, 0),
( 0, 0)) == [(1000, 10),
(1000, 10),
( 500, 5),
( 0, 0),
( 0, 0)]
assert _((1000, 0), # consecutive transmission (ack in next)
(1000, 10),
( 500, 10),
( 0, 5),
( 0, 0)) == [(1000, 10),
(1000, 10),
( 500, 5),
( 0, 0),
( 0, 0)]
assert _((1000, 4), # consecutive transmission (ack scattered)
(1000, 10), # 6 4
( 500, 8), # 6 2
( 0, 3), # 3
( 0, 0)) == [(1000, 10),
(1000, 10),
( 500, 5),
( 0, 0),
( 0, 0)]
#assert _((1000, 2), # consecutive transmission (ack scattered to next and next-next)
# (1000, 8), # 5 3
# ( 500, 8), # 3 5 0
# ( 0, 6), # 2 4
# ( 0, 1), # 1
# ( 0, 0)) == [(1000, 10),
# (1000, 10),
# ( 500, 5),
# ( 0, 0),
# ( 0, 0)]
# 1000 500 1000
assert _((1000, 10), # consecutive transmission (ack in same)
( 500, 5),
(1000, 10),
( 0, 0),
( 0, 0)) == [(1000, 10),
( 500, 5),
(1000, 10),
( 0, 0),
( 0, 0)]
assert _((1000, 0), # consecutive transmission (ack in next)
( 500, 10),
(1000, 5),
( 0, 10),
( 0, 0)) == [(1000, 10),
( 500, 5),
(1000, 10),
( 0, 0),
( 0, 0)]
assert _((1000, 4), # consecutive transmission (ack scattered)
( 500, 8), # 6 2
(1000, 7), # 3 4
( 0, 6), # 6
( 0, 0)) == [(1000, 10),
( 500, 5),
(1000, 10),
( 0, 0),
( 0, 0)]
#assert _((1000, 2), # consecutive transmission (ack scattered to next and next-next)
# ( 500, 8), # 5 3
# (1000, 5), # 3 1 1
# ( 0, 5), # 1 4
# ( 0, 5), # 5
# ( 0, 0)) == [(1000, 10),
# ( 500, 5),
# (1000, 10),
# ( 0, 0),
# ( 0, 0)]
# transmission is scattered to two frames with all acks only in the second frame
assert _((1000, 0),
(1000, 10)) == [(1000, 5),
(1000, 5)]
assert _((1000, 0),
(1000, 10),
( 0, 0)) == [(1000, 5),
(1000, 5),
( 0, 0)]
assert _((1000, 0), # steady tx (ack in next)
(1000, 10),
( 500, 10),
( 500, 5),
( 500, 5),
( 0, 5),
( 0, 0)) == [(1000, 10),
(1000, 10),
( 500, 5),
( 500, 5),
( 500, 5),
( 0, 0),
( 0, 0)]
#assert _((1000, 0), # steady tx (ack in next-next)
# (1000, 0),
# ( 500, 10),
# ( 500, 10),
# ( 500, 5),
# ( 0, 5),
# ( 0, 5),
# ( 0, 0)) == [(1000, 10),
# (1000, 10),
# ( 500, 5),
# ( 500, 5),
# ( 500, 5),
# ( 0, 0),
# ( 0, 0),
# ( 0, 0)]
assert _((1000, 10), # yields t21 < 0 in lshift
(1000, 0),
( 0, 10)) == [(1000, 10),
(1000, 10),
( 0, 0)]
# real-life example
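# (the expected output is consistent with the 8 ack units being
#  redistributed ∝ tx_bytes: 8·6168/(6168+14392) = 2.4 and
#  8·14392/(6168+14392) = 5.6)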
assert _(( 6168, 0),
(14392, 8),
( 0, 0)) == [( 6168, 2.4),
(14392, 5.6),
( 0, 0 )]
# ---- misc ----
# teach tests to compare Samples
@func(Sample)
def __eq__(a, b):
if not isinstance(b, Sample):
return False
# compare tx_time with tolerance to level-out floating point errors
return (abs(a.tx_time - b.tx_time) < (tti / 1e6)) and \
(a.tx_bytes == b.tx_bytes)
def test_incstats():
X = list(3+_ for _ in range(20))
Xs = _IncStats()
for (n,x) in enumerate(X):
Xs.add(x)
Xn = X[:n+1]
assert Xs.avg() == np.mean(Xn)
assert Xs.std() == np.std(Xn)
assert Xs.min == min(Xn)
assert Xs.max == max(Xn)
# -*- coding: utf-8 -*-
# Copyright (C) 2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
# Copyright (C) 2022-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
......@@ -46,9 +46,13 @@ class LogMeasure:
# ._rxlog IO reader for enb.xlog
# ._rlog IO reader for enb.log
#
# ._event currently handled xlog.Event | LogError | None
# ._stats currently handled xlog.Message with last read stats result | None
# ._m kpi.Measurement being prepared covering [_stats_prev, _stats) | None
# ._estats \/ last xlog.Message with read stats result
# \/ last xlog.Event | LogError
# \/ None
# ._m kpi.Measurement being prepared covering [_estats_prev, _estats) | None
# ._m_next kpi.Measurement being prepared covering [_estats, _estats_next) | None
#
# ._drb_stats last xlog.Message with x.drb_stats | None ; reset on error|event
pass
......@@ -61,9 +65,10 @@ class LogMeasure:
def __init__(logm, rxlog, rlog):
logm._rxlog = xlog.Reader(rxlog)
logm._rlog = rlog
logm._event = None
logm._stats = None
logm._estats = None
logm._m = None
logm._m_next = None
logm._drb_stats = None
# close releases resources associated with LogMeasure and closes underlying readers.
@func(LogMeasure)
......@@ -77,6 +82,7 @@ def close(logm):
# It reads data from enb.xlog (TODO and enb.log) as needed.
@func(LogMeasure)
def read(logm): # -> kpi.Measurement | None
_trace('\n\n LogMeasure.read')
m = logm._read()
_trace(' <-', m)
return m
......@@ -99,41 +105,42 @@ def _read(logm):
#
#
# (*) see kpi.Measurement documentation for more details about init/fini correction.
m = None # kpi.Measurement to return
while 1:
_trace()
_trace('._event:\t', logm._event)
_trace('._stats:\t', logm._stats)
_trace('._m: \t', logm._m)
# flush the queue fully at an error or an event, e.g. at "service detach".
event = logm._event
if event is not None:
# <- M for [stats_prev, stats)
_trace('m: \t', m)
_trace('._m: \t', logm._m)
_trace('._estats:\t', logm._estats)
_trace('._m_next:\t', logm._m_next)
_trace('._drb_stats:\t', logm._drb_stats)
if m is not None:
return m
# flush the queue at an error or an event, e.g. at "service detach".
estats = logm._estats
if isinstance(estats, (xlog.Event, LogError)):
# <- M for [estats_prev, estats)
m = logm._m
if m is not None:
logm._m = None
return m
# <- M(ø) for [stats, event)
stats = logm._stats
if stats is not None:
logm._stats = None
if event.timestamp is not None:
m = kpi.Measurement()
m['X.Tstart'] = stats.timestamp
m['X.δT'] = event.timestamp - stats.timestamp
return m
# note ._m_next is not flushed:
# if ._m_next != None - it remains initialized with X.Tstart = estats.timestamp
# <- error|EOF
if isinstance(event, LogError):
logm._event = None
if event is LogError.EOF:
if isinstance(estats, LogError):
logm._estats = None
if estats is LogError.EOF:
return None
raise event
raise estats
# queue should be fully flushed now
assert logm._stats is None
assert logm._m is None
# event might remain non-none, e.g. "service detach", but not an error
assert isinstance(event, xlog.Event)
# queue should be flushed now up to and including estats, with
# the event remaining non-None, e.g. "service detach", but not an error
assert logm._m is None
assert isinstance(logm._estats, xlog.Event)
assert isinstance(logm._m_next, kpi.Measurement)
assert logm._m_next['X.Tstart'] == logm._estats.timestamp
# fetch next entry from xlog
......@@ -145,35 +152,53 @@ def _read(logm):
if x is None:
x = LogError.EOF # represent EOF as LogError
if isinstance(x, LogError):
logm._event = x # it is ok to forget previous event after e.g. bad line with ParseError
continue # flush the queue
elif isinstance(x, xlog.Event):
event_prev = logm._event
logm._event = x
if event_prev is None:
continue # flush
# <- M(ø) for [event_prev, event)
assert event_prev.timestamp is not None # LogErrors are raised after queue flush
m = kpi.Measurement()
m['X.Tstart'] = event_prev.timestamp
m['X.δT'] = x.timestamp - event_prev.timestamp
return m
assert isinstance(x, xlog.Message)
if x.message != "stats":
continue
# handle messages that update current Measurement
if isinstance(x, xlog.Message):
if x.message == "x.drb_stats":
logm._handle_drb_stats(x)
continue
if x.message != "stats":
continue # ignore other messages
# it is an error, event or stats.
# if it is an event or stats -> finalize timestamp for _m_next.
# start building next _m_next covering [x, x_next).
# shift m <- ._m <- ._m_next <- (new Measurement | None for LogError)
# a LogError throws away preceding Measurement and does not start a new one after it
if logm._m_next is not None:
if not isinstance(x, LogError):
logm._m_next['X.δT'] = x.timestamp - logm._m_next['X.Tstart']
else:
logm._m_next = None # throw it away on seeing e.g. "stats, error"
m = logm._m
logm._m = logm._m_next
if not isinstance(x, LogError):
logm._m_next = kpi.Measurement()
logm._m_next['X.Tstart'] = x.timestamp # note X.δT remains NA until next stats|event
else:
logm._m_next = None
if isinstance(x, (xlog.Event, LogError)):
logm._estats = x # it is ok to forget previous event after e.g. bad line with ParseError
logm._drb_stats = None # reset ._drb_stats at an error or event
continue # flush the queue
m = logm._read_stats(x)
if m is not None:
return m
assert isinstance(x, xlog.Message)
assert x.message == "stats"
logm._handle_stats(x, m)
# NOTE _handle_stats indicates logic error in x by setting ._estats to
# LogError instead of stats. However those LogErrors come with
# timestamp and are thus treated similarly to events: we throw away
# neither ._m nor ._m_next, unlike what we do with LogErrors that
# represent errors at the log parsing level.
continue
# _read_stats handles next stats xlog entry upon _read request.
# _handle_stats handles next stats xlog entry upon _read request.
@func(LogMeasure)
def _read_stats(logm, stats: xlog.Message): # -> kpi.Measurement|None(to retry)
def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement):
# build Measurement from stats' counters.
#
# we take δ(stats_prev, stat) and process it mapping Amarisoft counters to
......@@ -207,37 +232,26 @@ def _read_stats(logm, stats: xlog.Message): # -> kpi.Measurement|None(to retry)
try:
_stats_check(stats)
except LogError as e:
event_prev = logm._event
logm._event = e
if event_prev is not None:
# <- M(ø) for [event, bad_stats)
m = kpi.Measurement()
m['X.Tstart'] = event_prev.timestamp
m['X.δT'] = stats.timestamp - event_prev.timestamp
return m
return None # flush
logm._estats = e # stays M(ø) for [estats_prev, bad_stats)
return
# stats is pre-checked to be good. push it to the queue.
stats_prev = logm._stats
logm._stats = stats
# first stats after service attach -> M(ø)
if stats_prev is None:
event_prev = logm._event
if event_prev is not None:
# <- M(ø) for [event, stats)
logm._event = None
m = kpi.Measurement()
m['X.Tstart'] = event_prev.timestamp
m['X.δT'] = stats.timestamp - event_prev.timestamp
return m
return None
estats_prev = logm._estats
logm._estats = stats
# we have 2 adjacent stats. Start building new Measurement from their δ.
# first stats after e.g. service attach -> stays M(ø) for [event_prev, stats)
if estats_prev is None:
return
if isinstance(estats_prev, (xlog.Event, LogError)):
return
assert isinstance(estats_prev, xlog.Message)
assert estats_prev.message == "stats"
stats_prev = estats_prev
# we have 2 adjacent stats. Adjust corresponding Measurement from their δ.
# do init/fini correction if there was also third preceding stats message.
m = kpi.Measurement() # [stats_prev, stats)
m['X.Tstart'] = stats_prev.timestamp
m['X.δT'] = stats.timestamp - stats_prev.timestamp
m = logm._m.copy() # [stats_prev, stats)
# δcc(counter) tells how specified cumulative counter changed since last stats result.
def δcc(counter):
......@@ -250,8 +264,8 @@ def _read_stats(logm, stats: xlog.Message): # -> kpi.Measurement|None(to retry)
# m_initfini populates m[init] and m[fini] from vinit and vfini values.
# copy of previous ._m[fini] is correspondingly adjusted for init/fini correction.
p = None
if logm._m is not None:
p = logm._m.copy()
if m_prev is not None:
p = m_prev.copy()
def m_initfini(init, vinit, fini, vfini):
m[init] = vinit
m[fini] = vfini
......@@ -303,13 +317,14 @@ def _read_stats(logm, stats: xlog.Message): # -> kpi.Measurement|None(to retry)
_ = e
e = LogError(stats.timestamp, "internal failure")
e.__cause__ = _
logm._stats = None
logm._event = e
return None
logm._estats = e
return
# all adjustments and checks are over.
logm._m = m # we can now remember pre-built Measurement for current stats,
return p # and return adjusted previous measurement, if it was there.
logm._m = m # we can now remember our Measurement adjustments for current stats,
if m_prev is not None: # and commit adjustments to previous measurement, if it was there.
m_prev.put((0,), p) # copy m_prev <- p
return
# _stats_check verifies stats message to have required structure.
......@@ -346,6 +361,109 @@ def _stats_cc(stats: xlog.Message, counter: str):
return cc_dict['messages'].get(counter, 0)
# _handle_drb_stats handles next x.drb_stats xlog entry upon _read request.
@func(LogMeasure)
def _handle_drb_stats(logm, drb_stats: xlog.Message):
# TODO precheck for correct message structure similarly to _stats_check
drb_stats_prev = logm._drb_stats
logm._drb_stats = drb_stats
# first drb_stats after an event - we don't know which time period it covers
if drb_stats_prev is None:
return
assert isinstance(drb_stats_prev, xlog.Message)
assert drb_stats_prev.message == "x.drb_stats"
# time coverage for current drb_stats
τ_lo = drb_stats_prev.timestamp
τ_hi = drb_stats.timestamp
δτ = τ_hi - τ_lo
# first see which of ._m or ._m_next, if any, drb_stats overlaps with for
# ≥ 50% of its time, and update that measurement correspondingly.
if not (δτ > 0):
return
if logm._m is not None:
m_lo = logm._m['X.Tstart']
m_hi = m_lo + logm._m['X.δT']
d = max(0, min(τ_hi, m_hi) -
max(τ_lo, m_lo))
if d >= δτ/2: # NOTE ≥ 50%, not > 50% not to skip drb_stats if fill is exactly 50%
_drb_update(logm._m, drb_stats)
return
if logm._m_next is not None:
n_lo = logm._m_next['X.Tstart']
# n_hi - don't know as _m_next['X.δT'] is ø yet
d = max(0, τ_hi -
max(τ_lo, n_lo))
if d >= δτ/2:
_drb_update(logm._m_next, drb_stats)
return
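# A minimal sketch of the ≥ 50% attribution rule above, with made-up
# numbers: drb_stats covering [10.1, 11.1) against ._m covering [10, 11).
def _example_drb_overlap():
    τ_lo, τ_hi = 10.1, 11.1                 # drb_stats coverage
    m_lo, m_hi = 10.0, 11.0                 # ._m coverage
    δτ = τ_hi - τ_lo                        # 1.0
    d = max(0, min(τ_hi, m_hi) -
               max(τ_lo, m_lo))             # 0.9 - overlap of the two intervals
    assert d >= δτ/2                        # -> drb_stats goes to ._m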
# _drb_update updates Measurement from dl/ul DRB statistics related to measurement's time coverage.
def _drb_update(m: kpi.Measurement, drb_stats: xlog.Message):
# TODO Exception -> LogError("internal failure") similarly to _handle_stats
qci_trx = drb_stats.get1("qci_dict", dict)
for dir in ('dl', 'ul'):
qvol = m['DRB.IPVol%s.QCI' % dir.capitalize()]
qtime = m['DRB.IPTime%s.QCI' % dir.capitalize()]
qtime_err = m['XXX.DRB.IPTime%s_err.QCI' % dir.capitalize()]
# qci_dict carries entries only for qci's with non-zero values, but if
# we see drb_stats we know we have information for all qcis.
# -> pre-initialize to zero everything
if kpi.isNA(qvol).all(): qvol[:] = 0
if kpi.isNA(qtime).all(): qtime[:] = 0
if kpi.isNA(qtime_err).all(): qtime_err[:] = 0
for qci_str, trx in qci_trx.items():
qci = int(qci_str)
# DRB.IPVol and DRB.IPTime are collected to compute throughput.
#
# thp = ΣB*/ΣT* where B* is tx'ed bytes in the sample without taking last tti into account
# and T* is time of tx also without taking that sample's tail tti.
#
# we only know ΣB (whole amount of tx), ΣT and ΣT* with some error.
#
# -> thp can be estimated to be inside the following interval:
#
# ΣB ΣB
# ───── ≤ thp ≤ ───── (1)
# ΣT_hi ΣT*_lo
#
# the upper layer in xlte.kpi will use the following formula for
# final throughput calculation:
#
# DRB.IPVol
# thp = ────────── (2)
# DRB.IPTime
#
# -> set DRB.IPTime and its error to mean and δ of ΣT_hi and ΣT*_lo
# so that (2) becomes (1).
# FIXME we account whole PDCP instead of only IP traffic
ΣB = trx['%s_tx_bytes' % dir]
ΣT = trx['%s_tx_time' % dir]
ΣT_err = trx['%s_tx_time_err' % dir]
ΣTT = trx['%s_tx_time_notailtti' % dir]
ΣTT_err = trx['%s_tx_time_notailtti_err' % dir]
ΣT_hi = ΣT + ΣT_err
ΣTT_lo = ΣTT - ΣTT_err
qvol[qci] = 8*ΣB # in bits
qtime[qci] = (ΣT_hi + ΣTT_lo) / 2
qtime_err[qci] = (ΣT_hi - ΣTT_lo) / 2
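# A minimal self-contained sketch of the above interval arithmetic, with
# made-up numbers: ΣB=1000 bytes, ΣT=10ms ±1ms, ΣT*=8ms ±1ms.
def _example_drb_interval():
    from math import isclose
    ΣB, ΣT, ΣT_err, ΣTT, ΣTT_err = 1000, 0.010, 0.001, 0.008, 0.001
    ΣT_hi  = ΣT  + ΣT_err                   # 0.011
    ΣTT_lo = ΣTT - ΣTT_err                  # 0.007
    qtime     = (ΣT_hi + ΣTT_lo) / 2        # 0.009
    qtime_err = (ΣT_hi - ΣTT_lo) / 2        # 0.002
    # (2) evaluated at qtime ± qtime_err reproduces interval (1)
    assert isclose(8*ΣB / (qtime + qtime_err), 8*ΣB / ΣT_hi)   # ≈ 0.73 Mbit/s
    assert isclose(8*ΣB / (qtime - qtime_err), 8*ΣB / ΣTT_lo)  # ≈ 1.14 Mbit/s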
# LogError(timestamp|None, *argv).
@func(LogError)
def __init__(e, τ, *argv):
......
# -*- coding: utf-8 -*-
# Copyright (C) 2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
# Copyright (C) 2022-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
......@@ -19,9 +19,9 @@
# See https://www.nexedi.com/licensing for rationale and options.
from xlte.amari.kpi import LogMeasure, LogError, _trace as trace
from xlte.kpi import Measurement
from xlte.kpi import Measurement, isNA
from golang import func, defer, b
import io, json
import io, json, re
from pytest import raises
......@@ -61,6 +61,7 @@ class tLogMeasure:
# xlog appends one line to enb.xlog.
def xlog(t, line):
trace('xlog += %s' % line)
line = b(line)
assert b'\n' not in line
pos = t._fxlog.tell()
......@@ -71,7 +72,7 @@ class tLogMeasure:
# _mok_init reinitializes ._mok with Measurement defaults.
def _mok_init(t):
t._mok = Measurement()
# init fields handled by amari.kpi to 0
# init fields extracted by amari.kpi from stats to 0
# this will be default values to verify against
for field in (
'RRC.ConnEstabAtt.sum',
......@@ -90,6 +91,24 @@ class tLogMeasure:
def expect1(t, field, vok):
if t._mok is None:
t._mok_init()
# if a particular X.QCI[qci] is expected - default all other qcis to 0
_ = re.match(r"^(.*)\.([0-9]+)$", field)
if _ is not None:
farr = "%s.QCI" % _.group(1)
if isNA(t._mok[farr]).all():
t._mok[farr][:] = 0
# also automatically initialize XXX.DRB.IPTimeX_err to 0.01 upon seeing DRB.IPTimeX
# ( in tests we use precise values for tx_time and tx_time_notailtti
# with δ=0.02 - see drb_trx and jdrb_stats)
n = _.group(1)
if n.startswith('DRB.IPTime'):
ferr = "XXX.%s_err" % n
if isNA(t._mok[ferr+'.QCI']).all():
t._mok[ferr+'.QCI'][:] = 0
t._mok["%s.%s" % (ferr, _.group(2))] = ((vok + 0.01) - (vok - 0.01)) / 2 # ≈ 0.01
t._mok[field] = vok
# expect_nodata requests to verify all fields besides timestamp-related to be NA.
......@@ -121,24 +140,12 @@ def test_LogMeasure():
_ = t.expect1
# empty stats after first attach
t.xlog( jstats(0.7, {}) )
t.xlog( jstats(1, {}) )
_('X.Tstart', 0.02)
_('X.δT', 0.7-0.02)
_('X.δT', 1-0.02)
t.expect_nodata()
t.read()
# note: no t.read() - see tstats
# further empty stats
t.xlog( jstats(1.0, {}) )
_('X.Tstart', 0.7)
_('X.δT', 1-0.7)
_('RRC.ConnEstabAtt.sum', 0)
_('RRC.ConnEstabSucc.sum', 0)
_('S1SIG.ConnEstabAtt', 0)
_('S1SIG.ConnEstabSucc', 0)
_('ERAB.EstabInitAttNbr.sum', 0)
_('ERAB.EstabInitSuccNbr.sum', 0)
_('ERAB.EstabAddAttNbr.sum', 0)
_('ERAB.EstabAddSuccNbr.sum', 0)
# tstats is the verb to check handling of stats message.
#
......@@ -194,6 +201,33 @@ def test_LogMeasure():
τ_logm += 1
counters_prev = {} # reset
# tdrb_stats is the verb to verify handling of x.drb_stats message.
#
# it xlogs drb stats with given δτ relative to either previous (δτ > 0) or
# next (δτ < 0) stats or event.
def tdrb_stats(δτ, qci_trx):
if δτ >= 0:
τ = τ_xlog + δτ # after previous stats or event
else:
τ = τ_xlog+1 + δτ # before next stats or event
trace('\n>>> tdrb_stats τ: %s τ_xlog: %s τ_logm: %s' % (τ, τ_xlog, τ_logm))
t.xlog( jdrb_stats(τ, qci_trx) )
# further empty stats
tstats({})
_('X.Tstart', 1)
_('X.δT', 1)
_('RRC.ConnEstabAtt.sum', 0)
_('RRC.ConnEstabSucc.sum', 0)
_('S1SIG.ConnEstabAtt', 0)
_('S1SIG.ConnEstabSucc', 0)
_('ERAB.EstabInitAttNbr.sum', 0)
_('ERAB.EstabInitSuccNbr.sum', 0)
_('ERAB.EstabAddAttNbr.sum', 0)
_('ERAB.EstabAddSuccNbr.sum', 0)
# RRC.ConnEstab
#
......@@ -262,6 +296,70 @@ def test_LogMeasure():
_('ERAB.EstabAddSuccNbr.sum', 2)
# DRB.IPVol / DRB.IPTime (testing all variants of stats/x.drb_stats interaction)
tδstats({})
tδstats({}) # ──S₁·d₁─────S₂·d₂─────S₃·d₃──
tdrb_stats(+0.1, {1: drb_trx(1.1,10, 1.2,20),
11: drb_trx(1.3,30, 1.4,40)})
# nothing here - d₁ comes as the first drb_stats
tδstats({}) # S₂
tdrb_stats(+0.1, {2: drb_trx(2.1,100, 2.2,200), # d₂ is included into S₁-S₂
22: drb_trx(2.3,300, 2.4,400)})
_('DRB.IPTimeDl.2', 2.1); _('DRB.IPVolDl.2', 8*100)
_('DRB.IPTimeUl.2', 2.2); _('DRB.IPVolUl.2', 8*200)
_('DRB.IPTimeDl.22', 2.3); _('DRB.IPVolDl.22', 8*300)
_('DRB.IPTimeUl.22', 2.4); _('DRB.IPVolUl.22', 8*400)
tδstats({}) # S₃
tdrb_stats(+0.1, {3: drb_trx(3.1,1000, 3.2,2000), # d₃ is included into S₂-S₃
33: drb_trx(3.3,3000, 3.4,4000)})
_('DRB.IPTimeDl.3', 3.1); _('DRB.IPVolDl.3', 8*1000)
_('DRB.IPTimeUl.3', 3.2); _('DRB.IPVolUl.3', 8*2000)
_('DRB.IPTimeDl.33', 3.3); _('DRB.IPVolDl.33', 8*3000)
_('DRB.IPTimeUl.33', 3.4); _('DRB.IPVolUl.33', 8*4000)
tdrb_stats(-0.1, {1: drb_trx(1.1,11, 1.2,12)}) # ──S·d─────d·S─────d·S──
tδstats({}) # cont↑
_('DRB.IPTimeDl.1', 1.1); _('DRB.IPVolDl.1', 8*11)
_('DRB.IPTimeUl.1', 1.2); _('DRB.IPVolUl.1', 8*12)
tdrb_stats(-0.1, {2: drb_trx(2.1,21, 2.2,22)})
tδstats({})
_('DRB.IPTimeDl.2', 2.1); _('DRB.IPVolDl.2', 8*21)
_('DRB.IPTimeUl.2', 2.2); _('DRB.IPVolUl.2', 8*22)
tdrb_stats(-0.1, {3: drb_trx(3.1,31, 3.2,32)}) # ──d·S─────d·S─────d·S·d──
tδstats({}) # cont↑
_('DRB.IPTimeDl.3', 3.1); _('DRB.IPVolDl.3', 8*31)
_('DRB.IPTimeUl.3', 3.2); _('DRB.IPVolUl.3', 8*32)
tdrb_stats(-0.1, {4: drb_trx(4.1,41, 4.2,42)})
tδstats({})
tdrb_stats(+0.1, {5: drb_trx(5.1,51, 5.2,52)})
_('DRB.IPTimeDl.4', 4.1); _('DRB.IPVolDl.4', 8*41)
_('DRB.IPTimeUl.4', 4.2); _('DRB.IPVolUl.4', 8*42)
_('DRB.IPTimeDl.5', 5.1); _('DRB.IPVolDl.5', 8*51)
_('DRB.IPTimeUl.5', 5.2); _('DRB.IPVolUl.5', 8*52)
tdrb_stats(+0.5, {6: drb_trx(6.1,61, 6.2,62)}) # ──d·S·d──d──S───d──S──
tδstats({}) # cont↑
_('DRB.IPTimeDl.6', 6.1); _('DRB.IPVolDl.6', 8*61)
_('DRB.IPTimeUl.6', 6.2); _('DRB.IPVolUl.6', 8*62)
tdrb_stats(+0.51,{7: drb_trx(7.1,71, 7.2,72)})
tδstats({})
_('DRB.IPTimeDl.7', 7.1); _('DRB.IPVolDl.7', 8*71)
_('DRB.IPTimeUl.7', 7.2); _('DRB.IPVolUl.7', 8*72)
tdrb_stats(-0.1, {8: drb_trx(8.1,81, 8.2,82)}) # combined d + S with nonzero counters
tδstats({'s1_initial_context_setup_request': +3, # d──S────d·S──
's1_initial_context_setup_response': +2}) # cont↑
_('DRB.IPTimeDl.8', 8.1); _('DRB.IPVolDl.8', 8*81)
_('DRB.IPTimeUl.8', 8.2); _('DRB.IPVolUl.8', 8*82)
_('S1SIG.ConnEstabAtt', 3)
_('S1SIG.ConnEstabSucc', 2)
_('ERAB.EstabInitAttNbr.sum', 3) # currently same as S1SIG.ConnEstab
_('ERAB.EstabInitSuccNbr.sum', 2) # ----//----
# service detach/attach, connect failure, xlog failure
tδstats({}) # untie from previous history
i, f = 'rrc_connection_request', 'rrc_connection_setup_complete'
......@@ -373,17 +471,25 @@ def test_LogMeasure_badinput():
" but only single-cell configurations are supported"):
t.read()
tbadcell(11, 0)
read_nodata(11, 1)
tbadcell(12, 0)
read_nodata(12, 1)
tbadcell(13, 2)
read_nodata(13, 1)
tbadcell(14, 3)
def tbadstats(τ, error):
with raises(LogError, match="t%s: stats: %s" % (τ, error)):
t.read()
read_nodata(14, 7)
tbadstats(21, ":10/cells/1 no `counters`")
read_nodata(21, 1)
tbadstats(22, ":11/cells/1/counters no `messages`")
read_nodata(22, 1)
tbadstats(23, ":12/ no `counters`")
read_nodata(23, 1)
tbadstats(24, ":13/counters no `messages`")
read_nodata(24, 7)
readok(31, 5) # 31-32
def tbadline():
......@@ -409,23 +515,24 @@ def test_LogMeasure_cc_wraparound():
t.xlog( jstats(1, {}) )
t.xlog( jstats(2, {cc: 13}) )
t.xlog( jstats(3, {cc: 12}) ) # cc↓ - should be reported
t.xlog( jstats(4, {cc: 140}) ) # cc↑↑ - should should start afresh
t.xlog( jstats(4, {cc: 140}) ) # cc↑↑ - should start afresh
t.xlog( jstats(5, {cc: 150}) )
def readok(τ, CC_value):
_('X.Tstart', τ)
_('X.δT', 1)
_(CC, CC_value)
_('X.δT', int(τ+1)-τ)
if CC_value is not None:
_(CC, CC_value)
else:
t.expect_nodata()
t.read()
_('X.Tstart', 0.02) # attach-1
_('X.δT', 0.98)
t.expect_nodata()
t.read()
readok(0.02, None) # attach-1
readok(1, 13) # 1-2
readok(2, None) # 2-3 M(ø)
with raises(LogError, match=r"t3: cc %s↓ \(13 → 12\)" % cc):
t.read() # 2-3
t.read() # 2-3 raise
readok(3, None) # 3-4 M(ø)
readok(4, 10) # 4-5
......@@ -456,6 +563,52 @@ def test_jstats():
'{"message": "stats", "utc": 123.4, "cells": {"1": {"counters": {"messages": {"rrc_x": 1, "rrc_z": 3}}}}, "counters": {"messages": {"s1_y": 2, "x2_zz": 4}}}'
# jdrb_stats, similarly to jstats, returns json-encoded x.drb_stats message
# corresponding to per-QCI dl/ul tx_time/tx_bytes.
def jdrb_stats(τ, qci_dlul): # -> str
qci_dlul = qci_dlul.copy()
for qci, dlul in qci_dlul.items():
assert isinstance(dlul, dict)
assert set(dlul.keys()) == {"dl_tx_bytes", "dl_tx_time", "dl_tx_time_notailtti",
"ul_tx_bytes", "ul_tx_time", "ul_tx_time_notailtti"}
dlul["dl_tx_time_err"] = 0 # original time is simulated to be
dlul["ul_tx_time_err"] = 0 # measured precisely in tess.
dlul["dl_tx_time_notailtti_err"] = 0 # ----//----
dlul["ul_tx_time_notailtti_err"] = 0 #
s = {
"message": "x.drb_stats",
"utc": τ,
"qci_dict": qci_dlul,
}
return json.dumps(s)
def test_jdrb_stats():
# NOTE json encodes 5 and 9 keys as strings, not integers
x = 0.01
assert jdrb_stats(100, {5: drb_trx(0.1,1234, 0.2,4321),
9: drb_trx(1.1,7777, 1.2,8888)}) == ( \
'{"message": "x.drb_stats", "utc": 100, "qci_dict":' + \
' {"5": {"dl_tx_bytes": 1234, "dl_tx_time": %(0.1+x)s, "dl_tx_time_notailtti": %(0.1-x)s,' + \
' "ul_tx_bytes": 4321, "ul_tx_time": %(0.2+x)s, "ul_tx_time_notailtti": %(0.2-x)s,' + \
' "dl_tx_time_err": 0, "ul_tx_time_err": 0, "dl_tx_time_notailtti_err": 0, "ul_tx_time_notailtti_err": 0},' + \
' "9": {"dl_tx_bytes": 7777, "dl_tx_time": 1.11, "dl_tx_time_notailtti": 1.09,' + \
' "ul_tx_bytes": 8888, "ul_tx_time": 1.21, "ul_tx_time_notailtti": 1.19,' + \
' "dl_tx_time_err": 0, "ul_tx_time_err": 0, "dl_tx_time_notailtti_err": 0, "ul_tx_time_notailtti_err": 0}' + \
'}}') % {
'0.1-x': 0.1-x, '0.1+x': 0.1+x, # working-around float impreciseness
'0.2-x': 0.2-x, '0.2+x': 0.2+x,
}
# drb_trx returns dict describing dl/ul transmissions of a data radio bearer.
# such dict is used as per-QCI entry in x.drb_stats
def drb_trx(dl_tx_time, dl_tx_bytes, ul_tx_time, ul_tx_bytes):
return {"dl_tx_bytes": dl_tx_bytes, "dl_tx_time": dl_tx_time + 0.01, "dl_tx_time_notailtti": dl_tx_time - 0.01,
"ul_tx_bytes": ul_tx_bytes, "ul_tx_time": ul_tx_time + 0.01, "ul_tx_time_notailtti": ul_tx_time - 0.01}
# ionone returns empty data source.
def ionone():
return io.BytesIO(b'')
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Based on https://lab.nexedi.com/nexedi/zodbtools/blob/master/zodbtools/zodb.py
# Copyright (C) 2017-2022 Nexedi SA and Contributors.
# Copyright (C) 2017-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
# Jérome Perrin <jerome@nexedi.com>
#
......@@ -30,6 +30,10 @@ import getopt
import importlib
import sys
from golang import func, defer, chan, go
from golang import context, os as gos, syscall
from golang.os import signal
# command_name -> command_module
command_dict = {}
......@@ -97,6 +101,7 @@ def help(argv):
sys.exit(2)
@func
def main():
try:
optv, argv = getopt.getopt(sys.argv[1:], "h", ["help"])
......@@ -127,7 +132,24 @@ def main():
print("Run 'xamari help' for usage.", file=sys.stderr)
sys.exit(2)
return command_module.main(argv)
# SIGINT/SIGTERM -> ctx cancel
ctx, cancel = context.with_cancel(context.background())
sigq = chan(1, dtype=gos.Signal)
signal.Notify(sigq, syscall.SIGINT, syscall.SIGTERM)
def _():
signal.Stop(sigq)
sigq.close()
defer(_)
def _(cancel):
sig, ok = sigq.recv_()
if not ok:
return
print("# %s" % sig, file=sys.stderr)
cancel()
go(_, cancel)
defer(cancel)
return command_module.main(ctx, argv)
if __name__ == '__main__':
......
# -*- coding: utf-8 -*-
# Copyright (C) 2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
# Copyright (C) 2022-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
......@@ -24,7 +24,7 @@
- use Reader to read logged information from xlog.
(*) for example result of stats, ue_get and erab_get queries.
(*) for example result of stats, ue_get, erab_get and synthetic queries.
"""
# XLog protocol
......@@ -59,11 +59,12 @@
from xlte import amari
from xlte.amari import drb
import json
import traceback
from golang import func, defer
from golang import time
from golang import func, defer, chan, select
from golang import context, sync, time
from golang.gcompat import qq
import logging; log = logging.getLogger('xlte.amari.xlog')
......@@ -124,7 +125,7 @@ class LogSpec:
# xlog queries service @wsuri periodically according to queries specified by
# logspecv and logs the result.
def xlog(wsuri, logspecv):
def xlog(ctx, wsuri, logspecv):
xl = _XLogger(wsuri, logspecv)
slogspecv = ' '.join(['%s' % _ for _ in logspecv])
......@@ -132,8 +133,10 @@ def xlog(wsuri, logspecv):
while 1:
try:
xl.xlog1()
xl.xlog1(ctx)
except Exception as ex:
if ctx.err() is not None:
raise
if not isinstance(ex, amari.ConnError):
log.exception('xlog failure:')
try:
......@@ -144,7 +147,7 @@ def xlog(wsuri, logspecv):
time.sleep(3)
# _XLogger serves xlog implementation.
class _XLogger:
def __init__(xl, wsuri, logspecv):
xl.wsuri = wsuri
......@@ -152,6 +155,7 @@ class _XLogger:
# emit saves line to the log.
def emit(xl, line):
assert isinstance(line, str)
assert '\n' not in line, line
print(line, flush=True)
......@@ -164,10 +168,10 @@ class _XLogger:
# xlog1 performs one cycle of attach/log,log,log.../detach.
@func
def xlog1(xl):
def xlog1(xl, ctx):
# connect to the service
try:
conn = amari.connect(xl.wsuri)
conn = amari.connect(ctx, xl.wsuri)
except Exception as ex:
xl.jemit("service connect failure", {"reason": str(ex)})
if not isinstance(ex, amari.ConnError):
......@@ -180,22 +184,48 @@ class _XLogger:
"srv_type": conn.srv_type,
"srv_version": conn.srv_version}
xl.jemit("service attach", srv_info)
try:
xl._xlog1(conn)
except Exception as ex:
d = srv_info.copy()
d['reason'] = str(ex)
xl.jemit("service detach", d)
if not isinstance(ex, amari.ConnError):
def _():
try:
raise
except Exception as ex:
d = srv_info.copy()
d['reason'] = str(ex)
xl.jemit("service detach", d)
if not isinstance(ex, amari.ConnError):
raise
defer(_)
wg = sync.WorkGroup(ctx)
defer(wg.wait)
# spawn servers to handle queries with synthetic messages
xmsgsrv_dict = {}
for l in xl.logspecv:
if l.query in _xmsg_registry:
xsrv = _XMsgServer(l.query, _xmsg_registry[l.query])
xmsgsrv_dict[l.query] = xsrv
xsrv_ready = chan() # wait for xmsg._runCtx to be initialized
wg.go(xsrv.run, conn, xsrv_ready)
xsrv_ready.recv()
# spawn main logger
wg.go(xl._xlog1, conn, xmsgsrv_dict)
def _xlog1(xl, ctx, conn, xmsgsrv_dict):
# req_ queries either amari service directly, or an extra message service.
def req_(ctx, query, opts): # -> resp_raw
if query in xmsgsrv_dict:
query_xsrv = xmsgsrv_dict[query]
_, resp_raw = query_xsrv.req_(ctx, opts)
else:
_, resp_raw = conn.req_(ctx, query, opts)
return resp_raw
def _xlog1(xl, conn):
# emit config_get after attach
_, cfg_raw = conn.req_('config_get', {})
cfg_raw = req_(ctx, 'config_get', {})
xl.emit(cfg_raw)
# loop emitting requested logspecs
t0 = time.now()
tnextv = [0]*len(xl.logspecv) # [i] - next time to arm for logspecv[i] relative to t0
......@@ -230,12 +260,90 @@ class _XLogger:
tarm = t0 + tmin
δtsleep = tarm - tnow
if δtsleep > 0:
time.sleep(δtsleep)
_, resp_raw = conn.req_(logspec.query, opts)
_, _rx = select(
ctx.done().recv, # 0
time.after(δtsleep).recv, # 1
)
if _ == 0:
raise ctx.err()
resp_raw = req_(ctx, logspec.query, opts)
xl.emit(resp_raw)
# _XMsgServer represents a server for handling particular synthetic requests.
#
# for example the server for synthetic x.drb_stats query.
class _XMsgServer:
def __init__(xsrv, name, f):
xsrv.name = name # str message name, e.g. "x.drb_stats"
xsrv._func = f # func(ctx, conn) to run the service
xsrv._reqch = chan() # chan<respch> to send requests to the service
xsrv._runCtx = None # context not done while .run is running
# run runs the extra server on amari service attached to via conn.
@func
def run(xsrv, ctx, conn: amari.Conn, ready: chan):
xsrv._runCtx, cancel = context.with_cancel(ctx)
defer(cancel)
ready.close()
# establish dedicated conn2 so that server does not semantically
# affect requests issued by main logger. For example if we do not and
# main logger queries stats, and x.drb_stats server also queries stats
# internally, then data received by main logger will cover only small
# random period of time instead of full wanted period.
conn2 = amari.connect(ctx, conn.wsuri)
defer(conn2.close)
xsrv._func(ctx, xsrv._reqch, conn2)
# req_ queries the server and returns its response.
@func
def req_(xsrv, ctx, opts): # -> resp, resp_raw
origCtx = ctx
ctx, cancel = context.merge(ctx, xsrv._runCtx) # need only merge_cancel
defer(cancel)
respch = chan(1)
_, _rx = select(
ctx.done().recv, # 0
(xsrv._reqch.send, (opts, respch)), # 1
)
if _ == 0:
if xsrv._runCtx.err() and not origCtx.err():
raise RuntimeError("%s server is down" % xsrv.name)
raise ctx.err()
_, _rx = select(
ctx.done().recv, # 0
respch.recv, # 1
)
if _ == 0:
if xsrv._runCtx.err() and not origCtx.err():
raise RuntimeError("%s server is down" % xsrv.name)
raise ctx.err()
resp = _rx
r = {'message': xsrv.name} # place 'message' first
r.update(resp)
resp = r
resp_raw = json.dumps(resp,
separators=(',', ':'), # most compact, like Amari does
ensure_ascii=False) # so that e.g. δt comes as is
return resp, resp_raw
# @_xmsg registers func f to provide server for extra messages with specified name.
_xmsg_registry = {} # name -> xsrv_func(ctx, reqch, conn)
def _xmsg(name, f, doc1):
assert name not in _xmsg_registry
f.xlog_doc1 = doc1
_xmsg_registry[name] = f
_xmsg("x.drb_stats", drb._x_stats_srv, "retrieve statistics about data radio bearers")
# ----------------------------------------
# Reader wraps IO reader to read information generated by xlog.
......@@ -420,14 +528,21 @@ Example for <logspec>+:
stats[samples,rf]/30s ue_get[stats] erab_get/10s qos_flow_get
Besides queries supported by Amarisoft LTE stack natively, support for the
following synthetic queries is also provided:
%s
Options:
-h --help show this help
""" % LogSpec.DEFAULT_PERIOD, file=out)
""" % (LogSpec.DEFAULT_PERIOD,
'\n'.join(" %-14s %s" % (q, f.xlog_doc1)
for q, f in sorted(_xmsg_registry.items()))),
file=out)
def main(argv):
def main(ctx, argv):
try:
optv, argv = getopt.getopt(argv[1:], "h", ["help"])
except getopt.GetoptError as e:
......@@ -450,4 +565,4 @@ def main(argv):
for arg in argv[1:]:
logspecv.append( LogSpec.parse(arg) )
xlog(wsuri, logspecv)
xlog(ctx, wsuri, logspecv)
This source diff could not be displayed because it is too large.
......@@ -11,7 +11,8 @@ from golang import func, defer
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime
from matplotlib import ticker
from datetime import datetime, timedelta
import sys
from urllib.request import urlopen
......@@ -45,37 +46,58 @@ def main():
# The data, as contained in the measurement log, is kept there in the form
# of kpi.Measurement, which is driver-independent representation for
# KPI-related measurement data.
mlog = kpi.MeasurementLog()
while 1:
m = alogm.read()
if m is None:
break
mlog.append(m)
# Step 3. Compute E-RAB Accessibility KPI over MeasurementLog with
# specified granularity period. We partition entries in the measurement log
# by specified time period, and further use kpi.Calc to compute the KPI
# over each period.
def load_measurements(alogm: akpi.LogMeasure) -> kpi.MeasurementLog:
mlog = kpi.MeasurementLog()
while 1:
m = alogm.read()
if m is None:
break
mlog.append(m)
return mlog
mlog = load_measurements(alogm)
# Step 3. Compute KPIs over MeasurementLog with specified granularity
# period. We partition entries in the measurement log by specified time
# period, and further use kpi.Calc to compute the KPIs over each period.
# calc_each_period partitions mlog data into periods and yields kpi.Calc for each period.
def calc_each_period(mlog: kpi.MeasurementLog, tperiod: float): # -> yield kpi.Calc
τ = mlog.data()[0]['X.Tstart']
for m in mlog.data()[1:]:
τ_ = m['X.Tstart']
if (τ_ - τ) >= tperiod:
calc = kpi.Calc(mlog, τ, τ+tperiod)
τ = calc.τ_hi
yield calc
tperiod = float(sys.argv[1])
vτ = []
vInititialEPSBEstabSR = []
vAddedEPSBEstabSR = []
vIPThp_qci = []
for calc in calc_each_period(mlog, tperiod):
vτ.append(calc.τ_lo)
_ = calc.erab_accessibility() # E-RAB Accessibility
vInititialEPSBEstabSR.append(_[0])
vAddedEPSBEstabSR .append(_[1])
τ = mlog.data()[0]['X.Tstart']
for m in mlog.data()[1:]:
τ_ = m['X.Tstart']
if (τ_ - τ) >= tperiod:
calc = kpi.Calc(mlog, τ, τ+tperiod)
vτ.append(calc.τ_lo)
τ = calc.τ_hi
_ = calc.erab_accessibility()
vInititialEPSBEstabSR.append(_[0])
vAddedEPSBEstabSR .append(_[1])
_ = calc.eutran_ip_throughput() # E-UTRAN IP Throughput
vIPThp_qci.append(_)
vτ = np.asarray([datetime.fromtimestamp(_) for _ in vτ])
vInititialEPSBEstabSR = np.asarray(vInititialEPSBEstabSR)
vAddedEPSBEstabSR = np.asarray(vAddedEPSBEstabSR)
vIPThp_qci = np.asarray(vIPThp_qci)
# Step 4. Plot computed KPI.
# The E-RAB Accessibility KPI has two parts: initial E-RAB establishment
# Step 4. Plot computed KPIs.
# 4a) The E-RAB Accessibility KPI has two parts: initial E-RAB establishment
# success rate, and additional E-RAB establishment success rate. kpi.Calc
# provides both of them in the form of their confidence intervals. The
# lower margin of the confidence interval coincides with 3GPP definition of
......@@ -94,37 +116,124 @@ def main():
#
# For each of the parts we plot both its lower margin and the whole
# confidence interval area.
fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True, layout='constrained')
pmin, psec = divmod(tperiod, 60)
fig.suptitle("E-RAB Accessibility / %s%s" % ("%d'" % pmin if pmin else '',
'%d"' % psec if psec else ''))
ax1.set_title("Initial E-RAB establishment success rate")
ax2.set_title("Added E-RAB establishment success rate")
vτ = [datetime.fromtimestamp(_) for _ in vτ]
def plot1(ax, v, label): # plot1 plots KPI data from vector v on ax.
v = np.asarray(v)
ax.plot(vτ, v['lo'], drawstyle='steps-post', label=label)
ax.fill_between(vτ, v['lo'], v['hi'],
step='post', alpha=0.1, label='%s\nuncertainty' % label)
# 4b) The E-UTRAN IP Throughput KPI provides throughput measurements for
# all QCIs and does not have uncertainty. QCIs for which throughput data is
# all zeros are said to be silent and are not plotted.
plot1(ax1, vInititialEPSBEstabSR, "InititialEPSBEstabSR")
plot1(ax2, vAddedEPSBEstabSR, "AddedEPSBEstabSR")
fig = plt.figure(constrained_layout=True, figsize=(12,8))
facc, fthp = fig.subfigures(1, 2)
figplot_erab_accessibility (facc, vτ, vInititialEPSBEstabSR, vAddedEPSBEstabSR, tperiod)
figplot_eutran_ip_throughput(fthp, vτ, vIPThp_qci, tperiod)
plt.show()
for ax in (ax1, ax2):
ax.set_ylabel("%")
ax.set_ylim([0-10, 100+10])
ax.set_yticks([0,20,40,60,80,100])
xloc = mdates.AutoDateLocator()
xfmt = mdates.ConciseDateFormatter(xloc)
ax.xaxis.set_major_locator(xloc)
ax.xaxis.set_major_formatter(xfmt)
# ---- plotting routines ----
ax.grid(True)
ax.legend(loc='upper left')
# figplot_erab_accessibility plots E-RAB Accessibility KPI data on the figure.
def figplot_erab_accessibility(fig: plt.Figure, vτ, vInititialEPSBEstabSR, vAddedEPSBEstabSR, tperiod=None):
ax1, ax2 = fig.subplots(2, 1, sharex=True)
fig.suptitle("E-RAB Accessibility / %s" % (tpretty(tperiod) if tperiod is not None else
vτ_period_pretty(vτ)))
ax1.set_title("Initial E-RAB establishment success rate")
ax2.set_title("Added E-RAB establishment success rate")
plt.show()
plot_success_rate(ax1, vτ, vInititialEPSBEstabSR, "InititialEPSBEstabSR")
plot_success_rate(ax2, vτ, vAddedEPSBEstabSR, "AddedEPSBEstabSR")
# figplot_eutran_ip_throughput plots E-UTRAN IP Throughput KPI data on the figure.
def figplot_eutran_ip_throughput(fig: plt.Figure, vτ, vIPThp_qci, tperiod=None):
ax1, ax2 = fig.subplots(2, 1, sharex=True)
fig.suptitle("E-UTRAN IP Throughput / %s" % (tpretty(tperiod) if tperiod is not None else
vτ_period_pretty(vτ)))
ax1.set_title("Downlink")
ax2.set_title("Uplink")
ax1.set_ylabel("Mbit/s")
ax2.set_ylabel("Mbit/s")
v_qci = (vIPThp_qci .view(np.float64) / 1e6) \
.view(vIPThp_qci.dtype)
plot_per_qci(ax1, vτ, v_qci[:,:]['dl'], 'IPThp')
plot_per_qci(ax2, vτ, v_qci[:,:]['ul'], 'IPThp')
_, dmax = ax1.get_ylim()
_, umax = ax2.get_ylim()
ax1.set_ylim(ymin=0, ymax=dmax*1.05)
ax2.set_ylim(ymin=0, ymax=umax*1.05)
# plot_success_rate plots success-rate data from vector v on ax.
# v is array with Intervals.
def plot_success_rate(ax, vτ, v, label):
ax.plot(vτ, v['lo'], drawstyle='steps-post', label=label)
ax.fill_between(vτ, v['lo'], v['hi'],
step='post', alpha=0.1, label='%s\nuncertainty' % label)
ax.set_ylabel("%")
ax.set_ylim([0-10, 100+10])
ax.set_yticks([0,20,40,60,80,100])
fmt_dates_pretty(ax.xaxis)
ax.grid(True)
ax.legend(loc='upper left')
# plot_per_qci plots data from per-QCI vector v_qci.
#
# v_qci should be array[t, QCI].
# QCIs, for which v[:,qci] is all zeros, are said to be silent and are not plotted.
def plot_per_qci(ax, vτ, v_qci, label):
ax.set_xlim((vτ[0], vτ[-1])) # to have correct x range even if we have no data
assert len(v_qci.shape) == 2
silent = True
propv = list(plt.rcParams['axes.prop_cycle'])
for qci in range(v_qci.shape[1]):
v = v_qci[:, qci]
if (v['hi'] == 0).all(): # skip silent QCIs
continue
silent = False
prop = propv[qci % len(propv)] # to have same colors for same qci in different graphs
ax.plot(vτ, v['lo'], label="%s.%d" % (label, qci), **prop)
ax.fill_between(vτ, v['lo'], v['hi'], alpha=0.3, **prop)
if silent:
ax.plot([],[], ' ', label="all QCI silent")
fmt_dates_pretty(ax.xaxis)
ax.grid(True)
ax.legend(loc='upper left')
# fmt_dates_pretty instructs axis to use concise dates formatting.
def fmt_dates_pretty(axis):
xloc = mdates.AutoDateLocator()
xfmt = mdates.ConciseDateFormatter(xloc)
axis.set_major_locator(xloc)
axis.set_major_formatter(xfmt)
axis.set_minor_locator(ticker.AutoMinorLocator(5))
# tpretty returns pretty form for time, e.g. 1'2" for 62 seconds.
def tpretty(t):
tmin, tsec = divmod(t, 60)
return "%s%s" % ("%d'" % tmin if tmin else '',
'%d"' % tsec if tsec else '')
# vτ_period_pretty returns pretty form for time period in vector vτ.
# for example [2,5,8,11] gives 3".
def vτ_period_pretty(vτ):
if len(vτ) < 2:
return "?"
s = timedelta(seconds=1)
δvτ = (vτ[1:] - vτ[:-1]) / s # in seconds
min = δvτ.min()
avg = δvτ.mean()
max = δvτ.max()
std = δvτ.std()
if min == max:
return tpretty(min)
return "%s ±%s [%s, %s]" % (tpretty(avg), tpretty(std), tpretty(min), tpretty(max))
if __name__ == '__main__':
......
# -*- coding: utf-8 -*-
# Copyright (C) 2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
# Copyright (C) 2022-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
......@@ -21,7 +21,7 @@
- Calc is KPI calculator. It can be instantiated on MeasurementLog and time
interval over which to perform computations. Use Calc methods such as
.erab_accessibility() to compute KPIs.
.erab_accessibility() and .eutran_ip_throughput() to compute KPIs.
- MeasurementLog maintains journal with result of measurements. Use .append()
to populate it with data.
......@@ -58,6 +58,7 @@ from golang import func
# following methods for computing 3GPP KPIs:
#
# .erab_accessibility() - TS 32.450 6.1.1 "E-RAB Accessibility"
# .eutran_ip_throughput() - TS 32.450 6.3.1 "E-UTRAN IP Throughput"
# TODO other KPIs
#
# Upon construction specified time interval is potentially widened to cover
......@@ -75,7 +76,7 @@ class Calc:
pass
# MeasurementLog represent journal of performed Measurements.
# MeasurementLog represents journal of performed Measurements.
#
# It semantically consists of
#
......@@ -178,11 +179,13 @@ class Measurement(np.void):
# TODO mean -> total + npkt?
#('DRB.IPLatDl.QCI', Ttime), # s 4.4.5.1 32.450:6.3.2 NOTE not ms
# DRB.IPThpX.QCI = DRB.IPThpVolX.QCI / DRB.IPThpTimeX.QCI
('DRB.IPThpVolDl.QCI', np.int64), # bit 4.4.6.1 32.450:6.3.1 NOTE not kbit
('DRB.IPThpVolUl.QCI', np.int64), # bit 4.4.6.2 32.450:6.3.1 NOTE not kbit
('DRB.IPThpTimeDl.QCI', Ttime), # s
('DRB.IPThpTimeUl.QCI', Ttime), # s
# DRB.IPThpX.QCI = DRB.IPVolX.QCI / DRB.IPTimeX.QCI 4.4.6.1-2 32.450:6.3.1
('DRB.IPVolDl.QCI', np.int64), # bit 4.4.6.3 32.450:6.3.1 NOTE not kbit
('DRB.IPVolUl.QCI', np.int64), # bit 4.4.6.4 32.450:6.3.1 NOTE not kbit
('DRB.IPTimeDl.QCI', Ttime), # s 4.4.6.5 32.450:6.3.1 NOTE not ms
('DRB.IPTimeUl.QCI', Ttime), # s 4.4.6.6 32.450:6.3.1 NOTE not ms
('XXX.DRB.IPTimeDl_err.QCI', Ttime), # s XXX error for DRB.IPTimeDl.QCI (will be removed)
('XXX.DRB.IPTimeUl_err.QCI', Ttime), # s XXX error for DRB.IPTimeUl.QCI (will be removed)
('RRU.CellUnavailableTime.CAUSE', Ttime), # s 4.5.6
......@@ -220,18 +223,24 @@ class Interval(np.void):
@func(Measurement)
def __new__(cls):
m = _newscalar(cls, cls._dtype)
for field in m.dtype.names:
for field in m._dtype0.names:
fdtype = m.dtype.fields[field][0]
m[field] = NA(fdtype)
if fdtype.shape == ():
m[field] = NA(fdtype) # scalar
else:
m[field][:] = NA(fdtype.base) # subarray
return m
# _all_qci expands <name>.QCI into <name>.sum and [] of <name>.<qci> for all possible qci values.
# TODO remove and use direct array access (after causes are expanded into array too)
nqci = 256 # all possible QCIs ∈ [0,255], standard ones are described in 23.203 Table 6.1.7
def _all_qci(name_qci: str): # -> name_sum, ()name_qciv
if not name_qci.endswith(".QCI"):
raise AssertionError("invalid name_qci %r: no .QCI suffix" % name_qci)
name = name_qci[:-len(".QCI")]
return name+".sum", () # TODO add all possible QCIs - TS 36.413 (S1AP)
name_qciv = tuple("%s.%d" % (name,q) for q in range(nqci))
return name+".sum", name_qciv
# _all_cause expands <name>.CAUSE into <name>.sum and [] of <name>.<cause> for all possible cause values.
def _all_cause(name_cause: str): # -> name_sum, ()name_causev
......@@ -242,13 +251,16 @@ def _all_cause(name_cause: str): # -> name_sum, ()name_causev
# expand all .QCI and .CAUSE in Measurement._dtype .
def _():
expv = [] # of (name, typ)
# expand X.QCI -> X.sum + X.QCI[nqci]
qnamev = [] # X from X.QCI
expv = [] # of (name, typ[, shape])
for name in Measurement._dtype .names:
typ = Measurement._dtype .fields[name][0].type
if name.endswith('.QCI'):
Σ, qciv = _all_qci(name)
for _ in (Σ,)+qciv:
expv.append((_, typ))
_ = name[:-len('.QCI')]
qnamev.append(_)
expv.append(('%s.sum' % _, typ)) # X.sum
expv.append((name, typ, nqci)) # X.QCI[nqci]
elif name.endswith('.CAUSE'):
Σ, causev = _all_cause(name)
......@@ -258,7 +270,33 @@ def _():
else:
expv.append((name, typ))
Measurement._dtype = np.dtype(expv)
_dtype = np.dtype(expv)
# also provide .QCI aliases, e.g. X.5 -> X.QCI[5]
namev = []
formatv = []
offsetv = []
for name in _dtype.names:
fd, off = _dtype.fields[name]
namev .append(name)
formatv.append(fd)
offsetv.append(off)
for qname in qnamev:
qarr, off0 = _dtype.fields[qname+'.QCI']
assert len(qarr.shape) == 1
for qci in range(qarr.shape[0]):
namev .append('%s.%d' % (qname, qci))
formatv.append(qarr.base)
offsetv.append(off0 + qci*qarr.base.itemsize)
Measurement._dtype0 = _dtype # ._dtype without aliases
Measurement._dtype = np.dtype({
'names': namev,
'formats': formatv,
'offsets': offsetv,
})
assert Measurement._dtype.itemsize == Measurement._dtype0.itemsize
_()
del _
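# A standalone sketch (with a hypothetical field 'V') of the aliasing
# technique used above: two dtype fields may share bytes via explicit
# offsets, so scalar 'V.2' reads element 2 of the 'V.QCI' subarray in place.
def _example_qci_alias():
    dt = np.dtype({
        'names':   ['V.QCI', 'V.2'],
        'formats': [(np.int64, 4), np.int64],
        'offsets': [0, 2*8],                # 'V.2' overlaps subarray element 2
    })
    v = np.zeros((), dtype=dt)
    v['V.2'] = 99
    assert v['V.QCI'][2] == 99              # write via alias is seen by subarray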
......@@ -268,22 +306,40 @@ del _
@func(Measurement)
def __repr__(m):
initv = []
for field in m.dtype.names:
v = m[field]
if not isNA(v):
initv.append("%s=%r" % (field, v))
for field in m._dtype0.names:
vs = _vstr(m[field])
if vs != 'ø':
initv.append("%s=%s" % (field, vs))
return "Measurement(%s)" % ', '.join(initv)
# __str__ returns "(v1, v2, ...)".
# NA values are represented as "ø".
# .QCI arrays are represented as {qci₁:v₁ qci₂:v₂ ...} with zero values omitted.
# if all values are NA - then the whole array is represented as ø.
@func(Measurement)
def __str__(m):
vv = []
for field in m.dtype.names:
v = m[field]
vv.append('ø' if isNA(v) else str(v))
for field in m._dtype0.names:
vv.append(_vstr(m[field]))
return "(%s)" % ', '.join(vv)
# _vstr returns string representation of scalar or subarray v.
def _vstr(v): # -> str
if v.shape == (): # scalar
return 'ø' if isNA(v) else str(v)
assert len(v.shape) == 1
if isNA(v).all(): # subarray full of ø
return 'ø'
va = [] # subarray with some non-ø data
for k in range(v.shape[0]):
if v[k] == 0:
continue
va.append('%d:%s' % (k, 'ø' if isNA(v[k]) else str(v[k])))
return "{%s}" % ' '.join(va)
# ==, != for Measurement.
@func(Measurement)
def __eq__(a, b):
......@@ -291,7 +347,10 @@ def __eq__(a, b):
# return np.array_equal(a, b, equal_nan=True) # for NA==NA
if not isinstance(b, Measurement):
return False
return a.data.tobytes() == b.data.tobytes()
# cast to dtype without aliases to avoid
# "dtypes with overlapping or out-of-order fields are not representable as buffers"
return a.view(a._dtype0).data.tobytes() == \
b.view(b._dtype0).data.tobytes()
@func(Measurement)
def __ne__(a, b):
......@@ -314,6 +373,8 @@ def _check_valid(m):
for field in m.dtype.names:
v = m[field]
if v.shape != (): # skip subarrays - rely on aliases
continue
if isNA(v):
continue
......@@ -359,8 +420,9 @@ def append(mlog, m: Measurement):
if not (τ_ + δτ_ <= τ):
raise AssertionError(".Tstart overlaps with previous measurement: %s ∈ [%s, %s)" %
(τ, τ_, τ_ + δτ_))
_ = np.append(mlog._data, m)
_ = np.append(
mlog._data.view(Measurement._dtype0), # dtype0 because np.append does not handle aliased
m.view(Measurement._dtype0)) # fields as such and increases out itemsize
mlog._data = _.view((Measurement, Measurement._dtype)) # np.append loses Measurement from dtype
# forget_past deletes measurements with .Tstart ≤ Tcut
......@@ -528,6 +590,71 @@ def _success_rate(calc, fini, init): # -> Interval in [0,1]
return Interval(a,b)
# eutran_ip_throughput computes "E-UTRAN IP Throughput" KPI.
#
# It returns the following:
#
# - IPThp[QCI][dl,ul] IP throughput per QCI for downlink and uplink (bit/s)
#
# All elements are returned as Intervals with information about confidence for
# computed values.
#
# NOTE: the unit of the result is bit/s, not kbit/s.
#
# 3GPP reference: TS 32.450 6.3.1 "E-UTRAN IP Throughput".
@func(Calc)
def eutran_ip_throughput(calc): # -> IPThp[QCI][dl,ul]
qdlΣv = np.zeros(nqci, dtype=np.float64)
qdlΣt = np.zeros(nqci, dtype=np.float64)
qdlΣte = np.zeros(nqci, dtype=np.float64)
qulΣv = np.zeros(nqci, dtype=np.float64)
qulΣt = np.zeros(nqci, dtype=np.float64)
qulΣte = np.zeros(nqci, dtype=np.float64)
for m in calc._miter():
τ = m['X.δT']
for qci in range(nqci):
dl_vol = m["DRB.IPVolDl.QCI"] [qci]
dl_time = m["DRB.IPTimeDl.QCI"] [qci]
dl_time_err = m["XXX.DRB.IPTimeDl_err.QCI"] [qci]
ul_vol = m["DRB.IPVolUl.QCI"] [qci]
ul_time = m["DRB.IPTimeUl.QCI"] [qci]
ul_time_err = m["XXX.DRB.IPTimeUl_err.QCI"] [qci]
if isNA(dl_vol) or isNA(dl_time) or isNA(dl_time_err):
# don't account uncertainty - here it is harder to do compared
# to erab_accessibility and the benefit is not clear. Follow
# plain 3GPP spec for now.
pass
else:
qdlΣv[qci] += dl_vol
qdlΣt[qci] += dl_time
qdlΣte[qci] += dl_time_err
if isNA(ul_vol) or isNA(ul_time) or isNA(ul_time_err):
# no uncertainty accounting - see ^^^
pass
else:
qulΣv[qci] += ul_vol
qulΣt[qci] += ul_time
qulΣte[qci] += ul_time_err
thp = np.zeros(nqci, dtype=np.dtype([
('dl', Interval._dtype),
('ul', Interval._dtype),
]))
for qci in range(nqci):
if qdlΣt[qci] > 0:
thp[qci]['dl']['lo'] = qdlΣv[qci] / (qdlΣt[qci] + qdlΣte[qci])
thp[qci]['dl']['hi'] = qdlΣv[qci] / (qdlΣt[qci] - qdlΣte[qci])
if qulΣt[qci] > 0:
thp[qci]['ul']['lo'] = qulΣv[qci] / (qulΣt[qci] + qulΣte[qci])
thp[qci]['ul']['hi'] = qulΣv[qci] / (qulΣt[qci] - qulΣte[qci])
return thp
# _miter iterates through [.τ_lo, .τ_hi) yielding Measurements.
#
# The measurements are yielded with consecutive timestamps. There are no gaps
......@@ -602,7 +729,7 @@ def _Σx(m: Measurement, name_x: str, _all_x: func):
if not isNA(s):
return s
s = s.dtype.type(0)
ok = True
ok = True if len(name_xv) > 0 else False
for _ in name_xv:
v = m[_]
# we don't know the answer even if a single value is NA
......@@ -627,6 +754,7 @@ def _newscalar(typ, dtype):
_ = np.zeros(shape=(), dtype=(typ, dtype))
s = _[()]
assert type(s) is typ
assert s.dtype == dtype
return s
......@@ -634,18 +762,24 @@ def _newscalar(typ, dtype):
# NA returns "Not Available" value for dtype.
def NA(dtype):
typ = dtype.type
# float
if issubclass(dtype.type, np.floating):
return np.nan
if issubclass(typ, np.floating):
na = np.nan
# int: NA is min value
if issubclass(dtype.type, np.signedinteger):
return np.iinfo(dtype.type).min
elif issubclass(typ, np.signedinteger):
na = np.iinfo(typ).min
else:
raise AssertionError("NA not defined for dtype %s" % (dtype,))
raise AssertionError("NA not defined for dtype %s" % (dtype,))
return typ(na) # return the same type as dtype has, e.g. np.int32, not int
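# A small usage sketch: NA of float dtypes is NaN; NA of integer dtypes is
# the minimum representable value; the concrete scalar type is preserved.
#
#   assert np.isnan(NA(np.dtype(np.float64)))
#   assert NA(np.dtype(np.int32)) == np.iinfo(np.int32).min
#   assert type(NA(np.dtype(np.int32))) is np.int32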
# isNA returns whether value represent NA.
# value must be numpy scalar.
#
# returns True/False if value is scalar.
# returns array(True/False) if value is array.
def isNA(value):
na = NA(value.dtype)
if np.isnan(na):
......
# -*- coding: utf-8 -*-
# Copyright (C) 2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
# Copyright (C) 2022-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
......@@ -18,7 +18,7 @@
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from xlte.kpi import Calc, MeasurementLog, Measurement, Interval, NA, isNA
from xlte.kpi import Calc, MeasurementLog, Measurement, Interval, NA, isNA, Σqci, Σcause, nqci
import numpy as np
from pytest import raises
......@@ -29,12 +29,18 @@ def test_Measurement():
# verify that all fields are initialized to NA
def _(name):
assert isNA(m[name])
v = m[name]
if v.shape == ():
assert isNA(v) # scalar
else:
assert isNA(v).all() # array
# several fields explicitly
_('X.Tstart') # time
_('RRC.ConnEstabAtt.sum') # Tcc
_('DRB.PdcpSduBitrateDl.sum') # float32
_('DRB.IPThpVolDl.sum') # int64
_('DRB.IPVolDl.sum') # int64
_('DRB.IPTimeDl.7') # .QCI alias
_('DRB.IPTimeDl.QCI') # .QCI array
# everything automatically
for name in m.dtype.names:
_(name)
......@@ -45,16 +51,29 @@ def test_Measurement():
assert m['S1SIG.ConnEstabAtt'] == 123
m['RRC.ConnEstabAtt.sum'] = 17
assert m['RRC.ConnEstabAtt.sum'] == 17
m['DRB.IPVolDl.QCI'][:] = 0
m['DRB.IPVolDl.5'] = 55
m['DRB.IPVolDl.7'] = NA(m['DRB.IPVolDl.7'].dtype)
m['DRB.IPVolDl.QCI'][9] = 99
assert m['DRB.IPVolDl.5'] == 55; assert m['DRB.IPVolDl.QCI'][5] == 55
assert isNA(m['DRB.IPVolDl.7']); assert isNA(m['DRB.IPVolDl.QCI'][7])
assert m['DRB.IPVolDl.9'] == 99; assert m['DRB.IPVolDl.QCI'][9] == 99
for k in range(len(m['DRB.IPVolDl.QCI'])):
if k in {5,7,9}:
continue
assert m['DRB.IPVolDl.%d' % k] == 0
assert m['DRB.IPVolDl.QCI'][k] == 0
# str/repr
assert repr(m) == "Measurement(RRC.ConnEstabAtt.sum=17, S1SIG.ConnEstabAtt=123)"
assert repr(m) == "Measurement(RRC.ConnEstabAtt.sum=17, DRB.IPVolDl.QCI={5:55 7:ø 9:99}, S1SIG.ConnEstabAtt=123)"
s = str(m)
assert s[0] == '('
assert s[-1] == ')'
v = s[1:-1].split(', ')
vok = ['ø'] * len(m.dtype.names)
vok = ['ø'] * len(m._dtype0.names)
vok[m.dtype.names.index("RRC.ConnEstabAtt.sum")] = "17"
vok[m.dtype.names.index("S1SIG.ConnEstabAtt")] = "123"
vok[m.dtype.names.index("DRB.IPVolDl.QCI")] = "{5:55 7:ø 9:99}"
assert v == vok
# verify that time fields has enough precision
......@@ -420,9 +439,107 @@ def test_Calc_erab_accessibility():
_(InititialEPSBEstabSR, 100 * 2*3*4 / (7*8*9))
# verify Calc.eutran_ip_throughput .
def test_Calc_eutran_ip_throughput():
# most of the job is done by drivers collecting DRB.IPVol{Dl,Ul} and DRB.IPTime{Dl,Ul}
# here we only lightly verify the final aggregation that eutran_ip_throughput does.
m = Measurement()
m['X.Tstart'] = 10
m['X.δT'] = 10
m['DRB.IPVolDl.5'] = 55e6
m['DRB.IPVolUl.5'] = 55e5
m['DRB.IPTimeDl.5'] = 1e2
m['DRB.IPTimeUl.5'] = 1e2
m['DRB.IPVolDl.7'] = 75e6
m['DRB.IPVolUl.7'] = 75e5
m['DRB.IPTimeDl.7'] = 1e2
m['DRB.IPTimeUl.7'] = 1e2
m['DRB.IPVolDl.9'] = 0
m['DRB.IPVolUl.9'] = 0
m['DRB.IPTimeDl.9'] = 0
m['DRB.IPTimeUl.9'] = 0
for qci in {5,7,9}:
m['XXX.DRB.IPTimeDl_err.QCI'][qci] = 0
m['XXX.DRB.IPTimeUl_err.QCI'][qci] = 0
# (other QCIs are left with NA)
for qci in set(range(nqci)).difference({5,7,9}):
assert isNA(m['DRB.IPVolDl.QCI'][qci])
assert isNA(m['DRB.IPVolUl.QCI'][qci])
assert isNA(m['DRB.IPTimeDl.QCI'][qci])
assert isNA(m['DRB.IPTimeUl.QCI'][qci])
assert isNA(m['XXX.DRB.IPTimeDl_err.QCI'][qci])
assert isNA(m['XXX.DRB.IPTimeUl_err.QCI'][qci])
mlog = MeasurementLog()
mlog.append(m)
calc = Calc(mlog, 10,20)
thp = calc.eutran_ip_throughput()
def I(x): return Interval(x,x)
assert thp[5]['dl'] == I(55e4)
assert thp[5]['ul'] == I(55e3)
assert thp[7]['dl'] == I(75e4)
assert thp[7]['ul'] == I(75e3)
assert thp[9]['dl'] == I(0)
assert thp[9]['ul'] == I(0)
for qci in set(range(nqci)).difference({5,7,9}):
assert thp[qci]['dl'] == I(0)
assert thp[qci]['ul'] == I(0)
# verify Σqci.
def test_Σqci():
m = Measurement()
x = 'ERAB.EstabInitAttNbr'
def Σ():
return Σqci(m, x+'.QCI')
assert isNA(Σ())
m[x+'.sum'] = 123
assert Σ() == 123
m[x+'.17'] = 17
m[x+'.23'] = 23
m[x+'.255'] = 255
assert Σ() == 123 # from .sum
m[x+'.sum'] = NA(m[x+'.sum'].dtype)
assert isNA(Σ()) # from array, but NA values lead to sum being NA
v = m[x+'.QCI']
l = len(v)
for i in range(l):
v[i] = 1 + i
assert Σ() == 1*l + (l-1)*l/2
# verify Σcause.
def test_Σcause():
m = Measurement()
x = 'RRC.ConnEstabAtt'
def Σ():
return Σcause(m, x+'.CAUSE')
assert isNA(Σ())
m[x+'.sum'] = 123
assert Σ() == 123
# TODO sum over individual causes (when implemented)
def test_NA():
def _(typ):
return NA(typ(0).dtype)
na = NA(typ(0).dtype)
assert type(na) is typ
assert isNA(na)
return na
assert np.isnan( _(np.float16) )
assert np.isnan( _(np.float32) )
......
#!/usr/bin/env python
# Copyright (C) 2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Program udpflood sends/floods packets via UDP.
It is useful to test how E-UTRAN IP Throughput KPI implementation handles bursts.
Usage: udpflood host:port npkt/period pause_ms
"""
import sys, time
from socket import socket, AF_INET, SOCK_DGRAM, IPPROTO_UDP
def main():
addr = sys.argv[1]
host, port = addr.split(':')
port = int(port)
npkt_period = 1
pause_ms = 0
if len(sys.argv) >= 3:
npkt_period = int(sys.argv[2])
if len(sys.argv) >= 4:
pause_ms = int(sys.argv[3])
print("# udpflood -> %s :%s %d pkt/period, %dms pause in between periods" %
(host, port, npkt_period, pause_ms))
sk = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
pkt = b'\xff'*1000
while 1:
for _ in range(npkt_period):
sk.sendto(pkt, (host, port))
if pause_ms:
time.sleep(pause_ms*0.001)
if __name__ == '__main__':
main()