Commit 028fdbf0 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 7f56c92b
...@@ -428,7 +428,6 @@ func testPersistentDB(t0 *testing.T, rawcache bool) { ...@@ -428,7 +428,6 @@ func testPersistentDB(t0 *testing.T, rawcache bool) {
at2, err := ZPyCommit(zurl, at1, _obj2); X(err) at2, err := ZPyCommit(zurl, at1, _obj2); X(err)
// new db connection should see the change // new db connection should see the change
// XXX currently there is a race because db.Open does not do proper Sync
t2 := testopen(&ConnOptions{}) t2 := testopen(&ConnOptions{})
assert.Equal(t2.conn.At(), at2) assert.Equal(t2.conn.At(), at2)
assert.Equal(db.pool, []*Connection(nil)) assert.Equal(db.pool, []*Connection(nil))
......
...@@ -155,8 +155,10 @@ type storage struct { ...@@ -155,8 +155,10 @@ type storage struct {
downErr error // reason for shutdown downErr error // reason for shutdown
// watcher // watcher
headMu sync.Mutex
head Tid // local view of storage head; mutated by watcher only headMu sync.Mutex
head Tid // local view of storage head; mutated by watcher only
drvWatchq chan Event // watchq passed to driver drvWatchq chan Event // watchq passed to driver
watchReq chan watchRequest // {Add,Del}Watch requests go here watchReq chan watchRequest // {Add,Del}Watch requests go here
watchTab map[chan<- Event]struct{} // registered watchers watchTab map[chan<- Event]struct{} // registered watchers
...@@ -191,68 +193,6 @@ func (s *storage) Close() error { ...@@ -191,68 +193,6 @@ func (s *storage) Close() error {
// loading goes through cache - this way prefetching can work // loading goes through cache - this way prefetching can work
func (s *storage) Head() Tid {
s.headMu.Lock()
head := s.head
s.headMu.Unlock()
return head
}
// XXX place -> near watcher
func (s *storage) Sync(ctx context.Context) (err error) {
defer func() {
if err != nil {
err = s.zerr("sync", nil, err)
}
}()
// XXX better -> xcontext.Merge(ctx, s.opCtx) but currently it costs 1+ goroutine
if ready(s.down) {
return s.downErr
}
head, err := s.driver.Sync(ctx)
if err != nil {
return err
}
// XXX check that driver returns head↑
// wait till .head >= head
watchq := make(chan Event)
at := s.AddWatch(watchq)
defer s.DelWatch(watchq)
for at < head {
select {
case <-ctx.Done():
return ctx.Err()
case <-s.down:
return s.downErr
case event, ok := <-watchq:
if !ok {
// closed
return s.downErr // XXX ok? sync on .down?
}
switch e := event.(type) {
default:
panic("XXX") // XXX
case *EventError:
return e.Err
case *EventCommit:
at = e.Tid
}
}
}
return nil
}
// Load implements Loader. // Load implements Loader.
func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) { func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) {
// XXX better -> xcontext.Merge(ctx, s.opCtx) but currently it costs 1+ goroutine // XXX better -> xcontext.Merge(ctx, s.opCtx) but currently it costs 1+ goroutine
...@@ -483,6 +423,70 @@ func (s *storage) DelWatch(watchq chan<- Event) { ...@@ -483,6 +423,70 @@ func (s *storage) DelWatch(watchq chan<- Event) {
} }
} }
// Head implements IStorage.
func (s *storage) Head() Tid {
s.headMu.Lock()
head := s.head
s.headMu.Unlock()
return head
}
// Sync implements IStorage.
func (s *storage) Sync(ctx context.Context) (err error) {
defer func() {
if err != nil {
err = s.zerr("sync", nil, err)
}
}()
// XXX better -> xcontext.Merge(ctx, s.opCtx) but currently it costs 1+ goroutine
if ready(s.down) {
return s.downErr
}
head, err := s.driver.Sync(ctx)
if err != nil {
return err
}
// XXX check that driver returns head↑
// wait till .head >= head
watchq := make(chan Event)
at := s.AddWatch(watchq)
defer s.DelWatch(watchq)
for at < head {
select {
case <-ctx.Done():
return ctx.Err()
case <-s.down:
return s.downErr
case event, ok := <-watchq:
if !ok {
// closed
<-s.down
return s.downErr
}
switch e := event.(type) {
default:
panic(fmt.Sprintf("unexpected event %T", e))
case *EventError:
return e.Err
case *EventCommit:
at = e.Tid
}
}
}
return nil
}
// ---- misc ---- // ---- misc ----
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment