Commit 558c79cf authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a17d2e12
...@@ -226,14 +226,20 @@ func okEOF(err error) error { ...@@ -226,14 +226,20 @@ func okEOF(err error) error {
// --- File header --- // --- File header ---
// Load reads and decodes file header.
func (fh *FileHeader) Load(r io.ReaderAt) error { func (fh *FileHeader) Load(r io.ReaderAt) error {
_, err = f.ReadAt(fh.Magic[:], 0) _, err := r.ReadAt(fh.Magic[:], 0)
err = okEOF(err)
if err != nil { if err != nil {
return err // XXX err more context return fh.err("read", err)
//return err // XXX err more context
} }
if string(fh.Magic[:]) != Magic { if string(fh.Magic[:]) != Magic {
return fmt.Errorf("%s: invalid magic %q", path, fh.Magic) // XXX -> decode err //return fmt.Errorf("%s: invalid magic %q", path, fh.Magic) // XXX -> decode err
return decodeErr(fh, "invalid magic %q", fh.Magic)
} }
return nil
} }
// --- Transaction record --- // --- Transaction record ---
...@@ -910,24 +916,6 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) ...@@ -910,24 +916,6 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
// --- raw iteration --- // --- raw iteration ---
/*
// TxnIter is iterator over transaction records
type TxnIter struct {
fsSeq *xbufio.SeqReaderAt
Txnh TxnHeader // current transaction record information
Flags iterFlags // XXX iterate forward (> 0) / backward (< 0) / EOF reached (== 0)
}
// DataIter is iterator over data records inside one transaction
type DataIter struct {
fsSeq *xbufio.SeqReaderAt
Txnh *TxnHeader // header of transaction we are iterating inside
Datah DataHeader // current data record information
}
*/
// Iter is combined 2-level iterator over transaction and data records // Iter is combined 2-level iterator over transaction and data records
type Iter struct { type Iter struct {
fsSeq *xbufio.SeqReaderAt fsSeq *xbufio.SeqReaderAt
...@@ -938,46 +926,44 @@ type Iter struct { ...@@ -938,46 +926,44 @@ type Iter struct {
} }
// NextTxn iterates to next/previous transaction record according to iteration direction // NextTxn iterates to next/previous transaction record according to iteration direction.
// The data header is reset to iterate inside transaction record that became current.
func (it *Iter) NextTxn(flags TxnLoadFlags) error { func (it *Iter) NextTxn(flags TxnLoadFlags) error {
var err error var err error
if ti.Flags & iterDir != 0 { if it.Flags & iterDir != 0 {
err = it.Txnh.LoadNext(ti.fsSeq, flags) err = it.Txnh.LoadNext(it.fsSeq, flags)
} else { } else {
err = it.Txnh.LoadPrev(ti.fsSeq, flags) err = it.Txnh.LoadPrev(it.fsSeq, flags)
} }
//fmt.Println("loaded:", ti.Txnh.Tid) //fmt.Println("loaded:", it.Txnh.Tid)
return err if err != nil {
// reset .Datah to be invalid (just in case)
it.Datah.Pos = 0
it.Datah.DataLen = 0
} else {
// set .Datah to iterate over .Txnh // set .Datah to iterate over .Txnh
it.Datah.Pos = fsi.txnIter.Txnh.DataPos() it.Datah.Pos = it.Txnh.DataPos()
it.Datah.DataLen = -DataHeaderSize // first iteration will go to first data record it.Datah.DataLen = -DataHeaderSize // first iteration will go to first data record
}
return err
} }
// NextData iterates to next data record header inside current transaction // NextData iterates to next data record header inside current transaction
func (di *DataIter) NextData() error { func (it *Iter) NextData() error {
return di.Datah.LoadNext(di.fsSeq, di.Txnh) return it.Datah.LoadNext(it.fsSeq, &it.Txnh)
} }
// NextTxn iterates to next transaction record and resets data iterator to iterate inside it
func (iter *Iter) NextTxn() error {
err := iter.TxnIter.NextTxn()
if err != nil {
return err
}
}
// IterateRaw ... XXX // IterateRaw ... XXX
func (fs *FileStorage) IterateRaw(dir/*XXX fwd/back*/) *Iter { func (fs *FileStorage) IterateRaw(dir/*XXX fwd/back*/) *Iter {
// when iterating use IO optimized for sequential access // when iterating use IO optimized for sequential access
fsSeq := xbufio.NewSeqReaderAt(fs.file) fsSeq := xbufio.NewSeqReaderAt(fs.file)
// XXX setup .TxnIter.dir and start // XXX setup .TxnIter.dir and start
iter.TxnIter.fsSeq = fsSeq it := &Iter{fsSeq: fsSeq} // XXX ...
iter.DataIter.fsSeq = fsSeq //it.DataIter.Txnh = &iter.txnIter.Txnh
iter.DataIter.Txnh = &iter.txnIter.Txnh return it
return iter
} }
...@@ -1012,30 +998,30 @@ type zIter struct { ...@@ -1012,30 +998,30 @@ type zIter struct {
// NextTxn iterates to next/previous transaction record according to iteration direction // NextTxn iterates to next/previous transaction record according to iteration direction
func (zi *zIter) NextTxn() (*zodb.TxnInfo, zodb.IStorageRecordIterator, error) { func (zi *zIter) NextTxn() (*zodb.TxnInfo, zodb.IStorageRecordIterator, error) {
switch { switch {
case ti.Flags & iterEOF != 0: case zi.Flags & iterEOF != 0:
//println("already eof") //println("already eof")
return io.EOF return nil, nil, io.EOF
// XXX needed? // XXX needed?
case ti.Flags & iterPreloaded != 0: case zi.Flags & iterPreloaded != 0:
// first element is already there - preloaded by who initialized TxnIter // first element is already there - preloaded by who initialized TxnIter
ti.Flags &= ^iterPreloaded zi.Flags &= ^iterPreloaded
//fmt.Println("preloaded:", ti.Txnh.Tid) //fmt.Println("preloaded:", zi.Txnh.Tid)
default: default:
err := zi.iter.NextTxn(LoadAll) err := zi.iter.NextTxn(LoadAll)
// XXX EOF ^^^ is not expected (range pre-cut to valid tids) ? // XXX EOF ^^^ is not expected (range pre-cut to valid tids) ?
if err != nil { if err != nil {
return err return nil, nil, err
} }
} }
// XXX how to make sure last good txnh is preserved? // XXX how to make sure last good txnh is preserved?
if (ti.Flags&iterDir != 0 && ti.Txnh.Tid > ti.TidStop) || if (zi.Flags&iterDir != 0 && zi.iter.Txnh.Tid > zi.TidStop) ||
(ti.Flags&iterDir == 0 && ti.Txnh.Tid < ti.TidStop) { (zi.Flags&iterDir == 0 && zi.iter.Txnh.Tid < zi.TidStop) {
//println("-> EOF") //println("-> EOF")
ti.Flags |= iterEOF zi.Flags |= iterEOF
return io.EOF return nil, nil, io.EOF
} }
return &zi.iter.Txnh.TxnInfo, zi, nil return &zi.iter.Txnh.TxnInfo, zi, nil
...@@ -1055,7 +1041,7 @@ func (zi *zIter) NextData() (*zodb.StorageRecordInformation, error) { ...@@ -1055,7 +1041,7 @@ func (zi *zIter) NextData() (*zodb.StorageRecordInformation, error) {
// - need to use separate dh because of this // - need to use separate dh because of this
zi.dhLoading = zi.iter.Datah zi.dhLoading = zi.iter.Datah
zi.sri.Data = zi.dataBuf zi.sri.Data = zi.dataBuf
err = zi.dhLoading.LoadData(zi.iter.DataIter.fsSeq, &zi.sri.Data) err = zi.dhLoading.LoadData(zi.iter.fsSeq, &zi.sri.Data)
if err != nil { if err != nil {
return nil, err // XXX recheck return nil, err // XXX recheck
} }
......
...@@ -35,17 +35,19 @@ Format is the same as in fsdump/py originally written by Jeremy Hylton: ...@@ -35,17 +35,19 @@ Format is the same as in fsdump/py originally written by Jeremy Hylton:
https://github.com/zopefoundation/ZODB/commit/ddcb46a2 https://github.com/zopefoundation/ZODB/commit/ddcb46a2
https://github.com/zopefoundation/ZODB/commit/4d86e4e0 https://github.com/zopefoundation/ZODB/commit/4d86e4e0
*/ */
func Dump(w io.Writer, path string) (err error) { func Dump(w io.Writer, path string, options DumpOptions) (err error) {
// TODO var d dumper
} if options.Verbose {
d = &dumperVerbose{}
} else {
d = &dumper1{}
}
return dump(w, path, d)
}
// dumper is internal interface to implement various dumping modes type DumpOptions struct {
type dumper interface { Verbose bool // dump in verbose mode
dumpFileHeader(buf *xfmt.Buffer, *fs1.FileHeader) error
dumpTxn(buf *xfmt.Buffer, *fs1.TxnHeader) error
dumpData(buf *xfmt.Buffer, *fs1.DataHeader) error
dumpTxnPost(buf *xfmt.Buffer, *fs1.TxnHeader) error
} }
func dump(w io.Writer, path string, d dumper) (err error) { func dump(w io.Writer, path string, d dumper) (err error) {
...@@ -68,7 +70,7 @@ func dump(w io.Writer, path string, d dumper) (err error) { ...@@ -68,7 +70,7 @@ func dump(w io.Writer, path string, d dumper) (err error) {
return err return err
} }
// make sure to flush buffer if we return prematurely with an error // make sure to flush buffer if we return prematurely e.g. with an error
defer func() { defer func() {
err2 := flushBuf() err2 := flushBuf()
err = xerr.First(err, err2) err = xerr.First(err, err2)
...@@ -78,17 +80,15 @@ func dump(w io.Writer, path string, d dumper) (err error) { ...@@ -78,17 +80,15 @@ func dump(w io.Writer, path string, d dumper) (err error) {
it := fs.IterateRaw(fwd) it := fs.IterateRaw(fwd)
loop:
for i := 0; ; i++ { for i := 0; ; i++ {
err = it.NextTxn(fs1.LoadAll) err = it.NextTxn(fs1.LoadAll)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
err = nil // XXX -> okEOF(err) err = nil // XXX -> okEOF(err)
} }
break return err
} }
// txn header
d.dumpTxn(buf, it.Txnh) // XXX err d.dumpTxn(buf, it.Txnh) // XXX err
for j := 0; ; j++ { for j := 0; ; j++ {
...@@ -98,10 +98,9 @@ loop: ...@@ -98,10 +98,9 @@ loop:
err = nil // XXX -> okEOF(err) err = nil // XXX -> okEOF(err)
break break
} }
break loop // XXX -> better just return ? return err
} }
// data record
d.dumpData(buf, it.Datah) // XXX err d.dumpData(buf, it.Datah) // XXX err
} }
...@@ -115,6 +114,14 @@ loop: ...@@ -115,6 +114,14 @@ loop:
} }
} }
// dumper is internal interface to implement various dumping modes
type dumper interface {
dumpFileHeader(buf *xfmt.Buffer, *fs1.FileHeader) error
dumpTxn(buf *xfmt.Buffer, *fs1.TxnHeader) error
dumpData(buf *xfmt.Buffer, *fs1.DataHeader) error
dumpTxnPost(buf *xfmt.Buffer, *fs1.TxnHeader) error
}
// "normal" dumper // "normal" dumper
type dumper1 struct { type dumper1 struct {
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment