Commit 15afd3ad authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent cb07d772
...@@ -21,14 +21,24 @@ package client ...@@ -21,14 +21,24 @@ package client
// cache management // cache management
import ( import (
"fmt"
"sort"
"sync" "sync"
"unsafe" "unsafe"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
) )
// storLoader represents loading part of a storage
// XXX -> zodb?
type storLoader interface {
Load(xid zodb.Xid) (data []byte, serial zodb.Tid, err error)
}
// Cache adds RAM caching layer over a storage // Cache adds RAM caching layer over a storage
type Cache struct { type Cache struct {
loader storLoader
mu sync.RWMutex mu sync.RWMutex
// cache is fully synchronized with storage for transactions with tid < before. // cache is fully synchronized with storage for transactions with tid < before.
...@@ -40,7 +50,9 @@ type Cache struct { ...@@ -40,7 +50,9 @@ type Cache struct {
// garbage collection: // garbage collection:
gcMu sync.Mutex gcMu sync.Mutex
lru listHead // revCacheEntries in LRU order lru listHead // revCacheEntries in LRU order
size int64 // cached data size in bytes size int // cached data size in bytes
sizeMax int // cache is allowed to occupy not more than this
} }
// oidCacheEntry maintains cached revisions for 1 oid // oidCacheEntry maintains cached revisions for 1 oid
...@@ -88,22 +100,32 @@ type revCacheEntry struct { ...@@ -88,22 +100,32 @@ type revCacheEntry struct {
ready chan struct{} // closed when loading finished ready chan struct{} // closed when loading finished
} }
// XXX doc func NewCache(loader storLoader) *Cache {
func (oce *oidCacheEntry) newRevEntry(before zodb.Tid) *revCacheEntry { return &Cache{loader: loader}
}
// newReveEntry creates new revCacheEntry with .before and inserts it into .revv @i
// (if i == len(oce.revv) - entry is appended)
func (oce *oidCacheEntry) newRevEntry(i int, before zodb.Tid) *revCacheEntry {
rce := &revCacheEntry{ rce := &revCacheEntry{
parent: oce, parent: oce,
serial: 0, serial: 0,
before: before, before: before,
ready: make(chan struct{})} ready: make(chan struct{}),
} }
rce.inLRU.init() rce.inLRU.Init()
oce.revv = append(oce.revv, nil)
copy(oce.revv[i+1:], oce.revv[i:])
oce.revv[i] = rce
return rce return rce
} }
// find finds rce under oce and returns its index in oce.revv. // find finds rce under oce and returns its index in oce.revv.
// not found -> -1. // not found -> -1.
func (oce *oidCacheEntry) find(rce *revCacheEntry) int { func (oce *oidCacheEntry) find(rce *revCacheEntry) int {
for i, r := oce.revv { for i, r := range oce.revv {
if r == rce { if r == rce {
return i return i
} }
...@@ -112,7 +134,12 @@ func (oce *oidCacheEntry) find(rce *revCacheEntry) int { ...@@ -112,7 +134,12 @@ func (oce *oidCacheEntry) find(rce *revCacheEntry) int {
} }
func (oce *oidCacheEntry) deli(i int) { func (oce *oidCacheEntry) deli(i int) {
oce.revv = append(oce.revv[:i], oce.revv[i+1:]) n := len(oce.revv) - 1
copy(oce.revv[i:], oce.revv[i+1:])
// release ptr to revCacheEntry so it won't confusingly stay live when
// its turn to be deleted come.
oce.revv[n] = nil
oce.revv = oce.revv[:n]
} }
// XXX doc; must be called with oce lock held // XXX doc; must be called with oce lock held
...@@ -122,7 +149,7 @@ func (oce *oidCacheEntry) del(rce *revCacheEntry) { ...@@ -122,7 +149,7 @@ func (oce *oidCacheEntry) del(rce *revCacheEntry) {
panic("rce not found") panic("rce not found")
} }
rce.revv = append(rce.revv[:i], rce.revv[i+1:]) oce.deli(i)
} }
// lock order: Cache.mu > oidCacheEntry > (?) revCacheEntry // lock order: Cache.mu > oidCacheEntry > (?) revCacheEntry
...@@ -130,14 +157,13 @@ func (oce *oidCacheEntry) del(rce *revCacheEntry) { ...@@ -130,14 +157,13 @@ func (oce *oidCacheEntry) del(rce *revCacheEntry) {
// XXX maintain nhit / nmiss? // XXX maintain nhit / nmiss?
func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { func (c *Cache) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
// oid -> oce (oidCacheEntry) ; create new empty oce if not yet there // oid -> oce (oidCacheEntry) ; create new empty oce if not yet there
// exit with oce locked and cache.before read consistently // exit with oce locked and cache.before read consistently
c.mu.RLock() c.mu.RLock()
oce := c.entryMap[xid.Oid] oce := c.entryMap[xid.Oid]
cacheBefore := c.before cacheBefore := c.before
cacheMemUsed := c.memUsed
if oce != nil { if oce != nil {
oce.Lock() oce.Lock()
...@@ -148,7 +174,7 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -148,7 +174,7 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
c.mu.Lock() c.mu.Lock()
oce = c.entryMap[xid.Oid] oce = c.entryMap[xid.Oid]
if oce == nil { if oce == nil {
oce = &cacheEntry{} oce = &oidCacheEntry{}
c.entryMap[xid.Oid] = oce c.entryMap[xid.Oid] = oce
} }
cacheBefore = c.before // reload c.before becuase we relocked the cache cacheBefore = c.before // reload c.before becuase we relocked the cache
...@@ -167,20 +193,19 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -167,20 +193,19 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
if before == zodb.TidMax { if before == zodb.TidMax {
before = cacheBefore before = cacheBefore
} }
xid.Tid <= before return xid.Tid <= before
}) })
switch { switch {
// not found - tid > max(revv.before) - insert new max entry // not found - tid > max(revv.before) - insert new max entry
case i == l: case i == l:
rce = oce.newRevEntry(xid.Tid) rce = oce.newRevEntry(i, xid.Tid)
if rce.before == cacheBefore { if rce.before == cacheBefore {
// FIXME better do this when the entry becomes loaded ? // FIXME better do this when the entry becomes loaded ?
// XXX vs concurrent invalidations? // XXX vs concurrent invalidations?
rce.before = zodb.TidMax rce.before = zodb.TidMax
} }
rceNew = true rceNew = true
oce.revv = append(oce.revv, rce) // XXX -> newRevEntry ?
// found: // found:
// tid <= revv[i].before // tid <= revv[i].before
...@@ -197,9 +222,8 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -197,9 +222,8 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
// otherwise - insert new entry // otherwise - insert new entry
default: default:
rce = oce.newRevEntry(xid.Tid) rce = oce.newRevEntry(i, xid.Tid)
rceNew = true rceNew = true
oce.revv = append(oce.revv[:i], rce, oce.revv[i:]) // XXX -> newRevEntry ?
} }
// XXX serial -> revCacheEntry // XXX serial -> revCacheEntry
...@@ -219,7 +243,7 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -219,7 +243,7 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
} }
// entry was not in cache - this goroutine becomes responsible for loading it // entry was not in cache - this goroutine becomes responsible for loading it
data, serial, err := c.stor.Load(xid) data, serial, err := c.loader.Load(xid)
// normailize data/serial if it was error // normailize data/serial if it was error
if err != nil { if err != nil {
...@@ -235,7 +259,7 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -235,7 +259,7 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
rce.errDB(xid.Oid, "load(<%v) -> %v", rce.before, serial) rce.errDB(xid.Oid, "load(<%v) -> %v", rce.before, serial)
} }
close(ready) close(rce.ready)
δsize := len(rce.data) δsize := len(rce.data)
// merge rce with adjacent entries in parent // merge rce with adjacent entries in parent
...@@ -270,7 +294,7 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -270,7 +294,7 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
// if rcePrev & rce cover the same range -> drop rcePrev // if rcePrev & rce cover the same range -> drop rcePrev
if i > 0 { if i > 0 {
rcePrev = oce.revv[i-1] rcePrev := oce.revv[i-1]
if rcePrev.loaded() && rce.err == nil && rce.serial < rcePrev.before { if rcePrev.loaded() && rce.err == nil && rce.serial < rcePrev.before {
// drop rcePrev // drop rcePrev
oce.deli(i-1) oce.deli(i-1)
...@@ -290,14 +314,15 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -290,14 +314,15 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
c.gcMu.Lock() c.gcMu.Lock()
rce.inLRU.MoveBefore(&c.lru) rce.inLRU.MoveBefore(&c.lru)
c.size += δsize c.size += δsize
if c.size > c.sizeTarget { if c.size > c.sizeMax {
-> run gc // XXX -> run gc
} }
c.gcMu.Unlock() c.gcMu.Unlock()
return rce.data, rce.serial, rce.err return rce.data, rce.serial, rce.err
} }
/*
func (c *cache) gc(...) { func (c *cache) gc(...) {
c.lruMu.Lock() c.lruMu.Lock()
revh := c.lru.next revh := c.lru.next
...@@ -328,6 +353,7 @@ func (c *cache) cleaner() { ...@@ -328,6 +353,7 @@ func (c *cache) cleaner() {
} }
} }
} }
*/
// loaded reports whether rce was already loaded // loaded reports whether rce was already loaded
...@@ -342,13 +368,15 @@ func (rce *revCacheEntry) loaded() bool { ...@@ -342,13 +368,15 @@ func (rce *revCacheEntry) loaded() bool {
// revCacheEntry: .inLRU -> . // revCacheEntry: .inLRU -> .
func (h *listHead) rceFromInLRU() (rce *revCacheEntry) { func (h *listHead) rceFromInLRU() (rce *revCacheEntry) {
return (*revCacheEntry)(unsafe.Pointer(h) - unsafe.OffsetOf(rce.inLRU)) urce := unsafe.Pointer(uintptr(unsafe.Pointer(h)) - unsafe.Offsetof(rce.inLRU))
return (*revCacheEntry)(urce)
} }
// errDB returns error about database being inconsistent // errDB returns error about database being inconsistent
func errDB(oid zodb.Oid, format string, argv ...interface{}) error { func errDB(oid zodb.Oid, format string, argv ...interface{}) error {
// XXX -> separate type? // XXX -> separate type?
return fmt.Errorf("cache: database inconsistency: oid: %v: " + format, oid, ...argv) return fmt.Errorf("cache: database inconsistency: oid: %v: " + format,
append([]interface{}{oid}, argv...)...)
} }
// errDB marks rce with database inconsistency error // errDB marks rce with database inconsistency error
......
...@@ -20,9 +20,90 @@ ...@@ -20,9 +20,90 @@
package client package client
import ( import (
"sort"
"testing" "testing"
"lab.nexedi.com/kirr/neo/go/zodb"
) )
// tStorage implements read-only storage for cache testing
type tStorage struct {
//txnv []tTxnRecord // transactions; .tid↑
// oid -> [](.serial, .data)
dataMap map[zodb.Oid][]tOidData // with .serial↑
}
// data for oid for 1 revision
type tOidData struct {
serial zodb.Tid
data []byte
}
var tstor = &tStorage{
dataMap: map[zodb.Oid][]tOidData{
1: {
{3, []byte("hello")},
{7, []byte("world")},
},
},
}
/*
type tTxnRecord struct {
tid zodb.Tid
// data records for oid changed in transaction
// .oid↑
datav []tDataRecord
}
type tDataRecord struct {
oid zodb.Oid
data []byte
}
if xid.TidBefore {
// find max txn with .tid < xid.Tid
n := len(s.txnv)
i := n - 1 - sort.Search(n, func(i int) bool {
return s.txnv[n - 1 - i].tid < xid.Tid
})
if i == -1 {
// XXX xid.Tid < all .tid - no such transaction
}
}
*/
func (s *tStorage) Load(xid zodb.Xid) (data []byte, serial zodb.Tid, err error) {
tid := xid.Tid
if xid.TidBefore {
tid++ // XXX overflow
}
datav := s.dataMap[xid.Oid]
if datav == nil {
return nil, 0, &zodb.ErrOidMissing{xid.Oid}
}
// find max entry with .serial < tid
n := len(datav)
i := n - 1 - sort.Search(n, func(i int) bool {
return datav[n - 1 - i].serial < tid
})
if i == -1 {
// tid < all .serial - no such transaction
return nil, 0, &zodb.ErrXidMissing{xid}
}
// check we have exact match if it was loadSerial
if xid.TidBefore && datav[i].serial != xid.Tid {
return nil, 0, &zodb.ErrXidMissing{xid}
}
return datav[i].data, datav[i].serial, nil
}
func TestCache(t *testing.T) { func TestCache(t *testing.T) {
// XXX <100 <90 <80 // XXX <100 <90 <80
// q<110 -> a) 110 <= cache.before b) otherwise // q<110 -> a) 110 <= cache.before b) otherwise
......
...@@ -40,7 +40,7 @@ func (h *listHead) Init() { ...@@ -40,7 +40,7 @@ func (h *listHead) Init() {
func (h *listHead) Delete() { func (h *listHead) Delete() {
h.next.prev = h.prev h.next.prev = h.prev
h.prev.next = h.next h.prev.next = h.next
h.init() h.Init()
} }
// MoveBefore moves a to be before b // MoveBefore moves a to be before b
......
...@@ -151,7 +151,7 @@ type IStorage interface { ...@@ -151,7 +151,7 @@ type IStorage interface {
// TODO data []byte -> something allocated from slab ? // TODO data []byte -> something allocated from slab ?
// XXX currently deleted data is returned as data=nil -- is it ok? // XXX currently deleted data is returned as data=nil -- is it ok?
// TODO specify error when data not found // TODO specify error when data not found
Load(xid Xid) (data []byte, tid Tid, err error) // XXX -> StorageRecordInformation ? Load(xid Xid) (data []byte, serial Tid, err error) // XXX -> StorageRecordInformation ?
// -> Prefetch(xid Xid) ... // -> Prefetch(xid Xid) ...
// PrefetchBefore(oidv []Oid, beforeTid Tid) error (?) // PrefetchBefore(oidv []Oid, beforeTid Tid) error (?)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment