Commit f515f4b0 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ebe407da
...@@ -69,8 +69,8 @@ type Client struct { ...@@ -69,8 +69,8 @@ type Client struct {
operational bool // XXX <- somehow move to NodeApp? operational bool // XXX <- somehow move to NodeApp?
opReady chan struct{} // reinitialized each time state becomes non-operational opReady chan struct{} // reinitialized each time state becomes non-operational
// driver client <- watcher: database commits. // driver client <- watcher: database commits | errors.
watchq chan<- zodb.CommitEvent // FIXME stub watchq chan<- zodb.Event // FIXME stub
} }
var _ zodb.IStorageDriver = (*Client)(nil) var _ zodb.IStorageDriver = (*Client)(nil)
......
...@@ -122,7 +122,7 @@ func NewDB(stor IStorage) *DB { ...@@ -122,7 +122,7 @@ func NewDB(stor IStorage) *DB {
tδkeep: 10*time.Minute, // see δtail discussion tδkeep: 10*time.Minute, // see δtail discussion
} }
watchq := make(chan CommitEvent) watchq := make(chan Event)
at0 := stor.AddWatch(watchq) at0 := stor.AddWatch(watchq)
db.δtail = NewΔTail(at0) // init to (at0, at0] db.δtail = NewΔTail(at0) // init to (at0, at0]
...@@ -176,7 +176,7 @@ type δwaiter struct { ...@@ -176,7 +176,7 @@ type δwaiter struct {
// watcher receives events about new committed transactions and updates δtail. // watcher receives events about new committed transactions and updates δtail.
// //
// it also notifies δtail waiters. // it also notifies δtail waiters.
func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ? func (db *DB) watcher(watchq <-chan Event) { // XXX err ?
for { for {
event, ok := <-watchq event, ok := <-watchq
if !ok { if !ok {
...@@ -187,13 +187,26 @@ func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ? ...@@ -187,13 +187,26 @@ func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ?
fmt.Printf("db: watcher <- %v\n", event) fmt.Printf("db: watcher <- %v\n", event)
var δ *EventCommit
switch event := event.(type) {
default:
panic(fmt.Sprintf("unexepected event: %T", event))
case *EventError:
fmt.Printf("db: watcher: error: %s\n", event.Err)
continue // XXX shutdown instead?
case *EventCommit:
δ = event
}
var readyv []chan struct{} // waiters that become ready var readyv []chan struct{} // waiters that become ready
db.mu.Lock() db.mu.Lock()
db.δtail.Append(event.Tid, event.Changev) db.δtail.Append(δ.Tid, δ.Changev)
for w := range db.δwait { for w := range db.δwait {
if w.at <= event.Tid { if w.at <= δ.Tid {
readyv = append(readyv, w.ready) readyv = append(readyv, w.ready)
delete(db.δwait, w) delete(db.δwait, w)
} }
......
...@@ -50,9 +50,7 @@ type DriverOptions struct { ...@@ -50,9 +50,7 @@ type DriverOptions struct {
// //
// The storage driver will send only and all events in (at₀, +∞] range, // The storage driver will send only and all events in (at₀, +∞] range,
// where at₀ is at returned by driver open. // where at₀ is at returned by driver open.
// Watchq chan<- Event
// TODO extend watchq to also receive errors from watcher.
Watchq chan<- CommitEvent
} }
// DriverOpener is a function to open a storage driver. // DriverOpener is a function to open a storage driver.
...@@ -100,7 +98,7 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage, ...@@ -100,7 +98,7 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage,
return nil, fmt.Errorf("zodb: URL scheme \"%s://\" not supported", u.Scheme) return nil, fmt.Errorf("zodb: URL scheme \"%s://\" not supported", u.Scheme)
} }
drvWatchq := make(chan CommitEvent) drvWatchq := make(chan Event)
drvOpt := &DriverOptions{ drvOpt := &DriverOptions{
ReadOnly: opt.ReadOnly, ReadOnly: opt.ReadOnly,
Watchq: drvWatchq, Watchq: drvWatchq,
...@@ -131,7 +129,7 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage, ...@@ -131,7 +129,7 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage,
drvWatchq: drvWatchq, drvWatchq: drvWatchq,
drvHead: at0, drvHead: at0,
watchReq: make(chan watchRequest), watchReq: make(chan watchRequest),
watchTab: make(map[chan<- CommitEvent]struct{}), watchTab: make(map[chan<- Event]struct{}),
} }
go stor.watcher() // XXX stop on close go stor.watcher() // XXX stop on close
...@@ -150,10 +148,10 @@ type storage struct { ...@@ -150,10 +148,10 @@ type storage struct {
l1cache *Cache // can be =nil, if opened with NoCache l1cache *Cache // can be =nil, if opened with NoCache
// watcher // watcher
drvWatchq chan CommitEvent // watchq passed to driver drvWatchq chan Event // watchq passed to driver
drvHead Tid // last tid received from drvWatchq drvHead Tid // last tid received from drvWatchq
watchReq chan watchRequest // {Add,Del}Watch requests go here watchReq chan watchRequest // {Add,Del}Watch requests go here
watchTab map[chan<- CommitEvent]struct{} // registered watchers watchTab map[chan<- Event]struct{} // registered watchers
} }
// loading goes through cache - this way prefetching can work // loading goes through cache - this way prefetching can work
...@@ -183,9 +181,9 @@ func (s *storage) Prefetch(ctx context.Context, xid Xid) { ...@@ -183,9 +181,9 @@ func (s *storage) Prefetch(ctx context.Context, xid Xid) {
// watchRequest represents request to add/del a watch. // watchRequest represents request to add/del a watch.
type watchRequest struct { type watchRequest struct {
op watchOp // add or del op watchOp // add or del
ack chan Tid // when request processed: at0 for add, ø for del. ack chan Tid // when request processed: at0 for add, ø for del.
watchq chan<- CommitEvent // {Add,Del}Watch argument watchq chan<- Event // {Add,Del}Watch argument
} }
type watchOp int type watchOp int
...@@ -222,9 +220,18 @@ func (s *storage) watcher() { ...@@ -222,9 +220,18 @@ func (s *storage) watcher() {
return return
} }
// XXX verify event.Tid ↑ (else e.g. δtail.Append will panic) switch event := event.(type) {
// if !↑ - stop the storage with error. default:
s.drvHead = event.Tid panic(fmt.Sprintf("unexpected event: %T", event))
case *EventError:
// ok
case *EventCommit:
// XXX verify event.Tid ↑ (else e.g. δtail.Append will panic)
// if !↑ - stop the storage with error.
s.drvHead = event.Tid
}
// deliver event to all watchers // deliver event to all watchers
for watchq := range s.watchTab { for watchq := range s.watchTab {
...@@ -236,7 +243,7 @@ func (s *storage) watcher() { ...@@ -236,7 +243,7 @@ func (s *storage) watcher() {
} }
// AddWatch implements Watcher. // AddWatch implements Watcher.
func (s *storage) AddWatch(watchq chan<- CommitEvent) (at0 Tid) { func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) {
// XXX when already Closed? // XXX when already Closed?
ack := make(chan Tid) ack := make(chan Tid)
s.watchReq <- watchRequest{addWatch, ack, watchq} s.watchReq <- watchRequest{addWatch, ack, watchq}
...@@ -244,7 +251,7 @@ func (s *storage) AddWatch(watchq chan<- CommitEvent) (at0 Tid) { ...@@ -244,7 +251,7 @@ func (s *storage) AddWatch(watchq chan<- CommitEvent) (at0 Tid) {
} }
// DelWatch implements Watcher. // DelWatch implements Watcher.
func (s *storage) DelWatch(watchq chan<- CommitEvent) { func (s *storage) DelWatch(watchq chan<- Event) {
// XXX when already Closed? // XXX when already Closed?
ack := make(chan Tid) ack := make(chan Tid)
s.watchReq <- watchRequest{delWatch, ack, watchq} s.watchReq <- watchRequest{delWatch, ack, watchq}
......
...@@ -98,8 +98,8 @@ type FileStorage struct { ...@@ -98,8 +98,8 @@ type FileStorage struct {
txnhMax TxnHeader // (both with .Len=0 & .Tid=0 if database is empty) txnhMax TxnHeader // (both with .Len=0 & .Tid=0 if database is empty)
downErr error // !nil when the storage is no longer operational downErr error // !nil when the storage is no longer operational
// driver client <- watcher: database commits. // driver client <- watcher: database commits | errors.
watchq chan<- zodb.CommitEvent watchq chan<- zodb.Event
down chan struct{} // ready when storage is no longer operational down chan struct{} // ready when storage is no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
...@@ -496,6 +496,9 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) { ...@@ -496,6 +496,9 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) {
fs.shutdown(err) fs.shutdown(err)
if fs.watchq != nil { if fs.watchq != nil {
if err != nil {
fs.watchq <- &zodb.EventError{err}
}
close(fs.watchq) close(fs.watchq)
} }
} }
...@@ -692,7 +695,7 @@ mainloop: ...@@ -692,7 +695,7 @@ mainloop:
case <-fs.down: case <-fs.down:
return nil return nil
case fs.watchq <- zodb.CommitEvent{it.Txnh.Tid, δoid}: case fs.watchq <- &zodb.EventCommit{it.Txnh.Tid, δoid}:
// ok // ok
} }
} }
......
...@@ -377,7 +377,7 @@ func TestWatch(t *testing.T) { ...@@ -377,7 +377,7 @@ func TestWatch(t *testing.T) {
// force tfs creation & open tfs at go side // force tfs creation & open tfs at go side
at := xcommit(0, xtesting.ZRawObject{0, b("data0")}) at := xcommit(0, xtesting.ZRawObject{0, b("data0")})
watchq := make(chan zodb.CommitEvent) watchq := make(chan zodb.Event)
fs, at0 := xfsopenopt(t, tfs, &zodb.DriverOptions{ReadOnly: true, Watchq: watchq}) fs, at0 := xfsopenopt(t, tfs, &zodb.DriverOptions{ReadOnly: true, Watchq: watchq})
if at0 != at { if at0 != at {
t.Fatalf("opened @ %s ; want %s", at0, at) t.Fatalf("opened @ %s ; want %s", at0, at)
...@@ -423,10 +423,22 @@ func TestWatch(t *testing.T) { ...@@ -423,10 +423,22 @@ func TestWatch(t *testing.T) {
xtesting.ZRawObject{i, b(datai)}) xtesting.ZRawObject{i, b(datai)})
// TODO also test for watcher errors // TODO also test for watcher errors
e := <-watchq event := <-watchq
if objvWant := []zodb.Oid{0, i}; !(e.Tid == at && reflect.DeepEqual(e.Changev, objvWant)) { var δ *zodb.EventCommit
t.Fatalf("watch:\nhave: %s %s\nwant: %s %s", e.Tid, e.Changev, at, objvWant) switch event := event.(type) {
default:
panic(fmt.Sprintf("unexpected event: %T", event))
case *zodb.EventError:
t.Fatal(event.Err)
case *zodb.EventCommit:
δ = event
}
if objvWant := []zodb.Oid{0, i}; !(δ.Tid == at && reflect.DeepEqual(δ.Changev, objvWant)) {
t.Fatalf("watch:\nhave: %s %s\nwant: %s %s", δ.Tid, δ.Changev, at, objvWant)
} }
checkLastTid(at) checkLastTid(at)
......
...@@ -45,8 +45,8 @@ type zeo struct { ...@@ -45,8 +45,8 @@ type zeo struct {
mu sync.Mutex mu sync.Mutex
lastTid zodb.Tid lastTid zodb.Tid
// driver client <- watcher: database commits. // driver client <- watcher: database commits | errors.
watchq chan<- zodb.CommitEvent // FIXME stub watchq chan<- zodb.Event // FIXME stub
url string // we were opened via this url string // we were opened via this
} }
......
...@@ -428,9 +428,26 @@ type Committer interface { ...@@ -428,9 +428,26 @@ type Committer interface {
// TpcAbort(txn) // TpcAbort(txn)
} }
// Event represents one database event.
//
// Possible events are:
//
// - EventError an error happened
// - EventCommit a transaction was committed
type Event interface {
event()
}
func (_ *EventError) event() {}
func (_ *EventCommit) event() {}
// CommitEvent is event describing one observed database commit. // EventError is event descrbing an error observed by watcher.
type CommitEvent struct { type EventError struct {
Err error
}
// EventCommit is event describing one observed database commit.
type EventCommit struct {
Tid Tid // ID of committed transaction Tid Tid // ID of committed transaction
Changev []Oid // ID of objects changed by committed transaction Changev []Oid // ID of objects changed by committed transaction
} }
...@@ -453,10 +470,10 @@ type Watcher interface { ...@@ -453,10 +470,10 @@ type Watcher interface {
// Once registered, watchq must be read until DelWatch call. // Once registered, watchq must be read until DelWatch call.
// Not doing so will stuck whole storage. // Not doing so will stuck whole storage.
// //
// Multiple AddWatch calls with the same watchq register watchq only once. XXX // Registered watchq are closed when the database storage is closed.
// //
// XXX watchq closed when stor.watchq closed? // Multiple AddWatch calls with the same watchq register watchq only once. XXX
AddWatch(watchq chan<- CommitEvent) (at0 Tid) AddWatch(watchq chan<- Event) (at0 Tid)
// DelWatch unregisters watchq from being notified of database changes. // DelWatch unregisters watchq from being notified of database changes.
// //
...@@ -480,7 +497,7 @@ type Watcher interface { ...@@ -480,7 +497,7 @@ type Watcher interface {
// DelWatch is noop if watchq was not registered. // DelWatch is noop if watchq was not registered.
// //
// XXX also return curent head? // XXX also return curent head?
DelWatch(watchq chan<- CommitEvent) DelWatch(watchq chan<- Event)
} }
......
...@@ -17,9 +17,9 @@ ...@@ -17,9 +17,9 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
// Zodbwatch - watch database for changes // Zodbwatch - watch ZODB database for changes
// //
// Zodbwatch watches deatbase for changes and prints information about // Zodbwatch watches database for changes and prints information about
// committed transactions. Output formats: // committed transactions. Output formats:
// //
// Plain: // Plain:
...@@ -67,7 +67,7 @@ func Watch(ctx context.Context, stor zodb.IStorage, w io.Writer, verbose bool) ( ...@@ -67,7 +67,7 @@ func Watch(ctx context.Context, stor zodb.IStorage, w io.Writer, verbose bool) (
return err return err
} }
watchq := make(chan zodb.CommitEvent) watchq := make(chan zodb.Event)
at0 := stor.AddWatch(watchq) at0 := stor.AddWatch(watchq)
defer stor.DelWatch(watchq) defer stor.DelWatch(watchq)
...@@ -81,13 +81,24 @@ func Watch(ctx context.Context, stor zodb.IStorage, w io.Writer, verbose bool) ( ...@@ -81,13 +81,24 @@ func Watch(ctx context.Context, stor zodb.IStorage, w io.Writer, verbose bool) (
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case δ, ok := <-watchq: case event, ok := <-watchq:
if !ok { if !ok {
// XXX correct?
err = emitf("# storage closed") err = emitf("# storage closed")
return err return err
} }
var δ *zodb.EventCommit
switch event := event.(type) {
default:
panic(fmt.Sprintf("unexpected event: %T", event))
case *zodb.EventError:
return event.Err
case *zodb.EventCommit:
δ = event
}
err = emitf("txn %s\n", δ.Tid) err = emitf("txn %s\n", δ.Tid)
if err != nil { if err != nil {
return err return err
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment