Commit e80dcbde authored by Kirill Smelkov

X zodb/cache: Fix concurrency bug (42687746)

By extending lookupRCE to accept how many references to the returned
rce.buf the caller wants xincref'ed, we can teach it to do the incref
consistently under the rce.parent lock: either doing it itself, or
scheduling the incref to loadRCE, to be done after rce is loaded and
right before it is exposed to the outside world (rce waiters + gc).
parent 148222a1
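
The fix in a nutshell, as a minimal self-contained sketch (the names entry, buf, want and load below are simplified stand-ins for illustration, not the real zodb/cache API): a caller that needs a reference to a not-yet-loaded entry records its wish in waitBufRef; the loader later converts those pending increfs into real ones under the same lock, before the entry is announced as ready, so a concurrent gc can never observe refcnt == 1 while waiters still expect their references.

// Hypothetical simplified model of the scheme - not the real cache types.
package main

import (
	"fmt"
	"sync"
)

type buf struct {
	refcnt int32 // reference count; freed when it drops to 0
}

type entry struct {
	mu         sync.Mutex    // stands in for rce.parent's lock
	buf        *buf          // loaded data; nil until load completes
	waitBufRef int32         // increfs scheduled by waiters; -1 once loaded
	ready      chan struct{} // closed when loading finished
}

// want is what lookupRCE does for a caller that needs a reference:
// incref right away if the entry is loaded, otherwise schedule the
// incref to be done by the loader.
func (e *entry) want() {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.waitBufRef == -1 {
		e.buf.refcnt++ // already loaded - take the reference now
	} else {
		e.waitBufRef++ // not yet loaded - loader will incref for us
	}
}

// load is what loadRCE does: fetch the data, then, under the lock,
// convert scheduled increfs into real ones before anyone (waiters, gc)
// can observe e.buf.
func (e *entry) load() {
	b := &buf{refcnt: 1} // 1 reference owned by the cache itself

	e.mu.Lock()
	e.buf = b
	for ; e.waitBufRef > 0; e.waitBufRef-- {
		e.buf.refcnt++
	}
	e.waitBufRef = -1 // mark as loaded
	e.mu.Unlock()

	close(e.ready) // notify waiters only after refcounts are consistent
}

func main() {
	e := &entry{ready: make(chan struct{})}

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // three concurrent Load-style callers
		wg.Add(1)
		go func() {
			defer wg.Done()
			e.want()
			<-e.ready
		}()
	}

	e.load()
	wg.Wait()
	fmt.Println("refcnt:", e.buf.refcnt) // 1 (cache) + 3 (waiters) = 4
}
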
@@ -96,7 +96,15 @@ type revCacheEntry struct {
 	err   error
 	ready chan struct{} // closed when loading finished
-	accounted bool // whether rce size accounted in cache size; protected by .parent's lock
+
+	// protected by .parent's lock:
+
+	accounted bool // whether rce size was accounted in cache size
+
+	// how many waiters for buf there are while rce is being loaded.
+	// after data for this RCE is loaded, loadRCE will do .buf.XIncref() .waitBufRef times.
+	// = -1 after loading is complete.
+	waitBufRef int32
 }
 
 // StorLoader represents loading part of a storage.
@@ -144,7 +152,7 @@ func (c *Cache) SetSizeMax(sizeMax int) {
 //
 // If data is already in cache - cached content is returned.
 func (c *Cache) Load(ctx context.Context, xid Xid) (buf *Buf, serial Tid, err error) {
-	rce, rceNew := c.lookupRCE(xid)
+	rce, rceNew := c.lookupRCE(xid, +1)
 
 	// rce is already in cache - use it
 	if !rceNew {
@@ -164,7 +172,6 @@ func (c *Cache) Load(ctx context.Context, xid Xid) (buf *Buf, serial Tid, err error) {
 		return nil, 0, rce.userErr(xid)
 	}
 
-	rce.buf.XIncref()
 	return rce.buf, rce.serial, nil
 }
@@ -174,10 +181,12 @@ func (c *Cache) Load(ctx context.Context, xid Xid) (buf *Buf, serial Tid, err error) {
 // Prefetch is not blocking operation and does not wait for loading, if any was
 // started, to complete.
 func (c *Cache) Prefetch(ctx context.Context, xid Xid) {
-	rce, rceNew := c.lookupRCE(xid)
+	rce, rceNew := c.lookupRCE(xid, +0)
 
-	// !rceNew -> no need to adjust LRU - it will be adjusted by further actual data Load
-	// XXX or it is better to adjust LRU here too?
+	// !rceNew -> no need to adjust LRU - it will be adjusted by further actual data Load.
+	// More: we must not expose not-yet-loaded RCEs to Cache.lru because
+	// their rce.waitBufRef was not yet synced to rce.buf.
+	// See loadRCE for details.
 
 	// spawn loading in the background if rce was not yet loaded
 	if rceNew {
@@ -192,7 +201,17 @@ func (c *Cache) Prefetch(ctx context.Context, xid Xid) {
 //
 // rceNew indicates whether rce is new and so loading on it has not been
 // initiated yet. If so the caller should proceed to loading rce via loadRCE.
-func (c *Cache) lookupRCE(xid Xid) (rce *revCacheEntry, rceNew bool) {
+//
+// wantBufRef indicates how many times the caller wants the returned rce.buf to be incref'ed.
+//
+// This increment will be done only after rce is loaded: either by lookupRCE
+// here - if it finds the rce to be already loaded, or by future loadRCE - if
+// rce is not loaded yet - to which the increment will be scheduled.
+//
+// Either way - by lookupRCE or by loadRCE - the increment is done
+// consistently under rce.parent lock, this way making sure concurrent gc
+// won't release rce.buf before the caller holds all its wanted references.
+func (c *Cache) lookupRCE(xid Xid, wantBufRef int) (rce *revCacheEntry, rceNew bool) {
 	// oid -> oce (oidCacheEntry) ; create new empty oce if not yet there
 	// exit with oce locked and cache.head read consistently
 	c.mu.RLock()
@@ -260,6 +279,16 @@ func (c *Cache) lookupRCE(xid Xid) (rce *revCacheEntry, rceNew bool) {
 		rceNew = true
 	}
 
+	// wantBufRef -> either incref loaded buf, or schedule this incref to
+	// loadRCE to be done after loading is complete.
+	for ; wantBufRef > 0; wantBufRef-- {
+		if rce.loaded() {
+			rce.buf.XIncref()
+		} else {
+			rce.waitBufRef++
+		}
+	}
+
 	oce.Unlock()
 	return rce, rceNew
 }
@@ -284,24 +313,37 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid Oid) {
 	rce.err = err
 	// verify db gives serial <= head
 	if rce.serial > rce.head {
-		rce.errDB(oid, "load(@%v) -> %v", rce.head, serial)
+		rce.markAsDBError(oid, "load(@%v) -> %v", rce.head, serial)
 	}
 
-	close(rce.ready)
 	δsize := rce.buf.Len()
 
-	// merge rce with adjacent entries in parent
-	// ( e.g. load(@3) and load(@4) results in the same data loaded if
-	//   there are only revisions with serials 1 and 5 )
 	oce.Lock()
+
+	// sync .waitBufRef -> .buf
+	//
+	// this is needed so that we always put into Cache.lru an RCE with proper
+	// refcount = 1 for cache + n·Load waiters. If we do not account for
+	// n·Load waiters here - under .parent's lock, gc might run before Load
+	// resumes, see .buf.refcnt = 1 and return .buf to freelist -> oops.
+	for ; rce.waitBufRef > 0; rce.waitBufRef-- {
+		rce.buf.XIncref()
+	}
+	rce.waitBufRef = -1 // mark as loaded
+
 	i := oce.find(rce)
 	if i == -1 {
 		// rce was already dropped by merge / evicted
 		// (XXX recheck about evicted)
 		oce.Unlock()
+		close(rce.ready)
 		return
 	}
 
+	// merge rce with adjacent entries in parent
+	// ( e.g. load(@3) and load(@4) results in the same data loaded if
+	//   there are only revisions with serials 1 and 5 )
+	//
 	// if rce & rceNext cover the same range -> drop rce
 	//
 	// if we drop rce - do not update c.lru as:
@@ -310,6 +352,7 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid Oid) {
 	//
 	// if rceNext is not yet there on lru list its loadRCE is in progress
 	// and will update lru and cache size for it itself.
+	rceOrig := rce
 	rceDropped := false
 	if i + 1 < len(oce.rcev) {
 		rceNext := oce.rcev[i+1]
@@ -343,6 +386,11 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid Oid) {
 	oce.Unlock()
 
+	// now, after .waitBufRef was synced to .buf, notify waiters that the
+	// original rce in question was loaded. Do so outside .parent lock.
+	close(rceOrig.ready)
+
 	// update lru & cache size
 	gcrun := false
 	c.gcMu.Lock()
@@ -401,12 +449,12 @@ func tryMerge(prev, next, cur *revCacheEntry, oid Oid) bool {
 	// check consistency
 	switch {
 	case prev.err == nil && prev.serial != next.serial:
-		cur.errDB(oid, "load(@%v) -> %v; load(@%v) -> %v",
+		cur.markAsDBError(oid, "load(@%v) -> %v; load(@%v) -> %v",
 			prev.head, prev.serial, next.head, next.serial)
 
 	case prev.err != nil && !isErrNoData(prev.err):
 		if cur.err == nil {
-			cur.errDB(oid, "load(@%v) -> %v; load(@%v) -> %v",
+			cur.markAsDBError(oid, "load(@%v) -> %v; load(@%v) -> %v",
 				prev.head, prev.err, next.head, next.serial)
 		}
 	}
@@ -485,6 +533,12 @@ func (c *Cache) gc() {
 			oce.deli(i)
 			c.size -= rce.buf.Len()
 			//fmt.Printf("gc: free %d bytes\n", rce.buf.Len()))
+
+			// not-yet-loaded rce must not be on Cache.lru
+			if !rce.loaded() {
+				panic("cache: gc: found !loaded rce on lru")
+			}
+
 			rce.buf.XRelease()
 		}
 		oce.Unlock()
@@ -516,7 +570,6 @@ func isErrNoData(err error) bool {
 func (oce *oidCacheEntry) newRevEntry(i int, head Tid) *revCacheEntry {
 	rce := &revCacheEntry{
 		parent: oce,
-		serial: 0,
 		head:   head,
 		ready:  make(chan struct{}),
 	}
@@ -569,13 +622,10 @@ func (oce *oidCacheEntry) del(rce *revCacheEntry) {
 // loaded reports whether rce was already loaded
+//
+// must be called with rce.parent locked.
 func (rce *revCacheEntry) loaded() bool {
-	select {
-	case <-rce.ready:
-		return true
-	default:
-		return false
-	}
+	return (rce.waitBufRef == -1)
 }
 
 // userErr returns error that, if any, needs to be returned to user from Cache.Load
@@ -617,8 +667,12 @@ func errDB(oid Oid, format string, argv ...interface{}) error {
 		append([]interface{}{oid}, argv...)...)
 }
 
-// errDB marks rce with database inconsistency error
-func (rce *revCacheEntry) errDB(oid Oid, format string, argv ...interface{}) {
+// markAsDBError marks rce with database inconsistency error.
+//
+// Caller must be the only one to access rce.
+// In practice this means rce was just loaded, but not yet signalled to be
+// ready to waiters, nor yet made visible to GC (via adding to Cache.lru list).
+func (rce *revCacheEntry) markAsDBError(oid Oid, format string, argv ...interface{}) {
 	rce.err = errDB(oid, format, argv...)
 	rce.buf.XRelease()
 	rce.buf = nil
......
@@ -316,7 +316,7 @@ func TestCache(t *testing.T) {
 	// with 15] -> 11] and 13] should be merged into 15].
 	// (manually add rce1_h15 so it is not merged with 11])
-	rce1_h15, new15 := c.lookupRCE(xidat(1,15))
+	rce1_h15, new15 := c.lookupRCE(xidat(1,15), +0)
 	ok1(new15)
 	rce1_h15.serial = 10
 	rce1_h15.buf = mkbuf(world)
@@ -327,12 +327,13 @@ func TestCache(t *testing.T) {
 	// (lookup 13] while 15] is not yet loaded so 15] is not picked
 	//  automatically at lookup phase)
-	rce1_h13, new13 := c.lookupRCE(xidat(1,13))
+	rce1_h13, new13 := c.lookupRCE(xidat(1,13), +0)
 	ok1(new13)
 	checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h11, rce1_h13, rce1_h15)
 	checkMRU(12, rce1_h11, rce1_h9, rce1_h7, rce1_h6, rce1_h3) // no <14 and <16 yet
 
-	// (now 15] becomes ready but not yet takes oce lock)
+	// (now 15] becomes ready but does not yet take oce lock)
+	rce1_h15.waitBufRef = -1
 	close(rce1_h15.ready)
 	ok1(rce1_h15.loaded())
 	checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h11, rce1_h13, rce1_h15)
@@ -356,14 +357,15 @@ func TestCache(t *testing.T) {
 	// similar race in between 16] and 17] but now β (17]) takes oce lock first:
-	rce1_h16, new16 := c.lookupRCE(xidat(1,16))
+	rce1_h16, new16 := c.lookupRCE(xidat(1,16), +0)
 	ok1(new16)
-	rce1_h17, new17 := c.lookupRCE(xidat(1,17))
+	rce1_h17, new17 := c.lookupRCE(xidat(1,17), +0)
 	ok1(new17)
 
 	// (16] loads but not yet takes oce lock)
 	rce1_h16.serial = 16
 	rce1_h16.buf = mkbuf(zz)
+	rce1_h16.waitBufRef = -1
 	close(rce1_h16.ready)
 	ok1(rce1_h16.loaded())
 	checkOCE(1, rce1_h3, rce1_h6, rce1_h7, rce1_h9, rce1_h15, rce1_h16, rce1_h17)
@@ -409,7 +411,7 @@ func TestCache(t *testing.T) {
 	checkLookup := func(xid Xid, expect *revCacheEntry) {
 		t.Helper()
 		bad := &bytes.Buffer{}
-		rce, rceNew := c.lookupRCE(xid)
+		rce, rceNew := c.lookupRCE(xid, +0)
 		if rceNew {
 			fmt.Fprintf(bad, "rce must be already in cache\n")
 		}
@@ -433,7 +435,7 @@ func TestCache(t *testing.T) {
 	checkLookup(xidat(1,9), rce1_h9)
 
 	// 8] must be separate from 7] and 9] because it is IO error there
-	rce1_h8, new8 := c.lookupRCE(xidat(1,8))
+	rce1_h8, new8 := c.lookupRCE(xidat(1,8), +0)
 	ok1(new8)
 	c.loadRCE(ctx, rce1_h8, 1)
 	checkRCE(rce1_h8, 8, 0, nil, ioerr)
......