Commit 04ed99ad authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 56d0b374
...@@ -1485,7 +1485,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1485,7 +1485,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
} }
// make sure zhead.At stays unchanged while we are preparing the watch // make sure zhead.At stays unchanged while we are preparing the watch
// (see vvv e.g. about unpin to @head for why it is neeed) // (see vvv e.g. about unpin to @head for why it is needed)
head.zheadMu.RLock() head.zheadMu.RLock()
defer head.zheadMu.RUnlock() defer head.zheadMu.RUnlock()
headAt := head.zconn.At() headAt := head.zconn.At()
...@@ -1521,10 +1521,16 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1521,10 +1521,16 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
return nil return nil
} }
// request exclusive access to the watch to change .at and compute pins.
// The lock will be downgraded from W to R after pins computation is done.
// Pins will be executed with atMu.R only - with the idea not to block
// other client that read-access the file simultaneously to setupWatch.
w.atMu.Lock()
// check at >= w.at // check at >= w.at
// XXX we might want to allow going back in history if we need it. // XXX we might want to allow going back in history if we need it.
if !(at >= w.at) { // XXX locking if !(at >= w.at) {
w.atMu.Unlock()
return fmt.Errorf("going back in history is forbidden") return fmt.Errorf("going back in history is forbidden")
} }
...@@ -1568,7 +1574,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1568,7 +1574,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// unpinned, because we update w.at to requested at early. // unpinned, because we update w.at to requested at early.
// //
// XXX register only if watch was created anew, not updated? // XXX register only if watch was created anew, not updated?
w.at = at // XXX locking w.at = at
// NOTE registering f.watchTab[w] and wlink.byfile[foid] = w must come together. // NOTE registering f.watchTab[w] and wlink.byfile[foid] = w must come together.
f.watchTab[w] = struct{}{} // XXX locking f.watchTab[w] = struct{}{} // XXX locking
wlink.byfile[foid] = w // XXX locking wlink.byfile[foid] = w // XXX locking
...@@ -1576,12 +1582,9 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1576,12 +1582,9 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// XXX defer -> unregister watch if error? // XXX defer -> unregister watch if error?
// pin all tracked file blocks that were changed in (at, head] range. // pin all tracked file blocks that were changed in (at, head] range.
toPin := map[int64]zodb.Tid{} // blk -> @rev
// XXX locking for _, δfile := range bfdir.δFtail.SliceByFileRev(f, at, headAt) { // XXX locking δFtail
toPin := map[int64]zodb.Tid{} // blk -> @rev
for _, δfile := range bfdir.δFtail.SliceByFileRev(f, at, headAt) {
for blk := range δfile.Blocks { for blk := range δfile.Blocks {
_, already := toPin[blk] _, already := toPin[blk]
if already { if already {
...@@ -1593,9 +1596,10 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1593,9 +1596,10 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
} }
// if a block was previously pinned, but ∉ δ(at, head] -> unpin it to head. // if a block was previously pinned, but ∉ δ(at, head] -> unpin it to head.
for blk, pinPrev := range w.pinned { // XXX locking for blk, pinPrev := range w.pinned {
// only 1 setupWatch can be run simultaneously for one file // only 1 setupWatch can be run simultaneously for one file
// XXX assert pinPrev.rev != zodb.TidMax // XXX assert pinPrev.rev != zodb.TidMax
pinNew, pinning := toPin[blk] pinNew, pinning := toPin[blk]
if !pinning { if !pinning {
toPin[blk] = zodb.TidMax // @head toPin[blk] = zodb.TidMax // @head
...@@ -1609,6 +1613,14 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1609,6 +1613,14 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
_ = pinNew _ = pinNew
} }
// downgrade atMu.W -> atMu.R
// XXX there is no primitive to do Wlock->Rlock atomically, but we are
// ok with that since we prepared eveyrhing to handle simultaneous pins
// from other reads.
w.atMu.Unlock()
w.atMu.RLock()
defer w.atMu.RUnlock()
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
for blk, rev := range toPin { for blk, rev := range toPin {
blk := blk blk := blk
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment