Commit 87b663ec authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: Shutdown WatchLink on any pin error

If a pin misbehaves or there is IO error or anything else, we want to
stop all communication on the watchlink, cancel on in-flight pin
handlers, and (TODO) kill the client with SIGBUS.

This patch organizes WatchLink shutdown on any pin error.
This functionality is indirectly tested by test_Wcfs_watch_robust and
will be also indirectly tested by faultyprotection tests.

It would be good to have dedicated tests probably, but that is,
hopefully, TODO.
parent 4d36c22c
...@@ -684,6 +684,9 @@ type WatchLink struct { ...@@ -684,6 +684,9 @@ type WatchLink struct {
// serve operates under .serveCtx and can be requested to stop via serveCancel // serve operates under .serveCtx and can be requested to stop via serveCancel
serveCtx context.Context serveCtx context.Context
serveCancel context.CancelFunc serveCancel context.CancelFunc
down1 sync.Once
down chan struct{} // ready after shutdown completes
pinWG sync.WaitGroup // all pin handlers are accounted here
} }
// Watch represents watching for changes to 1 BigFile over particular watch link. // Watch represents watching for changes to 1 BigFile over particular watch link.
...@@ -1439,19 +1442,38 @@ func traceIso(format string, argv ...interface{}) { ...@@ -1439,19 +1442,38 @@ func traceIso(format string, argv ...interface{}) {
// specified context because pinning is critical operation whose failure will lead // specified context because pinning is critical operation whose failure will lead
// to client being SIGBUS'ed and so pinning should not be interrupted arbitrarily. // to client being SIGBUS'ed and so pinning should not be interrupted arbitrarily.
// //
// pin is invoked by BigFile.readPinWatchers . It is called with atMu rlocked. // Corresponding watchlink is shutdown on any error.
//
// No error is returned as currently pin handles all errors itself inside, and
// in the future the only error that pin will not be able to handle itself inside
// will be considered to be fatal and the filesystem will be switched to EIO mode on that.
// //
// TODO close watch on any error // pin is invoked by BigFile.readPinWatchers . It is called with atMu rlocked.
func (w *Watch) pin(blk int64, rev zodb.Tid) (err error) { func (w *Watch) pin(blk int64, rev zodb.Tid) {
defer xerr.Contextf(&err, "wlink%d: f<%s>", w.link.id, w.file.zfile.POid()) w._pin(w.link.serveCtx, blk, rev)
return w._pin(w.link.serveCtx, blk, rev)
} }
// _pin serves pin and is also invoked directly by WatchLink.setupWatch . // _pin serves pin and is also invoked directly by WatchLink.setupWatch .
// //
// It is invoked with ctx being either WatchLink.serveCtx or descendant of it. // It is invoked with ctx being either WatchLink.serveCtx or descendant of it.
// In all cases it is called with atMu rlocked. // In all cases it is called with atMu rlocked.
func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) {
if ctx.Err() != nil {
return // don't enter pinWG if watchlink is down
}
w.link.pinWG.Add(1)
defer w.link.pinWG.Done()
ctx, cancel := context.WithTimeout(ctx, groot.pinTimeout)
defer cancel()
err := w.__pin(ctx, blk, rev)
if err != nil {
w.link.shutdown(err)
}
}
func (w *Watch) __pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
foid := w.file.zfile.POid() foid := w.file.zfile.POid()
revstr := rev.String() revstr := rev.String()
if rev == zodb.TidMax { if rev == zodb.TidMax {
...@@ -1623,9 +1645,8 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb ...@@ -1623,9 +1645,8 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb
// NOTE we do not propagate context to pin. Ideally update // NOTE we do not propagate context to pin. Ideally update
// watchers should be synchronous, and in practice we just use 30s timeout (TODO). // watchers should be synchronous, and in practice we just use 30s timeout (TODO).
// A READ interrupt should not cause watch update failure. // A READ interrupt should not cause watch update failure.
// w.pin(blk, pinrev) // only fatal error
// TODO close watcher on any error return nil
return w.pin(blk, pinrev)
}) })
} }
...@@ -1845,12 +1866,13 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1845,12 +1866,13 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
blk := blk blk := blk
rev := rev rev := rev
wg.Go(func(ctx context.Context) error { wg.Go(func(ctx context.Context) error {
return w._pin(ctx, blk, rev) w._pin(ctx, blk, rev) // only fatal error
return nil
}) })
} }
err = wg.Wait() err = wg.Wait()
if err != nil { if err != nil {
return err return err // should not fail
} }
return nil return nil
...@@ -1870,6 +1892,7 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus ...@@ -1870,6 +1892,7 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus
rxTab: make(map[uint64]chan string), rxTab: make(map[uint64]chan string),
serveCtx: serveCtx, serveCtx: serveCtx,
serveCancel: serveCancel, serveCancel: serveCancel,
down: make(chan struct{}),
} }
head.wlinkMu.Lock() head.wlinkMu.Lock()
...@@ -1880,6 +1903,36 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus ...@@ -1880,6 +1903,36 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus
return wlink.sk.File(), fuse.OK return wlink.sk.File(), fuse.OK
} }
// shutdown shuts down communication over watchlink due to specified reason and
// marks the watchlink as no longer active.
//
// Only the first shutdown call has the effect, but all calls wait for the
// actual shutdown to complete.
//
// NOTE shutdown can be invoked under atMu.R from pin.
func (wlink *WatchLink) shutdown(reason error) {
wlink.down1.Do(func() {
// mark wlink as down; this signals serve loop to exit and cancels all in-progress pins
wlink.serveCancel()
// give client a chance to be notified if shutdown was due to some logical error
if reason != nil {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_ = wlink.send(ctx, 0, fmt.Sprintf("error: %s", reason))
}
// NOTE unregistering watches and wlink itself is done on serve exit, not
// here, to avoid AB-BA deadlock on atMu and e.g. WatchLink.byfileMu .
// It is ok to leave some watches still present in BigFile.watchTab
// until final cleanup because pin becomes noop on down watchlink.
close(wlink.down)
})
<-wlink.down
}
// serve serves client initiated watch requests and routes client replies to // serve serves client initiated watch requests and routes client replies to
// wcfs initiated pin requests. // wcfs initiated pin requests.
func (wlink *WatchLink) serve(ctx context.Context) { func (wlink *WatchLink) serve(ctx context.Context) {
...@@ -1910,13 +1963,6 @@ func (wlink *WatchLink) _serve(ctx context.Context) (err error) { ...@@ -1910,13 +1963,6 @@ func (wlink *WatchLink) _serve(ctx context.Context) (err error) {
delete(head.wlinkTab, wlink) delete(head.wlinkTab, wlink)
head.wlinkMu.Unlock() head.wlinkMu.Unlock()
// give client a chance to be notified if it was due to some logical error
if err != nil {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_ = wlink.send(ctx, 0, fmt.Sprintf("error: %s", err))
}
// close .sk // close .sk
// closing .sk.tx wakes up rx on client side. // closing .sk.tx wakes up rx on client side.
err2 := wlink.sk.Close() err2 := wlink.sk.Close()
...@@ -1928,15 +1974,28 @@ func (wlink *WatchLink) _serve(ctx context.Context) (err error) { ...@@ -1928,15 +1974,28 @@ func (wlink *WatchLink) _serve(ctx context.Context) (err error) {
// watch handlers are spawned in dedicated workgroup // watch handlers are spawned in dedicated workgroup
// //
// Pin handlers are run either inside - for pins run from setupWatch, or, // Pin handlers are run either inside - for pins run from setupWatch, or,
// for pins run from readPinWatchers, outside. // for pins run from readPinWatchers, under wlink.pinWG.
// Upon serve exit we cancel watch and pin handlers ran inside and wait for their completion. // Upon serve exit we cancel them all and wait for their completion.
wg := xsync.NewWorkGroup(ctx) wg := xsync.NewWorkGroup(ctx)
defer func() { defer func() {
// cancel all handlers on both error and ok return. // cancel all watch and pin handlers on both error and ok return.
// ( ok return is e.g. when we received "bye", so if client //
// sends "bye" and some pin handlers are in progress - they // For ok return, when we received "bye", we want to cancel
// anyway don't need to wait for client replies anymore ) // in-progress pin handlers without killing clients. That's why
wlink.serveCancel() // we call shutdown ourselves.
//
// For error return, we want any in-progress, and so will
// become failed, pin handler to result in corresponding client
// to become killed (TODO). That's why we trigger only cancel
// ourselves and let failed pin handlers to invoke shutdown
// with their specific reason.
//
// NOTE this affects pin handlers invoked by both setupWatch and readPinWatchers.
if err != nil {
wlink.serveCancel()
} else {
wlink.shutdown(nil)
}
// wait for setupWatch and pin handlers spawned from it to complete // wait for setupWatch and pin handlers spawned from it to complete
err2 := wg.Wait() err2 := wg.Wait()
...@@ -1944,6 +2003,12 @@ func (wlink *WatchLink) _serve(ctx context.Context) (err error) { ...@@ -1944,6 +2003,12 @@ func (wlink *WatchLink) _serve(ctx context.Context) (err error) {
err = err2 err = err2
} }
// wait for all other pin handlers to complete
wlink.pinWG.Wait()
// make sure that shutdown is actually invoked if it was an
// error and there were no in-progress pin handlers
wlink.shutdown(err)
}() }()
// cancel main thread on any watch handler error // cancel main thread on any watch handler error
...@@ -1951,7 +2016,7 @@ func (wlink *WatchLink) _serve(ctx context.Context) (err error) { ...@@ -1951,7 +2016,7 @@ func (wlink *WatchLink) _serve(ctx context.Context) (err error) {
defer mainCancel() defer mainCancel()
wg.Go(func(ctx context.Context) error { wg.Go(func(ctx context.Context) error {
// monitor is always canceled - either due to parent ctx cancel, error in workgroup, // monitor is always canceled - either due to parent ctx cancel, error in workgroup,
// or return from serve and running "cancel all handlers ..." above // or return from serve and running "cancel all watch handlers ..." above.
<-ctx.Done() <-ctx.Done()
mainCancel() mainCancel()
return nil return nil
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment