wcfs_test.py 68 KB
Newer Older
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1
# -*- coding: utf-8 -*-
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2 3
# Copyright (C) 2018-2019  Nexedi SA and Contributors.
#                          Kirill Smelkov <kirr@nexedi.com>
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
20 21
"""wcfs_test tests wcfs filesystem from outside as python client process

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
22
It also unit-tests wcfs.py virtmem-level integration.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
23 24 25 26

At functional level, the whole wendelin.core test suite is used to verify
wcfs.py/wcfs.go while running tox tests in wcfs mode.
"""
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
27

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
28
from __future__ import print_function, absolute_import
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
29

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
30
from wendelin.lib.testing import getTestDB
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
31
from wendelin.lib.zodb import dbclose
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
32
from wendelin.lib.mem import memcpy
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
33 34
from wendelin.bigfile.file_zodb import ZBigFile
from wendelin.bigfile.tests.test_filezodb import blksize
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
35
from wendelin import wcfs
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
36

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
37
import transaction
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
38 39
from persistent import Persistent
from persistent.timestamp import TimeStamp
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
40
from ZODB.utils import z64, u64, p64
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
41

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
42
import sys, os, os.path, subprocess, inspect, traceback
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
43
from thread import get_ident as gettid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
44
from time import gmtime
45 46
from errno import EINVAL, ENOENT, ENOTCONN
from stat import S_ISDIR
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
47
from signal import SIGQUIT, SIGKILL
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
48
from golang import go, chan, select, func, defer, default
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
49
from golang import context, sync, time
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
50
from zodbtools.util import ashex as h, fromhex
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
51 52
import pytest; xfail = pytest.mark.xfail
from pytest import raises, fail
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
53
from six import reraise
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
54
from .internal import io, mm
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
55
from .internal.wcfs_test import read_nogil, install_sigbus_trap, fadvise_dontneed
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
56

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
57 58
# XXX `py.test -v` -> WENDELIN_CORE_WCFS_OPTIONS += -v=1?

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
59 60 61 62
# setup:
# - create test database, compute zurl and mountpoint for wcfs
# - at every test: make sure wcfs is not running before & after the test.

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
63
testdb = None
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
64 65
testzurl = None     # URL of testdb
testmntpt = None    # wcfs is mounted here
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
66
def setup_module():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
67 68
    # if wcfs.py receives SIGBUS because wcfs.go panics while serving mmap'ed
    # read, we want to see python-level traceback instead of being killed.
69 70
    install_sigbus_trap()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
71
    # if wcfs.go is built with race detector and detects a race - make it fail
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
72
    # current test loudly on the first wcfs.go race.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
73 74 75 76
    gorace = os.environ.get("GORACE", "")
    if gorace != "":
        gorace += " "
    os.environ["GORACE"] = gorace + "halt_on_error=1"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
77

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
78
    global testdb, testzurl, testmntpt
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
79 80 81
    testdb = getTestDB()
    testdb.setup()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
82
    zstor = testdb.getZODBStorage()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
83
    testzurl = wcfs.zstor_2zurl(zstor)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
84 85 86 87
    zstor.close()
    testmntpt = wcfs._mntpt_4zurl(testzurl)
    os.rmdir(testmntpt)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
88 89 90
def teardown_module():
    testdb.teardown()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
91
# make sure we start every test without wcfs server running.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
92 93 94
def setup_function(f):
    assert not os.path.exists(testmntpt)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
95
# make sure we unmount wcfs after every test.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
96
# (tDB checks this in more detail, but join tests don't use tDB)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
97
def teardown_function(f):
98
    mounted = is_mountpoint(testmntpt)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
99 100 101 102 103 104
    if mounted:
        subprocess.check_call(["fusermount", "-u", testmntpt])
    if os.path.exists(testmntpt):
        os.rmdir(testmntpt)


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
105
# ---- test join/autostart ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
106

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
107
# test that zurl does not change from one open to another storage open.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
108 109 110
def test_zurlstable():
    for i in range(10):
        zstor = testdb.getZODBStorage()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
111
        zurl  = wcfs.zstor_2zurl(zstor)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
112 113 114
        zstor.close()
        assert zurl == testzurl

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
115
# test that join works.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
116
@func
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
117
def test_join():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
118
    zurl = testzurl
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
119 120 121
    with raises(RuntimeError, match="wcfs: join .*: server not started"):
        wcfs.join(zurl, autostart=False)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
122 123 124 125 126
    assert wcfs._wcregistry == {}
    def _():
        assert wcfs._wcregistry == {}
    defer(_)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
127
    wc = wcfs._start(zurl)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
128
    defer(wc.close)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
129
    assert wc.mountpoint == testmntpt
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
130
    assert wc._njoin == 1
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
131
    assert readfile(wc.mountpoint + "/.wcfs/zurl") == zurl
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
132 133
    assert os.path.isdir(wc.mountpoint + "/head")
    assert os.path.isdir(wc.mountpoint + "/head/bigfile")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
134

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
135
    wc2 = wcfs.join(zurl, autostart=False)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
136
    defer(wc2.close)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
137 138
    assert wc2 is wc
    assert wc._njoin == 2
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
139

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
140
# test that join(autostart=y) works.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
141
@func
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
142
def test_join_autostart():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
143 144 145 146
    zurl = testzurl
    with raises(RuntimeError, match="wcfs: join .*: server not started"):
        wcfs.join(zurl, autostart=False)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
147 148 149 150 151
    assert wcfs._wcregistry == {}
    def _():
        assert wcfs._wcregistry == {}
    defer(_)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
152
    wc = wcfs.join(zurl, autostart=True)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
153
    defer(wc.close)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
154
    assert wc.mountpoint == testmntpt
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
155
    assert wc._njoin == 1
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
156
    assert readfile(wc.mountpoint + "/.wcfs/zurl") == zurl
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
157 158
    assert os.path.isdir(wc.mountpoint + "/head")
    assert os.path.isdir(wc.mountpoint + "/head/bigfile")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
159

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
160

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
161
# ---- infrastructure for data access tests ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
162 163 164
#
# Testing infrastructure consists of tDB, tFile, tWatch and tWatchLink that
# jointly organize wcfs behaviour testing. See individual classes for details.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
165

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
166
# many tests need to be run with some reasonable timeout to detect lack of wcfs
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
167
# response. with_timeout and timeout provide syntactic shortcuts to do so.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
168 169 170 171 172 173 174
def with_timeout(parent=context.background()):  # -> ctx, cancel
    return context.with_timeout(parent, 3*time.second)

def timeout(parent=context.background()):   # -> ctx
    ctx, _ = with_timeout()
    return ctx

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
175 176
# tdelay is used in places where we need to delay a bit in order to e.g. raise
# probability of a bug due to race condition.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
177
def tdelay():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
178 179
    time.sleep(10*time.millisecond)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
180

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
181 182
# DF represents a change in files space.
# it corresponds to ΔF in wcfs.go .
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
183 184 185 186 187 188 189
class DF:
    # .rev      tid
    # .byfile   {} ZBigFile -> DFile
    def __init__(dF):
        # rev set from outside
        dF.byfile = {}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
190 191
# DFile represents a change to one file.
# it is is similar to ΔFile in wcfs.go .
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
192 193
class DFile:
    # .rev      tid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
194
    # .ddata    {} blk -> data
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
195 196 197 198
    def __init__(dfile):
        # rev set from outside
        dfile.ddata = {}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
199 200
# tDB provides database/wcfs testing environment.
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
201
# Database root and wcfs connection are represented by .root and .wc correspondingly.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
202
# The database is initialized with one ZBigFile created and opened via ZODB connection as .zfile .
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
203
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
204
# The primary way to access wcfs is by opening BigFiles and WatchLinks.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
205 206
# A BigFile   opened under tDB is represented as tFile      - see .open for details.
# A WatchLink opened under tDB is represented as tWatchLink - see .openwatch for details.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
207
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
208
# The database can be mutated (via !wcfs codepath) with .change + .commit .
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
209 210
# Current database head is represented by .head .
# The history of the changes is kept in .dFtail .
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
211
# There are various helpers to query history (_blkDataAt, _pinnedAt, .iter_revv, ...)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
212
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
213
# tDB must be explicitly closed once no longer used.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
214
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
215
# XXX print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
216
class tDB:
217
    @func
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
218
    def __init__(t):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
219
        t.root = testdb.dbopen()
220 221 222 223 224 225
        def _(): # close/unlock db if __init__ fails
            exc = sys.exc_info()[1]
            if exc is not None:
                dbclose(t.root)
        defer(_)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
226
        assert not os.path.exists(testmntpt)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
227
        t.wc = wcfs.join(testzurl, autostart=True)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
228
        assert os.path.exists(testmntpt)
229
        assert is_mountpoint(testmntpt)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
230

231 232 233 234 235 236
        # force-unmount wcfs on timeout to unstuck current test and let it fail.
        # Force-unmount can be done reliably only by writing into
        # /sys/fs/fuse/connections/<X>/abort. For everything else there are
        # cases, when wcfs, even after receiving `kill -9`, will be stuck in kernel.
        # ( git.kernel.org/linus/a131de0a482a makes in-kernel FUSE client to
        #   still wait for request completion even after fatal signal )
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
237 238 239
        t._closed        = chan()
        t._wcfuseaborted = chan()
        t._wcfuseabort   = open("/sys/fs/fuse/connections/%d/abort" % os.stat(testmntpt).st_dev, "w")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
240
        go(t._abort_ontimeout, 10*time.second)   # NOTE must be >> with_timeout
241

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
242 243 244
        # ZBigFile(s) scheduled for commit
        t._changed = {} # ZBigFile -> {} blk -> data

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
245 246
        # committed: (tail, head] + δF history
        t.tail   = t.root._p_jar.db().storage.lastTransaction()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
247
        t.dFtail = [] # of DF; head = dFtail[-1].rev
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
248 249 250 251 252

        # fh(.wcfs/zhead) + history of zhead read from there
        t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
        t._wc_zheadv  = []

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
253
        # whether head/ ZBigFile(s) blocks were ever accessed via wcfs.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
254
        # this is updated only explicitly via ._blkheadaccess() .
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
255
        t._blkaccessedViaHead = {} # ZBigFile -> set(blk)   XXX ZF -> foid ? (threads)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
256

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
257
        # tracked opened tFiles & tWatchLinks
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
258 259
        t._files    = set()
        t._wlinks   = set()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
260

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
261 262 263 264 265
        # ID of the thread which created tDB
        # ( transaction plays dirty games with threading.local and we have to
        #   check the thread is the same when .root is used )
        t._maintid = gettid()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
266
        # prepare initial objects for test: zfile, nonzfile
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
267 268 269 270
        t.root['!file'] = t.nonzfile  = Persistent()
        t.root['zfile'] = t.zfile     = ZBigFile(blksize)
        t.at0 = t.commit()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
271 272 273 274
    @property
    def head(t):
        return t.dFtail[-1].rev

275 276

    def _abort_ontimeout(t, dt):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
277 278 279 280
        # XXX better run whole this function withou GIL - if a code that is
        # holding GIL will access wcfs-mmapped memory, and wcfs will send pin,
        # but pin handler is failing one way or another - select will wake-up
        # but won't continue to run trying to lock GIL -> deadlock.
281 282 283 284 285 286 287 288
        _, _rx = select(
            time.after(dt).recv,    # 0
            t._closed.recv,         # 1
        )
        if _ == 1:
            return  # tDB closed = testcase completed

        # timeout -> force-umount wcfs
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
289 290
        eprint("\nC: test timed out after %.1fs" % (dt / time.second))
        eprint("-> aborting wcfs fuse connection to unblock ...\n")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
291 292
        t._wcfuseabort.write(b"1\n")
        t._wcfuseabort.flush()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
293
        t._wcfuseaborted.close()
294

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
295
    # close closes test database as well as all tracked files, watch links and wcfs.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
296 297
    # it also prints change history to help developer overview current testcase.
    @func
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
298
    def close(t):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
299
        defer(t._wcfuseabort.close)
300
        defer(t._closed.close)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
301
        defer(lambda: dbclose(t.root))
302

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
303 304
        # unmount and wait for wcfs to exit
        def _():
305
            assert not is_mountpoint(testmntpt)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
306 307
            os.rmdir(testmntpt)
        defer(_)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
308 309 310 311
        def _():
            # kill wcfs.go in case it is deadlocked and does not exit by itself
            if procwait_(timeout(), t.wc._proc):
                return
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
312 313
            eprint("\nC: wcfs.go does not exit")
            eprint("-> kill -QUIT wcfs.go ...\n")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
314 315 316 317
            os.kill(t.wc._proc.pid, SIGQUIT)

            if procwait_(timeout(), t.wc._proc):
                return
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
318 319
            eprint("\nC: wcfs.go does not exit (after SIGQUIT)")
            eprint("-> kill -KILL wcfs.go ...\n")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
320 321 322 323
            os.kill(t.wc._proc.pid, SIGKILL)

            if procwait_(timeout(), t.wc._proc):
                return
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
324 325
            eprint("\nC: wcfs.go does not exit (after SIGKILL; probably it is stuck in kernel)")
            eprint("-> nothing we can do...\n")  # XXX dump /proc/pid/task/*/stack instead  (ignore EPERM)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
326 327 328
            fail("wcfs.go does not exit even after SIGKILL")
        defer(_)
        def _():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
329
            #if not ready(t._wcfuseaborted):    XXX kill _wcfuseaborted ?
330 331
            #    assert 0 == subprocess.call(["mountpoint", "-q", testmntpt])
            assert is_mountpoint(testmntpt)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
332 333
            subprocess.check_call(["fusermount", "-u", testmntpt])
        defer(_)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
334

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
335
        defer(t.dump_history)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
336
        for tf in t._files.copy():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
337
            tf.close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
338 339
        for tw in t._wlinks.copy():
            tw.close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
340
        assert len(t._files)   == 0
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
341
        assert len(t._wlinks)  == 0
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
342 343
        t._wc_zheadfh.close()
        t.wc.close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
344

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
345 346 347 348 349 350
    # open opens wcfs file corresponding to zf@at and starts to track it.
    # see returned tFile for details.
    def open(t, zf, at=None):   # -> tFile
        return tFile(t, zf, at=at)

    # openwatch opens /head/watch on wcfs.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
351 352 353
    # see returned tWatchLink for details.
    def openwatch(t):   # -> tWatchLink
        return tWatchLink(t)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
354

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
355
    # change schedules zf to be changed according to changeDelta at commit.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
356
    #
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
357
    # changeDelta: {} blk -> data.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
358
    # data can be both bytes and unicode.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
359 360 361 362
    def change(t, zf, changeDelta):
        assert isinstance(zf, ZBigFile)
        zfDelta = t._changed.setdefault(zf, {})
        for blk, data in changeDelta.iteritems():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
363 364
            if not isinstance(data, bytes):
                data = data.encode('utf-8')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
365 366
            assert len(data) <= zf.blksize
            zfDelta[blk] = data
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
367

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
368 369
    # commit commits transaction and makes sure wcfs is synchronized to it.
    #
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
370
    # It updates .dFtail and returns committed transaction ID.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
371 372 373
    #
    # zf and changeDelta can be optionally provided, in which case .change(zf,
    # changeDelta) call is made before actually committing.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
374
    def commit(t, zf=None, changeDelta=None):   # -> tAt
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
375 376 377 378
        if zf is not None:
            assert changeDelta is not None
            t.change(zf, changeDelta)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
379 380
        # perform modifications scheduled by change.
        # use !wcfs mode so that we prepare data independently of wcfs code paths.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
381
        dF = DF()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
382
        zconns = set()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
383
        for zf, zfDelta in t._changed.items():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
384
            dfile = DFile()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
385
            zconns.add(zf._p_jar)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
386 387
            zfh = zf.fileh_open(_use_wcfs=False)
            for blk, data in zfDelta.iteritems():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
388
                dfile.ddata[blk] = data
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
389 390 391
                data += b'\0'*(zf.blksize - len(data))  # trailing \0
                vma = zfh.mmap(blk, 1)
                memcpy(vma, data)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
392
            dF.byfile[zf] = dfile
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
393

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
394
        # verify that all changed objects come from the same ZODB connection
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
395 396 397 398 399 400 401 402 403
        assert len(zconns) in (0, 1)    # either nothing to commit or all from the same zconn
        if len(zconns) == 1:
            zconn = zconns.pop()
            root = zconn.root()
        else:
            # no objects to commit
            root = t.root
            assert gettid() == t._maintid

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
404 405
        # perform the commit. NOTE there is no clean way to retrieve tid of
        # just committed transaction - we use last._p_serial as workaround.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
406
        root['_last'] = last = Persistent()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
407 408
        last._p_changed = 1
        transaction.commit()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
409
        head = tAt(t, last._p_serial)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
410

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
411 412 413 414
        dF.rev = head
        for dfile in dF.byfile.values():
            dfile.rev = head
        t.dFtail.append(dF)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
415
        assert t.head == head   # self-check
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
416

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
417
        print('\nM: commit -> %s' % head)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
418 419 420 421
        for zf, zfDelta in t._changed.items():
            print('M:      f<%s>\t%s' % (h(zf._p_oid), sorted(zfDelta.keys())))
        t._changed = {}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
422
        # synchronize wcfs to db, and we are done
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
423
        t._wcsync()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
424 425
        return head

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
426 427
    # _wcsync makes sure wcfs is synchronized to latest committed transaction.
    def _wcsync(t):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
428
        while len(t._wc_zheadv) < len(t.dFtail):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
429
            l = t._wc_zheadfh.readline()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
430
            #print('> zhead read: %r' % l)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
431
            l = l.rstrip('\n')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
432
            wchead = tAt(t, fromhex(l))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
433
            i = len(t._wc_zheadv)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
434
            if wchead != t.dFtail[i].rev:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
435
                raise RuntimeError("wcsync #%d: wczhead (%s) != zhead (%s)" % (i, wchead, t.dFtail[i].rev))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
436 437
            t._wc_zheadv.append(wchead)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
438
        # head/at = last txn of whole db
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
439
        assert t.wc._read("head/at") == h(t.head)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
440

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
441 442
    # _blkheadaccess marks head/zf[blk] accessed.
    def _blkheadaccess(t, zf, blk):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
443
        # XXX locking needed? or we do everything serially?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
444
        t._blkaccessed(zf).add(blk)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
445

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
446 447 448
    # _blkaccessed returns set describing whether head/zf blocks were ever accessed.
    def _blkaccessed(t, zf): # set(blk)
        return t._blkaccessedViaHead.setdefault(zf, set())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
449

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
450

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
451
# tFile provides testing environment for one bigfile opened on wcfs.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
452
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
453
# ._blk() provides access to data of a block. .cached() gives state of which
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
454
# blocks are in OS pagecache. .assertCache and .assertBlk/.assertData assert
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
455
# on state of cache and data.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
456
class tFile:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
457 458
    # maximum number of pages we mmap for 1 file.
    # this should be not big not to exceed mlock limit.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
459
    _max_tracked_pages = 8
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
460

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
461 462 463
    def __init__(t, tdb, zf, at=None):
        assert isinstance(zf, ZBigFile)
        t.tdb = tdb
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
464 465
        t.zf  = zf
        t.at  = at
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
466
        t.f   = tdb.wc._open(zf, at=at)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
467 468
        t.blksize = zf.blksize

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
469 470 471 472 473
        # make sure that wcfs reports zf.blksize as preffered block size for IO.
        # wcfs.py also uses .st_blksize in blk -> byte offset computation.
        st = os.fstat(t.f.fileno())
        assert st.st_blksize == t.blksize

474 475 476
        # mmap the file past the end up to _max_tracked_pages and setup
        # invariants on which we rely to verify OS cache state:
        #
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
477
        # 1. lock pages with MLOCK_ONFAULT: this way after a page is read by
478 479 480
        #    mmap access we have the guarantee from kernel that the page will
        #    stay in pagecache.
        #
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
481
        # 2. madvise memory with MADV_SEQUENTIAL and MADV_RANDOM in interleaved
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
482 483 484 485
        #    mode. This adjusts kernel readahead (which triggers for
        #    MADV_NORMAL or MADV_SEQUENTIAL vma) to not go over to next block
        #    and thus a read access to one block won't trigger implicit read
        #    access to its neighbour block.
486 487 488 489
        #
        #      https://www.quora.com/What-heuristics-does-the-adaptive-readahead-implementation-in-the-Linux-kernel-use
        #      https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/mm/madvise.c?h=v5.2-rc4#n51
        #
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
490 491 492 493 494 495 496 497
        #    we don't use MADV_NORMAL instead of MADV_SEQUENTIAL, because for
        #    MADV_NORMAL, there is not only read-ahead, but also read-around,
        #    which might result in accessing previous block.
        #
        #    we don't disable readahead universally, since enabled readahead
        #    helps to test how wcfs handles simultaneous read triggered by
        #    async kernel readahead vs wcfs uploading data for the same block
        #    into OS cache. Also, fully enabled readahead is how wcfs is
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
498
        #    actually used in practice.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
499
        assert t.blksize % mm.PAGE_SIZE == 0
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
500
        t.fmmap = mm.map_ro(t.f.fileno(), 0, t._max_tracked_pages*t.blksize)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
501

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
502
        mm.lock(t.fmmap, mm.MLOCK_ONFAULT)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
503

504 505
        for blk in range(t._max_tracked_pages):
            blkmmap = t.fmmap[blk*t.blksize:(blk+1)*t.blksize]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
506
            # NOTE the kernel does not start readahead from access to
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
507
            # MADV_RANDOM vma, but for a MADV_{NORMAL/SEQUENTIAL} vma it starts
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
508
            # readahead which can go _beyond_ vma that was used to decide RA
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
509 510 511 512
            # start. For this reason - to prevent RA started at one block to
            # overlap with the next block, we put MADV_RANDOM vma at the end of
            # every block covering last 1/8 of it.
            # XXX implicit assumption that RA window is < 1/8·blksize
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
513 514 515 516 517 518 519 520
            #
            # NOTE with a block completely covered by MADV_RANDOM the kernel
            # issues 4K sized reads; wcfs starts uploading into cache almost
            # immediately, but the kernel still issues many reads to read the
            # full 2MB of the block. This works slow.
            # XXX -> investigate and maybe make read(while-uploading) wait for
            # uploading to complete and only then return? (maybe it will help
            # performance even in normal case)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
521 522 523
            _ = len(blkmmap)*7//8
            mm.advise(blkmmap[:_], mm.MADV_SEQUENTIAL)
            mm.advise(blkmmap[_:], mm.MADV_RANDOM)
524

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
525
        tdb._files.add(t)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
526

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
527
    def close(t):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
528
        t.tdb._files.remove(t)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
529
        mm.unmap(t.fmmap)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
530 531
        t.f.close()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
532
    # _blk returns memoryview of file[blk].
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
533
    # when/if block memory is accessed, the user has to notify tFile with _blkaccess call.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
534
    def _blk(t, blk):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
535
        assert blk <= t._max_tracked_pages
536
        return memoryview(t.fmmap[blk*t.blksize:(blk+1)*t.blksize])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
537

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
538 539 540 541
    def _blkaccess(t, blk):
        if t.at is None:    # notify tDB only for head/file access
            t.tdb._blkheadaccess(t.zf, blk)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
542
    # cached returns [] with indicating whether a file block is cached or not.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
543 544
    # 1 - cached, 0 - not cached, fractional (0,1) - some pages of the block are cached some not.
    def cached(t):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
545
        l = t._sizeinblk()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
546 547 548 549 550 551 552 553 554 555 556 557 558 559
        incorev = mm.incore(t.fmmap[:l*t.blksize])
        # incorev is in pages; convert to in blocks
        assert t.blksize % mm.PAGE_SIZE == 0
        blkpages = t.blksize // mm.PAGE_SIZE
        cachev = [0.]*l
        for i, v in enumerate(incorev):
            blk = i // blkpages
            cachev[blk] += bool(v)
        for blk in range(l):
            cachev[blk] /= blkpages
            if cachev[blk] == int(cachev[blk]):
                cachev[blk] = int(cachev[blk])  # 0.0 -> 0, 1.0 -> 1
        return cachev

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
560 561 562
    # _sizeinblk returns file size in blocks.
    def _sizeinblk(t):
        st = os.fstat(t.f.fileno())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
563
        assert st.st_blksize == t.blksize   # just in case
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
564
        assert st.st_size % t.blksize == 0
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
565
        assert st.st_size // t.blksize <= t._max_tracked_pages
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
566 567
        return st.st_size // t.blksize

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
568
    # assertCache asserts on state of OS cache for file.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
569 570 571 572 573
    #
    # incorev is [] of 1/0 representing whether block data is present or not.
    def assertCache(t, incorev):
        assert t.cached() == incorev

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
574
    # assertBlk asserts that file[blk] has data as expected.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
575 576
    #
    # Expected data may be given with size < t.blksize. In such case the data
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
577
    # is implicitly appended with trailing zeros. Data can be both bytes and unicode.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
578
    #
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
579 580 581
    # It also checks that file watches are properly notified on data access -
    # - see "7.2) for all registered client@at watchers ..."
    #
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
582
    # pinokByWLink: {} tWatchLink -> {} blk -> at.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
583
    # pinokByWLink can be omitted - in that case it is computed only automatically.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
584 585 586
    #
    # The automatic computation of pinokByWLink is verified against explicitly
    # provided pinokByWLink when it is present.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
587
    @func
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
588
    def assertBlk(t, blk, dataok, pinokByWLink=None):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
589 590 591 592 593 594 595 596 597
        # XXX -> assertCtx('blk #%d' % blk)
        def _():
            assertCtx = 'blk #%d' % blk
            _, e, _ = sys.exc_info()
            if isinstance(e, AssertionError):
                assert len(e.args) == 1 # pytest puts everything as args[0]
                e.args = (assertCtx + "\n" + e.args[0],)
        defer(_)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
598 599
        if not isinstance(dataok, bytes):
            dataok = dataok.encode('utf-8')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
600
        blkdata, _ = t.tdb._blkDataAt(t.zf, blk, t.at)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
601
        assert blkdata == dataok, "computed vs explicit data"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
602 603 604 605 606
        t._assertBlk(blk, dataok, pinokByWLink)

    @func
    def _assertBlk(t, blk, dataok, pinokByWLink=None, pinfunc=None):
        assert len(dataok) <= t.blksize
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
607
        dataok += b'\0'*(t.blksize - len(dataok))   # tailing zeros
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
608
        assert blk < t._sizeinblk()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
609

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
610 611 612 613
        # access to this block must not trigger access to other blocks
        incore_before = t.cached()
        def _():
            incore_after = t.cached()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
614 615
            incore_before[blk] = 'x'
            incore_after [blk] = 'x'
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
616 617 618
            assert incore_before == incore_after
        defer(_)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
619
        cached = t.cached()[blk]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
620
        assert cached in (0, 1) # every check accesses a block in full
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
621
        shouldPin = False       # whether at least one wlink should receive a pin
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
622

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
623
        # watches that must be notified if access goes to @head/file
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
624
        wpin = {}   # tWatchLink -> pinok
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
625
        blkrev = t.tdb._blkRevAt(t.zf, blk, t.at)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
626 627 628
        if t.at is None: # @head/...
            for wlink in t.tdb._wlinks:
                pinok = {}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
629
                w = wlink._watching.get(t.zf._p_oid)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
630
                if w is not None and w.at < blkrev:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
631
                    if cached == 1:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
632 633 634
                        # @head[blk].rev is after w.at - w[blk] must be already pinned
                        assert blk in w.pinned
                        assert w.pinned[blk] <= w.at
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
635
                    else:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
636
                        assert cached == 0
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
637 638 639
                        # even if @head[blk] is uncached, the block could be
                        # already pinned by setup watch
                        if blk not in w.pinned:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
640
                            pinok = {blk: t.tdb._blkRevAt(t.zf, blk, w.at)}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
641
                            shouldPin = True
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
642
                wpin[wlink] = pinok
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
643 644 645 646 647

        if pinokByWLink is not None:
            assert wpin == pinokByWLink, "computed vs explicit pinokByWLink"
        pinokByWLink = wpin

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
648
        # doCheckingPin expects every wlink entry to also contain zf
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
649 650 651
        for wlink, pinok in pinokByWLink.items():
            pinokByWLink[wlink] = (t.zf, pinok)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
652
        # access 1 byte on the block and verify that wcfs sends us correct pins
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
653
        blkview = t._blk(blk)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
654
        assert t.cached()[blk] == cached
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
655 656

        def _(ctx, ev):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
657
            assert t.cached()[blk] == cached
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
658 659 660
            ev.append('read pre')

            # access data with released GIL so that the thread that reads data from
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
661
            # head/watch can receive pin message. Be careful to handle cancellation,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
662 663
            # so that on error in another worker we don't get stuck and the
            # error can be propagated to wait and reported.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
664 665
            have_read = chan(1)
            def _():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
666
                b = read_nogil(blkview[0:1])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
667
                t._blkaccess(blk)
Kirill Smelkov's avatar
Kirill Smelkov committed
668
                have_read.send(b)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
669 670 671 672 673 674 675 676 677
            go(_)
            _, _rx = select(
                ctx.done().recv,    # 0
                have_read.recv,     # 1
            )
            if _ == 0:
                raise ctx.err()
            b = _rx

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
678
            ev.append('read ' + b)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
679
        ev = doCheckingPin(_, pinokByWLink, pinfunc)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
680 681

        # XXX hack - wlinks are notified and emit events simultaneously - we
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
682
        # check only that events begin and end with read pre/post and that pins
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
683 684
        # are inside (i.e. read is stuck until pins are acknowledged).
        # Better do explicit check in tracetest style.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
685 686
        assert ev[0]  == 'read pre', ev
        assert ev[-1] == 'read ' + dataok[0], ev
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
687
        ev = ev[1:-1]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
688 689 690 691 692
        if not shouldPin:
            assert ev == []
        else:
            assert 'pin rx' in ev
            assert 'pin ack pre' in ev
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
693

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
694
        assert t.cached()[blk] > 0
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
695

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
696
        # verify full data of the block
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
697
        # XXX assert individually for every block's page? (easier debugging?)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
698
        assert blkview.tobytes() == dataok
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
699

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
700
        # we just accessed the block in full - it has to be in OS cache completely
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
701 702
        assert t.cached()[blk] == 1

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
703

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
704
    # assertData asserts that file has data blocks as specified.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
705 706
    #
    # Expected blocks may be given with size < zf.blksize. In such case they
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
707
    # are implicitly appended with trailing zeros. If a block is specified as
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
708
    # 'x' - this particular block is not accessed and is not checked.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
709
    #
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
710
    # The file size and optionally mtime are also verified.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
711
    def assertData(t, dataokv, mtime=None):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
712
        st = os.fstat(t.f.fileno())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
713
        assert st.st_blksize == t.blksize
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
714
        assert st.st_size == len(dataokv)*t.blksize
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
715 716 717
        if mtime is not None:
            assert st.st_mtime == tidtime(mtime)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
718
        cachev = t.cached()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
719
        for blk, dataok in enumerate(dataokv):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
720 721
            if dataok == 'x':
                continue
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
722
            t.assertBlk(blk, dataok)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
723
            cachev[blk] = 1
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
724

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
725 726
        # all accessed blocks must be in cache after we touched them all
        t.assertCache(cachev)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
727 728


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
729 730
# tWatch represents watch for one file setup on a tWatchLink.
class tWatch:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
731 732
    def __init__(w, foid):
        w.foid   = foid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
733
        w.at     = z64  # not None - always concrete
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
734 735
        w.pinned = {}   # blk -> rev

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
736
# tWatchLink provides testing environment for /head/watch link opened on wcfs.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
737
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
738
# .watch() setups/adjusts a watch for a file and verifies that wcfs correctly sends initial pins.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
739
class tWatchLink(wcfs.WatchLink):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
740 741

    def __init__(t, tdb):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
742
        super(tWatchLink, t).__init__(tdb.wc)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
743
        t.tdb = tdb
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
744
        tdb._wlinks.add(t)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
745

746 747
    def close(t):
        t.tdb._wlinks.remove(t)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
748
        super(tWatchLink, t).close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
749

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
750
    # recvReq is the same as WatchLink.recvReq but returns tSrvReq instead of SrvReq.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
751
    def recvReq(t, ctx): # -> tSrvReq | None when EOF
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
752 753 754 755 756 757 758 759
        req = super(tWatchLink, t).recvReq(ctx)
        if req is not None:
            assert req.__class__ is wcfs.SrvReq
            req.__class__ = tSrvReq
        return req

class tSrvReq(wcfs.SrvReq):
    # _parse is the same as SrvReq._parse, but returns at wrapped with tAt.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
760
    def _parse(req): # -> (foid, blk, at|None)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
761 762 763
        foid, blk, at = super(tSrvReq, req)._parse()
        if at is not None:
            at = tAt(req.wlink.tdb, at)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
764 765 766
        return foid, blk, at


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
767

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
768
# ---- infrastructure: watch setup/adjust ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
769

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
770 771
# watch sets up or adjusts a watch for file@at.
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
772
# During setup it verifies that wcfs sends correct initial/update pins.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
773
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
774
# pinok: {} blk -> rev
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
775 776 777 778
# pinok can be omitted - in that case it is computed automatically.
#
# The automatic computation of pinok is verified against explicitly provided
# pinok when it is present.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
779
@func(tWatchLink)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
780
def watch(twlink, zf, at, pinok=None): # -> tWatch
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
781
    foid = zf._p_oid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
782
    t = twlink.tdb
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
783
    w = twlink._watching.get(foid)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
784
    if w is None:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
785
        w = twlink._watching[foid] = tWatch(foid)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
786 787 788
        at_prev = None
    else:
        at_prev = w.at  # we were previously watching zf @at_prev
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
789

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
790
    at_from = ''
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
791
    if at_prev is not None:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
792
        at_from = '(%s ->) ' % at_prev
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
793
    print('\nC: setup watch f<%s> %s%s' % (h(foid), at_from, at))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
794

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
795 796
    accessed    = t._blkaccessed(zf)
    lastRevOf   = lambda blk: t._blkRevAt(zf, blk, t.head)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
797

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
798
    pin_prev = {}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
799
    if at_prev is not None:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
800
        assert at_prev <= at, 'TODO %s -> %s' % (at_prev, at)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
801
        pin_prev = t._pinnedAt(zf, at_prev)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
802
    assert w.pinned == pin_prev
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
803

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
804
    pin = t._pinnedAt(zf, at)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
805

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
806
    if at_prev != at and at_prev is not None:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
807
        print('# pin@old: %s\n# pin@new: %s' % (t.hpin(pin_prev), t.hpin(pin)))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
808

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
809 810 811 812
    for blk in set(pin_prev.keys()).union(pin.keys()):
        # blk ∉ pin_prev,   blk ∉ pin       -> cannot happen
        assert (blk in pin_prev) or (blk in pin)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
813
        # blk ∉ pin_prev,   blk ∈ pin       -> cannot happen, except on first start
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
814 815
        if blk not in pin_prev and blk in pin:
            if at_prev is not None:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
816
                fail('#%d pinned %s; not pinned %s' % (at_prev, at))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
817

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
818
            # blk ∈ pin     -> blk is tracked; has rev > at
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
819
            # (see criteria in _pinnedAt)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
820
            assert blk in accessed
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
821
            assert at  <  lastRevOf(blk)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
822

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
823
        # blk ∈ pin_prev,   blk ∉ pin       -> unpin to head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
824
        elif blk in pin_prev and blk not in pin:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
825
            # blk ∈ pin_prev -> blk is tracked; has rev > at_prev
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
826
            assert blk in accessed
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
827
            assert at_prev < lastRevOf(blk)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
828

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
829 830
            # blk ∉ pin      -> last blk revision is ≤ at
            assert lastRevOf(blk) <= at
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
831

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
832
            pin[blk] = None     # @head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
833

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
834
        # blk ∈ pin_prev,   blk ∈ pin       -> if rev different: use pin
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
835
        elif blk in pin_prev and blk in pin:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
836
            # blk ∈ pin_prev, pin   -> blk is tracked; has rev > at_prev, at
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
837
            assert blk in accessed
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
838 839
            assert at_prev < lastRevOf(blk)
            assert at      < lastRevOf(blk)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
840

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
841 842 843
            assert pin_prev[blk] <= pin[blk]
            if pin_prev[blk] == pin[blk]:
                del pin[blk]    # would need to pin to what it is already pinned
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
844

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
845
    #print('-> %s' % t.hpin(pin))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
846

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
847
    # {} blk -> at that have to be pinned.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
848
    if pinok is not None:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
849
        assert pin == pinok,    "computed vs explicit pinok"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
850
    pinok = pin
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
851
    print('#  pinok: %s' % t.hpin(pinok))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
852

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
853 854
    # send watch request and check that we receive pins for tracked (previously
    # accessed at least once) blocks changed > at.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
855 856
    twlink._watch(zf, at, pinok, "ok")

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
857
    w.at = at
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
858 859

    # `watch ... -> at_i -> at_j`  must be the same as  `watch ø -> at_j`
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
860
    assert w.pinned == t._pinnedAt(zf, at)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
861

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
862
    return w
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
863

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
864 865 866 867

# stop_watch instructs wlink to stop watching the file.
@func(tWatchLink)
def stop_watch(twlink, zf):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
868 869 870
    foid = zf._p_oid
    assert foid in twlink._watching
    w = twlink._watching.pop(foid)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
871 872 873 874 875 876

    twlink._watch(zf, b"-", {}, "ok")
    w.at = z64
    w.pinned = {}


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
877
# _watch sends watch request for zf@at, expects initial pins specified by pinok and final reply.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
878
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
879 880
# at also can be b"-" which means "stop watching"
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
881
# pinok: {} blk -> at that have to be pinned.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
882
# if replyok ends with '…' only reply prefix until the dots is checked.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
883
@func(tWatchLink)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
884
def _watch(twlink, zf, at, pinok, replyok):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
885 886 887 888 889
    if at == b"-":
        xat = at
    else:
        xat = b"@%s" % h(at)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
890
    def _(ctx, ev):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
891
        reply = twlink.sendReq(ctx, b"watch %s %s" % (h(zf._p_oid), xat))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
892 893 894 895 896 897
        if replyok.endswith('…'):
            rok = replyok[:-len('…')]
            assert reply[:len(rok)] == rok
        else:
            assert reply == replyok

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
898
    doCheckingPin(_, {twlink: (zf, pinok)})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
899

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
900 901 902 903

# doCheckingPin calls f and verifies that wcfs sends expected pins during the
# time f executes.
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
904
# f(ctx, eventv)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
905
# pinokByWLink: {} tWatchLink -> (zf, {} blk -> at).
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
906
# pinfunc(wlink, foid, blk, at) | None.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
907
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
908 909
# pinfunc is called after pin request is received from wcfs, but before pin ack
# is replied back. Pinfunc must not block.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
910
def doCheckingPin(f, pinokByWLink, pinfunc=None): # -> []event(str)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
911 912 913
    # call f and check that we receive pins as specified.
    # Use timeout to detect wcfs replying less pins than expected.
    #
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
914
    # XXX detect not sent pins via ack'ing previous pins as they come in (not
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
915
    # waiting for all of them) and then seeing that we did not received expected
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
916
    # pin when f completes?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
917 918
    ctx, cancel = with_timeout()
    wg = sync.WorkGroup(ctx)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
919
    ev = []
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
920

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
921
    for wlink, (zf, pinok) in pinokByWLink.items():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
922
        def _(ctx, wlink, zf, pinok):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
923
            w = wlink._watching.get(zf._p_oid)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
924 925 926
            if len(pinok) > 0:
                assert w is not None

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
927
            pinv = wlink._expectPin(ctx, zf, pinok)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
928 929 930 931 932 933 934 935 936 937 938
            if len(pinv) > 0:
                ev.append('pin rx')         # XXX + zf, pin details?

            # increase probability to receive erroneous extra pins
            tdelay()

            if len(pinv) > 0:
                if pinfunc is not None:
                    for p in pinv:
                        pinfunc(wlink, p.foid, p.blk, p.at)
                ev.append('pin ack pre')    # XXX +details?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
939
                for p in pinv:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
940
                    assert w.foid == p.foid
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
941
                    if p.at is None:    # unpin to @head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
942
                        assert p.blk in w.pinned    # must have been pinned before
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
943 944 945 946
                        del w.pinned[p.blk]
                    else:
                        w.pinned[p.blk] = p.at

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
947 948
                    p.reply(b"ack")

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
949
            # check that we don't get extra pins before f completes
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
950 951 952 953
            try:
                req = wlink.recvReq(ctx)
            except Exception as e:
                if e is context.canceled:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
954
                    return # cancel is expected after f completes
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
955 956
                reraise(e, None, e.__traceback__)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
957
            fail("extra pin message received: %r" % req.msg)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
958
        wg.go(_, wlink, zf, pinok)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
959 960

    def _(ctx):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
961
        f(ctx, ev)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
962 963 964 965 966 967
        # cancel _expectPin waiting upon completing f
        # -> error that missed pins were not received.
        cancel()
    wg.go(_)

    wg.wait()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
968 969
    return ev

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
970

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
971 972 973 974
# _expectPin asserts that wcfs sends expected pin messages.
#
# expect is {} blk -> at
# returns [] of received pin requests.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
975
@func(tWatchLink)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
976
def _expectPin(twlink, ctx, zf, expect): # -> []SrvReq
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
977 978
    expected = set()    # of expected pin messages
    for blk, at in expect.items():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
979 980
        hat = h(at) if at is not None else 'head'
        msg = b"pin %s #%d @%s" % (h(zf._p_oid), blk, hat)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
981 982 983 984 985 986
        assert msg not in expected
        expected.add(msg)

    reqv = []   # of received requests
    while len(expected) > 0:
        try:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
987
            req = twlink.recvReq(ctx)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
988
        except Exception as e:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
989
            raise RuntimeError("%s\nnot all pin messages received - pending:\n%s" % (e, expected))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
990 991 992 993 994 995 996 997
        assert req is not None  # channel not closed
        assert req.msg in expected
        expected.remove(req.msg)
        reqv.append(req)

    return reqv


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
998 999
# ---- infrastructure: helpers to query dFtail/accessed history ----

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1000
# _blkDataAt returns expected zf[blk] data and its revision as of @at database state.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1001 1002 1003 1004
#
# If the block is hole - (b'', at0) is returned.  XXX -> @z64?
# XXX ret for when the file did not existed at all? blk was after file size?
@func(tDB)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1005
def _blkDataAt(t, zf, blk, at): # -> (data, rev)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022
    if at is None:
        at = t.head

    # all changes to zf
    vdf = [_.byfile[zf] for _ in t.dFtail if zf in _.byfile]

    # changes to zf[blk] <= at
    blkhistoryat = [_ for _ in vdf if blk in _.ddata and _.rev <= at]
    if len(blkhistoryat) == 0:
        # blk did not existed @at       # XXX verify whether file was existing at all
        data = b''
        rev  = t.dFtail[0].rev  # was hole - at0    XXX -> pin to z64
    else:
        _ = blkhistoryat[-1]
        data = _.ddata[blk]
        rev  = _.rev

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1023
    assert rev <= at
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1024 1025
    return data, rev

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1026
# _blkRevAt returns expected zf[blk] revision as of @at database state.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1027
@func(tDB)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1028 1029
def _blkRevAt(t, zf, blk, at): # -> rev
    _, rev = t._blkDataAt(zf, blk, at)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1030 1031 1032
    return rev


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1033
# _pinnedAt returns which blocks needs to be pinned for zf@at compared to zf@head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1034 1035
# according to wcfs invalidation protocol.
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1036
# Criteria for when blk must be pinned as of @at view:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1037
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1038 1039 1040
#   blk ∈ pinned(at)   <=>   1) ∃ r = rev(blk): at < r  ; blk was changed after at
#                            2) blk ∈ tracked           ; blk was accessed at least once
#                                                       ; (and so is tracked by wcfs)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1041
@func(tDB)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1042
def _pinnedAt(t, zf, at):  # -> pin = {} blk -> rev
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1043 1044 1045 1046 1047 1048 1049 1050 1051
    # all changes to zf
    vdf = [_.byfile[zf] for _ in t.dFtail if zf in _.byfile]

    # {} blk -> at for changes ∈ (at, head]
    pin = {}
    for df in [_ for _ in vdf if _.rev > at]:
        for blk in df.ddata:
            if blk in pin:
                continue
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1052
            if blk in t._blkaccessed(zf):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1053
                pin[blk] = t._blkRevAt(zf, blk, at)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1054 1055 1056 1057 1058 1059 1060 1061

    return pin

# iter_revv iterates through all possible at_i -> at_j -> at_k ... sequences.
# at_i < at_j       NOTE all sequences go till head.
@func(tDB)
def iter_revv(t, start=z64, level=0):
    dFtail = [_ for _ in t.dFtail if _.rev > start]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1062
    #print(' '*level, 'iter_revv', start, [_.rev for _ in dFtail])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1063 1064 1065 1066 1067
    if len(dFtail) == 0:
        yield []
        return

    for dF in dFtail:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1068
        #print(' '*level, 'QQQ', dF.rev)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1069 1070 1071 1072 1073 1074
        for tail in t.iter_revv(start=dF.rev, level=level+1):
            #print(' '*level, 'zzz', tail)
            yield ([dF.rev] + tail)


# ---- actual tests to access data ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1075

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1076 1077
# exercise wcfs functionality without wcfs invalidation protocol.
# plain data access + wcfs handling of ZODB invalidations.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1078
@func
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1079 1080
def test_wcfs_basic():
    t = tDB(); zf = t.zfile
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1081
    defer(t.close)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1082

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1083
    # >>> lookup non-BigFile -> must be rejected
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1084
    with raises(OSError) as exc:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1085
        t.wc._stat("head/bigfile/%s" % h(t.nonzfile._p_oid))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1086 1087
    assert exc.value.errno == EINVAL

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1088
    # >>> file initially empty
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1089
    f = t.open(zf)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1090
    f.assertCache([])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1091
    f.assertData ([], mtime=t.at0)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1092

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1093
    # >>> (@at1) commit data -> we can see it on wcfs
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1094
    at1 = t.commit(zf, {2:'c1'})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1095

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1096
    f.assertCache([0,0,0])  # initially not cached
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1097
    f.assertData (['','','c1'], mtime=t.head)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1098

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1099
    # >>> (@at2) commit again -> we can see both latest and snapshotted states
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1100
    # NOTE blocks d(4) and f(5) will be accessed only in the end
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1101
    at2 = t.commit(zf, {2:'c2', 3:'d2', 5:'f2'})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1102

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1103
    # f @head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1104 1105 1106
    f.assertCache([1,1,0,0,0,0])
    f.assertData (['','', 'c2', 'd2', 'x','x'], mtime=t.head)
    f.assertCache([1,1,1,1,0,0])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1107

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1108 1109
    # f @at1
    f1 = t.open(zf, at=at1)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1110
    f1.assertCache([0,0,1])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1111
    f1.assertData (['','','c1'])  # XXX + mtime=at1?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1112 1113


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1114
    # >>> (@at3) commit again without changing zf size
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1115
    f2 = t.open(zf, at=at2)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1116
    at3 = t.commit(zf, {2:'c3', 5:'f3'})  # FIXME + a3 after δbtree works (hole -> zblk)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1117

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1118
    f.assertCache([1,1,0,1,0,0])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1119

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1120 1121
    # f @head is opened again -> cache must not be lost
    f_ = t.open(zf)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1122
    f_.assertCache([1,1,0,1,0,0])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1123
    f_.close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1124
    f.assertCache([1,1,0,1,0,0])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1125 1126

    # f @head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1127 1128
    f.assertCache([1,1,0,1,0,0])
    f.assertData (['','','c3','d2','x','x'], mtime=t.head)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1129 1130

    # f @at2
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1131
    # NOTE f(2) is accessed but via @at/ not head/  ; f(2) in head/zf remains unaccessed
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1132 1133
    f2.assertCache([0,0,1,0,0,0])
    f2.assertData (['','','c2','d2','','f2'])   # XXX mtime=at2?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1134 1135 1136

    # f @at1
    f1.assertCache([1,1,1])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1137
    f1.assertData (['','','c1'])        # XXX mtime=at1?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1138 1139 1140


    # >>> f close / open again -> cache must not be lost
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1141
    # XXX a bit flaky since OS can evict whole f cache under pressure
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1142
    f.assertCache([1,1,1,1,0,0])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1143 1144
    f.close()
    f = t.open(zf)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1145
    if f.cached() != [1,1,1,1,0,0]:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1146
        assert sum(f.cached()) > 4*1/2  # > 50%
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1147

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1148 1149
    # verify all blocks
    f.assertData(['','','c3','d2','','f3'])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1150
    f.assertCache([1,1,1,1,1,1])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1151

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1152

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1153 1154
# verify how wcfs processes ZODB invalidations when hole becomes a block with data.
# TODO merge into test_wcfs_basic & watch tests after δbtree is done
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1155
@xfail  # δbtree
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169
@func
def test_wcfs_basic_hole2zblk():
    t = tDB(); zf = t.zfile
    defer(t.close)

    f = t.open(zf)
    t.commit(zf, {2:'c1'})  # b & a are holes
    f.assertCache([0,0,0])
    f.assertData(['','','c1'])

    t.commit(zf, {1:'b2'})  # hole -> zblk
    f.assertCache([1,0,1])
    f.assertData(['','b2','c1'])

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1170 1171 1172
# XXX ZBlk copied from blk1 -> blk2 ; for the same file and for file1 -> file2  (δbtree)
# XXX ZBlk moved  from blk1 -> blk2 ; for the same file and for file1 -> file2  (δbtree)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1173 1174 1175 1176 1177 1178 1179
# verify that read after file size returns (0, ok)
# (the same behaviour as on e.g. ext4 and as requested by posix)
@func
def test_wcfs_basic_read_aftertail():
    t = tDB(); zf = t.zfile
    defer(t.close)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1180
    t.commit(zf, {2:'c1'})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1181 1182 1183
    f = t.open(zf)
    f.assertData(['','','c1'])

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202
    def _(off): # -> bytes read from f[off +4)
        buf = bytearray(4)
        n = io.readat(f.f.fileno(), off, buf)
        return bytes(buf[:n])

    assert _(0*blksize)     == b'\x00\x00\x00\x00'
    assert _(1*blksize)     == b'\x00\x00\x00\x00'
    assert _(2*blksize)     == b'c1\x00\x00'
    assert _(3*blksize-4)   == b'\x00\x00\x00\x00'
    assert _(3*blksize-3)   == b'\x00\x00\x00'
    assert _(3*blksize-2)   == b'\x00\x00'
    assert _(3*blksize-1)   == b'\x00'
    assert _(3*blksize-0)   == b''
    assert _(3*blksize+1)   == b''
    assert _(3*blksize+2)   == b''
    assert _(3*blksize+3)   == b''
    assert _(4*blksize)     == b''
    assert _(8*blksize)     == b''
    assert _(100*blksize)   == b''
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1203 1204


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1205 1206
# verify that watch setup is robust to client errors/misbehaviour.
@func
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1207
def test_wcfs_watch_robust():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1208 1209 1210
    t = tDB(); zf = t.zfile
    defer(t.close)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1211 1212
    at1 = t.commit(zf, {2:'c1'})
    at2 = t.commit(zf, {2:'c2'})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1213

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1214 1215 1216 1217 1218 1219 1220
    # file not yet opened on wcfs
    wl = t.openwatch()
    assert wl.sendReq(timeout(), b"watch %s @%s" % (h(zf._p_oid), h(at1))) == \
        "error setup watch f<%s> @%s: " % (h(zf._p_oid), h(at1)) + \
        "file not yet known to wcfs or is not a ZBigFile"
    wl.close()

1221
    # closeTX/bye cancels blocked pin handlers
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1222 1223 1224 1225
    f = t.open(zf)
    f.assertBlk(2, 'c2')
    f.assertCache([0,0,1])

1226 1227 1228
    wl = t.openwatch()
    wg = sync.WorkGroup(timeout())
    def _(ctx):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1229
        assert wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1))) == \
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1230 1231
                "error setup watch f<%s> @%s: " % (h(zf._p_oid), h(at1)) + \
                "pin #%d @%s: context canceled" % (2, h(at1))
1232 1233 1234 1235
    wg.go(_)
    def _(ctx):
        req = wl.recvReq(ctx)
        assert req is not None
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1236
        assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
1237 1238 1239 1240
        # don't reply to req - close instead
        wl._closeTX()
    wg.go(_)
    wg.wait()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1241
    wl.close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1242
    # NOTE if wcfs.go does not fully cleanup this canceled watch and leaves it
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1243 1244
    # in half-working state, it will break on further commit, as pin to the
    # watch won't be handled.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1245
    at3 = t.commit(zf, {2:'c3'})
1246

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1247
    # invalid requests -> wcfs replies error
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1248
    wl = t.openwatch()
1249
    assert wl.sendReq(timeout(), b'bla bla') ==  \
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1250
            b'error bad watch: not a watch request: "bla bla"'
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1251

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1252 1253 1254
    # invalid request not following frame structure -> fatal + wcfs must close watch link
    assert wl.fatalv == []
    wl._write(b'zzz hello\n')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1255
    _, _rx = select(
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1256
        timeout().done().recv,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1257 1258 1259
        wl.rx_eof.recv,
    )
    if _ == 0:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1260
        raise RuntimeError("%s: did not rx EOF after bad frame " % wl)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1261
    assert wl.fatalv == [b'error: invalid frame: "zzz hello\\n" (invalid stream)']
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1262
    wl.close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1263

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1264 1265
    # watch with @at < δtail.tail -> rejected
    wl = t.openwatch()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1266 1267 1268
    atpast = p64(u64(t.tail)-1)
    wl._watch(zf, atpast, {}, "error setup watch f<%s> @%s: too far away back from"
            " head/at (@%s); …" % (h(zf._p_oid), h(atpast), h(t.head)))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1269
    wl.close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1270

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1271
# verify that `watch file @at` -> error, for @at when file did not existed.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292
@xfail  # check that file exists @at
@func
def test_wcfs_watch_before_create():
    t = tDB(); zf = t.zfile
    defer(t.close)

    at1 = t.commit(zf, {2:'c1'})
    zf2 = t.root['zfile2'] = ZBigFile(blksize)  # zf2 created @at2
    at2 = t.commit()
    at3 = t.commit(zf2, {1:'β3'})

    # force wcfs to access/know zf2
    f2 = t.open(zf2)
    f2.assertData(['','β3'])

    wl = t.openwatch()
    assert wl.sendReq(timeout(), b"watch %s @%s" % (h(zf2._p_oid), h(at1))) == \
        "error setup watch f<%s> @%s: " % (h(zf2._p_oid), h(at1)) + \
        "file does not exist at that database state"
    wl.close()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1293

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1294
# verify that watch @at_i -> @at_j ↓ is rejected
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1295
# XXX we might want to allow going back in history later.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313
@func
def test_wcfs_watch_going_back():
    t = tDB(); zf = t.zfile
    defer(t.close)

    at1 = t.commit(zf, {2:'c1'})
    at2 = t.commit(zf, {2:'c2'})
    f = t.open(zf)
    f.assertData(['','','c2'])

    wl = t.openwatch()
    wl.watch(zf, at2, {})
    wl.sendReq(timeout(), b"watch %s @%s" % (h(zf._p_oid), h(at1))) == \
        "error setup watch f<%s> @%s: " % (h(zf._p_oid), h(at1)) + \
        "going back in history is forbidden"
    wl.close()


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1314
# verify that wcfs kills slow/faulty client who does not reply to pin in time.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1315
@xfail  # protection against faulty/slow clients
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1316 1317
@func
def test_wcfs_pintimeout_kill():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1318 1319 1320
    # adjusted wcfs timeout to kill client who is stuck not providing pin reply
    tkill = 3*time.second
    t = tDB(); zf = t.zfile     # XXX wcfs args += tkill
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1321 1322 1323 1324 1325 1326 1327
    defer(t.close)

    at1 = t.commit(zf, {2:'c1'})
    at2 = t.commit(zf, {2:'c2'})
    f = t.open(zf)
    f.assertData(['','','c2'])

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1328 1329 1330
    # XXX move into subprocess not to kill whole testing
    ctx, _ = context.with_timeout(context.background(), 2*tkill)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1331
    wl = t.openwatch()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1332
    wg = sync.WorkGroup(ctx)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1333
    def _(ctx):
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1334 1335 1336 1337 1338 1339 1340 1341 1342
        # send watch. The pin handler won't be replying -> we should never get reply here.
        rxq = wl._sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1)))
        _, _rx = select(
            ctx.done().recv,    # 0
            rxq.recv,           # 1
        )
        if _ == 0:
            raise ctx.err()
        fail("watch request completed (should not as pin handler is stuck)")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1343 1344 1345 1346 1347 1348
    wg.go(_)
    def _(ctx):
        req = wl.recvReq(ctx)
        assert req is not None
        assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1349 1350 1351 1352 1353 1354 1355 1356
        # sleep > wcfs pin timeout - wcfs must kill us
        _, _rx = select(
            ctx.done().recv,        # 0
            time.after(tkill).recv, # 1
        )
        if _ == 0:
            raise ctx.err()
        fail("wcfs did not killed stuck client")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1357 1358 1359
    wg.go(_)
    wg.wait()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1360

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1361
# watch with @at > head - must wait for head to become >= at.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1362
# XXX too far ahead - reject?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1363 1364 1365 1366 1367 1368 1369
@func
def test_wcfs_watch_setup_ahead():
    t = tDB(); zf = t.zfile
    defer(t.close)

    f = t.open(zf)
    at1 = t.commit(zf, {2:'c1'})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1370
    f.assertData(['','x','c1'])     # NOTE #1 not accessed for watch @at1 to receive no pins
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404

    wg = sync.WorkGroup(timeout())
    dt = 100*time.millisecond
    committing = chan() # becomes ready when T2 starts to commit

    # T1: watch @(at1+1·dt)
    @func
    def _(ctx):
        wl = t.openwatch()
        defer(wl.close)

        wat = tidfromtime(tidtime(at1) + 1*dt)  # > at1, but < at2
        rxq = wl._sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(wat)))
        _, _rx = select(
            ctx.done().recv,    # 0
            rxq.recv,           # 1
        )
        if _ == 0:
            raise ctx.err()
        assert ready(committing)
        assert _rx == b"ok"
    wg.go(_)

    # T2: sleep(10·dt); commit
    @func
    def _(ctx):
        # reopen connection to database as we are committing from another thread
        conn = t.root._p_jar.db().open()
        defer(conn.close)
        root = conn.root()
        zf = root['zfile']

        time.sleep(10*dt)
        committing.close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1405
        at2 = t.commit(zf, {1:'b2'})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1406 1407 1408 1409 1410 1411
        assert tidtime(at2) - tidtime(at1) >= 10*dt
    wg.go(_)

    wg.wait()


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1412
# verify that watch setup/update sends correct pins.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1413
@func
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1414
def test_wcfs_watch_setup():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1415
    t = tDB(); zf = t.zfile; at0=t.at0
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1416 1417
    defer(t.close)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1418
    f = t.open(zf)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1419 1420
    at1 = t.commit(zf, {2:'c1'})                # XXX + hole -> zblk
    at2 = t.commit(zf, {2:'c2', 3:'d2', 4:'e2', 5:'f2'})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1421
    at3 = t.commit(zf, {2:'c3', 5:'f3'})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1422

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1423 1424
    f.assertData(['','','c3','d2','x','f3'])     # access everything except e as of @at3
    f.assertCache([1,1,1,1,0,1])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1425

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1426 1427
    # change again, but don't access e and f
    at4 = t.commit(zf, {2:'c4', 4:'e4', 5:'f4'})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1428
    at5 = t.commit(zf, {3:'d5', 5:'f5'})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1429 1430
    f.assertData(['','','c4','d5','x','x'])
    f.assertCache([1,1,1,1,0,0])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1431

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1432 1433
    # some watch setup/update requests with explicit pinok (also partly
    # verifies how tWatchLink.watch computes automatic pinok)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1434 1435 1436 1437 1438 1439 1440 1441

    # new watch setup ø -> at
    def assertNewWatch(at, pinok):
        wl = t.openwatch()
        wl.watch(zf, at, pinok)
        wl.close()
    assertNewWatch(at1, {2:at1,  3:at0,  5:at0})
    assertNewWatch(at2, {2:at2,  3:at2,  5:at2})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1442 1443
    assertNewWatch(at3, {2:at3,  3:at2,  5:at3})    # f(5) is pinned, even though it was not
    assertNewWatch(at4, {        3:at2,  5:at4})    # accessed after at3
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1444 1445 1446
    assertNewWatch(at5, {                     })

    # new watch + update at_i -> at_j
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1447 1448
    wl = t.openwatch()
    # XXX check @at0 ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1449 1450 1451 1452 1453
    wl.watch(zf, at1, {2:at1,  3:at0,  5:at0})  #     -> at1 (new watch)    XXX at0 -> ø?
    wl.watch(zf, at2, {2:at2,  3:at2,  5:at2})  # at1 -> at2
    wl.watch(zf, at3, {2:at3,          5:at3})  # at2 -> at3
    wl.watch(zf, at4, {2:None,         5:at4})  # at3 -> at4 f(5) pinned even it was not accessed >=4
    wl.watch(zf, at5, {        3:None, 5:None}) # at4 -> at5 (current head)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1454
    wl.close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1455

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1456
    # all valid watch setup/update requests going at_i -> at_j -> ... with automatic pinok
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1457
    for zf in t.zfiles():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1458 1459
        for revv in t.iter_revv():
            print('\n--------')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1460
            print(' -> '.join(['%s' % _ for _ in revv]))    # XXX join joins bytes as raw
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1461
            wl = t.openwatch()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1462
            wl.watch(zf, revv[0])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1463
            wl.watch(zf, revv[0])    # verify at_i -> at_i
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1464
            for at in revv[1:]:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1465 1466
                wl.watch(zf, at)
            wl.close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1467

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1468

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1469 1470
# verify that already setup watch(es) receive correct pins on block access.
@func
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1471
def test_wcfs_watch_vs_access():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1472 1473 1474 1475 1476 1477 1478 1479 1480 1481
    t = tDB(); zf = t.zfile; at0=t.at0
    defer(t.close)

    f = t.open(zf)
    at1 = t.commit(zf, {2:'c1'})                # XXX + hole -> zblk
    at2 = t.commit(zf, {2:'c2', 3:'d2', 5:'f2'})
    at3 = t.commit(zf, {2:'c3', 5:'f3'})

    f.assertData(['','','c3','d2','x','x'])
    f.assertCache([1,1,1,1,0,0])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1482

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1483 1484
    # watched + commit -> read -> receive pin messages.
    # read vs pin ordering is checked by assertBlk.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1485
    #
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1486
    # f(5) is kept not accessed to check later how wcfs.go handles δFtail
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1487
    # rebuild after it sees not yet accessed ZBlk that has change history.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1488
    wl3  = t.openwatch();  w3 = wl3.watch(zf, at3);  assert at3 == t.head
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1489 1490 1491
    assert w3.at     == at3
    assert w3.pinned == {}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1492
    wl3_ = t.openwatch();  w3_ = wl3_.watch(zf, at3)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1493 1494 1495
    assert w3_.at     == at3
    assert w3_.pinned == {}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1496
    wl2  = t.openwatch();  w2 = wl2.watch(zf, at2)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1497 1498 1499
    assert w2.at     == at2
    assert w2.pinned == {2: at2}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1500 1501 1502 1503 1504 1505
    # w_assertPin asserts on state of .pinned for {w3,w3_,w2}
    def w_assertPin(pinw3, pinw3_, pinw2):
        assert w3.pinned   == pinw3
        assert w3_.pinned  == pinw3_
        assert w2.pinned   == pinw2

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1506
    f.assertCache([1,1,1,1,0,0])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1507
    at4 = t.commit(zf, {         2:'c4', 5:'f4', 6:'g4'})  # FIXME + b4 after δbtree works + update vvv
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1508
    f.assertCache([1,1,0,1,0,0,0])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1509 1510 1511 1512 1513 1514 1515

    f.assertBlk(0, '',   {wl3: {},              wl3_: {},              wl2: {}})
    w_assertPin(               {},                    {},                   {2:at2})

    f.assertBlk(1, '',   {wl3: {},              wl3_: {},              wl2: {}})
    w_assertPin(               {},                    {},                   {2:at2})

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1516
    f.assertBlk(2, 'c4', {wl3: {2:at3},         wl3_: {2:at3},         wl2: {}})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1517 1518
    w_assertPin(               {2:at3},               {2:at3},              {2:at2})

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1519
    f.assertBlk(3, 'd2', {wl3: {},              wl3_: {},              wl2: {}})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1520
    w_assertPin(               {2:at3},               {2:at3},              {2:at2})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1521

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1522
    # blk4 is hole @head - the same as at earlier db view - not pinned
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1523 1524 1525
    f.assertBlk(4, '',   {wl3: {},              wl3_: {},              wl2: {}})
    w_assertPin(               {2:at3},               {2:at3},              {2:at2})

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1526
    # f(5) is kept unaccessed (see ^^^)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1527
    assert f.cached()[5] == 0
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1528 1529 1530

    f.assertBlk(6, 'g4', {wl3: {6:at0},         wl3_: {6:at0},         wl2: {6:at0}}) # XXX at0->ø?
    w_assertPin(               {2:at3, 6:at0},        {2:at3, 6:at0},       {2:at2, 6:at0})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1531

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1532
    # commit again:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1533
    # - c(2) is already pinned  -> wl3 not notified
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1534 1535
    # - watch stopped (wl3_)    -> watch no longer notified
    # - wlink closed (wl2)      -> watch no longer notified
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1536
    # - f(5) is still kept unaccessed (see ^^^)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1537
    f.assertCache([1,1,1,1,1,0,1])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1538
    at5 = t.commit(zf, {2:'c5', 3:'d5', 5:'f5'})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1539
    f.assertCache([1,1,0,0,1,0,1])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1540

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1541
    wl3_.stop_watch(zf) # w3_ should not be notified
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1542
    wl2.close()         # wl2:* should not be notified
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1543 1544
    def w_assertPin(pinw3):
        assert w3.pinned   == pinw3
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1545
        assert w3_.pinned  == {}; assert w3_.at == z64  # wl3_ unsubscribed from zf
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1546
        assert w2.pinned   == {}; assert w2.at  == z64  # wl2 closed
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1547

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1548
    f.assertBlk(0, '',   {wl3: {},                      wl3_: {}})  # no change
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1549
    w_assertPin(               {2:at3, 6:at0})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1550

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1551
    f.assertBlk(1, '',   {wl3: {},                      wl3_: {}})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1552
    w_assertPin(               {2:at3, 6:at0})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1553

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1554
    f.assertBlk(2, 'c5', {wl3: {},                      wl3_: {}})  # c(2) already pinned on wl3
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1555
    w_assertPin(               {2:at3, 6:at0})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1556

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1557
    f.assertBlk(3, 'd5', {wl3: {3:at2},                 wl3_: {}})  # d(3) was not pinned on wl3; wl3_ not notified
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1558
    w_assertPin(               {2:at3, 3:at2, 6:at0})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1559

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1560
    f.assertBlk(4, '',   {wl3: {},                      wl3_: {}})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1561 1562
    w_assertPin(               {2:at3, 3:at2, 6:at0})

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1563
    # f(5) is kept still unaccessed (see ^^^)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1564
    assert f.cached()[5] == 0
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1565

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1566 1567
    f.assertBlk(6, 'g4', {wl3: {},                      wl3_: {}})
    w_assertPin(               {2:at3, 3:at2, 6:at0})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1568

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1569

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1570 1571
    # advance watch - receives new pins/unpins to @head.
    # this is also tested ^^^ in `at_i -> at_j -> ...` watch setup/adjust.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1572
    # NOTE f(5) is not affected because it was not pinned previously.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1573
    wl3.watch(zf, at4, {2:at4, 6:None})     # at3 -> at4
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1574 1575
    w_assertPin(       {2:at4, 3:at2})

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1576 1577 1578 1579
    # access f(5) -> wl3 should be correctly pinned
    assert f.cached() == [1,1,1,1,1,0,1]  # f(5) was not yet accessed
    f.assertBlk(5, 'f5', {wl3: {5:at4},                 wl3_: {}})
    w_assertPin(               {2:at4, 3:at2, 5:at4})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1580

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1581
    # advance watch again
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1582 1583
    wl3.watch(zf, at5, {2:None, 3:None, 5:None})    # at4 -> at5
    w_assertPin(       {})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1584

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1585
    wl3.close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1586

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1587

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1588
# verify that on pin message, while under pagefault, we can mmap @at/f[blk]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1589
# into where head/f[blk] was mmaped; the result of original pagefaulting read
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1590
# must be from newly inserted mapping.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1591 1592 1593
#
# TODO same with two mappings to the same file, but only one changing blk mmap
#      -> one read gets changed data, one read gets data from @head.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617
@func
def test_wcfs_remmap_on_pin():
    t = tDB(); zf = t.zfile
    defer(t.close)

    at1 = t.commit(zf, {2:'hello'})
    at2 = t.commit(zf, {2:'world'})

    f  = t.open(zf)
    f1 = t.open(zf, at=at1)
    wl = t.openwatch()
    wl.watch(zf, at1, {})

    f.assertCache([0,0,0])
    def _(wlink, foid, blk, at):
        assert wlink is wl
        assert foid  == zf._p_oid
        assert blk   == 2
        assert at    == at1
        mm.map_into_ro(f._blk(blk), f1.f.fileno(), blk*f.blksize)

    f._assertBlk(2, 'hello', {wl: {2:at1}}, pinfunc=_)     # NOTE not world


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1618
# verify that pin message is not sent for the same blk@at twice.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635
@func
def test_wcfs_no_pin_twice():
    t = tDB(); zf = t.zfile
    defer(t.close)

    f = t.open(zf)
    at1 = t.commit(zf, {2:'c1'})
    at2 = t.commit(zf, {2:'c2'})
    wl = t.openwatch()
    w = wl.watch(zf, at1, {})
    f.assertCache([0,0,0])

    f.assertBlk(2, 'c2', {wl: {2:at1}})
    f.assertCache([0,0,1])
    assert w.pinned == {2:at1}

    # drop file[blk] from cache, access again -> no pin message sent the second time
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1636 1637 1638 1639 1640
    #
    # ( we need both madvise(DONTNEED) and fadvise(DONTNEED) - given only one of
    #   those the kernel won't release the page from pagecache; madvise does
    #   not work without munlock. )
    mm.unlock(f._blk(2))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1641
    mm.advise(f._blk(2), mm.MADV_DONTNEED)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1642
    fadvise_dontneed(f.f.fileno(), 2*blksize, 1*blksize)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1643 1644
    f.assertCache([0,0,0])

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1645 1646
    f.assertBlk(2, 'c2', {wl: {}})
    f.assertCache([0,0,1])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1647 1648


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672
# verify watching for 2 files over single watch link.
@func
def test_wcfs_watch_2files():
    t = tDB(); zf1 = t.zfile
    defer(t.close)

    t.root['zfile2'] = zf2 = ZBigFile(blksize)
    t.commit()

    t.change(zf1, {0:'a2', 2:'c2'})
    t.change(zf2, {1:'β2', 3:'δ2'})
    at2 = t.commit()

    t.change(zf1, {0:'a3', 2:'c3'})
    t.change(zf2, {1:'β3', 3:'δ3'})
    at3 = t.commit()

    f1 = t.open(zf1)
    f2 = t.open(zf2)

    f1.assertData(['a3', '',   'x'    ])
    f2.assertData(['',   'β3', '', 'x'])

    wl = t.openwatch()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684
    w1 = wl.watch(zf1, at2, {0:at2})
    w2 = wl.watch(zf2, at2, {1:at2})

    def w_assertPin(pinw1, pinw2):
        assert w1.pinned == pinw1
        assert w2.pinned == pinw2

    w_assertPin(               {0:at2},             {1:at2})
    f1.assertBlk(2, 'c3', {wl: {2:at2}})
    w_assertPin(               {0:at2, 2:at2},      {1:at2})
    f2.assertBlk(3, 'δ3', {wl:                      {3:at2}})
    w_assertPin(               {0:at2, 2:at2},      {1:at2, 3:at2})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1685

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1686 1687 1688 1689 1690
    wl.watch(zf1, at3, {0:None, 2:None})
    w_assertPin(               {},                  {1:at2, 3:at2})
    wl.watch(zf2, at3, {1:None, 3:None})
    w_assertPin(               {},                  {})

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1691 1692


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1693
# XXX new watch request while previous watch request is in progress (over the same /head/watch handle)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1694
# XXX @revX/ is automatically removed after some time
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1695

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1696

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1697 1698
# ---- wcfs.py + virtmem integration ----

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718
# tMapping provides testing environment for Mapping.
class tMapping(object):
    def __init__(t, mmap):
        t.mmap = mmap

    # XXX assertCache

    # assertBlk asserts that mmap[·] with · corresponding to blk has reads as dataok.
    # see also: tFile.assertBlk .
    def assertBlk(t, blk, dataok):
        assert t.mmap.blk_start <= blk < t.mmap.blk_stop
        blk_inmmap = blk - t.mmap.blk_start

        if not isinstance(dataok, bytes):
            dataok = dataok.encode('utf-8')
        f = t.mmap.file
        assert len(dataok) <= f.blksize
        dataok += b'\0'*(f.blksize - len(dataok))   # trailing zeros

        blkview = memoryview(t.mmap.mem[blk_inmmap*f.blksize:][:f.blksize])
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1719 1720 1721 1722 1723 1724
        # XXX first access without GIL, so that e.g. if there is timeout on
        # wcfs.py side, _abort_ontimeout could run and kill WCFS.
        # FIXME also test with GIL locked, since wcfs.py pinner must be itself
        # running without GIL.
        _ = read_nogil(blkview[:1])
        assert _ == dataok[0]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1725 1726 1727 1728 1729
        assert blkview.tobytes() == dataok

    # XXX assertData

# test_wcfspy_virtmem verifies wcfs.py integration with virtmem.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1730 1731
@func
def test_wcfspy_virtmem():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1732 1733 1734
    t = tDB(); zf = t.zfile
    defer(t.close)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1735
    at1 = t.commit(zf, {2:'c1', 3:'d1'})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1736 1737 1738
    at2 = t.commit(zf, {2:'c2'})

    wconn = t.wc.connect(at1)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1739
    defer(wconn.close)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1740

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1741
    # create mmap with 1 block beyond file size
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1742
    m1 = wconn.mmap(zf._p_oid, 2, 3)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1743
    defer(m1.unmap)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1744

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1745 1746 1747 1748
    assert m1.blk_start == 2
    assert m1.blk_stop  == 5
    assert len(m1.mem)  == 3*zf.blksize

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1749
    f = m1.file
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1750
    tm1 = tMapping(m1)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1751 1752 1753 1754

    #assertCache(m1, [0,0,0])
    assert f.pinned == {}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1755
    # verify initial data reads
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1756
    tm1.assertBlk(2, 'c1')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1757
    assert f.pinned == {2:at1}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1758
    tm1.assertBlk(3, 'd1')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1759
    assert f.pinned == {2:at1}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1760
    tm1.assertBlk(4, '')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1761 1762
    assert f.pinned == {2:at1}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1763
    # commit with growing file size -> verify data read as the same, #3 pinned.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1764
    # (#4 is not yet pinned because it was not accessed)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1765
    at3 = t.commit(zf, {3:'d3', 4:'e3'})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1766
    assert f.pinned == {2:at1}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1767 1768
    tm1.assertBlk(2, 'c1')
    assert f.pinned == {2:at1}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1769 1770
    tm1.assertBlk(3, 'd1')
    assert f.pinned == {2:at1, 3:at1}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1771 1772
    tm1.assertBlk(4, '')
    assert f.pinned == {2:at1, 3:at1}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1773

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1774
    # resync at1 -> at2:    #2 must unpin to @head; #4 must stay as zero
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1775 1776 1777
    wconn.resync(at2)
    assert f.pinned == {3:at1}
    tm1.assertBlk(2, 'c2')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1778
    tm1.assertBlk(3, 'd1')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1779
    tm1.assertBlk(4, '')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1780

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1781
    # resync at2 -> at3:    #3 must unpin to @head; #4 - start to read with data
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1782 1783 1784
    wconn.resync(at3)
    assert f.pinned == {}
    tm1.assertBlk(2, 'c2')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1785
    tm1.assertBlk(3, 'd3')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1786
    tm1.assertBlk(4, 'e3')
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1787

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1788 1789
    # XXX resync ↓ ?

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1790 1791


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1792
    # XXX mmap after .size completely (start > size)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1793 1794 1795 1796

    # XXX w mapping with RW - in sync


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1797

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1798 1799 1800 1801 1802 1803 1804
# ---- misc ---

# readfile reads file @ path.
def readfile(path):
    with open(path) as f:
        return f.read()

1805 1806 1807 1808 1809
# writefile writes data to file @ path.
def writefile(path, data):
    with open(path, "w") as f:
        f.write(data)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1810 1811 1812 1813 1814 1815 1816 1817 1818
# tidtime converts tid to transaction commit time.
def tidtime(tid):
    t = TimeStamp(tid).timeTime()

    # ZODB/py vs ZODB/go time resolution is not better than 1µs
    # see e.g. https://lab.nexedi.com/kirr/neo/commit/9112f21e
    #
    # NOTE pytest.approx supports only ==, not e.g. <, so we use plain round.
    return round(t, 6)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1819

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1820
# tidfromtime converts time into corresponding transaction ID.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1821 1822 1823 1824 1825 1826 1827 1828 1829
def tidfromtime(t):
    f = t - int(t)      # fraction of seconds
    t = int(t)
    _ = gmtime(t)
    s = _.tm_sec + f    # total seconds

    ts = TimeStamp(_.tm_year, _.tm_mon, _.tm_mday, _.tm_hour, _.tm_min, s)
    return ts.raw()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1830
# verify that tidtime is precise enough to show difference in between transactions.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1831
# verify that tidtime -> tidfromtime is identity withing rounding tolerance.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1832
@func
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1833
def test_tidtime():
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1834
    t = tDB()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1835
    defer(t.close)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1836

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1837 1838
    # tidtime not rough
    atv = [t.commit()]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1839 1840
    for i in range(10):
        at = t.commit()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1841 1842 1843 1844 1845 1846 1847 1848 1849
        assert tidtime(at) > tidtime(atv[-1])
        atv.append(at)

    # tidtime -> tidfromtime
    for at in atv:
        tat  = tidtime(at)
        at_  = tidfromtime(tat)
        tat_ = tidtime(at_)
        assert abs(tat_ - tat) <= 2E-6
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1850 1851


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1852 1853 1854
# tAt is bytes whose repr returns human readable string considering it as `at` under tDB.
#
# It gives both symbolic version and raw hex forms, for example:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1855
#   @at2 (03cf7850500b5f66)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872
#
# tAt is used everywhere with the idea that e.g. if an assert comparing at, or
# e.g. dicts containing at, fails, everything is printed in human readable
# form instead of raw hex that is hard to visibly map to logical transaction.
class tAt(bytes):
    def __new__(cls, tdb, at):
        tat = bytes.__new__(cls, at)
        tat.tdb = tdb
        return tat

    def __repr__(at):
        t = at.tdb
        for i, dF in enumerate(t.dFtail):
            if dF.rev == at:
                    return "@at%d (%s)" % (i, h(at))
        return "@" + h(at)
    __str__ = __repr__
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1873

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1874 1875 1876 1877 1878 1879 1880 1881
# hpin returns human-readable representation for {}blk->rev.
@func(tDB)
def hpin(t, pin):
    pinv = []
    for blk in sorted(pin.keys()):
        if pin[blk] is None:
            s = '@head'
        else:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1882
            s = '%s' % pin[blk]
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1883 1884 1885 1886
        pinv.append('%d: %s' % (blk, s))
    return '{%s}' % ', '.join(pinv)


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1887 1888 1889 1890 1891 1892 1893 1894 1895 1896
# zfiles returns ZBigFiles that were ever changed under t.
@func(tDB)
def zfiles(t):
    zfs = set()
    for dF in t.dFtail:
        for zf in dF.byfile:
            if zf not in zfs:
                zfs.add(zf)
    return zfs

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1897

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1898
# dump_history prints t's change history in tabular form.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1899
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1900 1901
# the output is useful while developing or analyzing a test failure: to get
# overview of how file(s) are changed in tests.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1902 1903 1904 1905 1906
@func(tDB)
def dump_history(t):
    print('>>> Change history by file:\n')
    for zf in t.zfiles():
        print('f<%s>:' % h(zf._p_oid))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1907
        indent = '\t%s\t' % (' '*len('%s' % t.head),)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1908 1909
        print('%s%s' % (indent, ' '.join('01234567')))
        print('%s%s' % (indent, ' '.join('abcdefgh')))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920
        for dF in t.dFtail:
            df = dF.byfile.get(zf)
            emitv = []
            if df is not None:
                dblk = set(df.ddata.keys())
                for blk in range(max(dblk)+1):
                    if blk in dblk:
                        emitv.append('%d' % blk)
                    else:
                        emitv.append(' ')

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1921
            print('\t%s\t%s' % (dF.rev, ' '.join(emitv)))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1922
    print()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1923 1924


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1925 1926 1927 1928 1929 1930 1931 1932 1933
# ready reports whether chan ch is ready.
def ready(ch):
    _, _rx = select(
        default,    # 0
        ch.recv,    # 1
    )
    return bool(_)


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959
# procwait waits for a process (subprocess.Popen) to terminate.
def procwait(ctx, proc):
    wg = sync.WorkGroup(ctx)
    def _(ctx):
        while 1:
            if ready(ctx.done()):
                raise ctx.err()
            ret = proc.poll()
            if ret is not None:
                return
            tdelay()
    wg.go(_)
    wg.wait()

# procwait_, similarly to procwait, waits for a process (subprocess.Popen) to terminate.
#
# it returns bool whether process terminated or not - e.g. due to context being canceled.
def procwait_(ctx, proc):   # -> ok
    try:
        procwait(ctx, proc)
    except Exception as e:
        if e in (context.canceled, context.deadlineExceeded):
            return False
        raise
    return True

1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980
# is_mountpoint returns whether path is a mountpoint
def is_mountpoint(path):    # -> bool
    # NOTE we don't call mountpoint directly on path, because if FUSE
    # fileserver failed, the mountpoint will also fail and print ENOTCONN
    try:
        _ = os.lstat(path)
    except OSError as e:
        if e.errno == ENOENT:
            return False
        # "Transport endpoint is not connected" -> it is a failed FUSE server
        # (XXX we can also grep /proc/mounts)
        if e.errno == ENOTCONN:
            return True
        raise

    if not S_ISDIR(_.st_mode):
        return False

    mounted = (0 == subprocess.call(["mountpoint", "-q", path]))
    return mounted

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1981

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1982 1983 1984 1985
# eprint prints msg to stderr
def eprint(msg):
    print(msg, file=sys.stderr)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1986 1987 1988 1989 1990
# xdefer is like defer, but makes sure exception raised before deferred
# function is called is not lost.
#
# if deferred function raises exception itself - it prints previous exception to stderr.
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1991
# XXX xdefer is workaround for Python2 not having exception chaining (PEP 3134)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1992 1993 1994 1995
# without which, if e.g. tDB.close() raises exception, it prevents to see
# whether and which an assert in the test failed.
#
# XXX merge into defer?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1996
_defer = defer  # original golang.defer
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009
def xdefer(f):
    # hack - imitate as if defer called from under xdefer was called directly by caller func
    fgo = inspect.currentframe().f_back.f_back
    __goframe__ = fgo.f_locals['__goframe__']
    _xdefer(f)

def _xdefer(f):
    def _():
        # call f, but print previous exception if f raises
        exc_type, exc_value, exc_traceback = sys.exc_info()
        try:
            f()
        except:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2010 2011
            if exc_type is not None:
                traceback.print_exception(exc_type, exc_value, exc_traceback)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2012
            raise
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2013 2014 2015 2016 2017
    _defer(_)

# replace defer with xdefer
defer = xdefer
del xdefer