Commit 737033df authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: Rework WatchLink.serve to rely on context cancellation to stop reading

Previously we were using .sk.CloseRead() to interrupt sk.Read(), but
that is not necessary since .sk, relying on xio.Pipe, implements
xio.Reader natively with full support for cancellation.

The original code to cancel via CloseRead comes from mid 2019 and predates

go123@7ad867a3
go123@0e368363
go123@0bdac628
go123@9db4dfac
go123@d2dc6c09

And in b17aeb8c and
6f0cdaff (wcfs: Provide isolation to clients), it seems, I missed to
update WatchLink.serve code to that.

Do that now because it simplifies code flow organization a bit.
parent ba59ea62
......@@ -1887,8 +1887,6 @@ func (wlink *WatchLink) _serve() (err error) {
ctx, cancel := context.WithCancel(ctx0)
wg := xsync.NewWorkGroup(ctx)
r := bufio.NewReader(xio.BindCtxR(wlink.sk, ctx))
defer func() {
// cancel all handlers on both error and ok return.
// ( ok return is e.g. when we received "bye", so if client
......@@ -1916,40 +1914,34 @@ func (wlink *WatchLink) _serve() (err error) {
_ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err))
}
// close .sk.tx : this wakes up rx on client side.
err2 = wlink.sk.CloseWrite()
// close .sk
// closing .sk.tx wakes up rx on client side.
err2 = wlink.sk.Close()
if err == nil {
err = err2
}
}()
// close .sk.rx on error/wcfs stopping or return: this wakes up read(sk).
retq := make(chan struct{})
defer close(retq)
// cancel main thread on any watch handler error
ctx, mainCancel := context.WithCancel(ctx)
defer mainCancel()
wg.Go(func(ctx context.Context) error {
// monitor is always canceled - either at parent ctx cancel, or
// upon return from serve (see "cancel all handlers ..." ^^^).
// If it was return - report returned error to wg.Wait, not "canceled".
// monitor is always canceled - either due to parent ctx cancel, error in workgroup,
// or return from serve and running "cancel all handlers ..." above
<-ctx.Done()
e := ctx.Err()
select {
default:
case <-retq:
e = err // returned error
}
e2 := wlink.sk.CloseRead()
if e == nil {
e = e2
}
return e
mainCancel()
return nil
})
r := bufio.NewReader(xio.BindCtxR(wlink.sk, ctx))
for {
// NOTE r.Read is woken up by ctx cancel because wlink.sk implements xio.Reader natively
l, err := r.ReadString('\n') // TODO limit accepted line len to prevent DOS
if err != nil {
// r.Read is woken up by sk.CloseRead when serve decides to exit
if err == io.ErrClosedPipe || err == io.EOF {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
if errors.Is(err, ctx.Err()) {
err = nil
}
return err
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment