Commit 7b0c4bc4 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2eee583e
...@@ -242,13 +242,12 @@ func (txnh *TxnHeader) CloneFrom(txnh2 *TxnHeader) { ...@@ -242,13 +242,12 @@ func (txnh *TxnHeader) CloneFrom(txnh2 *TxnHeader) {
txnh.workMem = workMem txnh.workMem = workMem
copy(workMem, txnh2.workMem) copy(workMem, txnh2.workMem)
// FIXME handle case when strings were already loaded -> set len properly
luser := cap(txnh2.User) luser := cap(txnh2.User)
xdesc := luser + cap(txnh2.Description) xdesc := luser + cap(txnh2.Description)
xext := xdesc + cap(txnh2.Extension) xext := xdesc + cap(txnh2.Extension)
txnh.User = workMem[0:0:luser] txnh.User = workMem[0:0:luser] [:len(txnh2.User)]
txnh.Description = workMem[luser:luser:xdesc] txnh.Description = workMem[luser:luser:xdesc] [:len(txnh2.Description)]
txnh.Extension = workMem[xdesc:xdesc:xext] txnh.Extension = workMem[xdesc:xdesc:xext] [:len(txnh2.Extension)]
} }
// flags for TxnHeader.Load // flags for TxnHeader.Load
...@@ -761,6 +760,8 @@ func (it *Iter) NextTxn(flags TxnLoadFlags) error { ...@@ -761,6 +760,8 @@ func (it *Iter) NextTxn(flags TxnLoadFlags) error {
panic("Iter.Dir invalid") panic("Iter.Dir invalid")
} }
fmt.Printf("Iter.NextTxn dir=%v -> %v\n", it.Dir, err)
if err != nil { if err != nil {
// reset .Datah to be invalid (just in case) // reset .Datah to be invalid (just in case)
it.Datah.Pos = 0 it.Datah.Pos = 0
...@@ -887,7 +888,7 @@ func open(path string) (*FileStorage, error) { ...@@ -887,7 +888,7 @@ func open(path string) (*FileStorage, error) {
topPos := fi.Size() topPos := fi.Size()
// read tidMin/tidMax // read tidMin/tidMax
// FIXME support empty file case // FIXME support empty file case -> then both txnhMin and txnhMax stays invalid
err = fs.txnhMin.Load(f, txnValidFrom, LoadAll) // XXX txnValidFrom here -> ? err = fs.txnhMin.Load(f, txnValidFrom, LoadAll) // XXX txnValidFrom here -> ?
if err != nil { if err != nil {
return nil, err // XXX +context return nil, err // XXX +context
...@@ -1129,9 +1130,90 @@ func (e *iterStartError) NextTxn() (*zodb.TxnInfo, zodb.IStorageRecordIterator, ...@@ -1129,9 +1130,90 @@ func (e *iterStartError) NextTxn() (*zodb.TxnInfo, zodb.IStorageRecordIterator,
return nil, nil, e.err return nil, nil, e.err
} }
// findTxnRecord finds smallest transaction record with txn.tid >= tid XXX or <= ?
// if there is no such transaction returned TxnHeader will be invalid (.Pos = 0) and error = nil
// error != nil only on IO error
// XXX ^^^ text
func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, error) {
fmt.Printf("findTxn %v\n", tid)
// XXX read snapshot under lock
// NOTE cloning to unalias strings memory
var tmin, tmax TxnHeader
tmin.CloneFrom(&fs.txnhMin)
tmax.CloneFrom(&fs.txnhMax)
if tmax.Pos == 0 { // XXX -> tmax.Valid() )?
// empty database - no such record
return TxnHeader{}, nil
}
// now we know the database is not empty and thus tmin & tmax are valid
if tmax.Tid < tid {
return TxnHeader{}, nil // no such record
}
if tmin.Tid >= tid {
return tmin, nil // tmin satisfies
}
// now we know tid ∈ (tmin, tmax]
// iterate and scan either from tmin or tmax, depending which way it is
// likely closer, to searched tid.
// when iterating use IO optimized for sequential access
iter := &Iter{R: r}
if (tid - tmin.Tid) < (tmax.Tid - tid) {
fmt.Printf("forward %.1f%%\n", 100 * float64(tid - tmin.Tid) / float64(tmax.Tid - tmin.Tid))
iter.Dir = IterForward
iter.Txnh = tmin // ok not to clone - memory is already ours
} else {
fmt.Printf("backward %.1f%%\n", 100 * float64(tid - tmin.Tid) / float64(tmax.Tid - tmin.Tid))
iter.Dir = IterBackward
iter.Txnh = tmax // ok not to clone - ... ^^^
}
var txnhPrev TxnHeader
for {
txnhPrev = iter.Txnh // ok not to clone - we'll reload strings in the end
err := iter.NextTxn(LoadNoStrings)
if err != nil {
return TxnHeader{}, noEOF(err)
}
if (iter.Dir == IterForward && iter.Txnh.Tid >= tid) ||
(iter.Dir == IterBackward && iter.Txnh.Tid < tid) {
break // found (prev for backward)
}
}
// found
var txnhFound TxnHeader
if iter.Dir == IterForward {
txnhFound = iter.Txnh
} else {
txnhFound = txnhPrev
}
// load strings to make sure not to return txnh with strings data from
// another transaction
err := txnhFound.loadStrings(iter.R)
if err != nil {
return TxnHeader{}, noEOF(err)
}
return txnhFound, nil
}
// Iterate creates zodb-level iterator for tidMin..tidMax range // Iterate creates zodb-level iterator for tidMin..tidMax range
func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
fmt.Printf("iterate %v..%v\n", tidMin, tidMax) fmt.Printf("iterate %v..%v\n", tidMin, tidMax)
// XXX still needed ?
/*
// FIXME case when only 0 or 1 txn present // FIXME case when only 0 or 1 txn present
if tidMin < fs.txnhMin.Tid { if tidMin < fs.txnhMin.Tid {
tidMin = fs.txnhMin.Tid tidMin = fs.txnhMin.Tid
...@@ -1139,21 +1221,24 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -1139,21 +1221,24 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
if tidMax > fs.txnhMax.Tid { if tidMax > fs.txnhMax.Tid {
tidMax = fs.txnhMax.Tid tidMax = fs.txnhMax.Tid
} }
*/
// when iterating use IO optimized for sequential access // when iterating use IO optimized for sequential access
// XXX -> IterateRaw ? // XXX -> IterateRaw ?
fsSeq := xbufio.NewSeqReaderAt(fs.file) fsSeq := xbufio.NewSeqReaderAt(fs.file)
ziter := &zIter{iter: Iter{R: fsSeq}} ziter := &zIter{iter: Iter{R: fsSeq, Dir: IterForward}, TidStop: tidMax}
iter := &ziter.iter iter := &ziter.iter
// XXX still needed?
/*
if tidMin > tidMax { if tidMin > tidMax {
ziter.zFlags |= zIterEOF // empty ziter.zFlags |= zIterEOF // empty
return ziter return ziter
} }
*/
/*
// scan either from file start or end, depending which way it is likely closer, to tidMin // scan either from file start or end, depending which way it is likely closer, to tidMin
if (tidMin - fs.txnhMin.Tid) < (fs.txnhMax.Tid - tidMin) { if (tidMin - fs.txnhMin.Tid) < (fs.txnhMax.Tid - tidMin) {
fmt.Printf("forward %.1f%%\n", 100 * float64(tidMin - fs.txnhMin.Tid) / float64(fs.txnhMax.Tid - fs.txnhMin.Tid)) fmt.Printf("forward %.1f%%\n", 100 * float64(tidMin - fs.txnhMin.Tid) / float64(fs.txnhMax.Tid - fs.txnhMin.Tid))
iter.Dir = IterForward iter.Dir = IterForward
...@@ -1168,22 +1253,34 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -1168,22 +1253,34 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
ziter.TidStop = tidMin ziter.TidStop = tidMin
} }
println("AAA")
// XXX recheck how we enter loop // XXX recheck how we enter loop
var err error var err error
for { for {
err = iter.NextTxn(LoadNoStrings) err = iter.NextTxn(LoadNoStrings) // XXX -> ziter
if err != nil { if err != nil {
err = okEOF(err) err = okEOF(err)
break break
} }
} }
println("BBB")
*/
// find first txn : txn.tid >= tidMin
txnh, err := fs.findTxnRecord(fsSeq, tidMin)
if err != nil { if err != nil {
return &iterStartError{err} // XXX err ctx return &iterStartError{err} // XXX err ctx
} }
if txnh.Pos == 0 { // XXX -> txnh.Valid() ?
ziter.zFlags |= zIterEOF // empty
return ziter
}
//fmt.Printf("tidRange: %v..%v -> found %v @%v\n", tidMin, tidMax, iter.Txnh.Tid, iter.Txnh.Pos) fmt.Printf("tidRange: %v..%v -> found %v @%v\n", tidMin, tidMax, txnh.Tid, txnh.Pos)
fmt.Printf("\t%#v\n", txnh)
/*
// where to start around tidMin found - let's reinitialize iter to // where to start around tidMin found - let's reinitialize iter to
// iterate appropriately forward up to tidMax // iterate appropriately forward up to tidMax
ziter.zFlags &= ^zIterEOF ziter.zFlags &= ^zIterEOF
...@@ -1195,7 +1292,15 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -1195,7 +1292,15 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
} }
ziter.zFlags |= zIterPreloaded ziter.zFlags |= zIterPreloaded
} }
*/
iter.Txnh = txnh
iter.Datah.Pos = txnh.DataPos()
iter.Datah.DataLen = -DataHeaderSize // first iteration will go to first data record
iter.Dir = IterForward iter.Dir = IterForward
ziter.zFlags |= zIterPreloaded
ziter.TidStop = tidMax ziter.TidStop = tidMax
return ziter return ziter
......
...@@ -150,6 +150,7 @@ func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv ...@@ -150,6 +150,7 @@ func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv
for k := 0; ; k++ { for k := 0; ; k++ {
txnErrorf := func(format string, a ...interface{}) { txnErrorf := func(format string, a ...interface{}) {
t.Helper()
subj := fmt.Sprintf("iterating %v..%v: step %v#%v", tidMin, tidMax, k, len(expectv)) subj := fmt.Sprintf("iterating %v..%v: step %v#%v", tidMin, tidMax, k, len(expectv))
msg := fmt.Sprintf(format, a...) msg := fmt.Sprintf(format, a...)
t.Errorf("%v: %v", subj, msg) t.Errorf("%v: %v", subj, msg)
...@@ -191,6 +192,7 @@ func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv ...@@ -191,6 +192,7 @@ func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv
for kdata := 0; ; kdata++ { for kdata := 0; ; kdata++ {
dataErrorf := func(format string, a...interface{}) { dataErrorf := func(format string, a...interface{}) {
t.Helper()
dsubj := fmt.Sprintf("dstep %v#%v", kdata, len(dbe.Entryv)) dsubj := fmt.Sprintf("dstep %v#%v", kdata, len(dbe.Entryv))
msg := fmt.Sprintf(format, a...) msg := fmt.Sprintf(format, a...)
txnErrorf("%v: %v", dsubj, msg) txnErrorf("%v: %v", dsubj, msg)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment