Commit d91454c0 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2b746af1
......@@ -30,11 +30,15 @@ import (
type FileStorage struct {
file *os.File
index *fsIndex // oid -> data record position in transaction which last changed oid
topPos int64 // position pointing just past last committed transaction
// (= size(.file) when no commit is in progress)
// min/max tids committed
tidMin, tidMax zodb.Tid
// transaction headers for min/max transactions committed
// XXX keep loaded with LoadNoStrings ?
txnhMin TxnHeader
txnhMax TxnHeader
// XXX topPos = txnhMax.Pos + txnhMax.Len
//topPos int64 // position pointing just past last committed transaction
// // (= size(.file) when no commit is in progress)
}
// IStorage XXX move ?
......@@ -150,6 +154,36 @@ func noEOF(err error) error {
return err
}
// okEOF returns err, but changes io.EOF -> nil
func okEOF(err error) error {
if err == io.EOF {
err = nil
}
return err
}
// CloneFrom copies txnh2 to txnh making sure underlying slices (.workMem .User
// .Desc ...) are not shared
func (txnh *TxnHeader) CloneFrom(txnh2 *TxnHeader) {
workMem := txnh.workMem
lwork2 := len(txnh2.workMem)
if cap(workMem) < lwork2 {
workMem = make([]byte, lwork2)
} else {
workMem = workMem[:lwork2]
}
*txnh = *txnh2
// now unshare slices
txnh.workMem = workMem
copy(workMem, txnh2.workMem)
luser := cap(txnh2.User)
xdesc := luser + cap(txnh2.Description)
xext := xdesc + cap(txnh2.Extension)
txnh.User = workMem[0:0:luser]
txnh.Description = workMem[luser:luser:xdesc]
txnh.Extension = workMem[xdesc:xdesc:xext]
}
// flags for TxnHeader.Load
type TxnLoadFlags int
......@@ -554,12 +588,15 @@ func (dh *DataHeader) loadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) er
return nil
}
// Open opens FileStorage XXX text
func Open(path string) (*FileStorage, error) {
fs := &FileStorage{}
f, err := os.Open(path) // XXX opens in O_RDONLY
if err != nil {
return nil, err // XXX err more context ?
}
fs.file = f
// check file magic
var xxx [len(Magic)]byte
......@@ -577,15 +614,15 @@ func Open(path string) (*FileStorage, error) {
if err != nil {
panic(err) // XXX err
}
fs.index = index
// read tidMin/tidMax
// FIXME support empty file case
var txnhMin, txnhMax TxnHeader
err = txnhMin.Load(f, txnValidFrom, LoadAll) // XXX txnValidFrom here -> ?
err = fs.txnhMin.Load(f, txnValidFrom, LoadAll) // XXX txnValidFrom here -> ?
if err != nil {
return nil, err // XXX +context
}
err = txnhMax.Load(f, topPos, LoadAll)
err = fs.txnhMax.Load(f, topPos, LoadAll)
// expect EOF but .LenPrev must be good
if err != io.EOF {
if err == nil {
......@@ -593,30 +630,24 @@ func Open(path string) (*FileStorage, error) {
}
return nil, err // XXX +context
}
if txnhMax.LenPrev <= 0 {
if fs.txnhMax.LenPrev <= 0 {
panic("could not read LenPrev @topPos") // XXX err
}
err = txnhMax.LoadPrev(f, LoadAll)
err = fs.txnhMax.LoadPrev(f, LoadAll)
if err != nil {
panic(err) // XXX
}
return &FileStorage{
file: f,
index: index,
topPos: topPos,
tidMin: txnhMin.Tid,
tidMax: txnhMax.Tid,
}, nil
return fs, nil
}
func (fs *FileStorage) LastTid() zodb.Tid {
// XXX check we have transactions at all
// XXX what to return then?
return fs.tidMax
return fs.txnhMax.Tid
}
// ErrXidLoad is returned when there is an error while loading xid
......@@ -709,18 +740,22 @@ type txnIter struct {
}
func (fi *txnIter) NextTxn(flags TxnLoadFlags) error {
if fi.Dir == 0 {
return io.EOF
}
// XXX from what we start? how to yield 1st elem?
var err error
if fi.Dir > 0 {
switch {
case fi.Dir & 0x10 != 0:
// first element is already there - preloaded by who initialized txnIter
fi.Dir &= ^0x10
return nil
case fi.Dir > 0:
err = fi.Txnh.LoadNext(fi.fs.file, flags)
} else {
// XXX we are ok to get EOF, provided that LenPrev was read ok?
case fi.Dir < 0:
err = fi.Txnh.LoadPrev(fi.fs.file, flags)
default: // fi.Dir == 0:
return io.EOF
}
if err != nil {
......@@ -756,11 +791,12 @@ func (fsi *Iterator) NextTxn(txnInfo *zodb.TxnInfo) (dataIter zodb.IStorageRecor
}
func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
if tidMin < fs.tidMin {
tidMin = fs.tidMin
// FIXME case when only 0 or 1 txn present
if tidMin < fs.txnhMin.Tid {
tidMin = fs.txnhMin.Tid
}
if tidMax > fs.tidMax {
tidMax = fs.tidMax
if tidMax > fs.txnhMax.Tid {
tidMax = fs.txnhMax.Tid
}
if tidMin > tidMax {
// -> XXX empty
......@@ -769,28 +805,22 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
// scan either from file start or end, depending which way it is likely closer, to tidMin
iter := txnIter{fs: fs}
if (tidMin - fs.tidMin) < (fs.tidMax - tidMin) {
// XXX recheck how we enter loop
iter.Dir = +1
iter.Txnh.Pos = txnValidFrom // XXX -> txnStartFrom ?
// XXX .LenPrev = 0
// XXX .Len = ?
if (tidMin - fs.txnhMin.Tid) < (fs.txnhMax.Tid - tidMin) {
iter.Dir = +0x11
iter.Txnh.CloneFrom(&fs.txnhMin)
iter.TidStop = tidMin
} else {
// XXX recheck how we enter loop
iter.Dir = -1
iter.Txnh.Pos = fs.topPos
// XXX .LenPrev = ?
// XXX .Len = 0
iter.Dir = -0x11
iter.Txnh.CloneFrom(&fs.txnhMax)
iter.TidStop = tidMin
}
// XXX recheck how we enter loop
var err error
for {
err = iter.NextTxn(LoadNoStrings)
// XXX err
if err == io.EOF {
err = nil
if err != nil {
err = okEOF(err)
break
}
}
......
......@@ -15,7 +15,6 @@ package fs1
import (
"bytes"
"io"
"reflect"
"testing"
......@@ -78,6 +77,8 @@ func TestLoad(t *testing.T) {
for _, txe := range dbe.Entryv {
txh := txe.Header
// XXX check Load finds data at correct .Pos / etc ?
// loadSerial
// TODO also test for getting error when not found
xid := zodb.Xid{zodb.XTid{txh.Tid, false}, txh.Oid}
......@@ -134,9 +135,7 @@ func TestLoad(t *testing.T) {
for k := 0; ; k++ {
dataIter, err := iter.NextTxn(&txni)
if err != nil {
if err == io.EOF {
err = nil
}
err = okEOF(err)
break
}
......@@ -152,13 +151,12 @@ func TestLoad(t *testing.T) {
for {
err = dataIter.NextData(&datai)
if err != nil {
if err == io.EOF {
err = nil
}
err = okEOF(err)
break
}
}
// TODO check err
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment