Commit af422610 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 1e5a24b7
......@@ -1961,20 +1961,38 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string,
stream = atomic.AddUint64(&wlink.reqNext, +2)
}
rxq := make(chan string, 1) // cap=1 not to block _serve if we return canceled
rxq := make(chan string, 1)
wlink.rxMu.Lock()
wlink.rxTab[stream] = rxq // XXX assert .stream is not there?
_, already := wlink.rxTab[stream]
if !already {
wlink.rxTab[stream] = rxq
}
wlink.rxMu.Unlock()
if already {
panic("BUG: to-be-sent stream is present in rxtab")
}
defer func() {
if err != nil {
// remove rxq from rxTab
// ( _serve could have already deleted it if unexpected
// reply came to the stream, but no other rxq should
// have registered on the [stream] slot )
wlink.rxMu.Lock()
delete(wlink.rxTab, stream)
wlink.rxMu.Unlock()
// no need to drain rxq - it was created with cap=1
}
}()
err = wlink.send(ctx, stream, req)
if err != nil {
// XXX del rxTab[stream]
return "", err
}
select {
case <-ctx.Done():
// XXX del rxTab[stream] ?
return "", ctx.Err()
case reply = <-rxq:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment