Commit 18cf8955 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent acec3cf1
...@@ -420,6 +420,9 @@ package main ...@@ -420,6 +420,9 @@ package main
// head.zheadMu WLock by handleδZ // head.zheadMu WLock by handleδZ
// RLock by read // RLock by read
// ... // ...
//
// WatchLink.byfileMu > Watch.mu
// BigFile.watchMu > Watch.mu
import ( import (
"bufio" "bufio"
...@@ -570,6 +573,7 @@ type BigFile struct { ...@@ -570,6 +573,7 @@ type BigFile struct {
// progress of being established. XXX text // progress of being established. XXX text
// //
// XXX locking -> watchMu? // XXX locking -> watchMu?
// XXX -> watches ?
watchTab map[*Watch]struct{} watchTab map[*Watch]struct{}
} }
...@@ -598,14 +602,12 @@ type WatchLink struct { ...@@ -598,14 +602,12 @@ type WatchLink struct {
id int32 // ID of this /head/watch handle (for debug log) id int32 // ID of this /head/watch handle (for debug log)
head *Head head *Head
// watches established over this watch link. // watches associated with this watch link.
// XXX in-progress - where? -> (XXX no - see vvv) nowhere; here only established watches are added
// XXX -> in-progress here - so that access to new blocks after δFtail
// was queried also send pins.
// //
// XXX locking? // both already established, and watches being initialized in-progress are registered here.
// XXX -> watchByFile? // (see setupWatch)
byfile map[zodb.Oid]*Watch // {} foid -> Watch byfileMu sync.Mutex
byfile map[zodb.Oid]*Watch // {} foid -> Watch
// IO // IO
reqNext uint64 // stream ID for next wcfs-originated request; 0 is reserved for control messages reqNext uint64 // stream ID for next wcfs-originated request; 0 is reserved for control messages
...@@ -1395,14 +1397,31 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1395,14 +1397,31 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
head := wlink.head head := wlink.head
bfdir := head.bfdir bfdir := head.bfdir
// XXX locking // wait for zhead.At ≥ at
// XXX head.zheadMu.RLock() + defer unlock (see vvv for unpin vs pin and locked head) if at != zodb.InvalidTid {
err = head.zheadWait(ctx, at)
if err != nil {
return err
}
}
// make sure zhead.At stays unchanged while we are preparing the watch
// (see vvv e.g. about unpin to @head for why it is neeed)
head.zheadMu.RLock()
defer head.zheadMu.RUnlock()
headAt := head.zconn.At()
if at < bfdir.δFtail.Tail() {
return fmt.Errorf("too far away back from head/at (@%s); δt = %s",
headAt, headAt.Time().Sub(at.Time().Time))
}
// XXX if watch was already established - we need to update it // XXX if watch was already established - we need to update it
// XXX locking (.byfile)
w := wlink.byfile[foid] w := wlink.byfile[foid]
if w == nil { if w == nil {
// watch was not previously established - set it up anew // watch was not previously established - set it up anew
// XXX locking // XXX locking (.fileTab)
f := bfdir.fileTab[foid] f := bfdir.fileTab[foid]
if f == nil { if f == nil {
// by "invalidation protocol" watch is setup after data file was opened // by "invalidation protocol" watch is setup after data file was opened
...@@ -1432,23 +1451,9 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1432,23 +1451,9 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
return fmt.Errorf("going back in history is forbidden") return fmt.Errorf("going back in history is forbidden")
} }
// XXX locking
err = head.zheadWait(ctx, at)
if err != nil {
return err
}
// XXX locking
headAt := head.zconn.At()
if at < bfdir.δFtail.Tail() {
return fmt.Errorf("too far away back from head/at (@%s); δt = %s",
headAt, headAt.Time().Sub(at.Time().Time))
}
f := w.file f := w.file
// register w to f here early, so that READs going in parallel to us // register w to f early, so that READs going in parallel to us
// preparing and processing initial pins, also send pins to w for read // preparing and processing initial pins, also send pins to w for read
// blocks. If we don't, we can miss to send pin to w for a freshly read // blocks. If we don't, we can miss to send pin to w for a freshly read
// block which could have revision > w.at: XXX test // block which could have revision > w.at: XXX test
...@@ -1467,7 +1472,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1467,7 +1472,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// pin for #3. // pin for #3.
// //
// NOTE for `unpin blk` to -> @head we can be sure there won't be // NOTE for `unpin blk` to -> @head we can be sure there won't be
// simultaneous `pin blk` request, because: XXX recheck // simultaneous `pin blk` request, because:
// //
// - unpin means blk was previously pinned, // - unpin means blk was previously pinned,
// - blk was pinned means it is tracked by δFtail, // - blk was pinned means it is tracked by δFtail,
...@@ -1475,7 +1480,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1475,7 +1480,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// there is indeed no blk change in that region, // there is indeed no blk change in that region,
// - which means that δblk with rev > w.at might be only > head, // - which means that δblk with rev > w.at might be only > head,
// - but such δblk are processed with zhead wlocked and we keep zhead // - but such δblk are processed with zhead wlocked and we keep zhead
// rlocked during pin setup. XXX rlock zhead during setupWatch // rlocked during pin setup.
// //
// δ δ // δ δ
// ----x----.------------]----x---- // ----x----.------------]----x----
...@@ -1573,8 +1578,6 @@ func (wlink *WatchLink) serve() { ...@@ -1573,8 +1578,6 @@ func (wlink *WatchLink) serve() {
log.Error(err) log.Error(err)
} }
// XXX deactiwate all watches from wlink.byfile[*]
head := wlink.head head := wlink.head
head.wlinkMu.Lock() head.wlinkMu.Lock()
delete(head.wlinkTab, wlink) delete(head.wlinkTab, wlink)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment