Commit 5886aad3 authored by Kirill Smelkov's avatar Kirill Smelkov

X zodb: Clarify Load interface

- Load does not return "next serial"
- Load always returns LoadError wrapping error cause
- Load: clarified & documented semantic for "there-is-no-data" results
- ErrOidMissing -> NoObjectError (just rename)
- ErrXidMissing -> NoDataError   (rename + adjust information inside)

  inside: xid -> oid + deletedAt

  deletedAt is needed if storages are stacked (e.g. in DemoStorage) in case
  there is no data for oid/xid in "changes" to find out whether in "changes" oid
  was created after searched xid.At (and then look into "base"), or it was
  already deleted in "changes" and then we must not look into "base".

  see: kirr/ZODB@a762e2f8

- URL IStorage -> IStorageDriver (so that raw driver can properly return
  LoadError from its load)
parent 87b1102c
...@@ -388,20 +388,9 @@ func (c *Client) LastOid(ctx context.Context) (zodb.Oid, error) { ...@@ -388,20 +388,9 @@ func (c *Client) LastOid(ctx context.Context) (zodb.Oid, error) {
func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) { func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) {
// defer func() ... // defer func() ...
buf, serial, err = c._Load(ctx, xid) buf, serial, err = c._Load(ctx, xid)
if err != nil {
switch err.(type) { err = &zodb.LoadError{URL: c.URL(), Xid: xid, Err: err}
case nil:
// ok (avoid allocation in xerr.Contextf() call for no-error case)
// keep zodb errors intact
// XXX ok? or requre users always call Cause?
case *zodb.ErrOidMissing:
case *zodb.ErrXidMissing:
default:
xerr.Contextf(&err, "client: load %v", xid)
} }
return buf, serial, err return buf, serial, err
} }
...@@ -521,6 +510,13 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zo ...@@ -521,6 +510,13 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zo
return c, nil return c, nil
} }
func (c *Client) URL() string {
// XXX neos:// depending whether it was tls
// XXX options if such were given to open are discarded
// (but we need to be able to contruct URL if Client was created via NewClient directly)
return fmt.Sprintf("neo://%s@%s", c.node.ClusterName, c.node.MasterAddr)
}
func init() { func init() {
zodb.RegisterDriver("neo", openClientByURL) zodb.RegisterDriver("neo", openClientByURL)
} }
...@@ -21,6 +21,8 @@ package neo ...@@ -21,6 +21,8 @@ package neo
// error related utilities // error related utilities
import ( import (
"strings"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
) )
...@@ -33,11 +35,14 @@ func ErrEncode(err error) *Error { ...@@ -33,11 +35,14 @@ func ErrEncode(err error) *Error {
switch err := err.(type) { switch err := err.(type) {
case *Error: case *Error:
return err return err
case *zodb.ErrXidMissing: case *zodb.NoDataError:
// XXX abusing message for xid // XXX abusing message for oid, deletedAt
return &Error{Code: OID_NOT_FOUND, Message: err.Xid.String()} return &Error{
Code: OID_NOT_FOUND,
Message: err.Oid.String() + "," + err.DeletedAt.String(),
}
case *zodb.ErrOidMissing: case *zodb.NoObjectError:
// XXX abusing message for oid // XXX abusing message for oid
return &Error{Code: OID_DOES_NOT_EXIST, Message: err.Oid.String()} return &Error{Code: OID_DOES_NOT_EXIST, Message: err.Oid.String()}
...@@ -52,15 +57,22 @@ func ErrEncode(err error) *Error { ...@@ -52,15 +57,22 @@ func ErrEncode(err error) *Error {
func ErrDecode(e *Error) error { func ErrDecode(e *Error) error {
switch e.Code { switch e.Code {
case OID_NOT_FOUND: case OID_NOT_FOUND:
xid, err := zodb.ParseXid(e.Message) // XXX abusing message for xid // XXX abusing message for oid, deletedAt
if err == nil { argv := strings.Split(e.Message, ",")
return &zodb.ErrXidMissing{xid} if len(argv) != 2 {
break
}
oid, err0 := zodb.ParseOid(argv[0])
del, err1 := zodb.ParseTid(argv[1])
if !(err0 == nil && err1 == nil) {
break
} }
return &zodb.NoDataError{Oid: oid, DeletedAt: del}
case OID_DOES_NOT_EXIST: case OID_DOES_NOT_EXIST:
oid, err := zodb.ParseOid(e.Message) // XXX abusing message for oid oid, err := zodb.ParseOid(e.Message) // XXX abusing message for oid
if err == nil { if err == nil {
return &zodb.ErrOidMissing{oid} return &zodb.NoObjectError{oid}
} }
} }
......
...@@ -643,8 +643,8 @@ type GetObject struct { ...@@ -643,8 +643,8 @@ type GetObject struct {
type AnswerObject struct { type AnswerObject struct {
Oid zodb.Oid Oid zodb.Oid
Serial zodb.Tid // XXX strictly is SerialStart/SerialEnd in proto.py Serial zodb.Tid
NextSerial zodb.Tid // XXX but there it is out of sync NextSerial zodb.Tid
Compression bool Compression bool
Checksum Checksum Checksum Checksum
Data *mem.Buf // TODO encode -> separately (for writev) Data *mem.Buf // TODO encode -> separately (for writev)
......
...@@ -515,10 +515,16 @@ func TestMasterStorage(t *testing.T) { ...@@ -515,10 +515,16 @@ func TestMasterStorage(t *testing.T) {
} }
} else { } else {
// deleted // deleted
errWant := &zodb.ErrXidMissing{xid} errWant := &zodb.LoadError{
URL: C.URL(),
Xid: xid,
Err: &zodb.NoDataError{Oid: xid.Oid, DeletedAt: datai.Tid},
}
if !(buf == nil && serial == 0 && reflect.DeepEqual(err, errWant)) { if !(buf == nil && serial == 0 && reflect.DeepEqual(err, errWant)) {
t.Fatalf("load: %v:\nhave: %v, %#v, %#v\nwant: %v, %#v, %#v", t.Fatalf("load: %v ->\nbuf:\n%s\nserial:\n%s\nerr:\n%s\n", xid,
xid, serial, err, buf, zodb.Tid(0), errWant, []byte(nil)) pretty.Compare(nil, buf),
pretty.Compare(0, serial),
pretty.Compare(errWant, err))
} }
} }
} }
......
...@@ -551,14 +551,18 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms ...@@ -551,14 +551,18 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms
buf, serial, err := stor.zstor.Load(ctx, xid) buf, serial, err := stor.zstor.Load(ctx, xid)
if err != nil { if err != nil {
// translate err to NEO protocol error codes // translate err to NEO protocol error codes
return neo.ErrEncode(err) e := err.(*zodb.LoadError) // XXX move this to ErrEncode?
return neo.ErrEncode(e.Err)
} }
// compatibility with py side:
// for loadSerial - check we have exact hit - else "nodata" // for loadSerial - check we have exact hit - else "nodata"
if req.Serial != neo.INVALID_TID { if req.Serial != neo.INVALID_TID {
if serial != req.Serial { if serial != req.Serial {
// XXX actually show in error it was strict "=" load return &neo.Error{
return neo.ErrEncode(&zodb.ErrXidMissing{xid}) Code: neo.OID_NOT_FOUND,
Message: fmt.Sprintf("%s: no data with serial %s", xid.Oid, req.Serial),
}
} }
} }
......
...@@ -40,7 +40,10 @@ import ( ...@@ -40,7 +40,10 @@ import (
// Cache provides RAM caching layer that can be used over a storage. // Cache provides RAM caching layer that can be used over a storage.
type Cache struct { type Cache struct {
loader StorLoader loader interface {
StorLoader
URL() string
}
mu sync.RWMutex mu sync.RWMutex
...@@ -125,7 +128,7 @@ type StorLoader interface { ...@@ -125,7 +128,7 @@ type StorLoader interface {
// NewCache creates new cache backed up by loader. // NewCache creates new cache backed up by loader.
// //
// The cache will use not more than ~ sizeMax bytes of RAM for cached data. // The cache will use not more than ~ sizeMax bytes of RAM for cached data.
func NewCache(loader StorLoader, sizeMax int) *Cache { func NewCache(loader interface { StorLoader; URL() string }, sizeMax int) *Cache {
c := &Cache{ c := &Cache{
loader: loader, loader: loader,
entryMap: make(map[Oid]*oidCacheEntry), entryMap: make(map[Oid]*oidCacheEntry),
...@@ -166,7 +169,7 @@ func (c *Cache) Load(ctx context.Context, xid Xid) (buf *mem.Buf, serial Tid, er ...@@ -166,7 +169,7 @@ func (c *Cache) Load(ctx context.Context, xid Xid) (buf *mem.Buf, serial Tid, er
} }
if rce.err != nil { if rce.err != nil {
return nil, 0, rce.userErr(xid) return nil, 0, &LoadError{URL: c.loader.URL(), Xid: xid, Err: rce.err}
} }
return rce.buf, rce.serial, nil return rce.buf, rce.serial, nil
...@@ -300,7 +303,12 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry) { ...@@ -300,7 +303,12 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry) {
// normalize buf/serial if it was error // normalize buf/serial if it was error
if err != nil { if err != nil {
// XXX err == canceled? -> ? e := err.(*LoadError) // XXX better driver return *LoadError explicitly
// only remember problem cause - full LoadError will be
// reconstructed in Load with actual requested there xid.
err = e.Err
// TODO err == canceled? -> don't remember
buf.XRelease() buf.XRelease()
buf = nil buf = nil
serial = 0 serial = 0
...@@ -563,8 +571,8 @@ func isErrNoData(err error) bool { ...@@ -563,8 +571,8 @@ func isErrNoData(err error) bool {
default: default:
return false return false
case *ErrOidMissing: case *NoObjectError:
case *ErrXidMissing: case *NoDataError:
} }
return true return true
} }
...@@ -634,23 +642,6 @@ func (rce *revCacheEntry) loaded() bool { ...@@ -634,23 +642,6 @@ func (rce *revCacheEntry) loaded() bool {
return (rce.waitBufRef == -1) return (rce.waitBufRef == -1)
} }
// userErr returns error that, if any, needs to be returned to user from Cache.Load
//
// ( ErrXidMissing contains xid for which it is missing. In cache we keep such
// xid with max .head but users need to get ErrXidMissing with their own query )
//
// rce must be loaded.
func (rce *revCacheEntry) userErr(xid Xid) error {
switch e := rce.err.(type) {
case *ErrXidMissing:
if e.Xid != xid {
return &ErrXidMissing{xid}
}
}
return rce.err
}
// list head that knows it is in revCacheEntry.inLRU // list head that knows it is in revCacheEntry.inLRU
type lruHead struct { type lruHead struct {
list.Head list.Head
......
...@@ -66,13 +66,24 @@ func bufSame(buf1, buf2 *mem.Buf) bool { ...@@ -66,13 +66,24 @@ func bufSame(buf1, buf2 *mem.Buf) bool {
return reflect.DeepEqual(buf1.Data, buf2.Data) return reflect.DeepEqual(buf1.Data, buf2.Data)
} }
func (stor *tStorage) URL() string {
return "test"
}
func (stor *tStorage) Load(_ context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error) { func (stor *tStorage) Load(_ context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error) {
//fmt.Printf("> load(%v)\n", xid) //fmt.Printf("> load(%v)\n", xid)
//defer func() { fmt.Printf("< %v, %v, %v\n", buf.XData(), serial, err) }() //defer func() { fmt.Printf("< %v, %v, %v\n", buf.XData(), serial, err) }()
buf, serial, err = stor.load(xid)
if err != nil {
err = &LoadError{URL: stor.URL(), Xid: xid, Err: err}
}
return buf, serial, err
}
func (stor *tStorage) load(xid Xid) (buf *mem.Buf, serial Tid, err error) {
datav := stor.dataMap[xid.Oid] datav := stor.dataMap[xid.Oid]
if datav == nil { if datav == nil {
return nil, 0, &ErrOidMissing{xid.Oid} return nil, 0, &NoObjectError{xid.Oid}
} }
// find max entry with .serial <= xid.At // find max entry with .serial <= xid.At
...@@ -85,7 +96,7 @@ func (stor *tStorage) Load(_ context.Context, xid Xid) (buf *mem.Buf, serial Tid ...@@ -85,7 +96,7 @@ func (stor *tStorage) Load(_ context.Context, xid Xid) (buf *mem.Buf, serial Tid
//fmt.Printf("i: %d n: %d\n", i, n) //fmt.Printf("i: %d n: %d\n", i, n)
if i == -1 { if i == -1 {
// xid.At < all .serial - no such transaction // xid.At < all .serial - no such transaction
return nil, 0, &ErrXidMissing{xid} return nil, 0, &NoDataError{Oid: xid.Oid, DeletedAt: 0}
} }
s, e := datav[i].serial, datav[i].err s, e := datav[i].serial, datav[i].err
...@@ -102,6 +113,10 @@ func xidat(oid Oid, tid Tid) Xid { ...@@ -102,6 +113,10 @@ func xidat(oid Oid, tid Tid) Xid {
return Xid{Oid: oid, At: tid} return Xid{Oid: oid, At: tid}
} }
func nodata(oid Oid, deletedAt Tid) *NoDataError {
return &NoDataError{Oid: oid, DeletedAt: deletedAt}
}
func TestCache(t *testing.T) { func TestCache(t *testing.T) {
// XXX hack; place=ok? // XXX hack; place=ok?
pretty.CompareConfig.PrintStringers = true pretty.CompareConfig.PrintStringers = true
...@@ -135,7 +150,7 @@ func TestCache(t *testing.T) { ...@@ -135,7 +150,7 @@ func TestCache(t *testing.T) {
c := NewCache(tstor, 100 /* > Σ all data */) c := NewCache(tstor, 100 /* > Σ all data */)
ctx := context.Background() ctx := context.Background()
checkLoad := func(xid Xid, buf *mem.Buf, serial Tid, err error) { checkLoad := func(xid Xid, buf *mem.Buf, serial Tid, errCause error) {
t.Helper() t.Helper()
bad := &bytes.Buffer{} bad := &bytes.Buffer{}
b, s, e := c.Load(ctx, xid) b, s, e := c.Load(ctx, xid)
...@@ -145,6 +160,11 @@ func TestCache(t *testing.T) { ...@@ -145,6 +160,11 @@ func TestCache(t *testing.T) {
if serial != s { if serial != s {
fmt.Fprintf(bad, "serial:\n%s\n", pretty.Compare(serial, s)) fmt.Fprintf(bad, "serial:\n%s\n", pretty.Compare(serial, s))
} }
var err error
if errCause != nil {
err = &LoadError{URL: "test", Xid: xid, Err: errCause}
}
if !reflect.DeepEqual(err, e) { if !reflect.DeepEqual(err, e) {
fmt.Fprintf(bad, "err:\n%s\n", pretty.Compare(err, e)) fmt.Fprintf(bad, "err:\n%s\n", pretty.Compare(err, e))
} }
...@@ -241,29 +261,29 @@ func TestCache(t *testing.T) { ...@@ -241,29 +261,29 @@ func TestCache(t *testing.T) {
checkMRU(0) checkMRU(0)
// load @2 -> new rce entry // load @2 -> new rce entry
checkLoad(xidat(1,2), nil, 0, &ErrXidMissing{xidat(1,2)}) checkLoad(xidat(1,2), nil, 0, nodata(1,0))
checkOIDV(1) checkOIDV(1)
oce1 := c.entryMap[1] oce1 := c.entryMap[1]
ok1(len(oce1.rcev) == 1) ok1(len(oce1.rcev) == 1)
rce1_h2 := oce1.rcev[0] rce1_h2 := oce1.rcev[0]
checkRCE(rce1_h2, 2, 0, nil, &ErrXidMissing{xidat(1,2)}) checkRCE(rce1_h2, 2, 0, nil, nodata(1,0))
checkMRU(0, rce1_h2) checkMRU(0, rce1_h2)
// load @3 -> 2] merged with 3] // load @3 -> 2] merged with 3]
checkLoad(xidat(1,3), nil, 0, &ErrXidMissing{xidat(1,3)}) checkLoad(xidat(1,3), nil, 0, nodata(1,0))
checkOIDV(1) checkOIDV(1)
ok1(len(oce1.rcev) == 1) ok1(len(oce1.rcev) == 1)
rce1_h3 := oce1.rcev[0] rce1_h3 := oce1.rcev[0]
ok1(rce1_h3 != rce1_h2) // rce1_h2 was merged into rce1_h3 ok1(rce1_h3 != rce1_h2) // rce1_h2 was merged into rce1_h3
checkRCE(rce1_h3, 3, 0, nil, &ErrXidMissing{xidat(1,3)}) checkRCE(rce1_h3, 3, 0, nil, nodata(1,0))
checkMRU(0, rce1_h3) checkMRU(0, rce1_h3)
// load @1 -> 1] merged with 3] // load @1 -> 1] merged with 3]
checkLoad(xidat(1,1), nil, 0, &ErrXidMissing{xidat(1,1)}) checkLoad(xidat(1,1), nil, 0, nodata(1,0))
checkOIDV(1) checkOIDV(1)
ok1(len(oce1.rcev) == 1) ok1(len(oce1.rcev) == 1)
ok1(oce1.rcev[0] == rce1_h3) ok1(oce1.rcev[0] == rce1_h3)
checkRCE(rce1_h3, 3, 0, nil, &ErrXidMissing{xidat(1,3)}) checkRCE(rce1_h3, 3, 0, nil, nodata(1,0))
checkMRU(0, rce1_h3) checkMRU(0, rce1_h3)
// load @5 -> new rce entry with data // load @5 -> new rce entry with data
...@@ -620,6 +640,10 @@ func (c *Checker) assertEq(a, b interface{}) { ...@@ -620,6 +640,10 @@ func (c *Checker) assertEq(a, b interface{}) {
type noopStorage struct {} type noopStorage struct {}
var noopData = []byte{0} var noopData = []byte{0}
func (s *noopStorage) URL() string {
return "noop"
}
func (s *noopStorage) Load(_ context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error) { func (s *noopStorage) Load(_ context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error) {
return mkbuf(noopData), 1, nil return mkbuf(noopData), 1, nil
} }
......
...@@ -84,7 +84,8 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto ...@@ -84,7 +84,8 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
IStorageDriver: storDriver, IStorageDriver: storDriver,
// small cache so that prefetch can work for loading // small cache so that prefetch can work for loading
l1cache: NewCache(storDriver, 128 * 4*1024), // XXX 512K hardcoded (= ~ 128 · 4K-entries) // XXX 512K hardcoded (= ~ 128 · 4K-entries)
l1cache: NewCache(storDriver, 128 * 4*1024),
}, nil }, nil
} }
...@@ -97,21 +98,17 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto ...@@ -97,21 +98,17 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
type storage struct { type storage struct {
IStorageDriver IStorageDriver
l1cache *Cache l1cache *Cache
url string // URL this storage was opened via
} }
// loading always goes through cache - this way prefetching can work // loading always goes through cache - this way prefetching can work
func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) { func (s *storage) Load(ctx context.Context, xid Xid) (*mem.Buf, Tid, error) {
// XXX here: offload xid validation from cache and driver ?
// XXX here: offload wrapping err -> LoadError{err} ?
return s.l1cache.Load(ctx, xid) return s.l1cache.Load(ctx, xid)
} }
func (s *storage) Prefetch(ctx context.Context, xid Xid) { func (s *storage) Prefetch(ctx context.Context, xid Xid) {
s.l1cache.Prefetch(ctx, xid) s.l1cache.Prefetch(ctx, xid)
} }
func (s *storage) URL() string {
return s.url
}
...@@ -107,20 +107,10 @@ func (fs *FileStorage) LastOid(_ context.Context) (zodb.Oid, error) { ...@@ -107,20 +107,10 @@ func (fs *FileStorage) LastOid(_ context.Context) (zodb.Oid, error) {
return lastOid, nil return lastOid, nil
} }
// ErrXidLoad is returned when there is an error while loading xid func (fs *FileStorage) URL() string {
// XXX -> zodb (common bits) return fs.file.Name()
type ErrXidLoad struct {
Xid zodb.Xid
Err error
} }
func (e *ErrXidLoad) Error() string {
return fmt.Sprintf("loading %v: %v", e.Xid, e.Err)
}
// XXX +Cause
// freelist(DataHeader) // freelist(DataHeader)
var dhPool = sync.Pool{New: func() interface{} { return &DataHeader{} }} var dhPool = sync.Pool{New: func() interface{} { return &DataHeader{} }}
...@@ -137,14 +127,22 @@ func (dh *DataHeader) Free() { ...@@ -137,14 +127,22 @@ func (dh *DataHeader) Free() {
} }
func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) { func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) {
// FIXME zodb.TidMax is only 7fff... tid from outside can be ffff...
// -> TODO reject tid out of range
buf, serial, err = fs.load(xid)
if err != nil {
err = &zodb.LoadError{URL: fs.URL(), Xid: xid, Err: err}
}
return buf, serial, err
}
func (fs *FileStorage) load(xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) {
// lookup in index position of oid data record within latest transaction which changed this oid // lookup in index position of oid data record within latest transaction which changed this oid
dataPos, ok := fs.index.Get(xid.Oid) dataPos, ok := fs.index.Get(xid.Oid)
if !ok { if !ok {
return nil, 0, &zodb.ErrOidMissing{Oid: xid.Oid} return nil, 0, &zodb.NoObjectError{Oid: xid.Oid}
} }
// FIXME zodb.TidMax is only 7fff... tid from outside can be ffff... -> TODO reject tid out of range
// XXX go compiler cannot deduce dh should be on stack here // XXX go compiler cannot deduce dh should be on stack here
//dh := DataHeader{Oid: xid.Oid, Tid: zodb.TidMax, PrevRevPos: dataPos} //dh := DataHeader{Oid: xid.Oid, Tid: zodb.TidMax, PrevRevPos: dataPos}
dh := DataHeaderAlloc() dh := DataHeaderAlloc()
...@@ -152,21 +150,19 @@ func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *mem.Buf, seri ...@@ -152,21 +150,19 @@ func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *mem.Buf, seri
dh.Tid = zodb.TidMax dh.Tid = zodb.TidMax
dh.PrevRevPos = dataPos dh.PrevRevPos = dataPos
//defer dh.Free() //defer dh.Free()
buf, serial, err = fs._Load(dh, xid) buf, serial, err = fs._load(dh, xid)
dh.Free() dh.Free()
return buf, serial, err return buf, serial, err
} }
func (fs *FileStorage) _Load(dh *DataHeader, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) { func (fs *FileStorage) _load(dh *DataHeader, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) {
// search backwards for when we first have data record with tid satisfying xid.At // search backwards for when we first have data record with tid satisfying xid.At
for { for {
err := dh.LoadPrevRev(fs.file) err := dh.LoadPrevRev(fs.file)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
// no such oid revision // object was created after xid.At
err = &zodb.ErrXidMissing{Xid: xid} err = &zodb.NoDataError{Oid: xid.Oid, DeletedAt: 0}
} else {
err = &ErrXidLoad{xid, err}
} }
return nil, 0, err return nil, 0, err
...@@ -183,11 +179,11 @@ func (fs *FileStorage) _Load(dh *DataHeader, xid zodb.Xid) (*mem.Buf, zodb.Tid, ...@@ -183,11 +179,11 @@ func (fs *FileStorage) _Load(dh *DataHeader, xid zodb.Xid) (*mem.Buf, zodb.Tid,
buf, err := dh.LoadData(fs.file) buf, err := dh.LoadData(fs.file)
if err != nil { if err != nil {
return nil, 0, &ErrXidLoad{xid, err} return nil, 0, err
} }
if buf.Data == nil { if buf.Data == nil {
// data was deleted // object was deleted
return nil, 0, &zodb.ErrXidMissing{Xid: xid} return nil, 0, &zodb.NoDataError{Oid: xid.Oid, DeletedAt: serial}
} }
return buf, serial, nil return buf, serial, nil
......
...@@ -68,11 +68,14 @@ func checkLoad(t *testing.T, fs *FileStorage, xid zodb.Xid, expect objState) { ...@@ -68,11 +68,14 @@ func checkLoad(t *testing.T, fs *FileStorage, xid zodb.Xid, expect objState) {
t.Helper() t.Helper()
buf, tid, err := fs.Load(context.Background(), xid) buf, tid, err := fs.Load(context.Background(), xid)
// deleted obj - it should load with "no data // deleted obj - it should load with "no data"
if expect.data == nil { if expect.data == nil {
errOk := &zodb.ErrXidMissing{Xid: xid} errOk := &zodb.LoadError{
e, ok := err.(*zodb.ErrXidMissing) URL: fs.URL(),
if !(ok && *e == *errOk) { Xid: xid,
Err: &zodb.NoDataError{Oid: xid.Oid , DeletedAt: expect.tid},
}
if !reflect.DeepEqual(err, errOk) {
t.Errorf("load %v: returned err unexpected: %v ; want: %v", xid, err, errOk) t.Errorf("load %v: returned err unexpected: %v ; want: %v", xid, err, errOk)
} }
......
...@@ -89,11 +89,11 @@ type TxnInfo struct { ...@@ -89,11 +89,11 @@ type TxnInfo struct {
} }
// DataInfo represents information about one data record. // DataInfo represents information about one object change.
type DataInfo struct { type DataInfo struct {
Oid Oid Oid Oid
Tid Tid Tid Tid // changed by this transaction
Data []byte // nil means: deleted XXX -> *Buf ? Data []byte // new object data; nil if object becomes deleted
// DataTidHint is optional hint from a storage that the same data was // DataTidHint is optional hint from a storage that the same data was
// already originally committed in earlier transaction, for example in // already originally committed in earlier transaction, for example in
...@@ -119,45 +119,70 @@ const ( ...@@ -119,45 +119,70 @@ const (
// ---- interfaces ---- // ---- interfaces ----
// ErrOidMissing is an error which tells that there is no such oid in the database at all // NoObjectError is the error which tells that there is no such object in the database at all
// type NoObjectError struct {
// XXX do we need distinction in between ErrOidMissing & ErrXidMissing ?
// (think how client should handle error from Load ?)
type ErrOidMissing struct {
Oid Oid Oid Oid
} }
func (e ErrOidMissing) Error() string { func (e NoObjectError) Error() string {
return fmt.Sprintf("%v: no such oid", e.Oid) return fmt.Sprintf("%s: no such object", e.Oid)
} }
// ErrXidMissing is an error which tells that oid exists in the database, // NoDataError is the error which tells that object exists in the database,
// but there is no its revision satisfying xid.At search criteria. // but there is no its non-empty revision satisfying search criteria.
type ErrXidMissing struct { type NoDataError struct {
Oid Oid
// DeletedAt explains object state wrt used search criteria:
// - 0: object was not created at time of searched xid.At
// - !0: object was deleted by transaction with tid=DeletedAt
DeletedAt Tid
}
func (e *NoDataError) Error() string {
if e.DeletedAt == 0 {
return fmt.Sprintf("%s: object was not yet created", e.Oid)
} else {
return fmt.Sprintf("%s: object was deleted @%s", e.Oid, e.DeletedAt)
}
}
// LoadError is the error returned by IStorageDriver.Load
type LoadError struct {
URL string
Xid Xid Xid Xid
Err error
} }
func (e *ErrXidMissing) Error() string { func (e *LoadError) Error() string {
return fmt.Sprintf("%v: no matching data record found", e.Xid) return fmt.Sprintf("%s: load %s: %v", e.URL, e.Xid, e.Err)
} }
func (e *LoadError) Cause() error {
return e.Err
}
// IStorage is the interface provided by opened ZODB storage // IStorage is the interface provided by opened ZODB storage
type IStorage interface { type IStorage interface {
IStorageDriver IStorageDriver
// URL returns URL of how the storage was opened
URL() string
// Prefetch prefetches object addressed by xid. // Prefetch prefetches object addressed by xid.
// //
// If data is not yet in cache loading for it is started in the background. // If data is not yet in cache loading for it is started in the background.
// Prefetch is not blocking operation and does not wait for loading, if any was // Prefetch is not blocking operation and does not wait for loading, if any was
// started, to complete. // started, to complete.
//
// Prefetch does not return any error.
Prefetch(ctx context.Context, xid Xid) Prefetch(ctx context.Context, xid Xid)
} }
// IStorageDriver is the raw interface provided by ZODB storage drivers // IStorageDriver is the raw interface provided by ZODB storage drivers
type IStorageDriver interface { type IStorageDriver interface {
// URL returns URL of how the storage was opened
URL() string
// Close closes storage // Close closes storage
Close() error Close() error
...@@ -174,26 +199,44 @@ type IStorageDriver interface { ...@@ -174,26 +199,44 @@ type IStorageDriver interface {
// Load loads object data addressed by xid from database. // Load loads object data addressed by xid from database.
// //
// TODO specify error when data not found -> ErrOidMissing | ErrXidMissing // Returned are:
//
// - if there is data to load: buf is non-empty, serial indicates
// transaction which matched xid criteria and err=nil.
//
// otherwise buf=nil, serial=0 and err is *LoadError with err.Err
// describing the error cause:
// //
// NOTE ZODB/py provides 2 entrypoints in IStorage for loading: // - *NoObjectError if there is no such object in database at all,
// LoadSerial and LoadBefore but in ZODB/go we have only Load which is // - *NoDataError if object exists in database but there is no
// its data matching xid,
// - some other error indicating e.g. IO problem.
//
//
// NOTE 1: ZODB/py provides 2 entrypoints in IStorage for loading:
// loadSerial and loadBefore but in ZODB/go we have only Load which is
// a bit different from both: // a bit different from both:
// //
// - Load loads object data for object at database state specified by xid.At // - Load loads object data for object at database state specified by xid.At
// - LoadBefore loads object data for object at database state previous to xid.At // - loadBefore loads object data for object at database state previous to xid.At
// it is thus equivalent to Load(..., xid.At-1) // it is thus equivalent to Load(..., xid.At-1)
// - LoadSerial loads object data from revision exactly modified // - loadSerial loads object data from revision exactly modified
// by transaction with tid = xid.At. // by transaction with tid = xid.At.
// it is thus equivalent to Load(..., xid.At) with followup // it is thus equivalent to Load(..., xid.At) with followup
// check that returned serial is exactly xid.At(*) // check that returned serial is exactly xid.At(*)
// //
// (*) LoadSerial is used only in a few places in ZODB/py - mostly in // (*) loadSerial is used only in a few places in ZODB/py - mostly in
// conflict resolution code where plain Load semantic - without // conflict resolution code where plain Load semantic - without
// checking object was particularly modified at that revision - would // checking object was particularly modified at that revision - would
// suffice. // suffice.
// //
// XXX zodb.loadBefore() returns (data, serial, serial_next) -> add serial_next? // NOTE 2: in ZODB/py loadBefore, in addition to serial, also returns
// serial_next, which constraints storage implementations unnecessarily
// and is used only in client cache.
//
// In ZODB/go Cache shows that it is possible to build efficient client
// cache without serial_next returned from Load. For this reason in ZODB/go
// Load specification comes without specifying serial_next return.
Load(ctx context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error) Load(ctx context.Context, xid Xid) (buf *mem.Buf, serial Tid, err error)
// TODO: write mode // TODO: write mode
......
...@@ -53,14 +53,14 @@ func Catobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid) ...@@ -53,14 +53,14 @@ func Catobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid)
func Dumpobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid, hashOnly bool) error { func Dumpobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid, hashOnly bool) error {
var objInfo zodb.DataInfo var objInfo zodb.DataInfo
buf, tid, err := stor.Load(ctx, xid) buf, serial, err := stor.Load(ctx, xid)
if err != nil { if err != nil {
return err return err
} }
// XXX hack - TODO rework IStorage.Load to fill-in objInfo directly? // XXX hack - TODO rework IStorage.Load to fill-in objInfo directly?
objInfo.Oid = xid.Oid objInfo.Oid = xid.Oid
objInfo.Tid = tid objInfo.Tid = serial
objInfo.Data = buf.Data objInfo.Data = buf.Data
objInfo.DataTidHint = 0 // no copy detection at catobj - just dump raw content objInfo.DataTidHint = 0 // no copy detection at catobj - just dump raw content
......
...@@ -370,7 +370,7 @@ class Application(ThreadedApplication): ...@@ -370,7 +370,7 @@ class Application(ThreadedApplication):
elif self._loading_invalidated: elif self._loading_invalidated:
# oid has just been invalidated. # oid has just been invalidated.
if not next_tid: if not next_tid:
next_tid = self._loading_invalidated next_tid = self._loading_invalidated # NOTE stores up to head
self._cache.store(oid, data, tid, next_tid) self._cache.store(oid, data, tid, next_tid)
# Else, we just reconnected to the master. # Else, we just reconnected to the master.
finally: finally:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment