Commit b412d488 authored by Kirill Smelkov's avatar Kirill Smelkov

amari.xlog: Require sync to be present at least every 1000 records

This way xlog.Reader can be sure that if it looked around in such a
window and did not find a sync, then something is not good with the
stream and it does not need to go beyond that limit looking around.

This is a change of the protocol. But it is early days and existing logs
- that we use in the demo, are all below 1000 lines limit, so they will
continue to be loaded ok.

No direct test for actual Loss Of Sync detection - this functionality is
draft for now and should be improved later. However for no-LOS cases
xlog.Reader is already covered with tests.
parent 9d9d20f3
...@@ -48,6 +48,9 @@ ...@@ -48,6 +48,9 @@
# stops (TODO and rotate logs). Comes with current state of # stops (TODO and rotate logs). Comes with current state of
# connection to LTE service and xlog setup # connection to LTE service and xlog setup
# - "xlog failure" on internal xlog error # - "xlog failure" on internal xlog error
#
# it is guaranteed that valid xlog stream has a sync event at least every LOS_window entries.
LOS_window = 1000
# TODO log file + rotate # TODO log file + rotate
...@@ -153,7 +156,15 @@ def xlog(ctx, wsuri, logspecv): ...@@ -153,7 +156,15 @@ def xlog(ctx, wsuri, logspecv):
if lconfig_get is None: if lconfig_get is None:
logspecv.insert(isync+1, LogSpec("config_get", [], lsync.period)) logspecv.insert(isync+1, LogSpec("config_get", [], lsync.period))
# verify that sync will come at least every LOS_window records
ns = 0
for l in logspecv:
ns += (lsync.period / l.period)
if ns > LOS_window:
raise ValueError("meta.sync asked to come ~ every %d entries, "
"which is > LOS_window (%d)" % (ns, LOS_window))
# ready to start logging
xl = _XLogger(wsuri, logspecv, lsync.period) xl = _XLogger(wsuri, logspecv, lsync.period)
# emit sync at start/stop # emit sync at start/stop
...@@ -407,10 +418,18 @@ _xmsg("x.drb_stats", drb._x_stats_srv, "retrieve statistics about data radio bea ...@@ -407,10 +418,18 @@ _xmsg("x.drb_stats", drb._x_stats_srv, "retrieve statistics about data radio bea
# #
# The reader must provide .readline() method. # The reader must provide .readline() method.
# The ownership of wrapped reader is transferred to the Reader. # The ownership of wrapped reader is transferred to the Reader.
class ParseError(RuntimeError): pass class ParseError(RuntimeError): pass # an entry could not be parsed
class LOSError(RuntimeError): pass # loss of synchronization
class Reader: class Reader:
# ._r underlying IO reader # ._r underlying IO reader
# ._lineno current line number # ._lineno current line number
# ._sync sync(attached) covering current message(s) | None
# for a message M sync S covering it can come in the log both before and after M
# S covers M if there is no other event/error E in between S and M
# ._n_nosync for how long we have not seen a sync
# ._emsgq [](Message|Event|Exception)
# queue for messages/events/... while we are reading ahead to look for sync
# non-message could be only at tail
pass pass
# xdict represents dict loaded from xlog entry. # xdict represents dict loaded from xlog entry.
...@@ -447,6 +466,9 @@ class SyncEvent(Event): ...@@ -447,6 +466,9 @@ class SyncEvent(Event):
def __init__(xr, r): def __init__(xr, r):
xr._r = r xr._r = r
xr._lineno = 0 xr._lineno = 0
xr._sync = None
xr._n_nosync = 0
xr._emsgq = []
# close release resources associated with the Reader. # close release resources associated with the Reader.
@func(Reader) @func(Reader)
...@@ -456,6 +478,75 @@ def close(xr): ...@@ -456,6 +478,75 @@ def close(xr):
# read returns next xlog entry or None at EOF. # read returns next xlog entry or None at EOF.
@func(Reader) @func(Reader)
def read(xr): # -> Event|Message|None def read(xr): # -> Event|Message|None
while 1:
# flush what we queued during readahead
if len(xr._emsgq) > 0:
x = xr._emsgq.pop(0)
# event/error
if not isinstance(x, Message):
for _ in xr._emsgq: # non-message could be only at tail
assert not isinstance(_, Message), _
if isinstance(x, SyncEvent) and x.state == "attached":
assert xr._sync is x # readahead should have set it
else:
# attach/detach/sync(detached)/error separate sync from other messages
xr._sync = None
if isinstance(x, Exception):
raise x
return x
# message
assert isinstance(x, Message)
# TODO verify messages we get/got against their schedule in covering sync.
# Raise LOSError (loss of synchronization) if what we actually see
# does not match what sync says it should be.
return x
assert len(xr._emsgq) == 0
# read next message/event/... potentially reading ahead while looking for covering sync
while 1:
try:
x = xr._read1()
except Exception as e:
x = e
# if we see EOF - we return it to outside only if the queue is empty
# otherwise it might be that readahead reaches EOF early, but at
# the time when queue flush would want to yield it to the user, the
# stream might have more data.
if x is None:
if len(xr._emsgq) == 0:
return None
else:
break # flush the queue
xr._emsgq.append(x)
# if we see sync(attached) - it will cover future messages till next
# event, and messages that are already queued
if isinstance(x, SyncEvent):
xr._n_nosync = 0
if x.state == "attached":
xr._sync = x
else:
xr._n_nosync += 1
if xr._n_nosync > LOS_window:
xr._emsgq.append(LOSError("no sync for %d entries" % xr._n_nosync))
if isinstance(x, Message):
if xr._sync is None: # have message and no sync -
continue # - continue to read ahead to find it
# message with sync or any event - flush the queue
break
# _read1 serves read by reading one next raw entry from the log.
# it does not detect loss of synchronization.
@func(Reader)
def _read1(xr):
x = xr._jread1() x = xr._jread1()
if x is None: if x is None:
return None return None
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
from xlte.amari import xlog from xlte.amari import xlog
from golang import func, defer from golang import func, defer, b
import io import io
from pytest import raises from pytest import raises
...@@ -86,6 +86,50 @@ zzzqqqrrrr ...@@ -86,6 +86,50 @@ zzzqqqrrrr
assert _ is None assert _ is None
# verify that EOF is not returned prematurely due to readahead pre-hitting it
# sooner on the live stream.
@func
def test_Reader_readahead_vs_eof():
fxlog = io.BytesIO(b'')
def logit(line):
line = b(line)
assert b'\n' not in line
pos = fxlog.tell()
fxlog.seek(0, io.SEEK_END)
fxlog.write(b'%s\n' % line)
fxlog.seek(pos, io.SEEK_SET)
xr = xlog.Reader(fxlog)
def expect_msg(τ, msg):
_ = xr.read()
assert type(_) is xlog.Message
assert _.timestamp == τ
assert _.message == msg
logit('{"message": "aaa", "utc": 1}')
logit('{"message": "bbb", "utc": 2}')
expect_msg(1, "aaa")
expect_msg(2, "bbb")
# ^^^ readahead hit EOF internally, but at the time next .read() is called,
# the stream has more data
logit('{"message": "ccc", "utc": 3}')
expect_msg(3, "ccc")
# now, when read is called, the stream has no more data
# -> EOF is reported to the caller
_ = xr.read()
assert _ is None
# now the stream has more data again
logit('{"message": "ddd", "utc": 4}')
logit('{"message": "eee", "utc": 5}')
expect_msg(4, "ddd")
expect_msg(5, "eee")
_ = xr.read()
assert _ is None
def test_LogSpec(): def test_LogSpec():
logspec = "stats[samples,rf]/60s" logspec = "stats[samples,rf]/60s"
spec = xlog.LogSpec.parse(logspec) spec = xlog.LogSpec.parse(logspec)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment