Commit 67466ae5 authored by Kirill Smelkov's avatar Kirill Smelkov

amari.xlog: Sync, reverse reading, timestamps for eNB < 2022-12-01

Rework XLog protocol to come with periodic sync events that come from time to
time so that xlog stream becomes self-synchronizing. Sync events should be
useful for Wendelin to start reading xlog stream from any point, and to verify
that the stream is ok by matching its content vs messages schedule coming in
the syncs.

Teach xlog.Reader to read streams in reverse order from end to start. This
should be useful to look at tail of a log without reading it in full from the
start.

Teach xlog.Reader to reconstruct messages timestamps for xlog streams produced
with Amarisoft releases < 2022-12-01. There messages do not have .utc field
added in https://support.amarisoft.com/issues/21934 and come with only .time
field that represent internal eNB time using clock originating at eNB startup.
We combine message.time and δ(utc, enb.time) from sync to build message.timestamp .

See individual patches for details and
kirr/xlte!3 for preliminary discussion.

/reviewed-by @xavier_thompson
/reviewed-on kirr/xlte!4
parents 2a016d48 0c772eb4
...@@ -63,7 +63,8 @@ def connect(ctx, wsuri): # -> Conn ...@@ -63,7 +63,8 @@ def connect(ctx, wsuri): # -> Conn
class Conn: class Conn:
# .wsuri websocket uri of the service # .wsuri websocket uri of the service
# ._ws websocket connection to service # ._ws websocket connection to service
# ._srv_ready_msg message we got for "ready" # .srv_ready_msg message we got for "ready"
# .t_srv_ready_msg timestamp of "ready" reception
# ._mu sync.Mutex # ._mu sync.Mutex
# ._rxtab {} msgid -> (request, rx channel) | None # ._rxtab {} msgid -> (request, rx channel) | None
...@@ -76,6 +77,7 @@ class Conn: ...@@ -76,6 +77,7 @@ class Conn:
def __init__(conn, ws, wsuri): def __init__(conn, ws, wsuri):
try: try:
msg0_raw = ws.recv() msg0_raw = ws.recv()
t_msg0 = time.now()
msg0 = json.loads(msg0_raw) msg0 = json.loads(msg0_raw)
# TODO also support 'authenticate' # TODO also support 'authenticate'
if msg0['message'] != 'ready': if msg0['message'] != 'ready':
...@@ -86,7 +88,8 @@ class Conn: ...@@ -86,7 +88,8 @@ class Conn:
conn.wsuri = wsuri conn.wsuri = wsuri
conn._ws = ws conn._ws = ws
conn._srv_ready_msg = msg0 conn.srv_ready_msg = msg0
conn.t_srv_ready_msg = t_msg0
conn._mu = sync.Mutex() conn._mu = sync.Mutex()
conn._rxtab = {} conn._rxtab = {}
...@@ -236,12 +239,12 @@ class Conn: ...@@ -236,12 +239,12 @@ class Conn:
@property @property
def srv_type(conn): def srv_type(conn):
return conn._srv_ready_msg['type'] return conn.srv_ready_msg['type']
@property @property
def srv_name(conn): def srv_name(conn):
return conn._srv_ready_msg['name'] return conn.srv_ready_msg['name']
@property @property
def srv_version(conn): def srv_version(conn):
return conn._srv_ready_msg['version'] return conn.srv_ready_msg['version']
...@@ -47,7 +47,7 @@ class LogMeasure: ...@@ -47,7 +47,7 @@ class LogMeasure:
# ._rlog IO reader for enb.log # ._rlog IO reader for enb.log
# #
# ._estats \/ last xlog.Message with read stats result # ._estats \/ last xlog.Message with read stats result
# \/ last xlog.Event | LogError # \/ last xlog.Event\sync | LogError
# \/ None # \/ None
# ._m kpi.Measurement being prepared covering [_estats_prev, _estats) | None # ._m kpi.Measurement being prepared covering [_estats_prev, _estats) | None
# ._m_next kpi.Measurement being prepared covering [_estats, _estats_next) | None # ._m_next kpi.Measurement being prepared covering [_estats, _estats_next) | None
...@@ -153,6 +153,10 @@ def _read(logm): ...@@ -153,6 +153,10 @@ def _read(logm):
if x is None: if x is None:
x = LogError.EOF # represent EOF as LogError x = LogError.EOF # represent EOF as LogError
# ignore sync events
if isinstance(x, xlog.SyncEvent):
continue
# handle messages that update current Measurement # handle messages that update current Measurement
if isinstance(x, xlog.Message): if isinstance(x, xlog.Message):
if x.message == "x.drb_stats": if x.message == "x.drb_stats":
...@@ -162,7 +166,7 @@ def _read(logm): ...@@ -162,7 +166,7 @@ def _read(logm):
continue # ignore other messages continue # ignore other messages
# it is an error, event or stats. # it is an error, event\sync or stats.
# if it is an event or stats -> finalize timestamp for _m_next. # if it is an event or stats -> finalize timestamp for _m_next.
# start building next _m_next covering [x, x_next). # start building next _m_next covering [x, x_next).
# shift m <- ._m <- ._m_next <- (new Measurement | None for LogError) # shift m <- ._m <- ._m_next <- (new Measurement | None for LogError)
......
...@@ -536,6 +536,35 @@ def test_LogMeasure_cc_wraparound(): ...@@ -536,6 +536,35 @@ def test_LogMeasure_cc_wraparound():
readok(4, 10) # 4-5 readok(4, 10) # 4-5
# verify that LogMeasure ignores syncs in xlog stream.
@func
def test_LogMeasure_sync():
t = tLogMeasure()
defer(t.close)
_ = t.expect1
cc = 'rrc_connection_request'
CC = 'RRC.ConnEstabAtt.sum'
t.xlog( jstats(1, {}) )
t.xlog( jstats(2, {cc: 4}) )
t.xlog( '{"meta": {"event": "sync", "time": 2.5, "state": "attached", "reason": "periodic", "generator": "xlog ws://localhost:9001 stats[]/30.0s"}}' )
t.xlog( jstats(3, {cc: 7}) )
def readok(τ, CC_value):
_('X.Tstart', τ)
_('X.δT', int(τ+1)-τ)
if CC_value is not None:
_(CC, CC_value)
else:
t.expect_nodata()
t.read()
readok(0.02, None) # attach-1
readok(1, 4) # 1-2
readok(2, 3) # 2-3 jumping over sync
# jstats returns json-encoded stats message corresponding to counters dict. # jstats returns json-encoded stats message corresponding to counters dict.
# τ goes directly to stats['utc'] as is. # τ goes directly to stats['utc'] as is.
def jstats(τ, counters): # -> str def jstats(τ, counters): # -> str
......
...@@ -40,12 +40,17 @@ ...@@ -40,12 +40,17 @@
# Queries are specific to monitored LTE service. # Queries are specific to monitored LTE service.
# Events are specific to xlog itself and can be as follows: # Events are specific to xlog itself and can be as follows:
# #
# - "start" when xlog starts
# - "service attach" when xlog successfully connects to monitored LTE service # - "service attach" when xlog successfully connects to monitored LTE service
# - "service detach" when xlog disconnects from monitored LTE service # - "service detach" when xlog disconnects from monitored LTE service
# - "service connect failure" when xlog tries to connect to monitored LTE service # - "service connect failure" when xlog tries to connect to monitored LTE service
# with unsuccessful result. # with unsuccessful result.
# - "sync" emitted periodically and when xlogs starts,
# stops (TODO and rotate logs). Comes with current state of
# connection to LTE service and xlog setup
# - "xlog failure" on internal xlog error # - "xlog failure" on internal xlog error
#
# it is guaranteed that valid xlog stream has a sync event at least every LOS_window entries.
LOS_window = 1000
# TODO log file + rotate # TODO log file + rotate
...@@ -63,6 +68,7 @@ from xlte.amari import drb ...@@ -63,6 +68,7 @@ from xlte.amari import drb
import json import json
import traceback import traceback
import io
from golang import func, defer, chan, select from golang import func, defer, chan, select
from golang import context, sync, time from golang import context, sync, time
from golang.gcompat import qq from golang.gcompat import qq
...@@ -79,6 +85,11 @@ class LogSpec: ...@@ -79,6 +85,11 @@ class LogSpec:
DEFAULT_PERIOD = 60 DEFAULT_PERIOD = 60
def __init__(spec, query, optv, period):
spec.query = query
spec.optv = optv
spec.period = period
def __str__(spec): def __str__(spec):
return "%s[%s]/%ss" % (spec.query, ','.join(spec.optv), spec.period) return "%s[%s]/%ss" % (spec.query, ','.join(spec.optv), spec.period)
...@@ -116,20 +127,52 @@ class LogSpec: ...@@ -116,20 +127,52 @@ class LogSpec:
if c in query: if c in query:
bad("invalid query") bad("invalid query")
spec = LogSpec() return LogSpec(query, optv, period)
spec.query = query
spec.optv = optv
spec.period = period
return spec
# xlog queries service @wsuri periodically according to queries specified by # xlog queries service @wsuri periodically according to queries specified by
# logspecv and logs the result. # logspecv and logs the result.
@func
def xlog(ctx, wsuri, logspecv): def xlog(ctx, wsuri, logspecv):
xl = _XLogger(wsuri, logspecv) # make sure we always have meta.sync - either the caller specifies it
# explicitly, or we add it automatically to come first with default
slogspecv = ' '.join(['%s' % _ for _ in logspecv]) # 10x·longest periodicity. Do the same about config_get - by default we
xl.jemit("start", {"generator": "xlog %s %s" % (wsuri, slogspecv)}) # want it to be present after every sync.
lsync = None
isync = None
lconfig_get = None
pmax = 1
for (i,l) in enumerate(logspecv):
pmax = max(pmax, l.period)
if l.query == "meta.sync":
isync = i
lsync = l
if l.query == "config_get":
lconfig_get = l
logspecv = logspecv[:] # keep caller's intact
if lsync is None:
isync = 0
lsync = LogSpec("meta.sync", [], pmax*10)
logspecv.insert(0, lsync)
if lconfig_get is None:
logspecv.insert(isync+1, LogSpec("config_get", [], lsync.period))
# verify that sync will come at least every LOS_window records
ns = 0
for l in logspecv:
ns += (lsync.period / l.period)
if ns > LOS_window:
raise ValueError("meta.sync asked to come ~ every %d entries, "
"which is > LOS_window (%d)" % (ns, LOS_window))
# ready to start logging
xl = _XLogger(wsuri, logspecv, lsync.period)
# emit sync at start/stop
xl.jemit_sync("detached", "start", {})
def _():
xl.jemit_sync("detached", "stop", {})
defer(_)
while 1: while 1:
try: try:
...@@ -145,13 +188,21 @@ def xlog(ctx, wsuri, logspecv): ...@@ -145,13 +188,21 @@ def xlog(ctx, wsuri, logspecv):
# e.g. disk full in xl.jemit itself # e.g. disk full in xl.jemit itself
log.exception('xlog failure (second level):') log.exception('xlog failure (second level):')
time.sleep(3) δt_reconnect = min(3, lsync.period)
_, _rx = select(
ctx.done().recv, # 0
time.after(δt_reconnect).recv, # 1
)
if _ == 0:
raise ctx.err()
# _XLogger serves xlog implementation. # _XLogger serves xlog implementation.
class _XLogger: class _XLogger:
def __init__(xl, wsuri, logspecv): def __init__(xl, wsuri, logspecv, δt_sync):
xl.wsuri = wsuri xl.wsuri = wsuri
xl.logspecv = logspecv xl.logspecv = logspecv
xl.δt_sync = δt_sync # = logspecv.get("meta.sync").period
xl.tsync = float('-inf') # never yet
# emit saves line to the log. # emit saves line to the log.
def emit(xl, line): def emit(xl, line):
...@@ -166,9 +217,25 @@ class _XLogger: ...@@ -166,9 +217,25 @@ class _XLogger:
d = {"meta": d} d = {"meta": d}
xl.emit(json.dumps(d)) xl.emit(json.dumps(d))
# jemit_sync emits line with sync event to the log.
# TODO logrotate at this point
def jemit_sync(xl, state, reason, args_dict):
tnow = time.now()
d = {"state": state,
"reason": reason,
"generator": "xlog %s %s" % (xl.wsuri, ' '.join(['%s' % _ for _ in xl.logspecv]))}
d.update(args_dict)
xl.jemit("sync", d)
xl.tsync = tnow
# xlog1 performs one cycle of attach/log,log,log.../detach. # xlog1 performs one cycle of attach/log,log,log.../detach.
@func @func
def xlog1(xl, ctx): def xlog1(xl, ctx):
# emit sync periodically even in detached state
# this is useful to still know e.g. intended logspec if the service is stopped for a long time
if time.now() - xl.tsync >= xl.δt_sync:
xl.jemit_sync("detached", "periodic", {})
# connect to the service # connect to the service
try: try:
conn = amari.connect(ctx, xl.wsuri) conn = amari.connect(ctx, xl.wsuri)
...@@ -180,10 +247,19 @@ class _XLogger: ...@@ -180,10 +247,19 @@ class _XLogger:
defer(conn.close) defer(conn.close)
# emit "service attach"/"service detach" # emit "service attach"/"service detach"
#
# on attach, besides name/type/version, also emit everything present
# in the first ready message from the service. This should include
# "time" and optionally "utc" for releases ≥ 2022-12-01.
srv_info = {"srv_name": conn.srv_name, srv_info = {"srv_name": conn.srv_name,
"srv_type": conn.srv_type, "srv_type": conn.srv_type,
"srv_version": conn.srv_version} "srv_version": conn.srv_version}
xl.jemit("service attach", srv_info) srv_iattach = srv_info.copy()
for k, v in conn.srv_ready_msg.items():
if k in {"message", "type", "name", "version"}:
continue
srv_iattach["srv_"+k] = v
xl.jemit("service attach", srv_iattach)
def _(): def _():
try: try:
raise raise
...@@ -209,27 +285,27 @@ class _XLogger: ...@@ -209,27 +285,27 @@ class _XLogger:
xsrv_ready.recv() xsrv_ready.recv()
# spawn main logger # spawn main logger
wg.go(xl._xlog1, conn, xmsgsrv_dict) wg.go(xl._xlog1, conn, xmsgsrv_dict, srv_info)
def _xlog1(xl, ctx, conn, xmsgsrv_dict): def _xlog1(xl, ctx, conn, xmsgsrv_dict, srv_info):
# req_ queries either amari service directly, or an extra message service. # req_ queries either amari service directly, or an extra message service.
def req_(ctx, query, opts): # -> resp_raw def req_(ctx, query, opts): # -> (t_rx, resp, resp_raw)
if query in xmsgsrv_dict: if query in xmsgsrv_dict:
query_xsrv = xmsgsrv_dict[query] query_xsrv = xmsgsrv_dict[query]
_, resp_raw = query_xsrv.req_(ctx, opts) resp, resp_raw = query_xsrv.req_(ctx, opts)
else: else:
_, resp_raw = conn.req_(ctx, query, opts) resp, resp_raw = conn.req_(ctx, query, opts)
return resp_raw return (time.now(), resp, resp_raw)
# emit config_get after attach
cfg_raw = req_(ctx, 'config_get', {})
xl.emit(cfg_raw)
# loop emitting requested logspecs # loop emitting requested logspecs
t0 = time.now() t0 = time.now()
tnextv = [0]*len(xl.logspecv) # [i] - next time to arm for logspecv[i] relative to t0 tnextv = [0]*len(xl.logspecv) # [i] - next time to arm for logspecv[i] relative to t0
t_rx = conn.t_srv_ready_msg # time of last received message
srv_time = conn.srv_ready_msg["time"] # .time in ----//----
srv_utc = conn.srv_ready_msg.get("utc") # .utc in ----//---- (present ≥ 2022-12-01)
while 1: while 1:
# go through all logspecs in the order they were given # go through all logspecs in the order they were given
# pick logspec with soonest arm time # pick logspec with soonest arm time
...@@ -267,7 +343,20 @@ class _XLogger: ...@@ -267,7 +343,20 @@ class _XLogger:
if _ == 0: if _ == 0:
raise ctx.err() raise ctx.err()
resp_raw = req_(ctx, logspec.query, opts) if logspec.query == 'meta.sync':
# emit sync with srv_time and srv_utc approximated from last
# rx'ed message and local clock run since that reception
tnow = time.now()
isync = srv_info.copy()
isync["srv_time"] = srv_time + (tnow - t_rx)
if srv_utc is not None:
isync["srv_utc"] = srv_utc + (tnow - t_rx)
xl.jemit_sync("attached", "periodic", isync)
else:
t_rx, resp, resp_raw = req_(ctx, logspec.query, opts)
srv_time = resp["time"]
srv_utc = resp.get("utc")
xl.emit(resp_raw) xl.emit(resp_raw)
...@@ -353,10 +442,18 @@ _xmsg("x.drb_stats", drb._x_stats_srv, "retrieve statistics about data radio bea ...@@ -353,10 +442,18 @@ _xmsg("x.drb_stats", drb._x_stats_srv, "retrieve statistics about data radio bea
# #
# The reader must provide .readline() method. # The reader must provide .readline() method.
# The ownership of wrapped reader is transferred to the Reader. # The ownership of wrapped reader is transferred to the Reader.
class ParseError(RuntimeError): pass class ParseError(RuntimeError): pass # an entry could not be parsed
class LOSError(RuntimeError): pass # loss of synchronization
class Reader: class Reader:
# ._r underlying IO reader # ._r underlying IO reader
# ._lineno current line number # ._lineno current line number
# ._sync sync(attached) covering current message(s) | None
# for a message M sync S covering it can come in the log both before and after M
# S covers M if there is no other event/error E in between S and M
# ._n_nosync for how long we have not seen a sync
# ._emsgq [](Message|Event|Exception)
# queue for messages/events/... while we are reading ahead to look for sync
# non-message could be only at tail
pass pass
# xdict represents dict loaded from xlog entry. # xdict represents dict loaded from xlog entry.
...@@ -380,29 +477,27 @@ class Message(xdict): ...@@ -380,29 +477,27 @@ class Message(xdict):
# .timestamp seconds since epoch # .timestamp seconds since epoch
pass pass
# SyncEvent specializes Event and represents "sync" event in xlog.
class SyncEvent(Event):
# .state
# .reason
# .generator
# .srv_time | None if not present
pass
# Reader(r) creates new reader that will read xlog data from r. # Reader(r) creates new reader that will read xlog data from r.
#
# if reverse=True xlog entries are read in reverse order from end to start.
@func(Reader) @func(Reader)
def __init__(xr, r): def __init__(xr, r, reverse=False):
if reverse:
r = _ReverseLineReader(r)
xr._r = r xr._r = r
xr._lineno = 0 xr._lineno = 0
xr._sync = None
# parse header xr._n_nosync = 0
try: xr._emsgq = []
head = xr._jread1()
if head is None:
raise xr._err("header: unexpected EOF")
meta = head.get1("meta", dict)
ev0, t0 = xr._parse_metahead(meta)
if ev0 != "start":
raise xr._err("header: starts with meta.event=%s ; expected `start`" % ev0)
gen = meta.get1("generator", str)
# TODO parse generator -> ._xlogspecv
except:
xr._r.close()
raise
# close release resources associated with the Reader. # close release resources associated with the Reader.
@func(Reader) @func(Reader)
...@@ -412,6 +507,87 @@ def close(xr): ...@@ -412,6 +507,87 @@ def close(xr):
# read returns next xlog entry or None at EOF. # read returns next xlog entry or None at EOF.
@func(Reader) @func(Reader)
def read(xr): # -> Event|Message|None def read(xr): # -> Event|Message|None
while 1:
# flush what we queued during readahead
if len(xr._emsgq) > 0:
x = xr._emsgq.pop(0)
# event/error
if not isinstance(x, Message):
for _ in xr._emsgq: # non-message could be only at tail
assert not isinstance(_, Message), _
if isinstance(x, SyncEvent) and x.state == "attached":
assert xr._sync is x # readahead should have set it
else:
# attach/detach/sync(detached)/error separate sync from other messages
xr._sync = None
if isinstance(x, Exception):
raise x
return x
# message
assert isinstance(x, Message)
# provide timestamps for xlog messages generated with eNB < 2022-12-01
# there messages come without .utc field and have only .time
# we estimate the timestamp from .time and from δ(utc,time) taken from covering sync
if x.timestamp is None:
if xr._sync is not None and xr._sync.srv_time is not None:
# srv_utc' = srv_time' + (time - srv_time)
srv_time_ = x.get1("time", (float,int)) # ParseError if not present
x.timestamp = srv_time_ + (xr._sync.timestamp - xr._sync.srv_time)
if x.timestamp is None:
raise ParseError("%s:%d/%s no `utc` and cannot compute "
"timestamp with sync" % (x.pos[0], x.pos[1], '/'.join(x.path)))
# TODO verify messages we get/got against their schedule in covering sync.
# Raise LOSError (loss of synchronization) if what we actually see
# does not match what sync says it should be.
return x
assert len(xr._emsgq) == 0
# read next message/event/... potentially reading ahead while looking for covering sync
while 1:
try:
x = xr._read1()
except Exception as e:
x = e
# if we see EOF - we return it to outside only if the queue is empty
# otherwise it might be that readahead reaches EOF early, but at
# the time when queue flush would want to yield it to the user, the
# stream might have more data.
if x is None:
if len(xr._emsgq) == 0:
return None
else:
break # flush the queue
xr._emsgq.append(x)
# if we see sync(attached) - it will cover future messages till next
# event, and messages that are already queued
if isinstance(x, SyncEvent):
xr._n_nosync = 0
if x.state == "attached":
xr._sync = x
else:
xr._n_nosync += 1
if xr._n_nosync > LOS_window:
xr._emsgq.append(LOSError("no sync for %d entries" % xr._n_nosync))
if isinstance(x, Message):
if xr._sync is None: # have message and no sync -
continue # - continue to read ahead to find it
# message with sync or any event - flush the queue
break
# _read1 serves read by reading one next raw entry from the log.
# it does not detect loss of synchronization.
@func(Reader)
def _read1(xr):
x = xr._jread1() x = xr._jread1()
if x is None: if x is None:
return None return None
...@@ -420,6 +596,19 @@ def read(xr): # -> Event|Message|None ...@@ -420,6 +596,19 @@ def read(xr): # -> Event|Message|None
x.__class__ = Event x.__class__ = Event
meta = x.get1("meta", dict) meta = x.get1("meta", dict)
x.event, x.timestamp = xr._parse_metahead(meta) x.event, x.timestamp = xr._parse_metahead(meta)
if x.event in {"sync", "start"}: # for backward compatibility with old logs meta:start
x.__class__ = SyncEvent # is reported to users as sync(start) event
x.generator = meta.get1("generator", str)
if x.event == "start":
x.state = "detached"
x.reason = "start"
else:
x.state = meta.get1("state", str)
x.reason = meta.get1("reason", str)
x.srv_time = None
if "srv_time" in meta:
x.srv_time = meta.get1("srv_time", (float,int))
# TODO parse generator -> .logspecv
return x return x
if "message" in x: if "message" in x:
...@@ -428,7 +617,12 @@ def read(xr): # -> Event|Message|None ...@@ -428,7 +617,12 @@ def read(xr): # -> Event|Message|None
# NOTE .time is internal eNB time using clock originating at eNB startup. # NOTE .time is internal eNB time using clock originating at eNB startup.
# .utc is seconds since epoch counted using OS clock. # .utc is seconds since epoch counted using OS clock.
# .utc field was added in 2022-12-01 - see https://support.amarisoft.com/issues/21934 # .utc field was added in 2022-12-01 - see https://support.amarisoft.com/issues/21934
# if there is no .utc - we defer computing .timestamp to ^^^ read
# where it might estimate it from .time and sync
if "utc" in x:
x.timestamp = x.get1("utc", (float,int)) x.timestamp = x.get1("utc", (float,int))
else:
x.timestamp = None
return x return x
raise xr._err("invalid xlog entry") raise xr._err("invalid xlog entry")
...@@ -443,7 +637,8 @@ def _err(xr, text): ...@@ -443,7 +637,8 @@ def _err(xr, text):
# None is returned at EOF. # None is returned at EOF.
@func(Reader) @func(Reader)
def _jread1(xr): # -> xdict|None def _jread1(xr): # -> xdict|None
xr._lineno += 1 xr._lineno += 1 if not isinstance(xr._r, _ReverseLineReader) else \
-1
try: try:
l = xr._r.readline() l = xr._r.readline()
except Exception as e: except Exception as e:
...@@ -490,6 +685,96 @@ def get1(xd, key, typeok): ...@@ -490,6 +685,96 @@ def get1(xd, key, typeok):
return val return val
# _ReverseLineReader serves xlog.Reader by wrapping an IO reader and reading
# lines from the underlying reader in reverse order.
#
# Use .readline() to retrieve lines from end to start.
# Use .close() when done.
#
# The original reader is required to provide both .readline() and .seek() so
# that backward reading could be done efficiently.
#
# Original reader can be opened in either binary or text mode -
# _ReverseLineReader will provide read data in the same mode as original.
#
# The ownership of wrapped reader is transferred to _ReverseLineReader.
class _ReverseLineReader:
# ._r underlying IO reader
# ._bufsize data is read in so sized chunks
# ._buf current buffer
# ._bufpos ._buf corresponds to ._r[_bufpos:...]
def __init__(rr, r, bufsize=None):
rr._r = r
if bufsize is None:
bufsize = 8192
rr._bufsize = bufsize
r.seek(0, io.SEEK_END)
rr._bufpos = r.tell()
if hasattr(r, 'encoding'): # text
rr._buf = ''
rr._lfchr = '\n'
rr._str0 = ''
else: # binary
rr._buf = b''
rr._lfchr = b'\n'
rr._str0 = b''
if hasattr(r, 'name'):
rr.name = r.name
# close releases resources associated with the reader.
def close(rr):
rr._r.close()
# readline reads next line from underlying stream.
# the lines are read backwards from end to start.
def readline(rr): # -> line | ø at EOF
chunkv = []
while 1:
# time to load next buffer
if len(rr._buf) == 0:
bufpos = max(0, rr._bufpos - rr._bufsize)
bufsize = rr._bufpos - bufpos
if bufsize == 0:
break
rr._r.seek(bufpos, io.SEEK_SET)
rr._buf = _ioreadn(rr._r, bufsize)
rr._bufpos = bufpos
assert len(rr._buf) > 0
# let's scan to the left where \n is
lf = rr._buf.rfind(rr._lfchr)
if lf == -1: # no \n - queue whole buf
chunkv.insert(0, rr._buf)
rr._buf = rr._buf[:0]
continue
if len(chunkv) == 0 and lf+1 == len(rr._buf): # started reading from ending \n
chunkv.insert(0, rr._buf[lf:])
rr._buf = rr._buf[:lf]
continue
chunkv.insert(0, rr._buf[lf+1:]) # \n of previous line found - we are done
rr._buf = rr._buf[:lf+1]
break
return rr._str0.join(chunkv)
# _ioreadn reads exactly n elements from f.
def _ioreadn(f, n):
l = n
data = '' if hasattr(f, 'encoding') else \
b''
while len(data) < n:
chunk = f.read(l)
data += chunk
l -= len(chunk)
return data[:n] # slice in case it overreads
# _ioname returns name of a file-like f. # _ioname returns name of a file-like f.
def _ioname(f): def _ioname(f):
if hasattr(f, 'name'): if hasattr(f, 'name'):
...@@ -533,6 +818,11 @@ following synthetic queries is also provided: ...@@ -533,6 +818,11 @@ following synthetic queries is also provided:
%s %s
Additionally the following queries are used to control xlog itself:
meta.sync specify how often synchronization events are emitted
default is 10x the longest period
Options: Options:
-h --help show this help -h --help show this help
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2022 Nexedi SA and Contributors. # Copyright (C) 2022-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
...@@ -19,8 +19,9 @@ ...@@ -19,8 +19,9 @@
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
from xlte.amari import xlog from xlte.amari import xlog
from golang import func, defer from golang import func, defer, b
import io import io
import json
from pytest import raises from pytest import raises
...@@ -35,15 +36,24 @@ zzzqqqrrrr ...@@ -35,15 +36,24 @@ zzzqqqrrrr
{"message":"hello","message_id":3,"utc":10000} {"message":"hello","message_id":3,"utc":10000}
""" """
xr = xlog.Reader(io.BytesIO(data)) f = io.BytesIO(data); f.name = "enb.xlog"
xr = xlog.Reader(f)
defer(xr.close) defer(xr.close)
# TODO check xr.tstart == 0.01 # :1
# TODO check xr.xlogspecv == ue_get[]/3.0s erab_get[]/3.0s _ = _1 = xr.read()
assert type(_) is xlog.SyncEvent
assert _.pos == ("enb.xlog", 1)
assert _.event == "start"
assert _.timestamp == 0.01
assert _ == {"meta": {"event": "start",
"time": 0.01,
"generator": "xlog ws://localhost:9001 ue_get[]/3.0s erab_get[]/3.0s"}}
# :2 # :2
_ = xr.read() _ = _2 = xr.read()
assert type(_) is xlog.Event assert type(_) is xlog.Event
assert _.pos == ("enb.xlog", 2)
assert _.event == "service attach" assert _.event == "service attach"
assert _.timestamp == 0.02 assert _.timestamp == 0.02
assert _ == {"meta": {"event": "service attach", assert _ == {"meta": {"event": "service attach",
...@@ -52,8 +62,9 @@ zzzqqqrrrr ...@@ -52,8 +62,9 @@ zzzqqqrrrr
"srv_type": "ENB", "srv_type": "ENB",
"srv_version": "2022-12-01"}} "srv_version": "2022-12-01"}}
# :3 # :3
_ = xr.read() _ = _3 = xr.read()
assert type(_) is xlog.Message assert type(_) is xlog.Message
assert _.pos == ("enb.xlog", 3)
assert _.message == "ue_get" assert _.message == "ue_get"
assert _.timestamp == 9613.347 assert _.timestamp == 9613.347
assert _ == {"message": "ue_get", assert _ == {"message": "ue_get",
...@@ -63,12 +74,13 @@ zzzqqqrrrr ...@@ -63,12 +74,13 @@ zzzqqqrrrr
"utc": 9613.347} "utc": 9613.347}
# :4 (bad input) # :4 (bad input)
with raises(xlog.ParseError, match=":4 : invalid json"): with raises(xlog.ParseError, match="enb.xlog:4 : invalid json"):
_ = xr.read() _ = xr.read()
# :5 (restore after bad input) # :5 (restore after bad input)
_ = xr.read() _ = _5 = xr.read()
assert type(_) is xlog.Message assert type(_) is xlog.Message
assert _.pos == ("enb.xlog", 5)
assert _.message == "hello" assert _.message == "hello"
assert _.timestamp == 10000 assert _.timestamp == 10000
assert _ == {"message": "hello", assert _ == {"message": "hello",
...@@ -80,6 +92,211 @@ zzzqqqrrrr ...@@ -80,6 +92,211 @@ zzzqqqrrrr
assert _ is None assert _ is None
# ---- reverse ----
f = io.BytesIO(data); f.name = "bbb.xlog"
br = xlog.Reader(f, reverse=True)
# :-1 (:5)
_ = br.read()
assert type(_) is xlog.Message
assert _.pos == ("bbb.xlog", -1)
assert _ == _5
# :-2 (:4) (bad input)
with raises(xlog.ParseError, match="bbb.xlog:-2 : invalid json"):
_ = br.read()
# :-3 (:3) (restore after bad input)
_ = br.read()
assert type(_) is xlog.Message
assert _.pos == ("bbb.xlog", -3)
assert _ == _3
# :-4 (:2)
_ = br.read()
assert type(_) is xlog.Event
assert _.pos == ("bbb.xlog", -4)
assert _ == _2
# :-5 (:1)
_ = br.read()
assert type(_) is xlog.SyncEvent
assert _.pos == ("bbb.xlog", -5)
assert _ == _1
# EOF
_ = br.read()
assert _ is None
# verify that EOF is not returned prematurely due to readahead pre-hitting it
# sooner on the live stream.
@func
def test_Reader_readahead_vs_eof():
fxlog = io.BytesIO(b'')
def logit(line):
line = b(line)
assert b'\n' not in line
pos = fxlog.tell()
fxlog.seek(0, io.SEEK_END)
fxlog.write(b'%s\n' % line)
fxlog.seek(pos, io.SEEK_SET)
xr = xlog.Reader(fxlog)
def expect_msg(τ, msg):
_ = xr.read()
assert type(_) is xlog.Message
assert _.timestamp == τ
assert _.message == msg
logit('{"message": "aaa", "utc": 1}')
logit('{"message": "bbb", "utc": 2}')
expect_msg(1, "aaa")
expect_msg(2, "bbb")
# ^^^ readahead hit EOF internally, but at the time next .read() is called,
# the stream has more data
logit('{"message": "ccc", "utc": 3}')
expect_msg(3, "ccc")
# now, when read is called, the stream has no more data
# -> EOF is reported to the caller
_ = xr.read()
assert _ is None
# now the stream has more data again
logit('{"message": "ddd", "utc": 4}')
logit('{"message": "eee", "utc": 5}')
expect_msg(4, "ddd")
expect_msg(5, "eee")
_ = xr.read()
assert _ is None
# verify that for xlog stream produced by enb < 2022-12-01 the Reader can build
# Messages timestamps by itself based on sync.
@func
def test_Reader_timestamp_from_sync_wo_utc():
def jevent(time, event, args_dict={}):
d = {
"event": event,
"time" : time,
}
d.update(args_dict)
return json.dumps({"meta": d})
def jsync(time, srv_time):
d = {
"state": "attached" if srv_time is not None else "detached",
"reason": "periodic",
"generator": "...",
}
if srv_time is not None:
d['srv_time'] = srv_time
return jevent(time, "sync", d)
assert jsync(1.1, 2.2) == '{"meta": {"event": "sync", "time": 1.1, "state": "attached", "reason": "periodic", "generator": "...", "srv_time": 2.2}}'
assert jsync(1.1, None) == '{"meta": {"event": "sync", "time": 1.1, "state": "detached", "reason": "periodic", "generator": "..."}}'
def jmsg(srv_time, msg):
return json.dumps({"message": msg, "time": srv_time})
assert jmsg(123.4, "aaa") == '{"message": "aaa", "time": 123.4}'
data = b""
def _(line):
nonlocal data
assert '\n' not in line
data += b(line+'\n')
A = "service attach"
D = "service detach"
S = "sync"
_( jmsg(1, "aaa") ) # no timestamp: separated from ↓ jsync(1005) by event
_( jevent(1002, A ))
_( jmsg(3, "bbb") ) # have timestamp from ↓ jsync(1005)
_( jmsg(4, "ccc") ) # ----//----
_( jsync(1005, 5) ) # jsync with srv_time
_( jmsg(6, "ddd") ) # have timestamp from ↑ jsync(1005)
_( jmsg(7, "eee") ) # ----//----
_( jevent(1008, D ))
_( jmsg(9, "fff") ) # no timestamp: separated from ↑ jsync(1005) by event,
# and ↓ jsync(1010) has no srv_time
_( jsync(1010, None) ) # jsync without srv_time
_( jmsg(11, "ggg") ) # no timestamp
# expect_notime asserts that "no timestamp" error is raised on next read.
def expect_notime(xr, lineno):
with raises(xlog.ParseError,
match=":%d/ no `utc` and cannot compute timestamp with sync" % lineno):
_ = xr.read()
# expect_msg asserts that specified message with specified timestamp reads next.
def expect_msg(xr, timestamp, msg):
_ = xr.read()
assert type(_) is xlog.Message
assert _.message == msg
assert _.timestamp == timestamp
# expect_event asserts that specified event reads next.
def expect_event(xr, timestamp, event):
_ = xr.read()
assert type(_) is (xlog.SyncEvent if event == "sync" else xlog.Event)
assert _.event == event
assert _.timestamp == timestamp
xr = xlog.Reader(io.BytesIO(data))
br = xlog.Reader(io.BytesIO(data), reverse=True)
defer(xr.close)
defer(br.close)
expect_notime(xr, 1 ) # aaa
expect_event (xr, 1002, A )
expect_msg (xr, 1003, "bbb")
expect_msg (xr, 1004, "ccc")
expect_event (xr, 1005, S )
expect_msg (xr, 1006, "ddd")
expect_msg (xr, 1007, "eee")
expect_event (xr, 1008, D )
expect_notime(xr, 9 ) # fff
expect_event (xr, 1010, S )
expect_notime(xr, 11 ) # ggg
expect_notime(br, -1 ) # ggg
expect_event (br, 1010, S )
expect_notime(br, -3 ) # fff
expect_event (br, 1008, D )
expect_msg (br, 1007, "eee")
expect_msg (br, 1006, "ddd")
expect_event (br, 1005, S )
expect_msg (br, 1004, "ccc")
expect_msg (br, 1003, "bbb")
expect_event (br, 1002, A )
expect_notime(br, -11 ) # aaa
# extra check that we can get timestamp of first message if proper sync goes after
_( jsync(1012, 12) )
_( jmsg(13, "hhh") )
_( jmsg(14, "iii") )
bb = xlog.Reader(io.BytesIO(data), reverse=True)
defer(bb.close)
expect_msg (bb, 1014, "iii")
expect_msg (bb, 1013, "hhh")
expect_event (bb, 1012, S )
expect_msg (bb, 1011, "ggg") # now has timestamp because it is covered by ↑ sync(1012)
expect_event (bb, 1010, S ) # after sync(1010) it goes as for br
expect_notime(bb, -3-3 ) # fff
expect_event (bb, 1008, D )
expect_msg (bb, 1007, "eee")
expect_msg (bb, 1006, "ddd")
expect_event (bb, 1005, S )
expect_msg (bb, 1004, "ccc")
expect_msg (bb, 1003, "bbb")
expect_event (bb, 1002, A )
expect_notime(bb, -3-11 ) # aaa
def test_LogSpec(): def test_LogSpec():
logspec = "stats[samples,rf]/60s" logspec = "stats[samples,rf]/60s"
spec = xlog.LogSpec.parse(logspec) spec = xlog.LogSpec.parse(logspec)
...@@ -87,3 +304,56 @@ def test_LogSpec(): ...@@ -87,3 +304,56 @@ def test_LogSpec():
assert spec.query == "stats" assert spec.query == "stats"
assert spec.optv == ["samples", "rf"] assert spec.optv == ["samples", "rf"]
assert spec.period == 60.0 assert spec.period == 60.0
def test_ReverseLineReader():
linev = [
'hello world',
'привет мир',
'zzz',
'αβγδ', # 2-bytes UTF-8 characters
'你好' # 3-bytes ----//----
'𩸽𩹨', # 4-bytes ----//----
'{"message":"hello"}',
]
tdata = '\n'.join(linev) + '\n' # text
bdata = tdata.encode('utf-8') # binary
# check verifies _ReverseLineReader on tdata and bdata with particular bufsize.
@func
def check(bufsize):
trr = xlog._ReverseLineReader(io.StringIO(tdata), bufsize)
brr = xlog._ReverseLineReader(io.BytesIO (bdata), bufsize)
defer(trr.close)
defer(brr.close)
tv = []
while 1:
tl = trr.readline()
if tl == '':
break
assert tl.endswith('\n')
tl = tl[:-1]
assert '\n' not in tl
tv.append(tl)
bv = []
while 1:
bl = brr.readline()
if bl == b'':
break
assert bl.endswith(b'\n')
bl = bl[:-1]
assert b'\n' not in bl
bv.append(bl.decode('utf-8'))
venil = list(reversed(linev))
assert tv == venil
assert bv == venil
# verify all buffer sizes from 1 to 10x bigger the data.
# this way we cover all tricky cases where e.g. an UTF8 character is split
# in its middle by a buffer.
for bufsize in range(1, 10*len(bdata)):
check(bufsize)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment