Commit 2de41552 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 53b681e3
...@@ -39,7 +39,8 @@ from golang import go, chan, select, func, defer ...@@ -39,7 +39,8 @@ from golang import go, chan, select, func, defer
from golang import context, sync, time from golang import context, sync, time
from golang.gcompat import qq from golang.gcompat import qq
from zodbtools.util import ashex as h, fromhex from zodbtools.util import ashex as h, fromhex
from pytest import raises, mark import pytest; xfail = pytest.mark.xfail
from pytest import raises, fail
from six import reraise from six import reraise
from .internal import mm from .internal import mm
from .internal.wcfs_test import read_nogil, install_sigbus_trap, fadvise_dontneed from .internal.wcfs_test import read_nogil, install_sigbus_trap, fadvise_dontneed
...@@ -166,7 +167,7 @@ class DFile: ...@@ -166,7 +167,7 @@ class DFile:
# tDB provides database/wcfs testing environment. # tDB provides database/wcfs testing environment.
# #
# Database root and wcfs connection are represented by .root and .wc correspondingly. # Database root and wcfs connection are represented by .root and .wc correspondingly.
# The database is initialized with one ZBigFile created (XXX .zfile) # The database is initialized with one ZBigFile created and opened via ZODB connection as .zfile .
# #
# The primary way to access wcfs is by opening BigFiles and WatchLinks. # The primary way to access wcfs is by opening BigFiles and WatchLinks.
# A BigFile opened under tDB is represented as tFile - see .open for details. # A BigFile opened under tDB is represented as tFile - see .open for details.
...@@ -791,14 +792,7 @@ class tWatchLink: ...@@ -791,14 +792,7 @@ class tWatchLink:
# #
# only 1 sendReq must be used at a time. # XXX relax? # only 1 sendReq must be used at a time. # XXX relax?
def sendReq(t, ctx, req): # reply | None when EOF def sendReq(t, ctx, req): # reply | None when EOF
stream = 1 rxq = t._sendReq(ctx, req)
rxq = chan()
with t._rxmu:
assert stream not in t._rxtab
t._rxtab[stream] = rxq
t._send(stream, req)
_, _rx = select( _, _rx = select(
ctx.done().recv, # 0 ctx.done().recv, # 0
...@@ -808,6 +802,17 @@ class tWatchLink: ...@@ -808,6 +802,17 @@ class tWatchLink:
raise ctx.err() raise ctx.err()
return _rx return _rx
def _sendReq(t, ctx, req): # -> rxq
stream = 1
rxq = chan()
with t._rxmu:
assert stream not in t._rxtab
t._rxtab[stream] = rxq
t._send(stream, req)
return rxq
# recvReq receives client <- server request. # recvReq receives client <- server request.
# #
# multiple recvReq could be used at a time. # multiple recvReq could be used at a time.
...@@ -918,7 +923,7 @@ def watch(twlink, zf, at, pinok=None): # -> tWatch ...@@ -918,7 +923,7 @@ def watch(twlink, zf, at, pinok=None): # -> tWatch
# blk ∉ pin_prev, blk ∈ pin -> cannot happen, except on first start # blk ∉ pin_prev, blk ∈ pin -> cannot happen, except on first start
if blk not in pin_prev and blk in pin: if blk not in pin_prev and blk in pin:
if at_prev is not None: if at_prev is not None:
assert False, '#%d pinned %s; not pinned %s' % (t.hat(at_prev), t.hat(at)) fail('#%d pinned %s; not pinned %s' % (t.hat(at_prev), t.hat(at)))
# blk ∈ pin -> blk is tracked; has rev > at # blk ∈ pin -> blk is tracked; has rev > at
# (see criteria in _pinnedAt) # (see criteria in _pinnedAt)
...@@ -1043,7 +1048,7 @@ def doCheckingPin(f, pinokByWLink, pinfunc=None): # -> []event(str) ...@@ -1043,7 +1048,7 @@ def doCheckingPin(f, pinokByWLink, pinfunc=None): # -> []event(str)
for p in pinv: for p in pinv:
assert w.foid == p.foid assert w.foid == p.foid
if p.at is None: # unpin to @head if p.at is None: # unpin to @head
assert p.blk in w.pinned # must have been pinned before XXX correct? assert p.blk in w.pinned # must have been pinned before
del w.pinned[p.blk] del w.pinned[p.blk]
else: else:
w.pinned[p.blk] = p.at w.pinned[p.blk] = p.at
...@@ -1058,7 +1063,7 @@ def doCheckingPin(f, pinokByWLink, pinfunc=None): # -> []event(str) ...@@ -1058,7 +1063,7 @@ def doCheckingPin(f, pinokByWLink, pinfunc=None): # -> []event(str)
return # cancel is expected after f completes return # cancel is expected after f completes
reraise(e, None, e.__traceback__) reraise(e, None, e.__traceback__)
assert False, "extra pin message received: %r" % req.msg fail("extra pin message received: %r" % req.msg)
wg.go(_, wlink, zf, pinok) wg.go(_, wlink, zf, pinok)
def _(ctx): def _(ctx):
...@@ -1266,9 +1271,8 @@ def test_wcfs_basic(): ...@@ -1266,9 +1271,8 @@ def test_wcfs_basic():
# verify how wcfs processes ZODB invalidations when hole becomes a block with data. # verify how wcfs processes ZODB invalidations when hole becomes a block with data.
# XXX currently fails (needs δbtree)
# TODO merge into test_wcfs_basic & watch tests after δbtree is done # TODO merge into test_wcfs_basic & watch tests after δbtree is done
@mark.xfail @xfail # δbtree
@func @func
def test_wcfs_basic_hole2zblk(): def test_wcfs_basic_hole2zblk():
t = tDB(); zf = t.zfile t = tDB(); zf = t.zfile
...@@ -1354,37 +1358,52 @@ def test_wcfs_watch_robust(): ...@@ -1354,37 +1358,52 @@ def test_wcfs_watch_robust():
# verify that wcfs kills slow/faulty client who does not reply to pin in time. # verify that wcfs kills slow/faulty client who does not reply to pin in time.
# XXX place -> after normal tests? (yes, seems better - together with watch_robust) @xfail # protection against faulty/slow clients
@func @func
def test_wcfs_pintimeout_kill(): def test_wcfs_pintimeout_kill():
t = tDB(); zf = t.zfile # XXX wcfs arg: lower kill timeout # adjusted wcfs timeout to kill client who is stuck not providing pin reply
tkill = 3*time.second
t = tDB(); zf = t.zfile # XXX wcfs args += tkill
defer(t.close) defer(t.close)
# XXX move into subprocess not to kill whole testing
at1 = t.commit(zf, {2:'c1'}) at1 = t.commit(zf, {2:'c1'})
at2 = t.commit(zf, {2:'c2'}) at2 = t.commit(zf, {2:'c2'})
f = t.open(zf) f = t.open(zf)
f.assertData(['','','c2']) f.assertData(['','','c2'])
# XXX move into subprocess not to kill whole testing
ctx, _ = context.with_timeout(context.background(), 2*tkill)
wl = t.openwatch() wl = t.openwatch()
wg = sync.WorkGroup(XXX) wg = sync.WorkGroup(ctx)
def _(ctx): def _(ctx):
# XXX assert? # send watch. The pin handler won't be replying -> we should never get reply here.
wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1))) rxq = wl._sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1)))
_, _rx = select(
ctx.done().recv, # 0
rxq.recv, # 1
)
if _ == 0:
raise ctx.err()
fail("watch request completed (should not as pin handler is stuck)")
wg.go(_) wg.go(_)
def _(ctx): def _(ctx):
req = wl.recvReq(ctx) req = wl.recvReq(ctx)
assert req is not None assert req is not None
assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1)) assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
# XXX sleep > wcfs pin timeout - wcfs must kill us # sleep > wcfs pin timeout - wcfs must kill us
assert False, "wcfs did not killed stuck client" _, _rx = select(
ctx.done().recv, # 0
time.after(tkill).recv, # 1
)
if _ == 0:
raise ctx.err()
fail("wcfs did not killed stuck client")
wg.go(_) wg.go(_)
wg.wait() wg.wait()
# verify that watch setup/update sends correct pins. # verify that watch setup/update sends correct pins.
@func @func
def test_wcfs_watch_setup(): def test_wcfs_watch_setup():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment