Commit 4d36c22c authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: Do not cancel pin handler by a READ interrupt

Pinning is critical operation whose failure will soon lead to client
being killed with SIGBUS. WCFS correctness also depend fundamentally on
pin operation, if started, to be handled by the client.

-> rework the READ handler not to cancel pin if a READ interrupt comes
   in from the OS client.

Do this via organizing WatchLink.serveCtx and running pins under this
context instead of under READ context. Later we will adjust pins to also
cancel this context on any error.

Test is, hopefully, TODO.
parent 256d6dac
......@@ -680,6 +680,10 @@ type WatchLink struct {
txMu sync.Mutex
rxMu sync.Mutex
rxTab map[/*stream*/uint64]chan string // client replies go via here
// serve operates under .serveCtx and can be requested to stop via serveCancel
serveCtx context.Context
serveCancel context.CancelFunc
}
// Watch represents watching for changes to 1 BigFile over particular watch link.
......@@ -1299,7 +1303,6 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
δFtail.Track(f.zfile, blk, treepath, blkcov, zblk)
// we have the data - it can be used after watchers are updated
// XXX should we use ctx here? (see readPinWatchers comments)
err = f.readPinWatchers(ctx, blk, blkrevMax)
if err != nil {
blkdata = nil
......@@ -1432,14 +1435,22 @@ func traceIso(format string, argv ...interface{}) {
// rev = zodb.TidMax means @head; otherwise rev must be ≤ w.at and there must
// be no rev_next changing file[blk]: rev < rev_next ≤ w.at.
//
// must be called with atMu rlocked.
// Pinning works under WatchLink.serveCtx instead of explicitly
// specified context because pinning is critical operation whose failure will lead
// to client being SIGBUS'ed and so pinning should not be interrupted arbitrarily.
//
// pin is invoked by BigFile.readPinWatchers . It is called with atMu rlocked.
//
// TODO close watch on any error
func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
func (w *Watch) pin(blk int64, rev zodb.Tid) (err error) {
defer xerr.Contextf(&err, "wlink%d: f<%s>", w.link.id, w.file.zfile.POid())
return w._pin(ctx, blk, rev)
return w._pin(w.link.serveCtx, blk, rev)
}
// _pin serves pin and is also invoked directly by WatchLink.setupWatch .
//
// It is invoked with ctx being either WatchLink.serveCtx or descendant of it.
// In all cases it is called with atMu rlocked.
func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
foid := w.file.zfile.POid()
revstr := rev.String()
......@@ -1532,10 +1543,6 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
//
// Must be called only for f under head/
// Must be called with f.head.zheadMu rlocked.
//
// XXX do we really need to use/propagate caller context here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout (TODO).
// Should a READ interrupt cause watch update failure? -> probably no
func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb.Tid) (err error) {
defer xerr.Context(&err, "pin watchers") // f.path and blk is already put into context by readBlk
......@@ -1612,8 +1619,13 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb
return err
}
//fmt.Printf("S: read #%d: watch @%s: pin -> @%s\n", blk, w.at, pinrev)
// NOTE we do not propagate context to pin. Ideally update
// watchers should be synchronous, and in practice we just use 30s timeout (TODO).
// A READ interrupt should not cause watch update failure.
//
// TODO close watcher on any error
return w.pin(ctx, blk, pinrev)
return w.pin(blk, pinrev)
})
}
......@@ -1849,38 +1861,37 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus
// TODO(?) check flags
head := wnode.head
serveCtx, serveCancel := context.WithCancel(context.TODO() /*TODO ctx of wcfs running*/)
wlink := &WatchLink{
sk: NewFileSock(),
id: atomic.AddInt32(&wnode.idNext, +1),
head: head,
byfile: make(map[zodb.Oid]*Watch),
rxTab: make(map[uint64]chan string),
sk: NewFileSock(),
id: atomic.AddInt32(&wnode.idNext, +1),
head: head,
byfile: make(map[zodb.Oid]*Watch),
rxTab: make(map[uint64]chan string),
serveCtx: serveCtx,
serveCancel: serveCancel,
}
head.wlinkMu.Lock()
head.wlinkTab[wlink] = struct{}{}
head.wlinkMu.Unlock()
go wlink.serve()
go wlink.serve(serveCtx)
return wlink.sk.File(), fuse.OK
}
// serve serves client initiated watch requests and routes client replies to
// wcfs initiated pin requests.
func (wlink *WatchLink) serve() {
err := wlink._serve()
func (wlink *WatchLink) serve(ctx context.Context) {
err := wlink._serve(ctx)
if err != nil {
log.Error(err)
}
}
func (wlink *WatchLink) _serve() (err error) {
func (wlink *WatchLink) _serve(ctx context.Context) (err error) {
defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
ctx0 := context.TODO() // TODO ctx = merge(ctx of wcfs running, ctx of wlink timeout)
ctx, cancel := context.WithCancel(ctx0)
// final watchlink cleanup is done on serve exit
defer func() {
// unregister all watches created on this wlink
......@@ -1925,7 +1936,7 @@ func (wlink *WatchLink) _serve() (err error) {
// ( ok return is e.g. when we received "bye", so if client
// sends "bye" and some pin handlers are in progress - they
// anyway don't need to wait for client replies anymore )
cancel()
wlink.serveCancel()
// wait for setupWatch and pin handlers spawned from it to complete
err2 := wg.Wait()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment