Commit 89bee96e authored by Kirill Smelkov's avatar Kirill Smelkov

Merge branch 'master' into t

* master:
  wcfs: Fix crash if on watch request setupWatch needs to access ZODB
  wcfs: zdata: ΔFtail tests: Fix/Adjust debug dump for computed blkRevAt
  wcfs: tests: Exercise watching @at0
  wcfs: Adjust ΔFtail/ΔBtail to allow point-queries with at=tail
  wcfs: tests: Add test do demonstrate "at out of bounds" crash on readPinWatchers -> ΔFtail.BlkRevAt
  wcfs: tests: Move tests for crashing WCFS due to old data to dedicated section
  wcfs: tests: Teach tDB to create database with initial ZBigFile changes before WCFS is started
  wcfs: tests: Always start tDB with ZBigFile pre-created before WCFS startup
  wcfs: tests: Simplify syncing WCFS to database in tDB.commit
  wcfs: tests: Inline tDB._wcsync into tDB.commit
  wcfs: tests: Split tDB.commit into .commit and ._commit
parents 9cf38ffe 38dde766
// Copyright (C) 2018-2021 Nexedi SA and Contributors.
// Copyright (C) 2018-2022 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -935,7 +935,7 @@ func (δTtail *_ΔTtail) _forgetPast(revCut zodb.Tid) {
// revExact: false
//
// key must be tracked
// at must ∈ (tail, head]
// at must ∈ {tail} ∪ (tail, head]
func (δBtail *ΔBtail) GetAt(root zodb.Oid, key Key, at zodb.Tid) (value Value, rev zodb.Tid, valueExact, revExact bool, err error) {
defer xerr.Contextf(&err, "ΔBtail: root<%s>: get %d @%s", root, key, at)
......@@ -956,7 +956,7 @@ func (δBtail *ΔBtail) GetAt(root zodb.Oid, key Key, at zodb.Tid) (value Value,
tail := δBtail.Tail()
head := δBtail.Head()
if !(tail < at && at <= head) {
if !(tail <= at && at <= head) {
panicf("at out of bounds: at: @%s, (tail, head] = (@%s, @%s]", at, tail, head)
}
......
// Copyright (C) 2020-2021 Nexedi SA and Contributors.
// Copyright (C) 2020-2022 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -1167,6 +1167,11 @@ func TestΔBtailGetAt(t_ *testing.T) {
}
// @at key value rev valueExact revExact
assertGetAt(at1, 1, ø, at1, false, false)
assertGetAt(at1, 2, ø, at1, false, false)
assertGetAt(at1, 3, "c", at1, true, false)
assertGetAt(at1, 4, ø, at1, true, false)
assertGetAt(at2, 1, ø, at1, false, false)
assertGetAt(at2, 2, ø, at1, false, false)
assertGetAt(at2, 3, "c", at1, true, false)
......
// Copyright (C) 2019-2021 Nexedi SA and Contributors.
// Copyright (C) 2019-2022 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -1061,8 +1061,8 @@ func flipsign(x int64) int64 {
//
// if exact=False - what is returned is only an upper bound for last block revision.
//
// zfile must be any checkout from (tail, head]
// at must ∈ (tail, head]
// zfile must be any checkout from {tail} ∪ (tail, head]
// at must ∈ {tail} ∪ (tail, head]
// blk must be tracked
func (δFtail *ΔFtail) BlkRevAt(ctx context.Context, zfile *ZBigFile, blk int64, at zodb.Tid) (_ zodb.Tid, exact bool, err error) {
foid := zfile.POid()
......@@ -1070,17 +1070,17 @@ func (δFtail *ΔFtail) BlkRevAt(ctx context.Context, zfile *ZBigFile, blk int64
//fmt.Printf("\nblkrev #%d @%s\n", blk, at)
// assert at ∈ (tail, head]
// assert at ∈ [tail, head]
tail := δFtail.Tail()
head := δFtail.Head()
if !(tail < at && at <= head) {
if !(tail <= at && at <= head) {
panicf("at out of bounds: at: @%s, (tail, head] = (@%s, @%s]", at, tail, head)
}
// assert zfile.at ∈ (tail, head]
// assert zfile.at ∈ [tail, head]
zconn := zfile.PJar()
zconnAt := zconn.At()
if !(tail < zconnAt && zconnAt <= head) {
if !(tail <= zconnAt && zconnAt <= head) {
panicf("zconn.at out of bounds: zconn.at: @%s, (tail, head] = (@%s, @%s]", zconnAt, tail, head)
}
......@@ -1119,7 +1119,7 @@ func (δFtail *ΔFtail) BlkRevAt(ctx context.Context, zfile *ZBigFile, blk int64
//fmt.Printf(" epoch: @%s root: %s\n", epoch, root)
if root == xbtree.VDEL {
return epoch, true, nil
return epoch, (epoch > tail), nil
}
zblk, tabRev, zblkExact, tabRevExact, err := δFtail.δBtail.GetAt(root, blk, at)
......@@ -1135,7 +1135,7 @@ func (δFtail *ΔFtail) BlkRevAt(ctx context.Context, zfile *ZBigFile, blk int64
}
// if δBtail does not have entry that covers root[blk] - get it
// through any zconn with .at ∈ (tail, head].
// through any zconn with .at ∈ [tail, head].
if !zblkExact {
xblktab, err := zconn.Get(ctx, root)
if err != nil {
......
// Copyright (C) 2019-2021 Nexedi SA and Contributors.
// Copyright (C) 2019-2022 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -386,6 +386,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
}
blkRevAt[commit.At] = blkRev
/*
if false {
fmt.Printf("blkRevAt[@%s]:\n", commit.AtSymb())
blkv := []int64{}
for blk := range blkRev {
......@@ -395,8 +396,9 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
return blkv[i] < blkv[j]
})
for _, blk := range blkv {
fmt.Printf(" #%d: %v\n", blk, blkRev[blk])
fmt.Printf(" #%d: @%s\n", blk, t.AtSymb(blkRev[blk]))
}
}
*/
// update zfile
......@@ -571,9 +573,16 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
return blkv[i] < blkv[j]
})
for j := 0; j < len(vδf); j++ {
at := vδf[j].Rev
blkRev := blkRevAt[at]
for j := -1; j < len(vδf); j++ {
var at zodb.Tid
var blkRev map[int64]zodb.Tid
if j == -1 {
at = δFtail.Tail()
// blkRev remains ø
} else {
at = vδf[j].Rev
blkRev = blkRevAt[at]
}
for _, blk := range blkv {
rev, exact, err := δFtail.BlkRevAt(ctx, zfile, blk, at); X(err)
revOK, ok := blkRev[blk]
......
......@@ -1613,6 +1613,10 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
defer head.zheadMu.RUnlock()
headAt := head.zconn.At()
// TODO better ctx = transaction.PutIntoContext(ctx, txn)
ctx, cancel := xcontext.Merge(ctx, head.zconn.TxnCtx)
defer cancel()
if at != zodb.InvalidTid && at < bfdir.δFtail.Tail() {
return fmt.Errorf("too far away back from head/at (@%s); δt = %s",
headAt, headAt.Time().Sub(at.Time().Time))
......
......@@ -394,8 +394,13 @@ class tWCFS(_tWCFS):
class tDB(tWCFS):
# __init__ initializes test database and wcfs.
#
# old_data can be optinally provided to specify ZBigFile revisions to
# create before wcfs startup. old_data is []changeDelta - see .commit
# and .change for details.
@func
def __init__(t, _with_old_data=False):
def __init__(t, old_data=[]):
t.root = testdb.dbopen()
def _(): # close/unlock db if __init__ fails
exc = sys.exc_info()[1]
......@@ -403,10 +408,6 @@ class tDB(tWCFS):
dbclose(t.root)
defer(_)
# start wcfs after testdb is created
super(tDB, t).__init__()
# ZBigFile(s) scheduled for commit
t._changed = {} # ZBigFile -> {} blk -> data
......@@ -414,9 +415,25 @@ class tDB(tWCFS):
t.tail = t.root._p_jar.db().storage.lastTransaction()
t.dFtail = [] # of DF; head = dFtail[-1].rev
# ID of the thread which created tDB
# ( transaction plays dirty games with threading.local and we have to
# check the thread is the same when .root is used )
t._maintid = gettid()
# prepare initial objects for test: zfile, nonzfile
t.root['!file'] = t.nonzfile = Persistent()
t.root['zfile'] = t.zfile = ZBigFile(blksize)
t.at0 = t._commit()
# commit initial data before wcfs starts
for changeDelta in old_data:
t._commit(t.zfile, changeDelta)
# start wcfs after testdb is created and initial data is committed
super(tDB, t).__init__()
# fh(.wcfs/zhead) + history of zhead read from there
t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
t._wc_zheadv = []
# whether head/ ZBigFile(s) blocks were ever accessed via wcfs.
# this is updated only explicitly via ._blkheadaccess() .
......@@ -426,22 +443,6 @@ class tDB(tWCFS):
t._files = set()
t._wlinks = set()
# ID of the thread which created tDB
# ( transaction plays dirty games with threading.local and we have to
# check the thread is the same when .root is used )
t._maintid = gettid()
# prepare initial objects for test: zfile, nonzfile
if not _with_old_data:
t.root['!file'] = t.nonzfile = Persistent()
t.root['zfile'] = t.zfile = ZBigFile(blksize)
t.at0 = t.commit()
else:
t.at0 = tAt(t, t.tail)
t.nonzfile = t.root['!file']
t.zfile = t.root['zfile']
@property
def head(t):
return t.dFtail[-1].rev
......@@ -491,6 +492,21 @@ class tDB(tWCFS):
# zf and changeDelta can be optionally provided, in which case .change(zf,
# changeDelta) call is made before actually committing.
def commit(t, zf=None, changeDelta=None): # -> tAt
head = t._commit(zf, changeDelta)
# make sure wcfs is synchronized to committed transaction
l = t._wc_zheadfh.readline()
#print('> zhead read: %r' % l)
l = l.rstrip('\n')
wchead = tAt(t, fromhex(l))
if wchead != t.dFtail[-1].rev:
raise RuntimeError("commit #%d: wcsync: wczhead (%s) != zhead (%s)" %
(len(t.dFtail), wchead, t.dFtail[-1].rev))
assert t.wc._read("head/at") == h(head)
return head
def _commit(t, zf=None, changeDelta=None): # -> tAt
if zf is not None:
assert changeDelta is not None
t.change(zf, changeDelta)
......@@ -538,25 +554,8 @@ class tDB(tWCFS):
print('M: f<%s>\t%s' % (h(zf._p_oid), sorted(zfDelta.keys())))
t._changed = {}
# synchronize wcfs to db, and we are done
t._wcsync()
return head
# _wcsync makes sure wcfs is synchronized to latest committed transaction.
def _wcsync(t):
while len(t._wc_zheadv) < len(t.dFtail):
l = t._wc_zheadfh.readline()
#print('> zhead read: %r' % l)
l = l.rstrip('\n')
wchead = tAt(t, fromhex(l))
i = len(t._wc_zheadv)
if wchead != t.dFtail[i].rev:
raise RuntimeError("wcsync #%d: wczhead (%s) != zhead (%s)" % (i, wchead, t.dFtail[i].rev))
t._wc_zheadv.append(wchead)
# head/at = last txn of whole db
assert t.wc._read("head/at") == h(t.head)
# _blkheadaccess marks head/zf[blk] accessed.
def _blkheadaccess(t, zf, blk):
t._blkaccessed(zf).add(blk)
......@@ -1324,37 +1323,6 @@ def test_wcfs_basic_read_aftertail():
assert _(100*blksize) == b''
# verify that wcfs does not panic with "no current transaction" when processing
# invalidations if it needs to access ZODB during handleδZ.
@func
def test_wcfs_basic_invalidation_wo_dFtail_coverage():
# prepare initial data with single change to zfile[0].
@func
def _():
t = tDB(); zf = t.zfile
defer(t.close)
t.commit(zf, {0:'a'})
_()
# start wcfs with ΔFtail/ΔBtail not covering that initial change.
t = tDB(_with_old_data=True); zf = t.zfile
defer(t.close)
f = t.open(zf)
t.commit(zf, {1:'b1'}) # arbitrary commit to non-0 blk
f._assertBlk(0, 'a') # [0] becomes tracked (don't verify against computed dataok due to _with_old_data)
# wcfs was crashing on processing further invalidation to blk 0 because
# - ΔBtail.GetAt([0], head) returns valueExact=false, and so
# - ΔFtail.BlkRevAt activates "access ZODB" codepath,
# - but handleδZ was calling ΔFtail.BlkRevAt without properly putting zhead's transaction into ctx.
# -> panic.
t.commit(zf, {0:'a2'})
# just in case
f.assertBlk(0, 'a2')
# ---- verify wcfs functionality that depends on isolation protocol ----
......@@ -1602,8 +1570,8 @@ def test_wcfs_watch_setup():
# new watch + update at_i -> at_j
wl = t.openwatch()
# XXX check @at0 ?
wl.watch(zf, at1, {0:at0, 2:at1, 3:at0, 5:at0}) # -> at1 (new watch) XXX at0 -> ø?
wl.watch(zf, at0, {0:at0, 2:at0, 3:at0, 5:at0}) # -> at0 (new watch) XXX at0 -> ø?
wl.watch(zf, at1, { 2:at1, }) # at0 -> at1
wl.watch(zf, at2, { 2:at2, 3:at2, 5:at2}) # at1 -> at2
wl.watch(zf, at3, {0:None, 2:at3, 5:at3}) # at2 -> at3
wl.watch(zf, at4, { 2:None, 5:at4}) # at3 -> at4 f(5) pinned even it was not accessed >=4
......@@ -1804,6 +1772,8 @@ def test_wcfs_no_pin_twice():
# verify watching for 2 files over single watch link.
#
# NOTE this test also verifies how wcfs handles ZBigFile created after wcfs startup.
@func
def test_wcfs_watch_2files():
t = tDB(); zf1 = t.zfile
......@@ -1850,6 +1820,44 @@ def test_wcfs_watch_2files():
# TODO new watch request while previous watch request is in progress (over the same /head/watch handle)
# TODO @revX/ is automatically removed after some time
# ----------------------------------------
# verify that wcfs does not panic with "no current transaction" / "at out of
# bounds" on read/invalidate/watch codepaths.
@func
def test_wcfs_crash_old_data():
# start wcfs with ΔFtail/ΔBtail not covering initial data.
t = tDB(old_data=[{0:'a'}]); zf = t.zfile; at1 = t.head
defer(t.close)
f = t.open(zf)
# ΔFtail coverage is currently (at1,at1]
wl = t.openwatch()
wl.watch(zf, at1, {})
# wcfs was crashing on readPinWatcher -> ΔFtail.BlkRevAt with
# "at out of bounds: at: @at1, (tail,head] = (@at1,@at1]
# because BlkRevAt(at=tail) query was disallowed.
f.assertBlk(0, 'a') # [0] becomes tracked
at2 = t.commit(zf, {1:'b1'}) # arbitrary commit to non-0 blk
# wcfs was crashing on processing invalidation to blk 0 because
# 1. ΔBtail.GetAt([0], head) returns valueExact=false, and so
# 2. ΔFtail.BlkRevAt activates "access ZODB" codepath,
# 3. but handleδZ was calling ΔFtail.BlkRevAt without putting zhead's transaction into ctx.
# -> panic.
at3 = t.commit(zf, {0:'a2'})
# just in case
f.assertBlk(0, 'a2')
# wcfs was crashing in setting up watch because of "1" and "2" from above, and
# 3. setupWatch was calling ΔFtail.BlkRevAt without putting zhead's transaction into ctx.
wl2 = t.openwatch()
wl2.watch(zf, at2, {0:at1})
# ---- misc ---
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment