Commit bf96c767 authored by Kirill Smelkov's avatar Kirill Smelkov

kpi: Add way to compute aggregated counters + showcase this

- add Calc.cum to aggregate Measurements.

- add ΣMeasurement type to represent result of this. It is very similar
  to Measurement, but every field comes accompanied with information
  about how much time there was no data for that field. In other words
  it is not all or nothing for NA in the result. For example a field
  might be present 90% of the time and NA only 10% of the time. We want to
  preserver knowledge about that 90% of valid values in the result. And we
  also want to know how much time there was no data.

- amend and kpidemo.ipynb to demonstrate this.
#!/usr/bin/env python
"""kpidemo - plot KPIs computed from enb.xlog
Also print total for raw counters.
Usage: kpidemo <time period> <enb.xlog uri>
......@@ -127,7 +129,15 @@ def main():
facc, fthp = fig.subfigures(1, 2)
figplot_erab_accessibility (facc, vτ, vInititialEPSBEstabSR, vAddedEPSBEstabSR, tperiod)
figplot_eutran_ip_throughput(fthp, vτ, vIPThp_qci, tperiod)
# Step 5. Print total for raw counters.
mhead =[0]
mtail =[-1]
calc_total = kpi.Calc(mlog, mhead['X.Tstart'], mtail['X.Tstart']+mtail['X.δT'])
Σ = calc_total.sum()
# ---- plotting routines ----
......@@ -238,5 +248,43 @@ def vτ_period_pretty(vτ):
return "%s ±%s [%s, %s]" % (tpretty(avg), tpretty(std), tpretty(min), tpretty(max))
# ---- printing routines ----
# print_ΣMeasurement prints aggregated counters.
def print_ΣMeasurement(Σ: kpi.ΣMeasurement):
print("Time:\t%s - %s" % (datetime.fromtimestamp(Σ['X.Tstart']),
datetime.fromtimestamp(Σ['X.Tstart'] + Σ['X.δT'])))
# emit1 prints one field.
def emit1(name, v, τ_na):
fmt = "%12s "
if kpi.isNA(v):
s = fmt % "NA"
if isinstance(v, np.floating):
fmt = "%15.2f"
s = fmt % v
pna = τ_na / Σ['X.δT'] * 100
if pna >= 0.01:
s += " (%.2f%% NA)" % pna
print("%-32s:\t%s" % (name, s))
for field in Σ._dtype0.names:
if field in ('X.Tstart', 'X.δT'):
v = Σ[field]['value']
τ_na = Σ[field]['τ_na']
if v.shape == (): # scalar
emit1(field, v, τ_na)
assert len(v.shape) == 1
if kpi.isNA(v).all(): # subarray full of ø
emit1(field, v[0], τ_na[0])
else: # subarray with some non-ø data
for k in range(v.shape[0]):
if v[k] != 0:
fieldk = '%s.%d' % (field[:field.rfind('.')], k) # name.QCI -> name.k
emit1(fieldk, v[k], τ_na[k])
if __name__ == '__main__':
......@@ -21,7 +21,8 @@
- Calc is KPI calculator. It can be instantiated on MeasurementLog and time
interval over which to perform computations. Use Calc methods such as
.erab_accessibility() and .eutran_ip_throughput() to compute KPIs.
.erab_accessibility() and .eutran_ip_throughput() to compute KPIs, and .sum()
to compute aggregated measurements.
- MeasurementLog maintains journal with result of measurements. Use .append()
to populate it with data.
......@@ -70,8 +71,9 @@ from golang import func
# ──────|─────|────[────|────)──────|──────|────────>
# ←─ τ_lo τ_hi ──→ time
# It is also possible to merely aggregate measured values via .sum() .
# See also: MeasurementLog, Measurement.
# See also: MeasurementLog, Measurement, ΣMeasurement.
class Calc:
# ._data []Measurement - fully inside [.τ_lo, .τ_hi)
# [.τ_lo, .τ_hi) time interval to compute over. Potentially wider than originally requested.
......@@ -217,6 +219,27 @@ class Interval(np.void):
# ΣMeasurement represents result of aggregation of several Measurements.
# It is similar to Measurement, but each value comes accompanied with
# information about how much time there was no data for that field:
# Σ[f].value = Σ Mi[f] if Mi[f] ≠ NA
# i
# Σ[f].τ_na = Σ Mi[X.δT] if Mi[f] = NA
# i
class ΣMeasurement(np.void):
_ = []
for name in Measurement._dtype.names:
typ = Measurement._dtype.fields[name][0].type
if not name.startswith('X.'): # X.Tstart, X.δT
typ = np.dtype([('value', typ), ('τ_na', Measurement.Ttime)])
_.append((name, typ))
_dtype = np.dtype(_)
del _
# ----------------------------------------
# Measurement is the central part around which everything is organized.
# Let's have it go first.
......@@ -233,6 +256,23 @@ def __new__(cls):
m[field][:] = NA(fdtype.base) # subarray
return m
# ΣMeasurement() creates new ΣMeasurement instance.
# For all fields .value is initialized with NA and .τ_na with 0.
def __new__(cls):
Σ = _newscalar(cls, cls._dtype)
for field in Σ.dtype.names:
fdtype = Σ.dtype.fields[field][0]
if fdtype.shape != (): # skip subarrays - rely on aliases
if field.startswith('X.'): # X.Tstart, X.δT
Σ[field] = NA(fdtype)
Σ[field]['value'] = NA(fdtype.fields['value'][0])
Σ[field]['τ_na'] = 0
return Σ
# _all_qci expands <name>.QCI into <name>.sum and [] of <name>.<qci> for all possible qci values.
# TODO remove and use direct array access (after causes are expanded into array too)
......@@ -251,26 +291,26 @@ def _all_cause(name_cause: str): # -> name_sum, ()name_causev
name = name_cause[:-len(".CAUSE")]
return name+".sum", () # TODO add all possible CAUSEes - TS 36.331 (RRC)
# expand all .QCI and .CAUSE in Measurement._dtype .
def _():
# expand all .QCI and .CAUSE in ._dtype of Measurement and ΣMeasurement.
def _(Klass):
# expand X.QCI -> X.sum + X.QCI[nqci]
qnamev = [] # X from X.QCI
expv = [] # of (name, typ[, shape])
for name in Measurement._dtype .names:
typ = Measurement._dtype .fields[name][0].type
for name in Klass._dtype .names:
dtyp = Klass._dtype .fields[name][0]
if name.endswith('.QCI'):
_ = name[:-len('.QCI')]
expv.append(('%s.sum' % _, typ)) # X.sum
expv.append((name, typ, nqci)) # X.QCI[nqci]
expv.append(('%s.sum' % _, dtyp)) # X.sum
expv.append((name, dtyp, nqci)) # X.QCI[nqci]
elif name.endswith('.CAUSE'):
Σ, causev = _all_cause(name)
for _ in (Σ,)+causev:
expv.append((_, typ))
expv.append((_, dtyp))
expv.append((name, typ))
expv.append((name, dtyp))
_dtype = np.dtype(expv)
......@@ -292,14 +332,15 @@ def _():
offsetv.append(off0 + qci*qarr.base.itemsize)
Measurement._dtype0 = _dtype # ._dtype without aliases
Measurement._dtype = np.dtype({
Klass._dtype0 = _dtype # ._dtype without aliases
Klass._dtype = np.dtype({
'names': namev,
'formats': formatv,
'offsets': offsetv,
assert Measurement._dtype.itemsize == Measurement._dtype0.itemsize
assert Klass._dtype.itemsize == Klass._dtype0.itemsize
del _
......@@ -657,6 +698,33 @@ def eutran_ip_throughput(calc): # -> IPThp[QCI][dl,ul]
return thp
# sum aggregates values of all Measurements in covered time interval.
# TODO tests
def sum(calc): # -> ΣMeasurement
Σ = ΣMeasurement()
Σ['X.Tstart'] = calc.τ_lo
Σ['X.δT'] = calc.τ_hi - calc.τ_lo
for m in calc._miter():
for field in m.dtype.names:
if field.startswith('X.'): # X.Tstart, X.δT
v = m[field]
if v.shape != (): # skip subarrays - rely on aliases
if isNA(v):
Σ[field]['τ_na'] += m['X.δT']
if isNA(Σ[field]['value']):
Σ[field]['value'] = 0
Σ[field]['value'] += v
return Σ
# _miter iterates through [.τ_lo, .τ_hi) yielding Measurements.
# The measurements are yielded with consecutive timestamps. There is no gaps
