Commit 51026eb3 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 42a61864
...@@ -177,7 +177,7 @@ func isErrNoData(err error) bool { ...@@ -177,7 +177,7 @@ func isErrNoData(err error) bool {
// Load loads data from database via cache. // Load loads data from database via cache.
// //
// If data is already in cache cached content is returned. // If data is already in cache cached content is returned.
func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { func (c *Cache) Load(xid zodb.Xid) (data []byte, serial zodb.Tid, err error) {
rce, rceNew := c.lookupRCE(xid) rce, rceNew := c.lookupRCE(xid)
// rce is already in cache - use it // rce is already in cache - use it
...@@ -191,10 +191,21 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -191,10 +191,21 @@ func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
} else { } else {
// XXX use connection poll // XXX use connection poll
// XXX or it should be cared by loader? // XXX or it should be cared by loader?
c.loadRCE(rce, xid) c.loadRCE(rce, xid.Oid)
} }
return rce.data, rce.serial, rce.userErr(xid) if rce.err != nil {
return nil, 0, rce.userErr(xid)
}
// for loadSerial - check we have exact hit - else "nodata"
if !xid.TidBefore {
if rce.serial != xid.Tid {
return nil, 0, &zodb.ErrXidMissing{xid}
}
}
return rce.data, rce.serial, nil
} }
// Prefetch arranges for data to be eventually present in cache. // Prefetch arranges for data to be eventually present in cache.
...@@ -207,19 +218,30 @@ func (c *Cache) Prefetch(xid zodb.Xid) { ...@@ -207,19 +218,30 @@ func (c *Cache) Prefetch(xid zodb.Xid) {
// XXX!rceNew -> adjust LRU? // XXX!rceNew -> adjust LRU?
// spawn prefetch in the background if rce was not yet loaded // spawn loading in the background if rce was not yet loaded
if rceNew { if rceNew {
// XXX use connection poll // XXX use connection poll
go c.loadRCE(rce, xid) go c.loadRCE(rce, xid.Oid)
} }
} }
// lookupRCE returns revCacheEntry corresponding to xid. // lookupRCE returns revCacheEntry corresponding to xid.
// //
// If xid indicates loadSerial query (xid.TidBefore=false) the rce will be
// lookuped and eventually loaded as if it was queried with <(serial+1).
// It is caller responsibility to check loadSerial cases for exact hits after
// rce will become ready.
//
// rceNew indicates whether rce is new and so loading on it has not been // rceNew indicates whether rce is new and so loading on it has not been
// initiated yet. rce should be loaded with loadRCE. // initiated yet. rce should be loaded with loadRCE.
func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) { func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
// loadSerial(serial) -> loadBefore(serial+1)
before := xid.Tid
if !xid.TidBefore {
before++ // XXX overflow
}
// oid -> oce (oidCacheEntry) ; create new empty oce if not yet there // oid -> oce (oidCacheEntry) ; create new empty oce if not yet there
// exit with oce locked and cache.before read consistently // exit with oce locked and cache.before read consistently
c.mu.RLock() c.mu.RLock()
...@@ -245,52 +267,46 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) { ...@@ -245,52 +267,46 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
} }
// oce, before -> rce (revCacheEntry) // oce, before -> rce (revCacheEntry)
if xid.TidBefore { l := len(oce.rcev)
l := len(oce.rcev) i := sort.Search(l, func(i int) bool {
i := sort.Search(l, func(i int) bool { before_i := oce.rcev[i].before
before := oce.rcev[i].before if before_i == zodb.TidMax {
if before == zodb.TidMax { before_i = cacheBefore
before = cacheBefore }
} return before <= before_i
return xid.Tid <= before })
})
switch {
switch { // not found - before > max(rcev.before) - insert new max entry
// not found - tid > max(rcev.before) - insert new max entry case i == l:
case i == l: rce = oce.newRevEntry(i, before)
rce = oce.newRevEntry(i, xid.Tid) if rce.before == cacheBefore {
if rce.before == cacheBefore { // FIXME better do this when the entry becomes loaded ?
// FIXME better do this when the entry becomes loaded ? // XXX vs concurrent invalidations?
// XXX vs concurrent invalidations? rce.before = zodb.TidMax
rce.before = zodb.TidMax }
} rceNew = true
rceNew = true
// found: // found:
// tid <= rcev[i].before // before <= rcev[i].before
// tid > rcev[i-1].before // before > rcev[i-1].before
// exact match - we already have entry for this before // exact match - we already have entry for this before
case xid.Tid == oce.rcev[i].before: case before == oce.rcev[i].before:
rce = oce.rcev[i] rce = oce.rcev[i]
// non-exact match: // non-exact match:
// - same entry if q(before) ∈ (serial, before] // - same entry if q(before) ∈ (serial, before]
// - we can also reuse this entry if q(before) < before and err="nodata" // - we can also reuse this entry if q(before) < before and err="nodata"
case oce.rcev[i].loaded() && ( case oce.rcev[i].loaded() && (
(oce.rcev[i].err == nil && oce.rcev[i].serial < xid.Tid) || (oce.rcev[i].err == nil && oce.rcev[i].serial < before) ||
(isErrNoData(oce.rcev[i].err) && xid.Tid < oce.rcev[i].before)): (isErrNoData(oce.rcev[i].err) && before < oce.rcev[i].before)):
rce = oce.rcev[i] rce = oce.rcev[i]
// otherwise - insert new entry // otherwise - insert new entry
default: default:
rce = oce.newRevEntry(i, xid.Tid) rce = oce.newRevEntry(i, before)
rceNew = true rceNew = true
}
// XXX serial -> revCacheEntry
} else {
// TODO
} }
oce.Unlock() oce.Unlock()
...@@ -300,9 +316,13 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) { ...@@ -300,9 +316,13 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
// loadRCE performs data loading from database into rce. // loadRCE performs data loading from database into rce.
// //
// rce must be new just created by lookupRCE() with returned rceNew=true. // rce must be new just created by lookupRCE() with returned rceNew=true.
func (c *Cache) loadRCE(rce *revCacheEntry, xid zodb.Xid) { // loading completion is signalled by closing rce.ready.
func (c *Cache) loadRCE(rce *revCacheEntry, oid zodb.Oid) {
oce := rce.parent oce := rce.parent
data, serial, err := c.loader.Load(xid) data, serial, err := c.loader.Load(zodb.Xid{
Oid: oid,
XTid: zodb.XTid{Tid: rce.before, TidBefore: true},
})
// normalize data/serial if it was error // normalize data/serial if it was error
if err != nil { if err != nil {
...@@ -314,8 +334,7 @@ func (c *Cache) loadRCE(rce *revCacheEntry, xid zodb.Xid) { ...@@ -314,8 +334,7 @@ func (c *Cache) loadRCE(rce *revCacheEntry, xid zodb.Xid) {
rce.err = err rce.err = err
// verify db gives serial < before // verify db gives serial < before
if rce.serial >= rce.before { if rce.serial >= rce.before {
// XXX loadSerial? rce.errDB(oid, "load(<%v) -> %v", rce.before, serial)
rce.errDB(xid.Oid, "load(<%v) -> %v", rce.before, serial)
} }
close(rce.ready) close(rce.ready)
...@@ -336,7 +355,7 @@ func (c *Cache) loadRCE(rce *revCacheEntry, xid zodb.Xid) { ...@@ -336,7 +355,7 @@ func (c *Cache) loadRCE(rce *revCacheEntry, xid zodb.Xid) {
// if rce & rceNext cover the same range -> drop rce // if rce & rceNext cover the same range -> drop rce
if i + 1 < len(oce.rcev) { if i + 1 < len(oce.rcev) {
rceNext := oce.rcev[i+1] rceNext := oce.rcev[i+1]
if rceNext.loaded() && tryMerge(rce, rceNext, rce, xid.Oid) { if rceNext.loaded() && tryMerge(rce, rceNext, rce, oid) {
// not δsize -= len(rce.data) // not δsize -= len(rce.data)
// tryMerge can change rce.data if consistency is broken // tryMerge can change rce.data if consistency is broken
δsize = 0 δsize = 0
...@@ -347,7 +366,7 @@ func (c *Cache) loadRCE(rce *revCacheEntry, xid zodb.Xid) { ...@@ -347,7 +366,7 @@ func (c *Cache) loadRCE(rce *revCacheEntry, xid zodb.Xid) {
// if rcePrev & rce cover the same range -> drop rcePrev // if rcePrev & rce cover the same range -> drop rcePrev
if i > 0 { if i > 0 {
rcePrev := oce.rcev[i-1] rcePrev := oce.rcev[i-1]
if rcePrev.loaded() && tryMerge(rcePrev, rce, rce, xid.Oid) { if rcePrev.loaded() && tryMerge(rcePrev, rce, rce, oid) {
δsize -= len(rcePrev.data) δsize -= len(rcePrev.data)
} }
} }
......
...@@ -269,7 +269,7 @@ func TestCache(t *testing.T) { ...@@ -269,7 +269,7 @@ func TestCache(t *testing.T) {
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b12, rce1_b14, rce1_b16) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b10, rce1_b12, rce1_b14, rce1_b16)
// (<14 also becomes ready and takes oce lock first, merging <12 and <14 into <16) // (<14 also becomes ready and takes oce lock first, merging <12 and <14 into <16)
c.loadRCE(rce1_b14, xidlt(1,14)) c.loadRCE(rce1_b14, 1)
checkRCE(rce1_b14, 14, 10, world, nil) checkRCE(rce1_b14, 14, 10, world, nil)
checkRCE(rce1_b16, 16, 10, world, nil) checkRCE(rce1_b16, 16, 10, world, nil)
checkRCE(rce1_b12, 12, 10, world, nil) checkRCE(rce1_b12, 12, 10, world, nil)
...@@ -304,7 +304,7 @@ func TestCache(t *testing.T) { ...@@ -304,7 +304,7 @@ func TestCache(t *testing.T) {
// <9 must be separate from <8 and <10 because it is IO error there // <9 must be separate from <8 and <10 because it is IO error there
rce1_b9, new9 := c.lookupRCE(xidlt(1,9)) rce1_b9, new9 := c.lookupRCE(xidlt(1,9))
ok1(new9) ok1(new9)
c.loadRCE(rce1_b9, xidlt(1,9)) c.loadRCE(rce1_b9, 1)
checkRCE(rce1_b9, 9, 0, nil, ioerr) checkRCE(rce1_b9, 9, 0, nil, ioerr)
checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b9, rce1_b10, rce1_b16) checkOCE(1, rce1_b4, rce1_b7, rce1_b8, rce1_b9, rce1_b10, rce1_b16)
......
...@@ -150,9 +150,12 @@ type IStorage interface { ...@@ -150,9 +150,12 @@ type IStorage interface {
LastOid() (Oid, error) LastOid() (Oid, error)
// LoadSerial and LoadBefore generalized into 1 Load (see Xid for details) // LoadSerial and LoadBefore generalized into 1 Load (see Xid for details)
// TODO data []byte -> something allocated from slab ? //
// XXX zodb.loadBefore() returns (data, serial, serial_next) -> add serial_next?
//
// XXX currently deleted data is returned as data=nil -- is it ok? // XXX currently deleted data is returned as data=nil -- is it ok?
// TODO specify error when data not found // TODO specify error when data not found -> ErrOidMissing | ErrXidMissing
// TODO data []byte -> something allocated from slab ?
Load(xid Xid) (data []byte, serial Tid, err error) // XXX -> StorageRecordInformation ? Load(xid Xid) (data []byte, serial Tid, err error) // XXX -> StorageRecordInformation ?
// Prefetch(xid Xid) (no error) // Prefetch(xid Xid) (no error)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment