Commit 42687746 authored by Kirill Smelkov's avatar Kirill Smelkov

X zodb/cache: A concurrency bug was found

which shows up, for example, as

---- 8< ----
panic: Buf: refcnt < 0

goroutine 7 [running]:
lab.nexedi.com/kirr/neo/go/zodb.(*Buf).Release(0xc42000db40)
        /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/buffer.go:107 +0x116
lab.nexedi.com/kirr/neo/go/zodb.(*Buf).XRelease(0xc42000db40)
        /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/buffer.go:136 +0x3a
lab.nexedi.com/kirr/neo/go/zodb.(*Cache).gc(0xc4200101c0)
        /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/cache.go:488 +0x22f
lab.nexedi.com/kirr/neo/go/zodb.(*Cache).gcmain(0xc4200101c0)
        /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/cache.go:451 +0x3f
created by lab.nexedi.com/kirr/neo/go/zodb.NewCache
        /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/cache.go:123 +0x126
---- 8< ----

during a zhash.go -prefetch run.

This patch adds the test for this bug and explains the problem in detail
there. The test currently fails:

--- FAIL: TestCache (0.01s)
panic: Buf.Incref: refcnt was < 1 [recovered]
        panic: Buf.Incref: refcnt was < 1

goroutine 18 [running]:
testing.tRunner.func1(0xc4240ce000)
        /home/kirr/src/tools/go/go/src/testing/testing.go:711 +0x2d2
panic(0x588a20, 0x5ebe70)
        /home/kirr/src/tools/go/go/src/runtime/panic.go:491 +0x283
lab.nexedi.com/kirr/neo/go/zodb.(*Buf).Incref(0xc4200c2460)
        /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/buffer.go:134 +0x58
lab.nexedi.com/kirr/neo/go/zodb.(*Buf).XIncref(0xc4200c2460)
        /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/buffer.go:148 +0x3a
lab.nexedi.com/kirr/neo/go/zodb.(*Cache).Load(0xc4200ca000, 0x6ab1a0, 0xc42001a148, 0x4, 0x1, 0x10, 0xc4200bf2e0, 0x10, 0x10)
        /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/cache.go:168 +0x1c7
lab.nexedi.com/kirr/neo/go/zodb.TestCache.func2(0x4, 0x1, 0xc4200c2440, 0x4, 0x0, 0x0)
        /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/cache_test.go:155 +0xcc
lab.nexedi.com/kirr/neo/go/zodb.TestCache(0xc4240ce000)
        /home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/zodb/cache_test.go:579 +0x4eb5
testing.tRunner(0xc4240ce000, 0x5d4728)
        /home/kirr/src/tools/go/go/src/testing/testing.go:746 +0xd0
created by testing.(*T).Run
        /home/kirr/src/tools/go/go/src/testing/testing.go:789 +0x2de
exit status 2
parent 0abecd63
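
To make the failure mode concrete before the diff, here is a minimal, hypothetical Go sketch of the interleaving the new test provokes. This is not code from this repository; it uses the plain convention that refcnt == 1 means one owner (the real Buf counts slightly differently, see the note after the buffer.go hunk below). Load has found the cache entry but has not yet taken its own reference when GC evicts the entry and drops the cache's reference:

package main

import "sync/atomic"

// refbuf is a hypothetical stand-in for zodb.Buf: just an atomic reference
// counter, here with the plain convention refcnt == number of owners.
type refbuf struct{ refcnt int32 }

// incref takes an extra reference; legal only while at least one reference
// is still held (this mirrors the check Buf.Incref gains in this patch).
func (b *refbuf) incref() {
	if atomic.AddInt32(&b.refcnt, +1) <= 1 {
		panic("incref: refcnt was < 1")
	}
}

// release drops one reference; at zero the buffer would go back to the pool.
func (b *refbuf) release() {
	if atomic.AddInt32(&b.refcnt, -1) < 0 {
		panic("release: refcnt < 0")
	}
}

func main() {
	buf := &refbuf{refcnt: 1} // the cache's own reference

	// Load has already looked the cache entry up, but has not increfed buf yet.

	// GC runs exactly in this window: it evicts the entry and drops the
	// cache's reference, returning the buffer to the pool.
	buf.release()

	// Load now increfs a buffer that is already free -> the same
	// "refcnt was < 1" panic as in the failing TestCache run above.
	buf.incref()
}

In the real cache the same window exists between Load's lookupRCE and the final rce.buf.XIncref, which is exactly what the new test below hammers.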
go/zodb/buffer.go
@@ -103,10 +103,10 @@ func BufAlloc64(size int64) *Buf {
 // XXX naming? -> Free? -> Decref? -> Unref?
 func (buf *Buf) Release() {
 	rc := atomic.AddInt32(&buf.refcnt, -1)
-	if rc < -1 {
-		panic("Buf: refcnt < 0")
+	if rc < 0 - 1 {
+		panic("Buf.Release: refcnt < 0")
 	}
-	if rc > -1 {
+	if rc > 0 - 1 {
 		return
 	}
@@ -126,8 +126,13 @@ func (buf *Buf) Release() {
 }
 
 // Incref increments buf's reference counter by 1.
+//
+// buf must already have reference-counter > 0 before Incref call.
 func (buf *Buf) Incref() {
-	atomic.AddInt32(&buf.refcnt, +1)
+	rc := atomic.AddInt32(&buf.refcnt, +1)
+	if rc <= 1 - 1 {
+		panic("Buf.Incref: refcnt was < 1")
+	}
 }
 
 // XRelease releases buf it is != nil.
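
A side note on the arithmetic in this hunk (an inference from the checks themselves, not something the commit states): Buf appears to keep its counter biased by one, i.e. refcnt == number of references - 1, which is why the boundaries are spelled "0 - 1" and "1 - 1" rather than a bare -1 and 0. A standalone, hypothetical restatement of the two checks under that reading:

package main

import (
	"fmt"
	"sync/atomic"
)

// biasedBuf is illustrative only; refcnt == number of references - 1, so a
// freshly allocated buffer with a single owner starts at refcnt == 0.
type biasedBuf struct{ refcnt int32 }

func (b *biasedBuf) incref() {
	if rc := atomic.AddInt32(&b.refcnt, +1); rc <= 1-1 {
		panic("Incref: refcnt was < 1") // there was nothing live to increment
	}
}

// release reports whether this was the last reference, i.e. whether the
// buffer may now go back to the pool.
func (b *biasedBuf) release() bool {
	rc := atomic.AddInt32(&b.refcnt, -1)
	if rc < 0-1 {
		panic("Release: refcnt < 0") // released more times than referenced
	}
	return rc == 0-1
}

func main() {
	b := &biasedBuf{} // refcnt == 0: one owner
	b.incref()        // a second reference: refcnt == 1
	fmt.Println(b.release()) // false: one reference still left
	fmt.Println(b.release()) // true: last reference gone, buffer is freeable
}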
go/zodb/cache.go
@@ -194,7 +194,7 @@ func (c *Cache) Prefetch(ctx context.Context, xid Xid) {
 // initiated yet. If so the caller should proceed to loading rce via loadRCE.
 func (c *Cache) lookupRCE(xid Xid) (rce *revCacheEntry, rceNew bool) {
 	// oid -> oce (oidCacheEntry) ; create new empty oce if not yet there
-	// exit with oce locked and cache.syncedTo read consistently
+	// exit with oce locked and cache.head read consistently
 	c.mu.RLock()
 
 	oce := c.entryMap[xid.Oid]
@@ -511,6 +511,8 @@ func isErrNoData(err error) bool {
 
 // newRevEntry creates new revCacheEntry with .head and inserts it into .rcev @i.
 // (if i == len(oce.rcev) - entry is appended)
+//
+// oce must be locked.
 func (oce *oidCacheEntry) newRevEntry(i int, head Tid) *revCacheEntry {
 	rce := &revCacheEntry{
 		parent: oce,
@@ -529,6 +531,8 @@ func (oce *oidCacheEntry) newRevEntry(i int, head Tid) *revCacheEntry {
 
 // find finds rce in .rcev and returns its index
 // not found -> -1.
+//
+// oce must be locked.
 func (oce *oidCacheEntry) find(rce *revCacheEntry) int {
 	for i, r := range oce.rcev {
 		if r == rce {
@@ -539,6 +543,8 @@ func (oce *oidCacheEntry) find(rce *revCacheEntry) int {
 }
 
 // deli deletes .rcev[i]
+//
+// oce must be locked.
 func (oce *oidCacheEntry) deli(i int) {
 	n := len(oce.rcev) - 1
 	copy(oce.rcev[i:], oce.rcev[i+1:])
@@ -550,6 +556,8 @@ func (oce *oidCacheEntry) deli(i int) {
 
 // del delets rce from .rcev.
 // it panics if rce is not there.
+//
+// oce must be locked.
 func (oce *oidCacheEntry) del(rce *revCacheEntry) {
 	i := oce.find(rce)
 	if i == -1 {
@@ -574,6 +582,8 @@ func (rce *revCacheEntry) loaded() bool {
 //
 // ( ErrXidMissing contains xid for which it is missing. In cache we keep such
 // xid with max .head but users need to get ErrXidMissing with their own query )
+//
+// rce must be loaded.
 func (rce *revCacheEntry) userErr(xid Xid) error {
 	switch e := rce.err.(type) {
 	case *ErrXidMissing:
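
The comments added in this file document lock/load preconditions rather than change behaviour. For readers unfamiliar with the convention, here is a tiny, self-contained sketch (hypothetical types, not the cache's real ones) of what "oce must be locked" asks of callers: the helper itself does no locking, the caller brackets it with Lock/Unlock:

package main

import (
	"fmt"
	"sync"
)

// entry stands in for oidCacheEntry: a small container whose mutating
// helpers state "must be locked" as a precondition instead of locking
// internally (cheaper when the caller already holds the lock for a
// larger critical section).
type entry struct {
	sync.Mutex
	revs []int
}

// deli deletes revs[i]. The caller must hold e's lock.
func (e *entry) deli(i int) {
	copy(e.revs[i:], e.revs[i+1:])
	e.revs = e.revs[:len(e.revs)-1]
}

func main() {
	e := &entry{revs: []int{10, 20, 30}}

	e.Lock()
	e.deli(1) // safe: we hold the lock for the whole read-modify-write
	e.Unlock()

	fmt.Println(e.revs) // [10 30]
}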
go/zodb/cache_test.go
@@ -229,7 +229,7 @@ func TestCache(t *testing.T) {
 	}
 
 	// ---- verify cache behaviour for must be loaded/merged entries ----
-	// (this excercises mostly loadRCE/tryMerge)
+	// (this exercises mostly loadRCE/tryMerge)
 
 	checkMRU(0)
@@ -404,7 +404,7 @@ func TestCache(t *testing.T) {
 	// ---- verify rce lookup for must be cached entries ----
-	// (this excersizes lookupRCE)
+	// (this exercises lookupRCE)
 
 	checkLookup := func(xid Xid, expect *revCacheEntry) {
 		t.Helper()
@@ -470,7 +470,7 @@ func TestCache(t *testing.T) {
 	// (attach to Cache GC tracepoints)
 	tracer := &tTracer{xtesting.NewSyncTracer()}
 	pg := &tracing.ProbeGroup{}
-	defer pg.Done()
+	//defer pg.Done()
 
 	tracing.Lock()
 	traceCacheGCStart_Attach(pg, tracer.traceCacheGCStart)
@@ -539,6 +539,46 @@ func TestCache(t *testing.T) {
 	checkOCE(1)
 	checkMRU(0)
 
+	// ---- Load vs concurrent GC ----
+
+	// in the following scenario if GC runs after Load completed lookupRCE
+	// but before Load increfs returned buf, the GC will actually return
+	// the buf to buffer pool and so Load will be returning wrong buffer:
+	//
+	// ---- 8< ----
+	// T1                     Tgc
+	// Prefetch:
+	//   RCELookedUp
+	//   RCELoaded
+	//                        # GC - on hold
+	// Load
+	//   RCELookedUp
+	//   -> pause T1
+	//                        # GC - unpause
+	//                        GCStart
+	//                        GCStop
+	//   <- unpause T1
+	// # load completes
+	// ---- 8< ----
+	//
+	// it is hard to check this via stable tracepoints because, if done so,
+	// after the problem is fixed the test will deadlock.
+	// So test it probabilistically instead.
+	pg.Done()
+	c.SetSizeMax(0) // we want to GC to be constantly running
+
+	for i := 0; i < 1e4; i++ {
+		// issue Prefetch: this should create RCE and spawn loadRCE for it
		c.Prefetch(ctx, xidat(1,4))
+
+		// issue Load: this should lookup the RCE and wait for it to be loaded.
+		// if GC runs in parallel to Load there are chances it will
+		// be running in between Load->lookupRCE and final rce.buf.XIncref()
+		//
+		// if something is incorrect with refcounting either
+		// buf.Incref() in Load or buf.Release() in GC will panic.
+		checkLoad(xidat(1,4), b(hello), 4, nil)
+	}
+
 	// XXX verify caching vs ctx cancel
 	// XXX verify db inconsistency checks
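
For context only (this commit deliberately stops at adding the failing test and does not say how the bug will be fixed): the textbook way to close a lookup-then-incref window like the one above is to take the extra reference while still holding a lock that the evictor must also acquire, so there is no instant at which the cache's reference is the only one and can still be dropped. A hypothetical sketch of that ordering, not the repository's code:

package main

import "sync"

// buf and entry are illustrative stand-ins: entry owns one reference to buf,
// and takeRef hands out an extra reference while entry.mu is held, so a
// concurrent evict() can never drop the last reference between lookup and
// incref.
type buf struct{ refcnt int }

type entry struct {
	mu  sync.Mutex
	buf *buf
}

// takeRef returns e.buf with an extra reference, or nil if the entry was
// already evicted. The incref happens under e.mu.
func (e *entry) takeRef() *buf {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.buf == nil {
		return nil
	}
	e.buf.refcnt++ // safe: evict() cannot run concurrently here
	return e.buf
}

// evict drops the entry's own reference, also under e.mu.
func (e *entry) evict() {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.buf != nil {
		e.buf.refcnt--
		e.buf = nil
	}
}

func main() {
	e := &entry{buf: &buf{refcnt: 1}}
	go e.evict()
	if b := e.takeRef(); b != nil {
		_ = b // use b, then drop the extra reference
	}
}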