Commit f44b8da3 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 1c4409fb
...@@ -41,10 +41,8 @@ type FileHeader struct { ...@@ -41,10 +41,8 @@ type FileHeader struct {
// TxnHeader represents header of a transaction record // TxnHeader represents header of a transaction record
type TxnHeader struct { type TxnHeader struct {
Pos int64 // position of transaction start Pos int64 // position of transaction start
LenPrev int64 // whole previous transaction record length LenPrev int64 // whole previous transaction record length (see EOF/error rules in Load)
// (-1 if there is no previous txn record) XXX see rules in Load Len int64 // whole transaction record length (see EOF/error rules in Load)
Len int64 // whole transaction record length XXX see rules in Load
// ^^^ FIXME make Len be raw len as stored on disk (currently it is len-on-disk + 8)
// transaction metadata itself // transaction metadata itself
zodb.TxnInfo zodb.TxnInfo
...@@ -60,7 +58,6 @@ type DataHeader struct { ...@@ -60,7 +58,6 @@ type DataHeader struct {
Pos int64 // position of data record start Pos int64 // position of data record start
Oid zodb.Oid Oid zodb.Oid
Tid zodb.Tid Tid zodb.Tid
// XXX -> .PosPrevRev .PosTxn .LenData?
PrevRevPos int64 // position of this oid's previous-revision data record PrevRevPos int64 // position of this oid's previous-revision data record
TxnPos int64 // position of transaction record this data record belongs to TxnPos int64 // position of transaction record this data record belongs to
//_ uint16 // 2-bytes with zero values. (Was version length.) //_ uint16 // 2-bytes with zero values. (Was version length.)
...@@ -234,7 +231,6 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error ...@@ -234,7 +231,6 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
} }
work := txnh.workMem[:txnXHeaderFixSize] work := txnh.workMem[:txnXHeaderFixSize]
// XXX recheck rules about error exit
txnh.Pos = pos txnh.Pos = pos
txnh.Len = 0 // read error txnh.Len = 0 // read error
txnh.LenPrev = 0 // read error txnh.LenPrev = 0 // read error
...@@ -292,11 +288,11 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error ...@@ -292,11 +288,11 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
return checkErr(txnh, "invalid txn record length: %v", tlen) return checkErr(txnh, "invalid txn record length: %v", tlen)
} }
// XXX also check tlen to not go beyond file size ? // XXX also check tlen to not go beyond file size ?
txnh.Len = tlen // txnh.Len will be set =tlen at last - after checking other fields for correctness.
txnh.Status = zodb.TxnStatus(work[8+16]) txnh.Status = zodb.TxnStatus(work[8+16])
if !txnh.Status.Valid() { if !txnh.Status.Valid() {
return checkErr(txnh, "invalid status: %v", txnh.Status) return checkErr(txnh, "invalid status: %q", txnh.Status)
} }
...@@ -305,10 +301,14 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error ...@@ -305,10 +301,14 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
lext := binary.BigEndian.Uint16(work[8+21:]) lext := binary.BigEndian.Uint16(work[8+21:])
lstr := int(luser) + int(ldesc) + int(lext) lstr := int(luser) + int(ldesc) + int(lext)
if TxnHeaderFixSize + int64(lstr) + 8 > txnh.Len { if TxnHeaderFixSize + int64(lstr) + 8 > tlen {
return checkErr(txnh, "strings overlap with txn boundary: %v / %v", lstr, txnh.Len) return checkErr(txnh, "strings overlap with txn boundary: %v / %v", lstr, tlen)
} }
// set .Len at last after doing all header checks
// this way we make sure we never return bad fixed TxnHeader with non-error .Len
txnh.Len = tlen
// NOTE we encode whole strings length into len(.workMem) // NOTE we encode whole strings length into len(.workMem)
txnh.workMem = xbytes.Realloc(txnh.workMem, lstr) txnh.workMem = xbytes.Realloc(txnh.workMem, lstr)
...@@ -352,12 +352,13 @@ func (txnh *TxnHeader) loadStrings(r io.ReaderAt) error { ...@@ -352,12 +352,13 @@ func (txnh *TxnHeader) loadStrings(r io.ReaderAt) error {
// LoadPrev reads and decodes previous transaction record header. // LoadPrev reads and decodes previous transaction record header.
// //
// prerequisite: txnh .Pos, .LenPrev and .Len are initialized: XXX (.Len for .Tid) // prerequisites:
// - by successful call to Load() initially XXX but EOF also works //
// - by subsequent successful calls to LoadPrev / LoadNext XXX recheck // - txnh .Pos, .LenPrev are initialized
// - optionally if .Len is initialized and txnh was loaded - tid↓ will be also checked
func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error { func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error {
lenPrev := txnh.LenPrev lenPrev := txnh.LenPrev
switch lenPrev { // XXX recheck states for: 1) LenPrev load error 2) EOF switch lenPrev {
case 0: case 0:
bug(txnh, "LoadPrev() when .LenPrev == error") bug(txnh, "LoadPrev() when .LenPrev == error")
case -1: case -1:
...@@ -405,9 +406,7 @@ func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error { ...@@ -405,9 +406,7 @@ func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error {
// LoadNext reads and decodes next transaction record header. // LoadNext reads and decodes next transaction record header.
// //
// prerequisite: txnh .Pos and .Len should be already initialized by: XXX also .Tid // prerequisite: txnh .Pos, .Len and .Tid should be already initialized.
// - previous successful call to Load() initially XXX ^^^
// - TODO
func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error { func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
lenCur := txnh.Len lenCur := txnh.Len
posCur := txnh.Pos posCur := txnh.Pos
...@@ -440,7 +439,7 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error { ...@@ -440,7 +439,7 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
// check tid↑ // check tid↑
if txnh.Tid <= tidCur { if txnh.Tid <= tidCur {
return checkErr(txnh, "tid monotonity broken: %v ; prev: %v", txnh.Tid, tidCur) return checkErr(txnh, "tid broken: %v ; prev: %v", txnh.Tid, tidCur)
} }
return nil return nil
...@@ -452,7 +451,6 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error { ...@@ -452,7 +451,6 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
func (dh *DataHeader) Len() int64 { func (dh *DataHeader) Len() int64 {
dataLen := dh.DataLen dataLen := dh.DataLen
if dataLen == 0 { if dataLen == 0 {
// XXX -> .DataLen() ?
dataLen = 8 // back-pointer | oid removal dataLen = 8 // back-pointer | oid removal
} }
...@@ -460,13 +458,14 @@ func (dh *DataHeader) Len() int64 { ...@@ -460,13 +458,14 @@ func (dh *DataHeader) Len() int64 {
} }
// Load reads and decodes data record header. // Load reads and decodes data record header @ pos.
//
// Only the data record header is loaded, not data itself.
// See LoadData for actually loading record's data.
// //
// pos: points to data header start // No prerequisite requirements are made to previous dh state.
// no prerequisite requirements are made to previous dh state
func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error { func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
dh.Pos = pos dh.Pos = -1 // ~ error
// XXX .Len = 0 = read error ?
if pos < dataValidFrom { if pos < dataValidFrom {
bug(dh, "Load() on invalid position") bug(dh, "Load() on invalid position")
...@@ -513,14 +512,15 @@ func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error { ...@@ -513,14 +512,15 @@ func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
return checkErr(dh, "invalid data len: %v", dh.DataLen) return checkErr(dh, "invalid data len: %v", dh.DataLen)
} }
dh.Pos = pos
return nil return nil
} }
// LoadPrevRev reads and decodes previous revision data record header. // LoadPrevRev reads and decodes previous revision data record header.
// //
// prerequisite: dh .Oid .Tid .PrevRevPos are initialized. // Prerequisite: dh .Oid .Tid .PrevRevPos are initialized.
// //
// when there is no previous revision: io.EOF is returned // When there is no previous revision: io.EOF is returned.
func (dh *DataHeader) LoadPrevRev(r io.ReaderAt) error { func (dh *DataHeader) LoadPrevRev(r io.ReaderAt) error {
if dh.PrevRevPos == 0 { if dh.PrevRevPos == 0 {
return io.EOF // no more previous revisions return io.EOF // no more previous revisions
...@@ -559,7 +559,7 @@ func (dh *DataHeader) loadPrevRev(r io.ReaderAt, prevPos int64) error { ...@@ -559,7 +559,7 @@ func (dh *DataHeader) loadPrevRev(r io.ReaderAt, prevPos int64) error {
// LoadBackRef reads data for the data record and decodes it as backpointer reference. // LoadBackRef reads data for the data record and decodes it as backpointer reference.
// //
// prerequisite: dh loaded and .LenData == 0 (data record with back-pointer). // Prerequisite: dh loaded and .LenData == 0 (data record with back-pointer).
func (dh *DataHeader) LoadBackRef(r io.ReaderAt) (backPos int64, err error) { func (dh *DataHeader) LoadBackRef(r io.ReaderAt) (backPos int64, err error) {
if dh.DataLen != 0 { if dh.DataLen != 0 {
bug(dh, "LoadBack() on non-backpointer data header") bug(dh, "LoadBack() on non-backpointer data header")
...@@ -583,9 +583,9 @@ func (dh *DataHeader) LoadBackRef(r io.ReaderAt) (backPos int64, err error) { ...@@ -583,9 +583,9 @@ func (dh *DataHeader) LoadBackRef(r io.ReaderAt) (backPos int64, err error) {
// LoadBack reads and decodes data header for revision linked via back-pointer. // LoadBack reads and decodes data header for revision linked via back-pointer.
// //
// prerequisite: dh is loaded and .DataLen == 0 // Prerequisite: dh is loaded and .DataLen == 0.
// //
// if link is to zero (means deleted record) io.EOF is returned // If link is to zero (means deleted record) io.EOF is returned.
func (dh *DataHeader) LoadBack(r io.ReaderAt) error { func (dh *DataHeader) LoadBack(r io.ReaderAt) error {
backPos, err := dh.LoadBackRef(r) backPos, err := dh.LoadBackRef(r)
if err != nil { if err != nil {
...@@ -609,7 +609,7 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt) error { ...@@ -609,7 +609,7 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt) error {
// LoadNext reads and decodes data header for next data record in the same transaction. // LoadNext reads and decodes data header for next data record in the same transaction.
// //
// prerequisite: dh .Pos .DataLen are initialized. // Prerequisite: dh .Pos .DataLen are initialized.
// //
// When there is no more data records: io.EOF is returned. // When there is no more data records: io.EOF is returned.
func (dh *DataHeader) LoadNext(r io.ReaderAt, txnh *TxnHeader) error { func (dh *DataHeader) LoadNext(r io.ReaderAt, txnh *TxnHeader) error {
...@@ -658,8 +658,8 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error { ...@@ -658,8 +658,8 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error {
// LoadData loads data for the data record taking backpointers into account. // LoadData loads data for the data record taking backpointers into account.
// //
// NOTE on success dh state is changed to data header of original data transaction // NOTE on success dh state is changed to data header of original data transaction.
// NOTE "deleted" records are indicated via returning *buf=nil // NOTE "deleted" records are indicated via returning buf with .Data=nil.
func (dh *DataHeader) LoadData(r io.ReaderAt) (*zodb.Buf, error) { func (dh *DataHeader) LoadData(r io.ReaderAt) (*zodb.Buf, error) {
// scan via backpointers // scan via backpointers
for dh.DataLen == 0 { for dh.DataLen == 0 {
...@@ -668,7 +668,7 @@ func (dh *DataHeader) LoadData(r io.ReaderAt) (*zodb.Buf, error) { ...@@ -668,7 +668,7 @@ func (dh *DataHeader) LoadData(r io.ReaderAt) (*zodb.Buf, error) {
if err == io.EOF { if err == io.EOF {
return &zodb.Buf{Data: nil}, nil // deleted return &zodb.Buf{Data: nil}, nil // deleted
} }
return nil, err // XXX recheck return nil, err
} }
} }
...@@ -677,7 +677,7 @@ func (dh *DataHeader) LoadData(r io.ReaderAt) (*zodb.Buf, error) { ...@@ -677,7 +677,7 @@ func (dh *DataHeader) LoadData(r io.ReaderAt) (*zodb.Buf, error) {
_, err := r.ReadAt(buf.Data, dh.Pos + DataHeaderSize) _, err := r.ReadAt(buf.Data, dh.Pos + DataHeaderSize)
if err != nil { if err != nil {
buf.Release() buf.Release()
return nil, dh.err("read data", noEOF(err)) // XXX recheck return nil, dh.err("read data", noEOF(err))
} }
return buf, nil return buf, nil
...@@ -701,6 +701,7 @@ const ( ...@@ -701,6 +701,7 @@ const (
) )
// NextTxn iterates to next/previous transaction record according to iteration direction. // NextTxn iterates to next/previous transaction record according to iteration direction.
//
// The data header is reset to iterate inside transaction record that becomes current. // The data header is reset to iterate inside transaction record that becomes current.
func (it *Iter) NextTxn(flags TxnLoadFlags) error { func (it *Iter) NextTxn(flags TxnLoadFlags) error {
var err error var err error
...@@ -713,8 +714,6 @@ func (it *Iter) NextTxn(flags TxnLoadFlags) error { ...@@ -713,8 +714,6 @@ func (it *Iter) NextTxn(flags TxnLoadFlags) error {
panic("Iter.Dir invalid") panic("Iter.Dir invalid")
} }
//fmt.Printf("Iter.NextTxn dir=%v -> %v\n", it.Dir, err)
if err != nil { if err != nil {
// reset .Datah to be invalid (just in case) // reset .Datah to be invalid (just in case)
it.Datah.Pos = 0 it.Datah.Pos = 0
...@@ -750,6 +749,7 @@ func Iterate(r io.ReaderAt, posStart int64, dir IterDir) *Iter { ...@@ -750,6 +749,7 @@ func Iterate(r io.ReaderAt, posStart int64, dir IterDir) *Iter {
// IterateFile opens file @ path read-only and creates Iter to iterate over it. // IterateFile opens file @ path read-only and creates Iter to iterate over it.
//
// The iteration will use buffering over os.File optimized for sequential access. // The iteration will use buffering over os.File optimized for sequential access.
// You are responsible to eventually close the file after the iteration is done. // You are responsible to eventually close the file after the iteration is done.
func IterateFile(path string, dir IterDir) (iter *Iter, file *os.File, err error) { func IterateFile(path string, dir IterDir) (iter *Iter, file *os.File, err error) {
...@@ -774,6 +774,7 @@ func IterateFile(path string, dir IterDir) (iter *Iter, file *os.File, err error ...@@ -774,6 +774,7 @@ func IterateFile(path string, dir IterDir) (iter *Iter, file *os.File, err error
case IterBackward: case IterBackward:
// get file size as topPos and start iterating backward from it // get file size as topPos and start iterating backward from it
// XXX half-committed transaction might be there.
fi, err := f.Stat() fi, err := f.Stat()
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
......
...@@ -47,7 +47,7 @@ func (p byOid) Len() int { return len(p) } ...@@ -47,7 +47,7 @@ func (p byOid) Len() int { return len(p) }
func (p byOid) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p byOid) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p byOid) Less(i, j int) bool { return p[i].oid < p[j].oid } func (p byOid) Less(i, j int) bool { return p[i].oid < p[j].oid }
var indexTest1 = [...]indexEntry { var indexTest1 = [...]indexEntry{
{0x0000000000000000, 111}, {0x0000000000000000, 111},
{0x0000000000000001, 222}, {0x0000000000000001, 222},
{0x000000000000ffff, 333}, {0x000000000000ffff, 333},
...@@ -108,7 +108,7 @@ func TestIndexLookup(t *testing.T) { ...@@ -108,7 +108,7 @@ func TestIndexLookup(t *testing.T) {
} }
// try non-existing entries too // try non-existing entries too
oid := entry.oid ^ (1<<32) oid := entry.oid ^ (1 << 32)
pos, ok = fsi.Get(oid) pos, ok = fsi.Get(oid)
if !(pos == 0 && ok == false) { if !(pos == 0 && ok == false) {
t.Errorf("fsi[%x] -> got (%x, %v) ; want (0, false)", oid, pos, ok) t.Errorf("fsi[%x] -> got (%x, %v) ; want (0, false)", oid, pos, ok)
...@@ -124,7 +124,7 @@ func TestIndexLookup(t *testing.T) { ...@@ -124,7 +124,7 @@ func TestIndexLookup(t *testing.T) {
sort.Sort(byOid(tt[:])) sort.Sort(byOid(tt[:]))
i := 0 i := 0
for ;; i++ { for ; ; i++ {
oid, pos, errStop := e.Next() oid, pos, errStop := e.Next()
if errStop != nil { if errStop != nil {
break break
...@@ -182,8 +182,7 @@ func TestIndexSaveLoad(t *testing.T) { ...@@ -182,8 +182,7 @@ func TestIndexSaveLoad(t *testing.T) {
// {0xb000000000000000, 0x7fffffffffffffff}, // will cause 'entry position too large' // {0xb000000000000000, 0x7fffffffffffffff}, // will cause 'entry position too large'
} }
var _1fs_index = func() *Index {
var _1fs_index = func () *Index {
idx := IndexNew() idx := IndexNew()
idx.TopPos = _1fs_indexTopPos idx.TopPos = _1fs_indexTopPos
setIndex(idx, _1fs_indexEntryv[:]) setIndex(idx, _1fs_indexEntryv[:])
...@@ -211,7 +210,7 @@ func TestIndexSaveToPy(t *testing.T) { ...@@ -211,7 +210,7 @@ func TestIndexSaveToPy(t *testing.T) {
} }
// now ask python part to compare testdata and saved-by-us index // now ask python part to compare testdata and saved-by-us index
cmd := exec.Command("./py/indexcmp", "testdata/1.fs.index", workdir + "/1.fs.index") cmd := exec.Command("./py/indexcmp", "testdata/1.fs.index", workdir+"/1.fs.index")
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
err = cmd.Run() err = cmd.Run()
...@@ -236,7 +235,7 @@ func TestIndexBuildVerify(t *testing.T) { ...@@ -236,7 +235,7 @@ func TestIndexBuildVerify(t *testing.T) {
} }
pos0, _ := index.Get(0) pos0, _ := index.Get(0)
index.Set(0, pos0 + 1) index.Set(0, pos0+1)
_, err = index.VerifyForFile(context.Background(), "testdata/1.fs", -1, nil) _, err = index.VerifyForFile(context.Background(), "testdata/1.fs", -1, nil)
if err == nil { if err == nil {
t.Fatalf("index verify: expected error after tweak") t.Fatalf("index verify: expected error after tweak")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment