Commit d102ffaa authored by Kirill Smelkov's avatar Kirill Smelkov

amari.drb: Start of the package

This package will be used to implement E-UTRAN IP Throughput KPI.

In hereby patch we add `drb.Sampler` that extracts samples of
transmission bursts from `ue_get[stats]` observations.

Let's go through what E-UTRAN IP Throughput KPI is and how it motivates
functionality provided by this patch.

Overview of E-UTRAN IP Throughput computation
---------------------------------------------

This KPI is defined in TS 32.450 [1] and aggregates transmission volume and
time over bursts of transmissions from an average UE point of view. It should be
particularly noted that only the time, during which transmission is going on,
should be accounted. For example if an UE receives 10KB over 4ms burst and the rest of
the time there is no transmission to it during, say, 1 minute, the downlink IP
Throughput for that UE over the minute is 20Mbit/s (= 8·10KB/4ms), not 1.3Kbit/s (= 8·10KB/60s).
This KPI basically shows what would be the speed to e.g. download a response for
HTTP request issued from a mobile.

[1] https://www.etsi.org/deliver/etsi_ts/132400_132499/132450/16.00.00_60/ts_132450v160000p.pdf#page=13

To compute IP Throughput we thus need to know Σ of transmitted amount
of bytes, and Σ of the time of all transmission bursts.

Σ of the bytes is relatively easy to get. eNB already provides close values in
overall `stats` and in per-UE `ue_get[stats]` messages. However there is no
anything readily available out-of-the box for Σ of bursts transmission time.
Thus we need to measure the time of transmission bursts ourselves somehow.

It turns out that with current state of things the only practical way to
measure it to some degree is to poll eNB frequently with `ue_get[stats]` and
estimate transmission time based on δ of `ue_get` timestamps.

Let's see how frequently we need to poll to get to reasonably accuracy of resulting throughput.

A common situation for HTTP requests issued via LTE is that response content
downloading time takes only few milliseconds. For example I used chromium
network profiler to access various sites via internet tethered from my phone
and saw that for many requests response content downloading time was e.g. 4ms,
5ms, 3.2ms, etc. The accuracy of measuring transmission time should be thus in
the order of millisecond to cover that properly. It makes a real difference for
reported throughput, if say a download sample with 10KB took 4ms, or it took
e.g. "something under 100ms". In the first case we know that for that sample
downlink throughput is 2500KB/s, while in the second case all we know is that
downlink throughput is "higher than 100KB/s" - a 25 times difference and not
certain. Similarly if we poll at 10ms rate we would get that throughput is "higher
than 1000KB/s" - a 2.5 times difference from actual value. The accuracy of 1
millisecond coincides with TTI time and with how downlink/uplink transmissions
generally work in LTE.

With the above the scheme to compute IP Throughput looks to be as
follows: poll eNB at 1000Hz rate for `ue_get[stats]`, process retrieved
information into per-UE and per-QCI streams, detect bursts on each UE/QCI pair,
and aggregate `tx_bytes` and `tx_time` from every burst.

It looks to be straightforward, but 1000Hz polling will likely create
non-negligible additional load on the system and disturb eNB itself
introducing much jitter and harming its latency requirements. That's probably
why eNB actually rate-limits WebSocket requests not to go higher than 100Hz -
the frequency 10 times less compared to what we need to get to reasonable
accuracy for IP throughput.

Fortunately there is additional information that provides a way to improve
accuracy of measured `tx_time` even when polled every 10ms at 100Hz rate:
that additional information is the number of transmitted transport blocks to/from
an UE. If we know that during 10ms frame it was e.g. 4 transport blocks transmitted
to the UE, that there were no retransmissions *and* that eNB is not congested, we can
reasonably estimate that it was actually a 4ms transmission. And if eNB is
congested we can still say that transmission time is somewhere in `[4ms, 10ms]`
interval because transmitting each transport block takes 1 TTI. Even if
imprecise that still provides some information that could be useful.

Also 100Hz polling turns to be acceptable from performance point of view and
does not disturb the system much. For example on the callbox machine the process,
that issues polls, takes only about 3% of CPU load and only on one core, and
the CPU usage of eNB does not practically change and its reported tx/rx latency
does not change as well. For sure, there is some disturbance, but it appears to
be small. To have a better idea of what rate of polling is possible, I've made
an experiment with the poller accessing my own websocket echo server quickly
implemented in python. Both the poller and the echo server are not optimized,
but without rate-limiting they could go to 8000Hz frequency with reaching 100%
CPU usage of one CPU core. That 8000Hz is 80x times more compared to 100Hz
frequency actually allowed by eNB. This shows what kind of polling
frequency limit the system can handle, if absolutely needed, and that 100Hz
turns out to be not so high a frequency. Also the Linux 5.6 kernel, installed
on the callbox from Fedora32, is configured with `CONFIG_HZ=1000`, which is
likely helping here.

Implementation overview
~~~~~~~~~~~~~~~~~~~~~~~

The scheme to compute E-UTRAN IP Throughput is thus as follows: poll eNB at
100Hz frequency for `ue_get[stats]` and retrieve information about per-UE/QCI
streams and the number of transport blocks dl/ul-ed to the UE in question
during that 10ms frame. Estimate `tx_time` taking into account
the number of transmitted transport blocks. And estimate whether eNB is congested or
not based on `dl_use_avg`/`ul_use_avg` taken from `stats`. For the latter we
also need to poll for `stats` at 100Hz frequency and synchronize
`ue_get[stats]` and `stats` requests in time so that they both cover the same
time interval of particular frame.

Then organize the polling process to provide aggregated statistics in the form of
new `x.drb_stats` message, and teach `xamari xlog` to save that messages to
`enb.xlog` together with `stats`.  Then further adjust `amari.kpi.LogMeasure`
and generic `kpi.Measurement` and `kpi.Calc` to handle DRB-related data.

----------------------------------------

In this patch we provide first building block - `Sampler` that extracts bursts
of data transmissions from stream of `ue_get[stats]` observations.

Even though main idea behind `Sampler` is relatively straightforward, several
aspects deserves to be noted:

1. information about transmitted bytes and corresponding transmitted transport
   blocks is emitted by eNB not synchronized in time. The reason here is that,
   for example, for DL a block is transmitted via PDCCH+PDSCH during one TTI, and
   then the base station awaits HARQ ACK/NACK. That ACK/NACK comes later via
   PUCCH or PUSCH. The time window in between original transmission and
   reception of the ACK/NACK is 4 TTIs for FDD and 4-13 TTIs for TDD (*).
   And Amarisoft LTEENB updates counters for dl_total_bytes and dl_tx at
   different times:

       ue.erab.dl_total_bytes      - right after sending data on  PDCCH+PDSCH
       ue.cell.{dl_tx,dl_retx}     - after receiving ACK/NACK via PUCCH|PUSCH

   this way an update to dl_total_bytes might be seen in one frame (= 10·TTI),
   while corresponding update to dl_tx/dl_retx might be seen in either same, or
   next, or next-next frame.

   We bring `δ(tx_bytes)` and `#tx_tb` in sync ourselves via _BitSync.

   (*) see e.g. Figure 8.1 in "An introduction to LTE, 2nd ed."

2. when we see multiple transmissions related to UE on different QCIs, we
   cannot directly use corresponding number of transport blocks to estimate
   transmissions times because we do not know how eNB scheduler placed those
   transmissions onto resource map. So without additional information we can only
   estimate corresponding lower and upper bounds.
parent 79d10eb9
......@@ -5,6 +5,7 @@
XLTE repository provides assorted tools and packages with functionality related to LTE:
- `kpi` - process measurements and compute KPIs from them.
- `amari.drb` - infrastructure to process flows on data radio bearers.
- `amari.kpi` - driver for Amarisoft LTE stack to retrieve KPI-related measurements from logs.
- `amari.xlog` - extra logging facilities for Amarisoft LTE stack.
- `xamari` - supplementary tool for managing Amarisoft LTE services.
# -*- coding: utf-8 -*-
# Copyright (C) 2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Package drb provides infrastructure to process flows on data radio bearers.
- Sampler converts information about data flows obtained via ue_get[stats] into
Samples that represent bursts of continuous transmissions.
"""
from golang import func
from golang import time
import sys
tti = 1*time.millisecond # = 1·subframe Ts = 1/(2048·15000)·s ≈ 32.6 ns
# Tsymb = 2048·Ts ≈ 66.7 μs
# Slot = 7.5·Tsymb = 15350·Ts = 0.5 ms
# Subframe = 2·Slot = 1 ms
# Frame = 10·Subframe = 10 ms
# Sampler collects information about DRB usage and converts that to per-QCI UE Samples.
#
# - use .add to append ue_stats/stats as input information and get finalized Samples.
# - use .finish to wrap-up and retrieve rest of the Samples and reset the sampler.
class Sampler:
__slots__ = (
'_dl_sampler', # _Sampler('dl')
'_ul_sampler', # _Sampler('ul')
)
# Sample represents one burst of continuous transmission to/from particular UE on particular QCI.
#
# A transmission is continuous if during its time corresponding transmission
# buffer is not empty. For example a transmission where something is sent
# during 5 consecutive TTIs is continuous. As well as if something is sent not
# every TTI, but the buffer is not empty during pauses and the pauses are e.g.
# due to congestion - it is also said to be continuous transmission:
#
# | |x|x|x|x|x| |
#
# | |x|x| |x| | |x|x| |
# ↑ ↑ ↑
# buffer is not empty - the transmission sample continues
class Sample:
__slots__ = (
'tx_bytes', # amount of bytes transmitted
'tx_time', # time interval during which sample transmission was made
'tx_time_err', # accuracy of tx_time measurement
)
# _Sampler serves Sampler for one of 'dl' or 'ul' direction.
class _Sampler:
__slots__ = (
'dir', # 'dl' or 'ul'
'use_bitsync', # whether to use _BitSync
'use_ri', # whether to pay attention to rank indicator
't', # timestamp of last ue_stats
'ues', # {} ue -> _UE current state of all tracked UEs
)
# _UE represents tracking of data transmission of particular UE.
class _UE:
__slots__ = (
'erab_flows', # {} erab_id -> _ERAB_Flow current state of all erabs related to UE
'qci_flows', # {} qci -> _QCI_Flow in-progress collection of UE-related samples
'bitsync', # None | _BitSync to synchronize δtx_bytes with #tx on updates
)
# _ERAB_Flow tracks data transmission on particular ERAB of particular UE.
class _ERAB_Flow:
__slots__ = (
'qci', # qci as last reported by ue_get
'tx_total_bytes', # total amount transmitted as last reported by ue_get
)
# _QCI_Flow represents in-progress collection to make up a Sample.
#
# .update(δt, tx_bytes, #tx, ...) updates flow with information about next
# transmission period and potentially yields some finalized Samples.
# .finish() completes Sample collection.
class _QCI_Flow:
__slots__ = (
'tx_bytes', # already accumulated bytes
'tx_time', # already accumulated time
'tx_time_err', # accuracy of ^^^
)
# _BitSync helps _Sampler to match δtx_bytes and #tx in transmission updates.
#
# For example for DL a block is transmitted via PDCCH+PDSCH during one TTI, and
# then the base station awaits HARQ ACK/NACK. That ACK/NACK comes later via
# PUCCH or PUSCH. The time window in between original transmission and
# reception of the ACK/NACK is 4 TTIs for FDD and 4-13 TTIs for TDD(*).
# And Amarisoft LTEENB updates counters for dl_total_bytes and dl_tx at
# different times:
#
# ue.erab.dl_total_bytes - right after sending data on PDCCH+PDSCH
# ue.cell.{dl_tx,dl_retx} - after receiving ACK/NACK via PUCCH|PUSCH
#
# this way an update to dl_total_bytes might be seen in one frame (= 10·TTI),
# while corresponding update to dl_tx/dl_retx might be seen in either same, or
# next, or next-next frame.
#
# What _BitSync does is that it processes stream of tx_bytes/#tx and emits
# adjusted stream with #tx corresponding to tx_bytes coming together
# synchronized in time.
#
# .next(δt, tx_bytes, #tx, X) -> [](δt', tx_bytes', #tx', X')
# .finish() -> [](δt', tx_bytes', #tx', X')
#
# (*) see e.g. Figure 8.1 in "An introduction to LTE, 2nd ed."
class _BitSync:
__slots__ = (
'txq', # [](δt,tx_bytes,#tx,X) not-yet fully processed tail of whole txv
'i_txq', # txq represents txv[i_txq:]
'i_lshift', # next left shift will be done on txv[i_lshift] <- txv[i_lshift+1]
)
# Sampler() creates new sampler that will start sampling from ue_stats0/stats0 state.
@func(Sampler)
def __init__(s, ue_stats0, stats0):
s._dl_sampler = _Sampler('dl', ue_stats0, stats0, use_bitsync=True, use_ri=True)
s._ul_sampler = _Sampler('ul', ue_stats0, stats0,
use_bitsync=False, # for ul tx_bytes and #tx come, it seems, synchronized out of the box
use_ri=False) # no rank indication for ul - assume siso
# TODO also use upcoming ul_rank+ul_n_layer
@func(_Sampler)
def __init__(s, dir, ue_stats0, stats0, use_bitsync, use_ri):
s.dir = dir
s.t = -1 # so that add(t=0, init) works
s.use_bitsync = use_bitsync
s.use_ri = use_ri
s.ues = {}
_ = s.add(ue_stats0, stats0, init=True)
assert _ == {}
for ue in s.ues.values():
assert ue.qci_flows == {}
# _UE() creates new empty UE-tracking entry.
@func(_UE)
def __init__(ue, use_bitsync):
ue.erab_flows = {}
ue.qci_flows = {}
ue.bitsync = _BitSync() if use_bitsync else None
# finish wraps up all in-progress flows.
#
# and returns all remaining samples.
# The sampler is reset after retrieval.
@func(Sampler)
def finish(s): # dl/ul samples ; dl/ul = {} qci -> []Sample
dl = s._dl_sampler.finish()
ul = s._ul_sampler.finish()
return (dl, ul)
@func(_Sampler)
def finish(s):
qci_samples = {}
for ue in s.ues.values():
# wrap-up in-progress bitsync
if ue.bitsync is not None:
bitnext = ue.bitsync.finish()
ue._update_qci_flows(bitnext, qci_samples)
# wrap-up all in-progress flows
for qci, flow in ue.qci_flows.items():
_ = flow.finish()
for sample in _:
qci_samples.setdefault(qci, []).append(sample)
ue.qci_flows = {}
# preserve .erab_flows as if we were initialized with corresponding ue_stats0.
return qci_samples
# add feeds next ue_get[stats] + stats reports to the sampler.
#
# and returns samples that become finalized during this addition.
@func(Sampler)
def add(s, ue_stats, stats): # -> dl/ul samples ; dl/ul = {} qci -> []Sample
dl = s._dl_sampler.add(ue_stats, stats)
ul = s._ul_sampler.add(ue_stats, stats)
return dl, ul
class _Utx: # transmission state passed through bitsync
__slots__ = (
'qtx_bytes',
'rank',
'xl_use_avg',
)
@func(_Sampler)
def add(s, ue_stats, stats, init=False):
t = ue_stats['utc']
δt = t - s.t
s.t = t
assert δt > 0
qci_samples = {} # qci -> []Sample samples finalized during this add
ue_live = set() # of ue ue that are present in ue_stats
# go through all UEs and update/finalize flows from information on per-UE erabs.
for ju in ue_stats['ue_list']:
ue_id = ju['enb_ue_id'] # TODO 5G: -> ran_ue_id + qos_flow_list + sst?
ue_live.add(ue_id)
if len(ju['cells']) != 1:
raise RuntimeError(("ue #%s belongs to %d cells; "+
"but only single-cell configurations are supported") % (ue_id, len(ju(['cells']))))
cell = ju['cells'][0]
tx = cell['%s_tx' % s.dir] # in transport blocks
retx = cell['%s_retx' % s.dir] # ----//----
assert tx >= 0, tx
assert retx >= 0, retx
cell_id = cell['cell_id'] # int
scell = stats['cells'][str(cell_id)]
u = _Utx()
u.qtx_bytes = {} # qci -> Σδerab_qci=qci
u.rank = cell['ri'] if s.use_ri else 1
u.xl_use_avg = scell['%s_use_avg' % s.dir]
ue = s.ues.get(ue_id)
if ue is None:
ue = s.ues[ue_id] = _UE(s.use_bitsync)
# erabs: δ(tx_total_bytes) -> tx_bytes ; prepare per-qci tx_bytes
tx_bytes = 0 # Σδerab
eflows_live = set() # of erab erabs that are present in ue_stats for this ue
for erab in ju['erab_list']:
erab_id = erab['erab_id']
qci = erab['qci']
eflows_live.add(erab_id)
ef = ue.erab_flows.get(erab_id)
if ef is None:
ef = ue.erab_flows[erab_id] = _ERAB_Flow()
ef.qci = qci
ef.tx_total_bytes = 0
etx_total_bytes = erab['%s_total_bytes' % s.dir]
if not (ef.qci == qci and ef.tx_total_bytes <= etx_total_bytes):
# restart erab flow on change of qci or tx_total_bytes↓
ef.qci = qci
ef.tx_total_bytes = 0
etx_bytes = etx_total_bytes - ef.tx_total_bytes
ef.tx_total_bytes = etx_total_bytes
tx_bytes += etx_bytes
if etx_bytes != 0:
u.qtx_bytes[qci] = u.qtx_bytes.get(qci,0) + etx_bytes
# debug
if 0 and s.dir == 'dl' and (etx_bytes != 0 or tx != 0 or retx != 0) and qci==9:
sfnx = ((t // tti) / 10) % 1024 # = SFN.subframe
_debug('% 4.1f ue%s %s .%d: etx_total_bytes: %d +%5d tx: %2d retx: %d ri: %d bitrate: %d' % \
(sfnx, ue_id, s.dir, qci, etx_total_bytes, etx_bytes, tx, retx, u.rank, cell['%s_bitrate' % s.dir]))
# gc non-live erabs
for erab_id in set(ue.erab_flows.keys()):
if erab_id not in eflows_live:
del ue.erab_flows[erab_id]
# bitsync <- (δt, tx_bytes, #tx, u)
tx += retx # both transmission and retransmission take time
if ue.bitsync is not None:
bitnext = ue.bitsync.next(δt, tx_bytes, tx, u)
else:
bitnext = [(δt, tx_bytes, tx, u)]
# update qci flows
if init:
continue
ue._update_qci_flows(bitnext, qci_samples)
# finish non-live ue
for ue_id in set(s.ues.keys()):
if ue_id not in ue_live:
ue = s.ues.pop(ue_id)
if ue.bitsync is not None:
bitnext = ue.bitsync.finish()
ue._update_qci_flows(bitnext, qci_samples)
return qci_samples
# _update_qci_flows updates .qci_flows for ue with (δt, tx_bytes, #tx, _Utx) yielded from bitsync.
#
# yielded samples are appended to qci_samples ({} qci -> []Sample).
@func(_UE)
def _update_qci_flows(ue, bitnext, qci_samples):
for (δt, tx_bytes, tx, u) in bitnext:
qflows_live = set() # of qci qci flows that get updated from current utx entry
# it might happen that even with correct bitsync we could end up with receiving tx=0 here.
# for example it happens if finish interrupts proper bitsync workflow e.g. as follows:
#
# 1000 0
# <-- finish
# 0 10
#
# if we see #tx = 0 we say that it might be anything in between 1 and δt.
tx_lo = tx_hi = tx
if tx == 0:
tx_hi = δt/tti
tx_lo = min(1, tx_hi)
for qci, tx_bytes_qci in u.qtx_bytes.items():
qflows_live.add(qci)
qf = ue.qci_flows.get(qci)
if qf is None:
qf = ue.qci_flows[qci] = _QCI_Flow()
# share/distribute #tx transport blocks over all QCIs.
#
# Consider two streams "x" and "o" and how LTE scheduler might
# place them into resource map: if the streams have the same
# priority they might be scheduled e.g. as shown in case "a".
# However if "x" has higher priority compared to "o" the
# streams might be scheduled as shown in case "b":
#
# ^ ^
# RB│ x x o o RB│ x x o o
# │ o o x x │ x x o o
# │ x x o o │ x x o o
# │ o o x x │ x x o o
#
# ───────> ───────>
# time time
#
# case "a" case "b"
# same priority pri(x) > pri(o)
#
#
# Here overall #tx=4, but #tx(x) = 4 for case "a" and = 2 for case "b".
#
# -> without knowing QCI priorities and actual behaviour of LTE
# scheduler we can only estimate #tx(x) to be:
#
# tx_bytes(x)
# ───────────·#tx ≤ #tx(x) ≤ #tx
# Σtx_bytes
qtx_lo = tx_bytes_qci * tx_lo / tx_bytes
if qtx_lo > tx_hi: # e.g. 6.6 * 11308 / 11308 = 6.6 + ~1e-15
qtx_lo -= 1e-4
assert 0 < qtx_lo <= tx_hi, (qtx_lo, tx_hi, tx_bytes_qci, tx_bytes)
_ = qf.update(δt, tx_bytes_qci, qtx_lo, tx_hi, u.rank, u.xl_use_avg)
for sample in _:
qci_samples.setdefault(qci, []).append(sample)
# finish flows that did not get an update
for qci in set(ue.qci_flows.keys()):
if qci not in qflows_live:
qf = ue.qci_flows.pop(qci)
_ = qf.finish()
for sample in _:
qci_samples.setdefault(qci, []).append(sample)
# _QCI_Flow() creates new empty flow.
@func(_QCI_Flow)
def __init__(qf):
qf.tx_bytes = 0
qf.tx_time = 0
qf.tx_time_err = 0
# update updates flow with information that so many bytes were transmitted during
# δt with using #tx transport blocks somewhere in [tx_lo,tx_hi] and with
# specified rank. It is also known that overall average usage of resource
# blocks corresponding to tx direction in the resource map is xl_use_avg.
@func(_QCI_Flow)
def update(qf, δt, tx_bytes, tx_lo, tx_hi, rank, xl_use_avg): # -> []Sample
#_debug('QF.update %.2ftti %5db %.1f-%.1ftx %drank %.2fuse' % (δt/tti, tx_bytes, tx_lo, tx_hi, rank, xl_use_avg))
tx_lo /= rank # normalize TB to TTI (if it is e.g. 2x2 mimo, we have 2x more transport blocks)
tx_hi /= rank
vout = []
s = qf._update(δt, tx_bytes, tx_lo, tx_hi, xl_use_avg)
if s is not None:
vout.append(s)
return vout
@func(_QCI_Flow)
def _update(qf, δt, tx_bytes, tx_lo, tx_hi, xl_use_avg): # -> ?Sample
assert tx_bytes > 0
δt_tti = δt / tti
tx_lo = min(tx_lo, δt_tti) # protection (should not happen)
tx_hi = min(tx_hi, δt_tti) # protection (should not happen)
# tx time is somewhere in [tx, δt_tti]
if xl_use_avg < 0.9:
# not congested: it likely took the time to transmit ≈ #tx
pass
else:
# potentially congested: we don't know how much congested it is and
# which QCIs are affected more and which less
# -> all we can say tx_time is only somewhere in between limits
tx_hi = δt_tti
tx_time = (tx_lo + tx_hi) / 2 * tti
tx_time_err = (tx_hi - tx_lo) / 2 * tti
cont = (qf.tx_time != 0) # if this update is continuing current sample
qf.tx_bytes += tx_bytes
qf.tx_time += tx_time
qf.tx_time_err += tx_time_err
# if we are continuing the sample, it might be that current update is either small or big.
# - if it is big - the sample continues.
# - if it is not big - it coalesces and ends the sample.
# NOTE: without throwing away last tti the overall throughput statistics
# stays the same irregardless of whether we do coalesce small txes or not.
if cont and tx_hi < 0.9*δt_tti:
s = qf._sample()
qf.tx_bytes = 0
qf.tx_time = 0
qf.tx_time_err = 0
return s
return None
# finish tells the flow that no updates will be coming anymore.
@func(_QCI_Flow)
def finish(qf): # -> []Sample
#_debug('QF.finish')
vout = []
if qf.tx_time != 0:
s = qf._sample()
qf.tx_bytes = 0
qf.tx_time = 0
qf.tx_time_err = 0
vout.append(s)
return vout
# _sample creates new Sample from what accumulated in the flow.
@func(_QCI_Flow)
def _sample(qf):
s = Sample()
s.tx_bytes = qf.tx_bytes
s.tx_time = qf.tx_time
s.tx_time_err = qf.tx_time_err
assert s.tx_bytes > 0 and \
s.tx_time > 0 and \
s.tx_time_err >= 0 and \
s.tx_time - s.tx_time_err > 0 \
, s
#_debug(" ", s)
return s
# _BitSync creates new empty bitsync.
@func(_BitSync)
def __init__(s):
s.txq = []
s.i_txq = 0
s.i_lshift = 0
# next feeds next (δt, tx_bytes, tx) into bitsync.
#
# and returns ready parts of adjusted stream.
@func(_BitSync)
def next(s, δt, tx_bytes, tx, X): # -> [](δt', tx_bytes', tx', X')
s.txq.append((δt, tx_bytes, tx, X))
# XXX for simplicity we currently handle sync in between only current and
# next frames. That is enough to support FDD. TODO handle next-next case to support TDD
#
# XXX for simplicity we also assume all δt are ~ 10·tti and do not generally handle them
# TODO handle arbitrary δt
# shift #tx to the left:
#
# in previous frame₁ we saw that transmitting tx_bytes₁ resulted in tx₁
# transport blocks in that frame. In the next frame we saw tx_bytes₂
# transmission and tx₂ transport blocks. That tx₂ is the sum of transport
# blocks a) acknowledged in frame₂, but originally transmitted in frame₁,
# and b) transmitted in frame₂ and acknowledged in that same frame₂:
#
# tx_bytes₁ tx₁
# tx_bytes₂ tx₂ = t₂(1) + t₂(2)
#
# we can estimate t₂(2) by assuming that tx_bytes transmission results in
# proportional #tx in that frame. i.e.
#
# tx₁ t₂(2)
# ───────── = ─────────
# tx_bytes₁ tx_bytes₂
#
# and then having t₂(2) we can know t₂(1) = tx₂-t₂(2).
#
# The result of transport blocks associated with frame₁ is tx₁+t₂(1).
def lshift(i):
#print(' > lshift', i, s.txq)
assert s.i_txq <= i < s.i_txq + len(s.txq)
i -= s.i_txq
δt1, b1, t1, X1 = s.txq[i]
δt2, b2, t2, X2 = s.txq[i+1]
if b1 != 0:
t22 = b2*t1/b1
else:
t22 = t2
t21 = t2-t22
if t21 > 0:
# e.g. b₁=1000 t₁=10, b₂=1000, t₂=0 yields t21=-10
t1 += t21 # move t21 from frame₂ -> frame₁
t2 -= t21
assert t1 >= 0, t1
assert t2 >= 0, t2
s.txq[i] = (δt1, b1, t1, X1)
s.txq[i+1] = (δt2, b2, t2, X2)
#print(' < lshift ', s.txq)
while s.i_lshift+1 < s.i_txq + len(s.txq):
lshift(s.i_lshift)
s.i_lshift += 1
# we are close to be ready to yield txq[0].
# yield it, after balancing #tx again a bit, since ^^^ procedure can yield
# t=0 for b!=0 e.g. for
#
# 1000 0
# 1000 10
# 0 0
vout = []
while len(s.txq) >= 3:
s._rebalance(2)
_ = s.txq.pop(0)
s.i_txq += 1
vout.append(_)
return vout
# finish tells bitsync to flush its output queue.
#
# the bitsync becomes reset.
@func(_BitSync)
def finish(s): # -> [](δt', tx_bytes', tx', X')
assert len(s.txq) < 3
s._rebalance(len(s.txq))
vout = s.txq
s.txq = []
return vout
# _rebalance redistributes tx_i in .txq[:l] proportional to tx_bytes_i:
#
# We adjust #tx as follows: consider 3 transmission entries that each sent
# b_i bytes and yielded t_i for #tx. We want to adjust t_i -> t'_i so that
# t'_i correlates with b_i and that whole transmission time stays the same:
#
# b₁ t₁ t'₁
# b₂ t₂ -> t'₂ t'_i = α·b_i Σt' = Σt
# b₃ t₃ t'₃
#
# that gives
#
# Σt
# α = ──
# Σb
#
# and has the effect of moving #tx from periods with tx_bytes=0, to periods
# where transmission actually happened (tx_bytes > 0).
@func(_BitSync)
def _rebalance(s, l):
#print(' > rebalance', s.txq[:l])
assert l <= len(s.txq)
assert l <= 3
Σb = sum(_[1] for _ in s.txq[:l])
Σt = sum(_[2] for _ in s.txq[:l])
if Σb != 0:
for i in range(l):
δt_i, b_i, t_i, X_i = s.txq[i]
t_i = b_i * Σt / Σb
assert t_i >= 0, t_i
s.txq[i] = (δt_i, b_i, t_i, X_i)
#print(' < rebalance', s.txq[:l])
# __repr__ returns human-readable representation of Sample.
@func(Sample)
def __repr__(s):
def div(a,b):
if b != 0:
return a/b
return float('inf') if a != 0 else \
float('nan')
t_lo = s.tx_time - s.tx_time_err
t_hi = s.tx_time + s.tx_time_err
b_lo = div(s.tx_bytes*8, t_hi)
b_hi = div(s.tx_bytes*8, t_lo)
return "Sample(%db, %.1f ±%.1ftti)\t# %.0f ±%.0f bit/s" % \
(s.tx_bytes, s.tx_time/tti, s.tx_time_err/tti, div(s.tx_bytes*8, s.tx_time), (b_hi - b_lo)/2)
# ----------------------------------------
__debug = False
def _debug(*argv):
if __debug:
print(*argv, file=sys.stderr)
# -*- coding: utf-8 -*-
# Copyright (C) 2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from xlte.amari.drb import _Sampler, Sample, _BitSync, tti
from golang import func
# tSampler, UE, Etx and S provide infrastructure for testing _Sampler:
# Etx represents transmission on erab with qci of tx_bytes.
class Etx:
def __init__(etx, erab_id, qci, tx_bytes, tx_total=False):
etx.erab_id = erab_id
etx.qci = qci
etx.tx_bytes = tx_bytes
etx.tx_total = tx_total
# UE represents one entry about an UE in ue_get[stats].ue_list .
class UE:
def __init__(ue, ue_id, tx, retx, *etxv, ri=1):
for _ in etxv:
assert isinstance(_, Etx)
ue.ue_id = ue_id
ue.tx = tx
ue.retx = retx
ue.etxv = etxv
ue.ri = ri
# tSampler provides testing environment for _Sampler.
#
# For easier testing and contrary to _Sampler collected samples are returned as
# a whole from final get, not incrementally.
class tSampler:
def __init__(t, *uev, use_bitsync=False, use_ri=False):
t.tstats = _tUEstats()
ue_stats0, stats0 = t.tstats.next(0, *uev)
t.sampler = _Sampler('zz', ue_stats0, stats0, use_bitsync=use_bitsync, use_ri=use_ri)
t.qci_samples = {} # in-progress collection until final get
def add(t, δt_tti, *uev):
ue_stats, stats = t.tstats.next(δt_tti, *uev)
qci_samples = t.sampler.add(ue_stats, stats)
t._update_qci_samples(qci_samples)
def get(t):
qci_samples = t.sampler.finish()
t._update_qci_samples(qci_samples)
qci_samples = t.qci_samples
t.qci_samples = {}
return qci_samples
def _update_qci_samples(t, qci_samples):
for (qci, samplev) in qci_samples.items():
t.qci_samples.setdefault(qci, []).extend(samplev)
# _tUEstats provides environment to generate test ue_get[stats].
class _tUEstats:
def __init__(t):
t.τ = 0
t.tx_total = {} # (ue,erab) -> tx_total_bytes
# next returns next (ue_stats, stats) with specified ue transmissions
def next(t, δτ_tti, *uev):
for _ in uev:
assert isinstance(_, UE)
t.τ += δτ_tti * tti
tx_total = t.tx_total
t.tx_total = {} # if ue/erab is missing in ue_stats, its tx_total is reset
ue_list = []
ue_stats = {
'time': t.τ,
'utc': 100 + t.τ,
'ue_list': ue_list
}
for ue in uev:
erab_list = []
ue_list.append({
'enb_ue_id': ue.ue_id, # TODO test both 4G and 5G flavours
'cells': [
{
'cell_id': 1,
'ri': ue.ri,
'zz_tx': ue.tx,
'zz_retx': ue.retx,
}
],
'erab_list': erab_list,
})
for etx in ue.etxv:
efkey = (ue.ue_id, etx.erab_id)
etx_total = etx.tx_bytes
if not etx.tx_total:
etx_total += tx_total.get(efkey, 0)
t.tx_total[efkey] = etx_total
erab_list.append({
'erab_id': etx.erab_id,
'qci': etx.qci,
'zz_total_bytes': etx_total,
})
stats = {
'time': ue_stats['time'],
'utc': ue_stats['utc'],
'cells': {
'1': {
'zz_use_avg': 0.1 # TODO add test for congested case
}
}
}
return ue_stats, stats
# S is shortcut to create Sample.
def S(tx_bytes, tx_time_tti):
if isinstance(tx_time_tti, tuple):
τ_lo, τ_hi = tx_time_tti
else:
τ_lo = τ_hi = tx_time_tti
s = Sample()
s.tx_bytes = tx_bytes
s.tx_time = (τ_lo + τ_hi) / 2 * tti
s.tx_time_err = (τ_hi - τ_lo) / 2 * tti
return s
# -------- tests --------
# test_Sampler1 verifies Sampler on single erab/qci flows.
def test_Sampler1():
# _ constructs tSampler, feeds tx stats into it and returns yielded Samples.
#
# tx_statsv = [](δt_tti, tx_bytes, #tx, #retx)
#
# only 1 ue, 1 qci and 1 erab are used in this test to verify the tricky
# parts of the Sampler in how single flow is divided into samples. The other
# tests verify how Sampler handles other aspects - e.g. multiple erabs,
# multiple qci, etc...
def _(*tx_statsv, bitsync=None): # -> []Sample
def b(bitsync):
t = tSampler(use_bitsync=bitsync)
for (δt_tti, tx_bytes, tx, retx) in tx_statsv:
t.add(δt_tti, UE(17, tx, retx, Etx(23, 4, tx_bytes)))
qci_samplev = t.get()
if len(qci_samplev) == 0:
return []
assert set(qci_samplev.keys()) == {4}
return qci_samplev[4]
boff = None # verify with both bitsync=off/on if bitsync=None
bon = None
if bitsync is None or (not bitsync):
boff = b(False)
if bitsync is None or bitsync:
bon = b(True)
if bitsync is None:
assert boff == bon
return bon if bitsync else boff
# δt_tti tx_bytes #tx #retx
assert _() == []
assert _((10, 1000, 1, 0)) == [S(1000, 1)]
assert _((10, 1000, 2, 0)) == [S(1000, 2)]
assert _((10, 1000, 3, 0)) == [S(1000, 3)]
for tx in range(2,10+1):
assert _((10,1000, tx, 0)) == [S(1000, tx)]
assert _((10, 1000, 1, 1)) == [S(1000, 2)] # 1 tx + 1 retx = 2 TTI
assert _((10, 1000, 1, 2)) == [S(1000, 3)] # tx_time is estimated via (tx+retx)
for tx in range(1,10+1):
for retx in range(1,10-tx+1):
assert _((10,1000, tx, retx)) == [S(1000, tx+retx)]
assert _((10, 1000, 77, 88)) == [S(1000, 10)] # tx_time ≤ δt (bug in #tx / #retx)
# coalesce/wrap-up 2 frames
def _2tx(tx1, tx2): return _((10, 100*tx1, tx1, 0),
(10, 100*tx2, tx2, 0))
assert _2tx(4, 3) == [S(700,7)] # small tx1 and tx2: coalesce as if tx1 comes in the end of frame₁
assert _2tx(4, 4) == [S(800,8)] # and tx2 in the beginning of frame₂
assert _2tx(4, 5) == [S(900,9)] # ----//----
assert _2tx(3, 5) == [S(800,8)] # ...
assert _2tx(2, 5) == [S(700,7)]
assert _2tx(5, 4) == [S(900,9)]
assert _2tx(5, 3) == [S(800,8)]
assert _2tx(5, 2) == [S(700,7)]
assert _2tx(10, 0) == [S(1000,10)] # full + no tx
assert _2tx(10, 1) == [S(1100,11)] # full + 1 tti tx
assert _2tx(10, 2) == [S(1200,12)] # full + 2 ttis
for tx2 in range(2,10+1):
assert _2tx(10, tx2) == [S((10+tx2)*100, 10+tx2)]
# coalesce/wrap-up 3 frames: small tx + med-full + small tx
def _3tx(tx1, tx2, tx3): return _((10, 100*tx1, tx1, 0),
(10, 100*tx2, tx2, 0),
(10, 100*tx3, tx3, 0))
assert _3tx(4, 0, 3) == [S(400,4), S(300,3)] # empty middle
assert _3tx(4, 1, 3) == [S(500,5), S(300,3)] # middle only 1 tti - coalesced to left
assert _3tx(4, 2, 3) == [S(600,6), S(300,3)] # middle small - coalesced to left
assert _3tx(4, 3, 3) == [S(700,7), S(300,3)] # ----//----
assert _3tx(4, 4, 3) == [S(800,8), S(300,3)] # ----//----
assert _3tx(4, 8, 3) == [S(1200,12), S(300,3)] # ----//----
assert _3tx(4, 9, 3) == [S(1600,16)] # middle big - coalesced to left and right
assert _3tx(4,10, 3) == [S(1700,17)] # ----//----
# coalesce/wrap-up 4 frames: small tx + med-full + med-full + small tx
def _4tx(tx1, tx2, tx3, tx4): return _((10, 100*tx1, tx1, 0),
(10, 100*tx2, tx2, 0),
(10, 100*tx3, tx3, 0),
(10, 100*tx4, tx4, 0))
assert _4tx(4, 0, 0, 3) == [S(400,4), S(300,3)] # empty m1, m2
assert _4tx(4, 1, 0, 3) == [S(500,5), S(300,3)] # m1 - only 1 tti - coalesces to left
assert _4tx(4, 0, 1, 3) == [S(400,4), S(400,4)] # m2 - only 1 tti - coalesces to right
assert _4tx(4, 2, 0, 3) == [S(600,6), S(300,3)] # m1 small - coalesces to left
assert _4tx(4, 0, 2, 3) == [S(400,4), S(500,5)] # m2 small - coalesces to right
assert _4tx(4, 3, 4, 3) == [S(700,7), S(700,7)] # m1 and m2 small - m1 coalesces to left, m2 to right
assert _4tx(4, 9, 4, 3) == [S(400+900+400,4+9+4), S(300,3)] # m1 big - coalesces s1 and m2
assert _4tx(4, 3, 9, 3) == [S(700,7), S(1200,12)] # m2 big - it only starts new sample and coalesces to right
assert _4tx(4, 9,10, 3) == [S(400+900+1000+300,4+9+10+3)] # m1 and m2 big - all coalesces
# zero #tx
# this might happen even with bitsync if finish divides the stream at an
# unfortunate moment e.g. as follows:
#
# 1000 0
# <-- finish
# 0 10
assert _((10, 1000, 0, 0)) == [S(1000, (1,10))]
# bitsync lightly (BitSync itself is verified in details in test_BitSync)
def b(*btx_statsv):
tx_statsv = []
for (tx_bytes, tx) in btx_statsv: # note: no δt_tti, #retx
tx_statsv.append((10, tx_bytes, tx, 0))
return _(*tx_statsv, bitsync=True)
# tx_bytes #tx
assert b() == []
assert b((1000, 0)) == [S(1000, (1,10))]
assert b((1000, 0),
(0, 10)) == [S(1000, 10)]
assert b((1000, 4), # 4
( 500, 8), # 6 2
(1000, 7), # 3 4
( 0, 6), # 6
( 0, 0)) == [S(1000+500,10+5), S(1000,10)]
# sampler starts from non-scratch - correctly detects δ for erabs.
def test_Sampler_start_from_nonscratch():
t = tSampler(UE(17, 0,0, Etx(23, 4, 10000, tx_total=True)))
t.add(10, UE(17, 10,0, Etx(23, 4, 123)))
assert t.get() == {4: [S(123,10)]}
# erab disappears and appears again -> tx_total_bytes is reset
def test_Sampler_erab_reestablish():
def ue(tx, *etxv): return UE(17, tx, 0, *etxv)
def etx(tx_bytes): return Etx(23, 4, tx_bytes, tx_total=True)
t = tSampler()
t.add(10, ue(2, etx(1000)))
t.add(10, ue(0, )) # erab disappears due to release
t.add(10, ue(10,etx(5000))) # erab reappears - tx_total_bytes handling restarted from scratch
assert t.get() == {4: [S(1000,2), S(5000,10)]}
# erab changes qci on the fly -> erab is considered to be reestablished
def test_Sampler_erab_change_qci():
def ue(tx, *etxv): return UE(17, tx, 0, *etxv)
def etx(qci, tx_bytes, **kw): return Etx(23, qci, tx_bytes, **kw)
t = tSampler()
t.add(10, ue(10, etx(9, 2000, tx_total=True))) # tx with qci=9
t.add(10, ue(10, etx(5, 3000, tx_total=True))) # tx with qci=5
assert t.get() == {9: [S(2000,10)], 5: [S(3000,10)]} # would be S(3000,20) if δqci was not handled
# erab is considered to be reestablished on decreased tx_total_bytes
def test_Sampler_tx_total_down():
def ue(tx, *etxv): return UE(17, tx, 0, *etxv)
def etx(tx_bytes, **kw): return Etx(23, 4, tx_bytes, **kw)
t = tSampler()
t.add(10, ue(10, etx(4000, tx_total=True)))
t.add(10, ue(10, etx(3000, tx_total=True)))
assert t.get() == {4: [S(7000,20)]} # would be e.g. S(4000,10) if tx_total_bytes↓ not handled
# N tx transport blocks is shared/distributed between multiple QCIs
#
# tx_lo ∼ tx_bytes / Σtx_bytes
# tx_hi = whole #tx even if tx_bytes are different
def test_Sampler_txtb_shared_between_qci():
def ue(tx, *etxv): return UE(17, tx, 0, *etxv)
t = tSampler()
t.add(10, ue(10, Etx(1, 9, 4000),
Etx(2, 5, 1000)))
assert t.get() == {9: [S(4000, (8,10))], 5: [S(1000, (2,10))]}
# multiple UE are correctly taken into account
def test_Sampler_multiple_ue():
def ue(ue_id, tx, *etxv): return UE(ue_id, tx, 0, *etxv)
def etx(tx_bytes): return Etx(23, 4, tx_bytes)
t = tSampler()
t.add(10, ue(17, 4, etx(1000)),
ue(18, 5, etx(2000)))
assert t.get() == {4: [S(1000,4), S(2000,5)]}
# rank affects DL max #TB/TTI (ul: no info)
def test_Sampler_rank():
def ue(tx, *etxv): return UE(17, tx, 0, *etxv, ri=2)
def etx(tx_bytes): return Etx(23, 4, tx_bytes)
t = tSampler(use_ri=True)
t.add(10, ue(3, etx(1000)))
assert t.get() == {4: [S(1000, 1.5)]} # tx_time=1.5, not 3
t.add(10, ue(10, etx(1000)))
assert t.get() == {4: [S(1000, 5)]} # tx_time=5, not 10
t.add(10, ue(10*2, etx(1000)))
assert t.get() == {4: [S(1000,10)]} # now tx_time=10
# verify that use_ri=False does not take ue.ri into account
t = tSampler(use_ri=False)
t.add(10, ue(3, etx(1000)))
assert t.get() == {4: [S(1000,3)]} # tx_time=3, not 1.5
# verify _BitSync works ok.
def test_BitSync():
# _ passes txv_in into _BitSync and returns output stream.
#
# txv_in = [](tx_bytes, #tx) ; δt=10·tti
def _(*txv_in):
def do_bitsync(*txv_in):
txv_out = []
xv_out = ''
bitsync = _BitSync()
for x, (tx_bytes, tx) in enumerate(txv_in):
_ = bitsync.next(10*tti, tx_bytes, tx,
chr(ord('a')+x))
for (δt, tx_bytes, tx, x_) in _:
assert δt == 10*tti
txv_out.append((tx_bytes, tx))
xv_out += x_
_ = bitsync.finish()
for (δt, tx_bytes, tx, x_) in _:
assert δt == 10*tti
txv_out.append((tx_bytes, tx))
xv_out += x_
assert xv_out == 'abcdefghijklmnopqrstuvwxyz'[:len(txv_in)]
return txv_out
txv_out = do_bitsync(*txv_in)
# also check with 0-tail -> it should give the same
txv_out_ = do_bitsync(*(txv_in + ((0,0),)*10))
assert txv_out_ == txv_out + [(0,0)]*10
return txv_out
# tx_bytes tx
assert _((1000, 10), # all ACK in the same frame
( 0, 0),
( 0, 0)) == [(1000, 10),
( 0, 0),
( 0, 0)]
assert _((1000, 0), # all ACK in next frame
( 0, 10),
( 0, 0)) == [(1000, 10),
( 0, 0),
( 0, 0)]
#assert _((1000, 0), # all ACK in next-next frame
# ( 0, 0),
# ( 0, 10)) == [(1000, 10),
# ( 0, 0),
# ( 0, 0)]
assert _((1000, 2), # some ACK in the same frame, some in next
( 0, 8),
( 0, 0)) == [(1000, 10),
( 0, 0),
( 0, 0)]
#assert _((1000, 2), # some ACK in the same frame, some in next, some in next-next
# ( 0, 5),
# ( 0, 3)) == [(1000, 10),
# ( 0, 0),
# ( 0, 0)]
# 1000 1000
assert _((1000, 10), # consecutive transmission (ack in same)
(1000, 10),
( 500, 5),
( 0, 0),
( 0, 0)) == [(1000, 10),
(1000, 10),
( 500, 5),
( 0, 0),
( 0, 0)]
assert _((1000, 0), # consecutive transmission (ack in next)
(1000, 10),
( 500, 10),
( 0, 5),
( 0, 0)) == [(1000, 10),
(1000, 10),
( 500, 5),
( 0, 0),
( 0, 0)]
assert _((1000, 4), # consecutive transmission (ack scattered)
(1000, 10), # 6 4
( 500, 8), # 6 2
( 0, 3), # 3
( 0, 0)) == [(1000, 10),
(1000, 10),
( 500, 5),
( 0, 0),
( 0, 0)]
#assert _((1000, 2), # consecutive transmission (ack scattered to next and next-next)
# (1000, 8), # 5 3
# ( 500, 8), # 3 5 0
# ( 0, 6), # 2 4
# ( 0, 1), # 1
# ( 0, 0)) == [(1000, 10),
# (1000, 10),
# ( 500, 5),
# ( 0, 0),
# ( 0, 0)]
# 1000 500 1000
assert _((1000, 10), # consecutive transmission (ack in same)
( 500, 5),
(1000, 10),
( 0, 0),
( 0, 0)) == [(1000, 10),
( 500, 5),
(1000, 10),
( 0, 0),
( 0, 0)]
assert _((1000, 0), # consecutive transmission (ack in next)
( 500, 10),
(1000, 5),
( 0, 10),
( 0, 0)) == [(1000, 10),
( 500, 5),
(1000, 10),
( 0, 0),
( 0, 0)]
assert _((1000, 4), # consecutive transmission (ack scattered)
( 500, 8), # 6 2
(1000, 7), # 3 4
( 0, 6), # 6
( 0, 0)) == [(1000, 10),
( 500, 5),
(1000, 10),
( 0, 0),
( 0, 0)]
#assert _((1000, 2), # consecutive transmission (ack scattered to next and next-next)
# ( 500, 8), # 5 3
# (1000, 5), # 3 1 1
# ( 0, 5), # 1 4
# ( 0, 5), # 5
# ( 0, 0)) == [(1000, 10),
# ( 500, 5),
# (1000, 10),
# ( 0, 0),
# ( 0, 0)]
# transmission is scattered to two frames with all acks only in the second frame
assert _((1000, 0),
(1000, 10)) == [(1000, 5),
(1000, 5)]
assert _((1000, 0),
(1000, 10),
( 0, 0)) == [(1000, 5),
(1000, 5),
( 0, 0)]
assert _((1000, 0), # steady tx (ack in next)
(1000, 10),
( 500, 10),
( 500, 5),
( 500, 5),
( 0, 5),
( 0, 0)) == [(1000, 10),
(1000, 10),
( 500, 5),
( 500, 5),
( 500, 5),
( 0, 0),
( 0, 0)]
#assert _((1000, 0), # steady tx (ack in next-next)
# (1000, 0),
# ( 500, 10),
# ( 500, 10),
# ( 500, 5),
# ( 0, 5),
# ( 0, 5),
# ( 0, 0)) == [(1000, 10),
# (1000, 10),
# ( 500, 5),
# ( 500, 5),
# ( 500, 5),
# ( 0, 0),
# ( 0, 0),
# ( 0, 0)]
assert _((1000, 10), # yields t21 < 0 in lshift
(1000, 0),
( 0, 10)) == [(1000, 10),
(1000, 10),
( 0, 0)]
# real-life example
assert _(( 6168, 0),
(14392, 8),
( 0, 0)) == [( 6168, 2.4),
(14392, 5.6),
( 0, 0 )]
# ---- misc ----
# teach tests to compare Samples
@func(Sample)
def __eq__(a, b):
if not isinstance(b, Sample):
return False
# compare tx_time with tolerance to level-out floating point errors
return (abs(a.tx_time - b.tx_time) < (tti / 1e6)) and \
(a.tx_bytes == b.tx_bytes)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment