Commit 7f56c92b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent bbe72558
...@@ -376,10 +376,10 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (er ...@@ -376,10 +376,10 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (er
// --- user API calls --- // --- user API calls ---
func (c *Client) LastTid(ctx context.Context) (_ zodb.Tid, err error) { func (c *Client) Sync(ctx context.Context) (_ zodb.Tid, err error) {
defer func() { defer func() {
if err != nil { if err != nil {
err = &zodb.OpError{URL: c.URL(), Op: "last_tid", Args: nil, Err: err} err = &zodb.OpError{URL: c.URL(), Op: "sync", Args: nil, Err: err}
} }
}() }()
...@@ -539,7 +539,7 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) ( ...@@ -539,7 +539,7 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (
// TODO change NEO protocol so that when C connects to M, M sends it // TODO change NEO protocol so that when C connects to M, M sends it
// current head and guarantees to send only followup invalidation // current head and guarantees to send only followup invalidation
// updates. // updates.
at0, err := c.LastTid(ctx) at0, err := c.Sync(ctx)
if err != nil { if err != nil {
c.Close() // XXX lclose c.Close() // XXX lclose
return nil, zodb.InvalidTid, fmt.Errorf("neo: open %q: %s", u, err) return nil, zodb.InvalidTid, fmt.Errorf("neo: open %q: %s", u, err)
......
// Copyright (C) 2017-2018 Nexedi SA and Contributors. // Copyright (C) 2017-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -169,7 +169,7 @@ func TestMasterStorage(t0 *testing.T) { ...@@ -169,7 +169,7 @@ func TestMasterStorage(t0 *testing.T) {
})) }))
lastOid, err1 := zstor.LastOid(bg) lastOid, err1 := zstor.LastOid(bg)
lastTid, err2 := zstor.LastTid(bg) lastTid, err2 := zstor.Sync(bg)
exc.Raiseif(xerr.Merge(err1, err2)) exc.Raiseif(xerr.Merge(err1, err2))
tMS.Expect(conntx("m:2", "s:2", 8, &proto.LastIDs{})) tMS.Expect(conntx("m:2", "s:2", 8, &proto.LastIDs{}))
tMS.Expect(conntx("s:2", "m:2", 8, &proto.AnswerLastIDs{ tMS.Expect(conntx("s:2", "m:2", 8, &proto.AnswerLastIDs{
...@@ -250,7 +250,7 @@ func TestMasterStorage(t0 *testing.T) { ...@@ -250,7 +250,7 @@ func TestMasterStorage(t0 *testing.T) {
// C asks M about last tid XXX better master sends it itself on new client connected // C asks M about last tid XXX better master sends it itself on new client connected
wg = &errgroup.Group{} wg = &errgroup.Group{}
gox(wg, func() { gox(wg, func() {
cLastTid, err := C.LastTid(bg) cLastTid, err := C.Sync(bg)
exc.Raiseif(err) exc.Raiseif(err)
if cLastTid != lastTid { if cLastTid != lastTid {
......
...@@ -58,7 +58,7 @@ func Open(ctx context.Context, path string) (*Backend, error) { ...@@ -58,7 +58,7 @@ func Open(ctx context.Context, path string) (*Backend, error) {
func (f *Backend) LastTid(ctx context.Context) (zodb.Tid, error) { func (f *Backend) LastTid(ctx context.Context) (zodb.Tid, error) {
return f.zstor.LastTid(ctx) return f.zstor.Sync(ctx)
} }
func (f *Backend) LastOid(ctx context.Context) (zodb.Oid, error) { func (f *Backend) LastOid(ctx context.Context) (zodb.Oid, error) {
......
// Copyright (C) 2017-2018 Nexedi SA and Contributors. // Copyright (C) 2017-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -181,10 +181,7 @@ func zhash(ctx context.Context, url string, h hasher, useprefetch bool, bench, c ...@@ -181,10 +181,7 @@ func zhash(ctx context.Context, url string, h hasher, useprefetch bool, bench, c
lastTid, err := stor.LastTid(ctx) at := stor.Head()
if err != nil {
return err
}
tstart := time.Now() tstart := time.Now()
...@@ -192,7 +189,7 @@ func zhash(ctx context.Context, url string, h hasher, useprefetch bool, bench, c ...@@ -192,7 +189,7 @@ func zhash(ctx context.Context, url string, h hasher, useprefetch bool, bench, c
nread := 0 nread := 0
loop: loop:
for { for {
xid := zodb.Xid{Oid: oid, At: lastTid} xid := zodb.Xid{Oid: oid, At: at}
if xid.Oid % nprefetch == 0 { if xid.Oid % nprefetch == 0 {
prefetchBlk(ctx, xid) prefetchBlk(ctx, xid)
} }
...@@ -405,9 +402,9 @@ func zwrkPreconnect(ctx context.Context, url string, at zodb.Tid, nwrk int) (_ [ ...@@ -405,9 +402,9 @@ func zwrkPreconnect(ctx context.Context, url string, at zodb.Tid, nwrk int) (_ [
storv[i] = stor storv[i] = stor
// storage to warm-up the connection // storage to warm-up the connection
// ( in case of NEO LastTid connects to master and Load // ( in case of NEO Sync connects to master and Load
// - to a storage ) // - to a storage )
_, err = stor.LastTid(ctx) err = stor.Sync(ctx)
if err != nil { if err != nil {
return err return err
} }
...@@ -456,15 +453,12 @@ func zwrkPrepare(ctx context.Context, url string, h hasher, check string) (at zo ...@@ -456,15 +453,12 @@ func zwrkPrepare(ctx context.Context, url string, h hasher, check string) (at zo
err = xerr.First(err, err2) err = xerr.First(err, err2)
}() }()
lastTid, err := stor.LastTid(ctx) at = stor.Head()
if err != nil {
return 0, nil, err
}
oid := zodb.Oid(0) oid := zodb.Oid(0)
loop: loop:
for { for {
xid := zodb.Xid{Oid: oid, At: lastTid} xid := zodb.Xid{Oid: oid, At: at}
buf, _, err := stor.Load(ctx, xid) buf, _, err := stor.Load(ctx, xid)
if err != nil { if err != nil {
switch errors.Cause(err).(type) { switch errors.Cause(err).(type) {
...@@ -494,7 +488,7 @@ loop: ...@@ -494,7 +488,7 @@ loop:
} }
} }
return lastTid, objcheckv, nil return at, objcheckv, nil
} }
// ---------------------------------------- // ----------------------------------------
......
...@@ -385,13 +385,12 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er ...@@ -385,13 +385,12 @@ func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err er
// sync storage for lastTid // sync storage for lastTid
var err error var err error
// XXX stor.LastTid returns last_tid storage itself err = db.stor.Sync(ctx)
// received on server, not last_tid on server.
// -> add stor.Sync() ?
at, err = db.stor.LastTid(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
at = db.stor.Head()
} }
} }
......
...@@ -128,14 +128,14 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage, ...@@ -128,14 +128,14 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage,
l1cache: cache, l1cache: cache,
down: make(chan struct{}), down: make(chan struct{}),
head: at0,
drvWatchq: drvWatchq, drvWatchq: drvWatchq,
drvHead: at0,
watchReq: make(chan watchRequest), watchReq: make(chan watchRequest),
watchTab: make(map[chan<- Event]struct{}), watchTab: make(map[chan<- Event]struct{}),
watchCancel: make(map[chan<- Event]chan struct{}), watchCancel: make(map[chan<- Event]chan struct{}),
} }
go stor.watcher() // stoped on close go stor.watcher() // stopped on close
return stor, nil return stor, nil
} }
...@@ -145,7 +145,7 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage, ...@@ -145,7 +145,7 @@ func OpenStorage(ctx context.Context, zurl string, opt *OpenOptions) (IStorage,
// storage represents storage opened via OpenStorage. // storage represents storage opened via OpenStorage.
// //
// it provides a small cache on top of raw storage driver to implement prefetch // it provides a small cache on top of raw storage driver to implement prefetch
// and other storage-independed higher-level functionality. // and other storage-independent higher-level functionality.
type storage struct { type storage struct {
driver IStorageDriver driver IStorageDriver
l1cache *Cache // can be =nil, if opened with NoCache l1cache *Cache // can be =nil, if opened with NoCache
...@@ -155,8 +155,9 @@ type storage struct { ...@@ -155,8 +155,9 @@ type storage struct {
downErr error // reason for shutdown downErr error // reason for shutdown
// watcher // watcher
headMu sync.Mutex
head Tid // local view of storage head; mutated by watcher only
drvWatchq chan Event // watchq passed to driver drvWatchq chan Event // watchq passed to driver
drvHead Tid // last tid received from drvWatchq
watchReq chan watchRequest // {Add,Del}Watch requests go here watchReq chan watchRequest // {Add,Del}Watch requests go here
watchTab map[chan<- Event]struct{} // registered watchers watchTab map[chan<- Event]struct{} // registered watchers
...@@ -190,16 +191,66 @@ func (s *storage) Close() error { ...@@ -190,16 +191,66 @@ func (s *storage) Close() error {
// loading goes through cache - this way prefetching can work // loading goes through cache - this way prefetching can work
func (s *storage) LastTid(ctx context.Context) (Tid, error) { func (s *storage) Head() Tid {
// XXX LastTid - report only LastTid for which cache is ready? s.headMu.Lock()
// or driver.LastTid(), then wait cache is ready? head := s.head
s.headMu.Unlock()
return head
}
// XXX place -> near watcher
func (s *storage) Sync(ctx context.Context) (err error) {
defer func() {
if err != nil {
err = s.zerr("sync", nil, err)
}
}()
// XXX better -> xcontext.Merge(ctx, s.opCtx) but currently it costs 1+ goroutine // XXX better -> xcontext.Merge(ctx, s.opCtx) but currently it costs 1+ goroutine
if ready(s.down) { if ready(s.down) {
return InvalidTid, s.zerr("last_tid", nil, s.downErr) return s.downErr
}
head, err := s.driver.Sync(ctx)
if err != nil {
return err
}
// XXX check that driver returns head↑
// wait till .head >= head
watchq := make(chan Event)
at := s.AddWatch(watchq)
defer s.DelWatch(watchq)
for at < head {
select {
case <-ctx.Done():
return ctx.Err()
case <-s.down:
return s.downErr
case event, ok := <-watchq:
if !ok {
// closed
return s.downErr // XXX ok? sync on .down?
}
switch e := event.(type) {
default:
panic("XXX") // XXX
case *EventError:
return e.Err
case *EventCommit:
at = e.Tid
}
}
} }
return s.driver.LastTid(ctx) return nil
} }
// Load implements Loader. // Load implements Loader.
...@@ -287,10 +338,10 @@ func (s *storage) _watcher() error { ...@@ -287,10 +338,10 @@ func (s *storage) _watcher() error {
panic("bad watch request op") panic("bad watch request op")
} }
req.ack <- s.drvHead req.ack <- s.head
} }
// close all subscribers's watchq on watcher shutdow // close all subscribers's watchq on watcher shutdown
defer func() { defer func() {
addqFlush() addqFlush()
for watchq := range s.watchTab { for watchq := range s.watchTab {
...@@ -327,13 +378,15 @@ func (s *storage) _watcher() error { ...@@ -327,13 +378,15 @@ func (s *storage) _watcher() error {
case *EventCommit: case *EventCommit:
// verify event.Tid ↑ (else e.g. δtail.Append will panic) // verify event.Tid ↑ (else e.g. δtail.Append will panic)
// if !↑ - stop the storage with error. // if !↑ - stop the storage with error.
if !(e.Tid > s.drvHead) { if !(e.Tid > s.head) {
errDown = fmt.Errorf( errDown = fmt.Errorf(
"%s: storage error: notified with δ.tid not ↑ (%s -> %s)", "%s: storage error: notified with δ.tid not ↑ (%s -> %s)",
s.URL(), s.drvHead, e.Tid) s.URL(), s.head, e.Tid)
event = &EventError{errDown} event = &EventError{errDown}
} else { } else {
s.drvHead = e.Tid s.headMu.Lock()
s.head = e.Tid
s.headMu.Unlock()
} }
} }
...@@ -369,7 +422,9 @@ func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) { ...@@ -369,7 +422,9 @@ func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) {
// no longer operational: behave if watchq was registered before that // no longer operational: behave if watchq was registered before that
// and then seen down/close events. Interact with DelWatch directly. // and then seen down/close events. Interact with DelWatch directly.
case <-s.down: case <-s.down:
at0 = s.drvHead s.headMu.Lock() // shutdown may be due to Close call and watcher might be
at0 = s.head // still running - we cannot skip locking.
s.headMu.Unlock()
s.watchMu.Lock() s.watchMu.Lock()
_, already := s.watchTab[watchq] _, already := s.watchTab[watchq]
......
...@@ -115,12 +115,15 @@ func (fs *FileStorage) zerr(op string, args interface{}, err error) *zodb.OpErro ...@@ -115,12 +115,15 @@ func (fs *FileStorage) zerr(op string, args interface{}, err error) *zodb.OpErro
return &zodb.OpError{URL: fs.URL(), Op: op, Args: args, Err: err} return &zodb.OpError{URL: fs.URL(), Op: op, Args: args, Err: err}
} }
func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) { func (fs *FileStorage) Sync(_ context.Context) (zodb.Tid, error) {
// FIXME: it currently does not do full sync to check data state as of Sync call time
// XXX -> move closer to watcher
fs.mu.RLock() fs.mu.RLock()
defer fs.mu.RUnlock() defer fs.mu.RUnlock()
if fs.downErr != nil { if fs.downErr != nil {
return zodb.InvalidTid, fs.zerr("last_tid", nil, fs.downErr) return zodb.InvalidTid, fs.zerr("sync", nil, fs.downErr)
} }
return fs.txnhMax.Tid, nil // txnhMax.Tid = 0, if empty return fs.txnhMax.Tid, nil // txnhMax.Tid = 0, if empty
...@@ -491,7 +494,7 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) { ...@@ -491,7 +494,7 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) {
} }
// if watcher failed with e.g. IO error, we no longer know what is real // if watcher failed with e.g. IO error, we no longer know what is real
// last_tid and which objects were modified after it. // head and which objects were modified after it.
// -> storage operations have to fail from now on. // -> storage operations have to fail from now on.
fs.shutdown(err) fs.shutdown(err)
......
...@@ -384,15 +384,15 @@ func TestWatch(t *testing.T) { ...@@ -384,15 +384,15 @@ func TestWatch(t *testing.T) {
} }
ctx := context.Background() ctx := context.Background()
checkLastTid := func(lastOk zodb.Tid) { checkHead := func(headOk zodb.Tid) {
t.Helper() t.Helper()
head, err := fs.LastTid(ctx); X(err) head, err := fs.Sync(ctx); X(err)
if head != lastOk { if head != headOk {
t.Fatalf("check last_tid: got %s; want %s", head, lastOk) t.Fatalf("check head: got %s; want %s", head, headOk)
} }
} }
checkLastTid(at) checkHead(at)
checkLoad := func(at zodb.Tid, oid zodb.Oid, dataOk string, serialOk zodb.Tid) { checkLoad := func(at zodb.Tid, oid zodb.Oid, dataOk string, serialOk zodb.Tid) {
t.Helper() t.Helper()
...@@ -441,7 +441,7 @@ func TestWatch(t *testing.T) { ...@@ -441,7 +441,7 @@ func TestWatch(t *testing.T) {
t.Fatalf("watch:\nhave: %s %s\nwant: %s %s", δ.Tid, δ.Changev, at, objvWant) t.Fatalf("watch:\nhave: %s %s\nwant: %s %s", δ.Tid, δ.Changev, at, objvWant)
} }
checkLastTid(at) checkHead(at)
// make sure we can load what was committed. // make sure we can load what was committed.
checkLoad(at, 0, data0, at) checkLoad(at, 0, data0, at)
...@@ -498,9 +498,9 @@ func TestOpenRecovery(t *testing.T) { ...@@ -498,9 +498,9 @@ func TestOpenRecovery(t *testing.T) {
if at0 != lastTidOk { if at0 != lastTidOk {
t.Fatalf("at0: %s ; expected: %s", at0, lastTidOk) t.Fatalf("at0: %s ; expected: %s", at0, lastTidOk)
} }
head, err := fs.LastTid(ctx); X(err) head, err := fs.Sync(ctx); X(err)
if head != lastTidOk { if head != lastTidOk {
t.Fatalf("last_tid: %s ; expected: %s", head, lastTidOk) t.Fatalf("head: %s ; expected: %s", head, lastTidOk)
} }
}) })
} }
......
...@@ -52,10 +52,25 @@ type zeo struct { ...@@ -52,10 +52,25 @@ type zeo struct {
} }
func (z *zeo) LastTid(ctx context.Context) (zodb.Tid, error) { func (z *zeo) Sync(ctx context.Context) (head zodb.Tid, err error) {
z.mu.Lock() defer func() {
defer z.mu.Unlock() if err != nil {
return z.lastTid, nil err = &zodb.OpError{URL: z.URL(), Op: "sync", Args: nil, Err: err}
}
}()
rpc := z.rpc("lastTransaction")
xhead, err := rpc.call(ctx)
if err != nil {
return zodb.InvalidTid, err
}
head, ok := tidUnpack(xhead)
if !ok {
return zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xhead)
}
return head, nil
} }
func (z *zeo) Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) { func (z *zeo) Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) {
......
...@@ -326,10 +326,34 @@ type IStorage interface { ...@@ -326,10 +326,34 @@ type IStorage interface {
// same as in IStorageDriver // same as in IStorageDriver
URL() string URL() string
Close() error Close() error
LastTid(context.Context) (Tid, error)
Loader Loader
Iterator Iterator
// similar to IStorage
// Sync syncs to storage and updates current view of it.
//
// After Sync, Head is guaranteed to give ID of last transaction
// committed to storage data as observed from some time _afterwards_
// Sync call was made. In particular for client-server case, Sync
// cannot retain cached view of storage and has to perform round-trip
// to the server.
Sync(context.Context) error
// Head returns ID of last committed transaction.
//
// Returned head is ID of last committed transaction as observed from
// some time _before_ Head call was made. In particular for
// client-sever case, Head can return cached view of storage that was
// learned some time ago.
//
// Head is ↑=.
//
// Head is 0 if no transactions have been committed yet.
//
// Use Sync to synchronize with the storage.
Head() Tid
// additional to IStorageDriver // additional to IStorageDriver
Prefetcher Prefetcher
Watcher Watcher
...@@ -356,12 +380,17 @@ type IStorageDriver interface { ...@@ -356,12 +380,17 @@ type IStorageDriver interface {
// Close closes storage // Close closes storage
Close() error Close() error
// LastTid returns the id of the last committed transaction. // Sync syncs to storage and returns ID of last committed transaction.
//
// Returned head is ID of last transaction committed to storage data as
// observed from some time _afterwards_ Sync call was made. In particular
// for client-server case, Sync cannot return cached view of storage
// and has to perform round-trip to the server.
// //
// If no transactions have been committed yet, LastTid returns 0. // Head is ↑=.
// //
// XXX clarify semantic XXX -> Sync + Head ? // Head is 0 if no transactions have been committed yet.
LastTid(ctx context.Context) (Tid, error) Sync(ctx context.Context) (head Tid, _ error)
Loader Loader
Iterator Iterator
......
...@@ -41,10 +41,13 @@ var infov = []struct {name string; getParam paramFunc} { ...@@ -41,10 +41,13 @@ var infov = []struct {name string; getParam paramFunc} {
}}, }},
// TODO reenable size // TODO reenable size
// {"size", func(stor zodb.IStorage) (string, error) { return stor.StorageSize(), nil }}, // {"size", func(stor zodb.IStorage) (string, error) { return stor.StorageSize(), nil }},
{"last_tid", func(ctx context.Context, stor zodb.IStorage) (string, error) { {"head", zhead},
tid, err := stor.LastTid(ctx) {"last_tid", zhead}, // last_tid is deprecated alias for head
return tid.String(), err }
}},
func zhead(ctx context.Context, stor zodb.IStorage) (string, error) {
err := stor.Sync(ctx)
return stor.Head().String(), err
} }
// {} parameter_name -> get_parameter(stor) // {} parameter_name -> get_parameter(stor)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment