Commit 362c2ca4 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 39263985
...@@ -75,9 +75,10 @@ import ( ...@@ -75,9 +75,10 @@ import (
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
) )
// FileStorage is a ZODB storage which stores data in simple append-only file // FileStorage is a ZODB storage which stores data in simple append-only file
// organized as transactional log. // organized as transactional log.
//
// It is on-disk compatible with FileStorage from ZODB/py.
type FileStorage struct { type FileStorage struct {
file *os.File file *os.File
index *Index // oid -> data record position in transaction which last changed oid index *Index // oid -> data record position in transaction which last changed oid
......
...@@ -404,34 +404,28 @@ func treeEqual(a, b *fsb.Tree) bool { ...@@ -404,34 +404,28 @@ func treeEqual(a, b *fsb.Tree) bool {
// FileStorage and see there is more data; we update index from data range // FileStorage and see there is more data; we update index from data range
// not-yet covered by the index. // not-yet covered by the index.
// //
// topPos=-1 means range to update from is index.TopPos..EOF // topPos=-1 means data range to update from is index.TopPos..EOF
// //
// XXX on error existing index is in invalid state? // The index stays valid even in case of error - then index is updated but only
// XXX naming (more specific -> UpdateXXX as just Update() is confusing) // partially. The index always stays consistent as updates to it are applied as
// a whole for every data transaction. On return index.TopPos indicates till
// which position in data the index could be updated.
//
// On success returned error is nil and index.TopPos is set to either:
// - topPos (if it is != -1), or
// - r's position at which read got EOF (if topPos=-1)
func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (err error) { func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (err error) {
defer xerr.Contextf(&err, "%s: reindex", xio.Name(r)) defer xerr.Contextf(&err, "%s: reindex %v..%v", xio.Name(r), index.TopPos, topPos)
// XXX err ctx
defer func() {
// XXX it is possible to rollback index to consistent state to
// last wholly-processed txn (during every txn keep updating {}
// and if txn was not completed - rollback from that {})
if err != nil {
index.Clear()
index.TopPos = txnValidFrom
}
}()
// XXX if topPos >= 0 && topPos < index.TopPos { if topPos >= 0 && index.TopPos > topPos {
// error return fmt.Errorf("backward update requested")
// } }
// XXX another way to compute index: iterate backwards - then // XXX another way to compute index: iterate backwards - then
// 1. index entry for oid is ready right after we see oid the first time // 1. index entry for oid is ready right after we see oid the first time
// 2. we can be sure we build the whole index if we saw all oids // 2. we can be sure we build the whole index if we saw all oids
it := Iterate(r, index.TopPos, IterForward) it := Iterate(r, index.TopPos, IterForward)
for { for {
// check ctx cancel once per transaction // check ctx cancel once per transaction
select { select {
...@@ -440,26 +434,36 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (er ...@@ -440,26 +434,36 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (er
default: default:
} }
// iter to next txn
err = it.NextTxn(LoadNoStrings) err = it.NextTxn(LoadNoStrings)
if err != nil { if err != nil {
// XXX if EOF earlier topPos -> error err = okEOF(err)
return okEOF(err) if err == nil {
// if EOF earlier topPos -> error
if topPos >= 0 && index.TopPos < topPos {
err = fmt.Errorf("unexpected EOF @%v", index.TopPos)
}
}
return err
} }
// XXX check txnh.Status != TxnInprogress // XXX check txnh.Status != TxnInprogress
// check for overlapping txn & whether we are done. // check for topPos overlapping txn & whether we are done.
// topPos=-1 will never match here // topPos=-1 will never match here
if it.Txnh.Pos < topPos && (it.Txnh.Pos + it.Txnh.Len) > topPos { if it.Txnh.Pos < topPos && (it.Txnh.Pos + it.Txnh.Len) > topPos {
return fmt.Errorf("transaction %v @%v overlaps requested topPos %v", return fmt.Errorf("transaction %v @%v overlaps topPos boundary",
it.Txnh.Tid, it.Txnh.Pos, topPos) it.Txnh.Tid, it.Txnh.Pos)
} }
if it.Txnh.Pos == topPos { if it.Txnh.Pos == topPos {
return nil return nil
} }
index.TopPos = it.Txnh.Pos + it.Txnh.Len // collect data for index update in temporary place.
// do not update the index immediately so that in case of error
// in the middle of txn's data, index stays consistent and
// correct for topPos pointing to previous transaction.
update := map[zodb.Oid]int64{} // XXX malloc every time -> better reuse
for { for {
err = it.NextData() err = it.NextData()
if err != nil { if err != nil {
...@@ -470,7 +474,13 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (er ...@@ -470,7 +474,13 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (er
break break
} }
index.Set(it.Datah.Oid, it.Datah.Pos) update[it.Datah.Oid] = it.Datah.Pos
}
// update index "atomically" with data from just read transaction
index.TopPos = it.Txnh.Pos + it.Txnh.Len
for oid, pos := range update {
index.Set(oid, pos)
} }
} }
...@@ -478,7 +488,6 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (er ...@@ -478,7 +488,6 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (er
} }
// BuildIndex builds new in-memory index for data in r // BuildIndex builds new in-memory index for data in r
// XXX in case of error return partially built index? (index has .TopPos until which it covers the data)
func BuildIndex(ctx context.Context, r io.ReaderAt) (index *Index, err error) { func BuildIndex(ctx context.Context, r io.ReaderAt) (index *Index, err error) {
index = IndexNew() index = IndexNew()
index.TopPos = txnValidFrom index.TopPos = txnValidFrom
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment