Commit 92543fae authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 311f2cc0
...@@ -44,16 +44,66 @@ type Storage struct { ...@@ -44,16 +44,66 @@ type Storage struct {
δ zodb.IStorageDriver δ zodb.IStorageDriver
baseAt0 zodb.Tid baseAt0 zodb.Tid
baseWatchq <-chan zodb.Event
δWatchq <-chan zodb.Event // nil if demo is opened with watchq=nil
watchq chan<- zodb.Event // user requested to deliver events here
watchWG *xsync.WorkGroup
watchCancel func()
} }
// baseMutatedError is reported when Storage.base is detected to change. // baseMutatedError is reported when Storage.base is detected to change.
type baseMutatedError struct { type baseMutatedError struct {
baseAt0 zodb.Tid baseAt0 zodb.Tid
baseHead zodb.Tid // baseHead zodb.Tid
baseEvent zodb.Event
} }
func (e *baseMutatedError) Error() string { func (e *baseMutatedError) Error() string {
return fmt.Sprintf("base.head mutated from @%s to @%s", e.baseAt0, e.baseHead) // return fmt.Sprintf("base.head mutated from @%s to @%s", e.baseAt0, e.baseHead)
return fmt.Sprintf("base mutated from @%s: %s", e.baseAt0, e.baseEvent)
}
// watcher detects base mutation and proxies δ events to user.
// it runs as separate goroutine.
func (d *Storage) watcher(ctx context.Context) error {
if d.watchq != nil {
defer close(d.watchq)
}
for {
select {
// Close requests to stop watching
case <-ctx.Done():
return ctx.Err()
// event on base -> base mutated + shutdown
case event, ok := <-d.baseWatchq:
if !ok {
// base closed
d.baseWatchq = nil
continue
}
edown := &baseMutatedError{d.baseAt0, event}
ev := &zodb.EventError{edown} // XXX + context
if d.watchq != nil {
d.watchq <- ev
}
// XXX d.shutdown(edown)
return edown
// event on δ -> proxy to user
case event, ok := <-d.δWatchq:
if !ok {
// δ closed
d.δWatchq = nil
continue
}
d.watchq <- event // !nil becase d.δWatchq != nil
}
}
} }
...@@ -65,10 +115,11 @@ func (d *Storage) Close() (err error) { ...@@ -65,10 +115,11 @@ func (d *Storage) Close() (err error) {
} }
}() }()
err1 := d.δ.Close() errδ := d.δ.Close()
err2 := d.base.Close() errBase := d.base.Close()
// XXX close base watcher d.watchCancel()
return xerr.Merge(err1, err2) errWatch := d.watchWG.Wait()
return xerr.Merge(errWatch, errδ, errBase)
} }
// Sync implements zodb.IStorageDriver . // Sync implements zodb.IStorageDriver .
...@@ -92,7 +143,7 @@ func (d *Storage) Sync(ctx context.Context) (_ zodb.Tid, err error) { ...@@ -92,7 +143,7 @@ func (d *Storage) Sync(ctx context.Context) (_ zodb.Tid, err error) {
return err return err
} }
if baseHead != d.baseAt0 { if baseHead != d.baseAt0 {
return &baseMutatedError{d.baseAt0, baseHead} return &baseMutatedError{d.baseAt0, fmt.Errorf("to @%s", baseHead)}
} }
return nil return nil
}) })
...@@ -214,8 +265,14 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -214,8 +265,14 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
} }
}() }()
// open δ - as requested // open δ - as requested but with events going through us
δ, δAt0, err := zodb.OpenDriver(ctx, δZURL, opt) δopt := *opt
var δWatchq chan zodb.Event
if δopt.Watchq != nil {
δWatchq = make(chan zodb.Event)
δopt.Watchq = δWatchq
}
δ, δAt0, err := zodb.OpenDriver(ctx, δZURL, &δopt)
if err != nil { if err != nil {
return nil, zodb.InvalidTid, err return nil, zodb.InvalidTid, err
} }
...@@ -226,6 +283,8 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -226,6 +283,8 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
} }
}() }()
// XXX read watchqs while we verify? (not to deadlock)
// verify that either // verify that either
// - δ is all empty (just created), or // - δ is all empty (just created), or
// - all δ transactions come strictly after base. // - all δ transactions come strictly after base.
...@@ -245,21 +304,29 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -245,21 +304,29 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
return nil, zodb.InvalidTid, err return nil, zodb.InvalidTid, err
} }
// there is a δ transaction ∈ [δAt0, baseAt0) // there is a δ transaction ∈ [δAt0, baseAt0)
// TODO iδ.Stop() // TODO iδ.Close()
return nil, zodb.InvalidTid, fmt.Errorf("base overlaps with δ: base.head=%s δ.tail=%s", baseAt0, δtxni.Tid) return nil, zodb.InvalidTid, fmt.Errorf("base overlaps with δ: base.head=%s δ.tail=%s", baseAt0, δtxni.Tid)
} }
at0 = δAt0 at0 = δAt0
} }
// XXX listen on baseWatchq and shutdown storage if base changes.
d := &Storage{ d := &Storage{
base: base, base: base,
δ : δ, δ : δ,
baseAt0: baseAt0, baseAt0: baseAt0,
baseWatchq: baseWatchq,
δWatchq: δWatchq,
watchq: opt.Watchq,
} }
// spawn watcher to listen on baseWatchq and shutdown storage if base changes.
ctx, cancel := context.WithCancel(context.Background())
d.watchWG = xsync.NewWorkGroup(ctx)
d.watchCancel = cancel
d.watchWG.Go(d.watcher)
return d, at0, nil return d, at0, nil
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment