Commit b56b6ba0 authored by Kirill Smelkov

py2: *: Greek -> Latin

Python 2 does not support non-ASCII (Unicode) characters in identifiers.
parent 612a3d0f
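For reference (not part of the commit): the rename is needed because CPython 2's tokenizer only accepts ASCII identifiers, while Python 3 allows Unicode ones. A minimal illustration:

    # -*- coding: utf-8 -*-
    # Accepted by Python 3; rejected by Python 2 with a SyntaxError even though
    # the source encoding is declared, because Python 2 identifiers must be ASCII.
    δt = 1.5
    # The ASCII spelling this commit switches to works under both interpreters.
    dt = 1.5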
......@@ -185,10 +185,10 @@ class Conn:
# handle rx timeout ourselves. We cannot rely on global rx timeout
# since e.g. other replies might be coming in again and again.
δt = conn._ws.gettimeout()
dt = conn._ws.gettimeout()
rxt = nilchan
if δt is not None:
_ = time.Timer(δt)
if dt is not None:
_ = time.Timer(dt)
defer(_.stop)
rxt = _.c
......
......@@ -57,8 +57,8 @@ class tSampler:
t.sampler = _Sampler('zz', ue_stats0, stats0, use_bitsync=use_bitsync, use_ri=use_ri)
t.qci_samples = {} # in-progress collection until final get
def add(t, δt_tti, *uev):
ue_stats, stats = t.tstats.next(δt_tti, *uev)
def add(t, dt_tti, *uev):
ue_stats, stats = t.tstats.next(dt_tti, *uev)
qci_samples = t.sampler.add(ue_stats, stats)
t._update_qci_samples(qci_samples)
......@@ -77,21 +77,21 @@ class tSampler:
# _tUEstats provides environment to generate test ue_get[stats].
class _tUEstats:
def __init__(t):
t.τ = 0
t.tau = 0
t.tx_total = {} # (ue,erab) -> tx_total_bytes
# next returns next (ue_stats, stats) with specified ue transmissions
def next(t, δτ_tti, *uev):
def next(t, dtau_tti, *uev):
for _ in uev:
assert isinstance(_, UE)
t.τ += δτ_tti * tti
t.tau += dtau_tti * tti
tx_total = t.tx_total
t.tx_total = {} # if ue/erab is missing in ue_stats, its tx_total is reset
ue_list = []
ue_stats = {
'time': t.τ,
'utc': 100 + t.τ,
'time': t.tau,
'utc': 100 + t.tau,
'ue_list': ue_list
}
for ue in uev:
......@@ -137,14 +137,14 @@ class _tUEstats:
# S is shortcut to create Sample.
def S(tx_bytes, tx_time_tti):
if isinstance(tx_time_tti, tuple):
τ_lo, τ_hi = tx_time_tti
tau_lo, tau_hi = tx_time_tti
else:
τ_lo = τ_hi = tx_time_tti
tau_lo = tau_hi = tx_time_tti
s = Sample()
s.tx_bytes = tx_bytes
s.tx_time = (τ_lo + τ_hi) / 2 * tti
s.tx_time_err = (τ_hi - τ_lo) / 2 * tti
s.tx_time = (tau_lo + tau_hi) / 2 * tti
s.tx_time_err = (tau_hi - tau_lo) / 2 * tti
return s
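Worked example (not part of the diff), following the S() shortcut above: a tuple argument gives the (lo, hi) bound in TTIs, from which tx_time and tx_time_err become the midpoint and half-width.

    s = S(1000, (3, 5))
    assert s.tx_bytes    == 1000
    assert s.tx_time     == 4 * tti   # midpoint of the (3, 5) TTI interval
    assert s.tx_time_err == 1 * tti   # half-width, i.e. "4 ± 1 TTI"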
......@@ -154,7 +154,7 @@ def S(tx_bytes, tx_time_tti):
def test_Sampler1():
# _ constructs tSampler, feeds tx stats into it and returns yielded Samples.
#
# tx_statsv = [](δt_tti, tx_bytes, #tx, #retx)
# tx_statsv = [](dt_tti, tx_bytes, #tx, #retx)
#
# only 1 ue, 1 qci and 1 erab are used in this test to verify the tricky
# parts of the Sampler in how single flow is divided into samples. The other
......@@ -163,8 +163,8 @@ def test_Sampler1():
def _(*tx_statsv, bitsync=None): # -> []Sample
def b(bitsync):
t = tSampler(use_bitsync=bitsync)
for (δt_tti, tx_bytes, tx, retx) in tx_statsv:
t.add(δt_tti, UE(17, tx, retx, Etx(23, 4, tx_bytes)))
for (dt_tti, tx_bytes, tx, retx) in tx_statsv:
t.add(dt_tti, UE(17, tx, retx, Etx(23, 4, tx_bytes)))
qci_samplev = t.get()
if len(qci_samplev) == 0:
return []
......@@ -181,7 +181,7 @@ def test_Sampler1():
return bon if bitsync else boff
# δt_tti tx_bytes #tx #retx
# dt_tti tx_bytes #tx #retx
assert _() == []
assert _((10, 1000, 1, 0)) == [S(1000, 1)]
assert _((10, 1000, 2, 0)) == [S(1000, 2)]
......@@ -195,7 +195,7 @@ def test_Sampler1():
for retx in range(1,10-tx+1):
assert _((10,1000, tx, retx)) == [S(1000, tx+retx)]
assert _((10, 1000, 77, 88)) == [S(1000, 10)] # tx_time ≤ δt (bug in #tx / #retx)
assert _((10, 1000, 77, 88)) == [S(1000, 10)] # tx_time ≤ dt (bug in #tx / #retx)
# coalesce/wrap-up 2 frames
def _2tx(tx1, tx2): return _((10, 100*tx1, tx1, 0),
......@@ -255,7 +255,7 @@ def test_Sampler1():
# bitsync lightly (BitSync itself is verified in details in test_BitSync)
def b(*btx_statsv):
tx_statsv = []
for (tx_bytes, tx) in btx_statsv: # note: no δt_tti, #retx
for (tx_bytes, tx) in btx_statsv: # note: no dt_tti, #retx
tx_statsv.append((10, tx_bytes, tx, 0))
return _(*tx_statsv, bitsync=True)
......@@ -272,7 +272,7 @@ def test_Sampler1():
( 0, 0)) == [S(1000+500,10+5), S(1000,10)]
# sampler starts from non-scratch - correctly detects δ for erabs.
# sampler starts from non-scratch - correctly detects delta for erabs.
def test_Sampler_start_from_nonscratch():
t = tSampler(UE(17, 0,0, Etx(23, 4, 10000, tx_total=True)))
t.add(10, UE(17, 10,0, Etx(23, 4, 123)))
......@@ -313,7 +313,7 @@ def test_Sampler_tx_total_down():
# N tx transport blocks is shared/distributed between multiple QCIs
#
# tx_lo ∼ tx_bytes / Σtx_bytes
# tx_lo ∼ tx_bytes / Stx_bytes
# tx_hi = whole #tx even if tx_bytes are different
def test_Sampler_txtb_shared_between_qci():
def ue(tx, *etxv): return UE(17, tx, 0, *etxv)
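A rough sketch of the bound described in the comment above (not part of the diff; the names are illustrative, not the Sampler's API): the low estimate attributes transport blocks to a QCI proportionally to its byte share, while the high estimate allows that every block might have carried that QCI.

    def txtb_share(tx, tx_bytes_by_qci):
        # -> {qci: (tx_lo, tx_hi)} for tx transport blocks shared between QCIs
        total = sum(tx_bytes_by_qci.values())
        return {qci: (tx * b / total,   # tx_lo ~ tx_bytes / Stx_bytes
                      tx)               # tx_hi = whole #tx
                for qci, b in tx_bytes_by_qci.items()}

    # e.g. 10 transport blocks shared between QCI 5 (1000 B) and QCI 9 (3000 B):
    # txtb_share(10, {5: 1000, 9: 3000})  ->  {5: (2.5, 10), 9: (7.5, 10)}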
......@@ -356,7 +356,7 @@ def test_Sampler_rank():
def test_BitSync():
# _ passes txv_in into _BitSync and returns output stream.
#
# txv_in = [](tx_bytes, #tx) ; δt=10·tti
# txv_in = [](tx_bytes, #tx) ; dt=10·tti
def _(*txv_in):
def do_bitsync(*txv_in):
txv_out = []
......@@ -365,14 +365,14 @@ def test_BitSync():
for x, (tx_bytes, tx) in enumerate(txv_in):
_ = bitsync.next(10*tti, tx_bytes, tx,
chr(ord('a')+x))
for (δt, tx_bytes, tx, x_) in _:
assert δt == 10*tti
for (dt, tx_bytes, tx, x_) in _:
assert dt == 10*tti
txv_out.append((tx_bytes, tx))
xv_out += x_
_ = bitsync.finish()
for (δt, tx_bytes, tx, x_) in _:
assert δt == 10*tti
for (dt, tx_bytes, tx, x_) in _:
assert dt == 10*tti
txv_out.append((tx_bytes, tx))
xv_out += x_
......
......@@ -259,8 +259,8 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement):
# do init/fini correction if there was also third preceding stats message.
m = logm._m.copy() # [stats_prev, stats)
# δcc(counter) tells how specified cumulative counter changed since last stats result.
def δcc(counter):
# dcc(counter) tells how specified cumulative counter changed since last stats result.
def dcc(counter):
old = _stats_cc(stats_prev, counter)
new = _stats_cc(stats, counter)
if new < old:
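The hunk is cut off right after the wrap check. Purely for illustration (not the actual xlte code), a minimal counter-delta helper with one possible treatment of a restarted counter, assuming a counter that went backwards was reset to zero:

    def _dcc_sketch(old, new):
        # delta of a cumulative counter between two stats results; if the
        # service restarted and the counter went backwards, count only what
        # was seen since the restart (an assumption for this sketch)
        if new < old:
            return new
        return new - old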
......@@ -285,38 +285,38 @@ def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement):
# overall statistics if it is computed taking both periods into account.
if p is not None:
if p[fini] < p[init]:
δ = min(p[init]-p[fini], m[fini])
p[fini] += δ
m[fini] -= δ
delta = min(p[init]-p[fini], m[fini])
p[fini] += delta
m[fini] -= delta
# if we still have too much fini - throw it away pretending that it
# came from even older uncovered period
if m[fini] > m[init]:
m[fini] = m[init]
# compute δ for counters.
# compute delta for counters.
# any logic error in data will be reported via LogError.
try:
# RRC: connection establishment
m_initfini(
'RRC.ConnEstabAtt.sum', δcc('rrc_connection_request'),
'RRC.ConnEstabSucc.sum', δcc('rrc_connection_setup_complete'))
'RRC.ConnEstabAtt.sum', dcc('rrc_connection_request'),
'RRC.ConnEstabSucc.sum', dcc('rrc_connection_setup_complete'))
# S1: connection establishment
m_initfini(
'S1SIG.ConnEstabAtt', δcc('s1_initial_context_setup_request'),
'S1SIG.ConnEstabSucc', δcc('s1_initial_context_setup_response'))
'S1SIG.ConnEstabAtt', dcc('s1_initial_context_setup_request'),
'S1SIG.ConnEstabSucc', dcc('s1_initial_context_setup_response'))
# ERAB: Initial establishment
# FIXME not correct if multiple ERABs are present in one message
m_initfini(
'ERAB.EstabInitAttNbr.sum', δcc('s1_initial_context_setup_request'),
'ERAB.EstabInitSuccNbr.sum', δcc('s1_initial_context_setup_response'))
'ERAB.EstabInitAttNbr.sum', dcc('s1_initial_context_setup_request'),
'ERAB.EstabInitSuccNbr.sum', dcc('s1_initial_context_setup_response'))
# ERAB: Additional establishment
# FIXME not correct if multiple ERABs are present in one message
m_initfini(
'ERAB.EstabAddAttNbr.sum', δcc('s1_erab_setup_request'),
'ERAB.EstabAddSuccNbr.sum', δcc('s1_erab_setup_response'))
'ERAB.EstabAddAttNbr.sum', dcc('s1_erab_setup_request'),
'ERAB.EstabAddSuccNbr.sum', dcc('s1_erab_setup_response'))
except Exception as e:
if not isinstance(e, LogError):
......@@ -383,22 +383,22 @@ def _handle_drb_stats(logm, drb_stats: xlog.Message):
assert drb_stats_prev.message == "x.drb_stats"
# time coverage for current drb_stats
τ_lo = drb_stats_prev.timestamp
τ_hi = drb_stats.timestamp
δτ = τ_hi - τ_lo
tau_lo = drb_stats_prev.timestamp
tau_hi = drb_stats.timestamp
dtau = tau_hi - tau_lo
# see with which ._m or ._m_next, if any, drb_stats overlaps with ≥ 50% of
# time first, and update that measurement correspondingly.
if not (δτ > 0):
if not (dtau > 0):
return
if logm._m is not None:
m_lo = logm._m['X.Tstart']
m_hi = m_lo + logm._m['X.δT']
d = max(0, min(τ_hi, m_hi) -
max(τ_lo, m_lo))
if d >= δτ/2: # NOTE ≥ 50%, not > 50% not to skip drb_stats if fill is exactly 50%
d = max(0, min(tau_hi, m_hi) -
max(tau_lo, m_lo))
if d >= dtau/2: # NOTE ≥ 50%, not > 50% not to skip drb_stats if fill is exactly 50%
_drb_update(logm._m, drb_stats)
return
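Numeric illustration (not part of the diff) of the "≥ 50% overlap" rule used above: a drb_stats interval [10, 14) checked against a measurement covering [8, 12).

    tau_lo, tau_hi = 10, 14          # drb_stats coverage, dtau = 4
    m_lo,   m_hi   =  8, 12          # measurement coverage
    dtau = tau_hi - tau_lo
    d = max(0, min(tau_hi, m_hi) - max(tau_lo, m_lo))   # overlap = 2
    assert d >= dtau/2               # exactly 50% -> drb_stats goes to this measurement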
......@@ -406,9 +406,9 @@ def _handle_drb_stats(logm, drb_stats: xlog.Message):
n_lo = logm._m_next['X.Tstart']
# n_hi - don't know as _m_next['X.δT'] is ø yet
d = max(0, τ_hi -
max(τ_lo, n_lo))
if d >= δτ/2:
d = max(0, tau_hi -
max(tau_lo, n_lo))
if d >= dtau/2:
_drb_update(logm._m_next, drb_stats)
return
......@@ -434,16 +434,16 @@ def _drb_update(m: kpi.Measurement, drb_stats: xlog.Message):
# DRB.IPVol and DRB.IPTime are collected to compute throughput.
#
# thp = ΣB*/ΣT* where B* is tx'ed bytes in the sample without taking last tti into account
# thp = SB*/ST* where B* is tx'ed bytes in the sample without taking last tti into account
# and T* is time of tx also without taking that sample's tail tti.
#
# we only know ΣB (whole amount of tx), ΣT and ΣT* with some error.
# we only know SB (whole amount of tx), ST and ST* with some error.
#
# -> thp can be estimated to be inside the following interval:
#
# ΣB ΣB
# SB SB
# ───── ≤ thp ≤ ───── (1)
# ΣT_hi ΣT*_lo
# ST_hi ST*_lo
#
# the upper layer in xlte.kpi will use the following formula for
# final throughput calculation:
......@@ -452,28 +452,28 @@ def _drb_update(m: kpi.Measurement, drb_stats: xlog.Message):
# thp = ────────── (2)
# DRB.IPTime
#
# -> set DRB.IPTime and its error to mean and δ of ΣT_hi and ΣT*_lo
# -> set DRB.IPTime and its error to mean and delta of ST_hi and ST*_lo
# so that (2) becomes (1).
# FIXME we account whole PDCP instead of only IP traffic
ΣB = trx['%s_tx_bytes' % dir]
ΣT = trx['%s_tx_time' % dir]
ΣT_err = trx['%s_tx_time_err' % dir]
ΣTT = trx['%s_tx_time_notailtti' % dir]
ΣTT_err = trx['%s_tx_time_notailtti_err' % dir]
SB = trx['%s_tx_bytes' % dir]
ST = trx['%s_tx_time' % dir]
ST_err = trx['%s_tx_time_err' % dir]
STT = trx['%s_tx_time_notailtti' % dir]
STT_err = trx['%s_tx_time_notailtti_err' % dir]
ΣT_hi = ΣT + ΣT_err
ΣTT_lo = ΣTT - ΣTT_err
ST_hi = ST + ST_err
STT_lo = STT - STT_err
qvol[qci] = 8*ΣB # in bits
qtime[qci] = (ΣT_hi + ΣTT_lo) / 2
qtime_err[qci] = (ΣT_hi - ΣTT_lo) / 2
qvol[qci] = 8*SB # in bits
qtime[qci] = (ST_hi + STT_lo) / 2
qtime_err[qci] = (ST_hi - STT_lo) / 2
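A small numeric check (not part of the diff) that setting DRB.IPTime to the mean of ST_hi and STT_lo, with its error set to half their difference, indeed makes formula (2) reproduce interval (1); the numbers are arbitrary.

    import math

    SB, ST, ST_err, STT, STT_err = 1000000, 0.50, 0.01, 0.40, 0.01

    ST_hi  = ST  + ST_err            # 0.51
    STT_lo = STT - STT_err           # 0.39
    ip_vol      = 8*SB               # bits
    ip_time     = (ST_hi + STT_lo) / 2
    ip_time_err = (ST_hi - STT_lo) / 2

    # (2) evaluated at IPTime ± IPTime_err gives back the bounds of (1)
    assert math.isclose(ip_vol/(ip_time + ip_time_err), ip_vol/ST_hi)
    assert math.isclose(ip_vol/(ip_time - ip_time_err), ip_vol/STT_lo)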
# LogError(timestamp|None, *argv).
@func(LogError)
def __init__(e, τ, *argv):
e.timestamp = τ
def __init__(e, tau, *argv):
e.timestamp = tau
super(LogError, e).__init__(*argv)
# __str__ returns human-readable form.
......
......@@ -190,20 +190,20 @@ def xlog(ctx, wsuri, logspecv):
# e.g. disk full in xl.jemit itself
log.exception('xlog failure (second level):')
δt_reconnect = min(3, lsync.period)
dt_reconnect = min(3, lsync.period)
_, _rx = select(
ctx.done().recv, # 0
time.after(δt_reconnect).recv, # 1
time.after(dt_reconnect).recv, # 1
)
if _ == 0:
raise ctx.err()
# _XLogger serves xlog implementation.
class _XLogger:
def __init__(xl, wsuri, logspecv, δt_sync):
def __init__(xl, wsuri, logspecv, dt_sync):
xl.wsuri = wsuri
xl.logspecv = logspecv
xl.δt_sync = δt_sync # = logspecv.get("meta.sync").period
xl.dt_sync = dt_sync # = logspecv.get("meta.sync").period
xl.tsync = float('-inf') # never yet
# emit saves line to the log.
......@@ -235,7 +235,7 @@ class _XLogger:
def xlog1(xl, ctx):
# emit sync periodically even in detached state
# this is useful to still know e.g. intended logspec if the service is stopped for a long time
if time.now() - xl.tsync >= xl.δt_sync:
if time.now() - xl.tsync >= xl.dt_sync:
xl.jemit_sync("detached", "periodic", {})
# connect to the service
......@@ -336,11 +336,11 @@ class _XLogger:
# TODO detect time overruns and correct schedule correspondingly
tnow = time.now()
tarm = t0 + tmin
δtsleep = tarm - tnow
if δtsleep > 0:
dtsleep = tarm - tnow
if dtsleep > 0:
_, _rx = select(
ctx.done().recv, # 0
time.after(δtsleep).recv, # 1
time.after(dtsleep).recv, # 1
)
if _ == 0:
raise ctx.err()
......@@ -420,7 +420,7 @@ class _XMsgServer:
resp_raw = json.dumps(resp,
separators=(',', ':'), # most compact, like Amari does
ensure_ascii=False) # so that e.g. δt comes as is
ensure_ascii=False) # so that e.g. dt comes as is
return resp, resp_raw
......
......@@ -145,10 +145,10 @@ def test_Reader_readahead_vs_eof():
fxlog.seek(pos, io.SEEK_SET)
xr = xlog.Reader(fxlog)
def expect_msg(τ, msg):
def expect_msg(tau, msg):
_ = xr.read()
assert type(_) is xlog.Message
assert _.timestamp == τ
assert _.timestamp == tau
assert _.message == msg
logit('{"message": "aaa", "utc": 1}')
......
......@@ -116,26 +116,26 @@
"\n",
"# calc_each_period partitions mlog data into periods and yields kpi.Calc for each period.\n",
"def calc_each_period(mlog: kpi.MeasurementLog, tperiod: float): # -> yield kpi.Calc\n",
" τ = mlog.data()[0]['X.Tstart']\n",
" tau = mlog.data()[0]['X.Tstart']\n",
" for m in mlog.data()[1:]:\n",
" τ_ = m['X.Tstart']\n",
" if (τ_ - τ) >= tperiod:\n",
" calc = kpi.Calc(mlog, τ, τ+tperiod)\n",
" τ = calc.τ_hi\n",
" tau_ = m['X.Tstart']\n",
" if (tau_ - tau) >= tperiod:\n",
" calc = kpi.Calc(mlog, tau, tau+tperiod)\n",
" tau = calc.tau_hi\n",
" yield calc\n",
"\n",
"tperiod = 1*60 # 1 minute\n",
"vτ = []\n",
"vtau = []\n",
"vInititialEPSBEstabSR = []\n",
"vAddedEPSBEstabSR = []\n",
"\n",
"for calc in calc_each_period(mlog, tperiod):\n",
" vτ.append(calc.τ_lo)\n",
" vtau.append(calc.tau_lo)\n",
" _ = calc.erab_accessibility() # E-RAB Accessibility\n",
" vInititialEPSBEstabSR.append(_[0])\n",
" vAddedEPSBEstabSR .append(_[1])\n",
"\n",
"vτ = np.asarray([datetime.fromtimestamp(_) for _ in vτ])\n",
"vtau = np.asarray([datetime.fromtimestamp(_) for _ in vtau])\n",
"vInititialEPSBEstabSR = np.asarray(vInititialEPSBEstabSR)\n",
"vAddedEPSBEstabSR = np.asarray(vAddedEPSBEstabSR)"
]
......@@ -188,7 +188,7 @@
"from xlte.demo import kpidemo\n",
"import matplotlib.pyplot as plt\n",
"\n",
"kpidemo.figplot_erab_accessibility(plt.gcf(), vτ, vInititialEPSBEstabSR, vAddedEPSBEstabSR, tperiod)"
"kpidemo.figplot_erab_accessibility(plt.gcf(), vtau, vInititialEPSBEstabSR, vAddedEPSBEstabSR, tperiod)"
]
},
{
......@@ -264,15 +264,15 @@
"outputs": [],
"source": [
"tperiod = 3 # 3 seconds\n",
"vτ = []\n",
"vtau = []\n",
"vIPThp_qci = []\n",
"\n",
"for calc in calc_each_period(mlog, tperiod):\n",
" vτ.append(calc.τ_lo)\n",
" vtau.append(calc.tau_lo)\n",
" _ = calc.eutran_ip_throughput() # E-UTRAN IP Throughput\n",
" vIPThp_qci.append(_)\n",
"\n",
"vτ = np.asarray([datetime.fromtimestamp(_) for _ in vτ])\n",
"vtau = np.asarray([datetime.fromtimestamp(_) for _ in vtau])\n",
"vIPThp_qci = np.asarray(vIPThp_qci)"
]
},
......@@ -304,7 +304,7 @@
"source": [
"fig = plt.gcf()\n",
"fig.set_size_inches(10, 8)\n",
"kpidemo.figplot_eutran_ip_throughput(fig, vτ, vIPThp_qci, tperiod)"
"kpidemo.figplot_eutran_ip_throughput(fig, vtau, vIPThp_qci, tperiod)"
]
},
{
......
......@@ -67,22 +67,22 @@ def main():
# calc_each_period partitions mlog data into periods and yields kpi.Calc for each period.
def calc_each_period(mlog: kpi.MeasurementLog, tperiod: float): # -> yield kpi.Calc
τ = mlog.data()[0]['X.Tstart']
tau = mlog.data()[0]['X.Tstart']
for m in mlog.data()[1:]:
τ_ = m['X.Tstart']
if (τ_ - τ) >= tperiod:
calc = kpi.Calc(mlog, τ, τ+tperiod)
τ = calc.τ_hi
tau_ = m['X.Tstart']
if (tau_ - tau) >= tperiod:
calc = kpi.Calc(mlog, tau, tau+tperiod)
tau = calc.tau_hi
yield calc
tperiod = float(sys.argv[1])
vτ = []
vtau = []
vInititialEPSBEstabSR = []
vAddedEPSBEstabSR = []
vIPThp_qci = []
for calc in calc_each_period(mlog, tperiod):
vτ.append(calc.τ_lo)
vtau.append(calc.tau_lo)
_ = calc.erab_accessibility() # E-RAB Accessibility
vInititialEPSBEstabSR.append(_[0])
......@@ -91,7 +91,7 @@ def main():
_ = calc.eutran_ip_throughput() # E-UTRAN IP Throughput
vIPThp_qci.append(_)
vτ = np.asarray([datetime.fromtimestamp(_) for _ in vτ])
vtau = np.asarray([datetime.fromtimestamp(_) for _ in vtau])
vInititialEPSBEstabSR = np.asarray(vInititialEPSBEstabSR)
vAddedEPSBEstabSR = np.asarray(vAddedEPSBEstabSR)
vIPThp_qci = np.asarray(vIPThp_qci)
......@@ -125,30 +125,30 @@ def main():
fig = plt.figure(constrained_layout=True, figsize=(12,8))
facc, fthp = fig.subfigures(1, 2)
figplot_erab_accessibility (facc, vτ, vInititialEPSBEstabSR, vAddedEPSBEstabSR, tperiod)
figplot_eutran_ip_throughput(fthp, vτ, vIPThp_qci, tperiod)
figplot_erab_accessibility (facc, vtau, vInititialEPSBEstabSR, vAddedEPSBEstabSR, tperiod)
figplot_eutran_ip_throughput(fthp, vtau, vIPThp_qci, tperiod)
plt.show()
# ---- plotting routines ----
# figplot_erab_accessibility plots E-RAB Accessibility KPI data on the figure.
def figplot_erab_accessibility(fig: plt.Figure, vτ, vInititialEPSBEstabSR, vAddedEPSBEstabSR, tperiod=None):
def figplot_erab_accessibility(fig: plt.Figure, vtau, vInititialEPSBEstabSR, vAddedEPSBEstabSR, tperiod=None):
ax1, ax2 = fig.subplots(2, 1, sharex=True)
fig.suptitle("E-RAB Accessibility / %s" % (tpretty(tperiod) if tperiod is not None else
vτ_period_pretty(vτ)))
vtau_period_pretty(vtau)))
ax1.set_title("Initial E-RAB establishment success rate")
ax2.set_title("Added E-RAB establishment success rate")
plot_success_rate(ax1, vτ, vInititialEPSBEstabSR, "InititialEPSBEstabSR")
plot_success_rate(ax2, vτ, vAddedEPSBEstabSR, "AddedEPSBEstabSR")
plot_success_rate(ax1, vtau, vInititialEPSBEstabSR, "InititialEPSBEstabSR")
plot_success_rate(ax2, vtau, vAddedEPSBEstabSR, "AddedEPSBEstabSR")
# figplot_eutran_ip_throughput plots E-UTRAN IP Throughput KPI data on the figure.
def figplot_eutran_ip_throughput(fig: plt.Figure, vτ, vIPThp_qci, tperiod=None):
def figplot_eutran_ip_throughput(fig: plt.Figure, vtau, vIPThp_qci, tperiod=None):
ax1, ax2 = fig.subplots(2, 1, sharex=True)
fig.suptitle("E-UTRAN IP Throughput / %s" % (tpretty(tperiod) if tperiod is not None else
vτ_period_pretty(vτ)))
vtau_period_pretty(vtau)))
ax1.set_title("Downlink")
ax2.set_title("Uplink")
ax1.set_ylabel("Mbit/s")
......@@ -156,8 +156,8 @@ def figplot_eutran_ip_throughput(fig: plt.Figure, vτ, vIPThp_qci, tperiod=None)
v_qci = (vIPThp_qci .view(np.float64) / 1e6) \
.view(vIPThp_qci.dtype)
plot_per_qci(ax1, vτ, v_qci[:,:]['dl'], 'IPThp')
plot_per_qci(ax2, vτ, v_qci[:,:]['ul'], 'IPThp')
plot_per_qci(ax1, vtau, v_qci[:,:]['dl'], 'IPThp')
plot_per_qci(ax2, vtau, v_qci[:,:]['ul'], 'IPThp')
_, dmax = ax1.get_ylim()
_, umax = ax2.get_ylim()
......@@ -167,9 +167,9 @@ def figplot_eutran_ip_throughput(fig: plt.Figure, vτ, vIPThp_qci, tperiod=None)
# plot_success_rate plots success-rate data from vector v on ax.
# v is array with Intervals.
def plot_success_rate(ax, vτ, v, label):
ax.plot(vτ, v['lo'], drawstyle='steps-post', label=label)
ax.fill_between(vτ, v['lo'], v['hi'],
def plot_success_rate(ax, vtau, v, label):
ax.plot(vtau, v['lo'], drawstyle='steps-post', label=label)
ax.fill_between(vtau, v['lo'], v['hi'],
step='post', alpha=0.1, label='%s\nuncertainty' % label)
ax.set_ylabel("%")
......@@ -185,8 +185,8 @@ def plot_success_rate(ax, vτ, v, label):
#
# v_qci should be array[t, QCI].
# QCIs, for which v[:,qci] is all zeros, are said to be silent and are not plotted.
def plot_per_qci(ax, vτ, v_qci, label):
ax.set_xlim((vτ[0], vτ[-1])) # to have correct x range even if we have no data
def plot_per_qci(ax, vtau, v_qci, label):
ax.set_xlim((vtau[0], vtau[-1])) # to have correct x range even if we have no data
assert len(v_qci.shape) == 2
silent = True
propv = list(plt.rcParams['axes.prop_cycle'])
......@@ -196,8 +196,8 @@ def plot_per_qci(ax, vτ, v_qci, label):
continue
silent = False
prop = propv[qci % len(propv)] # to have same colors for same qci in different graphs
ax.plot(vτ, v['lo'], label="%s.%d" % (label, qci), **prop)
ax.fill_between(vτ, v['lo'], v['hi'], alpha=0.3, **prop)
ax.plot(vtau, v['lo'], label="%s.%d" % (label, qci), **prop)
ax.fill_between(vtau, v['lo'], v['hi'], alpha=0.3, **prop)
if silent:
ax.plot([],[], ' ', label="all QCI silent")
......@@ -222,17 +222,17 @@ def tpretty(t):
return "%s%s" % ("%d'" % tmin if tmin else '',
'%d"' % tsec if tsec else '')
# vτ_period_pretty returns pretty form for time period in vector vτ.
# vtau_period_pretty returns pretty form for time period in vector vtau.
# for example [2,5,8,11] gives 3'.
def vτ_period_pretty(vτ):
if len(vτ) < 2:
def vtau_period_pretty(vtau):
if len(vtau) < 2:
return "?"
s = timedelta(seconds=1)
δvτ = (vτ[1:] - vτ[:-1]) / s # in seconds
min = δvτ.min()
avg = δvτ.mean()
max = δvτ.max()
std = δvτ.std()
dvtau = (vtau[1:] - vtau[:-1]) / s # in seconds
min = dvtau.min()
avg = dvtau.mean()
max = dvtau.max()
std = dvtau.std()
if min == max:
return tpretty(min)
return "%s ±%s [%s, %s]" % (tpretty(avg), tpretty(std), tpretty(min), tpretty(max))
......
#!/bin/bash -e
for f in `git ls-files |grep -v greek2lat`; do
sed -e "
s/Σqci/Sqci/g
s/Σcause/Scause/g
s/τ/tau/g
s/Σ/S/g
s/δtau/dtau/g
s/δt/dt/g
s/δ_ue_stats/d_ue_stats/g
s/\bμ\b/mu/g
s/\bμ_\b/mu_/g
s/\bσ\b/std/g
s/\bσ2\b/s2/g
s/tδstats/tdstats/g
s/δcounters/dcounters/g
s/\bδv\b/dv/g
s/δcc/dcc/g
s/ δ / delta /g
s/ δ$/ delta/g
s/δvtau/dvtau/g
" -i $f
done
......@@ -20,7 +20,7 @@
from __future__ import print_function, division, absolute_import
from xlte.kpi import Calc, MeasurementLog, Measurement, Interval, NA, isNA, Σqci, Σcause, nqci
from xlte.kpi import Calc, MeasurementLog, Measurement, Interval, NA, isNA, Sqci, Scause, nqci
import numpy as np
from pytest import raises
......@@ -81,10 +81,10 @@ def test_Measurement():
# verify that time fields has enough precision
t2022 = 1670691601.8999548 # in 2022.Dec
t2118 = 4670691601.1234567 # in 2118.Jan
def _(τ):
m['X.Tstart'] = τ
τ_ = m['X.Tstart']
assert τ_ == τ
def _(tau):
m['X.Tstart'] = tau
tau_ = m['X.Tstart']
assert tau_ == tau
_(t2022)
_(t2118)
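A quick sanity check (not part of the diff) of the precision concern above, assuming the time fields are stored as float64 as the round-trip test suggests: float64 still resolves better than a microsecond at the 2118 timestamp.

    import numpy as np
    t2118 = 4670691601.1234567
    assert np.spacing(np.float64(t2118)) < 1e-6   # ulp ≈ 9.5e-07 s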
......@@ -166,15 +166,15 @@ def test_MeasurementLog():
assert _.shape == (0,)
# verify (τ_lo, τ_hi) widening and overlapping with Measurements on Calc initialization.
# verify (tau_lo, tau_hi) widening and overlapping with Measurements on Calc initialization.
def test_Calc_init():
mlog = MeasurementLog()
# _ asserts that Calc(mlog, τ_lo,τ_hi) has .τ_lo/.τ_hi as specified by
# τ_wlo/τ_whi, and ._data as specified by mokv.
def _(τ_lo, τ_hi, τ_wlo, τ_whi, *mokv):
c = Calc(mlog, τ_lo,τ_hi)
assert (c.τ_lo, c.τ_hi) == (τ_wlo, τ_whi)
# _ asserts that Calc(mlog, tau_lo,tau_hi) has .tau_lo/.tau_hi as specified by
# tau_wlo/tau_whi, and ._data as specified by mokv.
def _(tau_lo, tau_hi, tau_wlo, tau_whi, *mokv):
c = Calc(mlog, tau_lo,tau_hi)
assert (c.tau_lo, c.tau_hi) == (tau_wlo, tau_whi)
mv = list(c._data[i] for i in range(len(c._data)))
assert mv == list(mokv)
......@@ -223,18 +223,18 @@ def test_Calc_init():
def test_Calc_miter():
mlog = MeasurementLog()
# _ asserts that Calc(mlog, τ_lo,τ_hi)._miter yields Measurement as specified by mokv.
def _(τ_lo, τ_hi, *mokv):
c = Calc(mlog, τ_lo,τ_hi)
# _ asserts that Calc(mlog, tau_lo,tau_hi)._miter yields Measurement as specified by mokv.
def _(tau_lo, tau_hi, *mokv):
c = Calc(mlog, tau_lo,tau_hi)
mv = list(c._miter())
assert mv == list(mokv)
# na returns Measurement with specified τ_lo/τ_hi and NA for all other data.
def na(τ_lo, τ_hi):
assert τ_lo <= τ_hi
# na returns Measurement with specified tau_lo/tau_hi and NA for all other data.
def na(tau_lo, tau_hi):
assert tau_lo <= tau_hi
m = Measurement()
m['X.Tstart'] = τ_lo
m['X.δT'] = τ_hi - τ_lo
m['X.Tstart'] = tau_lo
m['X.δT'] = tau_hi - tau_lo
return m
# mlog(ø)
......@@ -275,10 +275,10 @@ def test_Calc_success_rate():
fini = "S1SIG.ConnEstabSucc"
# M returns Measurement with specified time coverage and init/fini values.
def M(τ_lo,τ_hi, vinit=None, vfini=None):
def M(tau_lo,tau_hi, vinit=None, vfini=None):
m = Measurement()
m['X.Tstart'] = τ_lo
m['X.δT'] = τ_hi - τ_lo
m['X.Tstart'] = tau_lo
m['X.δT'] = tau_hi - tau_lo
if vinit is not None:
m[init] = vinit
if vfini is not None:
......@@ -292,10 +292,10 @@ def test_Calc_success_rate():
for m in mv:
mlog.append(m)
# _ asserts that Calc(mlog, τ_lo,τ_hi)._success_rate(fini, init) returns Interval(sok_lo, sok_hi).
def _(τ_lo, τ_hi, sok_lo, sok_hi):
# _ asserts that Calc(mlog, tau_lo,tau_hi)._success_rate(fini, init) returns Interval(sok_lo, sok_hi).
def _(tau_lo, tau_hi, sok_lo, sok_hi):
sok = Interval(sok_lo, sok_hi)
c = Calc(mlog, τ_lo, τ_hi)
c = Calc(mlog, tau_lo, tau_hi)
s = c._success_rate(fini, init)
assert type(s) is Interval
eps = np.finfo(s['lo'].dtype).eps
......@@ -323,7 +323,7 @@ def test_Calc_success_rate():
# i₁=8
# f₁=4
# ────|──────|─────────────|──────────
# 10 t₁ 20 ←── t₂ ──→ τ_hi
# 10 t₁ 20 ←── t₂ ──→ tau_hi
#
# t with data: t₁
# t with no data: t₂
......@@ -355,7 +355,7 @@ def test_Calc_success_rate():
# i₁=8 i₂=50
# f₁=4 f₂=50
# ────|──────|──────|───────|──────────────────|──────────
# 10 t₁ 20 ↑ 30 t₂ 40 ↑ τ_hi
# 10 t₁ 20 ↑ 30 t₂ 40 ↑ tau_hi
# │ │
# │ │
# `────────────────── t₃
......@@ -387,18 +387,18 @@ def test_Calc_success_rate():
_( 0,99, 0.18808777429467083, 0.9860675722744688) # t₃=79
# Σqci
init = "Σqci ERAB.EstabInitAttNbr.QCI"
fini = "Σqci ERAB.EstabInitSuccNbr.QCI"
# Sqci
init = "Sqci ERAB.EstabInitAttNbr.QCI"
fini = "Sqci ERAB.EstabInitSuccNbr.QCI"
m = M(10,20)
m['ERAB.EstabInitAttNbr.sum'] = 10
m['ERAB.EstabInitSuccNbr.sum'] = 2
Mlog(m)
_(10,20, 1/5, 1/5)
# Σcause
init = "Σcause RRC.ConnEstabAtt.CAUSE"
fini = "Σcause RRC.ConnEstabSucc.CAUSE"
# Scause
init = "Scause RRC.ConnEstabAtt.CAUSE"
fini = "Scause RRC.ConnEstabSucc.CAUSE"
m = M(10,20)
m['RRC.ConnEstabSucc.sum'] = 5
m['RRC.ConnEstabAtt.sum'] = 10
......@@ -496,42 +496,42 @@ def test_Calc_eutran_ip_throughput():
assert thp[qci]['ul'] == I(0)
# verify Σqci.
def test_Σqci():
# verify Sqci.
def test_Sqci():
m = Measurement()
x = 'ERAB.EstabInitAttNbr'
def Σ():
return Σqci(m, x+'.QCI')
def S():
return Sqci(m, x+'.QCI')
assert isNA(Σ())
assert isNA(S())
m[x+'.sum'] = 123
assert Σ() == 123
assert S() == 123
m[x+'.17'] = 17
m[x+'.23'] = 23
m[x+'.255'] = 255
assert Σ() == 123 # from .sum
assert S() == 123 # from .sum
m[x+'.sum'] = NA(m[x+'.sum'].dtype)
assert isNA(Σ()) # from array, but NA values lead to sum being NA
assert isNA(S()) # from array, but NA values lead to sum being NA
v = m[x+'.QCI']
l = len(v)
for i in range(l):
v[i] = 1 + i
assert Σ() == 1*l + (l-1)*l/2
assert S() == 1*l + (l-1)*l/2
# verify Σcause.
def test_Σcause():
# verify Scause.
def test_Scause():
m = Measurement()
x = 'RRC.ConnEstabAtt'
def Σ():
return Σcause(m, x+'.CAUSE')
def S():
return Scause(m, x+'.CAUSE')
assert isNA(Σ())
assert isNA(S())
m[x+'.sum'] = 123
assert Σ() == 123
assert S() == 123
# TODO sum over individual causes (when implemented)
......