Commit 7164e99c authored by Kirill Smelkov

amari.{kpi,drb}: Fix multicell handling

1) Fix amari.kpi to handle stats messages in enb.xlog that come with multiple cells.
   Previously such messages were leading to the following errors on KPI calculator place (e.g. Wendelin):

        xlte.amari.kpi.LogError: t1731059787.321: stats describes 2 cells;  but only single-cell configurations are supported

2) Fix amari.drb to generate x.drb_stats messages when an UE is associated to multiple cells due to e.g. Carrier Aggregation.
   Previously CA was leading to

        raise RuntimeError(("ue #%s belongs to %d cells;  "+
            "but only single-cell configurations are supported") % (ue_id, len(ju(['cells']))))

   error on eNB side.

+ minor fixes and enhancements done along the way.

Please see individual patches for details.

An example enb.xlog for eNB with 2 cells and an UE Carrier-Aggregated to both cells is here:

https://lab.nexedi.com/kirr/misc/-/blob/6a04cf3/lte/20241111-2cell.xlog

And here is how it looks when visualized via kpidemo.py :

https://lab.nexedi.com/kirr/misc/-/blob/6a04cf3/lte/20241111-2cell.png

Kirill

/cc @lu.xu
/reviewed-by @paul.graydon
/reviewed-on !7
parents c5e92b6a 10837c10
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2023 Nexedi SA and Contributors. # Copyright (C) 2023-2024 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
...@@ -111,6 +111,8 @@ class _ERAB_Flow: ...@@ -111,6 +111,8 @@ class _ERAB_Flow:
# _QCI_Flow represents in-progress collection to make up a Sample. # _QCI_Flow represents in-progress collection to make up a Sample.
# #
# It tracks data transmission on particular QCI of particular UE.
#
# .update(δt, tx_bytes, #tx, ...) updates flow with information about next # .update(δt, tx_bytes, #tx, ...) updates flow with information about next
# transmission period and potentially yields some finalized Samples. # transmission period and potentially yields some finalized Samples.
# .finish() completes Sample collection. # .finish() completes Sample collection.
...@@ -141,15 +143,38 @@ class _QCI_Flow: ...@@ -141,15 +143,38 @@ class _QCI_Flow:
# adjusted stream with #tx corresponding to tx_bytes coming together # adjusted stream with #tx corresponding to tx_bytes coming together
# synchronized in time. # synchronized in time.
# #
# .next(δt, tx_bytes, #tx, X) -> [](δt', tx_bytes', #tx', X') # .next(δt, tx_bytes, {C → #tx, bitrate}) -> [](δt', tx_bytes', {C → #tx'})
# .finish() -> [](δt', tx_bytes', #tx', X') # .finish() -> [](δt', tx_bytes', {C → #tx'})
# #
# (*) see e.g. Figure 8.1 in "An introduction to LTE, 2nd ed." # (*) see e.g. Figure 8.1 in "An introduction to LTE, 2nd ed."
class _BitSync: class _BitSync:
__slots__ = ( __slots__ = (
'txq', # [](δt,tx_bytes,#tx,X) not-yet fully processed tail of whole txv 'txsplit', # _CTXBytesSplitter that splits total tx_bytes into per-cell parts
'txq', # [](δt, _Utx + .tx_bytes) not-yet fully processed tail of whole txv
'i_txq', # txq represents txv[i_txq:] 'i_txq', # txq represents txv[i_txq:]
'i_lshift', # next left shift will be done on txv[i_lshift] <- txv[i_lshift+1] 'cbitsync1', # {} cell -> _BitSync1 s1.i_txq = .i_txq; len(s1.txq) = len(.txq)
# s1.i_txq and s1.i_lshift are kept in sync
# in between all bitsync1s
)
# _BitSync1 serves _BitSync by handling transmission substream on one particular cell.
#
# .next(ctx_bytes, #tx) -> [](ctx_bytes', #tx')
# .finish() -> [](ctx_bytes', #tx')
class _BitSync1:
__slots__ = (
'txq', # [](ctx_bytes,#tx) not-yet fully processed tail of whole txv/cell
'i_txq', # txq represents txv/cell[i_txq:]
'i_lshift', # next left shift of #tx will be done on txv/cell[i_lshift] <- txv/cell[i_lshift+1]
)
# _CTXBytesSplitter serves _BitSync by splitting total tx_bytes into per-cell parts.
#
# .next(δt, tx_bytes, {C → #tx, bitrate}) -> [](δt', {C → #tx, ctx_bytes})
# .finish() -> [](δt', {C → #tx, ctx_bytes})
class _CTXBytesSplitter:
__slots__ = (
'txq', # [](δt, tx_bytes, _Utx)
) )
...@@ -221,11 +246,22 @@ def add(s, ue_stats, stats): # -> dl/ul samples ; dl/ul = {} qci -> []Sample ...@@ -221,11 +246,22 @@ def add(s, ue_stats, stats): # -> dl/ul samples ; dl/ul = {} qci -> []Sample
ul = s._ul_sampler.add(ue_stats, stats) ul = s._ul_sampler.add(ue_stats, stats)
return dl, ul return dl, ul
class _Utx: # transmission state passed through bitsync class _Utx: # UE transmission state
__slots__ = (
'qtx_bytes', # {} qci -> Σδerab_qci=qci
'cutx', # {} cell -> _UCtx
)
class _UCtx: # UE transmission state on particular cell
__slots__ = ( __slots__ = (
'qtx_bytes', 'tx',
'retx',
'bitrate',
'rank', 'rank',
'xl_use_avg', 'xl_use_avg',
# tx_bytes is per-cell part of total tx_bytes estimated by _CTXBytesSplitter
'tx_bytes', # initially set to None
) )
@func(_Sampler) @func(_Sampler)
...@@ -243,22 +279,29 @@ def add(s, ue_stats, stats, init=False): ...@@ -243,22 +279,29 @@ def add(s, ue_stats, stats, init=False):
ue_id = ju['enb_ue_id'] # TODO 5G: -> ran_ue_id + qos_flow_list + sst? ue_id = ju['enb_ue_id'] # TODO 5G: -> ran_ue_id + qos_flow_list + sst?
ue_live.add(ue_id) ue_live.add(ue_id)
if len(ju['cells']) != 1:
raise RuntimeError(("ue #%s belongs to %d cells; "+
"but only single-cell configurations are supported") % (ue_id, len(ju(['cells']))))
cell = ju['cells'][0]
tx = cell['%s_tx' % s.dir] # in transport blocks
retx = cell['%s_retx' % s.dir] # ----//----
assert tx >= 0, tx
assert retx >= 0, retx
cell_id = cell['cell_id'] # int
scell = stats['cells'][str(cell_id)]
u = _Utx() u = _Utx()
u.qtx_bytes = {} # qci -> Σδerab_qci=qci u.qtx_bytes = {} # qci -> Σδerab_qci=qci
u.rank = cell['ri'] if s.use_ri else 1 u.cutx = {} # cell -> _UCtx
u.xl_use_avg = scell['%s_use_avg' % s.dir]
for ucell in ju['cells']:
cell_id = ucell['cell_id'] # int
stats_cell = stats['cells'][str(cell_id)]
uc = _UCtx()
assert cell_id not in u.cutx, u.cutx
u.cutx[cell_id] = uc
uc.tx = ucell['%s_tx' % s.dir] # in transport blocks
uc.retx = ucell['%s_retx' % s.dir] # ----//----
uc.bitrate = ucell['%s_bitrate' % s.dir] # bits/s
assert uc.tx >= 0, uc.tx
assert uc.retx >= 0, uc.retx
assert uc.bitrate >= 0, uc.bitrate
uc.rank = ucell['ri'] if s.use_ri else 1
uc.xl_use_avg = stats_cell['%s_use_avg' % s.dir]
uc.tx_bytes = None
ue = s.ues.get(ue_id) ue = s.ues.get(ue_id)
if ue is None: if ue is None:
...@@ -292,22 +335,30 @@ def add(s, ue_stats, stats, init=False): ...@@ -292,22 +335,30 @@ def add(s, ue_stats, stats, init=False):
u.qtx_bytes[qci] = u.qtx_bytes.get(qci,0) + etx_bytes u.qtx_bytes[qci] = u.qtx_bytes.get(qci,0) + etx_bytes
# debug # debug
if 0 and s.dir == 'dl' and (etx_bytes != 0 or tx != 0 or retx != 0) and qci==9: if 0 and \
s.dir == 'dl' and ( \
etx_bytes != 0 or \
any([(uc.tx != 0 or uc.retx != 0 or uc.bitrate != 0) for uc in u.cutx.values()]) \
) and qci==9:
sfnx = ((t // tti) / 10) % 1024 # = SFN.subframe sfnx = ((t // tti) / 10) % 1024 # = SFN.subframe
_debug('% 4.1f ue%s %s .%d: etx_total_bytes: %d +%5d tx: %2d retx: %d ri: %d bitrate: %d' % \ dtx = '% 4.1f ue%s %s .%d: etx_total_bytes: %d +%5d' % \
(sfnx, ue_id, s.dir, qci, etx_total_bytes, etx_bytes, tx, retx, u.rank, cell['%s_bitrate' % s.dir])) (sfnx, ue_id, s.dir, qci, etx_total_bytes, etx_bytes)
for cell_id in sorted(u.cutx):
uc = u.cutx[cell_id]
dtx += '| C%d: tx %2d retx %d ri %d bitrate %d' % \
(cell_id, uc.tx, uc.retx, uc.rank, uc.bitrate)
_debug(dtx)
# gc non-live erabs # gc non-live erabs
for erab_id in set(ue.erab_flows.keys()): for erab_id in set(ue.erab_flows.keys()):
if erab_id not in eflows_live: if erab_id not in eflows_live:
del ue.erab_flows[erab_id] del ue.erab_flows[erab_id]
# bitsync <- (δt, tx_bytes, #tx, u) # bitsync <- (δt, tx_bytes, u)
tx += retx # both transmission and retransmission take time
if ue.bitsync is not None: if ue.bitsync is not None:
bitnext = ue.bitsync.next(δt, tx_bytes, tx, u) bitnext = ue.bitsync.next(δt, tx_bytes, u)
else: else:
bitnext = [(δt, tx_bytes, tx, u)] bitnext = [(δt, tx_bytes, u)]
# update qci flows # update qci flows
if init: if init:
...@@ -326,27 +377,55 @@ def add(s, ue_stats, stats, init=False): ...@@ -326,27 +377,55 @@ def add(s, ue_stats, stats, init=False):
return qci_samples return qci_samples
# _update_qci_flows updates .qci_flows for ue with (δt, tx_bytes, #tx, _Utx) yielded from bitsync. # _update_qci_flows updates .qci_flows for ue with (δt, tx_bytes, _Utx) yielded from bitsync.
# #
# yielded samples are appended to qci_samples ({} qci -> []Sample). # yielded samples are appended to qci_samples ({} qci -> []Sample).
@func(_UE) @func(_UE)
def _update_qci_flows(ue, bitnext, qci_samples): def _update_qci_flows(ue, bitnext, qci_samples):
for (δt, tx_bytes, tx, u) in bitnext: for (δt, tx_bytes, u) in bitnext:
qflows_live = set() # of qci qci flows that get updated from current utx entry qflows_live = set() # of qci qci flows that get updated from current utx entry
# it might happen that even with correct bitsync we could end up with receiving tx=0 here. # estimate time for current transmission
# first normalize transport blocks to time in TTI units (if it is e.g.
# 2x2 mimo, we have 2x more transport blocks) and then estimate tx time
# from transmission time on different cells C₁ and C₂ as
#
# tx_time ∈ [max(t₁,t₂), min(t₁+t₂, δt/tti)]
δt_tti = δt / tti
tx_lo = 0
tx_hi = 0
for uc in u.cutx.values():
ctx = (uc.tx + uc.retx) / uc.rank # both transmission and retransmission take time
ctx = min(ctx, δt_tti) # protection (should not happen)
ctx_lo = ctx_hi = ctx
# it might happen that even with correct bitsync we could end up with receiving ctx=0 here.
# for example it happens if finish interrupts proper bitsync workflow e.g. as follows: # for example it happens if finish interrupts proper bitsync workflow e.g. as follows:
# #
# 1000 0 # 1000 0
# <-- finish # <-- finish
# 0 10 # 0 10
# #
# if we see #tx = 0 we say that it might be anything in between 1 and δt. # if we see ctx = 0 we say that it might be anything in between 1 and δt.
tx_lo = tx_hi = tx if ctx_lo == 0:
if tx == 0: ctx_hi = δt_tti
tx_hi = δt/tti ctx_lo = min(1, ctx_hi)
tx_lo = min(1, tx_hi)
# tx time on the cell is somewhere in [ctx, δt_tti]
if uc.xl_use_avg < 0.9:
# not congested: it likely took the time to transmit ≈ ctx
pass
else:
# potentially congested: we don't know how much congested it is and
# which QCIs are affected more and which less
# -> all we can say tx_time is only somewhere in between limits
ctx_hi = δt_tti
tx_lo = max(tx_lo, ctx_lo)
tx_hi += ctx_hi
tx_hi = min(tx_hi, δt_tti)
# share/distribute tx time over all QCIs.
for qci, tx_bytes_qci in u.qtx_bytes.items(): for qci, tx_bytes_qci in u.qtx_bytes.items():
qflows_live.add(qci) qflows_live.add(qci)
...@@ -354,8 +433,6 @@ def _update_qci_flows(ue, bitnext, qci_samples): ...@@ -354,8 +433,6 @@ def _update_qci_flows(ue, bitnext, qci_samples):
if qf is None: if qf is None:
qf = ue.qci_flows[qci] = _QCI_Flow() qf = ue.qci_flows[qci] = _QCI_Flow()
# share/distribute #tx transport blocks over all QCIs.
#
# Consider two streams "x" and "o" and how LTE scheduler might # Consider two streams "x" and "o" and how LTE scheduler might
# place them into resource map: if the streams have the same # place them into resource map: if the streams have the same
# priority they might be scheduled e.g. as shown in case "a". # priority they might be scheduled e.g. as shown in case "a".
...@@ -387,7 +464,7 @@ def _update_qci_flows(ue, bitnext, qci_samples): ...@@ -387,7 +464,7 @@ def _update_qci_flows(ue, bitnext, qci_samples):
if qtx_lo > tx_hi: # e.g. 6.6 * 11308 / 11308 = 6.6 + ~1e-15 if qtx_lo > tx_hi: # e.g. 6.6 * 11308 / 11308 = 6.6 + ~1e-15
qtx_lo -= 1e-4 qtx_lo -= 1e-4
assert 0 < qtx_lo <= tx_hi, (qtx_lo, tx_hi, tx_bytes_qci, tx_bytes) assert 0 < qtx_lo <= tx_hi, (qtx_lo, tx_hi, tx_bytes_qci, tx_bytes)
_ = qf.update(δt, tx_bytes_qci, qtx_lo, tx_hi, u.rank, u.xl_use_avg) _ = qf.update(δt, tx_bytes_qci, qtx_lo, tx_hi)
for sample in _: for sample in _:
qci_samples.setdefault(qci, []).append(sample) qci_samples.setdefault(qci, []).append(sample)
...@@ -407,39 +484,22 @@ def __init__(qf): ...@@ -407,39 +484,22 @@ def __init__(qf):
qf.tx_time_err = 0 qf.tx_time_err = 0
# update updates flow with information that so many bytes were transmitted during # update updates flow with information that so many bytes were transmitted during
# δt with using #tx transport blocks somewhere in [tx_lo,tx_hi] and with # δt with using tx transmission time somewhere in [tx_lo,tx_hi].
# specified rank. It is also known that overall average usage of resource
# blocks corresponding to tx direction in the resource map is xl_use_avg.
@func(_QCI_Flow) @func(_QCI_Flow)
def update(qf, δt, tx_bytes, tx_lo, tx_hi, rank, xl_use_avg): # -> []Sample def update(qf, δt, tx_bytes, tx_lo, tx_hi): # -> []Sample
#_debug('QF.update %.2ftti %5db %.1f-%.1ftx %drank %.2fuse' % (δt/tti, tx_bytes, tx_lo, tx_hi, rank, xl_use_avg)) #_debug('QF.update %.2ftti %5db %.1f-%.1ftx' % (δt/tti, tx_bytes, tx_lo, tx_hi))
tx_lo /= rank # normalize TB to TTI (if it is e.g. 2x2 mimo, we have 2x more transport blocks)
tx_hi /= rank
vout = [] vout = []
s = qf._update(δt, tx_bytes, tx_lo, tx_hi, xl_use_avg) s = qf._update(δt, tx_bytes, tx_lo, tx_hi)
if s is not None: if s is not None:
vout.append(s) vout.append(s)
return vout return vout
@func(_QCI_Flow) @func(_QCI_Flow)
def _update(qf, δt, tx_bytes, tx_lo, tx_hi, xl_use_avg): # -> ?Sample def _update(qf, δt, tx_bytes, tx_lo, tx_hi): # -> ?Sample
assert tx_bytes > 0 assert tx_bytes > 0
δt_tti = δt / tti δt_tti = δt / tti
tx_lo = min(tx_lo, δt_tti) # protection (should not happen)
tx_hi = min(tx_hi, δt_tti) # protection (should not happen)
# tx time is somewhere in [tx, δt_tti]
if xl_use_avg < 0.9:
# not congested: it likely took the time to transmit ≈ #tx
pass
else:
# potentially congested: we don't know how much congested it is and
# which QCIs are affected more and which less
# -> all we can say tx_time is only somewhere in between limits
tx_hi = δt_tti
tx_time = (tx_lo + tx_hi) / 2 * tti tx_time = (tx_lo + tx_hi) / 2 * tti
tx_time_err = (tx_hi - tx_lo) / 2 * tti tx_time_err = (tx_hi - tx_lo) / 2 * tti
...@@ -494,16 +554,167 @@ def _sample(qf): ...@@ -494,16 +554,167 @@ def _sample(qf):
# _BitSync creates new empty bitsync. # _BitSync creates new empty bitsync.
@func(_BitSync) @func(_BitSync)
def __init__(s): def __init__(s):
s.txsplit = _CTXBytesSplitter()
s.txq = [] s.txq = []
s.i_txq = 0 s.i_txq = 0
s.i_lshift = 0 s.cbitsync1 = {}
# next feeds next (δt, tx_bytes, tx) into bitsync. # _assert_all_insync asserts that data structures of bitsync and all bitsyncs1
# are in consistent synchronized state.
@func(_BitSync)
def _assert_all_insync(s):
if len(s.cbitsync1) == 0:
return
s1_base = _peek(s.cbitsync1.values())
assert s.i_txq == s1_base.i_txq , (s.i_txq, s1_base.i_txq)
assert len(s.txq) == len(s1_base.txq) , (s.txq, s1_base.txq)
for s1 in s.cbitsync1.values():
assert s1.i_txq == s1_base.i_txq , (s1.i_txq, s1_base.i_txq)
assert len(s1.txq) == len(s1_base.txq) , (s1.txq, s1_base.txq)
assert s1.i_lshift == s1_base.i_lshift , (s1.i_lshift, s1_base.i_lshift)
# next feeds next (δt, tx_bytes, _Utx) into bitsync.
# #
# and returns ready parts of adjusted stream. # and returns ready parts of adjusted stream.
@func(_BitSync) @func(_BitSync)
def next(s, δt, tx_bytes, tx, X): # -> [](δt', tx_bytes', tx', X') def next(s, δt, tx_bytes, u: _Utx): # -> [](δt', tx_bytes', u')
s.txq.append((δt, tx_bytes, tx, X)) vbitnext = []
# split total tx_bytes in between cells proportional to their bitrate
# yielded ub_ come with .tx_bytes set on each cell's _UCtx
for (δt, ub_) in s.txsplit.next(δt, tx_bytes, u):
vbitnext += s._next(δt, ub_)
return vbitnext
@func(_BitSync)
def _next(s, δt, u: _Utx):
s._assert_all_insync()
s.txq.append((δt, u))
cvbitnext1 = {} # cell -> [vbitnext1]
# base bitsync1 wrt which we will verify all other bitsync1s and bitsync
s1_base = None
s1_base_len_txq = None
s1_base_i_lshift = None
if len(s.cbitsync1) > 0:
s1_base = _peek(s.cbitsync1.values())
s1_base_len_txq = len(s1_base.txq)
s1_base_i_lshift = s1_base.i_lshift
# feed each bitsync1 with per-cell tx_bytes
for cell_id, uc in u.cutx.items():
if cell_id not in s.cbitsync1:
s1 = _BitSync1()
s1.i_txq = s.i_txq
s1.i_lshift = s.i_txq
# prefeed 0 to this bitsync1 to keep .i_lshift in sync with others
if s1_base is None:
s1_base = s1
s1_base_len_txq = len(s1_base.txq)
s1_base_i_lshift = s1_base.i_lshift
else:
while len(s1.txq) < s1_base_len_txq:
_ = s1.next(0, 0)
assert _ == []
assert s1.i_txq == s.i_txq
assert s1.i_lshift == s1_base_i_lshift
assert len(s1.txq) == s1_base_len_txq
s.cbitsync1[cell_id] = s1
else:
s1 = s.cbitsync1[cell_id]
cvbitnext1[cell_id] = s1.next(uc.tx_bytes, uc.tx + uc.retx)
# if a cell had no transmission activity it is fed with 0 tx_bytes/#tx so
# that its bitsync1 stays synchronized with bitsync1 of other cells
for cell_id in s.cbitsync1:
if cell_id not in u.cutx:
s1 = s.cbitsync1[cell_id]
cvbitnext1[cell_id] = s1.next(0, 0)
# merge results from all bitsync1s back into adjusted tx_bytes' and u'
vbitnext = s._merge_cvbitnext1(cvbitnext1)
s._assert_all_insync()
return vbitnext
# finish tells bitsync to flush its output queue.
#
# the bitsync becomes reset.
@func(_BitSync)
def finish(s): # -> [](δt', tx_bytes', u')
s._assert_all_insync()
# flush bitrate prefilter
vbitnext = []
for (δt, u) in s.txsplit.finish():
vbitnext += s._next(δt, u)
cvbitnext1 = {} # cell -> [vbitnext1]
for cell_id, s1 in s.cbitsync1.items():
cvbitnext1[cell_id] = s1.finish()
vbitnext += s._merge_cvbitnext1(cvbitnext1)
s._assert_all_insync()
assert len(s.txq) == 0
s.cbitsync1 = {}
return vbitnext
# _merge_cvbitnext1 combines per-cell results of _BitSync1.next or
# _BitSync1.finish for multiple cells into multi-cell result for _BitSync.next
# or _BitSync.finish.
@func(_BitSync)
def _merge_cvbitnext1(s, cvbitnext1): # -> [](δt', tx_bytes', u')
vbitnext = []
if len(cvbitnext1) > 0:
vbitnext1_base = _peek(cvbitnext1.values())
for vbitnext1 in cvbitnext1.values():
assert len(vbitnext1) == len(vbitnext1_base) , (vbitnext1, vbitnext1_base)
for i in range(len(vbitnext1_base)):
δt, u = s.txq.pop(0)
s.i_txq += 1
tx_bytes = 0
for cell_id, vbitnext1 in cvbitnext1.items():
if cell_id not in u.cutx:
# cell will soon appear for real. For now it appeared because
# _BitSync._next prepended zero transmissions to this cell to
# align its _BitSync1 with with bitsyncs of other cells.
u.cutx[cell_id] = uc = _UCtx()
uc.tx = 0
uc.retx = 0
uc.bitrate = 0
uc.rank = 1
uc.xl_use_avg = 0
uc.tx_bytes = 0
else:
uc = u.cutx[cell_id]
ctx_bytes, uc.tx = vbitnext1[i]
uc.retx = 0 # because individual bitsync1 moves all to .tx
tx_bytes += ctx_bytes
vbitnext.append((δt, tx_bytes, u))
return vbitnext
# _BitSync1 creates new empty bitsync1.
@func(_BitSync1)
def __init__(s):
s.txq = []
s.i_txq = 0
s.i_lshift = 0
# next feeds next (δt, tx_bytes, tx) into bitsync1.
#
# and returns ready parts of adjusted stream.
@func(_BitSync1)
def next(s, tx_bytes, tx): # -> [](tx_bytes', tx')
s.txq.append((tx_bytes, tx))
# XXX for simplicity we currently handle sync in between only current and # XXX for simplicity we currently handle sync in between only current and
# next frames. That is enough to support FDD. TODO handle next-next case to support TDD # next frames. That is enough to support FDD. TODO handle next-next case to support TDD
...@@ -537,8 +748,8 @@ def next(s, δt, tx_bytes, tx, X): # -> [](δt', tx_bytes', tx', X') ...@@ -537,8 +748,8 @@ def next(s, δt, tx_bytes, tx, X): # -> [](δt', tx_bytes', tx', X')
assert s.i_txq <= i < s.i_txq + len(s.txq) assert s.i_txq <= i < s.i_txq + len(s.txq)
i -= s.i_txq i -= s.i_txq
δt1, b1, t1, X1 = s.txq[i] b1, t1 = s.txq[i]
δt2, b2, t2, X2 = s.txq[i+1] b2, t2 = s.txq[i+1]
if b1 != 0: if b1 != 0:
t22 = b2*t1/b1 t22 = b2*t1/b1
else: else:
...@@ -551,8 +762,8 @@ def next(s, δt, tx_bytes, tx, X): # -> [](δt', tx_bytes', tx', X') ...@@ -551,8 +762,8 @@ def next(s, δt, tx_bytes, tx, X): # -> [](δt', tx_bytes', tx', X')
assert t1 >= 0, t1 assert t1 >= 0, t1
assert t2 >= 0, t2 assert t2 >= 0, t2
s.txq[i] = (δt1, b1, t1, X1) s.txq[i] = (b1, t1)
s.txq[i+1] = (δt2, b2, t2, X2) s.txq[i+1] = (b2, t2)
#print(' < lshift ', s.txq) #print(' < lshift ', s.txq)
while s.i_lshift+1 < s.i_txq + len(s.txq): while s.i_lshift+1 < s.i_txq + len(s.txq):
...@@ -574,15 +785,17 @@ def next(s, δt, tx_bytes, tx, X): # -> [](δt', tx_bytes', tx', X') ...@@ -574,15 +785,17 @@ def next(s, δt, tx_bytes, tx, X): # -> [](δt', tx_bytes', tx', X')
vout.append(_) vout.append(_)
return vout return vout
# finish tells bitsync to flush its output queue. # finish tells bitsync1 to flush its output queue.
# #
# the bitsync becomes reset. # the bitsync1 becomes reset.
@func(_BitSync) @func(_BitSync1)
def finish(s): # -> [](δt', tx_bytes', tx', X') def finish(s): # -> [](tx_bytes', tx')
assert len(s.txq) < 3 assert len(s.txq) < 3
s._rebalance(len(s.txq)) s._rebalance(len(s.txq))
vout = s.txq vout = s.txq
s.txq = [] s.txq = []
s.i_txq += len(vout)
s.i_lshift = s.i_txq
return vout return vout
# _rebalance redistributes tx_i in .txq[:l] proportional to tx_bytes_i: # _rebalance redistributes tx_i in .txq[:l] proportional to tx_bytes_i:
...@@ -603,23 +816,88 @@ def finish(s): # -> [](δt', tx_bytes', tx', X') ...@@ -603,23 +816,88 @@ def finish(s): # -> [](δt', tx_bytes', tx', X')
# #
# and has the effect of moving #tx from periods with tx_bytes=0, to periods # and has the effect of moving #tx from periods with tx_bytes=0, to periods
# where transmission actually happened (tx_bytes > 0). # where transmission actually happened (tx_bytes > 0).
@func(_BitSync) @func(_BitSync1)
def _rebalance(s, l): def _rebalance(s, l):
#print(' > rebalance', s.txq[:l]) #print(' > rebalance', s.txq[:l])
assert l <= len(s.txq) assert l <= len(s.txq)
assert l <= 3 assert l <= 3
Σb = sum(_[1] for _ in s.txq[:l]) Σb = sum(_[0] for _ in s.txq[:l])
Σt = sum(_[2] for _ in s.txq[:l]) Σt = sum(_[1] for _ in s.txq[:l])
if Σb != 0: if Σb != 0:
for i in range(l): for i in range(l):
δt_i, b_i, t_i, X_i = s.txq[i] b_i, t_i = s.txq[i]
t_i = b_i * Σt / Σb t_i = b_i * Σt / Σb
assert t_i >= 0, t_i assert t_i >= 0, t_i
s.txq[i] = (δt_i, b_i, t_i, X_i) s.txq[i] = (b_i, t_i)
#print(' < rebalance', s.txq[:l]) #print(' < rebalance', s.txq[:l])
# _CTXBytesSplitter creates new empty txsplit.
@func(_CTXBytesSplitter)
def __init__(s):
s.txq = []
# next feeds next (δt, tx_bytes, u) into txsplit.
#
# and returns ready parts of split stream.
@func(_CTXBytesSplitter)
def next(s, δt, tx_bytes, u: _Utx): # -> [](δt', u'+.txbytes)
# split tx_bytes in between cells according to (β₁+β₂)/Σcells(β₁+β₂)
# where βi is cell bandwidth in frame i.
assert len(s.txq) < 2
s.txq.append((δt, tx_bytes, u))
vtx = [] # of (δt', u'+.txbytes)
while len(s.txq) >= 2:
δt, tx_bytes, u1 = s.txq.pop(0)
_, _, u2 = s.txq[0]
Σβ12 = 0
for cell_id, uc1 in u1.cutx.items():
Σβ12 += uc1.bitrate
if cell_id in u2.cutx:
uc2 = u2.cutx[cell_id]
Σβ12 += uc2.bitrate
for cell_id, uc1 in u1.cutx.items():
β12 = uc1.bitrate
uc2 = u2.cutx.get(cell_id)
if uc2 is not None:
β12 += uc2.bitrate
if Σβ12 != 0:
uc1.tx_bytes = tx_bytes * β12 / Σβ12
else:
# should not happen, but divide equally just in case
uc1.tx_bytes = tx_bytes / len(u1.cutx)
vtx.append((δt, u1))
return vtx
# finish tells txsplit to flush its output queue.
#
# txsplit becomes reset.
@func(_CTXBytesSplitter)
def finish(s): # -> [](δt', u'+.txbytes)
assert len(s.txq) < 2
if len(s.txq) == 0:
return []
assert len(s.txq) == 1
# yield last chunk, by appending artificial empty tx frame
zutx = _Utx()
zutx.qtx_bytes = {}
zutx.cutx = {}
vtx = s.next(s.txq[0][0], 0, zutx)
assert len(vtx) == 1
assert len(s.txq) == 1
s.txq = []
return vtx
# __repr__ returns human-readable representation of Sample. # __repr__ returns human-readable representation of Sample.
@func(Sample) @func(Sample)
def __repr__(s): def __repr__(s):
...@@ -636,6 +914,15 @@ def __repr__(s): ...@@ -636,6 +914,15 @@ def __repr__(s):
return "Sample(%db, %.1f ±%.1ftti)\t# %.0f ±%.0f bit/s" % \ return "Sample(%db, %.1f ±%.1ftti)\t# %.0f ±%.0f bit/s" % \
(s.tx_bytes, s.tx_time/tti, s.tx_time_err/tti, div(s.tx_bytes*8, s.tx_time), (b_hi - b_lo)/2) (s.tx_bytes, s.tx_time/tti, s.tx_time_err/tti, div(s.tx_bytes*8, s.tx_time), (b_hi - b_lo)/2)
# __repr__ returns human-readable representation of _Utx and _UCtx.
@func(_Utx)
def __repr__(u):
return "Utx(qtx_bytes: %r, cutx: %r)" % (u.qtx_bytes, u.cutx)
@func(_UCtx)
def __repr__(uc):
return "UCtx(%dt, %dr, %.0f bit/s, %dri, %.2f use | %r tx_bytes)" % \
(uc.tx, uc.retx, uc.bitrate, uc.rank, uc.xl_use_avg, uc.tx_bytes)
# ---------------------------------------- # ----------------------------------------
...@@ -686,7 +973,7 @@ def _x_stats_srv(ctx, reqch: chan, conn: amari.Conn): ...@@ -686,7 +973,7 @@ def _x_stats_srv(ctx, reqch: chan, conn: amari.Conn):
# we can retrieve both ue_get and stats each at 100Hz simultaneously. # we can retrieve both ue_get and stats each at 100Hz simultaneously.
conn_stats = amari.connect(ctx, conn.wsuri) conn_stats = amari.connect(ctx, conn.wsuri)
defer(conn_stats.close) defer(conn_stats.close)
rtt_stats = _IncStats() # like rtt_ue_stats but for stat instead of ue_get rtt_stats = _IncStats() # like rtt_ue_stats but for stats instead of ue_get
δt_stats = _IncStats() # δ(stats.timestamp) δt_stats = _IncStats() # δ(stats.timestamp)
t_stats = None # last stats.timestamp t_stats = None # last stats.timestamp
def rx_stats(ctx): # -> stats def rx_stats(ctx): # -> stats
...@@ -869,7 +1156,7 @@ def _x_stats_srv(ctx, reqch: chan, conn: amari.Conn): ...@@ -869,7 +1156,7 @@ def _x_stats_srv(ctx, reqch: chan, conn: amari.Conn):
'ul_tx_time_err': Σul.tx_time_err, 'ul_tx_time_err': Σul.tx_time_err,
'ul_tx_time_notailtti': Σul.tx_time_notailtti, 'ul_tx_time_notailtti': Σul.tx_time_notailtti,
'ul_tx_time_notailtti_err': Σul.tx_time_notailtti_err, 'ul_tx_time_notailtti_err': Σul.tx_time_notailtti_err,
'u;_tx_nsamples': Σul.tx_nsamples, 'ul_tx_nsamples': Σul.tx_nsamples,
} }
r = {'time': ue_stats['time'], r = {'time': ue_stats['time'],
...@@ -993,3 +1280,9 @@ __debug = False ...@@ -993,3 +1280,9 @@ __debug = False
def _debug(*argv): def _debug(*argv):
if __debug: if __debug:
print(*argv, file=sys.stderr) print(*argv, file=sys.stderr)
# _peek peeks first item from a sequence.
# it is handy to use e.g. as _peek(dict.values()).
def _peek(seq):
return next(iter(seq))
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2023 Nexedi SA and Contributors. # Copyright (C) 2023-2024 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
...@@ -20,12 +20,12 @@ ...@@ -20,12 +20,12 @@
from __future__ import print_function, division, absolute_import from __future__ import print_function, division, absolute_import
from xlte.amari.drb import _Sampler, Sample, _BitSync, tti, _IncStats from xlte.amari.drb import _Sampler, Sample, _BitSync, _BitSync1, _CTXBytesSplitter, _Utx, _UCtx, tti, _IncStats
import numpy as np import numpy as np
from golang import func from golang import func
# tSampler, UE, Etx and S provide infrastructure for testing _Sampler: # tSampler, UE, Etx, S and UCtx provide infrastructure for testing _Sampler:
# Etx represents transmission on erab with qci of tx_bytes. # Etx represents transmission on erab with qci of tx_bytes.
class Etx: class Etx:
...@@ -37,7 +37,7 @@ class Etx: ...@@ -37,7 +37,7 @@ class Etx:
# UE represents one entry about an UE in ue_get[stats].ue_list . # UE represents one entry about an UE in ue_get[stats].ue_list .
class UE: class UE:
def __init__(ue, ue_id, tx, retx, *etxv, ri=1): def __init__(ue, ue_id, tx, retx, *etxv, ri=1, bitrate=None):
for _ in etxv: for _ in etxv:
assert isinstance(_, Etx) assert isinstance(_, Etx)
ue.ue_id = ue_id ue.ue_id = ue_id
...@@ -45,6 +45,7 @@ class UE: ...@@ -45,6 +45,7 @@ class UE:
ue.retx = retx ue.retx = retx
ue.etxv = etxv ue.etxv = etxv
ue.ri = ri ue.ri = ri
ue.bitrate = bitrate if bitrate is not None else tx*1000
# tSampler provides testing environment for _Sampler. # tSampler provides testing environment for _Sampler.
# #
...@@ -104,6 +105,7 @@ class _tUEstats: ...@@ -104,6 +105,7 @@ class _tUEstats:
'ri': ue.ri, 'ri': ue.ri,
'zz_tx': ue.tx, 'zz_tx': ue.tx,
'zz_retx': ue.retx, 'zz_retx': ue.retx,
'zz_bitrate': ue.bitrate,
} }
], ],
'erab_list': erab_list, 'erab_list': erab_list,
...@@ -148,8 +150,22 @@ def S(tx_bytes, tx_time_tti): ...@@ -148,8 +150,22 @@ def S(tx_bytes, tx_time_tti):
return s return s
# UCtx is shortcut to create _UCtx.
def UCtx(tx, bitrate, rank, xl_use_avg):
uc = _UCtx()
uc.tx = tx
uc.retx = 0
uc.bitrate = bitrate
uc.rank = rank
uc.xl_use_avg = xl_use_avg
uc.tx_bytes = None
return uc
# -------- tests -------- # -------- tests --------
# TODO verify Sampler/multicell.
# test_Sampler1 verifies Sampler on single erab/qci flows. # test_Sampler1 verifies Sampler on single erab/qci flows.
def test_Sampler1(): def test_Sampler1():
# _ constructs tSampler, feeds tx stats into it and returns yielded Samples. # _ constructs tSampler, feeds tx stats into it and returns yielded Samples.
...@@ -252,7 +268,7 @@ def test_Sampler1(): ...@@ -252,7 +268,7 @@ def test_Sampler1():
# 0 10 # 0 10
assert _((10, 1000, 0, 0)) == [S(1000, (1,10))] assert _((10, 1000, 0, 0)) == [S(1000, (1,10))]
# bitsync lightly (BitSync itself is verified in details in test_BitSync) # bitsync lightly (BitSync itself is verified in details in test_BitSync*)
def b(*btx_statsv): def b(*btx_statsv):
tx_statsv = [] tx_statsv = []
for (tx_bytes, tx) in btx_statsv: # note: no δt_tti, #retx for (tx_bytes, tx) in btx_statsv: # note: no δt_tti, #retx
...@@ -352,36 +368,71 @@ def test_Sampler_rank(): ...@@ -352,36 +368,71 @@ def test_Sampler_rank():
assert t.get() == {4: [S(1000,3)]} # tx_time=3, not 1.5 assert t.get() == {4: [S(1000,3)]} # tx_time=3, not 1.5
# verify _BitSync works ok. # verify _BitSync with 1 cell.
def test_BitSync(): # this also verifies _BitSync1.
# _ passes txv_in into _BitSync and returns output stream. def test_BitSync1():
# _ passes txv_in into _BitSync1 and returns output stream.
# it also verifies that the result is the same when passed through _BitSync with 1 cell.
# #
# txv_in = [](tx_bytes, #tx) ; δt=10·tti # txv_in = [](tx_bytes, #tx) ; δt=10·tti
def _(*txv_in): def _(*txv_in):
def do_bitsync(*txv_in): def do_bitsync1(*txv_in):
txv_out = [] txv_out = []
xv_out = '' xv_out = []
bitsync1 = _BitSync1()
bitsync = _BitSync() bitsync = _BitSync()
for x, (tx_bytes, tx) in enumerate(txv_in):
_ = bitsync.next(10*tti, tx_bytes, tx,
chr(ord('a')+x))
for (δt, tx_bytes, tx, x_) in _:
assert δt == 10*tti
txv_out.append((tx_bytes, tx))
xv_out += x_
_ = bitsync.finish() # bitsync queue depth is more than queue depth of bitsync1 because
for (δt, tx_bytes, tx, x_) in _: # of _CTXBytesSplitter prefilter. Due to that we can only compare
assert δt == 10*tti # the overall yielded results, not results of each .next and .finish .
txv_out.append((tx_bytes, tx))
xv_out += x_ # ibitsync* yield data generated by bitsync* output.
def ibitsync1():
for tx_bytes, tx in txv_in:
yield from bitsync1.next(tx_bytes, tx)
yield from bitsync1.finish()
def ibitsync():
for bitrate, (tx_bytes, tx) in enumerate(txv_in):
u = _Utx()
u.qtx_bytes = None # bitsync itself does not use .qtx_bytes
u.cutx = {1: UCtx(tx, bitrate, 1, 0.1)}
yield from bitsync .next(10*tti, tx_bytes, u)
yield from bitsync.finish()
# ibitsync_checksame verifies that results of .next+.finish of bitsync1
# and bitsync match each other and yields that result.
def ibitsync_checksame(_1, _): # -> i[](tx_bytes, tx, uc)
_1 = list(_1)
_ = list(_)
assert len(_) == len(_1)
for i in range(len(_1)):
tx_bytes1, tx1 = _1[i]
δt, tx_bytes, u_ = _[i]
assert δt == 10*tti
assert tx_bytes == tx_bytes1
assert len(u_.cutx) == 1
assert list(u_.cutx.keys()) == [1]
uc_ = u_.cutx[1]
assert uc_.tx == tx1
assert uc_.retx == 0
assert uc_.rank == 1
assert uc_.xl_use_avg == 0.1
yield (tx_bytes1, tx1, uc_)
for (tx_bytes_, tx_, uc_) in ibitsync_checksame(ibitsync1(), ibitsync()):
txv_out.append((tx_bytes_, tx_))
xv_out .append(uc_.bitrate)
xv_out = ''.join(chr(ord('a')+_) for _ in xv_out)
assert xv_out == 'abcdefghijklmnopqrstuvwxyz'[:len(txv_in)] assert xv_out == 'abcdefghijklmnopqrstuvwxyz'[:len(txv_in)]
return txv_out return txv_out
txv_out = do_bitsync(*txv_in) txv_out = do_bitsync1(*txv_in)
# also check with 0-tail -> it should give the same # also check with 0-tail -> it should give the same
txv_out_ = do_bitsync(*(txv_in + ((0,0),)*10)) txv_out_ = do_bitsync1(*(txv_in + ((0,0),)*10))
assert txv_out_ == txv_out + [(0,0)]*10 assert txv_out_ == txv_out + [(0,0)]*10
return txv_out return txv_out
...@@ -558,6 +609,254 @@ def test_BitSync(): ...@@ -558,6 +609,254 @@ def test_BitSync():
( 0, 0 )] ( 0, 0 )]
# verify _BitSync with 2 cells.
def test_BitSync2():
# _ passes txv_in into _BitSync and returns output stream.
#
# txv_in = [](tx_bytes, #tx1,byterate1, #tx2, byterate2) ; δt=10·tti
def _(*txv_in):
def do_bitsync2(*txv_in):
txv_out = []
bitsync = _BitSync()
# Utx2 returns _Utx representing transmission on up to two cells.
def Utx2(tx1,byterate1, tx2,byterate2):
assert (tx1 is None) == (byterate1 is None)
assert (tx2 is None) == (byterate2 is None)
u = _Utx()
u.qtx_bytes = None # bitsync itself does not use .qtx_bytes
u.cutx = {}
if tx1 is not None:
u.cutx[1] = UCtx(tx1, 8*byterate1, 1, 0.1)
if tx2 is not None:
u.cutx[2] = UCtx(tx2, 8*byterate2, 2, 0.2)
return u
# b2iter yields result of bitsync .next/.finish in simplified form
# convenient for testing.
def b2iter(_): # -> i[](tx_bytes, tx1, tx2)
for (δt, tx_bytes, u) in _:
assert δt == 10*tti
assert set(u.cutx.keys()).issubset([1,2])
tx1 = None
tx2 = None
if 1 in u.cutx:
uc1 = u.cutx[1]
tx1 = uc1.tx
assert uc1.retx == 0
assert uc1.tx_bytes is not None
assert uc1.xl_use_avg in (0, 0.1)
assert uc1.rank == 1
if 2 in u.cutx:
uc2 = u.cutx[2]
tx2 = uc2.tx
assert uc2.retx == 0
assert uc2.tx_bytes is not None
assert uc2.xl_use_avg in (0, 0.2)
assert uc2.rank == 2 if uc2.xl_use_avg != 0 else 1
yield (tx_bytes, tx1, tx2)
for (tx_bytes, tx1, byterate1, tx2, byterate2) in txv_in:
_ = bitsync.next(10*tti, tx_bytes, Utx2(tx1,byterate1, tx2, byterate2))
txv_out += list(b2iter(_))
_ = bitsync.finish()
txv_out += list(b2iter(_))
return txv_out
txv_out = do_bitsync2(*txv_in)
# also check with 0-tail -> it should give the same
txv_out_ = do_bitsync2(*(txv_in + ((0, 0,0, 0,0),)*10))
assert txv_out_ == txv_out + [(0,0,0)]*10
return txv_out
# C1 C2
# tx_bytes tx,byterate tx,byterate
assert _((1000, 10,1000, 0, 0), # C1 C2
( 0, 0, 0, 0, 0), # tx_bytes tx tx
( 0, 0, 0, 0, 0)) == [(1000, 10, 0),
( 0, 0, 0),
( 0, 0, 0)]
assert _((1000, 0, 0, 10,1000),
( 0, 0, 0, 0, 0),
( 0, 0, 0, 0, 0)) == [(1000, 0, 10),
( 0, 0, 0),
( 0, 0, 0)]
assert _((2000, 10,1000, 10,1000),
( 0, 0, 0, 0, 0),
( 0, 0, 0, 0, 0)) == [(2000, 10, 10),
( 0, 0, 0),
( 0, 0, 0)]
assert _((2000, 0, 0, 10,1000), # all C1 ACK in next frame
( 0, 10,1000, 0, 0), # all C2 ACK in the same frame
( 0, 0, 0, 0, 0)) == [(2000, 10, 10),
( 0, 0, 0),
( 0, 0, 0)]
assert _((2000, 2, 200, 10,1000), # some C1 ACK in the same frame, some in next
( 0, 8, 800, 0, 0), # all C2 ACK in the same frame
( 0, 0, 0, 0, 0)) == [(2000, 10, 10),
( 0, 0, 0),
( 0, 0, 0)]
assert _(( 100, 1, 100, None,None), # C2 appears after C1
(2000, 10,1000, 10,1000), # NOTE tx2₀ ≠ None because _BitSync1 queue depth is 2
( 0, 0, 0, 0, 0), # and when new cell appears its _BitSync1 is prefed
( 0, 0, 0, 0, 0)) == [( 100, 1, 0), # zeros to align with
(2000, 10, 10), # other cells
( 0, 0, 0),
( 0, 0, 0)]
assert _(( 100, 1, 100, None,None), # C2 appears @ C1+2
( 200, 2, 200, None,None),
(2000, 10,1000, 10,1000),
( 0, 0, 0, 0, 0),
( 0, 0, 0, 0, 0)) == [( 100, 1, 0), # NOTE tx2₀ ≠ None
( 200, 2, 0), # NOTE tx2₁ ≠ None
(2000, 10, 10),
( 0, 0, 0),
( 0, 0, 0)]
assert _(( 100, 1, 100, None,None), # C2 appears @ C1+3
( 200, 2, 200, None,None),
( 300, 3, 300, None,None),
(2000, 10,1000, 10,1000),
( 0, 0, 0, 0, 0),
( 0, 0, 0, 0, 0)) == [( 100, 1, None), # NOTE tx2₀ = None
( 200, 2, 0),
( 300, 3, 0),
(2000, 10, 10),
( 0, 0, 0),
( 0, 0, 0)]
assert _((2000, 10,1000, 10,1000), # C2 disappears
( 0, 0, 0, None,None),
( 100, 1, 100, None,None),
( 200, 2, 200, None,None),
( 0, 0, 0, None,None),
( 0, 0, 0, None,None)) == [(2000, 10, 10),
( 0, 0, 0), # NOTE tx2 stays 0
( 100, 1, 0), # until reset
( 200, 2, 0),
( 0, 0, 0),
( 0, 0, 0)]
# verify how tx_bytes is partitioned in between cells by _BitSync.
def test_CTXBytesSplitter():
# _ passes txv_in into _CTXBytesSplitter and returns output stream.
#
# txv_in = [](tx_bytes, byterate1, byterate2)
def _(*txv_in):
def _do_txsplit(*txv_in):
txv_out = []
txsplit = _CTXBytesSplitter()
# Utx2 returns _Utx representing transmission on up to two cells.
def Utx2(byterate1, byterate2):
u = _Utx()
u.qtx_bytes = None # not used by _CTXBytesSplitter
u.cutx = {}
if byterate1 is not None:
u.cutx[1] = UCtx(None, 8*byterate1, None, None)
if byterate2 is not None:
u.cutx[2] = UCtx(None, 8*byterate2, None, None)
return u
# t2iter yields result of txsplit .next/.finish in simplified form
# convenient for testing.
def t2iter(_): # -> i[](tx_bytes1, tx_bytes2)
for (δt, u) in _:
assert δt == 10*tti
assert set(u.cutx.keys()).issubset([1,2])
tx_bytes1 = None
tx_bytes2 = None
if 1 in u.cutx:
tx_bytes1 = u.cutx[1].tx_bytes
if 2 in u.cutx:
tx_bytes2 = u.cutx[2].tx_bytes
yield (tx_bytes1, tx_bytes2)
for (tx_bytes, byterate1, byterate2) in txv_in:
_ = txsplit.next(10*tti, tx_bytes, Utx2(byterate1, byterate2))
txv_out += list(t2iter(_))
_ = txsplit.finish()
txv_out += list(t2iter(_))
return txv_out
def do_txsplit(*txv_in):
txv_out = _do_txsplit(*txv_in)
# verify the output is symmetrical in between C1 and C2
xtv_in = list((t, b2, b1) for (t, b1, b2) in txv_in)
xtv_out = _do_txsplit(*xtv_in)
xtv_out_ = list((t1, t2) for (t2, t1) in xtv_out)
assert xtv_out_ == txv_out
return txv_out
txv_out = do_txsplit(*txv_in)
# also check with 0-tail -> it should give the same
txv_out_ = do_txsplit(*(txv_in + ((0,0,0),)*10))
assert txv_out_ == txv_out + [(0,0)]*10
return txv_out
# C1 C2 C1 C2
# tx_bytes byterate byterate tx_bytes tx_bytes
# (1 element only)
assert _((1000, 1000, None)) == [(1000, None)] # identity for 1 cell
assert _((1000, 1000, 0)) == [(1000, 0)] # C2.bitrate = 0
assert _((1000, 0, 0)) == [( 500, 500)] # ΣC.bitrate = 0 -> divided equally
# (≥ 2 elements - tests queuing)
assert _((1000, 1000, None), # identity for 1 cell
(2000, 2000, None)) == [(1000, None),
(2000, None)]
assert _((1000, 1000, None), # C2 appears
(2000, 1500, 500),
(2000, 1500, 500),
(2000, 500, 1500)) == [(1000, None),
(1500, 500),
(1000, 1000),
( 500, 1500)]
assert _((2000, 1000, 1000), # C2 disappears
(2000, 1500, 500),
(1000, 500, None),
(1000, 1000, None)) == [(1250, 750),
(1600, 400),
(1000, None),
(1000, None)]
assert _((2000, 0, 0), # ΣC.bitrate = 0
(2000, 0, 0),
(1000, 0, 0),
(1000, 0, 0)) == [(1000, 1000),
(1000, 1000),
( 500, 500),
( 500, 500)]
assert _((2000, 1, 0), # C2.bitrate = 0
(2000, 1, 0),
(1000, 1, 0),
(1000, 1, 0)) == [(2000, 0),
(2000, 0),
(1000, 0),
(1000, 0)]
# ---- misc ---- # ---- misc ----
# teach tests to compare Samples # teach tests to compare Samples
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2022-2023 Nexedi SA and Contributors. # Copyright (C) 2022-2024 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
...@@ -225,11 +225,12 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement): ...@@ -225,11 +225,12 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement):
# preserve statistical properties, but not more. See m_initfini below for # preserve statistical properties, but not more. See m_initfini below for
# details. # details.
# #
# - it is possible to handle eNB with single cell only. This limitation # - it is not easy to produce per-cell measurements. This limitation
# comes from the fact that in Amarisoft LTE stack S1-related counters # comes from the fact that in Amarisoft LTE stack S1-related counters
# come as "globals" ones, while e.g. RRC-related counters are "per-cell". # come as "globals" ones, while e.g. RRC-related counters are "per-cell".
# It is thus not possible to see how much S1 connection establishments # It is thus hard to see how much S1 connection establishments are associated
# are associated with one particular cell if there are several of them. # with one particular cell if there are several of them. One S1 connection could
# be even related to multiple cells simultaneously when carriers are aggregated.
# #
# TODO also parse enb.log to fix those issues. # TODO also parse enb.log to fix those issues.
...@@ -259,7 +260,7 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement): ...@@ -259,7 +260,7 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement):
# do init/fini correction if there was also third preceding stats message. # do init/fini correction if there was also third preceding stats message.
m = logm._m.copy() # [stats_prev, stats) m = logm._m.copy() # [stats_prev, stats)
# δcc(counter) tells how specified cumulative counter changed since last stats result. # δcc(counter) tells how specified global cumulative counter changed since last stats result.
def δcc(counter): def δcc(counter):
old = _stats_cc(stats_prev, counter) old = _stats_cc(stats_prev, counter)
new = _stats_cc(stats, counter) new = _stats_cc(stats, counter)
...@@ -267,6 +268,14 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement): ...@@ -267,6 +268,14 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement):
raise LogError(stats.timestamp, "cc %s↓ (%s → %s)" % (counter, old, new)) raise LogError(stats.timestamp, "cc %s↓ (%s → %s)" % (counter, old, new))
return new - old return new - old
# δcell_cc(counter) tells how specified per-cell cumulative counter changed since last stats result.
def δcell_cc(cell, counter):
old = _stats_cell_cc(stats_prev, cell, counter)
new = _stats_cell_cc(stats, cell, counter)
if new < old:
raise LogError(stats.timestamp, "cc C%s.%s↓ (%s → %s)" % (cell, counter, old, new))
return new - old
# m_initfini populates m[init] and m[fini] from vinit and vfini values. # m_initfini populates m[init] and m[fini] from vinit and vfini values.
# copy of previous ._m[fini] is correspondingly adjusted for init/fini correction. # copy of previous ._m[fini] is correspondingly adjusted for init/fini correction.
p = None p = None
...@@ -297,9 +306,24 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement): ...@@ -297,9 +306,24 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement):
# any logic error in data will be reported via LogError. # any logic error in data will be reported via LogError.
try: try:
# RRC: connection establishment # RRC: connection establishment
#
# Aggregate statistics for all cells because in E-RAB Accessibility we need
# aggregated RRC.ConnEstab* for whole eNB. It would be more logical to emit
# per-cell RRC statistics here and aggregate the result in KPI computation
# routine, but for now we are not delving to rework kpi.Measurement to
# contain per-cell values. For E-RAB Accessibility the end result is the
# same whether we do aggregation here or in kpi.Calc.erab_accessibility().
#
# TODO rework to emit per-cell measurements when/if we need per-cell KPIs
cells = set(stats['cells'].keys()) # NOTE cells are taken only from stats, not from stat_prev
δΣcell_rrc_connection_request = 0 # (if a cell disappears its counters stop to be accounted)
δΣcell_rrc_connection_setup_complete = 0
for cell in cells:
δΣcell_rrc_connection_request += δcell_cc(cell, 'rrc_connection_request')
δΣcell_rrc_connection_setup_complete += δcell_cc(cell, 'rrc_connection_setup_complete')
m_initfini( m_initfini(
'RRC.ConnEstabAtt.sum', δcc('rrc_connection_request'), 'RRC.ConnEstabAtt.sum', δΣcell_rrc_connection_request,
'RRC.ConnEstabSucc.sum', δcc('rrc_connection_setup_complete')) 'RRC.ConnEstabSucc.sum', δΣcell_rrc_connection_setup_complete)
# S1: connection establishment # S1: connection establishment
m_initfini( m_initfini(
...@@ -334,37 +358,28 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement): ...@@ -334,37 +358,28 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement):
# _stats_check verifies stats message to have required structure. # _stats_check verifies stats message to have required structure.
#
# only configurations with one single cell are supported.
# ( because else it would not be clear to which cell to associate e.g. global
# counters for S1 messages )
def _stats_check(stats: xlog.Message): def _stats_check(stats: xlog.Message):
cells = stats['cells']
if len(cells) != 1:
raise LogError(stats.timestamp, "stats describes %d cells; but only single-cell configurations are supported" % len(cells))
cellname = list(cells.keys())[0]
try: try:
stats.get1("counters", dict).get1("messages", dict) stats.get1("counters", dict).get1("messages", dict)
stats.get1("cells", dict).get1(cellname, dict).get1("counters", dict).get1("messages", dict) cells = stats.get1("cells", dict)
for cell in cells:
cells.get1(cell, dict).get1("counters", dict).get1("messages", dict)
except Exception as e: except Exception as e:
raise LogError(stats.timestamp, "stats: %s" % e) from None raise LogError(stats.timestamp, "stats: %s" % e) from None
return return
# _stats_cc returns specified cumulative counter from stats result. # _stats_cc returns specified global cumulative counter from stats result.
# #
# counter may be both "global" or "per-cell".
# stats is assumed to be already verified by _stats_check. # stats is assumed to be already verified by _stats_check.
def _stats_cc(stats: xlog.Message, counter: str): def _stats_cc(stats: xlog.Message, counter: str):
cells = stats['cells'] return stats['counters']['messages'].get(counter, 0)
cell = list(cells.values())[0]
if counter.startswith("rrc_"):
cc_dict = cell ['counters']
else:
cc_dict = stats['counters']
return cc_dict['messages'].get(counter, 0) # _stats_cell_cc is like _stats_cc but returns specified per-cell cumulative counter from stats result.
def _stats_cell_cc(stats: xlog.Message, cell: str, counter: str):
_ = stats['cells'].get(cell)
if _ is None:
return 0 # cell is absent in this stats
return _['counters']['messages'].get(counter, 0)
# _handle_drb_stats handles next x.drb_stats xlog entry upon _read request. # _handle_drb_stats handles next x.drb_stats xlog entry upon _read request.
...@@ -465,9 +480,9 @@ def _drb_update(m: kpi.Measurement, drb_stats: xlog.Message): ...@@ -465,9 +480,9 @@ def _drb_update(m: kpi.Measurement, drb_stats: xlog.Message):
ΣT_hi = ΣT + ΣT_err ΣT_hi = ΣT + ΣT_err
ΣTT_lo = ΣTT - ΣTT_err ΣTT_lo = ΣTT - ΣTT_err
qvol[qci] = 8*ΣB # in bits qvol[qci] += 8*ΣB # in bits
qtime[qci] = (ΣT_hi + ΣTT_lo) / 2 qtime[qci] += (ΣT_hi + ΣTT_lo) / 2
qtime_err[qci] = (ΣT_hi - ΣTT_lo) / 2 qtime_err[qci] += (ΣT_hi - ΣTT_lo) / 2
# LogError(timestamp|None, *argv). # LogError(timestamp|None, *argv).
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2022-2023 Nexedi SA and Contributors. # Copyright (C) 2022-2024 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
...@@ -104,12 +104,13 @@ class tLogMeasure: ...@@ -104,12 +104,13 @@ class tLogMeasure:
# also automatically initialize XXX.DRB.IPTimeX_err to 0.01 upon seeing DRB.IPTimeX # also automatically initialize XXX.DRB.IPTimeX_err to 0.01 upon seeing DRB.IPTimeX
# ( in tests we use precise values for tx_time and tx_time_notailtti # ( in tests we use precise values for tx_time and tx_time_notailtti
# with δ=0.02 - see drb_trx and jdrb_stats) # with δ=0.02 - see drb_trx and jdrb_stats)
# this pre-initialization is correct only if there was single x.drb_stats message.
n = _.group(1) n = _.group(1)
if n.startswith('DRB.IPTime'): if n.startswith('DRB.IPTime'):
ferr = "XXX.%s_err" % n ferr = "XXX.%s_err" % n
if isNA(t._mok[ferr+'.QCI']).all(): if isNA(t._mok[ferr+'.QCI']).all():
t._mok[ferr+'.QCI'][:] = 0 t._mok[ferr+'.QCI'][:] = 0
t._mok["%s.%s" % (ferr, _.group(2))] = ((vok + 0.01) - (vok - 0.01)) / 2 # ≈ 0.01 t._mok["%s.%s" % (ferr, _.group(2))] = drb_terr(vok)
t._mok[field] = vok t._mok[field] = vok
...@@ -244,28 +245,28 @@ def test_LogMeasure(): ...@@ -244,28 +245,28 @@ def test_LogMeasure():
# init 0 3 2 5 0 # init 0 3 2 5 0
# fini ø ←─── 2 1←─── 2←─── 4←─── 1 # fini ø ←─── 2 1←─── 2←─── 4←─── 1
# fini' 0 3 ² 2 ² 3 ¹ 0 # fini' 0 3 ² 2 ² 3 ¹ 0
tstats({'rrc_connection_request': 0, tstats({'C1.rrc_connection_request': 0,
'rrc_connection_setup_complete': 2}) # completions for previous uncovered period 'C1.rrc_connection_setup_complete': 2}) # completions for previous uncovered period
_('RRC.ConnEstabAtt.sum', 0) _('RRC.ConnEstabAtt.sum', 0)
_('RRC.ConnEstabSucc.sum', 0) # not 2 _('RRC.ConnEstabSucc.sum', 0) # not 2
# p2 # p2
tstats({'rrc_connection_request': 0 +3, # 3 new initiations tstats({'C1.rrc_connection_request': 0 +3, # 3 new initiations
'rrc_connection_setup_complete': 2 +1}) # 1 new completion 'C1.rrc_connection_setup_complete': 2 +1}) # 1 new completion
_('RRC.ConnEstabAtt.sum', 3) _('RRC.ConnEstabAtt.sum', 3)
_('RRC.ConnEstabSucc.sum', 3) # not 1 _('RRC.ConnEstabSucc.sum', 3) # not 1
# p3 # p3
tstats({'rrc_connection_request': 0+3 +2, # 2 new initiations tstats({'C1.rrc_connection_request': 0+3 +2, # 2 new initiations
'rrc_connection_setup_complete': 2+1 +2}) # 2 completions for p2 'C1.rrc_connection_setup_complete': 2+1 +2}) # 2 completions for p2
_('RRC.ConnEstabAtt.sum', 2) _('RRC.ConnEstabAtt.sum', 2)
_('RRC.ConnEstabSucc.sum', 2) # 2, but it is 2 - 2(for_p2) + 2(from_p4) _('RRC.ConnEstabSucc.sum', 2) # 2, but it is 2 - 2(for_p2) + 2(from_p4)
# p4 # p4
tstats({'rrc_connection_request': 0+3+2 +5, # 5 new initiations tstats({'C1.rrc_connection_request': 0+3+2 +5, # 5 new initiations
'rrc_connection_setup_complete': 2+1+2 +4}) # 2 completions for p3 + 2 new 'C1.rrc_connection_setup_complete': 2+1+2 +4}) # 2 completions for p3 + 2 new
_('RRC.ConnEstabAtt.sum', 5) _('RRC.ConnEstabAtt.sum', 5)
_('RRC.ConnEstabSucc.sum', 3) _('RRC.ConnEstabSucc.sum', 3)
# p5 # p5
tstats({'rrc_connection_request': 0+3+2+5 +0, # no new initiations tstats({'C1.rrc_connection_request': 0+3+2+5 +0, # no new initiations
'rrc_connection_setup_complete': 2+1+2+4 +1}) # 1 completion for p4 'C1.rrc_connection_setup_complete': 2+1+2+4 +1}) # 1 completion for p4
_('RRC.ConnEstabAtt.sum', 0) _('RRC.ConnEstabAtt.sum', 0)
_('RRC.ConnEstabSucc.sum', 0) _('RRC.ConnEstabSucc.sum', 0)
...@@ -361,10 +362,19 @@ def test_LogMeasure(): ...@@ -361,10 +362,19 @@ def test_LogMeasure():
_('ERAB.EstabInitAttNbr.sum', 3) # currently same as S1SIG.ConnEstab _('ERAB.EstabInitAttNbr.sum', 3) # currently same as S1SIG.ConnEstab
_('ERAB.EstabInitSuccNbr.sum', 2) # ----//---- _('ERAB.EstabInitSuccNbr.sum', 2) # ----//----
tdrb_stats(+0.5, {9: drb_trx(9.1,91, 9.2,92)}) # multiple d are accumulated
tdrb_stats(+0.6, {9: drb_trx(0.2, 2, 1.2,12)}) # ─d·S──ddd─S──
tdrb_stats(+0.7, {9: drb_trx(0.3, 3, 1.3,13)}) # cont↑
tδstats({})
_('DRB.IPTimeDl.9', 9.1+0.2+0.3); _('DRB.IPVolDl.9', 8*(91+2+3))
_('DRB.IPTimeUl.9', 9.2+1.2+1.3); _('DRB.IPVolUl.9', 8*(92+12+13))
t._mok['XXX.DRB.IPTimeDl_err.9'] = drb_terr(9.1) + drb_terr(0.2) + drb_terr(0.3)
t._mok['XXX.DRB.IPTimeUl_err.9'] = drb_terr(9.2) + drb_terr(1.2) + drb_terr(1.3)
# service detach/attach, connect failure, xlog failure # service detach/attach, connect failure, xlog failure
tδstats({}) # untie from previous history tδstats({}) # untie from previous history
i, f = 'rrc_connection_request', 'rrc_connection_setup_complete' i, f = 'C1.rrc_connection_request', 'C1.rrc_connection_setup_complete'
I, F = 'RRC.ConnEstabAtt.sum', 'RRC.ConnEstabSucc.sum' I, F = 'RRC.ConnEstabAtt.sum', 'RRC.ConnEstabSucc.sum'
tδstats({i:2, f:1}) tδstats({i:2, f:1})
...@@ -412,25 +422,54 @@ def test_LogMeasure(): ...@@ -412,25 +422,54 @@ def test_LogMeasure():
t.expect_nodata() t.expect_nodata()
# verify that only stats with single cell and expected structure are accepted. # multiple cells
# TODO emit per-cell measurements instead of accumulating all cells
tstats({})
t.expect_nodata()
tstats({})
_('RRC.ConnEstabAtt.sum', 0)
_('RRC.ConnEstabSucc.sum', 0)
# C1 appears
tstats({'C1.rrc_connection_request': 12, 'C1.rrc_connection_setup_complete': 11})
_('RRC.ConnEstabAtt.sum', 12)
_('RRC.ConnEstabSucc.sum', 11+1)
# C2 appears
tstats({'C1.rrc_connection_request': 12+3, 'C1.rrc_connection_setup_complete': 11+3,
'C2.rrc_connection_request': 22, 'C2.rrc_connection_setup_complete': 21})
_('RRC.ConnEstabAtt.sum', 3+22)
_('RRC.ConnEstabSucc.sum', -1+3+21+2)
# C1 and C2 stays
tstats({'C1.rrc_connection_request': 12+3+3, 'C1.rrc_connection_setup_complete': 11+3+3,
'C2.rrc_connection_request': 22+4, 'C2.rrc_connection_setup_complete': 21+4})
_('RRC.ConnEstabAtt.sum', 3+4)
_('RRC.ConnEstabSucc.sum', -2+3+4+2)
# C1 disappears
tstats({'C2.rrc_connection_request': 22+4+4, 'C2.rrc_connection_setup_complete': 21+4+4})
_('RRC.ConnEstabAtt.sum', 4)
_('RRC.ConnEstabSucc.sum', 4-2)
# C2 disappears
tstats({})
_('RRC.ConnEstabAtt.sum', 0)
_('RRC.ConnEstabSucc.sum', 0)
tevent("service detach")
t.expect_nodata()
# verify that only stats with expected structure are accepted.
@func @func
def test_LogMeasure_badinput(): def test_LogMeasure_badinput():
t = tLogMeasure() t = tLogMeasure()
defer(t.close) defer(t.close)
_ = t.expect1 _ = t.expect1
cc = 'rrc_connection_request' cc = 'C1.rrc_connection_request'
CC = 'RRC.ConnEstabAtt.sum' CC = 'RRC.ConnEstabAtt.sum'
# initial ok entries # initial ok entries
t.xlog( jstats(1, {}) ) t.xlog( jstats(1, {}) )
t.xlog( jstats(2, {cc: 2}) ) t.xlog( jstats(2, {cc: 2}) )
t.xlog( jstats(3, {cc: 2+3}) ) t.xlog( jstats(3, {cc: 2+3}) )
# bad: not single cell
t.xlog('{"message":"stats", "utc":11, "cells": {}}')
t.xlog('{"message":"stats", "utc":12, "cells": {}}')
t.xlog('{"message":"stats", "utc":13, "cells": {"a": {}, "b": {}}}')
t.xlog('{"message":"stats", "utc":14, "cells": {"a": {}, "b": {}, "c": {}}}')
# bad: no counters # bad: no counters
t.xlog('{"message":"stats", "utc":21, "counters": {"messages": {}}, "cells": {"1": {}}}') t.xlog('{"message":"stats", "utc":21, "counters": {"messages": {}}, "cells": {"1": {}}}')
t.xlog('{"message":"stats", "utc":22, "counters": {"messages": {}}, "cells": {"1": {"counters": {}}}}') t.xlog('{"message":"stats", "utc":22, "counters": {"messages": {}}, "cells": {"1": {"counters": {}}}}')
...@@ -466,31 +505,18 @@ def test_LogMeasure_badinput(): ...@@ -466,31 +505,18 @@ def test_LogMeasure_badinput():
read_nodata(0.02, 0.98) # attach-1 read_nodata(0.02, 0.98) # attach-1
readok(1, 2) # 1-2 readok(1, 2) # 1-2
readok(2, 3) # 2-3 readok(2, 3) # 2-3
read_nodata(3, 8) # 3-11 read_nodata(3, 18) # 3-21
def tbadcell(τ, ncell):
with raises(LogError, match="t%s: stats describes %d cells;" % (τ, ncell) +
" but only single-cell configurations are supported"):
t.read()
tbadcell(11, 0)
read_nodata(11, 1)
tbadcell(12, 0)
read_nodata(12, 1)
tbadcell(13, 2)
read_nodata(13, 1)
tbadcell(14, 3)
def tbadstats(τ, error): def tbadstats(τ, error):
with raises(LogError, match="t%s: stats: %s" % (τ, error)): with raises(LogError, match="t%s: stats: %s" % (τ, error)):
t.read() t.read()
read_nodata(14, 7) tbadstats(21, ":6/cells/1 no `counters`")
tbadstats(21, ":10/cells/1 no `counters`")
read_nodata(21, 1) read_nodata(21, 1)
tbadstats(22, ":11/cells/1/counters no `messages`") tbadstats(22, ":7/cells/1/counters no `messages`")
read_nodata(22, 1) read_nodata(22, 1)
tbadstats(23, ":12/ no `counters`") tbadstats(23, ":8/ no `counters`")
read_nodata(23, 1) read_nodata(23, 1)
tbadstats(24, ":13/counters no `messages`") tbadstats(24, ":9/counters no `messages`")
read_nodata(24, 7) read_nodata(24, 7)
readok(31, 5) # 31-32 readok(31, 5) # 31-32
...@@ -511,7 +537,7 @@ def test_LogMeasure_cc_wraparound(): ...@@ -511,7 +537,7 @@ def test_LogMeasure_cc_wraparound():
defer(t.close) defer(t.close)
_ = t.expect1 _ = t.expect1
cc = 'rrc_connection_request' cc = 'C1.rrc_connection_request'
CC = 'RRC.ConnEstabAtt.sum' CC = 'RRC.ConnEstabAtt.sum'
t.xlog( jstats(1, {}) ) t.xlog( jstats(1, {}) )
...@@ -545,7 +571,7 @@ def test_LogMeasure_sync(): ...@@ -545,7 +571,7 @@ def test_LogMeasure_sync():
defer(t.close) defer(t.close)
_ = t.expect1 _ = t.expect1
cc = 'rrc_connection_request' cc = 'C1.rrc_connection_request'
CC = 'RRC.ConnEstabAtt.sum' CC = 'RRC.ConnEstabAtt.sum'
t.xlog( jstats(1, {}) ) t.xlog( jstats(1, {}) )
...@@ -568,31 +594,47 @@ def test_LogMeasure_sync(): ...@@ -568,31 +594,47 @@ def test_LogMeasure_sync():
# jstats returns json-encoded stats message corresponding to counters dict. # jstats returns json-encoded stats message corresponding to counters dict.
#
# if a counter goes as "Cxxx.yyy" it is emitted as counter yyy of cell xxx in the output.
# τ goes directly to stats['utc'] as is. # τ goes directly to stats['utc'] as is.
def jstats(τ, counters): # -> str def jstats(τ, counters): # -> str
g_cc = {} # global g_cc = {} # global cumulative counters
cell_cc = {} # per-cell cells = {} # .cells
for cc, value in counters.items(): for cc, value in counters.items():
if cc.startswith("rrc_"): _ = re.match(r"^C([^.]+)\.(.+)$", cc)
cell_cc[cc] = value if _ is not None:
cell = _.group(1)
cc = _.group(2)
cells.setdefault(cell, {}) \
.setdefault("counters", {}) \
.setdefault("messages", {}) \
[cc] = value
else: else:
g_cc[cc] = value g_cc[cc] = value
s = { s = {
"message": "stats", "message": "stats",
"utc": τ, "utc": τ,
"cells": {"1": {"counters": {"messages": cell_cc}}}, "cells": cells,
"counters": {"messages": g_cc}, "counters": {"messages": g_cc},
} }
return json.dumps(s) return json.dumps(s)
def test_jstats(): def test_jstats():
assert jstats(0, {}) == '{"message": "stats", "utc": 0, "cells": {"1": {"counters": {"messages": {}}}}, "counters": {"messages": {}}}' assert jstats(0, {}) == '{"message": "stats", "utc": 0, "cells": {}, "counters": {"messages": {}}}'
assert jstats(123.4, {"rrc_x": 1, "s1_y": 2, "rrc_z": 3, "x2_zz": 4}) == \ assert jstats(123.4, {"C1.rrc_x": 1, "s1_y": 2, "C1.rrc_z": 3, "x2_zz": 4}) == \
'{"message": "stats", "utc": 123.4, "cells": {"1": {"counters": {"messages": {"rrc_x": 1, "rrc_z": 3}}}}, "counters": {"messages": {"s1_y": 2, "x2_zz": 4}}}' '{"message": "stats", "utc": 123.4, "cells": {"1": {"counters": {"messages": {"rrc_x": 1, "rrc_z": 3}}}}, "counters": {"messages": {"s1_y": 2, "x2_zz": 4}}}'
# multiple cells
assert jstats(432.1, {"C1.rrc_x": 11, "C2.rrc_y": 22, "C3.xyz": 33, "C1.abc": 111, "xyz": 44}) == \
'{"message": "stats", "utc": 432.1, "cells": {' + \
'"1": {"counters": {"messages": {"rrc_x": 11, "abc": 111}}}, ' + \
'"2": {"counters": {"messages": {"rrc_y": 22}}}, ' + \
'"3": {"counters": {"messages": {"xyz": 33}}}}, ' + \
'"counters": {"messages": {"xyz": 44}}}'
# jdrb_stats, similarly to jstats, returns json-encoded x.drb_stats message # jdrb_stats, similarly to jstats, returns json-encoded x.drb_stats message
# corresponding to per-QCI dl/ul tx_time/tx_bytes. # corresponding to per-QCI dl/ul tx_time/tx_bytes.
...@@ -603,7 +645,7 @@ def jdrb_stats(τ, qci_dlul): # -> str ...@@ -603,7 +645,7 @@ def jdrb_stats(τ, qci_dlul): # -> str
assert set(dlul.keys()) == {"dl_tx_bytes", "dl_tx_time", "dl_tx_time_notailtti", assert set(dlul.keys()) == {"dl_tx_bytes", "dl_tx_time", "dl_tx_time_notailtti",
"ul_tx_bytes", "ul_tx_time", "ul_tx_time_notailtti"} "ul_tx_bytes", "ul_tx_time", "ul_tx_time_notailtti"}
dlul["dl_tx_time_err"] = 0 # original time is simulated to be dlul["dl_tx_time_err"] = 0 # original time is simulated to be
dlul["ul_tx_time_err"] = 0 # measured precisely in tess. dlul["ul_tx_time_err"] = 0 # measured precisely in tests.
dlul["dl_tx_time_notailtti_err"] = 0 # ----//---- dlul["dl_tx_time_notailtti_err"] = 0 # ----//----
dlul["ul_tx_time_notailtti_err"] = 0 # dlul["ul_tx_time_notailtti_err"] = 0 #
...@@ -640,6 +682,14 @@ def drb_trx(dl_tx_time, dl_tx_bytes, ul_tx_time, ul_tx_bytes): ...@@ -640,6 +682,14 @@ def drb_trx(dl_tx_time, dl_tx_bytes, ul_tx_time, ul_tx_bytes):
"ul_tx_bytes": ul_tx_bytes, "ul_tx_time": ul_tx_time + 0.01, "ul_tx_time_notailtti": ul_tx_time - 0.01} "ul_tx_bytes": ul_tx_bytes, "ul_tx_time": ul_tx_time + 0.01, "ul_tx_time_notailtti": ul_tx_time - 0.01}
# drb_terr returns what XXX.DRB.IPTimeX_err should be for given DRB.IPTimeX
# it is ≈ 0.01 but due to loss of float-point precision is a bit different for
# particular value of time.
# ( in tests we use precise values for tx_time and tx_time_notailtti
# with δ=0.02 - see drb_trx and jdrb_stats)
def drb_terr(t):
return ((t + 0.01) - (t - 0.01)) / 2 # ≈ 0.01
# ionone returns empty data source. # ionone returns empty data source.
def ionone(): def ionone():
return io.BytesIO(b'') return io.BytesIO(b'')
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2022-2023 Nexedi SA and Contributors. # Copyright (C) 2022-2024 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
...@@ -655,8 +655,6 @@ def eutran_ip_throughput(calc): # -> IPThp[QCI][dl,ul] ...@@ -655,8 +655,6 @@ def eutran_ip_throughput(calc): # -> IPThp[QCI][dl,ul]
qulΣte = np.zeros(nqci, dtype=np.float64) qulΣte = np.zeros(nqci, dtype=np.float64)
for m in calc._miter(): for m in calc._miter():
τ = m['X.δT']
for qci in range(nqci): for qci in range(nqci):
dl_vol = m["DRB.IPVolDl.QCI"] [qci] dl_vol = m["DRB.IPVolDl.QCI"] [qci]
dl_time = m["DRB.IPTimeDl.QCI"] [qci] dl_time = m["DRB.IPTimeDl.QCI"] [qci]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment