Commit bbe228d8 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 07b8e592
...@@ -699,8 +699,8 @@ func open(path string) (*FileStorage, error) { ...@@ -699,8 +699,8 @@ func open(path string) (*FileStorage, error) {
*/ */
// determine topPos from file size // determine topPos from file size
// if it is invalid (e.g. a transaction half-way commit) we'll catch it // if it is invalid (e.g. a transaction committed only half-way) we'll catch it
// while loading/recreating index // while loading/recreating index XXX recheck this logic
fi, err := f.Stat() fi, err := f.Stat()
if err != nil { if err != nil {
return nil, err // XXX err ctx return nil, err // XXX err ctx
...@@ -715,6 +715,7 @@ func open(path string) (*FileStorage, error) { ...@@ -715,6 +715,7 @@ func open(path string) (*FileStorage, error) {
} }
err = fs.txnhMax.Load(f, topPos, LoadAll) err = fs.txnhMax.Load(f, topPos, LoadAll)
// expect EOF but .LenPrev must be good // expect EOF but .LenPrev must be good
// FIXME ^^^ it will be no EOF if a txn was committed only partially
if err != io.EOF { if err != io.EOF {
if err == nil { if err == nil {
err = fmt.Errorf("no EOF after topPos") // XXX err context err = fmt.Errorf("no EOF after topPos") // XXX err context
...@@ -742,13 +743,13 @@ func Open(ctx context.Context, path string) (*FileStorage, error) { ...@@ -742,13 +743,13 @@ func Open(ctx context.Context, path string) (*FileStorage, error) {
} }
// TODO recreate index if missing / not sane (cancel this job on ctx.Done) // TODO recreate index if missing / not sane (cancel this job on ctx.Done)
topPos, index, err := LoadIndexFile(path + ".index") index, err := LoadIndexFile(path + ".index")
if err != nil { if err != nil {
panic(err) // XXX err panic(err) // XXX err
} }
// TODO verify index sane / topPos matches // TODO verify index sane / topPos matches
if topPos != fs.txnhMax.Pos + fs.txnhMax.Len { if index.TopPos != fs.txnhMax.Pos + fs.txnhMax.Len {
panic("inconsistent index topPos") // XXX panic("inconsistent index topPos") // XXX
} }
...@@ -1044,3 +1045,50 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -1044,3 +1045,50 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
return &Iter return &Iter
} }
// ComputeIndex builds new in-memory index for FileStorage
func (fs *FileStorage) ComputeIndex(ctx context.Context, path string) (topPos int64, index *Index, err error) {
topPos = txnValidFrom
index = IndexNew()
// similar to Iterate but we know we start from the beginning and do
// not load actual data - only data headers.
fsSeq := xbufio.NewSeqReaderAt(fs.file)
// pre-setup txnh so that txnh.LoadNext starts loading from the beginning of file
txnh := &TxnHeader{Pos: 0, Len: topPos, TxnInfo: zodb.TxnInfo{Tid: 0}}
dh := &DataHeader{}
loop:
for {
err = txnh.LoadNext(fsSeq, LoadNoStrings)
if err != nil {
err = okEOF(err)
break
}
topPos = txnh.Pos + txnh.Len
// first data iteration will go to first data record
dh.Pos = txnh.DataPos()
dh.DataLen = -DataHeaderSize
for {
err = dh.LoadNext(fsSeq, txnh)
if err != nil {
err = okEOF(err)
if err != nil {
break loop
}
break
}
index.Set(dh.Oid, dh.Pos)
}
}
if err != nil {
return 0, nil, err
}
return topPos, index, nil
}
...@@ -46,11 +46,16 @@ import ( ...@@ -46,11 +46,16 @@ import (
// Index is Oid -> Data record position mapping used to associate Oid with // Index is Oid -> Data record position mapping used to associate Oid with
// Data record in latest transaction which changed it. // Data record in latest transaction which changed it.
type Index struct { type Index struct {
// this index covers data file up to < .TopPos
// usually for whole-file index TopPos is position pointing just past
// the last committed transaction.
TopPos int64
*fsb.Tree *fsb.Tree
} }
func IndexNew() *Index { func IndexNew() *Index {
return &Index{fsb.TreeNew()} return &Index{Tree: fsb.TreeNew()}
} }
...@@ -59,7 +64,7 @@ func IndexNew() *Index { ...@@ -59,7 +64,7 @@ func IndexNew() *Index {
// on-disk index format // on-disk index format
// (changed in 2010 in https://github.com/zopefoundation/ZODB/commit/1bb14faf) // (changed in 2010 in https://github.com/zopefoundation/ZODB/commit/1bb14faf)
// //
// topPos position pointing just past the last committed transaction // TopPos
// (oid[:6], fsBucket) // (oid[:6], fsBucket)
// (oid[:6], fsBucket) // (oid[:6], fsBucket)
// ... // ...
...@@ -86,13 +91,13 @@ func (e *IndexSaveError) Error() string { ...@@ -86,13 +91,13 @@ func (e *IndexSaveError) Error() string {
} }
// Save saves index to a writer // Save saves index to a writer
func (fsi *Index) Save(topPos int64, w io.Writer) error { func (fsi *Index) Save(w io.Writer) error {
var err error var err error
{ {
p := pickle.NewEncoder(w) p := pickle.NewEncoder(w)
err = p.Encode(topPos) err = p.Encode(fsi.TopPos)
if err != nil { if err != nil {
goto out goto out
} }
...@@ -163,7 +168,7 @@ out: ...@@ -163,7 +168,7 @@ out:
} }
// SaveFile saves index to a file // SaveFile saves index to a file
func (fsi *Index) SaveFile(topPos int64, path string) (err error) { func (fsi *Index) SaveFile(path string) (err error) {
f, err := os.Create(path) f, err := os.Create(path)
if err != nil { if err != nil {
return &IndexSaveError{err} return &IndexSaveError{err}
...@@ -178,7 +183,7 @@ func (fsi *Index) SaveFile(topPos int64, path string) (err error) { ...@@ -178,7 +183,7 @@ func (fsi *Index) SaveFile(topPos int64, path string) (err error) {
} }
}() }()
err = fsi.Save(topPos, f) err = fsi.Save(f)
return return
} }
...@@ -217,7 +222,7 @@ func xint64(xv interface{}) (v int64, ok bool) { ...@@ -217,7 +222,7 @@ func xint64(xv interface{}) (v int64, ok bool) {
} }
// LoadIndex loads index from a reader // LoadIndex loads index from a reader
func LoadIndex(r io.Reader) (topPos int64, fsi *Index, err error) { func LoadIndex(r io.Reader) (fsi *Index, err error) {
var picklePos int64 var picklePos int64
{ {
...@@ -233,13 +238,14 @@ func LoadIndex(r io.Reader) (topPos int64, fsi *Index, err error) { ...@@ -233,13 +238,14 @@ func LoadIndex(r io.Reader) (topPos int64, fsi *Index, err error) {
if err != nil { if err != nil {
goto out goto out
} }
topPos, ok = xint64(xtopPos) topPos, ok := xint64(xtopPos)
if !ok { if !ok {
err = fmt.Errorf("topPos is %T:%v (expected int64)", xtopPos, xtopPos) err = fmt.Errorf("topPos is %T:%v (expected int64)", xtopPos, xtopPos)
goto out goto out
} }
fsi = IndexNew() fsi = IndexNew()
fsi.TopPos = topPos
var oidb [8]byte var oidb [8]byte
loop: loop:
...@@ -319,24 +325,24 @@ func LoadIndex(r io.Reader) (topPos int64, fsi *Index, err error) { ...@@ -319,24 +325,24 @@ func LoadIndex(r io.Reader) (topPos int64, fsi *Index, err error) {
out: out:
if err == nil { if err == nil {
return topPos, fsi, err return fsi, err
} }
return 0, nil, &IndexLoadError{xio.Name(r), picklePos, err} return nil, &IndexLoadError{xio.Name(r), picklePos, err}
} }
// LoadIndexFile loads index from a file @ path // LoadIndexFile loads index from a file @ path
func LoadIndexFile(path string) (topPos int64, fsi *Index, err error) { func LoadIndexFile(path string) (fsi *Index, err error) {
f, err := os.Open(path) f, err := os.Open(path)
if err != nil { if err != nil {
return 0, nil, &IndexLoadError{path, -1, err} return nil, &IndexLoadError{path, -1, err}
} }
defer func() { defer func() {
err2 := f.Close() err2 := f.Close()
if err2 != nil && err == nil { if err2 != nil && err == nil {
err = &IndexLoadError{path, -1, err} err = &IndexLoadError{path, -1, err}
topPos, fsi = 0, nil fsi = nil
} }
}() }()
...@@ -346,13 +352,47 @@ func LoadIndexFile(path string) (topPos int64, fsi *Index, err error) { ...@@ -346,13 +352,47 @@ func LoadIndexFile(path string) (topPos int64, fsi *Index, err error) {
// ---------------------------------------- // ----------------------------------------
// ComputeIndex builds new in-memory index for a file @ path // Equal returns whether two indices are the same
func Reindex(ctx context.Context, path string) (*Index, error) { func (a *Index) Equal(b *Index) bool {
fs, err := open(path) // XXX open read-only if a.TopPos != b.TopPos {
if err != nil { return false
return nil, err }
return treeEqual(a.Tree, b.Tree)
}
// treeEqual returns whether two trees are the same
func treeEqual(a, b *fsb.Tree) bool {
if a.Len() != b.Len() {
return false
}
ea, _ := a.SeekFirst()
eb, _ := b.SeekFirst()
if ea == nil {
// this means len(a) == 0 -> len(b) == 0 -> eb = nil
return true
}
defer ea.Close()
defer eb.Close()
for {
ka, va, stopa := ea.Next()
kb, vb, stopb := eb.Next()
if stopa != nil || stopb != nil {
if stopa != stopb {
panic("same-length trees iteration did not end at the same time")
}
break
}
if !(ka == kb && va == vb) {
return false
}
} }
defer fs.Close() // XXX err?
// TODO iterate - compute return true
} }
...@@ -59,48 +59,12 @@ var indexTest1 = [...]indexEntry { ...@@ -59,48 +59,12 @@ var indexTest1 = [...]indexEntry {
{0xa000000000000000, 0x0000ffffffffffff}, {0xa000000000000000, 0x0000ffffffffffff},
} }
func setIndex(fsi *fsIndex, kv []indexEntry) { func setIndex(fsi *Index, kv []indexEntry) {
for _, entry := range kv { for _, entry := range kv {
fsi.Set(entry.oid, entry.pos) fsi.Set(entry.oid, entry.pos)
} }
} }
// test whether two trees are equal
func treeEqual(a, b *fsb.Tree) bool {
if a.Len() != b.Len() {
return false
}
ea, _ := a.SeekFirst()
eb, _ := b.SeekFirst()
if ea == nil {
// this means len(a) == 0 -> len(b) == 0 -> eb = nil
return true
}
defer ea.Close()
defer eb.Close()
for {
ka, va, stopa := ea.Next()
kb, vb, stopb := eb.Next()
if stopa != nil || stopb != nil {
if stopa != stopb {
panic("same-length trees iteration did not end at the same time")
}
break
}
if !(ka == kb && va == vb) {
return false
}
}
return true
}
// XXX unneded after Tree.Dump() was made to work ok // XXX unneded after Tree.Dump() was made to work ok
func treeString(t *fsb.Tree) string { func treeString(t *fsb.Tree) string {
entryv := []string{} entryv := []string{}
...@@ -123,7 +87,7 @@ func treeString(t *fsb.Tree) string { ...@@ -123,7 +87,7 @@ func treeString(t *fsb.Tree) string {
func TestIndexLookup(t *testing.T) { func TestIndexLookup(t *testing.T) {
// the lookup is tested in cznic.b itself // the lookup is tested in cznic.b itself
// here we only lightly exercise it // here we only lightly exercise it
fsi := fsIndexNew() fsi := IndexNew()
if fsi.Len() != 0 { if fsi.Len() != 0 {
t.Errorf("index created non empty") t.Errorf("index created non empty")
...@@ -178,9 +142,13 @@ func TestIndexLookup(t *testing.T) { ...@@ -178,9 +142,13 @@ func TestIndexLookup(t *testing.T) {
} }
} }
func checkIndexEqual(t *testing.T, subject string, topPos1, topPos2 int64, fsi1, fsi2 *fsIndex) { func checkIndexEqual(t *testing.T, subject string, fsi1, fsi2 *Index) {
if topPos1 != topPos2 { if fsi1.Equal(fsi2) {
t.Errorf("%s: topPos mismatch: %v ; want %v", subject, topPos1, topPos2) return
}
if fsi1.TopPos != fsi2.TopPos {
t.Errorf("%s: topPos mismatch: %v ; want %v", subject, fsi1.TopPos, fsi2.TopPos)
} }
if !treeEqual(fsi1.Tree, fsi2.Tree) { if !treeEqual(fsi1.Tree, fsi2.Tree) {
...@@ -193,21 +161,21 @@ func checkIndexEqual(t *testing.T, subject string, topPos1, topPos2 int64, fsi1, ...@@ -193,21 +161,21 @@ func checkIndexEqual(t *testing.T, subject string, topPos1, topPos2 int64, fsi1,
func TestIndexSaveLoad(t *testing.T) { func TestIndexSaveLoad(t *testing.T) {
workdir := xworkdir(t) workdir := xworkdir(t)
topPos := int64(786) fsi := IndexNew()
fsi := fsIndexNew() fsi.TopPos = int64(786)
setIndex(fsi, indexTest1[:]) setIndex(fsi, indexTest1[:])
err := fsi.SaveFile(topPos, workdir + "/1.fs.index") err := fsi.SaveFile(workdir + "/1.fs.index")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
topPos2, fsi2, err := LoadIndexFile(workdir + "/1.fs.index") fsi2, err := LoadIndexFile(workdir + "/1.fs.index")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
checkIndexEqual(t, "index load", topPos2, topPos, fsi2, fsi) checkIndexEqual(t, "index load", fsi2, fsi)
// TODO check with // TODO check with
// {0xb000000000000000, 0x7fffffffffffffff}, // will cause 'entry position too large' // {0xb000000000000000, 0x7fffffffffffffff}, // will cause 'entry position too large'
...@@ -216,15 +184,16 @@ func TestIndexSaveLoad(t *testing.T) { ...@@ -216,15 +184,16 @@ func TestIndexSaveLoad(t *testing.T) {
// test that we can correctly load index data as saved by zodb/py // test that we can correctly load index data as saved by zodb/py
func TestIndexLoadFromPy(t *testing.T) { func TestIndexLoadFromPy(t *testing.T) {
topPosPy, fsiPy, err := LoadIndexFile("testdata/1.fs.index") fsiPy, err := LoadIndexFile("testdata/1.fs.index")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
fsiExpect := fsIndexNew() fsiExpect := IndexNew()
fsiExpect.TopPos = _1fs_indexTopPos
setIndex(fsiExpect, _1fs_indexEntryv[:]) setIndex(fsiExpect, _1fs_indexEntryv[:])
checkIndexEqual(t, "index load", topPosPy, _1fs_indexTopPos, fsiPy, fsiExpect) checkIndexEqual(t, "index load", fsiPy, fsiExpect)
} }
// test zodb/py can read index data as saved by us // test zodb/py can read index data as saved by us
...@@ -232,10 +201,11 @@ func TestIndexSaveToPy(t *testing.T) { ...@@ -232,10 +201,11 @@ func TestIndexSaveToPy(t *testing.T) {
needZODBPy(t) needZODBPy(t)
workdir := xworkdir(t) workdir := xworkdir(t)
fsi := fsIndexNew() fsi := IndexNew()
fsi.TopPos = _1fs_indexTopPos
setIndex(fsi, _1fs_indexEntryv[:]) setIndex(fsi, _1fs_indexEntryv[:])
err := fsi.SaveFile(_1fs_indexTopPos, workdir + "/1.fs.index") err := fsi.SaveFile(workdir + "/1.fs.index")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -254,7 +224,7 @@ func TestIndexSaveToPy(t *testing.T) { ...@@ -254,7 +224,7 @@ func TestIndexSaveToPy(t *testing.T) {
func BenchmarkIndexLoad(b *testing.B) { func BenchmarkIndexLoad(b *testing.B) {
// FIXME small testdata/1.fs is not representative for benchmarks // FIXME small testdata/1.fs is not representative for benchmarks
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, _, err := LoadIndexFile("testdata/1.fs.index") _, err := LoadIndexFile("testdata/1.fs.index")
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
...@@ -263,7 +233,7 @@ func BenchmarkIndexLoad(b *testing.B) { ...@@ -263,7 +233,7 @@ func BenchmarkIndexLoad(b *testing.B) {
func BenchmarkIndexGet(b *testing.B) { func BenchmarkIndexGet(b *testing.B) {
// FIXME small testdata/1.fs is not representative for benchmarks // FIXME small testdata/1.fs is not representative for benchmarks
_, fsi, err := LoadIndexFile("testdata/1.fs.index") fsi, err := LoadIndexFile("testdata/1.fs.index")
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package zodbtools package zodbtools
// infrastructure to organize main tools driver // infrastructure to organize main driver program
import ( import (
"flag" "flag"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment