Commit 5fc83902 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 8ee4675a
...@@ -30,6 +30,7 @@ import ( ...@@ -30,6 +30,7 @@ import (
"io" "io"
"net/url" "net/url"
"regexp" "regexp"
"sync"
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
...@@ -54,6 +55,10 @@ type Storage struct { ...@@ -54,6 +55,10 @@ type Storage struct {
watchWG *xsync.WorkGroup watchWG *xsync.WorkGroup
watchCancel func() watchCancel func()
downOnce sync.Once
down chan struct{} // ready when storage is down
downErr error // reason for shutdown
} }
// baseMutatedError is reported when Storage.base is detected to change. // baseMutatedError is reported when Storage.base is detected to change.
...@@ -78,6 +83,7 @@ func (e *baseError) Error() string { ...@@ -78,6 +83,7 @@ func (e *baseError) Error() string {
func (e *baseError) Cause() error { return e.err } func (e *baseError) Cause() error { return e.err }
func (e *baseError) Unwrap() error { return e.err } func (e *baseError) Unwrap() error { return e.err }
// watcher detects base mutation and proxies δ events to user watchq. // watcher detects base mutation and proxies δ events to user watchq.
// it runs as separate goroutine. // it runs as separate goroutine.
func (d *Storage) watcher(ctx context.Context) error { func (d *Storage) watcher(ctx context.Context) error {
...@@ -111,11 +117,11 @@ func (d *Storage) watcher(ctx context.Context) error { ...@@ -111,11 +117,11 @@ func (d *Storage) watcher(ctx context.Context) error {
edown = fmt.Errorf("base: unexpected event %T", event) edown = fmt.Errorf("base: unexpected event %T", event)
} }
d.shutdown(edown)
ev := &zodb.EventError{edown} // XXX + context ev := &zodb.EventError{edown} // XXX + context
if d.watchq != nil { if d.watchq != nil {
d.watchq <- ev d.watchq <- ev
} }
// XXX d.shutdown(edown)
return edown return edown
// event on δ -> proxy to user // event on δ -> proxy to user
...@@ -132,6 +138,15 @@ func (d *Storage) watcher(ctx context.Context) error { ...@@ -132,6 +138,15 @@ func (d *Storage) watcher(ctx context.Context) error {
} }
} }
// shutdown marks Storage as no longer being operational due to reason.
func (d *Storage) shutdown(reason error) {
d.downOnce.Do(func() {
d.downErr = reason
close(d.down)
})
}
var errClosed = errors.New("storage is closed")
// Close implements zodb.IStorageDriver . // Close implements zodb.IStorageDriver .
func (d *Storage) Close() (err error) { func (d *Storage) Close() (err error) {
...@@ -141,6 +156,7 @@ func (d *Storage) Close() (err error) { ...@@ -141,6 +156,7 @@ func (d *Storage) Close() (err error) {
} }
}() }()
d.shutdown(errClosed)
errδ := d.δ.Close() errδ := d.δ.Close()
errBase := d.base.Close() errBase := d.base.Close()
...@@ -200,6 +216,15 @@ func (d *Storage) Load(ctx context.Context, xid zodb.Xid) (_ *mem.Buf, _ zodb.Ti ...@@ -200,6 +216,15 @@ func (d *Storage) Load(ctx context.Context, xid zodb.Xid) (_ *mem.Buf, _ zodb.Ti
} }
}() }()
if ready(d.down) {
return nil, zodb.InvalidTid, d.downErr
}
var eNoData *zodb.NoDataError
var eNoObject *zodb.NoObjectError
inδ := false
if xid.At > d.baseAt0 {
data, serial, err := d.δ.Load(ctx, xid) data, serial, err := d.δ.Load(ctx, xid)
if err == nil { if err == nil {
// object data is present in δ // object data is present in δ
...@@ -207,9 +232,6 @@ func (d *Storage) Load(ctx context.Context, xid zodb.Xid) (_ *mem.Buf, _ zodb.Ti ...@@ -207,9 +232,6 @@ func (d *Storage) Load(ctx context.Context, xid zodb.Xid) (_ *mem.Buf, _ zodb.Ti
} }
useBase := false useBase := false
inδ := false
var eNoData *zodb.NoDataError
var eNoObject *zodb.NoObjectError
switch { switch {
case errors.As(err, &eNoData): case errors.As(err, &eNoData):
if eNoData.DeletedAt != 0 { if eNoData.DeletedAt != 0 {
...@@ -229,9 +251,16 @@ func (d *Storage) Load(ctx context.Context, xid zodb.Xid) (_ *mem.Buf, _ zodb.Ti ...@@ -229,9 +251,16 @@ func (d *Storage) Load(ctx context.Context, xid zodb.Xid) (_ *mem.Buf, _ zodb.Ti
if !useBase { if !useBase {
return data, serial, err return data, serial, err
} }
}
// cap .at in xid to .baseAt0 (we convert it back on error return, and
// it makes more robust wrt simultaneous base mutation).
xidBase := xid
if xid.At > d.baseAt0 {
xidBase.At = d.baseAt0
}
// XXX cap .at in xid to .baseAt0 ? (and convert back on error return) data, serial, err := d.base.Load(ctx, xidBase)
data, serial, err = d.base.Load(ctx, xid)
if err == nil { if err == nil {
return data, serial, nil return data, serial, nil
} }
...@@ -350,6 +379,8 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -350,6 +379,8 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
baseWatchq: baseWatchq, baseWatchq: baseWatchq,
δWatchq: δWatchq, δWatchq: δWatchq,
watchq: opt.Watchq, watchq: opt.Watchq,
down: make(chan struct{}),
} }
// spawn watcher to listen on baseWatchq and shutdown storage if base changes. // spawn watcher to listen on baseWatchq and shutdown storage if base changes.
...@@ -364,3 +395,16 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb ...@@ -364,3 +395,16 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
func init() { func init() {
zodb.RegisterDriver("demo", openByURL) zodb.RegisterDriver("demo", openByURL)
} }
// misc
// ready returns whether c is ready.
func ready(c <-chan struct{}) bool {
select {
case <-c:
return true
default:
return false
}
}
...@@ -240,7 +240,7 @@ func TestWatchLoad_vs_BaseMutate(t *testing.T) { ...@@ -240,7 +240,7 @@ func TestWatchLoad_vs_BaseMutate(t *testing.T) {
t.Fatalf("after base mutate: load: unexpected error:\nhave: %s\nwant: %s", t.Fatalf("after base mutate: load: unexpected error:\nhave: %s\nwant: %s",
err, errOk) err, errOk)
} }
if !(data == nil && serial == 0) { if !(data == nil && serial == zodb.InvalidTid) {
t.Fatalf("after base mutate: load: unexpected data=%v serial=%v", data, serial) t.Fatalf("after base mutate: load: unexpected data=%v serial=%v", data, serial)
} }
}) })
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment