Commit e0d59f5d authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb/zodbtools: Watch

Add new `zodb watch` command that is both demonstration of just added
ZODB Watch API (see previous patch), and could be useful for a ZODB
database administration / monitoring.

See zodbtools/watch.go for description.
parent c41c2907
......@@ -30,6 +30,7 @@ var commands = prog.CommandRegistry{
{"info", infoSummary, infoUsage, infoMain},
{"dump", dumpSummary, dumpUsage, dumpMain},
{"catobj", catobjSummary, catobjUsage, catobjMain},
{"watch", watchSummary, watchUsage, watchMain},
}
// main zodbtools driver
......
// Copyright (C) 2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Zodbwatch - watch ZODB database for changes
//
// Zodbwatch watches database for changes and prints information about
// committed transactions. Output formats:
//
// Plain:
//
// # at <tid>
// txn <tid>
// txn <tid>
// ...
//
// Verbose:
//
// # at <tid>
// txn <tid>
// obj <oid>
// obj ...
// ...
// LF
// txn <tid>
// ...
//
// TODO add support for emitting transaction in zodbdump format.
package zodbtools
import (
"context"
"flag"
"fmt"
"io"
"os"
"lab.nexedi.com/kirr/go123/prog"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/zodb"
)
// Watch watches for database changes and prints them to w.
//
// see top-level documentation for output format.
func Watch(ctx context.Context, stor zodb.IStorage, w io.Writer, verbose bool) (err error) {
defer xerr.Contextf(&err, "%s: watch", stor.URL())
emitf := func(format string, argv ...interface{}) error {
_, err := fmt.Fprintf(w, format, argv...)
return err
}
watchq := make(chan zodb.Event)
at0 := stor.AddWatch(watchq)
defer stor.DelWatch(watchq)
err = emitf("# at %s\n", at0)
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case event, ok := <-watchq:
if !ok {
err = emitf("# storage closed")
return err
}
var δ *zodb.EventCommit
switch event := event.(type) {
default:
panic(fmt.Sprintf("unexpected event: %T", event))
case *zodb.EventError:
return event.Err
case *zodb.EventCommit:
δ = event
}
err = emitf("txn %s\n", δ.Tid)
if err != nil {
return err
}
if verbose {
for _, oid := range δ.Changev {
err = emitf("obj %s\n", oid)
if err != nil {
return err
}
}
err = emitf("\n")
if err != nil {
return err
}
}
}
}
}
// ----------------------------------------
const watchSummary = "watch ZODB database for changes"
func watchUsage(w io.Writer) {
fmt.Fprintf(w,
`Usage: zodb watch [OPTIONS] <storage>
Watch ZODB database for changes.
<storage> is an URL (see 'zodb help zurl') of a ZODB-storage.
Options:
-h --help this help text.
-v verbose mode.
`)
}
func watchMain(argv []string) {
verbose := false
flags := flag.FlagSet{Usage: func() { watchUsage(os.Stderr) }}
flags.Init("", flag.ExitOnError)
flags.BoolVar(&verbose, "v", verbose, "verbose mode")
flags.Parse(argv[1:])
argv = flags.Args()
if len(argv) != 1 {
flags.Usage()
prog.Exit(2)
}
zurl := argv[0]
ctx := context.Background()
stor, err := zodb.OpenStorage(ctx, zurl, &zodb.OpenOptions{ReadOnly: true})
if err != nil {
prog.Fatal(err)
}
// TODO defer stor.Close()
err = Watch(ctx, stor, os.Stdout, verbose)
if err != nil {
prog.Fatal(err)
}
}
// Copyright (C) 2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package zodbtools
import (
"bufio"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"testing"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/neo/go/internal/xtesting"
"lab.nexedi.com/kirr/neo/go/zodb"
)
func TestWatch(t *testing.T) {
X := exc.Raiseif
xtesting.NeedPy(t, "zodbtools")
work, err := ioutil.TempDir("", "t-zodbwatch"); X(err)
defer os.RemoveAll(work)
tfs := work + "/t.fs"
// force tfs creation
at := zodb.Tid(0)
xcommit := func(objv ...xtesting.ZRawObject) {
t.Helper()
var err error
at, err = xtesting.ZPyCommitRaw(tfs, at, objv...)
if err != nil {
t.Fatal(err)
}
}
obj := func(oid zodb.Oid, data string) xtesting.ZRawObject {
return xtesting.ZRawObject{oid, []byte(data)}
}
xcommit(obj(0, "data0"))
// open tfs at go side
bg := context.Background()
stor, err := zodb.OpenStorage(bg, tfs, &zodb.OpenOptions{ReadOnly: true}); X(err)
// spawn plain and verbose watchers
ctx0, cancel := context.WithCancel(bg)
wg, ctx := errgroup.WithContext(ctx0)
// gowatch spawns Watch(verbose) and returns expectf() func that is
// connected to verify Watch output.
gowatch := func(verbose bool) /*expectf*/func(format string, argv ...interface{}) {
pr, pw := io.Pipe()
wg.Go(func() error {
return Watch(ctx, stor, pw, verbose)
})
r := bufio.NewReader(pr)
expectf := func(format string, argv ...interface{}) {
t.Helper()
l, err := r.ReadString('\n')
if err != nil {
t.Fatalf("expect: %s", err)
}
l = l[:len(l)-1] // trim trailing \n
line := fmt.Sprintf(format, argv...)
if l != line {
t.Fatalf("expect\nhave: %q\nwant: %q", l, line)
}
}
return expectf
}
pexpect := gowatch(false)
vexpect := gowatch(true)
// initial header
pexpect("# at %s", at)
vexpect("# at %s", at)
// commit -> output
xcommit(obj(0, "data01"))
pexpect("txn %s", at)
vexpect("txn %s", at)
vexpect("obj 0000000000000000")
vexpect("")
// commit -> output
xcommit(obj(1, "data1"), obj(2, "data2"))
pexpect("txn %s", at)
vexpect("txn %s", at)
vexpect("obj 0000000000000001")
vexpect("obj 0000000000000002")
vexpect("")
cancel()
err = wg.Wait()
ecause := errors.Cause(err)
if ecause != context.Canceled {
t.Fatalf("finished: err: expected 'canceled' cause; got %q", err)
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment