Commit a135f32d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 102b571e
...@@ -624,7 +624,18 @@ type Watch struct { ...@@ -624,7 +624,18 @@ type Watch struct {
mu sync.Mutex // XXX ok ? mu sync.Mutex // XXX ok ?
at zodb.Tid // requested to be watched @at at zodb.Tid // requested to be watched @at
pinned map[int64]zodb.Tid // {} blk -> rev blocks that are already pinned to be ≤ at pinned map[int64]*blkPinState // {} blk -> {... rev} blocks that are already pinned to be ≤ at
}
// blkPinState represents state/result of pinning one block.
//
// when !ready the pinning is in progress.
// when ready the pinning has been completed.
type blkPinState struct {
rev zodb.Tid // revision to which the block is being or has been pinned
ready chan struct{}
err error
} }
// -------- 3) Cache invariant -------- // -------- 3) Cache invariant --------
...@@ -1285,42 +1296,83 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { ...@@ -1285,42 +1296,83 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
} }
defer xerr.Contextf(&err, "pin #%d @%s", blk, revstr) defer xerr.Contextf(&err, "pin #%d @%s", blk, revstr)
// XXX locking? w.mu.Lock()
// XXX simultaneous calls?
// XXX ignore vvv ? (w.at could be ↑ after precheck in read (XXX or setupBlk) // XXX ignore vvv ? (w.at could be ↑ after precheck in read (XXX or setupBlk)
if !(rev == zodb.TidMax || rev <= w.at) { if wat := w.at; !(rev == zodb.TidMax || rev <= wat) {
w.mu.Unlock()
panicf("f<%s>: wlink%d: pin #%d @%s: watch.at (%s) < rev", panicf("f<%s>: wlink%d: pin #%d @%s: watch.at (%s) < rev",
foid, w.link.id, blk, rev, w.at) foid, w.link.id, blk, rev, wat)
} }
if w.pinned[blk] == rev { // check/wait for previous/simultaneous pin.
if rev == zodb.TidMax { // (pin could be called simultaneously e.g. by setupWatch and readPinWatchers)
panicf("f<%s>: wlink%d: pinned[#%d] = @head", foid, w.link.id, blk) for {
blkpin := w.pinned[blk]
if blkpin == nil {
break
} }
w.mu.Unlock()
<-blkpin.ready // XXX + ctx ? (or just keep ready ?)
if blkpin.rev == rev {
// already pinned // already pinned
// (e.g. os cache for block was evicted and read called the second time) // (e.g. os cache for block was evicted and read called the second time)
return nil return blkpin.err
}
// relock the watch and check that w.pinned[blk] is the same. Retry if it is not.
// ( w.pinned[blk] could have changed while w.mu was not held e.g. by
// simultaneous setupWatch if we were called by readPinWatchers )
w.mu.Lock()
if blkpin == w.pinned[blk] {
if blkpin.rev == zodb.TidMax {
w.mu.Unlock()
panicf("f<%s>: wlink%d: pinned[#%d] = @head", foid, w.link.id, blk)
}
break
}
} }
// w.mu locked & previous pin is either nil or completed and its .rev != rev
// -> setup new pin state
blkpin := &blkPinState{rev: rev, ready: make(chan struct{})}
w.pinned[blk] = blkpin
// perform IO without w.mu
w.mu.Unlock()
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr)) ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr))
w.mu.Lock()
// check IO reply & verify/signal blkpin is ready
defer func() {
if rev == zodb.TidMax {
delete(w.pinned, blk)
}
w.mu.Unlock()
close(blkpin.ready)
}()
if err != nil { if err != nil {
blkpin.err = err
return err return err
} }
if ack != "ack" { if ack != "ack" {
return fmt.Errorf("expect %q; got %q", "ack", ack) blkpin.err = fmt.Errorf("expect %q; got %q", "ack", ack)
return blkpin.err
} }
if rev == zodb.TidMax { if blkpin != w.pinned[blk] {
delete(w.pinned, blk) blkpin.err = fmt.Errorf("BUG: pinned[#%d] mutated while doing IO", blk)
} else { panicf("f<%s>: wlink%d: %s", blkpin.err)
w.pinned[blk] = rev
} }
return nil return nil
} }
*/
// readPinWatchers complements readBlk: it sends `pin blk` for watchers of the file // readPinWatchers complements readBlk: it sends `pin blk` for watchers of the file
// after a block was loaded from ZODB and before block data is returned to kernel. // after a block was loaded from ZODB and before block data is returned to kernel.
...@@ -1445,7 +1497,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1445,7 +1497,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
link: wlink, link: wlink,
file: f, file: f,
at: at, at: at,
pinned: make(map[int64]zodb.Tid), pinned: make(map[int64]*blkPinState),
} }
} }
...@@ -1529,15 +1581,19 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1529,15 +1581,19 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// if a block was previously pinned, but ∉ δ(at, head] -> unpin it to head. // if a block was previously pinned, but ∉ δ(at, head] -> unpin it to head.
for blk, pinPrev := range w.pinned { // XXX locking for blk, pinPrev := range w.pinned { // XXX locking
// only 1 setupWatch can be run simultaneously for one file
// XXX assert pinPrev.rev != zodb.TidMax
pinNew, pinning := toPin[blk] pinNew, pinning := toPin[blk]
if !pinning { if !pinning {
toPin[blk] = zodb.TidMax // @head toPin[blk] = zodb.TidMax // @head
} }
// don't bother to spawn .pin goroutines if pin revision is the same // TODO don't bother to spawn .pin goroutines if pin revision is the same ?
if pinPrev == pinNew { // if pinNew == pinPrev.rev && ready(pinPrev.ready) && pinPrev.err == nil {
delete(toPin, blk) // delete(toPin, blk)
} // }
_ = pinPrev
_ = pinNew
} }
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment