Commit 035449f9 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 8e77cf48
...@@ -379,7 +379,7 @@ func (c *Client) LastOid(ctx context.Context) (zodb.Oid, error) { ...@@ -379,7 +379,7 @@ func (c *Client) LastOid(ctx context.Context) (zodb.Oid, error) {
panic("TODO") panic("TODO")
} }
func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zodb.Tid, err error) { func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data *zodb.Buf, serial zodb.Tid, err error) {
defer func() { defer func() {
switch err.(type) { switch err.(type) {
case nil: case nil:
...@@ -448,16 +448,21 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zo ...@@ -448,16 +448,21 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (data []byte, serial zo
data = resp.Data data = resp.Data
//checksum := sha1.Sum(data) //checksum := sha1.Sum(data.Data)
//if checksum != resp.Checksum { //if checksum != resp.Checksum {
// return nil, 0, fmt.Errorf("data corrupt: checksum mismatch") // return nil, 0, fmt.Errorf("data corrupt: checksum mismatch")
//} //}
if resp.Compression { if resp.Compression {
data, err = decompress(resp.Data, make([]byte, 0, len(resp.Data))) data2 := zodb.BufAlloc(len(data.Data))
data2.Data = data2.Data[:0]
udata, err = decompress(resp.Data, data2)
data.Free()
if err != nil { if err != nil {
data2.Free()
return nil, 0, fmt.Errorf("data corrupt: %v", err) return nil, 0, fmt.Errorf("data corrupt: %v", err)
} }
data = data2
} }
// reply.NextSerial // reply.NextSerial
......
...@@ -23,7 +23,7 @@ func lclose(ctx context.Context, c io.Closer) { ...@@ -23,7 +23,7 @@ func lclose(ctx context.Context, c io.Closer) {
// decompress decompresses data according to zlib encoding. // decompress decompresses data according to zlib encoding.
// //
// out buffer, if there is enough capacity, is used for decompression destination. // out buffer, if there is enough capacity, is used for decompression destination.
// if out has not not enough capacity a new buffer is allocated and used. // if out has not enough capacity a new buffer is allocated and used.
// //
// return: destination buffer with full decompressed data or error. // return: destination buffer with full decompressed data or error.
func decompress(in []byte, out []byte) (data []byte, err error) { func decompress(in []byte, out []byte) (data []byte, err error) {
......
...@@ -646,7 +646,8 @@ type AnswerObject struct { ...@@ -646,7 +646,8 @@ type AnswerObject struct {
NextSerial zodb.Tid // XXX but there it is out of sync NextSerial zodb.Tid // XXX but there it is out of sync
Compression bool Compression bool
Checksum Checksum Checksum Checksum
Data []byte // TODO separately (for writev) // Data []byte // TODO separately (for writev)
Data *zodb.Buf
DataSerial zodb.Tid DataSerial zodb.Tid
} }
......
...@@ -59,6 +59,14 @@ func init() { ...@@ -59,6 +59,14 @@ func init() {
// //
// buffer memory is not initialized. // buffer memory is not initialized.
func BufAlloc(size int) *Buf { func BufAlloc(size int) *Buf {
return BufAlloc64(int64(size))
}
func BufAlloc64(size int64) *Buf {
if size < 0 {
panic("invalid size")
}
// order = min i: 2^i >= size // order = min i: 2^i >= size
order := xmath.CeilLog2(uint64(size)) order := xmath.CeilLog2(uint64(size))
......
...@@ -238,7 +238,7 @@ func (dh *DataHeader) Free() { ...@@ -238,7 +238,7 @@ func (dh *DataHeader) Free() {
dhPool.Put(dh) dhPool.Put(dh)
} }
func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (data *zodb.Buf, tid zodb.Tid, err error) {
// lookup in index position of oid data record within latest transaction who changed this oid // lookup in index position of oid data record within latest transaction who changed this oid
dataPos, ok := fs.index.Get(xid.Oid) dataPos, ok := fs.index.Get(xid.Oid)
if !ok { if !ok {
...@@ -284,7 +284,7 @@ func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (data []byte, tid z ...@@ -284,7 +284,7 @@ func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (data []byte, tid z
tid = dh.Tid tid = dh.Tid
// TODO data -> slab // TODO data -> slab
err = dh.LoadData(fs.file, &data) data, err = dh.LoadData(fs.file)
if err != nil { if err != nil {
return nil, 0, &ErrXidLoad{xid, err} return nil, 0, &ErrXidLoad{xid, err}
} }
......
...@@ -667,31 +667,29 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error { ...@@ -667,31 +667,29 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error {
// LoadData loads data for the data record taking backpointers into account. // LoadData loads data for the data record taking backpointers into account.
// //
// Data is loaded into *buf, which, if needed, is reallocated to hold whole loading data size.
// NOTE on success dh state is changed to data header of original data transaction // NOTE on success dh state is changed to data header of original data transaction
// NOTE "deleted" records are indicated via returning *buf=nil // NOTE "deleted" records are indicated via returning *buf=nil
// TODO buf -> slab func (dh *DataHeader) LoadData(r io.ReaderAt) (*zodb.Buf, error) {
func (dh *DataHeader) LoadData(r io.ReaderAt, buf *[]byte) error {
// scan via backpointers // scan via backpointers
for dh.DataLen == 0 { for dh.DataLen == 0 {
err := dh.LoadBack(r) err := dh.LoadBack(r)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
*buf = nil // deleted return nil, nil // deleted
return nil
} }
return err // XXX recheck return nil, err // XXX recheck
} }
} }
// now read actual data // now read actual data
*buf = xbytes.Realloc64(*buf, dh.DataLen) buf := zodb.BufAlloc64(dh.DataLen)
_, err := r.ReadAt(*buf, dh.Pos + DataHeaderSize) _, err := r.ReadAt(buf.Data, dh.Pos + DataHeaderSize)
if err != nil { if err != nil {
return dh.err("read data", noEOF(err)) // XXX recheck buf.Free()
return nil, dh.err("read data", noEOF(err)) // XXX recheck
} }
return nil return buf, nil
} }
// --- raw iteration --- // --- raw iteration ---
......
...@@ -158,8 +158,7 @@ type IStorage interface { ...@@ -158,8 +158,7 @@ type IStorage interface {
// //
// XXX currently deleted data is returned as data=nil -- is it ok? // XXX currently deleted data is returned as data=nil -- is it ok?
// TODO specify error when data not found -> ErrOidMissing | ErrXidMissing // TODO specify error when data not found -> ErrOidMissing | ErrXidMissing
// TODO data []byte -> something allocated from slab ? Load(ctx context.Context, xid Xid) (data *Buf, serial Tid, err error) // XXX -> StorageRecordInformation ?
Load(ctx context.Context, xid Xid) (data []byte, serial Tid, err error) // XXX -> StorageRecordInformation ?
// Prefetch(ctx, xid Xid) (no error) // Prefetch(ctx, xid Xid) (no error)
......
...@@ -34,12 +34,13 @@ import ( ...@@ -34,12 +34,13 @@ import (
// Catobj dumps content of one ZODB object // Catobj dumps content of one ZODB object
// The object is printed in raw form without any headers (see Dumpobj) // The object is printed in raw form without any headers (see Dumpobj)
func Catobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid) error { func Catobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid) error {
data, _, err := stor.Load(ctx, xid) buf, _, err := stor.Load(ctx, xid)
if err != nil { if err != nil {
return err return err
} }
_, err = w.Write(data) // NOTE delted data are returned as err by Load _, err = w.Write(buf.Data) // NOTE deleted data are returned as err by Load
buf.Free()
return err // XXX err ctx ? return err // XXX err ctx ?
} }
...@@ -47,7 +48,7 @@ func Catobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid) ...@@ -47,7 +48,7 @@ func Catobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid)
func Dumpobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid, hashOnly bool) error { func Dumpobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid, hashOnly bool) error {
var objInfo zodb.StorageRecordInformation var objInfo zodb.StorageRecordInformation
data, tid, err := stor.Load(ctx, xid) buf, tid, err := stor.Load(ctx, xid)
if err != nil { if err != nil {
return err return err
} }
...@@ -55,11 +56,12 @@ func Dumpobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid, ...@@ -55,11 +56,12 @@ func Dumpobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid,
// XXX hack - TODO rework IStorage.Load to fill-in objInfo directly // XXX hack - TODO rework IStorage.Load to fill-in objInfo directly
objInfo.Oid = xid.Oid objInfo.Oid = xid.Oid
objInfo.Tid = tid objInfo.Tid = tid
objInfo.Data = data objInfo.Data = buf.Data
objInfo.DataTid = tid // XXX generally wrong objInfo.DataTid = tid // XXX generally wrong
d := dumper{W: w, HashOnly: hashOnly} d := dumper{W: w, HashOnly: hashOnly}
err = d.DumpData(&objInfo) err = d.DumpData(&objInfo)
buf.Free()
return err return err
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment