Commit a6580062 authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb: Require drivers to provide at₀ on open

and to deliver to watchq only and all events in (at₀, +∞] range.

This continues 4d2c8b1d (go/zodb: Require drivers to provide
notifications for database change events) and makes initialization
semantics to process invalidations easier for both application-level ZODB
layer, and for low-level users that work with drivers directly:

- there is no possibility that an event will come from watchq with tid <
  current user head.

- there is no possibility that watcher will start delivering events
  after at₀, but not immediately after it, i.e. users can rely that they
  won't loose an event.

This correctness invariants should be easy to provide in drivers.
parent 2799f995
...@@ -47,12 +47,18 @@ type DriverOptions struct { ...@@ -47,12 +47,18 @@ type DriverOptions struct {
// //
// The storage driver closes !nil Watchq when the driver is closed. // The storage driver closes !nil Watchq when the driver is closed.
// //
// The storage driver will send only and all events in (at₀, +∞] range,
// where at₀ is at returned by driver open.
//
// TODO extend watchq to also receive errors from watcher. // TODO extend watchq to also receive errors from watcher.
Watchq chan<- CommitEvent Watchq chan<- CommitEvent
} }
// DriverOpener is a function to open a storage driver. // DriverOpener is a function to open a storage driver.
type DriverOpener func (ctx context.Context, u *url.URL, opt *DriverOptions) (IStorageDriver, error) //
// at₀ gives database state at open time. The driver will send to Watchq (see
// DriverOptions) only and all events in (at₀, +∞] range.
type DriverOpener func (ctx context.Context, u *url.URL, opt *DriverOptions) (_ IStorageDriver, at0 Tid, _ error)
// {} scheme -> DriverOpener // {} scheme -> DriverOpener
var driverRegistry = map[string]DriverOpener{} var driverRegistry = map[string]DriverOpener{}
...@@ -98,7 +104,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto ...@@ -98,7 +104,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
Watchq: nil, // TODO use watchq to implement high-level watching Watchq: nil, // TODO use watchq to implement high-level watching
} }
storDriver, err := opener(ctx, u, drvOpt) storDriver, _, err := opener(ctx, u, drvOpt) // TODO use at0 to initialize watcher δtail
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -717,10 +717,10 @@ func (fs *FileStorage) Close() error { ...@@ -717,10 +717,10 @@ func (fs *FileStorage) Close() error {
} }
// Open opens FileStorage @path. // Open opens FileStorage @path.
func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileStorage, err error) { func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileStorage, at0 zodb.Tid, err error) {
// TODO read-write support // TODO read-write support
if !opt.ReadOnly { if !opt.ReadOnly {
return nil, fmt.Errorf("fs1: %s: TODO write mode not implemented", path) return nil, zodb.InvalidTid, fmt.Errorf("fs1: %s: TODO write mode not implemented", path)
} }
fs := &FileStorage{ fs := &FileStorage{
...@@ -730,7 +730,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto ...@@ -730,7 +730,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
f, err := os.Open(path) f, err := os.Open(path)
if err != nil { if err != nil {
return nil, err return nil, zodb.InvalidTid, err
} }
fs.file = f fs.file = f
defer func() { defer func() {
...@@ -745,7 +745,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto ...@@ -745,7 +745,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
fh := FileHeader{} fh := FileHeader{}
err = fh.Load(f) err = fh.Load(f)
if err != nil { if err != nil {
return nil, err return nil, zodb.InvalidTid, err
} }
// load index // load index
...@@ -778,7 +778,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto ...@@ -778,7 +778,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
checkTailGarbage = true checkTailGarbage = true
} }
if err != nil { if err != nil {
return nil, err return nil, zodb.InvalidTid, err
} }
fs.index = index fs.index = index
...@@ -789,7 +789,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto ...@@ -789,7 +789,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
err = fs.txnhMin.Load(f, txnValidFrom, LoadAll) err = fs.txnhMin.Load(f, txnValidFrom, LoadAll)
err = noEOF(err) err = noEOF(err)
if err != nil { if err != nil {
return nil, err return nil, zodb.InvalidTid, err
} }
_ = fs.txnhMax.Load(f, index.TopPos, LoadAll) _ = fs.txnhMax.Load(f, index.TopPos, LoadAll)
...@@ -798,19 +798,21 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto ...@@ -798,19 +798,21 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
// that we read .LenPrev ok. // that we read .LenPrev ok.
switch fs.txnhMax.LenPrev { switch fs.txnhMax.LenPrev {
case -1: case -1:
return nil, fmt.Errorf("%s: could not read LenPrev @%d (last transaction)", f.Name(), fs.txnhMax.Pos) return nil, zodb.InvalidTid, fmt.Errorf("%s: could not read LenPrev @%d (last transaction)", f.Name(), fs.txnhMax.Pos)
case 0: case 0:
return nil, fmt.Errorf("%s: could not read LenPrev @%d (last transaction): unexpected EOF backward", f.Name(), fs.txnhMax.Pos) return nil, zodb.InvalidTid, fmt.Errorf("%s: could not read LenPrev @%d (last transaction): unexpected EOF backward", f.Name(), fs.txnhMax.Pos)
default: default:
// .LenPrev is ok - read last previous record // .LenPrev is ok - read last previous record
err = fs.txnhMax.LoadPrev(f, LoadAll) err = fs.txnhMax.LoadPrev(f, LoadAll)
if err != nil { if err != nil {
return nil, err return nil, zodb.InvalidTid, err
} }
} }
} }
at0 = fs.txnhMax.Tid
// there might be simultaneous updates to the data file from outside. // there might be simultaneous updates to the data file from outside.
// launch the watcher who will observe them. // launch the watcher who will observe them.
// //
...@@ -818,12 +820,12 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto ...@@ -818,12 +820,12 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
// race of missing early file writes. // race of missing early file writes.
w, err := fsnotify.NewWatcher() w, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
return nil, err return nil, zodb.InvalidTid, err
} }
err = w.Add(f.Name()) err = w.Add(f.Name())
if err != nil { if err != nil {
w.Close() // XXX lclose w.Close() // XXX lclose
return nil, err return nil, zodb.InvalidTid, err
} }
var errFirstRead chan error var errFirstRead chan error
...@@ -838,14 +840,14 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto ...@@ -838,14 +840,14 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
if checkTailGarbage { if checkTailGarbage {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return nil, zodb.InvalidTid, ctx.Err()
case err = <-errFirstRead: case err = <-errFirstRead:
if err != nil { if err != nil {
return nil, err // it was garbage return nil, zodb.InvalidTid, err // it was garbage
} }
} }
} }
return fs, nil return fs, at0, nil
} }
...@@ -114,22 +114,22 @@ func checkLoad(t *testing.T, fs *FileStorage, xid zodb.Xid, expect objState) { ...@@ -114,22 +114,22 @@ func checkLoad(t *testing.T, fs *FileStorage, xid zodb.Xid, expect objState) {
} }
} }
func xfsopen(t testing.TB, path string) *FileStorage { func xfsopen(t testing.TB, path string) (*FileStorage, zodb.Tid) {
t.Helper() t.Helper()
return xfsopenopt(t, path, &zodb.DriverOptions{ReadOnly: true}) return xfsopenopt(t, path, &zodb.DriverOptions{ReadOnly: true})
} }
func xfsopenopt(t testing.TB, path string, opt *zodb.DriverOptions) *FileStorage { func xfsopenopt(t testing.TB, path string, opt *zodb.DriverOptions) (*FileStorage, zodb.Tid) {
t.Helper() t.Helper()
fs, err := Open(context.Background(), path, opt) fs, at0, err := Open(context.Background(), path, opt)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
return fs return fs, at0
} }
func TestLoad(t *testing.T) { func TestLoad(t *testing.T) {
fs := xfsopen(t, "testdata/1.fs") fs, _ := xfsopen(t, "testdata/1.fs")
defer exc.XRun(fs.Close) defer exc.XRun(fs.Close)
// current knowledge of what was "before" for an oid as we scan over // current knowledge of what was "before" for an oid as we scan over
...@@ -278,7 +278,7 @@ func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv ...@@ -278,7 +278,7 @@ func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv
} }
func TestIterate(t *testing.T) { func TestIterate(t *testing.T) {
fs := xfsopen(t, "testdata/1.fs") fs, _ := xfsopen(t, "testdata/1.fs")
defer exc.XRun(fs.Close) defer exc.XRun(fs.Close)
// all []tids in test database // all []tids in test database
...@@ -314,7 +314,7 @@ func TestIterate(t *testing.T) { ...@@ -314,7 +314,7 @@ func TestIterate(t *testing.T) {
} }
func BenchmarkIterate(b *testing.B) { func BenchmarkIterate(b *testing.B) {
fs := xfsopen(b, "testdata/1.fs") fs, _ := xfsopen(b, "testdata/1.fs")
defer exc.XRun(fs.Close) defer exc.XRun(fs.Close)
ctx := context.Background() ctx := context.Background()
...@@ -412,7 +412,10 @@ func TestWatch(t *testing.T) { ...@@ -412,7 +412,10 @@ func TestWatch(t *testing.T) {
at := xcommit(0, Object{0, "data0"}) at := xcommit(0, Object{0, "data0"})
watchq := make(chan zodb.CommitEvent) watchq := make(chan zodb.CommitEvent)
fs := xfsopenopt(t, tfs, &zodb.DriverOptions{ReadOnly: true, Watchq: watchq}) fs, at0 := xfsopenopt(t, tfs, &zodb.DriverOptions{ReadOnly: true, Watchq: watchq})
if at0 != at {
t.Fatalf("opened @ %s ; want %s", at0, at)
}
ctx := context.Background() ctx := context.Background()
checkLastTid := func(lastOk zodb.Tid) { checkLastTid := func(lastOk zodb.Tid) {
...@@ -510,10 +513,13 @@ func TestOpenRecovery(t *testing.T) { ...@@ -510,10 +513,13 @@ func TestOpenRecovery(t *testing.T) {
} }
for _, l := range lok { for _, l := range lok {
checkL(t, l, func(t *testing.T, tfs string) { checkL(t, l, func(t *testing.T, tfs string) {
fs := xfsopen(t, tfs) fs, at0 := xfsopen(t, tfs)
defer func() { defer func() {
err = fs.Close(); X(err) err = fs.Close(); X(err)
}() }()
if at0 != lastTidOk {
t.Fatalf("at0: %s ; expected: %s", at0, lastTidOk)
}
head, err := fs.LastTid(ctx); X(err) head, err := fs.LastTid(ctx); X(err)
if head != lastTidOk { if head != lastTidOk {
t.Fatalf("last_tid: %s ; expected: %s", head, lastTidOk) t.Fatalf("last_tid: %s ; expected: %s", head, lastTidOk)
...@@ -525,7 +531,7 @@ func TestOpenRecovery(t *testing.T) { ...@@ -525,7 +531,7 @@ func TestOpenRecovery(t *testing.T) {
// XXX better check 0..sizeof(txnh)-1 but in this range each check is slow. // XXX better check 0..sizeof(txnh)-1 but in this range each check is slow.
for _, l := range []int{TxnHeaderFixSize-1,1} { for _, l := range []int{TxnHeaderFixSize-1,1} {
checkL(t, l, func(t *testing.T, tfs string) { checkL(t, l, func(t *testing.T, tfs string) {
_, err := Open(ctx, tfs, &zodb.DriverOptions{ReadOnly: true}) _, _, err := Open(ctx, tfs, &zodb.DriverOptions{ReadOnly: true})
estr := "" estr := ""
if err != nil { if err != nil {
estr = err.Error() estr = err.Error()
......
...@@ -27,13 +27,12 @@ import ( ...@@ -27,13 +27,12 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
) )
func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (zodb.IStorageDriver, error) { func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (zodb.IStorageDriver, zodb.Tid, error) {
// TODO handle query // TODO handle query
// XXX u.Path is not always raw path - recheck and fix // XXX u.Path is not always raw path - recheck and fix
path := u.Host + u.Path path := u.Host + u.Path
fs, err := Open(ctx, path, opt) return Open(ctx, path, opt)
return fs, err
} }
func init() { func init() {
......
...@@ -283,7 +283,7 @@ func (r rpc) ereplyf(format string, argv ...interface{}) *errorUnexpectedReply { ...@@ -283,7 +283,7 @@ func (r rpc) ereplyf(format string, argv ...interface{}) *errorUnexpectedReply {
// ---- open ---- // ---- open ----
func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb.IStorageDriver, err error) { func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb.IStorageDriver, at0 zodb.Tid, err error) {
url := u.String() url := u.String()
defer xerr.Contextf(&err, "open %s:", url) defer xerr.Contextf(&err, "open %s:", url)
...@@ -307,11 +307,12 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -307,11 +307,12 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
} }
if !opt.ReadOnly { if !opt.ReadOnly {
return nil, fmt.Errorf("TODO write mode not implemented") return nil, zodb.InvalidTid, fmt.Errorf("TODO write mode not implemented")
} }
// FIXME handle opt.Watchq // FIXME handle opt.Watchq
// for now we pretend as if the database is not changing. // for now we pretend as if the database is not changing.
// TODO watcher(when implementing): filter-out first < at0 messages.
if opt.Watchq != nil { if opt.Watchq != nil {
log.Print("zeo: FIXME: watchq support not implemented - there" + log.Print("zeo: FIXME: watchq support not implemented - there" +
"won't be notifications about database changes") "won't be notifications about database changes")
...@@ -319,7 +320,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -319,7 +320,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
zl, err := dialZLink(ctx, net, addr) zl, err := dialZLink(ctx, net, addr)
if err != nil { if err != nil {
return nil, err return nil, zodb.InvalidTid, err
} }
defer func() { defer func() {
...@@ -334,7 +335,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -334,7 +335,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
rpc := z.rpc("register") rpc := z.rpc("register")
xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly) xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly)
if err != nil { if err != nil {
return nil, err return nil, zodb.InvalidTid, err
} }
// register returns last_tid in ZEO5 but nothing earlier. // register returns last_tid in ZEO5 but nothing earlier.
...@@ -343,17 +344,24 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -343,17 +344,24 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
rpc = z.rpc("lastTransaction") rpc = z.rpc("lastTransaction")
xlastTid, err = rpc.call(ctx) xlastTid, err = rpc.call(ctx)
if err != nil { if err != nil {
return nil, err return nil, zodb.InvalidTid, err
} }
} }
lastTid, ok := tidUnpack(xlastTid) // XXX -> xlastTid -> scan lastTid, ok := tidUnpack(xlastTid) // XXX -> xlastTid -> scan
if !ok { if !ok {
return nil, rpc.ereplyf("got %v; expect tid", xlastTid) return nil, zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xlastTid)
} }
z.lastTid = lastTid z.lastTid = lastTid
// XXX since we read lastTid, at least with ZEO < 5, in separate RPC
// call, there is a chance, that by the time when lastTid was read some
// new transactions were committed. This way lastTid will be > than
// some first transactions received by watcher via
// "invalidateTransaction" server notification.
at0 = lastTid
//call('get_info') -> {}str->str, ex // XXX can be omitted //call('get_info') -> {}str->str, ex // XXX can be omitted
/* /*
...@@ -371,7 +379,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -371,7 +379,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
'supports_record_iternext': True}) 'supports_record_iternext': True})
*/ */
return z, nil return z, at0, nil
} }
func (z *zeo) Close() error { func (z *zeo) Close() error {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment