Commit c41c2907 authored by Kirill Smelkov

go/zodb: High-level watching - initial draft

Add initial draft for IStorage-level watch API and implementation:

- Notifier goes away;
- Add Watcher interface instead with AddWatch/DelWatch calls. See
  Watcher documentation for details.
- IStorage wrapper now subscribes to driver events, relays them to
  users, and handles Add/Del watch subscription requests.

XXX Raw cache is temporarily disabled until it is fixed to handle invalidations.
XXX Tests pending.
parent 16db7baf
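To illustrate the driver → IStorage → user event flow described above, here is a sketch of what a storage driver is expected to do with the watchq it receives via DriverOptions.Watchq. It is illustrative only: the package name mydrv, the helper notifyCommit and the zodb import path are assumptions; Event, EventCommit and DriverOptions.Watchq are as introduced in this patch.

package mydrv // hypothetical driver package, for illustration only

import "lab.nexedi.com/kirr/neo/go/zodb" // import path assumed

// notifyCommit sends one commit notification to the watchq received via
// DriverOptions.Watchq. Tids must be sent strictly increasing - the
// zodb-level watcher shuts the whole storage down on a non-monotonic tid.
func notifyCommit(watchq chan<- zodb.Event, tid zodb.Tid, changev []zodb.Oid) {
	if watchq == nil {
		return // watching was not requested
	}
	watchq <- &zodb.EventCommit{Tid: tid, Changev: changev}
}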
// Copyright (C) 2017-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
@@ -18,7 +18,7 @@
// See https://www.nexedi.com/licensing for rationale and options.
package zodb
// cache of raw storage data.
import (
"context"
@@ -31,6 +31,17 @@ import (
"lab.nexedi.com/kirr/go123/xcontainer/list"
)
// FIXME watch over storage and update cache to new commits there.
//
// ( Will need to keep δtail for looking a bit into the future for loads
// with at slightly in the past: e.g. the DB changes frequently, and every
// connection open results in a slightly lagging at.
//
// With δtail we will be able to look ahead in the [at, .cache.head] range
// and mark a just-loaded RCE as having .head=∞ if the object is not changed
// there, instead of marking the RCE with .head=at and thus needing to load
// the same object again for the next connection open )
// XXX managing LRU under 1 big gcMu might be bad for scalability.
// TODO maintain nhit / nmiss + way to read cache stats
// TODO optimize cache more so that miss overhead becomes negligible
...
// Copyright (C) 2017-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
@@ -613,7 +613,7 @@ func TestCache(t *testing.T) {
// XXX verify caching vs ctx cancel
// XXX verify db inconsistency checks
// XXX verify loading with at > cache.head
}
type Checker struct {
...
@@ -23,10 +23,13 @@ package zodb
import (
"context"
"fmt"
"log"
"net/url"
"strings"
"sync"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xcontext"
)
// OpenOptions describes options for OpenStorage.
@@ -97,12 +100,13 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage,
return nil, fmt.Errorf("zodb: URL scheme \"%s://\" not supported", u.Scheme)
}
drvWatchq := make(chan Event)
drvOpt := &DriverOptions{
ReadOnly: opt.ReadOnly,
Watchq: drvWatchq,
}
storDriver, at0, err := opener(ctx, u, drvOpt)
if err != nil {
return nil, err
}
@@ -112,12 +116,28 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage,
// small cache so that prefetch can work for loading
// XXX 512K hardcoded (= ~ 128 · 4K-entries)
cache = NewCache(storDriver, 128 * 4*1024)
// FIXME teach cache for watching and remove vvv
log.Printf("zodb: FIXME: open %s: cache is not ready for invalidations" +
" -> NoCache forced", zurl)
cache = nil
}
stor := &storage{
driver: storDriver,
l1cache: cache,
down: make(chan struct{}),
drvWatchq: drvWatchq,
drvHead: at0,
watchReq: make(chan watchRequest),
watchTab: make(map[chan<- Event]struct{}),
watchCancel: make(map[chan<- Event]chan struct{}),
}
go stor.watcher() // stopped on close
return stor, nil
}
@@ -129,28 +149,66 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage,
type storage struct {
driver IStorageDriver
l1cache *Cache // can be =nil, if opened with NoCache
down chan struct{} // ready when no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
downErr error // reason for shutdown
// watcher
drvWatchq chan Event // watchq passed to driver
drvHead Tid // last tid received from drvWatchq
watchReq chan watchRequest // {Add,Del}Watch requests go here
watchTab map[chan<- Event]struct{} // registered watchers
// when watcher is closed (.down is ready) {Add,Del}Watch operate directly
// on .watchTab and interact with each other directly. In that mode:
watchMu sync.Mutex // for watchTab and * below
watchCancel map[chan<- Event]chan struct{} // DelWatch can cancel AddWatch via here
}
func (s *storage) URL() string { return s.driver.URL() }
func (s *storage) shutdown(reason error) {
s.downOnce.Do(func() {
// set .downErr before signalling .down, so that readers of .down always see the reason
s.downErr = fmt.Errorf("not operational due to: %s", reason)
close(s.down)
})
}
func (s *storage) Iterate(ctx context.Context, tidMin, tidMax Tid) ITxnIterator {
// XXX better -> xcontext.Merge(ctx, s.opCtx)
ctx, cancel := xcontext.MergeChan(ctx, s.down)
defer cancel()
return s.driver.Iterate(ctx, tidMin, tidMax)
}
func (s *storage) Close() error {
s.shutdown(fmt.Errorf("closed"))
return s.driver.Close() // this will close drvWatchq and cause watcher stop
}
// loading goes through cache - this way prefetching can work
func (s *storage) LastTid(ctx context.Context) (Tid, error) {
// XXX better -> xcontext.Merge(ctx, s.opCtx) but currently it costs 1+ goroutine
if ready(s.down) {
return InvalidTid, s.zerr("last_tid", nil, s.downErr)
}
return s.driver.LastTid(ctx)
}
// Load implements Loader.
func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) {
// XXX better -> xcontext.Merge(ctx, s.opCtx) but currently it costs 1+ goroutine
if ready(s.down) {
return nil, InvalidTid, s.zerr("load", xid, s.downErr)
}
// XXX here: offload xid validation from cache and driver ?
// XXX here: offload wrapping err -> OpError{"load", err} ?
// XXX wait xid.At <= .Head ?
if s.l1cache != nil {
return s.l1cache.Load(ctx, xid)
} else {
@@ -158,8 +216,231 @@ func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) {
}
}
// Prefetch implements Prefetcher.
func (s *storage) Prefetch(ctx context.Context, xid Xid) {
if s.l1cache != nil {
s.l1cache.Prefetch(ctx, xid)
}
}
// ---- watcher ----
// watchRequest represents request to add/del a watch.
type watchRequest struct {
op watchOp // add or del
ack chan Tid // when request processed: at0 for add, ø for del.
watchq chan<- Event // {Add,Del}Watch argument
}
type watchOp int
const (
addWatch watchOp = 0
delWatch watchOp = 1
)
// watcher dispatches events from driver to subscribers and serves
// {Add,Del}Watch requests.
func (s *storage) watcher() {
err := s._watcher()
s.shutdown(err)
}
func (s *storage) _watcher() error {
// staging place for AddWatch requests.
//
// during event delivery to registered watchqs, add/del requests are
// also served - so as not to get stuck, and to support clients that do
// DelWatch and no longer receive from their watchq. However we cannot
// register an added watchq immediately, because it is undefined whether
// or not we'll see it while iterating the watchTab map. So we queue what
// was added and flush it to watchTab at the beginning of each cycle.
var addq map[chan<- Event]struct{}
addqFlush := func() {
for watchq := range addq {
s.watchTab[watchq] = struct{}{}
}
addq = make(map[chan<- Event]struct{})
}
serveReq := func(req watchRequest) {
switch req.op {
case addWatch:
_, already := s.watchTab[req.watchq]
if !already {
_, already = addq[req.watchq]
}
if already {
req.ack <- InvalidTid
return
}
addq[req.watchq] = struct{}{}
case delWatch:
delete(s.watchTab, req.watchq)
delete(addq, req.watchq)
default:
panic("bad watch request op")
}
req.ack <- s.drvHead
}
// close all subscribers' watchq on watcher shutdown
defer func() {
addqFlush()
for watchq := range s.watchTab {
close(watchq)
}
}()
var errDown error
for {
if errDown != nil {
return errDown
}
addqFlush() // register staged AddWatch(s)
select {
case req := <-s.watchReq:
serveReq(req)
case event, ok := <-s.drvWatchq:
if !ok {
// storage closed
return nil
}
switch e := event.(type) {
default:
// XXX -> just log?
panic(fmt.Sprintf("unexpected event: %T", e))
case *EventError:
// ok
case *EventCommit:
// verify event.Tid ↑ (else e.g. δtail.Append will panic)
// if !↑ - stop the storage with error.
if !(e.Tid > s.drvHead) {
errDown = fmt.Errorf(
"%s: storage error: notified with δ.tid not ↑ (%s -> %s)",
s.URL(), s.drvHead, e.Tid)
event = &EventError{errDown}
} else {
s.drvHead = e.Tid
}
}
// deliver event to all watchers.
// handle add/del watchq in the process.
next:
for watchq := range s.watchTab {
for {
select {
case req := <-s.watchReq:
serveReq(req)
// if watchq was removed - we have to skip sending to it
// else try sending to current watchq once again.
_, present := s.watchTab[watchq]
if !present {
continue next
}
case watchq <- event:
// ok
continue next
}
}
}
}
}
}
// AddWatch implements Watcher.
func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) {
ack := make(chan Tid)
select {
// no longer operational: behave as if watchq was registered before that
// and had then seen the down/close events. Interact with DelWatch directly.
case <-s.down:
at0 = s.drvHead
s.watchMu.Lock()
_, already := s.watchTab[watchq]
if already {
s.watchMu.Unlock()
panic("multiple AddWatch with the same channel")
}
s.watchTab[watchq] = struct{}{}
cancel := make(chan struct{})
s.watchCancel[watchq] = cancel
s.watchMu.Unlock()
go func() {
if s.downErr != nil {
select {
case <-cancel:
return
case watchq <- &EventError{s.downErr}:
// ok
}
}
close(watchq)
}()
return at0
// operational - interact with watcher
case s.watchReq <- watchRequest{addWatch, ack, watchq}:
at0 = <-ack
if at0 == InvalidTid {
panic("multiple AddWatch with the same channel")
}
return at0
}
}
// DelWatch implements Watcher.
func (s *storage) DelWatch(watchq chan<- Event) {
ack := make(chan Tid)
select {
// no longer operational - interact with AddWatch directly.
case <-s.down:
s.watchMu.Lock()
delete(s.watchTab, watchq)
cancel := s.watchCancel[watchq]
if cancel != nil {
delete(s.watchCancel, watchq)
close(cancel)
}
s.watchMu.Unlock()
// operational - interact with watcher
case s.watchReq <- watchRequest{delWatch, ack, watchq}:
<-ack
}
}
// ---- misc ----
// zerr turns err into OpError about s.op(args)
func (s *storage) zerr(op string, args interface{}, err error) *OpError {
return &OpError{URL: s.URL(), Op: op, Args: args, Err: err}
}
// ready returns whether channel is ready.
//
// it should be used only on channels that are intended to be closed.
func ready(ch chan struct{}) bool {
select {
case <-ch:
return true
default:
return false
}
}
// Copyright (C) 2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package zodb
// XXX watcher tests
@@ -331,6 +331,7 @@ type IStorage interface {
// additional to IStorageDriver
Prefetcher
Watcher
}
// Prefetcher provides functionality to prefetch objects.
@@ -450,9 +451,52 @@ type EventCommit struct {
Changev []Oid // ID of objects changed by committed transaction
}
// Watcher allows to be notified of changes to database.
//
// Watcher is safe to use from multiple goroutines simultaneously.
type Watcher interface {
// AddWatch registers watchq to be notified of database changes.
//
// Whenever a new transaction is committed into the database,
// corresponding event will be sent to watchq.
//
// It is guaranteed that events come with ↑ .Tid.
//
// Only events in the (at₀, +∞] range - and all of them - will be
// sent, where at₀ is the database head that was current when the
// AddWatch call was made.
//
// Once registered, watchq must be read until it is closed or until a
// DelWatch call. Not doing so will get the whole storage stuck.
//
// Registered watchq are closed when the database storage is closed.
//
// It is safe to add watch to a closed database storage.
//
// AddWatch must be used only once for a particular watchq channel.
AddWatch(watchq chan<- Event) (at0 Tid)
// DelWatch unregisters watchq from being notified of database changes.
//
// After DelWatch call completes, no new events will be sent to watchq.
// It is safe to call DelWatch without simultaneously reading watchq.
// In particular the following example is valid:
//
// at0 := stor.AddWatch(watchq)
// defer stor.DelWatch(watchq)
//
// for {
// select {
// case <-ctx.Done():
// return ctx.Err()
//
// case <-watchq:
// ...
// }
// }
//
// DelWatch is a no-op if watchq was not registered.
DelWatch(watchq chan<- Event)
}
...
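For completeness, below is a minimal end-to-end subscriber sketch against the Watcher API defined above. OpenStorage, OpenOptions.ReadOnly, AddWatch/DelWatch, EventCommit.Tid/.Changev and the closing of watchq on storage close follow this patch; the zodb import path and the EventError.Err field name are assumptions, and a driver for the URL scheme must be registered for OpenStorage to succeed.

package main

import (
	"context"
	"log"
	"os"

	"lab.nexedi.com/kirr/neo/go/zodb" // import path assumed
)

// watch subscribes to changes of the storage at zurl and logs every commit.
func watch(ctx context.Context, zurl string) error {
	stor, err := zodb.OpenStorage(ctx, zurl, &zodb.OpenOptions{ReadOnly: true})
	if err != nil {
		return err
	}
	defer stor.Close()

	watchq := make(chan zodb.Event)
	at0 := stor.AddWatch(watchq) // only events in (at0, +∞] will arrive
	defer stor.DelWatch(watchq)

	log.Printf("watching %s after @%v ...", zurl, at0)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case event, ok := <-watchq:
			if !ok {
				return nil // watchq is closed when the storage is closed
			}
			switch e := event.(type) {
			case *zodb.EventCommit:
				log.Printf("commit %v: %d object(s) changed", e.Tid, len(e.Changev))
			case *zodb.EventError:
				return e.Err // field name assumed
			}
		}
	}
}

func main() {
	if err := watch(context.Background(), os.Args[1]); err != nil {
		log.Fatal(err)
	}
}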