Commit fc69bd18 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent b6b314d9
...@@ -446,25 +446,25 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data *zodb.Buf, serial ...@@ -446,25 +446,25 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data *zodb.Buf, serial
return nil, 0, err // XXX err context return nil, 0, err // XXX err context
} }
data = resp.Data buf = resp.DataBuf
//checksum := sha1.Sum(data.Data) //checksum := sha1.Sum(buf.Data)
//if checksum != resp.Checksum { //if checksum != resp.Checksum {
// return nil, 0, fmt.Errorf("data corrupt: checksum mismatch") // return nil, 0, fmt.Errorf("data corrupt: checksum mismatch")
//} //}
if resp.Compression { if resp.Compression {
// XXX cleanup mess vvv // XXX cleanup mess vvv
data2 := zodb.BufAlloc(len(data.Data)) buf2 := zodb.BufAlloc(len(buf.Data))
data2.Data = data2.Data[:0] buf2.Data = buf2.Data[:0]
udata, err := decompress(data.Data, data2.Data) udata, err := decompress(buf.Data, buf2.Data)
data.Free() buf.Release()
if err != nil { if err != nil {
data2.Free() buf2.Release()
return nil, 0, fmt.Errorf("data corrupt: %v", err) return nil, 0, fmt.Errorf("data corrupt: %v", err)
} }
data2.Data = udata buf2.Data = udata
data = data2 buf = buf2
} }
// reply.NextSerial // reply.NextSerial
......
...@@ -646,8 +646,7 @@ type AnswerObject struct { ...@@ -646,8 +646,7 @@ type AnswerObject struct {
NextSerial zodb.Tid // XXX but there it is out of sync NextSerial zodb.Tid // XXX but there it is out of sync
Compression bool Compression bool
Checksum Checksum Checksum Checksum
// Data []byte // TODO encode -> separately (for writev) DataBuf *zodb.Buf // TODO encode -> separately (for writev)
Data *zodb.Buf // TODO encode -> separately (for writev)
DataSerial zodb.Tid DataSerial zodb.Tid
} }
......
...@@ -22,6 +22,7 @@ package zodb ...@@ -22,6 +22,7 @@ package zodb
import ( import (
"sync" "sync"
"sync/atomic"
"lab.nexedi.com/kirr/go123/xmath" "lab.nexedi.com/kirr/go123/xmath"
) )
...@@ -29,13 +30,19 @@ import ( ...@@ -29,13 +30,19 @@ import (
// Buf represents memory buffer. // Buf represents memory buffer.
// //
// To lower pressure on Go garbage-collector allocate buffers with BufAlloc and // To lower pressure on Go garbage-collector allocate buffers with BufAlloc and
// free them with Buf.Free. // free them with Buf.Release.
// //
// Custom allocation functions affect only performance, not correctness - // Custom allocation functions affect only performance, not correctness -
// everything should work if data buffer is allocated and/or free'ed // everything should work if data buffer is allocated and/or free'ed
// regular Go/GC-way. // regular Go/GC-way.
type Buf struct { type Buf struct {
Data []byte Data []byte
// reference counter.
//
// NOTE to allow both Bufs created via BufAlloc and via std new Buf is
// created with refcnt=0. The real number of references to Buf is thus .refcnt+1
refcnt int32
} }
const order0 = 4 // buf sizes start from 2^4 (=16) const order0 = 4 // buf sizes start from 2^4 (=16)
...@@ -87,13 +94,27 @@ func BufAlloc64(size int64) *Buf { ...@@ -87,13 +94,27 @@ func BufAlloc64(size int64) *Buf {
buf := bufPoolv[order].Get().(*Buf) buf := bufPoolv[order].Get().(*Buf)
//println("\tlen:", len(buf.Data), "cap:", cap(buf.Data)) //println("\tlen:", len(buf.Data), "cap:", cap(buf.Data))
buf.Data = buf.Data[:size] // leaving cap as is = 2^i buf.Data = buf.Data[:size] // leaving cap as is = 2^i
buf.refcnt = 0
return buf return buf
} }
// Free returns no-longer needed buf to freelist. // Release marks buf as no longer used by caller.
//
// It decrements buf reference-counter and if it reaches zero returns buf to
// freelist.
//
// The caller must not use buf after call to Release.
// //
// The caller must not use buf after call to Free. // XXX naming? -> Free? -> Decref? -> Unref?
func (buf *Buf) Free() { func (buf *Buf) Release() {
rc := atomic.AddInt32(&buf.refcnt, -1)
if rc < -1 {
panic("Buf: refcnt < 0")
}
if rc > -1 {
return
}
// order = max i: 2^i <= cap // order = max i: 2^i <= cap
//order := bits.Len(uint(cap(buf.Data))) //order := bits.Len(uint(cap(buf.Data)))
order := xmath.FloorLog2(uint64(cap(buf.Data))) order := xmath.FloorLog2(uint64(cap(buf.Data)))
...@@ -110,3 +131,43 @@ func (buf *Buf) Free() { ...@@ -110,3 +131,43 @@ func (buf *Buf) Free() {
bufPoolv[order].Put(buf) bufPoolv[order].Put(buf)
} }
// Incref increments buf's reference counter by 1.
func (buf *Buf) Incref() {
atomic.AddInt32(&buf.refcnt, +1)
}
// XRelease releases buf it is != nil.
func (buf *Buf) XRelease() {
if buf != nil {
buf.Release()
}
}
// XIncref increments buf's reference counter by 1 if buf != nil.
func (buf *Buf) XIncref() {
if buf != nil {
buf.Incref()
}
}
// Len returns buf's len.
//
// it works even if buf=nil similarly to len() on nil []byte slice.
func (buf *Buf) Len() int {
if buf != nil {
return len(buf.Data)
}
return 0
}
// Cap returns buf's cap.
//
// it works even if buf=nil similarly to len() on nil []byte slice.
func (buf *Buf) Cap() int {
if buf != nil {
return cap(buf.Data)
}
return 0
}
...@@ -25,11 +25,22 @@ import ( ...@@ -25,11 +25,22 @@ import (
"unsafe" "unsafe"
) )
//go:linkname runtime_procPin runtime.procPin
//go:linkname runtime_procUnpin runtime.procUnpin
func runtime_procPin() int
func runtime_procUnpin()
func sliceDataPtr(b []byte) unsafe.Pointer { func sliceDataPtr(b []byte) unsafe.Pointer {
return unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&b)).Data) return unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&b)).Data)
} }
func TestBufAllocFree(t *testing.T) { func TestBufAllocFree(t *testing.T) {
// sync.Pool uses per-P free-lists. We check that after release we will
// allocate released object. This works only if P is not changed underneath.
runtime_procPin()
defer runtime_procUnpin()
for i := uint(0); i < 25; i++ { for i := uint(0); i < 25; i++ {
size := 1<<i - 1 size := 1<<i - 1
xcap := 1<<i xcap := 1<<i
...@@ -48,22 +59,56 @@ func TestBufAllocFree(t *testing.T) { ...@@ -48,22 +59,56 @@ func TestBufAllocFree(t *testing.T) {
t.Errorf("%v: cap=%v ; want %v", i, cap(buf.Data), xcap) t.Errorf("%v: cap=%v ; want %v", i, cap(buf.Data), xcap)
} }
checkref := func(rc int32) {
t.Helper()
if buf.refcnt != rc {
t.Errorf("%v: refcnt=%v ; want %v", i, buf.refcnt, rc)
}
}
checkref(0)
// free and allocate another buf -> it must be it // free and allocate another buf -> it must be it
data := buf.Data data := buf.Data
buf.Free() buf.Release()
checkref(-1)
buf2 := BufAlloc(size) buf2 := BufAlloc(size)
// from pool -> it must be the same
if int(i) < order0 + len(bufPoolv) {
if !(buf2 == buf && sliceDataPtr(buf2.Data) == sliceDataPtr(data)) {
t.Errorf("%v: buffer not reused on free/realloc", i)
}
// not from pool - memory won't be reused // not from pool - memory won't be reused
} else { if int(i) >= order0 + len(bufPoolv) {
if buf2 == buf || sliceDataPtr(buf2.Data) == sliceDataPtr(data) { if buf2 == buf || sliceDataPtr(buf2.Data) == sliceDataPtr(data) {
t.Errorf("%v: buffer reused but should not", i) t.Errorf("%v: buffer reused but should not", i)
} }
continue
}
// from pool -> it must be the same
if !(buf2 == buf && sliceDataPtr(buf2.Data) == sliceDataPtr(data)) {
t.Errorf("%v: buffer not reused on free/realloc", i)
}
checkref(0)
// add more ref and release original buf - it must stay alive
buf.Incref()
checkref(1)
buf.Release()
checkref(0)
// another alloc must be different
buf2 = BufAlloc(size)
checkref(0)
if buf2 == buf || sliceDataPtr(buf2.Data) == sliceDataPtr(data) {
t.Errorf("%v: buffer reused but should not", i)
}
// release buf again -> should go to pool
buf.Release()
checkref(-1)
buf2 = BufAlloc(size)
if !(buf2 == buf && sliceDataPtr(buf2.Data) == sliceDataPtr(data)) {
t.Errorf("%v: buffer not reused on free/realloc", i)
} }
checkref(0)
} }
} }
// empty .s so `go build` does not use -complete for go:linkname to work
...@@ -96,8 +96,8 @@ type revCacheEntry struct { ...@@ -96,8 +96,8 @@ type revCacheEntry struct {
// case when loadBefore with tid > cache.before was called. // case when loadBefore with tid > cache.before was called.
before zodb.Tid before zodb.Tid
// loading result: object (data, serial) or error // loading result: object (buf, serial) or error
data []byte buf *zodb.Buf
serial zodb.Tid serial zodb.Tid
err error err error
...@@ -108,7 +108,7 @@ type revCacheEntry struct { ...@@ -108,7 +108,7 @@ type revCacheEntry struct {
// StorLoader represents loading part of a storage. // StorLoader represents loading part of a storage.
// XXX -> zodb? // XXX -> zodb?
type StorLoader interface { type StorLoader interface {
Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zodb.Tid, err error) Load(ctx context.Context, xid zodb.Xid) (buf *zodb.Buf, serial zodb.Tid, err error)
} }
// lock order: Cache.mu > oidCacheEntry // lock order: Cache.mu > oidCacheEntry
...@@ -149,7 +149,7 @@ func (c *Cache) SetSizeMax(sizeMax int) { ...@@ -149,7 +149,7 @@ func (c *Cache) SetSizeMax(sizeMax int) {
// Load loads data from database via cache. // Load loads data from database via cache.
// //
// If data is already in cache - cached content is returned. // If data is already in cache - cached content is returned.
func (c *Cache) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zodb.Tid, err error) { func (c *Cache) Load(ctx context.Context, xid zodb.Xid) (buf *zodb.Buf, serial zodb.Tid, err error) {
rce, rceNew := c.lookupRCE(xid) rce, rceNew := c.lookupRCE(xid)
// rce is already in cache - use it // rce is already in cache - use it
...@@ -177,7 +177,8 @@ func (c *Cache) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zod ...@@ -177,7 +177,8 @@ func (c *Cache) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zod
} }
} }
return rce.data, rce.serial, nil rce.buf.XIncref()
return rce.buf, rce.serial, nil
} }
// Prefetch arranges for data to be eventually present in cache. // Prefetch arranges for data to be eventually present in cache.
...@@ -293,19 +294,20 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) { ...@@ -293,19 +294,20 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
// loading completion is signalled by closing rce.ready. // loading completion is signalled by closing rce.ready.
func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) { func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) {
oce := rce.parent oce := rce.parent
data, serial, err := c.loader.Load(ctx, zodb.Xid{ buf, serial, err := c.loader.Load(ctx, zodb.Xid{
Oid: oid, Oid: oid,
XTid: zodb.XTid{Tid: rce.before, TidBefore: true}, XTid: zodb.XTid{Tid: rce.before, TidBefore: true},
}) })
// normalize data/serial if it was error // normalize buf/serial if it was error
if err != nil { if err != nil {
// XXX err == canceled? -> ? // XXX err == canceled? -> ?
data = nil buf.XRelease()
buf = nil
serial = 0 serial = 0
} }
rce.serial = serial rce.serial = serial
rce.data = data rce.buf = buf
rce.err = err rce.err = err
// verify db gives serial < before // verify db gives serial < before
if rce.serial >= rce.before { if rce.serial >= rce.before {
...@@ -313,7 +315,7 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) { ...@@ -313,7 +315,7 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) {
} }
close(rce.ready) close(rce.ready)
δsize := len(rce.data) δsize := rce.buf.Len()
// merge rce with adjacent entries in parent // merge rce with adjacent entries in parent
// ( e.g. loadBefore(3) and loadBefore(4) results in the same data loaded if // ( e.g. loadBefore(3) and loadBefore(4) results in the same data loaded if
...@@ -339,9 +341,10 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) { ...@@ -339,9 +341,10 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) {
if i + 1 < len(oce.rcev) { if i + 1 < len(oce.rcev) {
rceNext := oce.rcev[i+1] rceNext := oce.rcev[i+1]
if rceNext.loaded() && tryMerge(rce, rceNext, rce, oid) { if rceNext.loaded() && tryMerge(rce, rceNext, rce, oid) {
// not δsize -= len(rce.data) // not δsize -= len(rce.buf.Data)
// tryMerge can change rce.data if consistency is broken // tryMerge can change rce.buf if consistency is broken
δsize = 0 δsize = 0
rce.buf.XRelease()
rce = rceNext rce = rceNext
rceDropped = true rceDropped = true
} }
...@@ -355,8 +358,9 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) { ...@@ -355,8 +358,9 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) {
if rcePrev.loaded() && tryMerge(rcePrev, rce, rce, oid) { if rcePrev.loaded() && tryMerge(rcePrev, rce, rce, oid) {
rcePrevDropped = rcePrev rcePrevDropped = rcePrev
if rcePrev.accounted { if rcePrev.accounted {
δsize -= len(rcePrev.data) δsize -= rcePrev.buf.Len()
} }
rcePrev.buf.XRelease()
} }
} }
...@@ -506,8 +510,9 @@ func (c *Cache) gc() { ...@@ -506,8 +510,9 @@ func (c *Cache) gc() {
i := oce.find(rce) i := oce.find(rce)
if i != -1 { // rce could be already deleted by e.g. merge if i != -1 { // rce could be already deleted by e.g. merge
oce.deli(i) oce.deli(i)
c.size -= len(rce.data) c.size -= rce.buf.Len()
//fmt.Printf("gc: free %d bytes\n", len(rce.data)) //fmt.Printf("gc: free %d bytes\n", rce.buf.Len()))
rce.buf.XRelease()
} }
oce.Unlock() oce.Unlock()
...@@ -632,6 +637,7 @@ func errDB(oid zodb.Oid, format string, argv ...interface{}) error { ...@@ -632,6 +637,7 @@ func errDB(oid zodb.Oid, format string, argv ...interface{}) error {
// errDB marks rce with database inconsistency error // errDB marks rce with database inconsistency error
func (rce *revCacheEntry) errDB(oid zodb.Oid, format string, argv ...interface{}) { func (rce *revCacheEntry) errDB(oid zodb.Oid, format string, argv ...interface{}) {
rce.err = errDB(oid, format, argv...) rce.err = errDB(oid, format, argv...)
rce.data = nil rce.buf.XRelease()
rce.buf = nil
rce.serial = 0 rce.serial = 0
} }
...@@ -50,9 +50,27 @@ type tOidData struct { ...@@ -50,9 +50,27 @@ type tOidData struct {
err error // e.g. io error err error // e.g. io error
} }
func (stor *tStorage) Load(_ context.Context, xid zodb.Xid) (data []byte, serial zodb.Tid, err error) { // create new buffer with specified content copied there.
func mkbuf(data []byte) *zodb.Buf {
buf := zodb.BufAlloc(len(data))
copy(buf.Data, data)
return buf
}
// check whether buffers hold same data or both are nil.
//
// NOTE we ignore refcnt here
func bufSame(buf1, buf2 *zodb.Buf) bool {
if buf1 == nil {
return (buf2 == nil)
}
return reflect.DeepEqual(buf1.Data, buf2.Data)
}
func (stor *tStorage) Load(_ context.Context, xid zodb.Xid) (buf *zodb.Buf, serial zodb.Tid, err error) {
//fmt.Printf("> load(%v)\n", xid) //fmt.Printf("> load(%v)\n", xid)
//defer func() { fmt.Printf("< %v, %v, %v\n", data, serial, err) }() //defer func() { fmt.Printf("< %v, %v, %v\n", buf.XData(), serial, err) }()
tid := xid.Tid tid := xid.Tid
if !xid.TidBefore { if !xid.TidBefore {
tid++ // XXX overflow? tid++ // XXX overflow?
...@@ -85,7 +103,7 @@ func (stor *tStorage) Load(_ context.Context, xid zodb.Xid) (data []byte, serial ...@@ -85,7 +103,7 @@ func (stor *tStorage) Load(_ context.Context, xid zodb.Xid) (data []byte, serial
if e != nil { if e != nil {
s = 0 // obey protocol of returning 0 with error s = 0 // obey protocol of returning 0 with error
} }
return datav[i].data, s, e return mkbuf(datav[i].data), s, e
} }
var ioerr = errors.New("input/output error") var ioerr = errors.New("input/output error")
...@@ -142,15 +160,17 @@ func TestCache(t *testing.T) { ...@@ -142,15 +160,17 @@ func TestCache(t *testing.T) {
}, },
} }
b := mkbuf
c := NewCache(tstor, 100 /* > Σ all data */) c := NewCache(tstor, 100 /* > Σ all data */)
ctx := context.Background() ctx := context.Background()
checkLoad := func(xid zodb.Xid, data []byte, serial zodb.Tid, err error) { checkLoad := func(xid zodb.Xid, buf *zodb.Buf, serial zodb.Tid, err error) {
t.Helper() t.Helper()
bad := &bytes.Buffer{} bad := &bytes.Buffer{}
d, s, e := c.Load(ctx, xid) b, s, e := c.Load(ctx, xid)
if !reflect.DeepEqual(data, d) { if !bufSame(buf, b) {
fmt.Fprintf(bad, "data:\n%s\n", pretty.Compare(data, d)) fmt.Fprintf(bad, "buf:\n%s\n", pretty.Compare(buf, b))
} }
if serial != s { if serial != s {
fmt.Fprintf(bad, "serial:\n%s\n", pretty.Compare(serial, s)) fmt.Fprintf(bad, "serial:\n%s\n", pretty.Compare(serial, s))
...@@ -164,7 +184,7 @@ func TestCache(t *testing.T) { ...@@ -164,7 +184,7 @@ func TestCache(t *testing.T) {
} }
} }
checkRCE := func(rce *revCacheEntry, before, serial zodb.Tid, data []byte, err error) { checkRCE := func(rce *revCacheEntry, before, serial zodb.Tid, buf *zodb.Buf, err error) {
t.Helper() t.Helper()
bad := &bytes.Buffer{} bad := &bytes.Buffer{}
if rce.before != before { if rce.before != before {
...@@ -173,8 +193,8 @@ func TestCache(t *testing.T) { ...@@ -173,8 +193,8 @@ func TestCache(t *testing.T) {
if rce.serial != serial { if rce.serial != serial {
fmt.Fprintf(bad, "serial:\n%s\n", pretty.Compare(serial, rce.serial)) fmt.Fprintf(bad, "serial:\n%s\n", pretty.Compare(serial, rce.serial))
} }
if !reflect.DeepEqual(rce.data, data) { if !bufSame(rce.buf, buf) {
fmt.Fprintf(bad, "data:\n%s\n", pretty.Compare(data, rce.data)) fmt.Fprintf(bad, "buf:\n%s\n", pretty.Compare(buf, rce.buf))
} }
if !reflect.DeepEqual(rce.err, err) { if !reflect.DeepEqual(rce.err, err) {
fmt.Fprintf(bad, "err:\n%s\n", pretty.Compare(err, rce.err)) fmt.Fprintf(bad, "err:\n%s\n", pretty.Compare(err, rce.err))
...@@ -209,7 +229,7 @@ func TestCache(t *testing.T) { ...@@ -209,7 +229,7 @@ func TestCache(t *testing.T) {
debug.Sprint(h), debug.Sprint(hp)) debug.Sprint(h), debug.Sprint(hp))
} }
rce := h.rceFromInLRU() rce := h.rceFromInLRU()
size += len(rce.data) size += rce.buf.Len()
mruv = append(mruv, rce) mruv = append(mruv, rce)
} }
if !reflect.DeepEqual(mruv, mruvOk) { if !reflect.DeepEqual(mruv, mruvOk) {
...@@ -253,24 +273,24 @@ func TestCache(t *testing.T) { ...@@ -253,24 +273,24 @@ func TestCache(t *testing.T) {
checkMRU(0, rce1_b4) checkMRU(0, rce1_b4)
// load <6 -> new rce entry with data // load <6 -> new rce entry with data
checkLoad(xidlt(1,6), hello, 4, nil) checkLoad(xidlt(1,6), b(hello), 4, nil)
ok1(len(oce1.rcev) == 2) ok1(len(oce1.rcev) == 2)
rce1_b6 := oce1.rcev[1] rce1_b6 := oce1.rcev[1]
checkRCE(rce1_b6, 6, 4, hello, nil) checkRCE(rce1_b6, 6, 4, b(hello), nil)
checkOCE(1, rce1_b4, rce1_b6) checkOCE(1, rce1_b4, rce1_b6)
checkMRU(5, rce1_b6, rce1_b4) checkMRU(5, rce1_b6, rce1_b4)
// load <5 -> <5 merged with <6 // load <5 -> <5 merged with <6
checkLoad(xidlt(1,5), hello, 4, nil) checkLoad(xidlt(1,5), b(hello), 4, nil)
checkOCE(1, rce1_b4, rce1_b6) checkOCE(1, rce1_b4, rce1_b6)
checkMRU(5, rce1_b6, rce1_b4) checkMRU(5, rce1_b6, rce1_b4)
// load <7 -> <6 merged with <7 // load <7 -> <6 merged with <7
checkLoad(xidlt(1,7), hello, 4, nil) checkLoad(xidlt(1,7), b(hello), 4, nil)
ok1(len(oce1.rcev) == 2) ok1(len(oce1.rcev) == 2)
rce1_b7 := oce1.rcev[1] rce1_b7 := oce1.rcev[1]
ok1(rce1_b7 != rce1_b6) ok1(rce1_b7 != rce1_b6)
checkRCE(rce1_b7, 7, 4, hello, nil) checkRCE(rce1_b7, 7, 4, b(hello), nil)
checkOCE(1, rce1_b4, rce1_b7) checkOCE(1, rce1_b4, rce1_b7)
checkMRU(5, rce1_b7, rce1_b4) checkMRU(5, rce1_b7, rce1_b4)
...@@ -291,19 +311,19 @@ func TestCache(t *testing.T) { ...@@ -291,19 +311,19 @@ func TestCache(t *testing.T) {
checkMRU(5, rce1_b10, rce1_b8, rce1_b7, rce1_b4) checkMRU(5, rce1_b10, rce1_b8, rce1_b7, rce1_b4)
// load <11 -> new data rce, not merged with ioerr @<10 // load <11 -> new data rce, not merged with ioerr @<10
checkLoad(xidlt(1,11), world, 10, nil) checkLoad(xidlt(1,11), b(world), 10, nil)
ok1(len(oce1.rcev) == 5) ok1(len(oce1.rcev) == 5)
rce1_b11 := oce1.rcev[4] rce1_b11 := oce1.rcev[4]
checkRCE(rce1_b11, 11, 10, world, nil) checkRCE(rce1_b11, 11, 10, b(world), nil)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b11) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b11)
checkMRU(12, rce1_b11, rce1_b10, rce1_b8, rce1_b7, rce1_b4) checkMRU(12, rce1_b11, rce1_b10, rce1_b8, rce1_b7, rce1_b4)
// load <12 -> <11 merged with <12 // load <12 -> <11 merged with <12
checkLoad(xidlt(1,12), world, 10, nil) checkLoad(xidlt(1,12), b(world), 10, nil)
ok1(len(oce1.rcev) == 5) ok1(len(oce1.rcev) == 5)
rce1_b12 := oce1.rcev[4] rce1_b12 := oce1.rcev[4]
ok1(rce1_b12 != rce1_b11) ok1(rce1_b12 != rce1_b11)
checkRCE(rce1_b12, 12, 10, world, nil) checkRCE(rce1_b12, 12, 10, b(world), nil)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b12) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b12)
checkMRU(12, rce1_b12, rce1_b10, rce1_b8, rce1_b7, rce1_b4) checkMRU(12, rce1_b12, rce1_b10, rce1_b8, rce1_b7, rce1_b4)
...@@ -315,7 +335,7 @@ func TestCache(t *testing.T) { ...@@ -315,7 +335,7 @@ func TestCache(t *testing.T) {
rce1_b16, new16 := c.lookupRCE(xidlt(1,16)) rce1_b16, new16 := c.lookupRCE(xidlt(1,16))
ok1(new16) ok1(new16)
rce1_b16.serial = 10 rce1_b16.serial = 10
rce1_b16.data = world rce1_b16.buf = mkbuf(world)
// here: first half of loadRCE(<16) before close(<16.ready) // here: first half of loadRCE(<16) before close(<16.ready)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b12, rce1_b16) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b12, rce1_b16)
ok1(!rce1_b16.loaded()) ok1(!rce1_b16.loaded())
...@@ -338,9 +358,9 @@ func TestCache(t *testing.T) { ...@@ -338,9 +358,9 @@ func TestCache(t *testing.T) {
// <16 did not yet took oce lock so c.size is temporarily reduced and // <16 did not yet took oce lock so c.size is temporarily reduced and
// <16 is not yet on LRU list) // <16 is not yet on LRU list)
c.loadRCE(ctx, rce1_b14, 1) c.loadRCE(ctx, rce1_b14, 1)
checkRCE(rce1_b14, 14, 10, world, nil) checkRCE(rce1_b14, 14, 10, b(world), nil)
checkRCE(rce1_b16, 16, 10, world, nil) checkRCE(rce1_b16, 16, 10, b(world), nil)
checkRCE(rce1_b12, 12, 10, world, nil) checkRCE(rce1_b12, 12, 10, b(world), nil)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16)
checkMRU(5 /*was 12*/, rce1_b10, rce1_b8, rce1_b7, rce1_b4) checkMRU(5 /*was 12*/, rce1_b10, rce1_b8, rce1_b7, rce1_b4)
...@@ -359,7 +379,7 @@ func TestCache(t *testing.T) { ...@@ -359,7 +379,7 @@ func TestCache(t *testing.T) {
// (<17 loads but not yet takes oce lock) // (<17 loads but not yet takes oce lock)
rce1_b17.serial = 16 rce1_b17.serial = 16
rce1_b17.data = zz rce1_b17.buf = mkbuf(zz)
close(rce1_b17.ready) close(rce1_b17.ready)
ok1(rce1_b17.loaded()) ok1(rce1_b17.loaded())
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16, rce1_b17, rce1_b18) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16, rce1_b17, rce1_b18)
...@@ -367,8 +387,8 @@ func TestCache(t *testing.T) { ...@@ -367,8 +387,8 @@ func TestCache(t *testing.T) {
// (<18 loads and takes oce lock first - merge <17 with <18) // (<18 loads and takes oce lock first - merge <17 with <18)
c.loadRCE(ctx, rce1_b18, 1) c.loadRCE(ctx, rce1_b18, 1)
checkRCE(rce1_b18, 18, 16, zz, nil) checkRCE(rce1_b18, 18, 16, b(zz), nil)
checkRCE(rce1_b17, 17, 16, zz, nil) checkRCE(rce1_b17, 17, 16, b(zz), nil)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16, rce1_b18) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16, rce1_b18)
checkMRU(14, rce1_b18, rce1_b16, rce1_b10, rce1_b8, rce1_b7, rce1_b4) checkMRU(14, rce1_b18, rce1_b16, rce1_b10, rce1_b8, rce1_b7, rce1_b4)
...@@ -377,15 +397,15 @@ func TestCache(t *testing.T) { ...@@ -377,15 +397,15 @@ func TestCache(t *testing.T) {
ok1(len(oce1.rcev) == 6) ok1(len(oce1.rcev) == 6)
rce1_b20 := oce1.rcev[5] rce1_b20 := oce1.rcev[5]
ok1(rce1_b20 != rce1_b18) ok1(rce1_b20 != rce1_b18)
checkRCE(rce1_b20, 20, 16, zz, nil) checkRCE(rce1_b20, 20, 16, b(zz), nil)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16, rce1_b20) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16, rce1_b20)
checkMRU(14, rce1_b20, rce1_b16, rce1_b10, rce1_b8, rce1_b7, rce1_b4) checkMRU(14, rce1_b20, rce1_b16, rce1_b10, rce1_b8, rce1_b7, rce1_b4)
// load =20 -> new <21 // load =20 -> new <21
checkLoad(xideq(1,20), www, 20, nil) checkLoad(xideq(1,20), b(www), 20, nil)
ok1(len(oce1.rcev) == 7) ok1(len(oce1.rcev) == 7)
rce1_b21 := oce1.rcev[6] rce1_b21 := oce1.rcev[6]
checkRCE(rce1_b21, 21, 20, www, nil) checkRCE(rce1_b21, 21, 20, b(www), nil)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16, rce1_b20, rce1_b21) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16, rce1_b20, rce1_b21)
checkMRU(17, rce1_b21, rce1_b20, rce1_b16, rce1_b10, rce1_b8, rce1_b7, rce1_b4) checkMRU(17, rce1_b21, rce1_b20, rce1_b16, rce1_b10, rce1_b8, rce1_b7, rce1_b4)
...@@ -394,7 +414,7 @@ func TestCache(t *testing.T) { ...@@ -394,7 +414,7 @@ func TestCache(t *testing.T) {
ok1(len(oce1.rcev) == 7) ok1(len(oce1.rcev) == 7)
rce1_b22 := oce1.rcev[6] rce1_b22 := oce1.rcev[6]
ok1(rce1_b22 != rce1_b21) ok1(rce1_b22 != rce1_b21)
checkRCE(rce1_b22, 22, 20, www, nil) checkRCE(rce1_b22, 22, 20, b(www), nil)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16, rce1_b20, rce1_b22) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b16, rce1_b20, rce1_b22)
checkMRU(17, rce1_b22, rce1_b20, rce1_b16, rce1_b10, rce1_b8, rce1_b7, rce1_b4) checkMRU(17, rce1_b22, rce1_b20, rce1_b16, rce1_b10, rce1_b8, rce1_b7, rce1_b4)
...@@ -468,10 +488,10 @@ func TestCache(t *testing.T) { ...@@ -468,10 +488,10 @@ func TestCache(t *testing.T) {
// ---- verify how LRU changes for in-cache loads ---- // ---- verify how LRU changes for in-cache loads ----
checkMRU(17, rce1_b9, rce1_b22, rce1_b20, rce1_b16, rce1_b10, rce1_b8, rce1_b7, rce1_b4) checkMRU(17, rce1_b9, rce1_b22, rce1_b20, rce1_b16, rce1_b10, rce1_b8, rce1_b7, rce1_b4)
checkLoad(xidlt(1,7), hello, 4, nil) checkLoad(xidlt(1,7), b(hello), 4, nil)
checkMRU(17, rce1_b7, rce1_b9, rce1_b22, rce1_b20, rce1_b16, rce1_b10, rce1_b8, rce1_b4) checkMRU(17, rce1_b7, rce1_b9, rce1_b22, rce1_b20, rce1_b16, rce1_b10, rce1_b8, rce1_b4)
checkLoad(xidlt(1,16), world, 10, nil) checkLoad(xidlt(1,16), b(world), 10, nil)
checkMRU(17, rce1_b16, rce1_b7, rce1_b9, rce1_b22, rce1_b20, rce1_b10, rce1_b8, rce1_b4) checkMRU(17, rce1_b16, rce1_b7, rce1_b9, rce1_b22, rce1_b20, rce1_b10, rce1_b8, rce1_b4)
...@@ -517,7 +537,7 @@ func TestCache(t *testing.T) { ...@@ -517,7 +537,7 @@ func TestCache(t *testing.T) {
ok1(len(oce1.rcev) == 4) ok1(len(oce1.rcev) == 4)
rce1_b20_2 := oce1.rcev[3] rce1_b20_2 := oce1.rcev[3]
ok1(rce1_b20_2 != rce1_b20) ok1(rce1_b20_2 != rce1_b20)
checkRCE(rce1_b20_2, 20, 16, zz, nil) checkRCE(rce1_b20_2, 20, 16, b(zz), nil)
checkOCE(1, rce1_b7, rce1_b9, rce1_b16, rce1_b20_2) checkOCE(1, rce1_b7, rce1_b9, rce1_b16, rce1_b20_2)
checkMRU(14, rce1_b20_2, rce1_b16, rce1_b7, rce1_b9) checkMRU(14, rce1_b20_2, rce1_b16, rce1_b7, rce1_b9)
...@@ -531,7 +551,7 @@ func TestCache(t *testing.T) { ...@@ -531,7 +551,7 @@ func TestCache(t *testing.T) {
// - loaded <78 (big, size=10) // - loaded <78 (big, size=10)
ok1(len(oce1.rcev) == 2) ok1(len(oce1.rcev) == 2)
rce1_b78 := oce1.rcev[1] rce1_b78 := oce1.rcev[1]
checkRCE(rce1_b78, 78, 77, big, nil) checkRCE(rce1_b78, 78, 77, b(big), nil)
checkOCE(1, rce1_b20_2, rce1_b78) checkOCE(1, rce1_b20_2, rce1_b78)
checkMRU(12, rce1_b78, rce1_b20_2) checkMRU(12, rce1_b78, rce1_b20_2)
...@@ -544,7 +564,7 @@ func TestCache(t *testing.T) { ...@@ -544,7 +564,7 @@ func TestCache(t *testing.T) {
// and still loading works (because even if though rce's are evicted // and still loading works (because even if though rce's are evicted
// they stay live while someone user waits and uses it) // they stay live while someone user waits and uses it)
checkLoad(xidlt(1,5), hello, 4, nil) checkLoad(xidlt(1,5), b(hello), 4, nil)
tc.Expect(gcstart, gcfinish) tc.Expect(gcstart, gcfinish)
checkOCE(1) checkOCE(1)
checkMRU(0) checkMRU(0)
......
...@@ -369,7 +369,7 @@ func (zi *zIter) NextData() (*zodb.DataInfo, error) { ...@@ -369,7 +369,7 @@ func (zi *zIter) NextData() (*zodb.DataInfo, error) {
// - need to use separate dh because of this // - need to use separate dh because of this
zi.dhLoading = zi.iter.Datah zi.dhLoading = zi.iter.Datah
if zi.dataBuf != nil { if zi.dataBuf != nil {
zi.dataBuf.Free() zi.dataBuf.Release()
zi.dataBuf = nil zi.dataBuf = nil
} }
zi.dataBuf, err = zi.dhLoading.LoadData(zi.iter.R) zi.dataBuf, err = zi.dhLoading.LoadData(zi.iter.R)
......
...@@ -685,7 +685,7 @@ func (dh *DataHeader) LoadData(r io.ReaderAt) (*zodb.Buf, error) { ...@@ -685,7 +685,7 @@ func (dh *DataHeader) LoadData(r io.ReaderAt) (*zodb.Buf, error) {
buf := zodb.BufAlloc64(dh.DataLen) buf := zodb.BufAlloc64(dh.DataLen)
_, err := r.ReadAt(buf.Data, dh.Pos + DataHeaderSize) _, err := r.ReadAt(buf.Data, dh.Pos + DataHeaderSize)
if err != nil { if err != nil {
buf.Free() buf.Release()
return nil, dh.err("read data", noEOF(err)) // XXX recheck return nil, dh.err("read data", noEOF(err)) // XXX recheck
} }
......
...@@ -194,7 +194,7 @@ func (d *DumperFsDump) DumpTxn(buf *xfmt.Buffer, it *fs1.Iter) error { ...@@ -194,7 +194,7 @@ func (d *DumperFsDump) DumpTxn(buf *xfmt.Buffer, it *fs1.Iter) error {
} }
buf .S("\n") buf .S("\n")
dbuf.Free() dbuf.Release()
} }
return nil return nil
......
...@@ -40,7 +40,7 @@ func Catobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid) ...@@ -40,7 +40,7 @@ func Catobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid)
} }
_, err = w.Write(buf.Data) // NOTE deleted data are returned as err by Load _, err = w.Write(buf.Data) // NOTE deleted data are returned as err by Load
buf.Free() buf.Release()
return err // XXX err ctx ? return err // XXX err ctx ?
} }
...@@ -61,7 +61,7 @@ func Dumpobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid, ...@@ -61,7 +61,7 @@ func Dumpobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid,
d := dumper{W: w, HashOnly: hashOnly} d := dumper{W: w, HashOnly: hashOnly}
err = d.DumpData(&objInfo) err = d.DumpData(&objInfo)
buf.Free() buf.Release()
return err return err
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment