Commit 9544fc02 authored by Levin Zimmermann's avatar Levin Zimmermann

wcfs: Fix protection against faulty client

The WCFS documentation specifies [1]:

- - - 8> - - - 8> - - -

If a client, on purpose or due to a bug or being stopped, is slow to respond
with ack to file invalidation notification, it creates a problem because the
server will become blocked waiting for pin acknowledgments, and thus all
other clients, that try to work with the same file, will get stuck.

[...]

Lacking OS primitives to change address space of another process and not
being able to work it around with ptrace in userspace, wcfs takes approach
to kill a slow client on 30 seconds timeout by default.

- - - <8 - - - <8 - - -

But before this patch, this protection wasn't implemented yet: one
faulty client could therefore freeze the whole system. With this patch
this protection is implemented now: faulty clients are killed after the
timeout.

[1] https://lab.nexedi.com/nexedi/wendelin.core/blob/38dde766/wcfs/wcfs.go#L186-208
parent c5a66254
......@@ -29,6 +29,7 @@ import (
"strings"
"sync/atomic"
"syscall"
"time"
log "github.com/golang/glog"
......@@ -515,3 +516,22 @@ func (root *Root) StatFs() *fuse.StatfsOut {
func panicf(format string, argv ...interface{}) {
panic(fmt.Sprintf(format, argv...))
}
// isProcessAlive returns 'true' if process is still alive after timeout
// and 'false' otherwise
func isProcessAlive(pid int, timeout time.Duration) bool {
for start := time.Now(); time.Since(start) < timeout; {
alive := _isProcessAlive(pid)
if !alive {
return false
}
}
return true
}
// _isProcessAlive returns 'true' if process is currently present
// and 'false' otherwise
func _isProcessAlive(pid int) bool {
killErr := syscall.Kill(pid, syscall.Signal(0))
return killErr == nil
}
......@@ -1487,6 +1487,30 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
if err != nil {
// If timed out, we kill the client:
// SIGBUS => wait for some time; if still alive => SIGKILL
if errors.Is(err, ErrTimedOut) {
pid := int(w.link.caller.Pid)
// TODO kirr: "The kernel then sends SIGBUS on such case with the details about
// access to which address generated this error going in si_addr field of
// siginfo structure. It would be good if we can mimic that behaviour to a
// reasonable extent if possible."
err := syscall.Kill(pid, syscall.Signal(syscall.SIGBUS))
if err != nil {
return err
}
if isProcessAlive(pid, time.Second * 1) {
err = syscall.Kill(pid, syscall.Signal(syscall.SIGKILL))
if err != nil {
return err
}
}
// We don't return an error, because 'readPinWatchers'
// should continue as if nothing would have happened if we
// timed out: the other clients should not be affected by
// one faulty client.
return nil
}
blkpin.err = err
return err
}
......@@ -1982,6 +2006,8 @@ func (wlink *WatchLink) _handleWatch(ctx context.Context, msg string) error {
return err
}
var ErrTimedOut error = errors.New("timed out")
// sendReq sends wcfs-originated request to client and returns client response.
func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) {
defer xerr.Context(&err, "sendReq") // wlink is already put into ctx by caller
......@@ -2027,6 +2053,9 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string,
case reply = <-rxq:
return reply, nil
case <-time.After(30 * time.Second):
return "", ErrTimedOut
}
}
......
......@@ -49,7 +49,6 @@ from resource import setrlimit, getrlimit, RLIMIT_MEMLOCK
from golang import go, chan, select, func, defer, error, b
from golang import context, errors, sync, time
from zodbtools.util import ashex as h, fromhex
import pytest; xfail = pytest.mark.xfail
from pytest import raises, fail
from wendelin.wcfs.internal import io, mm
from wendelin.wcfs.internal.wcfs_test import _tWCFS, read_exfault_nogil, SegmentationFault, install_sigbus_trap, fadvise_dontneed
......@@ -348,7 +347,7 @@ class DFile:
# TODO(?) print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
class tWCFS(_tWCFS):
@func
def __init__(t):
def __init__(t, timeout=10*time.second):
assert not os.path.exists(testmntpt)
wc = wcfs.join(testzurl, autostart=True)
assert wc.mountpoint == testmntpt
......@@ -364,7 +363,7 @@ class tWCFS(_tWCFS):
# still wait for request completion even after fatal signal )
nogilready = chan(dtype='C.structZ')
t._wcfuseabort = os.dup(wc._wcsrv._fuseabort.fileno())
go(t._abort_ontimeout, t._wcfuseabort, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout
go(t._abort_ontimeout, t._wcfuseabort, timeout, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout
nogilready.recv() # wait till _abort_ontimeout enters nogil
# _abort_ontimeout is in wcfs_test.pyx
......@@ -402,7 +401,7 @@ class tDB(tWCFS):
# create before wcfs startup. old_data is []changeDelta - see .commit
# and .change for details.
@func
def __init__(t, old_data=[]):
def __init__(t, old_data=[], **kwargs):
t.root = testdb.dbopen()
def _(): # close/unlock db if __init__ fails
exc = sys.exc_info()[1]
......@@ -432,7 +431,7 @@ class tDB(tWCFS):
t._commit(t.zfile, changeDelta)
# start wcfs after testdb is created and initial data is committed
super(tDB, t).__init__()
super(tDB, t).__init__(**kwargs)
# fh(.wcfs/zhead) + history of zhead read from there
t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
......@@ -1451,12 +1450,14 @@ def test_wcfs_watch_going_back():
# verify that wcfs kills slow/faulty client who does not reply to pin in time.
@xfail # protection against faulty/slow clients
# protection against faulty/slow clients
@func
def test_wcfs_pintimeout_kill():
# adjusted wcfs timeout to kill client who is stuck not providing pin reply
tkill = 3*time.second
t = tDB(); zf = t.zfile # XXX wcfs args += tkill=<small>
# timeout until killing is 30 seconds, we add 2 seconds delay for the actual
# killing/process shutdown.
tkill = 32*time.second
t = tDB(timeout=tkill*2); zf = t.zfile
defer(t.close)
at1 = t.commit(zf, {2:'c1'})
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment