Commit 9cf38ffe authored by Kirill Smelkov

Merge branch 'master' into t

* master:
  wcfs: Setup basic logging for warnings/errors to go to stderr when invoked as e.g. `wcfs serve`
  wcfs: Fix crash if on invalidation handleδZ needs to access ZODB
  t/qemu-runlinux: Use multidevs=remap for 9P setup
  fixup! *: Use defer for dbclose & friends
  wcfs: client: tests: Turn SIGSEGV in tMapping.assertBlk into exception
  wcfs: Server.stop: Don't log "after SIGTERM" when first wait for wcfs.go exit failed
  wcfs: Server.stop: Don't report first unmount failure to outside
parents 2b28174e 2a6b9bc3
...@@ -44,13 +44,15 @@ def teardown_module(): ...@@ -44,13 +44,15 @@ def teardown_module():
testdb.teardown() testdb.teardown()
@func
def test_zbigarray(): def test_zbigarray():
root = testdb.dbopen() root = testdb.dbopen()
defer(lambda: dbclose(root))
root['zarray'] = ZBigArray((16*1024*1024,), uint8) root['zarray'] = ZBigArray((16*1024*1024,), uint8)
transaction.commit() transaction.commit()
dbclose(root) dbclose(root)
del root
root = testdb.dbopen() root = testdb.dbopen()
...@@ -94,7 +96,7 @@ def test_zbigarray(): ...@@ -94,7 +96,7 @@ def test_zbigarray():
# reload DB & array # reload DB & array
dbclose(root) dbclose(root)
del root, a,b, A del a,b, A
root = testdb.dbopen() root = testdb.dbopen()
...@@ -130,7 +132,7 @@ def test_zbigarray(): ...@@ -130,7 +132,7 @@ def test_zbigarray():
# reload & verify changes # reload & verify changes
dbclose(root) dbclose(root)
del root, a, A, db del a, A, db
root = testdb.dbopen() root = testdb.dbopen()
...@@ -165,7 +167,7 @@ def test_zbigarray(): ...@@ -165,7 +167,7 @@ def test_zbigarray():
# commit; reload & verify changes # commit; reload & verify changes
transaction.commit() transaction.commit()
dbclose(root) dbclose(root)
del root, a, b, A del a, b, A
root = testdb.dbopen() root = testdb.dbopen()
...@@ -190,22 +192,21 @@ def test_zbigarray(): ...@@ -190,22 +192,21 @@ def test_zbigarray():
assert a[24*1024*1024+2] == 12 assert a[24*1024*1024+2] == 12
assert a[24*1024*1024+3] == 13 assert a[24*1024*1024+3] == 13
dbclose(root)
# test array ordering is saved properly into DB and is picked up in # test array ordering is saved properly into DB and is picked up in
# backward-compatible manner - for data saved before order parameter was # backward-compatible manner - for data saved before order parameter was
# introduced. # introduced.
# (actual ordering indexing test is in BigArray tests, not here) # (actual ordering indexing test is in BigArray tests, not here)
@func
def test_zbigarray_order(): def test_zbigarray_order():
# make sure order is properly saved/restored to/from DB # make sure order is properly saved/restored to/from DB
root = testdb.dbopen() root = testdb.dbopen()
defer(lambda: dbclose(root))
root['carray'] = ZBigArray((16*1024*1024,), uint8) root['carray'] = ZBigArray((16*1024*1024,), uint8)
root['farray'] = ZBigArray((16*1024*1024,), uint8, order='F') root['farray'] = ZBigArray((16*1024*1024,), uint8, order='F')
transaction.commit() transaction.commit()
dbclose(root) dbclose(root)
del root
root = testdb.dbopen() root = testdb.dbopen()
C = root['carray'] C = root['carray']
...@@ -228,16 +229,12 @@ def test_zbigarray_order(): ...@@ -228,16 +229,12 @@ def test_zbigarray_order():
transaction.commit() transaction.commit()
dbclose(root) dbclose(root)
del root, Cold
root = testdb.dbopen() root = testdb.dbopen()
Cold = root['coldarray'] Cold = root['coldarray']
assert Cold._order == 'C' assert Cold._order == 'C'
dbclose(root)
del root
# the same as test_bigfile_filezodb_vs_conn_migration but explicitly for ZBigArray # the same as test_bigfile_filezodb_vs_conn_migration but explicitly for ZBigArray
......
...@@ -74,10 +74,12 @@ blksize32 = blksize // 4 ...@@ -74,10 +74,12 @@ blksize32 = blksize // 4
def Blk(vma, i): def Blk(vma, i):
return ndarray(blksize32, offset=i*blksize, buffer=vma, dtype=uint32) return ndarray(blksize32, offset=i*blksize, buffer=vma, dtype=uint32)
@func
def test_bigfile_filezodb(): def test_bigfile_filezodb():
ram_reclaim_all() # reclaim pages allocated by previous tests ram_reclaim_all() # reclaim pages allocated by previous tests
root = dbopen() root = dbopen()
defer(lambda: dbclose(root))
root['zfile'] = f = ZBigFile(blksize) root['zfile'] = f = ZBigFile(blksize)
transaction.commit() transaction.commit()
...@@ -124,7 +126,6 @@ def test_bigfile_filezodb(): ...@@ -124,7 +126,6 @@ def test_bigfile_filezodb():
del fh del fh
dbclose(root) dbclose(root)
del root
root = dbopen() root = dbopen()
f = root['zfile'] f = root['zfile']
...@@ -160,7 +161,6 @@ def test_bigfile_filezodb(): ...@@ -160,7 +161,6 @@ def test_bigfile_filezodb():
del vma del vma
del fh del fh
dbclose(root) dbclose(root)
del root
root = dbopen() root = dbopen()
f = root['zfile'] f = root['zfile']
...@@ -194,7 +194,7 @@ def test_bigfile_filezodb(): ...@@ -194,7 +194,7 @@ def test_bigfile_filezodb():
del vma del vma
del fh del fh
dbclose(root) dbclose(root)
del db, root del db
root = dbopen() root = dbopen()
f = root['zfile'] f = root['zfile']
...@@ -209,9 +209,6 @@ def test_bigfile_filezodb(): ...@@ -209,9 +209,6 @@ def test_bigfile_filezodb():
assert array_equal(Blk(vma, i)[1:], dataX(i)[1:]) assert array_equal(Blk(vma, i)[1:], dataX(i)[1:])
dbclose(root)
# connection can migrate between threads handling requests. # connection can migrate between threads handling requests.
# verify _ZBigFileH properly adjusts. # verify _ZBigFileH properly adjusts.
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# qemu-runlinux [options] <kernel> <program> ... # qemu-runlinux [options] <kernel> <program> ...
# run kernel/program in QEMU with root fs taken from host # run kernel/program in QEMU with root fs taken from host
# #
# Copyright (C) 2014-2019 Nexedi SA and Contributors. # Copyright (C) 2014-2021 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
...@@ -205,7 +205,7 @@ qemu-system-$arch \ ...@@ -205,7 +205,7 @@ qemu-system-$arch \
\ \
-m 512M `# default 128M is too limiting` \ -m 512M `# default 128M is too limiting` \
\ \
-fsdev local,id=R,path=/,security_model=none,readonly \ -fsdev local,id=R,path=/,security_model=none,readonly,multidevs=remap \
-device virtio-9p-pci,fsdev=R,mount_tag=/dev/root \ -device virtio-9p-pci,fsdev=R,mount_tag=/dev/root \
\ \
-kernel $kernel \ -kernel $kernel \
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2018-2021 Nexedi SA and Contributors. # Copyright (C) 2018-2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
...@@ -446,7 +446,7 @@ def __stop(wcsrv, ctx, _onstuck): ...@@ -446,7 +446,7 @@ def __stop(wcsrv, ctx, _onstuck):
if _procwait_(timeoutFrac(0.5), wcsrv._proc): if _procwait_(timeoutFrac(0.5), wcsrv._proc):
return return
log.warn("wcfs.go does not exit (after SIGTERM)") log.warn("wcfs.go does not exit")
log.warn("-> kill -QUIT wcfs.go ...") log.warn("-> kill -QUIT wcfs.go ...")
os.kill(wcsrv._proc.pid, SIGQUIT) os.kill(wcsrv._proc.pid, SIGQUIT)
...@@ -469,7 +469,7 @@ def __stop(wcsrv, ctx, _onstuck): ...@@ -469,7 +469,7 @@ def __stop(wcsrv, ctx, _onstuck):
try: try:
if _is_mountpoint(wcsrv.mountpoint): # could be unmounted from outside if _is_mountpoint(wcsrv.mountpoint): # could be unmounted from outside
_fuse_unmount(wcsrv.mountpoint) _fuse_unmount(wcsrv.mountpoint)
except: except Exception as e:
# if clean unmount failed -> kill -TERM wcfs and force abort of fuse connection. # if clean unmount failed -> kill -TERM wcfs and force abort of fuse connection.
# #
# aborting fuse connection is needed in case wcfs/kernel will be stuck # aborting fuse connection is needed in case wcfs/kernel will be stuck
...@@ -482,6 +482,10 @@ def __stop(wcsrv, ctx, _onstuck): ...@@ -482,6 +482,10 @@ def __stop(wcsrv, ctx, _onstuck):
wcsrv._fuseabort.write(b"1\n") wcsrv._fuseabort.write(b"1\n")
wcsrv._fuseabort.flush() wcsrv._fuseabort.flush()
defer(_) defer(_)
# treat first unmount failure as temporary - e.g. due to "device or resource is busy".
# we'll be retrying to unmount the filesystem the second time after kill/fuse-abort.
if not isinstance(e, _FUSEUnmountError):
raise raise
...@@ -550,6 +554,8 @@ def _rmdir_ifexists(path): ...@@ -550,6 +554,8 @@ def _rmdir_ifexists(path):
# _fuse_unmount calls `fusermount -u` + logs details if unmount failed. # _fuse_unmount calls `fusermount -u` + logs details if unmount failed.
# #
# Additional options to fusermount can be passed via optv. # Additional options to fusermount can be passed via optv.
# _FUSEUnmountError is the error _fuse_unmount raises when unmounting fails.
#
# It is a dedicated RuntimeError subclass so that callers can tell an unmount
# failure apart from any other exception (e.g. via isinstance check).
class _FUSEUnmountError(RuntimeError):
    pass
@func @func
def _fuse_unmount(mntpt, *optv): def _fuse_unmount(mntpt, *optv):
ret, out = _sysproccallout(["fusermount", "-u"] + list(optv) + [mntpt]) ret, out = _sysproccallout(["fusermount", "-u"] + list(optv) + [mntpt])
...@@ -576,7 +582,7 @@ def _fuse_unmount(mntpt, *optv): ...@@ -576,7 +582,7 @@ def _fuse_unmount(mntpt, *optv):
opts += ' ' opts += ' '
emsg = "fuse_unmount %s%s: failed: %s" % (opts, mntpt, out) emsg = "fuse_unmount %s%s: failed: %s" % (opts, mntpt, out)
log.warn(emsg) log.warn(emsg)
raise RuntimeError("%s\n(more details logged)" % emsg) raise _FUSEUnmountError("%s\n(more details logged)" % emsg)
# _is_mountpoint returns whether path is a mountpoint # _is_mountpoint returns whether path is a mountpoint
def _is_mountpoint(path): # -> bool def _is_mountpoint(path): # -> bool
...@@ -731,6 +737,10 @@ def main(): ...@@ -731,6 +737,10 @@ def main():
zurl = argv[-1] # -a -b zurl -> zurl zurl = argv[-1] # -a -b zurl -> zurl
optv = argv[:-1] # -a -b zurl -> -a -b optv = argv[:-1] # -a -b zurl -> -a -b
# setup log.warn/error to go to stderr, so that details could be seen on
# e.g. "fuse_unmount: ... failed (more details logged)"
logging.basicConfig(stream=sys.stderr, level=logging.WARNING)
if cmd == "serve": if cmd == "serve":
if argv[0] == '-h': if argv[0] == '-h':
os.execv(_wcfs_exe(), [_wcfs_exe(), '-h']) os.execv(_wcfs_exe(), [_wcfs_exe(), '-h'])
......
...@@ -32,7 +32,7 @@ from wendelin.bigfile.file_zodb import ZBigFile ...@@ -32,7 +32,7 @@ from wendelin.bigfile.file_zodb import ZBigFile
from wendelin.wcfs.wcfs_test import tDB, tAt, timeout, eprint from wendelin.wcfs.wcfs_test import tDB, tAt, timeout, eprint
from wendelin.wcfs import _waitfor_ as waitfor_ from wendelin.wcfs import _waitfor_ as waitfor_
from wendelin.wcfs import wcfs_test from wendelin.wcfs import wcfs_test
from wendelin.wcfs.internal.wcfs_test import read_mustfault from wendelin.wcfs.internal.wcfs_test import read_mustfault, read_exfault_withgil
from wendelin.wcfs.internal import mm from wendelin.wcfs.internal import mm
from pytest import raises from pytest import raises
...@@ -83,7 +83,7 @@ class tMapping(object): ...@@ -83,7 +83,7 @@ class tMapping(object):
# - - - - - - # - - - - - -
# client process # client process
# #
_ = blkview[0] _ = read_exfault_withgil(blkview[0:1])
assert _ == dataok[0] assert _ == dataok[0]
assert blkview.tobytes() == dataok assert blkview.tobytes() == dataok
......
...@@ -81,12 +81,20 @@ cdef class _tWCFS: ...@@ -81,12 +81,20 @@ cdef class _tWCFS:
# read_exfault_nogil reads mem with GIL released and returns its content. # read_exfault_nogil reads mem with GIL released and returns its content.
# #
# If reading hits segmentation fault, it is converted to SegmentationFault exception. # If reading hits segmentation fault, it is converted to SegmentationFault exception.
def read_exfault_nogil(const unsigned char[::1] mem not None) -> bytes:
    # thin wrapper: the actual work is done by _read_exfault with withgil=False.
    # NOTE _read_exfault asserts that mem is exactly 1 byte long.
    return _read_exfault(mem, withgil=False)
# read_exfault_withgil is similar to read_exfault_nogil, but does the reading
# while holding the GIL.
def read_exfault_withgil(const unsigned char[::1] mem not None) -> bytes:
    # thin wrapper: the actual work is done by _read_exfault with withgil=True.
    # NOTE _read_exfault asserts that mem is exactly 1 byte long.
    return _read_exfault(mem, withgil=True)
class SegmentationFault(Exception): pass class SegmentationFault(Exception): pass
cdef sync.Mutex exfaultMu # one at a time as sigaction is per-process cdef sync.Mutex exfaultMu # one at a time as sigaction is per-process
cdef sigjmp_buf exfaultJmp cdef sigjmp_buf exfaultJmp
cdef cbool faulted cdef cbool faulted
def read_exfault_nogil(const unsigned char[::1] mem not None) -> bytes: def _read_exfault(const unsigned char[::1] mem not None, cbool withgil) -> bytes:
assert len(mem) == 1, "read_exfault_nogil: only [1] mem is supported for now" assert len(mem) == 1, "_read_exfault: only [1] mem is supported for now"
cdef unsigned char b cdef unsigned char b
global faulted global faulted
cdef cbool faulted_ cdef cbool faulted_
...@@ -97,8 +105,12 @@ def read_exfault_nogil(const unsigned char[::1] mem not None) -> bytes: ...@@ -97,8 +105,12 @@ def read_exfault_nogil(const unsigned char[::1] mem not None) -> bytes:
faulted = False faulted = False
try: try:
if withgil:
#with gil: (cython complains "Trying to acquire the GIL while it is already held.")
b = __read_exfault(&mem[0])
else:
with nogil: with nogil:
b = _read_exfault(&mem[0]) b = __read_exfault(&mem[0])
finally: finally:
faulted_ = faulted faulted_ = faulted
with nogil: with nogil:
...@@ -114,7 +126,7 @@ cdef void exfaultSighand(int sig) nogil: ...@@ -114,7 +126,7 @@ cdef void exfaultSighand(int sig) nogil:
faulted = True faulted = True
siglongjmp(exfaultJmp, 1) siglongjmp(exfaultJmp, 1)
cdef unsigned char _read_exfault(const unsigned char *p) nogil except +topyexc: cdef unsigned char __read_exfault(const unsigned char *p) nogil except +topyexc:
global faulted global faulted
cdef sigaction_t act, saveact cdef sigaction_t act, saveact
......
// Copyright (C) 2018-2021 Nexedi SA and Contributors. // Copyright (C) 2018-2022 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -868,6 +868,13 @@ retry: ...@@ -868,6 +868,13 @@ retry:
log.Infof("\n\n") log.Infof("\n\n")
} }
// put zhead's transaction into ctx because we will potentially need to
// access ZODB when processing invalidations.
// TODO better ctx = transaction.PutIntoContext(ctx, txn)
ctx0 := ctx
ctx, cancel := xcontext.Merge(ctx0, zhead.TxnCtx)
defer cancel()
// invalidate kernel cache for file data // invalidate kernel cache for file data
wg := xsync.NewWorkGroup(ctx) wg := xsync.NewWorkGroup(ctx)
for foid, δfile := range δF.ByFile { for foid, δfile := range δF.ByFile {
...@@ -920,12 +927,14 @@ retry: ...@@ -920,12 +927,14 @@ retry:
// 1. abort old and resync to new txn/at // 1. abort old and resync to new txn/at
transaction.Current(zhead.TxnCtx).Abort() transaction.Current(zhead.TxnCtx).Abort()
_, ctx = transaction.New(context.Background()) _, txnCtx := transaction.New(context.Background())
ctx, cancel = xcontext.Merge(ctx0, txnCtx) // TODO better transaction.PutIntoContext
defer cancel()
err = zhead.Resync(ctx, δZ.Tid) err = zhead.Resync(ctx, δZ.Tid)
if err != nil { if err != nil {
return err return err
} }
zhead.TxnCtx = ctx zhead.TxnCtx = txnCtx
// 2. restat invalidated ZBigFile // 2. restat invalidated ZBigFile
// NOTE no lock needed since .blksize and .size are constant during lifetime of one txn. // NOTE no lock needed since .blksize and .size are constant during lifetime of one txn.
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2018-2021 Nexedi SA and Contributors. # Copyright (C) 2018-2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
...@@ -395,7 +395,7 @@ class tWCFS(_tWCFS): ...@@ -395,7 +395,7 @@ class tWCFS(_tWCFS):
class tDB(tWCFS): class tDB(tWCFS):
@func @func
def __init__(t): def __init__(t, _with_old_data=False):
t.root = testdb.dbopen() t.root = testdb.dbopen()
def _(): # close/unlock db if __init__ fails def _(): # close/unlock db if __init__ fails
exc = sys.exc_info()[1] exc = sys.exc_info()[1]
...@@ -432,9 +432,15 @@ class tDB(tWCFS): ...@@ -432,9 +432,15 @@ class tDB(tWCFS):
t._maintid = gettid() t._maintid = gettid()
# prepare initial objects for test: zfile, nonzfile # prepare initial objects for test: zfile, nonzfile
if not _with_old_data:
t.root['!file'] = t.nonzfile = Persistent() t.root['!file'] = t.nonzfile = Persistent()
t.root['zfile'] = t.zfile = ZBigFile(blksize) t.root['zfile'] = t.zfile = ZBigFile(blksize)
t.at0 = t.commit() t.at0 = t.commit()
else:
t.at0 = tAt(t, t.tail)
t.nonzfile = t.root['!file']
t.zfile = t.root['zfile']
@property @property
def head(t): def head(t):
...@@ -1318,6 +1324,37 @@ def test_wcfs_basic_read_aftertail(): ...@@ -1318,6 +1324,37 @@ def test_wcfs_basic_read_aftertail():
assert _(100*blksize) == b'' assert _(100*blksize) == b''
# verify that wcfs does not panic with "no current transaction" when processing
# invalidations if it needs to access ZODB during handleδZ.
@func
def test_wcfs_basic_invalidation_wo_dFtail_coverage():
    # The test runs in two phases against the same test database:
    #
    #   1. a first tDB session commits data to zfile[0] and is closed;
    #   2. a second tDB session is started on top of that already-existing
    #      data, so that the change to block 0 predates this second session.
    #
    # Block 0 is then read (to make it tracked) and changed again.

    # prepare initial data with single change to zfile[0].
    @func
    def _():
        # separate @func scope: the deferred t.close is expected to run when _
        # returns, i.e. before the second tDB below is created.
        t = tDB(); zf = t.zfile
        defer(t.close)
        t.commit(zf, {0:'a'})
    _()

    # start wcfs with ΔFtail/ΔBtail not covering that initial change.
    # (_with_old_data=True makes tDB reuse root['zfile'] / root['!file'] that
    #  are already in the database instead of creating and committing new ones)
    t = tDB(_with_old_data=True); zf = t.zfile
    defer(t.close)

    f = t.open(zf)
    t.commit(zf, {1:'b1'}) # arbitrary commit to non-0 blk
    f._assertBlk(0, 'a') # [0] becomes tracked (don't verify against computed dataok due to _with_old_data)

    # wcfs was crashing on processing further invalidation to blk 0 because
    # - ΔBtail.GetAt([0], head) returns valueExact=false, and so
    # - ΔFtail.BlkRevAt activates "access ZODB" codepath,
    # - but handleδZ was calling ΔFtail.BlkRevAt without properly putting zhead's transaction into ctx.
    # -> panic.
    t.commit(zf, {0:'a2'})

    # just in case
    # (also checks that the new content of block 0 is what is observed after the invalidation)
    f.assertBlk(0, 'a2')
# ---- verify wcfs functionality that depends on isolation protocol ---- # ---- verify wcfs functionality that depends on isolation protocol ----
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment