Commit 15af0624 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f71eebf7
......@@ -80,9 +80,9 @@ func IndexNew() *Index {
// oid[6:8]oid[6:8]oid[6:8]...pos[2:8]pos[2:8]pos[2:8]...
const (
oidPrefixMask zodb.Oid = (1<<64-1) ^ (1<<16 - 1) // 0xffffffffffff0000
posInvalidMask uint64 = (1<<64-1) ^ (1<<48 - 1) // 0xffff000000000000
posValidMask uint64 = 1<<48 - 1 // 0x0000ffffffffffff
oidPrefixMask zodb.Oid = (1<<64 - 1) ^ (1<<16 - 1) // 0xffffffffffff0000
posInvalidMask uint64 = (1<<64 - 1) ^ (1<<48 - 1) // 0xffff000000000000
posValidMask uint64 = 1<<48 - 1 // 0x0000ffffffffffff
)
// IndexSaveError is the error type returned by index save routines
......@@ -95,80 +95,77 @@ func (e *IndexSaveError) Error() string {
}
// Save saves index to a writer
func (fsi *Index) Save(w io.Writer) error {
var err error
{
p := pickle.NewEncoder(w)
func (fsi *Index) Save(w io.Writer) (err error) {
defer func() {
if err == nil {
return
}
err = p.Encode(fsi.TopPos)
if err != nil {
goto out
if _, ok := err.(*pickle.TypeError); ok {
panic(err) // all our types are expected to be supported by pickle
}
var oidb [8]byte
var posb [8]byte
var oidPrefixCur zodb.Oid // current oid[0:6] with [6:8] = 00
oidBuf := []byte{} // current oid[6:8]oid[6:8]...
posBuf := []byte{} // current pos[2:8]pos[2:8]...
var t [2]interface{} // tuple for (oid, fsBucket.toString())
e, _ := fsi.SeekFirst()
if e != nil {
defer e.Close()
for {
oid, pos, errStop := e.Next()
oidPrefix := oid & oidPrefixMask
if oidPrefix != oidPrefixCur || errStop != nil {
// emit (oid[0:6], oid[6:8]oid[6:8]...pos[2:8]pos[2:8]...)
binary.BigEndian.PutUint64(oidb[:], uint64(oidPrefixCur))
t[0] = oidb[0:6]
t[1] = bytes.Join([][]byte{oidBuf, posBuf}, nil)
err = p.Encode(pickle.Tuple(t[:]))
if err != nil {
goto out
}
oidPrefixCur = oidPrefix
oidBuf = oidBuf[:0]
posBuf = posBuf[:0]
}
// otherwise it is an error returned by writer, which should already
// have filename & op as context.
err = &IndexSaveError{err}
}()
if errStop != nil {
break
}
p := pickle.NewEncoder(w)
// check pos does not overflow 6 bytes
if uint64(pos) & posInvalidMask != 0 {
err = fmt.Errorf("entry position too large: 0x%x", pos)
goto out
err = p.Encode(fsi.TopPos)
if err != nil {
return err
}
var oidb [8]byte
var posb [8]byte
var oidPrefixCur zodb.Oid // current oid[0:6] with [6:8] = 00
oidBuf := []byte{} // current oid[6:8]oid[6:8]...
posBuf := []byte{} // current pos[2:8]pos[2:8]...
var t [2]interface{} // tuple for (oid, fsBucket.toString())
e, _ := fsi.SeekFirst()
if e != nil {
defer e.Close()
for {
oid, pos, errStop := e.Next()
oidPrefix := oid & oidPrefixMask
if oidPrefix != oidPrefixCur || errStop != nil {
// emit (oid[0:6], oid[6:8]oid[6:8]...pos[2:8]pos[2:8]...)
binary.BigEndian.PutUint64(oidb[:], uint64(oidPrefixCur))
t[0] = oidb[0:6]
t[1] = bytes.Join([][]byte{oidBuf, posBuf}, nil)
err = p.Encode(pickle.Tuple(t[:]))
if err != nil {
return err
}
binary.BigEndian.PutUint64(oidb[:], uint64(oid))
binary.BigEndian.PutUint64(posb[:], uint64(pos))
oidPrefixCur = oidPrefix
oidBuf = oidBuf[:0]
posBuf = posBuf[:0]
}
oidBuf = append(oidBuf, oidb[6:8]...)
posBuf = append(posBuf, posb[2:8]...)
if errStop != nil {
break
}
}
err = p.Encode(pickle.None{})
}
// check pos does not overflow 6 bytes
if uint64(pos)&posInvalidMask != 0 {
return fmt.Errorf("entry position too large: 0x%x", pos)
}
out:
if err == nil {
return err
}
binary.BigEndian.PutUint64(oidb[:], uint64(oid))
binary.BigEndian.PutUint64(posb[:], uint64(pos))
if _, ok := err.(*pickle.TypeError); ok {
panic(err) // all our types are expected to be supported by pickle
oidBuf = append(oidBuf, oidb[6:8]...)
posBuf = append(posBuf, posb[2:8]...)
}
}
// otherwise it is an error returned by writer, which should already
// have filename & op as context.
return &IndexSaveError{err}
err = p.Encode(pickle.None{})
return err
}
// SaveFile saves index to a file @ path.
......@@ -178,9 +175,9 @@ out:
// updated only with complete index data.
func (fsi *Index) SaveFile(path string) error {
dir, name := filepath.Dir(path), filepath.Base(path)
f, err := ioutil.TempFile(dir, name + ".tmp")
f, err := ioutil.TempFile(dir, name+".tmp")
if err != nil {
return &IndexSaveError{err} // XXX needed?
return &IndexSaveError{err}
}
// use buffering for f (ogórek does not buffer itself on encoding)
......@@ -193,14 +190,14 @@ func (fsi *Index) SaveFile(path string) error {
os.Remove(f.Name())
err = err1
if err == nil {
err = &IndexSaveError{xerr.First(err2, err3)} // XXX needed?
err = &IndexSaveError{xerr.First(err2, err3)}
}
return err
}
err = os.Rename(f.Name(), path)
if err != nil {
return &IndexSaveError{err} // XXX needed?
return &IndexSaveError{err}
}
return nil
......@@ -210,7 +207,7 @@ func (fsi *Index) SaveFile(path string) error {
type IndexLoadError struct {
Filename string
Pos int64
Err error
Err error
}
func (e *IndexLoadError) Error() string {
......@@ -242,111 +239,101 @@ func xint64(xv interface{}) (v int64, ok bool) {
// LoadIndex loads index from a reader
func LoadIndex(r io.Reader) (fsi *Index, err error) {
var picklePos int64
defer func() {
if err != nil {
err = &IndexLoadError{xio.Name(r), picklePos, err}
}
}()
{
var ok bool
var xtopPos, xv interface{}
var ok bool
var xtopPos, xv interface{}
xr := xbufio.NewReader(r)
// by passing bufio.Reader directly we make sure it won't create one internally
p := pickle.NewDecoder(xr.Reader)
xr := xbufio.NewReader(r)
// by passing bufio.Reader directly we make sure it won't create one internally
p := pickle.NewDecoder(xr.Reader)
picklePos = xr.InputOffset()
xtopPos, err = p.Decode()
if err != nil {
return nil, err
}
topPos, ok := xint64(xtopPos)
if !ok {
return nil, fmt.Errorf("topPos is %T:%v (expected int64)", xtopPos, xtopPos)
}
fsi = IndexNew()
fsi.TopPos = topPos
var oidb [8]byte
loop:
for {
// load/decode next entry
var v pickle.Tuple
picklePos = xr.InputOffset()
xtopPos, err = p.Decode()
xv, err = p.Decode()
if err != nil {
goto out
}
topPos, ok := xint64(xtopPos)
if !ok {
err = fmt.Errorf("topPos is %T:%v (expected int64)", xtopPos, xtopPos)
goto out
return nil, err
}
fsi = IndexNew()
fsi.TopPos = topPos
var oidb [8]byte
loop:
for {
// load/decode next entry
var v pickle.Tuple
picklePos = xr.InputOffset()
xv, err = p.Decode()
if err != nil {
goto out
}
switch xv := xv.(type) {
default:
err = fmt.Errorf("invalid entry: type %T", xv)
goto out
switch xv := xv.(type) {
default:
return nil, fmt.Errorf("invalid entry: type %T", xv)
case pickle.None:
break loop
case pickle.None:
break loop
// we accept tuple or list
// XXX accept only tuple ?
case pickle.Tuple:
v = xv
case []interface{}:
v = pickle.Tuple(xv)
}
// we accept tuple or list
case pickle.Tuple:
v = xv
case []interface{}:
v = pickle.Tuple(xv)
}
// unpack entry tuple -> oidPrefix, fsBucket
if len(v) != 2 {
err = fmt.Errorf("invalid entry: len = %i", len(v))
goto out
}
// unpack entry tuple -> oidPrefix, fsBucket
if len(v) != 2 {
return nil, fmt.Errorf("invalid entry: len = %i", len(v))
}
// decode oidPrefix
xoidPrefixStr := v[0]
oidPrefixStr, ok := xoidPrefixStr.(string)
if !ok {
err = fmt.Errorf("invalid oidPrefix: type %T", xoidPrefixStr)
goto out
}
if l := len(oidPrefixStr); l != 6 {
err = fmt.Errorf("invalid oidPrefix: len = %i", l)
goto out
}
copy(oidb[:], oidPrefixStr)
oidPrefix := zodb.Oid(binary.BigEndian.Uint64(oidb[:]))
// decode oidPrefix
xoidPrefixStr := v[0]
oidPrefixStr, ok := xoidPrefixStr.(string)
if !ok {
return nil, fmt.Errorf("invalid oidPrefix: type %T", xoidPrefixStr)
}
if l := len(oidPrefixStr); l != 6 {
return nil, fmt.Errorf("invalid oidPrefix: len = %i", l)
}
copy(oidb[:], oidPrefixStr)
oidPrefix := zodb.Oid(binary.BigEndian.Uint64(oidb[:]))
// check fsBucket
xkvStr := v[1]
kvStr, ok := xkvStr.(string)
if !ok {
err = fmt.Errorf("invalid fsBucket: type %T", xkvStr)
goto out
}
if l := len(kvStr); l % 8 != 0 {
err = fmt.Errorf("invalid fsBucket: len = %i", l)
goto out
}
// check fsBucket
xkvStr := v[1]
kvStr, ok := xkvStr.(string)
if !ok {
return nil, fmt.Errorf("invalid fsBucket: type %T", xkvStr)
}
if l := len(kvStr); l%8 != 0 {
return nil, fmt.Errorf("invalid fsBucket: len = %i", l)
}
// load btree from fsBucket entries
kvBuf := mem.Bytes(kvStr)
// load btree from fsBucket entries
kvBuf := mem.Bytes(kvStr)
n := len(kvBuf) / 8
oidBuf := kvBuf[:n*2]
posBuf := kvBuf[n*2-2:] // NOTE starting 2 bytes behind
n := len(kvBuf) / 8
oidBuf := kvBuf[:n*2]
posBuf := kvBuf[n*2-2:] // NOTE starting 2 bytes behind
for i:=0; i<n; i++ {
oid := zodb.Oid(binary.BigEndian.Uint16(oidBuf[i*2:]))
oid |= oidPrefix
pos := int64(binary.BigEndian.Uint64(posBuf[i*6:]) & posValidMask)
for i := 0; i < n; i++ {
oid := zodb.Oid(binary.BigEndian.Uint16(oidBuf[i*2:]))
oid |= oidPrefix
pos := int64(binary.BigEndian.Uint64(posBuf[i*6:]) & posValidMask)
fsi.Set(oid, pos)
}
fsi.Set(oid, pos)
}
}
out:
if err == nil {
return fsi, err
}
return nil, &IndexLoadError{xio.Name(r), picklePos, err}
return fsi, nil
}
// LoadIndexFile loads index from a file @ path.
......@@ -419,10 +406,10 @@ func treeEqual(a, b *fsb.Tree) bool {
// IndexUpdateProgress is data sent by Index.Update to notify about progress
type IndexUpdateProgress struct {
TopPos int64 // data range to update to; if = -1 -- till EOF
TxnIndexed int // # transactions read/indexed so far
Index *Index // index built so far
Iter *Iter // iterator through data XXX needed?
TopPos int64 // data range to update to; if = -1 -- till EOF
TxnIndexed int // # transactions read/indexed so far
Index *Index // index built so far
Iter *Iter // iterator through data XXX needed?
}
// Update updates in-memory index from r's FileStorage data in byte-range index.TopPos..topPos
......@@ -441,11 +428,11 @@ type IndexUpdateProgress struct {
// On success returned error is nil and index.TopPos is set to either:
// - topPos (if it is != -1), or
// - r's position at which read got EOF (if topPos=-1).
func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, progress func (*IndexUpdateProgress)) (err error) {
func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, progress func(*IndexUpdateProgress)) (err error) {
defer xerr.Contextf(&err, "%s: reindex %v..%v", xio.Name(r), index.TopPos, topPos)
if topPos >= 0 && index.TopPos > topPos {
return fmt.Errorf("backward update requested")
return fmt.Errorf("backward update requested")
}
// XXX another way to compute index: iterate backwards - then
......@@ -485,7 +472,7 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, pro
// check for topPos overlapping txn & whether we are done.
// topPos=-1 will never match here
if it.Txnh.Pos < topPos && (it.Txnh.Pos + it.Txnh.Len) > topPos {
if it.Txnh.Pos < topPos && (it.Txnh.Pos+it.Txnh.Len) > topPos {
return fmt.Errorf("transaction %v @%v overlaps topPos boundary",
it.Txnh.Tid, it.Txnh.Pos)
}
......@@ -497,7 +484,7 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, pro
// do not update the index immediately so that in case of error
// in the middle of txn's data, index stays consistent and
// correct for topPos pointing to previous transaction.
update := map[zodb.Oid]int64{} // XXX malloc every time -> better reuse
update := map[zodb.Oid]int64{} // XXX malloc every time -> better reuse
for {
err = it.NextData()
if err != nil {
......@@ -547,7 +534,7 @@ func BuildIndex(ctx context.Context, r io.ReaderAt, progress func(*IndexUpdatePr
func BuildIndexForFile(ctx context.Context, path string, progress func(*IndexUpdateProgress)) (index *Index, err error) {
f, err := os.Open(path)
if err != nil {
return IndexNew(), err // XXX add err ctx?
return IndexNew(), err
}
defer func() {
......@@ -580,10 +567,10 @@ func indexCorrupt(r io.ReaderAt, format string, argv ...interface{}) *IndexCorru
// IndexVerifyProgress is data sent by Index.Verify to notify about progress
type IndexVerifyProgress struct {
TxnTotal int // total # of transactions to verify; if = -1 -- whole data
TxnTotal int // total # of transactions to verify; if = -1 -- whole data
TxnChecked int
Index *Index // index verification runs for
Iter *Iter // iterator through data
Index *Index // index verification runs for
Iter *Iter // iterator through data
OidChecked map[zodb.Oid]struct{} // oid checked so far
}
......@@ -637,7 +624,7 @@ func (index *Index) Verify(ctx context.Context, r io.ReaderAt, ntxn int, progres
wholeData = true
break
}
return oidChecked, err // XXX err ctx
return oidChecked, err
}
for {
......@@ -646,7 +633,7 @@ func (index *Index) Verify(ctx context.Context, r io.ReaderAt, ntxn int, progres
if err == io.EOF {
break
}
return oidChecked, err // XXX err ctx
return oidChecked, err
}
// if oid was already checked - do not check index anymore
......@@ -700,7 +687,7 @@ func (index *Index) Verify(ctx context.Context, r io.ReaderAt, ntxn int, progres
// VerifyForFile checks index correctness against FileStorage data in file @ path
//
// See Verify for semantic description.
func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, progress func (*IndexVerifyProgress)) (oidChecked map[zodb.Oid]struct{}, err error) {
func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, progress func(*IndexVerifyProgress)) (oidChecked map[zodb.Oid]struct{}, err error) {
f, err := os.Open(path)
if err != nil {
return nil, err
......@@ -716,7 +703,7 @@ func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, pr
return nil, err
}
topPos := fi.Size() // XXX there might be last TxnInprogress transaction
topPos := fi.Size() // XXX there might be last TxnInprogress transaction
if index.TopPos != topPos {
return nil, indexCorrupt(f, "topPos mismatch: data=%v index=%v", topPos, index.TopPos)
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment