Commit 412c35a2 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ebbd4b8f
...@@ -22,6 +22,7 @@ package client ...@@ -22,6 +22,7 @@ package client
import ( import (
"sync" "sync"
"unsafe"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
) )
...@@ -37,8 +38,7 @@ type Cache struct { ...@@ -37,8 +38,7 @@ type Cache struct {
oidDir map[zodb.Oid]*oidEntry oidDir map[zodb.Oid]*oidEntry
bytesUsed int64 size int64 // in bytes
lru revListHead lru revListHead
} }
...@@ -95,6 +95,10 @@ type revListHead struct { ...@@ -95,6 +95,10 @@ type revListHead struct {
next, prev *revCacheEntry next, prev *revCacheEntry
} }
func (h *revListHead) rceFromInLRU() (rce *revCacheEntry) {
return (*revCacheEntry)(unsafe.Pointer(h) - unsafe.OffsetOf(rce.inLRU))
}
// XXX -> to ctor? // XXX -> to ctor?
func (h *revListHead) init() { func (h *revListHead) init() {
h.next = h h.next = h
...@@ -118,6 +122,29 @@ func (a *revListHead) MoveBefore(b *revListHead) { ...@@ -118,6 +122,29 @@ func (a *revListHead) MoveBefore(b *revListHead) {
a.prev.next = a a.prev.next = a
} }
// XXX doc
func (oce *oidCacheEntry) newRevEntry(before zodb.Tid) *revCacheEntry {
rce := &revCacheEntry{
parent: oce,
serial: 0,
before: before,
ready: make(chan struct{})}
}
rce.inLRU.init()
return rce
}
// XXX doc; must be called with oce lock held
func (oce *oidCacheEntry) del(rce *revCacheEntry) {
for i, r := range rce.revv {
if r == rce {
rce.revv = append(rce.revv[:i], rce.revv[i+1:])
return
}
}
panic("rce not found")
}
// cleaner is the process that cleans cache by evicting less-needed entries. // cleaner is the process that cleans cache by evicting less-needed entries.
func (c *cache) cleaner() { func (c *cache) cleaner() {
...@@ -144,23 +171,23 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -144,23 +171,23 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
// exit with cacheEntry locked and cache.before read consistently // exit with cacheEntry locked and cache.before read consistently
c.mu.RLock() c.mu.RLock()
entry := c.entryMap[xid.Oid] oce := c.entryMap[xid.Oid]
cacheBefore := c.before cacheBefore := c.before
cacheMemUsed := c.memUsed cacheMemUsed := c.memUsed
if entry != nil { if oce != nil {
entry.Lock() oce.Lock()
c.mu.RUnlock() c.mu.RUnlock()
} else { } else {
c.mu.RUnlock() c.mu.RUnlock()
c.mu.Lock() c.mu.Lock()
entry = c.entryMap[xid.Oid] oce = c.entryMap[xid.Oid]
if entry == nil { if oce == nil {
entry = &cacheEntry{} oce = &cacheEntry{}
c.entryMap[xid.Oid] = entry c.entryMap[xid.Oid] = oce
} }
cacheBefore = c.before // reload c.before for correctness cacheBefore = c.before // reload c.before for correctness
entry.Lock() oce.Lock()
c.mu.Unlock() c.mu.Unlock()
} }
...@@ -169,9 +196,9 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -169,9 +196,9 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
// before -> revCacheEntry // before -> revCacheEntry
if xid.TidBefore { if xid.TidBefore {
l := len(entry.revv) l := len(oce.revv)
i := sort.Search(l, func(i int) bool { i := sort.Search(l, func(i int) bool {
before := entry.revv[i].before before := oce.revv[i].before
if before == zodb.TidMax { if before == zodb.TidMax {
before = cacheBefore before = cacheBefore
} }
...@@ -181,13 +208,13 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -181,13 +208,13 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
switch { switch {
// not found - tid > max(revv.before) - create new max entry // not found - tid > max(revv.before) - create new max entry
case i == l: case i == l:
rce = &revCacheEntry{serial: 0, before: before} rce = oce.newRevEntry(xid.Tid)
if rce.before == cacheBefore { if rce.before == cacheBefore {
// XXX better do this when the entry becomes loaded ? // XXX better do this when the entry becomes loaded ?
rce.before = zodb.TidMax // XXX vs concurrent invalidations? rce.before = zodb.TidMax // XXX vs concurrent invalidations?
} }
rceNew = true rceNew = true
entry.revv = append(entry.revv, rce) oce.revv = append(oce.revv, rce)
// found: // found:
// tid <= revv[i].before // tid <= revv[i].before
...@@ -195,13 +222,13 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -195,13 +222,13 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
// exact match - we already have it // exact match - we already have it
case xid.Tid == revv[i].before: case xid.Tid == revv[i].before:
rce = entry.revv[i] rce = oce.revv[i]
// if outside [serial, before) - insert new entry // if outside [serial, before) - insert new entry
case !(revv[i].serial != 0 && revv[i].serial <= xid.Tid): case !(revv[i].serial != 0 && revv[i].serial <= xid.Tid):
rce = &revCacheEntry{serial: 0, before: xid.Tid} rce = oce.newRevEntry(xid.Tid)
rceNew = true rceNew = true
entry.revv = append(entry.revv[:i], rce, entry.revv[i:]) oce.revv = append(oce.revv[:i], rce, oce.revv[i:])
} }
// XXX serial -> revCacheEntry // XXX serial -> revCacheEntry
...@@ -209,15 +236,12 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -209,15 +236,12 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
// TODO // TODO
} }
if rceNew { oce.Unlock()
rce.ready = make(chan struct{})
}
entry.Unlock()
// entry was already in cache - use it // entry was already in cache - use it
if !rceNew { if !rceNew {
<-rce.ready <-rce.ready
// XXX update lru
return rce.data, rce.serial, rce.err return rce.data, rce.serial, rce.err
} }
...@@ -228,14 +252,32 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) { ...@@ -228,14 +252,32 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
rce.err = err rce.err = err
close(ready) close(ready)
entry.Lock() oce.Lock()
// XXX merge with adjacent entries in revv // XXX merge with adjacent entries in revv
entry.Unlock() oce.Unlock()
c.Lock() c.Lock()
c.memUsed += len(data) c.size += len(data)
// XXX GC if memUsed > threthold ? if c.size > c.sizeTarget {
-> run gc
}
c.Unlock() c.Unlock()
return rce.data, rce.serial, rce.err return rce.data, rce.serial, rce.err
} }
func (c *cache) gc(...) {
c.lruMu.Lock()
revh := c.lru.next
c.lruMu.Unlock()
for ; revh != &lru; revh = revh.next { // .next after .delete - ok ?
rce := revh.rceFromInLRU()
oce := rce.parent
oce.Lock()
oce.del(rce)
oce.Unlock()
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment