Commit ebbd4b8f authored by Kirill Smelkov

.

parent 094fdc56
@@ -35,11 +35,15 @@ type Cache struct {
 	// cache is fully synchronized with storage for transactions with tid < before.
 	before zodb.Tid
 
-	entryMap map[zodb.Oid]*cacheEntry
+	oidDir map[zodb.Oid]*oidEntry
+
+	bytesUsed int64
+	lru       revListHead
 }
 
-// cacheEntry maintains cached revisions for 1 oid
-type cacheEntry struct {
+// oidCacheEntry maintains cached revisions for 1 oid
+type oidCacheEntry struct {
 	sync.Mutex // XXX -> rw ?
 
 	// cached revisions in ascending order
@@ -53,7 +57,10 @@ type cacheEntry struct {
 // revCacheEntry is information about 1 cached oid revision
 type revCacheEntry struct {
-	sync.Mutex // XXX -> rw ?
+	inLRU  revListHead
+	parent *oidCacheEntry
+	// sync.Mutex XXX not needed
 
 	// oid revision
 	// 0 if don't know yet - loadBefore(.before) is in progress and actual
@@ -76,15 +83,56 @@ type revCacheEntry struct {
 	// case when loadBefore with tid > cache.before was called.
 	before zodb.Tid
 
-	// object data
+	// object data or loading error
 	data []byte
-
-	// XXX + lru
-	ready chan struct{} // closed when entry loaded; nil if evicted from cache
+	err  error
+
+	ready chan struct{} // closed when loading finished
+}
+
+type revListHead struct {
+	// XXX needs to be created with .next = .prev = self
+	next, prev *revCacheEntry
+}
+
+// XXX -> to ctor?
+func (h *revListHead) init() {
+	h.next = h
+	h.prev = h
 }
+
+// Delete deletes h from list
+func (h *revListHead) Delete() {
+	h.next.prev = h.prev
+	h.prev.next = h.next
+}
+
+// MoveBefore moves a to be before b
+// XXX ok to move if a was not previously on the list?
+func (a *revListHead) MoveBefore(b *revListHead) {
+	a.Delete()
+	a.next = b
+	a.prev = b.prev
+	a.prev.next = a
+	b.prev = a
+}
+
+// cleaner is the process that cleans cache by evicting less-needed entries.
+func (c *cache) cleaner() {
+	for {
+		// cleaner is the only mutator/user of Cache.lru and revCacheEntry.inLRU
+		select {
+		case rce := <-c.used:
+			rce.inLRU.MoveBefore(&c.lru)
+		default:
+			for rce := c.lru.next; rce != &c.lru; rce = rce.next {
+				// TODO: evict rce here (eviction not implemented yet)
+			}
+		}
+	}
+}
 
 // lock order: Cache > cacheEntry > (?) revCacheEntry
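For readers unfamiliar with intrusive lists, here is a minimal self-contained sketch of the idea behind revListHead and the cleaner's use of MoveBefore: an empty list is a head whose next/prev point at itself, nodes unlink and re-link themselves, and "this entry was just used" is expressed as moving its node to just before the head. The names (node, initHead, del, moveBefore) are illustrative, not the commit's API, and the sketch folds head and entry into one type for brevity.

package main

import "fmt"

// node plays the role of a cache entry here; only the list links matter.
type node struct {
	next, prev *node
	name       string
}

// initHead turns n into an empty circular list: next = prev = self.
// A node must be initialized this way before its first del/moveBefore.
func (n *node) initHead() { n.next, n.prev = n, n }

// del unlinks n from whatever list it is currently on.
func (n *node) del() {
	n.next.prev = n.prev
	n.prev.next = n.next
}

// moveBefore unlinks n and re-links it just before b.
// b.prev is captured before it is overwritten.
func (n *node) moveBefore(b *node) {
	n.del()
	n.prev = b.prev
	n.next = b
	b.prev.next = n
	b.prev = n
}

func main() {
	var lru node // list head; not a real entry
	lru.initHead()

	a := &node{name: "a"}
	b := &node{name: "b"}
	a.initHead()
	b.initHead()

	// append a, then b, at the tail (just before the head)
	a.moveBefore(&lru)
	b.moveBefore(&lru)

	// touching "a" moves it to the tail again - it becomes most recently used
	a.moveBefore(&lru)

	for n := lru.next; n != &lru; n = n.next {
		fmt.Println(n.name) // prints "b" then "a"
	}
}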
@@ -95,8 +143,11 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
 	// oid -> cacheEntry ; creating new empty if not yet there
 	// exit with cacheEntry locked and cache.before read consistently
 	c.mu.RLock()
 
 	entry := c.entryMap[xid.Oid]
 	cacheBefore := c.before
+	cacheMemUsed := c.memUsed
 
 	if entry != nil {
 		entry.Lock()
 		c.mu.RUnlock()
@@ -108,7 +159,7 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
 			entry = &cacheEntry{}
 			c.entryMap[xid.Oid] = entry
 		}
-		cacheBefore = c.before
+		cacheBefore = c.before // reload c.before for correctness
 		entry.Lock()
 		c.mu.Unlock()
 	}
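The two hunks above are the lookup half of a common get-or-create pattern: consult the map under the read lock, and only when the entry is missing retake the mutex in write mode, re-check, and insert. A stripped-down sketch of that pattern follows, with hypothetical names (cache, dir, getOrCreate) rather than the commit's types, and without the per-entry locking and the c.before snapshot that the real code also takes while still holding the cache mutex.

package main

import "sync"

// entry stands in for one per-oid cache entry; it carries its own lock,
// as the per-oid entries in the commit do.
type entry struct {
	sync.Mutex
}

type cache struct {
	mu  sync.RWMutex
	dir map[string]*entry
}

// getOrCreate returns the entry for key, creating it if needed.
// The fast path takes only the read lock; the slow path re-checks under
// the write lock because another goroutine may have inserted the entry
// between RUnlock and Lock.
func (c *cache) getOrCreate(key string) *entry {
	c.mu.RLock()
	e := c.dir[key]
	c.mu.RUnlock()
	if e != nil {
		return e
	}

	c.mu.Lock()
	e = c.dir[key]
	if e == nil {
		e = &entry{}
		c.dir[key] = e
	}
	c.mu.Unlock()
	return e
}

func main() {
	c := &cache{dir: make(map[string]*entry)}
	_ = c.getOrCreate("0001") // key is an illustrative stand-in for an oid
}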
@@ -136,7 +187,7 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
 			rce.before = zodb.TidMax // XXX vs concurrent invalidations?
 		}
 		rceNew = true
-		entry.revv = append(entry.revv, &revCacheEntry{before: before})
+		entry.revv = append(entry.revv, rce)
 
 	// found:
 	// tid <= revv[i].before
@@ -158,36 +209,33 @@ func (c *cache) Load(xid zodb.Xid) (data []byte, tid Tid, err error) {
 		// TODO
 	}
 
-	entry.Unlock() // XXX order ok?
-	rce.Lock()
-
-	// entry is not in cache - this goroutine becomes responsible for loading it
-	if rce.ready == nil {
-		ready := make(chan struct{})
-		rce.ready = ready
-		rce.Unlock()
+	if rceNew {
+		rce.ready = make(chan struct{})
+	}
+	entry.Unlock()
+
+	// entry was already in cache - use it
+	if !rceNew {
+		<-rce.ready
+		return rce.data, rce.serial, rce.err
+	}
+
+	// entry was not in cache - this goroutine becomes responsible for loading it
 	data, serial, err := c.stor.Load(xid)
 
-	rce.Lock()
-	if rce.ready != nil { // could be evicted XXX ok?
+	// XXX if err != nil -> ?
 	rce.serial = serial
 	rce.data = data
-	}
-	rce.Unlock()
+	rce.err = err
 
 	close(ready)
 
 	entry.Lock()
 	// XXX merge with adjacent entries in revv
 	entry.Unlock()
 
-	return rce
-	}
-
-	// entry is already in cache - use it
-	if rce.ready != nil {
-		rce.Unlock()
-		<-rce.ready
-		return rce
-	}
+	c.Lock()
+	c.memUsed += len(data)
+	// XXX GC if memUsed > threshold ?
+	c.Unlock()
+
+	return rce.data, rce.serial, rce.err
 }
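The rceNew / ready-channel logic above is essentially per-entry single-flight loading: the goroutine that created the entry performs the storage load, fills in data/serial/err, and closes ready; every goroutine that found the entry already present just blocks on the channel and reads the stored result. Below is a self-contained sketch of that pattern under simplified assumptions: a plain string key, a fetch callback, and illustrative loader/result types that are not the commit's API.

package main

import (
	"fmt"
	"sync"
)

// result caches the outcome of one load; ready is closed once data/err are set.
type result struct {
	ready chan struct{}
	data  []byte
	err   error
}

type loader struct {
	mu sync.Mutex
	m  map[string]*result
}

// load returns the cached result for key, fetching it at most once.
func (l *loader) load(key string, fetch func(string) ([]byte, error)) ([]byte, error) {
	l.mu.Lock()
	r, ok := l.m[key]
	if !ok {
		r = &result{ready: make(chan struct{})}
		l.m[key] = r
	}
	l.mu.Unlock()

	if ok {
		// someone else created the entry - wait for its load to finish
		<-r.ready
		return r.data, r.err
	}

	// we created the entry - we are responsible for loading it
	r.data, r.err = fetch(key)
	close(r.ready) // publishes data/err to all waiters
	return r.data, r.err
}

func main() {
	l := &loader{m: make(map[string]*result)}
	data, err := l.load("0001", func(k string) ([]byte, error) {
		return []byte("payload for " + k), nil
	})
	fmt.Println(string(data), err)
}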