Commit 76feca9e authored by Kirill Smelkov's avatar Kirill Smelkov

Merge branch 't2' into t

* t2:
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
parents b5e4e424 9b33e0e3
......@@ -308,7 +308,7 @@ package main
// 4.4) processing ZODB invalidations and serving file reads (see 7) are
// organized to be mutually exclusive.
//
// 5.5) similarly, processing ZODB invalidations and setting up watches (see
// 4.5) similarly, processing ZODB invalidations and setting up watches (see
// 7.2) are organized to be mutually exclusive.
//
// 5) after OS file cache was invalidated, we resync zhead to new database
......@@ -332,11 +332,16 @@ package main
//
// min(rev) in δFtail is min(@at) at which head/bigfile/file is currently watched (see below).
//
// to support initial openings with @at being slightly in the past, we also
// To support initial openings with @at being slightly in the past, we also
// make sure that min(rev) is enough to cover last 1 minute of history
// from head/at.
//
// See ΔFtail documentation in internal/zdata/δftail.go for more details.
// Scalability of δFtail plays important role in scalability of WCFS because
// δFtail, besides other places, is queried and potentially rebuilt at every
// FUSE read request (see 7 below).
//
// See documentation in internal/zdata/δftail.go for more details on ΔFtail
// and its scalability properties.
//
// 7) when we receive a FUSE read(#blk) request to a head/bigfile/file, we process it as follows:
//
......
......@@ -45,7 +45,7 @@ from thread import get_ident as gettid
from time import gmtime
from errno import EINVAL, ENOTCONN
from resource import setrlimit, getrlimit, RLIMIT_MEMLOCK
from golang import go, chan, select, func, defer, default, error, b
from golang import go, chan, select, func, defer, error, b
from golang import context, errors, sync, time
from zodbtools.util import ashex as h, fromhex
import pytest; xfail = pytest.mark.xfail
......@@ -198,7 +198,7 @@ def test_join_after_crash():
procmounts_lookup_wcfs(zurl)
# verify that start successfuly starts server if previous wcfs exited uncleanly.
# verify that start successfully starts server if previous wcfs exited uncleanly.
@func
def test_start_after_crash():
zurl = testzurl
......@@ -345,7 +345,7 @@ class DFile:
#
# tDB must be explicitly closed once no longer used.
#
# XXX print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
# TODO(?) print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
class tWCFS(_tWCFS):
@func
def __init__(t):
......@@ -420,7 +420,7 @@ class tDB(tWCFS):
# whether head/ ZBigFile(s) blocks were ever accessed via wcfs.
# this is updated only explicitly via ._blkheadaccess() .
t._blkaccessedViaHead = {} # ZBigFile -> set(blk) XXX ZF -> foid ? (threads)
t._blkaccessedViaHead = {} # ZBigFile -> set(blk)
# tracked opened tFiles & tWatchLinks
t._files = set()
......@@ -555,10 +555,9 @@ class tDB(tWCFS):
# _blkheadaccess marks head/zf[blk] accessed.
def _blkheadaccess(t, zf, blk):
# XXX locking needed? or we do everything serially?
t._blkaccessed(zf).add(blk)
# _blkaccessed returns set describing whether head/zf blocks were ever accessed.
# _blkaccessed returns set describing which head/zf blocks were ever accessed.
def _blkaccessed(t, zf): # set(blk)
return t._blkaccessedViaHead.setdefault(zf, set())
......@@ -583,7 +582,7 @@ class tFile:
t.fmmap = None
tdb._files.add(t)
# make sure that wcfs reports zf.blksize as preffered block size for IO.
# make sure that wcfs reports zf.blksize as preferred block size for IO.
# wcfs.py also uses .st_blksize in blk -> byte offset computation.
st = os.fstat(t.f.fileno())
assert st.st_blksize == t.blksize
......@@ -631,7 +630,7 @@ class tFile:
# NOTE with a block completely covered by MADV_RANDOM the kernel
# issues 4K sized reads; wcfs starts uploading into cache almost
# immediately, but the kernel still issues many reads to read the
# full 2MB of the block. This works slow.
# full 2MB of the block. This works slowly.
# XXX -> investigate and maybe make read(while-uploading) wait for
# uploading to complete and only then return? (maybe it will help
# performance even in normal case)
......@@ -702,7 +701,7 @@ class tFile:
# provided pinokByWLink when it is present.
@func
def assertBlk(t, blk, dataok, pinokByWLink=None):
# XXX -> assertCtx('blk #%d' % blk)
# TODO -> assertCtx('blk #%d' % blk)
def _():
assertCtx = 'blk #%d' % blk
_, e, _ = sys.exc_info()
......@@ -785,9 +784,6 @@ class tFile:
# failed and shut down. But on test shutdown .fmmap is unmapped for
# all opened tFiles, and so read will hit SIGSEGV. Prepare to catch
# that SIGSEGV here.
#
# XXX after WatchLink is moved to pyx/nogil, do we still need to do
# here with nogil?
have_read = chan(1)
def _():
try:
......@@ -824,7 +820,7 @@ class tFile:
assert t.cached()[blk] > 0
# verify full data of the block
# XXX assert individually for every block's page? (easier debugging?)
# TODO(?) assert individually for every block's page? (easier debugging?)
assert blkview.tobytes() == dataok
# we just accessed the block in full - it has to be in OS cache completely
......@@ -886,27 +882,6 @@ class tWatchLink(wcfs.WatchLink):
w.pinned = {}
t._watching = {}
# XXX just wrap req.at with tAt inpace
"""
# recvReq is the same as WatchLink.recvReq but returns tSrvReq instead of PinReq.
def recvReq(t, ctx): # -> tSrvReq | None when EOF
req = super(tWatchLink, t).recvReq(ctx)
if req is not None:
assert req.__class__ is wcfs.PinReq
req.__class__ = tSrvReq
return req
class tSrvReq(wcfs.PinReq):
# _parse is the same as PinReq._parse, but returns at wrapped with tAt.
# XXX -> just wrap `at`
def _parse(req): # -> (foid, blk, at|None)
foid, blk, at = super(tSrvReq, req)._parse()
if at is not None:
at = tAt(req.wlink.tdb, at)
return foid, blk, at
"""
# ---- infrastructure: watch setup/adjust ----
......@@ -1069,7 +1044,7 @@ def doCheckingPin(f, pinokByWLink, pinfunc=None): # -> []event(str)
pinv = wlink._expectPin(ctx, zf, pinok)
if len(pinv) > 0:
ev.append('pin rx') # XXX + zf, pin details?
ev.append('pin rx')
# increase probability to receive erroneous extra pins
tdelay()
......@@ -1078,7 +1053,7 @@ def doCheckingPin(f, pinokByWLink, pinfunc=None): # -> []event(str)
if pinfunc is not None:
for p in pinv:
pinfunc(wlink, p.foid, p.blk, p.at)
ev.append('pin ack pre') # XXX +details?
ev.append('pin ack pre')
for p in pinv:
assert w.foid == p.foid
if p.at is None: # unpin to @head
......@@ -1144,8 +1119,7 @@ def _expectPin(twlink, ctx, zf, expect): # -> []SrvReq
# _blkDataAt returns expected zf[blk] data and its revision as of @at database state.
#
# If the block is hole - (b'', at0) is returned. XXX -> @z64?
# XXX ret for when the file did not existed at all?
# XXX ret ----//---- blk was after file size?
# Hole include cases when the file does not exists, or when blk is > file size.
@func(tDB)
def _blkDataAt(t, zf, blk, at): # -> (data, rev)
if at is None:
......@@ -1157,9 +1131,9 @@ def _blkDataAt(t, zf, blk, at): # -> (data, rev)
# changes to zf[blk] <= at
blkhistoryat = [_ for _ in vdf if blk in _.ddata and _.rev <= at]
if len(blkhistoryat) == 0:
# blk did not existed @at # XXX verify whether file was existing at all
# blk did not existed @at
data = b''
rev = t.dFtail[0].rev # was hole - at0 XXX -> pin to z64
rev = t.dFtail[0].rev # was hole - at0
else:
_ = blkhistoryat[-1]
data = _.ddata[blk]
......@@ -1243,7 +1217,7 @@ def test_wcfs_basic():
f.assertData (['','','c1'], mtime=t.head)
# >>> (@at2) commit again -> we can see both latest and snapshotted states
# NOTE blocks d(4) and f(5) will be accessed only in the end
# NOTE blocks e(4) and f(5) will be accessed only in the end
at2 = t.commit(zf, {2:'c2', 3:'d2', 5:'f2'})
# f @head
......@@ -1254,33 +1228,33 @@ def test_wcfs_basic():
# f @at1
f1 = t.open(zf, at=at1)
f1.assertCache([0,0,1])
f1.assertData (['','','c1']) # XXX + mtime=at1?
f1.assertData (['','','c1']) # TODO + mtime=at1
# >>> (@at3) commit again without changing zf size
f2 = t.open(zf, at=at2)
at3 = t.commit(zf, {2:'c3', 5:'f3'}) # FIXME + a3 after δbtree works (hole -> zblk)
at3 = t.commit(zf, {0:'a3', 2:'c3', 5:'f3'})
f.assertCache([1,1,0,1,0,0])
f.assertCache([0,1,0,1,0,0])
# f @head is opened again -> cache must not be lost
f_ = t.open(zf)
f_.assertCache([1,1,0,1,0,0])
f_.assertCache([0,1,0,1,0,0])
f_.close()
f.assertCache([1,1,0,1,0,0])
f.assertCache([0,1,0,1,0,0])
# f @head
f.assertCache([1,1,0,1,0,0])
f.assertData (['','','c3','d2','x','x'], mtime=t.head)
f.assertCache([0,1,0,1,0,0])
f.assertData (['a3','','c3','d2','x','x'], mtime=t.head)
# f @at2
# NOTE f(2) is accessed but via @at/ not head/ ; f(2) in head/zf remains unaccessed
f2.assertCache([0,0,1,0,0,0])
f2.assertData (['','','c2','d2','','f2']) # XXX mtime=at2?
f2.assertData (['','','c2','d2','','f2']) # TODO mtime=at2
# f @at1
f1.assertCache([1,1,1])
f1.assertData (['','','c1']) # XXX mtime=at1?
f1.assertData (['','','c1']) # TODO mtime=at1
# >>> f close / open again -> cache must not be lost
......@@ -1292,7 +1266,7 @@ def test_wcfs_basic():
assert sum(f.cached()) > 4*1/2 # > 50%
# verify all blocks
f.assertData(['','','c3','d2','','f3'])
f.assertData(['a3','','c3','d2','','f3'])
f.assertCache([1,1,1,1,1,1])
......@@ -1311,8 +1285,8 @@ def test_wcfs_basic_hole2zblk():
f.assertCache([1,0,1])
f.assertData(['','b2','c1'])
# XXX ZBlk copied from blk1 -> blk2 ; for the same file and for file1 -> file2 (δbtree)
# XXX ZBlk moved from blk1 -> blk2 ; for the same file and for file1 -> file2 (δbtree)
# TODO ZBlk copied from blk1 -> blk2 ; for the same file and for file1 -> file2
# TODO ZBlk moved from blk1 -> blk2 ; for the same file and for file1 -> file2
# verify that read after file size returns (0, ok)
# (the same behaviour as on e.g. ext4 and as requested by posix)
......@@ -1430,7 +1404,6 @@ def test_wcfs_watch_robust():
wl.close()
# verify that `watch file @at` -> error, for @at when file did not existed.
@xfail # check that file exists @at
@func
def test_wcfs_watch_before_create():
t = tDB(); zf = t.zfile
......@@ -1448,12 +1421,12 @@ def test_wcfs_watch_before_create():
wl = t.openwatch()
assert wl.sendReq(timeout(), b"watch %s @%s" % (h(zf2._p_oid), h(at1))) == \
"error setup watch f<%s> @%s: " % (h(zf2._p_oid), h(at1)) + \
"file does not exist at that database state"
"file epoch detected @%s in between (at,head=@%s]" % (h(at2), h(t.head))
wl.close()
# verify that watch @at_i -> @at_j ↓ is rejected
# XXX we might want to allow going back in history later.
# TODO(?) we might want to allow going back in history later.
@func
def test_wcfs_watch_going_back():
t = tDB(); zf = t.zfile
......@@ -1478,7 +1451,7 @@ def test_wcfs_watch_going_back():
def test_wcfs_pintimeout_kill():
# adjusted wcfs timeout to kill client who is stuck not providing pin reply
tkill = 3*time.second
t = tDB(); zf = t.zfile # XXX wcfs args += tkill
t = tDB(); zf = t.zfile # XXX wcfs args += tkill=<small>
defer(t.close)
at1 = t.commit(zf, {2:'c1'})
......@@ -1514,7 +1487,7 @@ def test_wcfs_pintimeout_kill():
# watch with @at > head - must wait for head to become >= at.
# XXX too far ahead - reject?
# TODO(?) too far ahead - reject?
@func
def test_wcfs_watch_setup_ahead():
t = tDB(); zf = t.zfile
......@@ -1565,17 +1538,17 @@ def test_wcfs_watch_setup():
defer(t.close)
f = t.open(zf)
at1 = t.commit(zf, {2:'c1'}) # XXX + hole -> zblk
at1 = t.commit(zf, {2:'c1'})
at2 = t.commit(zf, {2:'c2', 3:'d2', 4:'e2', 5:'f2'})
at3 = t.commit(zf, {2:'c3', 5:'f3'})
at3 = t.commit(zf, {0:'a3', 2:'c3', 5:'f3'})
f.assertData(['','','c3','d2','x','f3']) # access everything except e as of @at3
f.assertData(['a3','','c3','d2','x','f3']) # access everything except e as of @at3
f.assertCache([1,1,1,1,0,1])
# change again, but don't access e and f
at4 = t.commit(zf, {2:'c4', 4:'e4', 5:'f4'})
at5 = t.commit(zf, {3:'d5', 5:'f5'})
f.assertData(['','','c4','d5','x','x'])
f.assertData(['a3','','c4','d5','x','x'])
f.assertCache([1,1,1,1,0,0])
# some watch setup/update requests with explicit pinok (also partly
......@@ -1586,19 +1559,19 @@ def test_wcfs_watch_setup():
wl = t.openwatch()
wl.watch(zf, at, pinok)
wl.close()
assertNewWatch(at1, {2:at1, 3:at0, 5:at0})
assertNewWatch(at2, {2:at2, 3:at2, 5:at2})
assertNewWatch(at3, {2:at3, 3:at2, 5:at3}) # f(5) is pinned, even though it was not
assertNewWatch(at1, {0:at0, 2:at1, 3:at0, 5:at0})
assertNewWatch(at2, {0:at0, 2:at2, 3:at2, 5:at2})
assertNewWatch(at3, { 2:at3, 3:at2, 5:at3}) # f(5) is pinned, even though it was not
assertNewWatch(at4, { 3:at2, 5:at4}) # accessed after at3
assertNewWatch(at5, { })
# new watch + update at_i -> at_j
wl = t.openwatch()
# XXX check @at0 ?
wl.watch(zf, at1, {2:at1, 3:at0, 5:at0}) # -> at1 (new watch) XXX at0 -> ø?
wl.watch(zf, at2, {2:at2, 3:at2, 5:at2}) # at1 -> at2
wl.watch(zf, at3, {2:at3, 5:at3}) # at2 -> at3
wl.watch(zf, at4, {2:None, 5:at4}) # at3 -> at4 f(5) pinned even it was not accessed >=4
wl.watch(zf, at1, {0:at0, 2:at1, 3:at0, 5:at0}) # -> at1 (new watch) XXX at0 -> ø?
wl.watch(zf, at2, { 2:at2, 3:at2, 5:at2}) # at1 -> at2
wl.watch(zf, at3, {0:None, 2:at3, 5:at3}) # at2 -> at3
wl.watch(zf, at4, { 2:None, 5:at4}) # at3 -> at4 f(5) pinned even it was not accessed >=4
wl.watch(zf, at5, { 3:None, 5:None}) # at4 -> at5 (current head)
wl.close()
......@@ -1606,7 +1579,7 @@ def test_wcfs_watch_setup():
for zf in t.zfiles():
for revv in t.iter_revv():
print('\n--------')
print(' -> '.join(['%s' % _ for _ in revv])) # XXX join joins bytes as raw
print(' -> '.join(['%s' % _ for _ in revv]))
wl = t.openwatch()
wl.watch(zf, revv[0])
wl.watch(zf, revv[0]) # verify at_i -> at_i
......@@ -1622,11 +1595,11 @@ def test_wcfs_watch_vs_access():
defer(t.close)
f = t.open(zf)
at1 = t.commit(zf, {2:'c1'}) # XXX + hole -> zblk
at1 = t.commit(zf, {2:'c1'})
at2 = t.commit(zf, {2:'c2', 3:'d2', 5:'f2'})
at3 = t.commit(zf, {2:'c3', 5:'f3'})
at3 = t.commit(zf, {0:'a3', 2:'c3', 5:'f3'})
f.assertData(['','','c3','d2','x','x'])
f.assertData(['a3','','c3','d2','x','x'])
f.assertCache([1,1,1,1,0,0])
# watched + commit -> read -> receive pin messages.
......@@ -1644,7 +1617,7 @@ def test_wcfs_watch_vs_access():
wl2 = t.openwatch(); w2 = wl2.watch(zf, at2)
assert w2.at == at2
assert w2.pinned == {2: at2}
assert w2.pinned == {0:at0, 2:at2}
# w_assertPin asserts on state of .pinned for {w3,w3_,w2}
def w_assertPin(pinw3, pinw3_, pinw2):
......@@ -1653,30 +1626,30 @@ def test_wcfs_watch_vs_access():
assert w2.pinned == pinw2
f.assertCache([1,1,1,1,0,0])
at4 = t.commit(zf, { 2:'c4', 5:'f4', 6:'g4'}) # FIXME + b4 after δbtree works + update vvv
f.assertCache([1,1,0,1,0,0,0])
at4 = t.commit(zf, {1:'b4', 2:'c4', 5:'f4', 6:'g4'})
f.assertCache([1,0,0,1,0,0,0])
f.assertBlk(0, '', {wl3: {}, wl3_: {}, wl2: {}})
w_assertPin( {}, {}, {2:at2})
f.assertBlk(0, 'a3', {wl3: {}, wl3_: {}, wl2: {}})
w_assertPin( {}, {}, {0:at0, 2:at2})
f.assertBlk(1, '', {wl3: {}, wl3_: {}, wl2: {}})
w_assertPin( {}, {}, {2:at2})
f.assertBlk(1, 'b4', {wl3: {1:at0}, wl3_: {1:at0}, wl2: {1:at0}})
w_assertPin( {1:at0}, {1:at0}, {0:at0, 1:at0, 2:at2})
f.assertBlk(2, 'c4', {wl3: {2:at3}, wl3_: {2:at3}, wl2: {}})
w_assertPin( {2:at3}, {2:at3}, {2:at2})
w_assertPin( {1:at0, 2:at3}, {1:at0, 2:at3}, {0:at0, 1:at0, 2:at2})
f.assertBlk(3, 'd2', {wl3: {}, wl3_: {}, wl2: {}})
w_assertPin( {2:at3}, {2:at3}, {2:at2})
w_assertPin( {1:at0, 2:at3}, {1:at0, 2:at3}, {0:at0, 1:at0, 2:at2})
# blk4 is hole @head - the same as at earlier db view - not pinned
f.assertBlk(4, '', {wl3: {}, wl3_: {}, wl2: {}})
w_assertPin( {2:at3}, {2:at3}, {2:at2})
w_assertPin( {1:at0, 2:at3}, {1:at0, 2:at3}, {0:at0, 1:at0, 2:at2})
# f(5) is kept unaccessed (see ^^^)
assert f.cached()[5] == 0
f.assertBlk(6, 'g4', {wl3: {6:at0}, wl3_: {6:at0}, wl2: {6:at0}}) # XXX at0->ø?
w_assertPin( {2:at3, 6:at0}, {2:at3, 6:at0}, {2:at2, 6:at0})
w_assertPin( {1:at0, 2:at3, 6:at0}, {1:at0, 2:at3, 6:at0}, {0:at0, 1:at0, 2:at2, 6:at0})
# commit again:
# - c(2) is already pinned -> wl3 not notified
......@@ -1694,32 +1667,32 @@ def test_wcfs_watch_vs_access():
assert w3_.pinned == {}; assert w3_.at == z64 # wl3_ unsubscribed from zf
assert w2.pinned == {}; assert w2.at == z64 # wl2 closed
f.assertBlk(0, '', {wl3: {}, wl3_: {}}) # no change
w_assertPin( {2:at3, 6:at0})
f.assertBlk(0, 'a3', {wl3: {}, wl3_: {}}) # no change
w_assertPin( {1:at0, 2:at3, 6:at0})
f.assertBlk(1, '', {wl3: {}, wl3_: {}})
w_assertPin( {2:at3, 6:at0})
f.assertBlk(1, 'b4', {wl3: {}, wl3_: {}})
w_assertPin( {1:at0, 2:at3, 6:at0})
f.assertBlk(2, 'c5', {wl3: {}, wl3_: {}}) # c(2) already pinned on wl3
w_assertPin( {2:at3, 6:at0})
w_assertPin( {1:at0, 2:at3, 6:at0})
f.assertBlk(3, 'd5', {wl3: {3:at2}, wl3_: {}}) # d(3) was not pinned on wl3; wl3_ not notified
w_assertPin( {2:at3, 3:at2, 6:at0})
w_assertPin( {1:at0, 2:at3, 3:at2, 6:at0})
f.assertBlk(4, '', {wl3: {}, wl3_: {}})
w_assertPin( {2:at3, 3:at2, 6:at0})
w_assertPin( {1:at0, 2:at3, 3:at2, 6:at0})
# f(5) is kept still unaccessed (see ^^^)
assert f.cached()[5] == 0
f.assertBlk(6, 'g4', {wl3: {}, wl3_: {}})
w_assertPin( {2:at3, 3:at2, 6:at0})
w_assertPin( {1:at0, 2:at3, 3:at2, 6:at0})
# advance watch - receives new pins/unpins to @head.
# this is also tested ^^^ in `at_i -> at_j -> ...` watch setup/adjust.
# NOTE f(5) is not affected because it was not pinned previously.
wl3.watch(zf, at4, {2:at4, 6:None}) # at3 -> at4
wl3.watch(zf, at4, {1:None, 2:at4, 6:None}) # at3 -> at4
w_assertPin( {2:at4, 3:at2})
# access f(5) -> wl3 should be correctly pinned
......@@ -1839,8 +1812,8 @@ def test_wcfs_watch_2files():
# XXX new watch request while previous watch request is in progress (over the same /head/watch handle)
# XXX @revX/ is automatically removed after some time
# TODO new watch request while previous watch request is in progress (over the same /head/watch handle)
# TODO @revX/ is automatically removed after some time
# ---- misc ---
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment