Commit 0c772eb4 authored by Kirill Smelkov's avatar Kirill Smelkov

amari.xlog: attach,sync += information about on-service time

We currently emit information about local time in events, and
information about on-service time in messages. Events don't have
information about on-service time and messages don't carry information
about local time. That is mostly ok, since primary xlog setup is to run
on the same machine, where eNB runs because on-service .utc correlates
with .time in events.

However for eNB < 2022-12-01 on-service time includes only .time field
without .utc field with .time representing "time passed since when eNB
was started". This way for enb.xlog streams generated on older systems
it is not possible for xlog.Reader to know the absolute timestamps of
read messages.

To fix this we amend "attach" and "sync" events to carry both local and
on-service times. This way xlog.Reader, after seeing e.g. "sync" with
.time and only .srv_time without .srv_utc, should be able to
correlate local and on-service clocks and to approximate srv_utc as

	srv_utc' = srv_time' + (time - srv_time)

where time and srv_time correspond to last synchronization, and
srv_time' is what xlog.Reader retrieves for a further-read message in
question.

See kirr/xlte!3 for related discussion.
parent dbecc158
...@@ -63,7 +63,8 @@ def connect(ctx, wsuri): # -> Conn ...@@ -63,7 +63,8 @@ def connect(ctx, wsuri): # -> Conn
class Conn: class Conn:
# .wsuri websocket uri of the service # .wsuri websocket uri of the service
# ._ws websocket connection to service # ._ws websocket connection to service
# ._srv_ready_msg message we got for "ready" # .srv_ready_msg message we got for "ready"
# .t_srv_ready_msg timestamp of "ready" reception
# ._mu sync.Mutex # ._mu sync.Mutex
# ._rxtab {} msgid -> (request, rx channel) | None # ._rxtab {} msgid -> (request, rx channel) | None
...@@ -76,6 +77,7 @@ class Conn: ...@@ -76,6 +77,7 @@ class Conn:
def __init__(conn, ws, wsuri): def __init__(conn, ws, wsuri):
try: try:
msg0_raw = ws.recv() msg0_raw = ws.recv()
t_msg0 = time.now()
msg0 = json.loads(msg0_raw) msg0 = json.loads(msg0_raw)
# TODO also support 'authenticate' # TODO also support 'authenticate'
if msg0['message'] != 'ready': if msg0['message'] != 'ready':
...@@ -86,7 +88,8 @@ class Conn: ...@@ -86,7 +88,8 @@ class Conn:
conn.wsuri = wsuri conn.wsuri = wsuri
conn._ws = ws conn._ws = ws
conn._srv_ready_msg = msg0 conn.srv_ready_msg = msg0
conn.t_srv_ready_msg = t_msg0
conn._mu = sync.Mutex() conn._mu = sync.Mutex()
conn._rxtab = {} conn._rxtab = {}
...@@ -236,12 +239,12 @@ class Conn: ...@@ -236,12 +239,12 @@ class Conn:
@property @property
def srv_type(conn): def srv_type(conn):
return conn._srv_ready_msg['type'] return conn.srv_ready_msg['type']
@property @property
def srv_name(conn): def srv_name(conn):
return conn._srv_ready_msg['name'] return conn.srv_ready_msg['name']
@property @property
def srv_version(conn): def srv_version(conn):
return conn._srv_ready_msg['version'] return conn.srv_ready_msg['version']
...@@ -247,10 +247,19 @@ class _XLogger: ...@@ -247,10 +247,19 @@ class _XLogger:
defer(conn.close) defer(conn.close)
# emit "service attach"/"service detach" # emit "service attach"/"service detach"
#
# on attach, besides name/type/version, also emit everything present
# in the first ready message from the service. This should include
# "time" and optionally "utc" for releases ≥ 2022-12-01.
srv_info = {"srv_name": conn.srv_name, srv_info = {"srv_name": conn.srv_name,
"srv_type": conn.srv_type, "srv_type": conn.srv_type,
"srv_version": conn.srv_version} "srv_version": conn.srv_version}
xl.jemit("service attach", srv_info) srv_iattach = srv_info.copy()
for k, v in conn.srv_ready_msg.items():
if k in {"message", "type", "name", "version"}:
continue
srv_iattach["srv_"+k] = v
xl.jemit("service attach", srv_iattach)
def _(): def _():
try: try:
raise raise
...@@ -281,18 +290,22 @@ class _XLogger: ...@@ -281,18 +290,22 @@ class _XLogger:
def _xlog1(xl, ctx, conn, xmsgsrv_dict, srv_info): def _xlog1(xl, ctx, conn, xmsgsrv_dict, srv_info):
# req_ queries either amari service directly, or an extra message service. # req_ queries either amari service directly, or an extra message service.
def req_(ctx, query, opts): # -> resp_raw def req_(ctx, query, opts): # -> (t_rx, resp, resp_raw)
if query in xmsgsrv_dict: if query in xmsgsrv_dict:
query_xsrv = xmsgsrv_dict[query] query_xsrv = xmsgsrv_dict[query]
_, resp_raw = query_xsrv.req_(ctx, opts) resp, resp_raw = query_xsrv.req_(ctx, opts)
else: else:
_, resp_raw = conn.req_(ctx, query, opts) resp, resp_raw = conn.req_(ctx, query, opts)
return resp_raw return (time.now(), resp, resp_raw)
# loop emitting requested logspecs # loop emitting requested logspecs
t0 = time.now() t0 = time.now()
tnextv = [0]*len(xl.logspecv) # [i] - next time to arm for logspecv[i] relative to t0 tnextv = [0]*len(xl.logspecv) # [i] - next time to arm for logspecv[i] relative to t0
t_rx = conn.t_srv_ready_msg # time of last received message
srv_time = conn.srv_ready_msg["time"] # .time in ----//----
srv_utc = conn.srv_ready_msg.get("utc") # .utc in ----//---- (present ≥ 2022-12-01)
while 1: while 1:
# go through all logspecs in the order they were given # go through all logspecs in the order they were given
# pick logspec with soonest arm time # pick logspec with soonest arm time
...@@ -331,9 +344,19 @@ class _XLogger: ...@@ -331,9 +344,19 @@ class _XLogger:
raise ctx.err() raise ctx.err()
if logspec.query == 'meta.sync': if logspec.query == 'meta.sync':
xl.jemit_sync("attached", "periodic", srv_info) # emit sync with srv_time and srv_utc approximated from last
# rx'ed message and local clock run since that reception
tnow = time.now()
isync = srv_info.copy()
isync["srv_time"] = srv_time + (tnow - t_rx)
if srv_utc is not None:
isync["srv_utc"] = srv_utc + (tnow - t_rx)
xl.jemit_sync("attached", "periodic", isync)
else: else:
resp_raw = req_(ctx, logspec.query, opts) t_rx, resp, resp_raw = req_(ctx, logspec.query, opts)
srv_time = resp["time"]
srv_utc = resp.get("utc")
xl.emit(resp_raw) xl.emit(resp_raw)
...@@ -459,6 +482,7 @@ class SyncEvent(Event): ...@@ -459,6 +482,7 @@ class SyncEvent(Event):
# .state # .state
# .reason # .reason
# .generator # .generator
# .srv_time | None if not present
pass pass
...@@ -504,6 +528,18 @@ def read(xr): # -> Event|Message|None ...@@ -504,6 +528,18 @@ def read(xr): # -> Event|Message|None
# message # message
assert isinstance(x, Message) assert isinstance(x, Message)
# provide timestamps for xlog messages generated with eNB < 2022-12-01
# there messages come without .utc field and have only .time
# we estimate the timestamp from .time and from δ(utc,time) taken from covering sync
if x.timestamp is None:
if xr._sync is not None and xr._sync.srv_time is not None:
# srv_utc' = srv_time' + (time - srv_time)
srv_time_ = x.get1("time", (float,int)) # ParseError if not present
x.timestamp = srv_time_ + (xr._sync.timestamp - xr._sync.srv_time)
if x.timestamp is None:
raise ParseError("%s:%d/%s no `utc` and cannot compute "
"timestamp with sync" % (x.pos[0], x.pos[1], '/'.join(x.path)))
# TODO verify messages we get/got against their schedule in covering sync. # TODO verify messages we get/got against their schedule in covering sync.
# Raise LOSError (loss of synchronization) if what we actually see # Raise LOSError (loss of synchronization) if what we actually see
# does not match what sync says it should be. # does not match what sync says it should be.
...@@ -569,6 +605,9 @@ def _read1(xr): ...@@ -569,6 +605,9 @@ def _read1(xr):
else: else:
x.state = meta.get1("state", str) x.state = meta.get1("state", str)
x.reason = meta.get1("reason", str) x.reason = meta.get1("reason", str)
x.srv_time = None
if "srv_time" in meta:
x.srv_time = meta.get1("srv_time", (float,int))
# TODO parse generator -> .logspecv # TODO parse generator -> .logspecv
return x return x
...@@ -578,7 +617,12 @@ def _read1(xr): ...@@ -578,7 +617,12 @@ def _read1(xr):
# NOTE .time is internal eNB time using clock originating at eNB startup. # NOTE .time is internal eNB time using clock originating at eNB startup.
# .utc is seconds since epoch counted using OS clock. # .utc is seconds since epoch counted using OS clock.
# .utc field was added in 2022-12-01 - see https://support.amarisoft.com/issues/21934 # .utc field was added in 2022-12-01 - see https://support.amarisoft.com/issues/21934
x.timestamp = x.get1("utc", (float,int)) # if there is no .utc - we defer computing .timestamp to ^^^ read
# where it might estimate it from .time and sync
if "utc" in x:
x.timestamp = x.get1("utc", (float,int))
else:
x.timestamp = None
return x return x
raise xr._err("invalid xlog entry") raise xr._err("invalid xlog entry")
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
from xlte.amari import xlog from xlte.amari import xlog
from golang import func, defer, b from golang import func, defer, b
import io import io
import json
from pytest import raises from pytest import raises
...@@ -172,6 +173,130 @@ def test_Reader_readahead_vs_eof(): ...@@ -172,6 +173,130 @@ def test_Reader_readahead_vs_eof():
assert _ is None assert _ is None
# verify that for xlog stream produced by enb < 2022-12-01 the Reader can build
# Messages timestamps by itself based on sync.
@func
def test_Reader_timestamp_from_sync_wo_utc():
def jevent(time, event, args_dict={}):
d = {
"event": event,
"time" : time,
}
d.update(args_dict)
return json.dumps({"meta": d})
def jsync(time, srv_time):
d = {
"state": "attached" if srv_time is not None else "detached",
"reason": "periodic",
"generator": "...",
}
if srv_time is not None:
d['srv_time'] = srv_time
return jevent(time, "sync", d)
assert jsync(1.1, 2.2) == '{"meta": {"event": "sync", "time": 1.1, "state": "attached", "reason": "periodic", "generator": "...", "srv_time": 2.2}}'
assert jsync(1.1, None) == '{"meta": {"event": "sync", "time": 1.1, "state": "detached", "reason": "periodic", "generator": "..."}}'
def jmsg(srv_time, msg):
return json.dumps({"message": msg, "time": srv_time})
assert jmsg(123.4, "aaa") == '{"message": "aaa", "time": 123.4}'
data = b""
def _(line):
nonlocal data
assert '\n' not in line
data += b(line+'\n')
A = "service attach"
D = "service detach"
S = "sync"
_( jmsg(1, "aaa") ) # no timestamp: separated from ↓ jsync(1005) by event
_( jevent(1002, A ))
_( jmsg(3, "bbb") ) # have timestamp from ↓ jsync(1005)
_( jmsg(4, "ccc") ) # ----//----
_( jsync(1005, 5) ) # jsync with srv_time
_( jmsg(6, "ddd") ) # have timestamp from ↑ jsync(1005)
_( jmsg(7, "eee") ) # ----//----
_( jevent(1008, D ))
_( jmsg(9, "fff") ) # no timestamp: separated from ↑ jsync(1005) by event,
# and ↓ jsync(1010) has no srv_time
_( jsync(1010, None) ) # jsync without srv_time
_( jmsg(11, "ggg") ) # no timestamp
# expect_notime asserts that "no timestamp" error is raised on next read.
def expect_notime(xr, lineno):
with raises(xlog.ParseError,
match=":%d/ no `utc` and cannot compute timestamp with sync" % lineno):
_ = xr.read()
# expect_msg asserts that specified message with specified timestamp reads next.
def expect_msg(xr, timestamp, msg):
_ = xr.read()
assert type(_) is xlog.Message
assert _.message == msg
assert _.timestamp == timestamp
# expect_event asserts that specified event reads next.
def expect_event(xr, timestamp, event):
_ = xr.read()
assert type(_) is (xlog.SyncEvent if event == "sync" else xlog.Event)
assert _.event == event
assert _.timestamp == timestamp
xr = xlog.Reader(io.BytesIO(data))
br = xlog.Reader(io.BytesIO(data), reverse=True)
defer(xr.close)
defer(br.close)
expect_notime(xr, 1 ) # aaa
expect_event (xr, 1002, A )
expect_msg (xr, 1003, "bbb")
expect_msg (xr, 1004, "ccc")
expect_event (xr, 1005, S )
expect_msg (xr, 1006, "ddd")
expect_msg (xr, 1007, "eee")
expect_event (xr, 1008, D )
expect_notime(xr, 9 ) # fff
expect_event (xr, 1010, S )
expect_notime(xr, 11 ) # ggg
expect_notime(br, -1 ) # ggg
expect_event (br, 1010, S )
expect_notime(br, -3 ) # fff
expect_event (br, 1008, D )
expect_msg (br, 1007, "eee")
expect_msg (br, 1006, "ddd")
expect_event (br, 1005, S )
expect_msg (br, 1004, "ccc")
expect_msg (br, 1003, "bbb")
expect_event (br, 1002, A )
expect_notime(br, -11 ) # aaa
# extra check that we can get timestamp of first message if proper sync goes after
_( jsync(1012, 12) )
_( jmsg(13, "hhh") )
_( jmsg(14, "iii") )
bb = xlog.Reader(io.BytesIO(data), reverse=True)
defer(bb.close)
expect_msg (bb, 1014, "iii")
expect_msg (bb, 1013, "hhh")
expect_event (bb, 1012, S )
expect_msg (bb, 1011, "ggg") # now has timestamp because it is covered by ↑ sync(1012)
expect_event (bb, 1010, S ) # after sync(1010) it goes as for br
expect_notime(bb, -3-3 ) # fff
expect_event (bb, 1008, D )
expect_msg (bb, 1007, "eee")
expect_msg (bb, 1006, "ddd")
expect_event (bb, 1005, S )
expect_msg (bb, 1004, "ccc")
expect_msg (bb, 1003, "bbb")
expect_event (bb, 1002, A )
expect_notime(bb, -3-11 ) # aaa
def test_LogSpec(): def test_LogSpec():
logspec = "stats[samples,rf]/60s" logspec = "stats[samples,rf]/60s"
spec = xlog.LogSpec.parse(logspec) spec = xlog.LogSpec.parse(logspec)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment