Commit 375efa1e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f44b8da3
......@@ -124,14 +124,13 @@ func (e *ErrXidLoad) Error() string {
}
// freelist(DataHeader) XXX move -> format.go ?
// freelist(DataHeader)
var dhPool = sync.Pool{New: func() interface{} { return &DataHeader{} }}
// DataHeaderAlloc allocates DataHeader from freelist.
func DataHeaderAlloc() *DataHeader {
return dhPool.Get().(*DataHeader)
}
// Free puts dh back into DataHeader freelist.
//
// Caller must not use dh after call to Free.
......
......@@ -87,61 +87,56 @@ const (
lenIterStart int64 = -0x1111111111111112 // = 0xeeeeeeeeeeeeeeee if unsigned
)
// TxnError is returned on transaction record read / decode errors
type TxnError struct {
Pos int64 // position of transaction record
Subj string // about what .Err is
// RecordError represents error associated with operation on a record in
// FileStorage data file.
type RecordError struct {
Path string // path of the data file
Record string // record kind - "file header", "transaction record", "data record", ...
Pos int64 // position of record
Subj string // subject context for the error - e.g. "read", "check" or "bug"
Err error // actual error
}
func (e *TxnError) Error() string {
return fmt.Sprintf("transaction record @%v: %v: %v", e.Pos, e.Subj, e.Err)
func (e *RecordError) Cause() error {
return e.Err
}
// err creates TxnError for transaction located at txnh.Pos
func (txnh *TxnHeader) err(subj string, err error) error {
return &TxnError{txnh.Pos, subj, err}
func (e *RecordError) Error() string {
// XXX omit path: when .Err already contains it (e.g. when it is os.PathError)?
return fmt.Sprintf("%s: %s @%d: %s: %s", e.Path, e.Record, e.Pos, e.Subj, e.Err)
}
// DataError is returned on data record read / decode errors
type DataError struct {
Pos int64 // position of data record
Subj string // about what .Err is
Err error // actual error
// err creates RecordError for transaction located at txnh.Pos
func (txnh *TxnHeader) err(r io.ReaderAt, subj string, err error) error {
return &RecordError{xio.Name(r), "transaction record", txnh.Pos, subj, err}
}
func (e *DataError) Error() string {
return fmt.Sprintf("data record @%v: %v: %v", e.Pos, e.Subj, e.Err)
}
// err creates DataError for data record located at dh.Pos
//
// XXX add link to containing txn? (check whether we can do it on data access) ?
func (dh *DataHeader) err(subj string, err error) error {
return &DataError{dh.Pos, subj, err}
// err creates RecordError for data record located at dh.Pos
func (dh *DataHeader) err(r io.ReaderAt, subj string, err error) error {
return &RecordError{xio.Name(r), "data record", dh.Pos, subj, err}
}
// ierr is an interface for something which can create errors.
// it is used by TxnHeader and DataHeader to create appropriate errors with their context.
type ierr interface {
err(subj string, err error) error
err(r io.ReaderAt, subj string, err error) error
}
// errf is syntactic shortcut for err and fmt.Errorf
func errf(e ierr, subj, format string, a ...interface{}) error {
return e.err(subj, fmt.Errorf(format, a...))
func errf(r io.ReaderAt, e ierr, subj, format string, a ...interface{}) error {
return e.err(r, subj, fmt.Errorf(format, a...))
}
// checkErr is syntactic shortcut for errf("check", ...)
func checkErr(e ierr, format string, a ...interface{}) error {
return errf(e, "check", format, a...)
func checkErr(r io.ReaderAt, e ierr, format string, a ...interface{}) error {
return errf(r, e, "check", format, a...)
}
// bug panics with errf("bug", ...)
func bug(e ierr, format string, a ...interface{}) {
panic(errf(e, "bug", format, a...))
func bug(r io.ReaderAt, e ierr, format string, a ...interface{}) {
panic(errf(r, e, "bug", format, a...))
}
......@@ -236,7 +231,7 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
txnh.LenPrev = 0 // read error
if pos < txnValidFrom {
bug(txnh, "Load() on invalid position")
bug(r, txnh, "Load() on invalid position")
}
var n int
......@@ -249,14 +244,14 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
if n >= 0 {
lenPrev := 8 + int64(binary.BigEndian.Uint64(work[8-8:]))
if lenPrev < TxnHeaderFixSize {
return checkErr(txnh, "invalid prev record length: %v", lenPrev)
return checkErr(r, txnh, "invalid prev record length: %v", lenPrev)
}
posPrev := txnh.Pos - lenPrev
if posPrev < txnValidFrom {
return checkErr(txnh, "prev record length goes beyond valid area: %v", lenPrev)
return checkErr(r, txnh, "prev record length goes beyond valid area: %v", lenPrev)
}
if posPrev < txnValidFrom + TxnHeaderFixSize && posPrev != txnValidFrom {
return checkErr(txnh, "prev record does not land exactly at valid area start: %v", posPrev)
return checkErr(r, txnh, "prev record does not land exactly at valid area start: %v", posPrev)
}
txnh.LenPrev = lenPrev
}
......@@ -275,24 +270,24 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
// EOF after txn header is not good - because at least
// redundant length should be also there
return txnh.err("read", noEOF(err))
return txnh.err(r, "read", noEOF(err))
}
txnh.Tid = zodb.Tid(binary.BigEndian.Uint64(work[8+0:]))
if !txnh.Tid.Valid() {
return checkErr(txnh, "invalid tid: %v", txnh.Tid)
return checkErr(r, txnh, "invalid tid: %v", txnh.Tid)
}
tlen := 8 + int64(binary.BigEndian.Uint64(work[8+8:]))
if tlen < TxnHeaderFixSize {
return checkErr(txnh, "invalid txn record length: %v", tlen)
return checkErr(r, txnh, "invalid txn record length: %v", tlen)
}
// XXX also check tlen to not go beyond file size ?
// txnh.Len will be set =tlen at last - after checking other fields for correctness.
txnh.Status = zodb.TxnStatus(work[8+16])
if !txnh.Status.Valid() {
return checkErr(txnh, "invalid status: %q", txnh.Status)
return checkErr(r, txnh, "invalid status: %q", txnh.Status)
}
......@@ -302,7 +297,7 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
lstr := int(luser) + int(ldesc) + int(lext)
if TxnHeaderFixSize + int64(lstr) + 8 > tlen {
return checkErr(txnh, "strings overlap with txn boundary: %v / %v", lstr, tlen)
return checkErr(r, txnh, "strings overlap with txn boundary: %v / %v", lstr, tlen)
}
// set .Len at last after doing all header checks
......@@ -338,7 +333,7 @@ func (txnh *TxnHeader) loadStrings(r io.ReaderAt) error {
// we rely on Load leaving len(workMem) = sum of all strings length ...
_, err := r.ReadAt(txnh.workMem, txnh.Pos + TxnHeaderFixSize)
if err != nil {
return txnh.err("read strings", noEOF(err))
return txnh.err(r, "read strings", noEOF(err))
}
// ... and presetting x to point to appropriate places in .workMem .
......@@ -360,7 +355,7 @@ func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error {
lenPrev := txnh.LenPrev
switch lenPrev {
case 0:
bug(txnh, "LoadPrev() when .LenPrev == error")
bug(r, txnh, "LoadPrev() when .LenPrev == error")
case -1:
return io.EOF
......@@ -387,18 +382,18 @@ func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error {
if err != nil {
// EOF forward is unexpected here
if err == io.EOF {
err = txnh.err("read", io.ErrUnexpectedEOF)
err = txnh.err(r, "read", io.ErrUnexpectedEOF)
}
return err
}
if txnh.Len != lenPrev {
return checkErr(txnh, "head/tail lengths mismatch: %v, %v", txnh.Len, lenPrev)
return checkErr(r, txnh, "head/tail lengths mismatch: %v, %v", txnh.Len, lenPrev)
}
// check tid↓ if we had txnh for "cur" loaded
if lenCur > 0 && txnh.Tid >= tidCur {
return checkErr(txnh, "tid monitonity broken: %v ; next: %v", txnh.Tid, tidCur)
return checkErr(r, txnh, "tid monitonity broken: %v ; next: %v", txnh.Tid, tidCur)
}
return nil
......@@ -412,7 +407,7 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
posCur := txnh.Pos
switch lenCur {
case 0:
bug(txnh, "LoadNext() when .Len == error")
bug(r, txnh, "LoadNext() when .Len == error")
case -1:
return io.EOF
......@@ -430,7 +425,7 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
// NOTE also: err could be EOF
if txnh.LenPrev != 0 && txnh.LenPrev != lenCur {
t := &TxnHeader{Pos: posCur} // txn for which we discovered problem
return checkErr(t, "head/tail lengths mismatch: %v, %v", lenCur, txnh.LenPrev)
return checkErr(r, t, "head/tail lengths mismatch: %v, %v", lenCur, txnh.LenPrev)
}
if err != nil {
......@@ -439,7 +434,7 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
// check tid↑
if txnh.Tid <= tidCur {
return checkErr(txnh, "tid↑ broken: %v ; prev: %v", txnh.Tid, tidCur)
return checkErr(r, txnh, "tid↑ broken: %v ; prev: %v", txnh.Tid, tidCur)
}
return nil
......@@ -468,48 +463,48 @@ func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
dh.Pos = -1 // ~ error
if pos < dataValidFrom {
bug(dh, "Load() on invalid position")
bug(r, dh, "Load() on invalid position")
}
_, err := r.ReadAt(dh.workMem[:], pos)
if err != nil {
return dh.err("read", noEOF(err))
return dh.err(r, "read", noEOF(err))
}
// XXX also check oid.Valid() ?
dh.Oid = zodb.Oid(binary.BigEndian.Uint64(dh.workMem[0:])) // XXX -> zodb.Oid.Decode() ?
dh.Tid = zodb.Tid(binary.BigEndian.Uint64(dh.workMem[8:])) // XXX -> zodb.Tid.Decode() ?
if !dh.Tid.Valid() {
return checkErr(dh, "invalid tid: %v", dh.Tid)
return checkErr(r, dh, "invalid tid: %v", dh.Tid)
}
dh.PrevRevPos = int64(binary.BigEndian.Uint64(dh.workMem[16:]))
dh.TxnPos = int64(binary.BigEndian.Uint64(dh.workMem[24:]))
if dh.TxnPos < txnValidFrom {
return checkErr(dh, "invalid txn position: %v", dh.TxnPos)
return checkErr(r, dh, "invalid txn position: %v", dh.TxnPos)
}
if dh.TxnPos + TxnHeaderFixSize > pos {
return checkErr(dh, "txn position not decreasing: %v", dh.TxnPos)
return checkErr(r, dh, "txn position not decreasing: %v", dh.TxnPos)
}
if dh.PrevRevPos != 0 { // zero means there is no previous revision
if dh.PrevRevPos < dataValidFrom {
return checkErr(dh, "invalid prev revision position: %v", dh.PrevRevPos)
return checkErr(r, dh, "invalid prev revision position: %v", dh.PrevRevPos)
}
if dh.PrevRevPos + DataHeaderSize > dh.TxnPos - 8 {
return checkErr(dh, "prev revision position (%v) overlaps with txn (%v)", dh.PrevRevPos, dh.TxnPos)
return checkErr(r, dh, "prev revision position (%v) overlaps with txn (%v)", dh.PrevRevPos, dh.TxnPos)
}
}
verlen := binary.BigEndian.Uint16(dh.workMem[32:])
if verlen != 0 {
return checkErr(dh, "non-zero version: #%v", verlen)
return checkErr(r, dh, "non-zero version: #%v", verlen)
}
dh.DataLen = int64(binary.BigEndian.Uint64(dh.workMem[34:]))
if dh.DataLen < 0 {
// XXX also check DataLen < max ?
return checkErr(dh, "invalid data len: %v", dh.DataLen)
return checkErr(r, dh, "invalid data len: %v", dh.DataLen)
}
dh.Pos = pos
......@@ -531,7 +526,8 @@ func (dh *DataHeader) LoadPrevRev(r io.ReaderAt) error {
err := dh.loadPrevRev(r, dh.PrevRevPos)
if err != nil {
// data record @...: -> (prev rev): data record @...: ...
err = &DataError{posCur, "-> (prev rev)", err}
// XXX dup wrt DataHeader.err
err = &RecordError{xio.Name(r), "data record", posCur, "-> (prev rev)", err}
}
return err
}
......@@ -547,11 +543,11 @@ func (dh *DataHeader) loadPrevRev(r io.ReaderAt, prevPos int64) error {
}
if dh.Oid != oid {
return checkErr(dh, "oid mismatch: %s -> %s", oid, dh.Oid)
return checkErr(r, dh, "oid mismatch: %s -> %s", oid, dh.Oid)
}
if dh.Tid >= tid {
return checkErr(dh, "tid not ↓: %s -> %s", tid, dh.Tid)
return checkErr(r, dh, "tid not ↓: %s -> %s", tid, dh.Tid)
}
return nil
......@@ -562,20 +558,20 @@ func (dh *DataHeader) loadPrevRev(r io.ReaderAt, prevPos int64) error {
// Prerequisite: dh loaded and .LenData == 0 (data record with back-pointer).
func (dh *DataHeader) LoadBackRef(r io.ReaderAt) (backPos int64, err error) {
if dh.DataLen != 0 {
bug(dh, "LoadBack() on non-backpointer data header")
bug(r, dh, "LoadBack() on non-backpointer data header")
}
_, err = r.ReadAt(dh.workMem[:8], dh.Pos + DataHeaderSize)
if err != nil {
return 0, dh.err("read data", noEOF(err))
return 0, dh.err(r, "read data", noEOF(err))
}
backPos = int64(binary.BigEndian.Uint64(dh.workMem[0:]))
if !(backPos == 0 || backPos >= dataValidFrom) {
return 0, checkErr(dh, "invalid backpointer: %v", backPos)
return 0, checkErr(r, dh, "invalid backpointer: %v", backPos)
}
if backPos + DataHeaderSize > dh.TxnPos - 8 {
return 0, checkErr(dh, "backpointer (%v) overlaps with txn (%v)", backPos, dh.TxnPos)
return 0, checkErr(r, dh, "backpointer (%v) overlaps with txn (%v)", backPos, dh.TxnPos)
}
return backPos, nil
......@@ -601,7 +597,8 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt) error {
err = dh.loadPrevRev(r, backPos)
if err != nil {
// data record @...: -> (prev rev): data record @...: ...
err = &DataError{posCur, "-> (back)", err}
// XXX dup wrt DataHeader.err
err = &RecordError{xio.Name(r), "data record", posCur, "-> (back)", err}
}
return err
......@@ -615,7 +612,7 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt) error {
func (dh *DataHeader) LoadNext(r io.ReaderAt, txnh *TxnHeader) error {
err := dh.loadNext(r, txnh)
if err != nil && err != io.EOF {
err = txnh.err("-> (iter data)", err)
err = txnh.err(r, "-> (iter data)", err)
}
return err
}
......@@ -632,7 +629,8 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error {
}
if nextPos + DataHeaderSize > txnTailPos {
return &DataError{nextPos, "check",
// XXX dup wrt DataHeader.err
return &RecordError{xio.Name(r), "data record", nextPos, "check",
fmt.Errorf("data record header [..., %d] overlaps txn boundary [..., %d)",
nextPos + DataHeaderSize, txnTailPos)}
}
......@@ -643,13 +641,13 @@ func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error {
}
if dh.Tid != txnh.Tid {
return checkErr(dh, "tid mismatch: %s -> %s", txnh.Tid, dh.Tid)
return checkErr(r, dh, "tid mismatch: %s -> %s", txnh.Tid, dh.Tid)
}
if dh.TxnPos != txnh.Pos {
return checkErr(dh, "txn position not pointing back: %d", dh.TxnPos)
return checkErr(r, dh, "txn position not pointing back: %d", dh.TxnPos)
}
if dh.Pos + dh.Len() > txnTailPos {
return checkErr(dh, "data record [..., %d) overlaps txn boundary [..., %d)",
return checkErr(r, dh, "data record [..., %d) overlaps txn boundary [..., %d)",
dh.Pos + dh.Len(), txnTailPos)
}
......@@ -677,7 +675,7 @@ func (dh *DataHeader) LoadData(r io.ReaderAt) (*zodb.Buf, error) {
_, err := r.ReadAt(buf.Data, dh.Pos + DataHeaderSize)
if err != nil {
buf.Release()
return nil, dh.err("read data", noEOF(err))
return nil, dh.err(r, "read data", noEOF(err))
}
return buf, nil
......
......@@ -29,6 +29,7 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
"lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/go123/prog"
"lab.nexedi.com/kirr/go123/xbytes"
......@@ -362,7 +363,8 @@ func (d *DumperFsTail) DumpTxn(buf *xfmt.Buffer, it *fs1.Iter) error {
if err == io.EOF {
err = io.ErrUnexpectedEOF // XXX -> noEOF(err)
}
return &fs1.TxnError{txnh.Pos, "read data payload", err}
// XXX dup wrt fs1.TxnHeader.err
return &fs1.RecordError{xio.Name(it.R), "transaction record", txnh.Pos, "read data payload", err}
}
// print information about read txn record
......
......@@ -62,12 +62,13 @@ def main():
txnLenPrev = -1
for txn in stor.iterator(): # txn is TransactionRecord
# txn.extension is already depickled dict - we want to put raw data from file
# also we need to access txn record legth which is not provided by higher-level iterator
# also we need to access txn record length which is not provided by higher-level iterator
# do deep-dive into FileStorage
th = stor._read_txn_header(txn._tpos)
assert th.tid == txn.tid
assert th.tlen == txn._tend - txn._tpos
# fs1/go keeps in RAM whole txn length, not len-8 as it is on disk
txnLen = th.tlen + 8
emit("\t{")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment