Commit dbecc158 authored by Kirill Smelkov's avatar Kirill Smelkov

amari.xlog: Teach Reader to read xlog in reverse order from end to start

This functionality is useful to look at tail of a log without reading it
in full from the start.
parent 515c1573
...@@ -68,6 +68,7 @@ from xlte.amari import drb ...@@ -68,6 +68,7 @@ from xlte.amari import drb
import json import json
import traceback import traceback
import io
from golang import func, defer, chan, select from golang import func, defer, chan, select
from golang import context, sync, time from golang import context, sync, time
from golang.gcompat import qq from golang.gcompat import qq
...@@ -462,8 +463,12 @@ class SyncEvent(Event): ...@@ -462,8 +463,12 @@ class SyncEvent(Event):
# Reader(r) creates new reader that will read xlog data from r. # Reader(r) creates new reader that will read xlog data from r.
#
# if reverse=True xlog entries are read in reverse order from end to start.
@func(Reader) @func(Reader)
def __init__(xr, r): def __init__(xr, r, reverse=False):
if reverse:
r = _ReverseLineReader(r)
xr._r = r xr._r = r
xr._lineno = 0 xr._lineno = 0
xr._sync = None xr._sync = None
...@@ -588,7 +593,8 @@ def _err(xr, text): ...@@ -588,7 +593,8 @@ def _err(xr, text):
# None is returned at EOF. # None is returned at EOF.
@func(Reader) @func(Reader)
def _jread1(xr): # -> xdict|None def _jread1(xr): # -> xdict|None
xr._lineno += 1 xr._lineno += 1 if not isinstance(xr._r, _ReverseLineReader) else \
-1
try: try:
l = xr._r.readline() l = xr._r.readline()
except Exception as e: except Exception as e:
...@@ -635,6 +641,96 @@ def get1(xd, key, typeok): ...@@ -635,6 +641,96 @@ def get1(xd, key, typeok):
return val return val
# _ReverseLineReader serves xlog.Reader by wrapping an IO reader and reading
# lines from the underlying reader in reverse order.
#
# Use .readline() to retrieve lines from end to start.
# Use .close() when done.
#
# The original reader is required to provide both .readline() and .seek() so
# that backward reading could be done efficiently.
#
# Original reader can be opened in either binary or text mode -
# _ReverseLineReader will provide read data in the same mode as original.
#
# The ownership of wrapped reader is transferred to _ReverseLineReader.
class _ReverseLineReader:
# ._r underlying IO reader
# ._bufsize data is read in so sized chunks
# ._buf current buffer
# ._bufpos ._buf corresponds to ._r[_bufpos:...]
def __init__(rr, r, bufsize=None):
rr._r = r
if bufsize is None:
bufsize = 8192
rr._bufsize = bufsize
r.seek(0, io.SEEK_END)
rr._bufpos = r.tell()
if hasattr(r, 'encoding'): # text
rr._buf = ''
rr._lfchr = '\n'
rr._str0 = ''
else: # binary
rr._buf = b''
rr._lfchr = b'\n'
rr._str0 = b''
if hasattr(r, 'name'):
rr.name = r.name
# close releases resources associated with the reader.
def close(rr):
rr._r.close()
# readline reads next line from underlying stream.
# the lines are read backwards from end to start.
def readline(rr): # -> line | ø at EOF
chunkv = []
while 1:
# time to load next buffer
if len(rr._buf) == 0:
bufpos = max(0, rr._bufpos - rr._bufsize)
bufsize = rr._bufpos - bufpos
if bufsize == 0:
break
rr._r.seek(bufpos, io.SEEK_SET)
rr._buf = _ioreadn(rr._r, bufsize)
rr._bufpos = bufpos
assert len(rr._buf) > 0
# let's scan to the left where \n is
lf = rr._buf.rfind(rr._lfchr)
if lf == -1: # no \n - queue whole buf
chunkv.insert(0, rr._buf)
rr._buf = rr._buf[:0]
continue
if len(chunkv) == 0 and lf+1 == len(rr._buf): # started reading from ending \n
chunkv.insert(0, rr._buf[lf:])
rr._buf = rr._buf[:lf]
continue
chunkv.insert(0, rr._buf[lf+1:]) # \n of previous line found - we are done
rr._buf = rr._buf[:lf+1]
break
return rr._str0.join(chunkv)
# _ioreadn reads exactly n elements from f.
def _ioreadn(f, n):
l = n
data = '' if hasattr(f, 'encoding') else \
b''
while len(data) < n:
chunk = f.read(l)
data += chunk
l -= len(chunk)
return data[:n] # slice in case it overreads
# _ioname returns name of a file-like f. # _ioname returns name of a file-like f.
def _ioname(f): def _ioname(f):
if hasattr(f, 'name'): if hasattr(f, 'name'):
......
...@@ -40,7 +40,7 @@ zzzqqqrrrr ...@@ -40,7 +40,7 @@ zzzqqqrrrr
defer(xr.close) defer(xr.close)
# :1 # :1
_ = xr.read() _ = _1 = xr.read()
assert type(_) is xlog.SyncEvent assert type(_) is xlog.SyncEvent
assert _.pos == ("enb.xlog", 1) assert _.pos == ("enb.xlog", 1)
assert _.event == "start" assert _.event == "start"
...@@ -50,7 +50,7 @@ zzzqqqrrrr ...@@ -50,7 +50,7 @@ zzzqqqrrrr
"generator": "xlog ws://localhost:9001 ue_get[]/3.0s erab_get[]/3.0s"}} "generator": "xlog ws://localhost:9001 ue_get[]/3.0s erab_get[]/3.0s"}}
# :2 # :2
_ = xr.read() _ = _2 = xr.read()
assert type(_) is xlog.Event assert type(_) is xlog.Event
assert _.pos == ("enb.xlog", 2) assert _.pos == ("enb.xlog", 2)
assert _.event == "service attach" assert _.event == "service attach"
...@@ -61,7 +61,7 @@ zzzqqqrrrr ...@@ -61,7 +61,7 @@ zzzqqqrrrr
"srv_type": "ENB", "srv_type": "ENB",
"srv_version": "2022-12-01"}} "srv_version": "2022-12-01"}}
# :3 # :3
_ = xr.read() _ = _3 = xr.read()
assert type(_) is xlog.Message assert type(_) is xlog.Message
assert _.pos == ("enb.xlog", 3) assert _.pos == ("enb.xlog", 3)
assert _.message == "ue_get" assert _.message == "ue_get"
...@@ -77,7 +77,7 @@ zzzqqqrrrr ...@@ -77,7 +77,7 @@ zzzqqqrrrr
_ = xr.read() _ = xr.read()
# :5 (restore after bad input) # :5 (restore after bad input)
_ = xr.read() _ = _5 = xr.read()
assert type(_) is xlog.Message assert type(_) is xlog.Message
assert _.pos == ("enb.xlog", 5) assert _.pos == ("enb.xlog", 5)
assert _.message == "hello" assert _.message == "hello"
...@@ -91,6 +91,43 @@ zzzqqqrrrr ...@@ -91,6 +91,43 @@ zzzqqqrrrr
assert _ is None assert _ is None
# ---- reverse ----
f = io.BytesIO(data); f.name = "bbb.xlog"
br = xlog.Reader(f, reverse=True)
# :-1 (:5)
_ = br.read()
assert type(_) is xlog.Message
assert _.pos == ("bbb.xlog", -1)
assert _ == _5
# :-2 (:4) (bad input)
with raises(xlog.ParseError, match="bbb.xlog:-2 : invalid json"):
_ = br.read()
# :-3 (:3) (restore after bad input)
_ = br.read()
assert type(_) is xlog.Message
assert _.pos == ("bbb.xlog", -3)
assert _ == _3
# :-4 (:2)
_ = br.read()
assert type(_) is xlog.Event
assert _.pos == ("bbb.xlog", -4)
assert _ == _2
# :-5 (:1)
_ = br.read()
assert type(_) is xlog.SyncEvent
assert _.pos == ("bbb.xlog", -5)
assert _ == _1
# EOF
_ = br.read()
assert _ is None
# verify that EOF is not returned prematurely due to readahead pre-hitting it # verify that EOF is not returned prematurely due to readahead pre-hitting it
# sooner on the live stream. # sooner on the live stream.
@func @func
...@@ -142,3 +179,56 @@ def test_LogSpec(): ...@@ -142,3 +179,56 @@ def test_LogSpec():
assert spec.query == "stats" assert spec.query == "stats"
assert spec.optv == ["samples", "rf"] assert spec.optv == ["samples", "rf"]
assert spec.period == 60.0 assert spec.period == 60.0
def test_ReverseLineReader():
linev = [
'hello world',
'привет мир',
'zzz',
'αβγδ', # 2-bytes UTF-8 characters
'你好' # 3-bytes ----//----
'𩸽𩹨', # 4-bytes ----//----
'{"message":"hello"}',
]
tdata = '\n'.join(linev) + '\n' # text
bdata = tdata.encode('utf-8') # binary
# check verifies _ReverseLineReader on tdata and bdata with particular bufsize.
@func
def check(bufsize):
trr = xlog._ReverseLineReader(io.StringIO(tdata), bufsize)
brr = xlog._ReverseLineReader(io.BytesIO (bdata), bufsize)
defer(trr.close)
defer(brr.close)
tv = []
while 1:
tl = trr.readline()
if tl == '':
break
assert tl.endswith('\n')
tl = tl[:-1]
assert '\n' not in tl
tv.append(tl)
bv = []
while 1:
bl = brr.readline()
if bl == b'':
break
assert bl.endswith(b'\n')
bl = bl[:-1]
assert b'\n' not in bl
bv.append(bl.decode('utf-8'))
venil = list(reversed(linev))
assert tv == venil
assert bv == venil
# verify all buffer sizes from 1 to 10x bigger the data.
# this way we cover all tricky cases where e.g. an UTF8 character is split
# in its middle by a buffer.
for bufsize in range(1, 10*len(bdata)):
check(bufsize)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment