Commit 487daf51 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ff4d2f6b
...@@ -1458,6 +1458,8 @@ func (wlink *WatchLink) _serveRX() (err error) { ...@@ -1458,6 +1458,8 @@ func (wlink *WatchLink) _serveRX() (err error) {
defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id) defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
r := bufio.NewReader(wlink.sk) r := bufio.NewReader(wlink.sk)
// XXX close .sk on error/wcfs stopping
// XXX write to peer if it was logical error on client side // XXX write to peer if it was logical error on client side
// XXX on which stream? -1? // XXX on which stream? -1?
...@@ -1490,30 +1492,43 @@ func (wlink *WatchLink) _serveRX() (err error) { ...@@ -1490,30 +1492,43 @@ func (wlink *WatchLink) _serveRX() (err error) {
} }
// client-initiated watch request // client-initiated watch request
foid, at, err := parseWatch(msg) err = wlink.handleSetupWatch(stream, msg)
if err != nil { if err != nil {
return fmt.Errorf("%d: %s", stream, err) panic(err) // XXX
} }
}
}
ctx := context.TODO() // XXX ctx = ? // handleSetupWatch handles watch request from client.
//
// returned error comes without full error prefix.
func (wlink *WatchLink) handleSetupWatch(stream uint64, msg string) (err error) {
defer xerr.Contextf(&err, "%d", stream)
fmt.Printf("S: watch: AAA\n") foid, at, err := parseWatch(msg)
if err != nil {
return err
}
err = wlink.setupWatch(ctx, foid, at) ctx := context.TODO() // XXX ctx = ?
if err != nil {
fmt.Printf("S: watch: QQQ: %s\n", err)
return fmt.Errorf("%d: %s", stream, err)
}
fmt.Printf("S: watch: BBB\n") fmt.Printf("S: watch: AAA\n")
err = wlink.send(ctx, stream, "ok") err = wlink.setupWatch(ctx, foid, at)
if err != nil { if err != nil {
panic(err) // XXX fmt.Printf("S: watch: QQQ: %s\n", err)
} return err
}
fmt.Printf("S: watch: CCC\n") fmt.Printf("S: watch: BBB\n")
err = wlink.send(ctx, stream, "ok")
if err != nil {
return err
} }
fmt.Printf("S: watch: CCC\n")
return nil
} }
// sendReq sends wcfs-originated request to client and returns client response. // sendReq sends wcfs-originated request to client and returns client response.
...@@ -1533,6 +1548,7 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, ...@@ -1533,6 +1548,7 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string,
select { select {
case <-ctx.Done(): case <-ctx.Done():
// XXX del rxTab[stream] ?
return "", ctx.Err() return "", ctx.Err()
case reply = <-rxq: case reply = <-rxq:
......
...@@ -443,7 +443,7 @@ class tWatch: ...@@ -443,7 +443,7 @@ class tWatch:
with t._txmu: with t._txmu:
pkt = b"%d %s\n" % (stream, msg) pkt = b"%d %s\n" % (stream, msg)
print('C: watch: tx: %r' % pkt) print('C: watch: tx: %r' % pkt)
t.wtx.write(pkt) # XXX read/write don't work in parallel? t.wtx.write(pkt)
t.wtx.flush() t.wtx.flush()
# sendReq sends client -> server request and returns server reply. # sendReq sends client -> server request and returns server reply.
...@@ -674,6 +674,7 @@ def test_wcfs(): ...@@ -674,6 +674,7 @@ def test_wcfs():
# XXX read file[blk]=hole; then file[blk]=zblk - must be invalidated and # XXX read file[blk]=hole; then file[blk]=zblk - must be invalidated and
# setupWatch must send pins. # setupWatch must send pins.
# XXX watch @at when file did not existed
def test_wcfs_invproto(): def test_wcfs_invproto():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment