diff --git a/amari/drb.py b/amari/drb.py
index fc62dbfe712647ca22a8183628945a7a66dfbd32..8a52d7236bb2c47dc480cdb55d83f90bede269b4 100644
--- a/amari/drb.py
+++ b/amari/drb.py
@@ -21,11 +21,23 @@
 - Sampler converts information about data flows obtained via ue_get[stats]
   into Samples that represent bursts of continuous transmissions.
+
+- _x_stats_srv uses Sampler to process data flows at 100Hz rate and aggregates
+  results into information needed to compute E-UTRAN IP Throughput KPI. The
+  information is emitted in the form of a synthetic x.drb_stats message whose
+  generation is integrated into the amari.xlog package.
+
+See the following related 3GPP standards references:
+
+  - TS 32.450 6.3.1 "E-UTRAN IP Throughput"
+  - TS 32.425 4.4.6 "IP Throughput measurements"
 """

-from golang import func
-from golang import time
+from xlte import amari
+
+from golang import chan, select, default, nilchan, func, defer
+from golang import sync, time

 import math
 import sys
@@ -623,6 +635,292 @@ def __repr__(s):

 # ----------------------------------------

+
+# _x_stats_srv provides the server for x.drb_stats queries.
+#
+# To do so it polls the eNB every 10ms (100Hz) with `ue_get[stats]` and tries
+# to further improve the accuracy of retrieved DL/UL sample timings towards
+# 1ms via a heuristic on how many transport blocks were tx/rx'ed during each
+# observation.
+#
+# This heuristic can be used unless the eNB is congested. To detect congestion
+# _x_stats_srv also polls the eNB with `stats` at the same 100Hz frequency and
+# synchronized in time with `ue_get[stats]`. Congestion is detected by
+# dl_use_avg / ul_use_avg being close to 1.
+#
+# Since we can detect only the fact of likely congestion, but not the level
+# of congestion, nor other details related to QCI priorities, in the congested
+# case the heuristic is not used and throughput is reported via rough, but
+# relatively true, interval estimates.
+#
+# NOTE we cannot poll at higher than 100Hz frequency, since the eNB
+# rate-limits websocket requests to execute no faster than one per 10ms.
+@func
+def _x_stats_srv(ctx, reqch: chan, conn: amari.Conn):
+    δt_rate = 10*tti
+
+    # rx_ue_get_stats sends `ue_get[stats]` request and returns server response.
+    rtt_ue_stats = _IncStats()  # time it takes to send ue_get and to receive response
+    δt_ue_stats  = _IncStats()  # δ(ue_stats.timestamp)
+    t_ue_stats   = None         # last ue_stats.timestamp
+    def rx_ue_get_stats(ctx):  # -> ue_stats
+        nonlocal t_ue_stats
+        t_tx = time.now()
+        ue_stats = conn.req(ctx, 'ue_get', {'stats': True})
+        t_rx = time.now()
+        rtt_ue_stats.add(t_rx-t_tx)
+        t = ue_stats['utc']
+        if t_ue_stats is not None:
+            δt_ue_stats.add(t-t_ue_stats)
+        t_ue_stats = t
+        return ue_stats
+
+    # rx_stats sends `stats` request and returns server response.
+    # we need to query stats to get dl_use/ul_use.
+    # Establish a separate connection for that, since if we used the same conn
+    # for both ue_get and stats queries, due to overall 100Hz rate-limiting,
+    # ue_get would be retrieved at only 50Hz rate. With a separate connection
+    # for stats we can retrieve both ue_get and stats each at 100Hz simultaneously.
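+    # ( note: the arithmetic assumes the ~10ms/request limit is enforced per
+    #   websocket connection - with one shared conn ue_get and stats would
+    #   alternate at ~50Hz each, while a dedicated conn_stats lets both run
+    #   at the full 100Hz = 1/δt_rate )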
+    conn_stats = amari.connect(ctx, conn.wsuri)
+    defer(conn_stats.close)
+    rtt_stats = _IncStats()  # like rtt_ue_stats but for stats instead of ue_get
+    δt_stats  = _IncStats()  # δ(stats.timestamp)
+    t_stats   = None         # last stats.timestamp
+    def rx_stats(ctx):  # -> stats
+        nonlocal t_stats
+        t_tx = time.now()
+        stats = conn_stats.req(ctx, 'stats', {})
+        t_rx = time.now()
+        rtt_stats.add(t_rx-t_tx)
+        t = stats['utc']
+        if t_stats is not None:
+            δt_stats.add(t-t_stats)
+        t_stats = t
+        return stats
+    # issue first dummy stats. It won't report most of statistics due to
+    # initial_delay=0, but it will make the next stats query avoid pausing for 0.4s.
+    conn_stats.req(ctx, 'stats', {'initial_delay': 0})
+
+    # rx_all simultaneously issues `ue_get[stats]` and `stats` requests and returns server responses.
+    # the requests are issued synchronized in time.
+    δ_ue_stats = _IncStats()  # ue_stats.timestamp - stats.timestamp
+    def rx_all(ctx):  # -> ue_stats, stats
+        uq = chan(1)
+        sq = chan(1)
+
+        _, _rx = select(
+            ctx.done().recv,         # 0
+            (ueget_reqch.send, uq),  # 1
+        )
+        if _ == 0:
+            raise ctx.err()
+
+        _, _rx = select(
+            ctx.done().recv,         # 0
+            (stats_reqch.send, sq),  # 1
+        )
+        if _ == 0:
+            raise ctx.err()
+
+        ue_stats = stats = None
+        while ue_stats is None or stats is None:
+            _, _rx = select(
+                ctx.done().recv,  # 0
+                uq.recv,          # 1
+                sq.recv,          # 2
+            )
+            if _ == 0:
+                raise ctx.err()
+            if _ == 1:
+                ue_stats = _rx
+                uq = nilchan
+            if _ == 2:
+                stats = _rx
+                sq = nilchan
+
+        δ_ue_stats.add(ue_stats['utc'] - stats['utc'])
+        return ue_stats, stats
+
+    ueget_reqch = chan()
+    def Trx_ue_get(ctx):
+        while 1:
+            _, _rx = select(
+                ctx.done().recv,   # 0
+                ueget_reqch.recv,  # 1
+            )
+            if _ == 0:
+                raise ctx.err()
+            retq = _rx
+
+            ue_stats = rx_ue_get_stats(ctx)
+            retq.send(ue_stats)  # cap = 1
+
+    stats_reqch = chan()
+    def Trx_stats(ctx):
+        while 1:
+            _, _rx = select(
+                ctx.done().recv,   # 0
+                stats_reqch.recv,  # 1
+            )
+            if _ == 0:
+                raise ctx.err()
+            retq = _rx
+
+            stats = rx_stats(ctx)
+            retq.send(stats)  # cap = 1
+
+    # Tmain is the main thread that drives the process overall
+    def Tmain(ctx):
+        nonlocal rtt_ue_stats, δt_ue_stats
+        nonlocal rtt_stats, δt_stats
+        nonlocal δ_ue_stats
+
+        t_req = time.now()
+        ue_stats, stats = rx_all(ctx)
+
+        S = Sampler(ue_stats, stats)
+        qci_Σdl = {}  # qci -> _Σ for dl
+        qci_Σul = {}  # ----//---- for ul
+        class _Σ:
+            __slots__ = (
+                'tx_bytes',
+                'tx_time',
+                'tx_time_err',
+                'tx_time_notailtti',
+                'tx_time_notailtti_err',
+                'tx_nsamples',
+            )
+            def __init__(Σ):
+                for x in Σ.__slots__:
+                    setattr(Σ, x, 0)
+        # account accounts samples into Σtx_time/Σtx_bytes in qci_Σ.
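+        # ( Σtx_time_notailtti feeds the E-UTRAN IP Throughput KPI, which per
+        #   TS 32.450 6.3.1 excludes the tail TTI of each burst, i.e. roughly
+        #       IP Throughput ≈ Σtx_bytes / Σtx_time_notailtti     (per QCI)
+        #   worked example, assuming a hypothetical sample with
+        #   tx_time = 4·tti ± 1·tti:
+        #       t_lo = 3·tti ; t_hi = 5·tti
+        #       tt_hi = ceil(5-1) = 4 ; tt_lo = ceil(3-1) = 2      # in tti units
+        #       tt ± tt_err = 3 ± 1  ->  tx_time_notailtti += 3·tti ± 1·tti )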
+        def account(qci_Σ, qci_samples):
+            for qci, samplev in qci_samples.items():
+                Σ = qci_Σ.get(qci)
+                if Σ is None:
+                    Σ = qci_Σ[qci] = _Σ()
+                for s in samplev:
+                    # do not account short transmissions
+                    # ( tx with 1 tti should be ignored per standard, but it is
+                    #   also that small ICMP messages span 2 transport blocks sometimes )
+                    t_lo = s.tx_time - s.tx_time_err
+                    t_hi = s.tx_time + s.tx_time_err
+                    if t_hi <= 1*tti or (t_hi <= 2*tti and s.tx_bytes < 1000):
+                        continue
+                    Σ.tx_nsamples += 1
+                    Σ.tx_bytes    += s.tx_bytes
+                    Σ.tx_time     += s.tx_time
+                    Σ.tx_time_err += s.tx_time_err
+
+                    # also aggregate .tx_time without tail tti (IP Throughput KPI needs this)
+                    tt_hi = math.ceil(t_hi/tti - 1)  # in tti
+                    tt_lo = t_lo / tti               # in tti
+                    if tt_lo > 1:
+                        tt_lo = math.ceil(tt_lo - 1)
+                    tt     = (tt_lo + tt_hi) / 2
+                    tt_err = (tt_hi - tt_lo) / 2
+                    Σ.tx_time_notailtti     += tt * tti
+                    Σ.tx_time_notailtti_err += tt_err * tti
+
+
+        while 1:
+            # TODO explicitly detect underrun?
+            _, _rx = select(
+                ctx.done().recv,  # 0
+                reqch.recv,       # 1
+                default,          # 2
+            )
+            if _ == 0:
+                raise ctx.err()
+            if _ == 1:
+                # client requests to retrieve message for accumulated data
+                opts, respch = _rx
+                # TODO verify/handle opts?
+
+                # wrap-up flows and account finalized samples
+                qci_dl, qci_ul = S.finish()
+                account(qci_Σdl, qci_dl)
+                account(qci_Σul, qci_ul)
+
+                _debug()
+                _debug('rtt_ue:      %s ms' % rtt_ue_stats .str('%.2f', time.millisecond))
+                _debug('δt_ue:       %s ms' % δt_ue_stats  .str('%.2f', time.millisecond))
+                _debug('rtt_stats:   %s ms' % rtt_stats    .str('%.2f', time.millisecond))
+                _debug('δt_stats:    %s ms' % δt_stats     .str('%.2f', time.millisecond))
+                _debug('δ(ue,stat):  %s ms' % δ_ue_stats   .str('%.2f', time.millisecond))
+
+                qci_dict = {}
+                Σ0 = _Σ()
+                for qci in set(qci_Σdl.keys()) .union(qci_Σul.keys()):
+                    Σdl = qci_Σdl.get(qci, Σ0)
+                    Σul = qci_Σul.get(qci, Σ0)
+                    qci_dict[qci] = {
+                        'dl_tx_bytes':              Σdl.tx_bytes,
+                        'dl_tx_time':               Σdl.tx_time,
+                        'dl_tx_time_err':           Σdl.tx_time_err,
+                        'dl_tx_time_notailtti':     Σdl.tx_time_notailtti,
+                        'dl_tx_time_notailtti_err': Σdl.tx_time_notailtti_err,
+                        'dl_tx_nsamples':           Σdl.tx_nsamples,
+                        'ul_tx_bytes':              Σul.tx_bytes,
+                        'ul_tx_time':               Σul.tx_time,
+                        'ul_tx_time_err':           Σul.tx_time_err,
+                        'ul_tx_time_notailtti':     Σul.tx_time_notailtti,
+                        'ul_tx_time_notailtti_err': Σul.tx_time_notailtti_err,
+                        'ul_tx_nsamples':           Σul.tx_nsamples,
+                    }
+
+                r = {'time':     ue_stats['time'],
+                     'utc':      ue_stats['utc'],
+                     'qci_dict': qci_dict,
+                     'δt_ueget': {
+                         'min': δt_ue_stats.min,
+                         'avg': δt_ue_stats.avg(),
+                         'max': δt_ue_stats.max,
+                         'std': δt_ue_stats.std(),
+                     },
+                     'δ_ueget_vs_stats': {
+                         'min': δ_ue_stats.min,
+                         'avg': δ_ue_stats.avg(),
+                         'max': δ_ue_stats.max,
+                         'std': δ_ue_stats.std(),
+                     },
+                    }
+
+                respch.send(r)
+
+                # reset
+                qci_Σdl = {}
+                qci_Σul = {}
+
+                rtt_ue_stats = _IncStats()
+                δt_ue_stats  = _IncStats()
+                rtt_stats    = _IncStats()
+                δt_stats     = _IncStats()
+                δ_ue_stats   = _IncStats()
+
+            # sync time to keep t_req' - t_req ≈ δt_rate
+            # this should automatically translate to δt(ue_stats) ≈ δt_rate
+            t = time.now()
+            δtsleep = δt_rate - (t - t_req)
+            if δtsleep > 0:
+                time.sleep(δtsleep)
+
+            # retrieve ue_get[stats] and stats data for next frame from enb
+            t_req = time.now()
+            ue_stats, stats = rx_all(ctx)
+
+            # pass data to sampler and account already detected samples
+            qci_dl, qci_ul = S.add(ue_stats, stats)
+            account(qci_Σdl, qci_dl)
+            account(qci_Σul, qci_ul)
+
+    # run everything
+    wg = sync.WorkGroup(ctx)
+    wg.go(Trx_ue_get)
+    wg.go(Trx_stats)
+    wg.go(Tmain)
+    wg.wait()
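+
+# ( overall thread topology of _x_stats_srv: Tmain paces the polling loop at
+#   δt_rate and serves client reqch queries in between, while the actual
+#   websocket requests are done by the two Trx threads:
+#
+#       Tmain -rx_all-> ueget_reqch -> Trx_ue_get -> eNB (via conn)
+#                  `--> stats_reqch -> Trx_stats  -> eNB (via conn_stats)  )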
+
+
 # _IncStats incrementally computes statistics on provided values.
 #
 # Provide values via .add().
diff --git a/amari/xlog.py b/amari/xlog.py
index 90df5328cd4d75e66faa00b7c1aa37daeed53db7..ffa8b001e87c8a119b8de9e7372b7a6fff766c5d 100644
--- a/amari/xlog.py
+++ b/amari/xlog.py
@@ -24,7 +24,7 @@

 - use Reader to read logged information from xlog.

-(*) for example result of stats, ue_get and erab_get queries.
+(*) for example result of stats, ue_get, erab_get and synthetic queries.
 """

 # XLog protocol
@@ -59,6 +59,7 @@

 from xlte import amari
+from xlte.amari import drb

 import json
 import traceback
@@ -196,14 +197,32 @@ class _XLogger:
         wg = sync.WorkGroup(ctx)
         defer(wg.wait)

+        # spawn servers to handle queries with synthetic messages
+        xmsgsrv_dict = {}
+        for l in xl.logspecv:
+            if l.query in _xmsg_registry:
+                xsrv = _XMsgServer(l.query, _xmsg_registry[l.query])
+                xmsgsrv_dict[l.query] = xsrv
+                xsrv_ready = chan()  # wait for xmsg._runCtx to be initialized
+                wg.go(xsrv.run, conn, xsrv_ready)
+                xsrv_ready.recv()
+
         # spawn main logger
-        wg.go(xl._xlog1, conn)
+        wg.go(xl._xlog1, conn, xmsgsrv_dict)

-    def _xlog1(xl, ctx, conn):
+    def _xlog1(xl, ctx, conn, xmsgsrv_dict):
+        # req_ queries either the amari service directly, or an extra message service.
+        def req_(ctx, query, opts):  # -> resp_raw
+            if query in xmsgsrv_dict:
+                query_xsrv = xmsgsrv_dict[query]
+                _, resp_raw = query_xsrv.req_(ctx, opts)
+            else:
+                _, resp_raw = conn.req_(ctx, query, opts)
+            return resp_raw
+
         # emit config_get after attach
-        _, cfg_raw = conn.req_('config_get', {})
+        cfg_raw = req_(ctx, 'config_get', {})
         xl.emit(cfg_raw)

         # loop emitting requested logspecs
@@ -247,10 +266,83 @@ class _XLogger:
                 if _ == 0:
                     raise ctx.err()

-            _, resp_raw = conn.req_(logspec.query, opts)
+            resp_raw = req_(ctx, logspec.query, opts)
             xl.emit(resp_raw)


+# _XMsgServer represents a server for handling particular synthetic requests.
+#
+# for example the server for the synthetic x.drb_stats query.
+class _XMsgServer:
+    def __init__(xsrv, name, f):
+        xsrv.name    = name    # str message name, e.g. "x.drb_stats"
+        xsrv._func   = f       # func(ctx, reqch, conn) to run the service
+        xsrv._reqch  = chan()  # chan<respch> to send requests to the service
+        xsrv._runCtx = None    # context not done while .run is running
+
+    # run runs the extra server on the amari service attached to via conn.
+    @func
+    def run(xsrv, ctx, conn: amari.Conn, ready: chan):
+        xsrv._runCtx, cancel = context.with_cancel(ctx)
+        defer(cancel)
+        ready.close()
+        # establish dedicated conn2 so that the server does not semantically
+        # affect requests issued by the main logger. For example if we did not,
+        # and the main logger queried stats while the x.drb_stats server also
+        # queried stats internally, then data received by the main logger would
+        # cover only a small random period of time instead of the full wanted period.
+        conn2 = amari.connect(ctx, conn.wsuri)
+        defer(conn2.close)
+        xsrv._func(ctx, xsrv._reqch, conn2)
+
+    # req_ queries the server and returns its response.
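+    # ( the request is serviced by the .run thread; ctx is merged with
+    #   ._runCtx below so that req_ raises "server is down" instead of
+    #   deadlocking if the server has already terminated )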
+    @func
+    def req_(xsrv, ctx, opts):  # -> resp, resp_raw
+        origCtx = ctx
+        ctx, cancel = context.merge(ctx, xsrv._runCtx)  # need only merge_cancel
+        defer(cancel)
+
+        respch = chan(1)
+        _, _rx = select(
+            ctx.done().recv,                     # 0
+            (xsrv._reqch.send, (opts, respch)),  # 1
+        )
+        if _ == 0:
+            if xsrv._runCtx.err() and not origCtx.err():
+                raise RuntimeError("%s server is down" % xsrv.name)
+            raise ctx.err()
+
+        _, _rx = select(
+            ctx.done().recv,  # 0
+            respch.recv,      # 1
+        )
+        if _ == 0:
+            if xsrv._runCtx.err() and not origCtx.err():
+                raise RuntimeError("%s server is down" % xsrv.name)
+            raise ctx.err()
+        resp = _rx
+
+        r = {'message': xsrv.name}  # place 'message' first
+        r.update(resp)
+        resp = r
+
+        resp_raw = json.dumps(resp,
+                              separators=(',', ':'),  # most compact, like Amari does
+                              ensure_ascii=False)     # so that e.g. δt comes as is
+        return resp, resp_raw
+
+
+# _xmsg registers func f to provide a server for extra messages with the specified name.
+_xmsg_registry = {}  # name -> xsrv_func(ctx, reqch, conn)
+def _xmsg(name, f, doc1):
+    assert name not in _xmsg_registry
+    f.xlog_doc1 = doc1
+    _xmsg_registry[name] = f
+
+_xmsg("x.drb_stats", drb._x_stats_srv, "retrieve statistics about data radio bearers")
+
+

 # ----------------------------------------

 # Reader wraps IO reader to read information generated by xlog.
@@ -435,11 +527,18 @@
 Example for <logspec>+:

     stats[samples,rf]/30s ue_get[stats] erab_get/10s qos_flow_get

+Besides queries supported by the Amarisoft LTE stack natively, support for the
+following synthetic queries is also provided:
+
+%s

 Options:

     -h  --help  show this help
-""" % LogSpec.DEFAULT_PERIOD, file=out)
+""" % (LogSpec.DEFAULT_PERIOD,
+       '\n'.join("    %-14s %s" % (q, f.xlog_doc1)
+                 for q, f in sorted(_xmsg_registry.items()))),
+file=out)


 def main(ctx, argv):