Commit 517db5de authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ab494bf9
...@@ -19,8 +19,8 @@ ...@@ -19,8 +19,8 @@
// Package fs1 provides so-called FileStorage v1 ZODB storage. // Package fs1 provides so-called FileStorage v1 ZODB storage.
// //
// FileStorage is a single file organized as a append-only log of transactions // FileStorage is a single file organized as a simple append-only log of
// with data changes. Every transaction record consists of: // transactions with data changes. Every transaction record consists of:
// //
// - transaction record header represented by TxnHeader, // - transaction record header represented by TxnHeader,
// - several data records corresponding to modified objects, // - several data records corresponding to modified objects,
...@@ -279,7 +279,7 @@ const ( ...@@ -279,7 +279,7 @@ const (
// LenPrev == 0 prev record length could not be read // LenPrev == 0 prev record length could not be read
// LenPrev == -1 EOF backward // LenPrev == -1 EOF backward
// LenPrev >= TxnHeaderFixSize LenPrev was read/checked normally // LenPrev >= TxnHeaderFixSize LenPrev was read/checked normally
func (txnh *TxnHeader) Load(r io.ReaderAt /* *os.File */, pos int64, flags TxnLoadFlags) error { func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error {
if cap(txnh.workMem) < txnXHeaderFixSize { if cap(txnh.workMem) < txnXHeaderFixSize {
txnh.workMem = make([]byte, txnXHeaderFixSize, 256 /* to later avoid allocation for typical strings */) txnh.workMem = make([]byte, txnXHeaderFixSize, 256 /* to later avoid allocation for typical strings */)
} }
...@@ -383,7 +383,7 @@ func (txnh *TxnHeader) Load(r io.ReaderAt /* *os.File */, pos int64, flags TxnLo ...@@ -383,7 +383,7 @@ func (txnh *TxnHeader) Load(r io.ReaderAt /* *os.File */, pos int64, flags TxnLo
} }
// loadStrings makes sure strings that are part of transaction header are loaded // loadStrings makes sure strings that are part of transaction header are loaded
func (txnh *TxnHeader) loadStrings(r io.ReaderAt /* *os.File */) error { func (txnh *TxnHeader) loadStrings(r io.ReaderAt) error {
// XXX make it no-op if strings are already loaded? // XXX make it no-op if strings are already loaded?
// we rely on Load leaving len(workMem) = sum of all strings length ... // we rely on Load leaving len(workMem) = sum of all strings length ...
...@@ -437,7 +437,7 @@ func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error { ...@@ -437,7 +437,7 @@ func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error {
return nil return nil
} }
// LoadNext reads and decodes next transaction record header // LoadNext reads and decodes next transaction record header.
// prerequisite: txnh .Pos and .Len should be already initialized by: XXX also .Tid // prerequisite: txnh .Pos and .Len should be already initialized by: XXX also .Tid
// - previous successful call to Load() initially XXX ^^^ // - previous successful call to Load() initially XXX ^^^
// - TODO // - TODO
...@@ -492,7 +492,7 @@ func (dh *DataHeader) Len() int64 { ...@@ -492,7 +492,7 @@ func (dh *DataHeader) Len() int64 {
// Load reads and decodes data record header. // Load reads and decodes data record header.
// pos: points to data header start // pos: points to data header start
// no prerequisite requirements are made to previous dh state // no prerequisite requirements are made to previous dh state
func (dh *DataHeader) Load(r io.ReaderAt /* *os.File */, pos int64) error { func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
dh.Pos = pos dh.Pos = pos
// XXX .Len = 0 = read error ? // XXX .Len = 0 = read error ?
...@@ -548,7 +548,7 @@ func (dh *DataHeader) Load(r io.ReaderAt /* *os.File */, pos int64) error { ...@@ -548,7 +548,7 @@ func (dh *DataHeader) Load(r io.ReaderAt /* *os.File */, pos int64) error {
// prerequisite: dh .Oid .Tid .PrevRevPos are initialized: // prerequisite: dh .Oid .Tid .PrevRevPos are initialized:
// - TODO describe how // - TODO describe how
// when there is no previous revision: io.EOF is returned // when there is no previous revision: io.EOF is returned
func (dh *DataHeader) LoadPrevRev(r io.ReaderAt /* *os.File */) error { func (dh *DataHeader) LoadPrevRev(r io.ReaderAt) error {
if dh.PrevRevPos == 0 { if dh.PrevRevPos == 0 {
return io.EOF // no more previous revisions return io.EOF // no more previous revisions
} }
...@@ -563,7 +563,7 @@ func (dh *DataHeader) LoadPrevRev(r io.ReaderAt /* *os.File */) error { ...@@ -563,7 +563,7 @@ func (dh *DataHeader) LoadPrevRev(r io.ReaderAt /* *os.File */) error {
return err return err
} }
func (dh *DataHeader) loadPrevRev(r io.ReaderAt /* *os.File */) error { func (dh *DataHeader) loadPrevRev(r io.ReaderAt) error {
oid := dh.Oid oid := dh.Oid
tid := dh.Tid tid := dh.Tid
...@@ -588,7 +588,7 @@ func (dh *DataHeader) loadPrevRev(r io.ReaderAt /* *os.File */) error { ...@@ -588,7 +588,7 @@ func (dh *DataHeader) loadPrevRev(r io.ReaderAt /* *os.File */) error {
// LoadBack reads and decodes data header for revision linked via back-pointer. // LoadBack reads and decodes data header for revision linked via back-pointer.
// prerequisite: dh XXX .DataLen == 0 // prerequisite: dh XXX .DataLen == 0
// if link is to zero (means deleted record) io.EOF is returned // if link is to zero (means deleted record) io.EOF is returned
func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error { func (dh *DataHeader) LoadBack(r io.ReaderAt) error {
if dh.DataLen != 0 { if dh.DataLen != 0 {
bug(dh, "LoadBack() on non-backpointer data header") bug(dh, "LoadBack() on non-backpointer data header")
} }
...@@ -638,7 +638,7 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error { ...@@ -638,7 +638,7 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error {
// LoadNext reads and decodes data header for next data record in the same transaction. // LoadNext reads and decodes data header for next data record in the same transaction.
// prerequisite: dh .Pos .DataLen are initialized // prerequisite: dh .Pos .DataLen are initialized
// when there is no more data records: io.EOF is returned // when there is no more data records: io.EOF is returned
func (dh *DataHeader) LoadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) error { func (dh *DataHeader) LoadNext(r io.ReaderAt, txnh *TxnHeader) error {
err := dh.loadNext(r, txnh) err := dh.loadNext(r, txnh)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
err = txnh.err("iterating", err) err = txnh.err("iterating", err)
...@@ -646,7 +646,7 @@ func (dh *DataHeader) LoadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) er ...@@ -646,7 +646,7 @@ func (dh *DataHeader) LoadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) er
return err return err
} }
func (dh *DataHeader) loadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) error { func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error {
// position of txn tail - right after last data record byte // position of txn tail - right after last data record byte
txnTailPos := txnh.Pos + txnh.Len - 8 txnTailPos := txnh.Pos + txnh.Len - 8
...@@ -680,11 +680,11 @@ func (dh *DataHeader) loadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) er ...@@ -680,11 +680,11 @@ func (dh *DataHeader) loadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) er
} }
// LoadData loads data for the data record taking backpointers into account. // LoadData loads data for the data record taking backpointers into account.
// Data is loaded into *buf, which, if needed, is reallocated to hold all loading data size XXX // Data is loaded into *buf, which, if needed, is reallocated to hold whole loading data size.
// NOTE on success dh state is changed to data header of original data transaction // NOTE on success dh state is changed to data header of original data transaction
// NOTE "deleted" records are indicated via returning *buf=nil // NOTE "deleted" records are indicated via returning *buf=nil
// TODO buf -> slab // TODO buf -> slab
func (dh *DataHeader) LoadData(r io.ReaderAt /* *os.File */, buf *[]byte) error { func (dh *DataHeader) LoadData(r io.ReaderAt, buf *[]byte) error {
// scan via backpointers // scan via backpointers
for dh.DataLen == 0 { for dh.DataLen == 0 {
err := dh.LoadBack(r) err := dh.LoadBack(r)
...@@ -707,6 +707,12 @@ func (dh *DataHeader) LoadData(r io.ReaderAt /* *os.File */, buf *[]byte) error ...@@ -707,6 +707,12 @@ func (dh *DataHeader) LoadData(r io.ReaderAt /* *os.File */, buf *[]byte) error
return nil return nil
} }
// --- FileStorage ---
func (fs *FileStorage) StorageName() string {
return "FileStorage v1"
}
// open opens FileStorage without loading index // open opens FileStorage without loading index
func open(path string) (*FileStorage, error) { func open(path string) (*FileStorage, error) {
fs := &FileStorage{} fs := &FileStorage{}
...@@ -797,6 +803,16 @@ func Open(ctx context.Context, path string) (*FileStorage, error) { ...@@ -797,6 +803,16 @@ func Open(ctx context.Context, path string) (*FileStorage, error) {
return fs, nil return fs, nil
} }
func (fs *FileStorage) Close() error {
// TODO dump index
err := fs.file.Close()
if err != nil {
return err
}
fs.file = nil
return nil
}
func (fs *FileStorage) LastTid() (zodb.Tid, error) { func (fs *FileStorage) LastTid() (zodb.Tid, error) {
// XXX check we have transactions at all // XXX check we have transactions at all
...@@ -876,22 +892,8 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) ...@@ -876,22 +892,8 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
return data, tid, nil return data, tid, nil
} }
func (fs *FileStorage) Close() error {
// TODO dump index
err := fs.file.Close()
if err != nil {
return err
}
fs.file = nil
return nil
}
func (fs *FileStorage) StorageName() string {
return "FileStorage v1"
}
// zodb.IStorage iteration
// iteration
type iterFlags int type iterFlags int
const ( const (
...@@ -1090,7 +1092,6 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -1090,7 +1092,6 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) { func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) {
// TODO err ctx <file>: <reindex>: // TODO err ctx <file>: <reindex>:
// XXX handle ctx cancel
index = IndexNew() index = IndexNew()
index.TopPos = txnValidFrom index.TopPos = txnValidFrom
...@@ -1099,12 +1100,18 @@ func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err erro ...@@ -1099,12 +1100,18 @@ func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err erro
fsSeq := xbufio.NewSeqReaderAt(fs.file) fsSeq := xbufio.NewSeqReaderAt(fs.file)
// pre-setup txnh so that txnh.LoadNext starts loading from the beginning of file // pre-setup txnh so that txnh.LoadNext starts loading from the beginning of file
//txnh := &TxnHeader{Pos: 0, Len: index.TopPos, TxnInfo: zodb.TxnInfo{Tid: 0}}
txnh := &TxnHeader{Pos: index.TopPos, Len: -2} // XXX -2 txnh := &TxnHeader{Pos: index.TopPos, Len: -2} // XXX -2
dh := &DataHeader{} dh := &DataHeader{}
loop: loop:
for { for {
// check ctx cancel once per transaction
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// XXX merge logic into LoadNext/LoadPrev // XXX merge logic into LoadNext/LoadPrev
switch txnh.Len { switch txnh.Len {
case -2: case -2:
...@@ -1118,6 +1125,8 @@ loop: ...@@ -1118,6 +1125,8 @@ loop:
break break
} }
// XXX check txnh.Status != TxnInprogress
index.TopPos = txnh.Pos + txnh.Len index.TopPos = txnh.Pos + txnh.Len
// first data iteration will go to first data record // first data iteration will go to first data record
...@@ -1160,13 +1169,14 @@ func (fs *FileStorage) loadIndex() error { ...@@ -1160,13 +1169,14 @@ func (fs *FileStorage) loadIndex() error {
} }
fs.index = index fs.index = index
return nil
} }
// saveIndex flushes in-RAM index to disk // saveIndex flushes in-RAM index to disk
func (fs *FileStorage) saveIndex() error { func (fs *FileStorage) saveIndex() error {
// XXX lock? // XXX lock?
err = fs.index.SaveFile(fs.file.Name() + ".index") err := fs.index.SaveFile(fs.file.Name() + ".index")
return err return err
} }
...@@ -1195,7 +1205,7 @@ func (fs *FileStorage) VerifyIndex(ctx context.Context) error { ...@@ -1195,7 +1205,7 @@ func (fs *FileStorage) VerifyIndex(ctx context.Context) error {
} }
if !indexOk.Equal(fs.index) { if !indexOk.Equal(fs.index) {
err = &ErrIndexCorrupt{index: fs.index, indexOk: indexOk} err = &IndexCorruptError{index: fs.index, indexOk: indexOk}
} }
return err return err
......
...@@ -20,36 +20,39 @@ ...@@ -20,36 +20,39 @@
package fs1tools package fs1tools
import ( import (
"context"
"flag" "flag"
"fmt" "fmt"
"io" "io"
"os" "os"
"log" "log"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
) )
// Reindex rebuilds index for FileStorage file @ path // Reindex rebuilds index for FileStorage file @ path
func Reindex(path string) error { func Reindex(path string) error {
// XXX open read-only // XXX open read-only
fs, err := fs1.Open(contex.Background(), path, fs1.OpenWithoutIndex) fs, err := fs1.Open(context.Background(), path) // XXX , fs1.OpenWithoutIndex)
if err != nil { if err != nil {
return nil // XXX err ctx return nil // XXX err ctx
} }
defer fs.Close() // XXX err defer fs.Close() // XXX err
err = fs.Reindex() err = fs.Reindex(nil)
return err // XXX ok? return err // XXX ok?
} }
// VerifyIndexFor verifies that on-disk index for FileStorage file @ path is correct // VerifyIndexFor verifies that on-disk index for FileStorage file @ path is correct
func VerifyIndexFor(path string) error { func VerifyIndexFor(path string) error {
// XXX open read-only // XXX open read-only
fs, err := fs1.Open(contex.Background(), path, 0) fs, err := fs1.Open(context.Background(), path) // XXX , 0)
if err != nil { if err != nil {
return nil // XXX err ctx return nil // XXX err ctx
} }
defer fs.Close() // XXX err defer fs.Close() // XXX err
err = fs.VerifyIndex() err = fs.VerifyIndex(nil)
return err return err
//fs.Index() //fs.Index()
//fs.ComputeIndex //fs.ComputeIndex
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
package fs1tools package fs1tools
//go:generate sh -c "python2 -m ZODB.scripts.fstail -n 1000000 ../../testdata/1.fs >testdata/1.fsdump.ok" //go:generate sh -c "python2 -m ZODB.scripts.fstail -n 1000000 ../testdata/1.fs >testdata/1.fstail.ok"
import ( import (
"bytes" "bytes"
...@@ -54,7 +54,7 @@ func TestTail(t *testing.T) { ...@@ -54,7 +54,7 @@ func TestTail(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
dumpOk := loadFile(t, "testdata/1.fsdump.ok") dumpOk := loadFile(t, "testdata/1.fstail.ok")
if dumpOk != buf.String() { if dumpOk != buf.String() {
t.Errorf("dump different:\n%v", diff(dumpOk, buf.String())) t.Errorf("dump different:\n%v", diff(dumpOk, buf.String()))
......
...@@ -17,9 +17,6 @@ ...@@ -17,9 +17,6 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
// XXX partly based on code from ZODB ?
// TODO link to format in zodb/py
package fs1 package fs1
// FileStorage v1. Index // FileStorage v1. Index
...@@ -75,7 +72,6 @@ func IndexNew() *Index { ...@@ -75,7 +72,6 @@ func IndexNew() *Index {
// fsBucket: // fsBucket:
// oid[6:8]oid[6:8]oid[6:8]...pos[2:8]pos[2:8]pos[2:8]... // oid[6:8]oid[6:8]oid[6:8]...pos[2:8]pos[2:8]pos[2:8]...
const ( const (
oidPrefixMask zodb.Oid = (1<<64-1) ^ (1<<16 - 1) // 0xffffffffffff0000 oidPrefixMask zodb.Oid = (1<<64-1) ^ (1<<16 - 1) // 0xffffffffffff0000
posInvalidMask uint64 = (1<<64-1) ^ (1<<48 - 1) // 0xffff000000000000 posInvalidMask uint64 = (1<<64-1) ^ (1<<48 - 1) // 0xffff000000000000
...@@ -362,7 +358,7 @@ func (a *Index) Equal(b *Index) bool { ...@@ -362,7 +358,7 @@ func (a *Index) Equal(b *Index) bool {
return treeEqual(a.Tree, b.Tree) return treeEqual(a.Tree, b.Tree)
} }
// treeEqual returns whether two trees are the same // treeEqual returns whether two fsb.Tree are the same
func treeEqual(a, b *fsb.Tree) bool { func treeEqual(a, b *fsb.Tree) bool {
if a.Len() != b.Len() { if a.Len() != b.Len() {
return false return false
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment