Commit 6d940c61 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 4af54da9
...@@ -405,7 +405,7 @@ package main ...@@ -405,7 +405,7 @@ package main
// transaction is maintained. For zhead, every time it is resynced (see "5") // transaction is maintained. For zhead, every time it is resynced (see "5")
// the transaction associated with zhead is renewed. // the transaction associated with zhead is renewed.
// //
// XXX 10) gc @rev/ and @rev/bigfile/<bigfileX> automatically on atime timeout // TODO 10) gc @rev/ and @rev/bigfile/<bigfileX> automatically on atime timeout
// //
// //
// (*) see notes.txt -> "Notes on OS pagecache control" // (*) see notes.txt -> "Notes on OS pagecache control"
...@@ -1428,7 +1428,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { ...@@ -1428,7 +1428,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
} }
// relock the watch and check that w.pinned[blk] is the same. Retry if it is not. // relock the watch and check that w.pinned[blk] is the same. Retry if it is not.
// ( w.pinned[blk] could have changed while w.mu was not held e.g. by XXX recheck // ( w.pinned[blk] could have changed while w.mu was not held e.g. by
// simultaneous setupWatch if we were called by readPinWatchers ) // simultaneous setupWatch if we were called by readPinWatchers )
w.pinnedMu.Lock() w.pinnedMu.Lock()
if blkpin == w.pinned[blk] { if blkpin == w.pinned[blk] {
...@@ -1488,7 +1488,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { ...@@ -1488,7 +1488,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
// Must be called with f.head.zheadMu rlocked. // Must be called with f.head.zheadMu rlocked.
// //
// XXX do we really need to use/propagate caller context here? ideally update // XXX do we really need to use/propagate caller context here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout. // watchers should be synchronous, and in practice we just use 30s timeout (TODO).
// Should a READ interrupt cause watch update failure? -> probably no // Should a READ interrupt cause watch update failure? -> probably no
func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb.Tid) (err error) { func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb.Tid) (err error) {
defer xerr.Context(&err, "pin watchers") // f.path and blk is already put into context by readBlk defer xerr.Context(&err, "pin watchers") // f.path and blk is already put into context by readBlk
...@@ -1571,8 +1571,6 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb ...@@ -1571,8 +1571,6 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb
// setupWatch sets up or updates a Watch when client sends `watch <file> @<at>` request. // setupWatch sets up or updates a Watch when client sends `watch <file> @<at>` request.
// //
// It sends "pin" notifications; final "ok" or "error" must be sent by caller. // It sends "pin" notifications; final "ok" or "error" must be sent by caller.
//
// XXX called synchronously - only 1 setupWatch call at a time?
func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) { func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) {
defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at) defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at)
head := wlink.head head := wlink.head
...@@ -1882,8 +1880,6 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1882,8 +1880,6 @@ func (wlink *WatchLink) _serve() (err error) {
return e return e
}) })
// XXX recheck that it is safe to handle multiple simultaneous watch requests.
for { for {
l, err := r.ReadString('\n') // TODO limit accepted line len to prevent DOS l, err := r.ReadString('\n') // TODO limit accepted line len to prevent DOS
if err != nil { if err != nil {
...@@ -1917,7 +1913,7 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1917,7 +1913,7 @@ func (wlink *WatchLink) _serve() (err error) {
// client-initiated request // client-initiated request
// bye TODO document in "Isolation protocol" // bye
if msg == "bye" { if msg == "bye" {
return nil // deferred sk.Close will wake-up rx on client side return nil // deferred sk.Close will wake-up rx on client side
} }
...@@ -1966,13 +1962,14 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, ...@@ -1966,13 +1962,14 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string,
stream = atomic.AddUint64(&wlink.reqNext, +2) stream = atomic.AddUint64(&wlink.reqNext, +2)
} }
rxq := make(chan string) // XXX cap=1? (so that if we return canceled we do not block client) rxq := make(chan string, 1) // cap=1 not to block _serve if we return canceled
wlink.rxMu.Lock() wlink.rxMu.Lock()
wlink.rxTab[stream] = rxq // XXX assert .stream is not there? wlink.rxTab[stream] = rxq // XXX assert .stream is not there?
wlink.rxMu.Unlock() wlink.rxMu.Unlock()
err = wlink.send(ctx, stream, req) err = wlink.send(ctx, stream, req)
if err != nil { if err != nil {
// XXX del rxTab[stream]
return "", err return "", err
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment