Commit e58e9845 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f21771d3
......@@ -622,9 +622,16 @@ type Watch struct {
link *WatchLink // link to client
file *BigFile // XXX needed?
mu sync.Mutex // XXX split -> atMu(RW) + pinnedMu
at zodb.Tid // requested to be watched @at
pinned map[int64]*blkPinState // {} blk -> {... rev} blocks that are already pinned to be ≤ at
// atMu, similarly to zheadMu, protects watch.at and pins associated with Watch.
// atMu.R guarantees that watch.at is not changing, but multiple
// simultaneous pins could be running (used e.g. by readPinWatchers).
// atMu.W guaraneees that only one user has watch.at write access and
// that no pins are running (used by setupWatch).
atMu sync.RWMutex
at zodb.Tid // requested to be watched @at
pinnedMu sync.Mutex // atMu.W | atMu.R + pinnedMu
pinned map[int64]*blkPinState // {} blk -> {... rev} blocks that are already pinned to be ≤ at
}
// blkPinState represents state/result of pinning one block.
......@@ -1282,6 +1289,8 @@ retry:
// rev = zodb.TidMax means @head; otherwise rev must be ≤ w.at and there must
// be no rev_next changing file[blk]: rev < rev_next ≤ w.at.
//
// must be called with atMu rlocked.
//
// XXX error - when? or close watch on any error?
func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
defer xerr.Contextf(&err, "wlink%d: f<%s>", w.link.id, w.file.zfile.POid())
......@@ -1296,15 +1305,13 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
}
defer xerr.Contextf(&err, "pin #%d @%s", blk, revstr)
w.mu.Lock()
// XXX ignore vvv ? (w.at could be ↑ after precheck in read (XXX or setupWatch)
if wat := w.at; !(rev == zodb.TidMax || rev <= wat) {
w.mu.Unlock()
if !(rev == zodb.TidMax || rev <= w.at) {
panicf("f<%s>: wlink%d: pin #%d @%s: watch.at (%s) < rev",
foid, w.link.id, blk, rev, wat)
foid, w.link.id, blk, rev, w.at)
}
w.pinnedMu.Lock()
// check/wait for previous/simultaneous pin.
// (pin could be called simultaneously e.g. by setupWatch and readPinWatchers)
for {
......@@ -1313,7 +1320,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
break
}
w.mu.Unlock()
w.pinnedMu.Unlock()
<-blkpin.ready // XXX + ctx ? (or just keep ready ?)
if blkpin.rev == rev {
......@@ -1323,27 +1330,27 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
}
// relock the watch and check that w.pinned[blk] is the same. Retry if it is not.
// ( w.pinned[blk] could have changed while w.mu was not held e.g. by
// ( w.pinned[blk] could have changed while w.mu was not held e.g. by XXX recheck
// simultaneous setupWatch if we were called by readPinWatchers )
w.mu.Lock()
w.pinnedMu.Lock()
if blkpin == w.pinned[blk] {
if blkpin.rev == zodb.TidMax {
w.mu.Unlock()
w.pinnedMu.Unlock()
panicf("f<%s>: wlink%d: pinned[#%d] = @head", foid, w.link.id, blk)
}
break
}
}
// w.mu locked & previous pin is either nil or completed and its .rev != rev
// w.pinnedMu locked & previous pin is either nil or completed and its .rev != rev
// -> setup new pin state
blkpin := &blkPinState{rev: rev, ready: make(chan struct{})}
w.pinned[blk] = blkpin
// perform IO without w.mu
w.mu.Unlock()
// perform IO without w.pinnedMu
w.pinnedMu.Unlock()
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr))
w.mu.Lock()
w.pinnedMu.Lock()
// check IO reply & verify/signal blkpin is ready
defer func() {
......@@ -1351,7 +1358,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
delete(w.pinned, blk)
}
w.mu.Unlock()
w.pinnedMu.Unlock()
close(blkpin.ready)
}()
......@@ -1368,7 +1375,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
if blkpin != w.pinned[blk] {
blkpin.err = fmt.Errorf("BUG: pinned[#%d] mutated while doing IO", blk)
panicf("f<%s>: wlink%d: %s", blkpin.err)
panicf("f<%s>: wlink%d: %s", foid, w.link.id, blkpin.err)
}
return nil
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment