// Copyright (C) 2017-2018 Nexedi SA and Contributors. // Kirill Smelkov <kirr@nexedi.com> // // This program is free software: you can Use, Study, Modify and Redistribute // it under the terms of the GNU General Public License version 3, or (at your // option) any later version, as published by the Free Software Foundation. // // You can also Link and Combine this program with other software covered by // the terms of any of the Free Software licenses or any of the Open Source // Initiative approved licenses and Convey the resulting work. Corresponding // source of such a combination shall include the source code for all other // software used. // // This program is distributed WITHOUT ANY WARRANTY; without even the implied // warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // // See COPYING file for full licensing terms. // See https://www.nexedi.com/licensing for rationale and options. package fs1 // records definition + basic operations on them import ( "encoding/binary" "fmt" "io" "os" "lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/xbytes" ) // FileHeader represents header of whole data file. type FileHeader struct { Magic [4]byte } // TxnHeader represents header of a transaction record. type TxnHeader struct { Pos int64 // position of transaction start LenPrev int64 // whole previous transaction record length (see EOF/error rules in Load) Len int64 // whole transaction record length (see EOF/error rules in Load) // transaction metadata itself zodb.TxnInfo // underlying memory for header loading and for user/desc/extension strings // invariant: after successful TxnHeader load len(.workMem) = lenUser + lenDesc + lenExt // as specified by on-disk header. workMem []byte } // DataHeader represents header of a data record. type DataHeader struct { Pos int64 // position of data record start Oid zodb.Oid Tid zodb.Tid PrevRevPos int64 // position of this oid's previous-revision data record TxnPos int64 // position of transaction record this data record belongs to //_ uint16 // 2-bytes with zero values. (Was version length.) DataLen int64 // length of following data. if 0 -> following = 8 bytes backpointer // if backpointer == 0 -> oid deleted // underlying memory for header loading (to avoid allocations) workMem [DataHeaderSize]byte } const ( Magic = "FS21" // every FileStorage file starts with this // on-disk sizes FileHeaderSize = 4 TxnHeaderFixSize = 8 + 8 + 1 + 2 + 2 + 2 // without user/desc/ext strings txnXHeaderFixSize = 8 + TxnHeaderFixSize // ^^^ with trail LenPrev from previous record DataHeaderSize = 8 + 8 + 8 + 8 + 2 + 8 // txn/data pos that if < vvv are for sure invalid txnValidFrom = FileHeaderSize dataValidFrom = txnValidFrom + TxnHeaderFixSize // invalid length that indicates start of iteration for TxnHeader LoadNext/LoadPrev // NOTE load routines check loaded fields to be valid and .Len in particular, // so it can never come to us from outside to be this. lenIterStart int64 = -0x1111111111111112 // = 0xeeeeeeeeeeeeeeee if unsigned ) // RecordError represents error associated with operation on a record in // FileStorage data file. type RecordError struct { Path string // path of the data file; present if used IO object was with .Name() Record string // record kind - "file header", "transaction record", "data record", ... Pos int64 // position of record Subj string // subject context for the error - e.g. "read" or "check" Err error // actual error } func (e *RecordError) Cause() error { return e.Err } func (e *RecordError) Error() string { // XXX omit path: when .Err already contains it (e.g. when it is os.PathError)? prefix := e.Path if prefix != "" { prefix += ": " } return fmt.Sprintf("%s%s @%d: %s: %s", prefix, e.Record, e.Pos, e.Subj, e.Err) } // err creates RecordError for transaction located at txnh.Pos in f. func (txnh *TxnHeader) err(f interface{}, subj string, err error) error { return &RecordError{ioname(f), "transaction record", txnh.Pos, subj, err} } // err creates RecordError for data record located at dh.Pos in f. func (dh *DataHeader) err(f interface{}, subj string, err error) error { return &RecordError{ioname(f), "data record", dh.Pos, subj, err} } // ierr is an interface for something which can create errors. // // it is used by TxnHeader and DataHeader to create appropriate errors with their context. type ierr interface { err(f interface{}, subj string, err error) error } // errf is syntactic shortcut for err and fmt.Errorf. func errf(f interface{}, e ierr, subj, format string, a ...interface{}) error { return e.err(f, subj, fmt.Errorf(format, a...)) } // checkErr is syntactic shortcut for errf("check", ...) func checkErr(f interface{}, e ierr, format string, a ...interface{}) error { return errf(f, e, "check", format, a...) } // bug panics with errf("bug", ...) func bug(f interface{}, e ierr, format string, a ...interface{}) { panic(errf(f, e, "bug", format, a...)) } // --- File header --- // Load reads and decodes file header. func (fh *FileHeader) Load(r io.ReaderAt) error { _, err := r.ReadAt(fh.Magic[:], 0) err = okEOF(err) if err != nil { return fmt.Errorf("%sread magic: %s", ioprefix(r), err) } if string(fh.Magic[:]) != Magic { return fmt.Errorf("%sinvalid fs1 magic %q", ioprefix(r), fh.Magic) } return nil } // --- Transaction record --- // HeaderLen returns whole transaction header length including its variable part. // // NOTE: data records start right after transaction header. func (txnh *TxnHeader) HeaderLen() int64 { return TxnHeaderFixSize + int64(len(txnh.workMem)) } // DataPos returns start position of data inside transaction record. func (txnh *TxnHeader) DataPos() int64 { return txnh.Pos + txnh.HeaderLen() } // DataLen returns length of all data inside transaction record container. func (txnh *TxnHeader) DataLen() int64 { return txnh.Len - txnh.HeaderLen() - 8 /* trailer redundant length */ } // CloneFrom copies txnh2 to txnh making sure underlying slices (.workMem .User // .Desc ...) are not shared. func (txnh *TxnHeader) CloneFrom(txnh2 *TxnHeader) { workMem := txnh.workMem lwork2 := len(txnh2.workMem) workMem = xbytes.Realloc(workMem, lwork2) *txnh = *txnh2 // now unshare slices txnh.workMem = workMem copy(workMem, txnh2.workMem) luser := cap(txnh2.User) xdesc := luser + cap(txnh2.Description) xext := xdesc + cap(txnh2.Extension) txnh.User = workMem[0:0:luser] [:len(txnh2.User)] txnh.Description = workMem[luser:luser:xdesc] [:len(txnh2.Description)] txnh.Extension = workMem[xdesc:xdesc:xext] [:len(txnh2.Extension)] } // flags for TxnHeader.Load type TxnLoadFlags int const ( LoadAll TxnLoadFlags = 0x00 // load whole transaction header LoadNoStrings = 0x01 // do not load user/desc/ext strings ) // Load reads and decodes transaction record header @ pos. // // Both transaction header starting at pos, and redundant length of previous // transaction are loaded. The data read is verified for consistency lightly. // // No prerequisite requirements are made to previous txnh state. // // Rules for .Len/.LenPrev returns: // // .Len = -1 transaction header could not be read // .Len = 0 EOF forward // .Len > 0 transaction was read normally // // .LenPrev = -1 prev record length could not be read // .LenPrev = 0 EOF backward // .LenPrev > 0 LenPrev was read/checked normally // // For example when pos points to the end of file .Len will be returned = -1, but // .LenPrev will be usually valid if file has at least 1 transaction. func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error { if cap(txnh.workMem) < txnXHeaderFixSize { txnh.workMem = make([]byte, txnXHeaderFixSize, 256 /* to later avoid allocation for typical strings */) } work := txnh.workMem[:txnXHeaderFixSize] txnh.Pos = pos txnh.Len = -1 // read error txnh.LenPrev = -1 // read error if pos < txnValidFrom { bug(r, txnh, "Load() on invalid position") } var n int var err error if pos - 8 >= txnValidFrom { // read together with previous's txn record redundant length n, err = r.ReadAt(work, pos - 8) n -= 8 // relative to pos if n >= 0 { lenPrev := 8 + int64(binary.BigEndian.Uint64(work[8-8:])) if lenPrev < TxnHeaderFixSize { return checkErr(r, txnh, "invalid prev record length: %v", lenPrev) } posPrev := txnh.Pos - lenPrev if posPrev < txnValidFrom { return checkErr(r, txnh, "prev record length goes beyond valid area: %v", lenPrev) } if posPrev < txnValidFrom + TxnHeaderFixSize && posPrev != txnValidFrom { return checkErr(r, txnh, "prev record does not land exactly at valid area start: %v", posPrev) } txnh.LenPrev = lenPrev } } else { // read only current txn without previous record length n, err = r.ReadAt(work[8:], pos) txnh.LenPrev = 0 // EOF backward } if err != nil { if err == io.EOF && n == 0 { txnh.Len = 0 // EOF forward return err // end of stream } // EOF after txn header is not good - because at least // redundant length should be also there return txnh.err(r, "read", noEOF(err)) } txnh.Tid = zodb.Tid(binary.BigEndian.Uint64(work[8+0:])) if !txnh.Tid.Valid() { return checkErr(r, txnh, "invalid tid: %v", txnh.Tid) } tlen := 8 + int64(binary.BigEndian.Uint64(work[8+8:])) if tlen < TxnHeaderFixSize { return checkErr(r, txnh, "invalid txn record length: %v", tlen) } // NOTE no need to check tlen does not go beyon file size - loadStrings/LoadData or LoadNext will catch it. // txnh.Len will be set =tlen at last - after checking other fields for correctness. txnh.Status = zodb.TxnStatus(work[8+16]) if !txnh.Status.Valid() { return checkErr(r, txnh, "invalid status: %q", txnh.Status) } luser := binary.BigEndian.Uint16(work[8+17:]) ldesc := binary.BigEndian.Uint16(work[8+19:]) lext := binary.BigEndian.Uint16(work[8+21:]) lstr := int(luser) + int(ldesc) + int(lext) if TxnHeaderFixSize + int64(lstr) + 8 > tlen { return checkErr(r, txnh, "strings overlap with txn boundary: %v / %v", lstr, tlen) } // set .Len at last after doing all header checks // this way we make sure we never return bad fixed TxnHeader with non-error .Len txnh.Len = tlen // NOTE we encode whole strings length into len(.workMem) txnh.workMem = xbytes.Realloc(txnh.workMem, lstr) // NOTE we encode each x string length into cap(x) // and set len(x) = 0 to indicate x is not loaded yet //println("workmem len:", len(txnh.workMem), "cap:", cap(txnh.workMem)) //println("luser:", luser) //println("ldesc:", ldesc) //println("lext: ", lext) xdesc := luser + ldesc xext := xdesc + lext txnh.User = txnh.workMem[0:0:luser] txnh.Description = txnh.workMem[luser:luser:xdesc] txnh.Extension = txnh.workMem[xdesc:xdesc:xext] if flags & LoadNoStrings == 0 { err = txnh.loadStrings(r) } return err } // loadStrings makes sure strings that are part of transaction header are loaded. func (txnh *TxnHeader) loadStrings(r io.ReaderAt) error { // XXX make it no-op if strings are already loaded? // we rely on Load leaving len(workMem) = sum of all strings length ... _, err := r.ReadAt(txnh.workMem, txnh.Pos + TxnHeaderFixSize) if err != nil { return txnh.err(r, "read strings", noEOF(err)) } // ... and presetting x to point to appropriate places in .workMem . // so set len(x) = cap(x) to indicate strings are now loaded. txnh.User = txnh.User[:cap(txnh.User)] txnh.Description = txnh.Description[:cap(txnh.Description)] txnh.Extension = txnh.Extension[:cap(txnh.Extension)] return nil } // LoadPrev reads and decodes previous transaction record header. // // prerequisites: // // - txnh .Pos, .LenPrev are initialized // - optionally if .Len is initialized and txnh was loaded - tid↓ will be also checked func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error { lenPrev := txnh.LenPrev switch lenPrev { case -1: bug(r, txnh, "LoadPrev() when .LenPrev == error") case 0: return io.EOF case lenIterStart: // start of iteration backward: // read LenPrev @pos, then tail to LoadPrev pos := txnh.Pos err := txnh.Load(r, pos, flags) if txnh.LenPrev == -1 { if err == nil { panic("nil err with txnh.LenPrev = error") } return err } // we do not care if it was error above as long as txnh.LenPrev could be read. return txnh.LoadPrev(r, flags) } lenCur := txnh.Len tidCur := txnh.Tid // here we know: Load already checked txnh.Pos - lenPrev to be valid position err := txnh.Load(r, txnh.Pos - lenPrev, flags) if err != nil { // EOF forward is unexpected here if err == io.EOF { err = txnh.err(r, "read", io.ErrUnexpectedEOF) } return err } if txnh.Len != lenPrev { return checkErr(r, txnh, "head/tail lengths mismatch: %v, %v", txnh.Len, lenPrev) } // check tid↓ if we had txnh for "cur" loaded if lenCur > 0 && txnh.Tid >= tidCur { return checkErr(r, txnh, "tid monitonity broken: %v ; next: %v", txnh.Tid, tidCur) } return nil } // LoadNext reads and decodes next transaction record header. // // prerequisite: txnh .Pos, .Len and .Tid should be already initialized. func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error { lenCur := txnh.Len posCur := txnh.Pos switch lenCur { case -1: bug(r, txnh, "LoadNext() when .Len == error") case 0: return io.EOF case lenIterStart: // start of iteration forward return txnh.Load(r, posCur, flags) } // valid .Len means txnh was read ok tidCur := txnh.Tid err := txnh.Load(r, txnh.Pos + lenCur, flags) // before checking loading error for next txn, let's first check redundant length // NOTE also: err could be EOF if txnh.LenPrev >= 0 && txnh.LenPrev != lenCur { t := &TxnHeader{Pos: posCur} // txn for which we discovered problem return checkErr(r, t, "head/tail lengths mismatch: %v, %v", lenCur, txnh.LenPrev) } if err != nil { return err } // check tid↑ if txnh.Tid <= tidCur { return checkErr(r, txnh, "tid↑ broken: %v ; prev: %v", txnh.Tid, tidCur) } return nil } // --- Data record --- // Len returns whole data record length. func (dh *DataHeader) Len() int64 { dataLen := dh.DataLen if dataLen == 0 { dataLen = 8 // back-pointer | oid removal } return DataHeaderSize + dataLen } // Load reads and decodes data record header @ pos. // // Only the data record header is loaded, not data itself. // See LoadData for actually loading record's data. // // No prerequisite requirements are made to previous dh state. func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error { dh.Pos = -1 // ~ error if pos < dataValidFrom { bug(r, dh, "Load() on invalid position") } _, err := r.ReadAt(dh.workMem[:], pos) if err != nil { return dh.err(r, "read", noEOF(err)) } // XXX also check oid.Valid() ? dh.Oid = zodb.Oid(binary.BigEndian.Uint64(dh.workMem[0:])) dh.Tid = zodb.Tid(binary.BigEndian.Uint64(dh.workMem[8:])) if !dh.Tid.Valid() { return checkErr(r, dh, "invalid tid: %v", dh.Tid) } dh.PrevRevPos = int64(binary.BigEndian.Uint64(dh.workMem[16:])) dh.TxnPos = int64(binary.BigEndian.Uint64(dh.workMem[24:])) if dh.TxnPos < txnValidFrom { return checkErr(r, dh, "invalid txn position: %v", dh.TxnPos) } if dh.TxnPos + TxnHeaderFixSize > pos { return checkErr(r, dh, "txn position not decreasing: %v", dh.TxnPos) } if dh.PrevRevPos != 0 { // zero means there is no previous revision if dh.PrevRevPos < dataValidFrom { return checkErr(r, dh, "invalid prev revision position: %v", dh.PrevRevPos) } if dh.PrevRevPos + DataHeaderSize > dh.TxnPos - 8 { return checkErr(r, dh, "prev revision position (%v) overlaps with txn (%v)", dh.PrevRevPos, dh.TxnPos) } } verlen := binary.BigEndian.Uint16(dh.workMem[32:]) if verlen != 0 { return checkErr(r, dh, "non-zero version: #%v", verlen) } dh.DataLen = int64(binary.BigEndian.Uint64(dh.workMem[34:])) if dh.DataLen < 0 { // XXX also check DataLen < max ? return checkErr(r, dh, "invalid data len: %v", dh.DataLen) } dh.Pos = pos return nil } // LoadPrevRev reads and decodes previous revision data record header. // // Prerequisite: dh .Oid .Tid .PrevRevPos are initialized. // // When there is no previous revision: io.EOF is returned. func (dh *DataHeader) LoadPrevRev(r io.ReaderAt) error { if dh.PrevRevPos == 0 { return io.EOF // no more previous revisions } posCur := dh.Pos err := dh.loadPrevRev(r, dh.PrevRevPos) if err != nil { // data record @...: -> (prev rev): data record @...: ... // XXX dup wrt DataHeader.err err = &RecordError{ioname(r), "data record", posCur, "-> (prev rev)", err} } return err } // worker for LoadPrevRev and LoadBack func (dh *DataHeader) loadPrevRev(r io.ReaderAt, prevPos int64) error { oid := dh.Oid tid := dh.Tid err := dh.Load(r, prevPos) if err != nil { return err } if dh.Oid != oid { return checkErr(r, dh, "oid mismatch: %s -> %s", oid, dh.Oid) } if dh.Tid >= tid { return checkErr(r, dh, "tid not ↓: %s -> %s", tid, dh.Tid) } return nil } // LoadBackRef reads data for the data record and decodes it as backpointer reference. // // Prerequisite: dh loaded and .LenData == 0 (data record with back-pointer). func (dh *DataHeader) LoadBackRef(r io.ReaderAt) (backPos int64, err error) { if dh.DataLen != 0 { bug(r, dh, "LoadBack() on non-backpointer data header") } _, err = r.ReadAt(dh.workMem[:8], dh.Pos + DataHeaderSize) if err != nil { return 0, dh.err(r, "read data", noEOF(err)) } backPos = int64(binary.BigEndian.Uint64(dh.workMem[0:])) if !(backPos == 0 || backPos >= dataValidFrom) { return 0, checkErr(r, dh, "invalid backpointer: %v", backPos) } if backPos + DataHeaderSize > dh.TxnPos - 8 { return 0, checkErr(r, dh, "backpointer (%v) overlaps with txn (%v)", backPos, dh.TxnPos) } return backPos, nil } // LoadBack reads and decodes data header for revision linked via back-pointer. // // Prerequisite: dh is loaded and .DataLen == 0. // // If link is to zero (means deleted record) io.EOF is returned. func (dh *DataHeader) LoadBack(r io.ReaderAt) error { backPos, err := dh.LoadBackRef(r) if err != nil { return err } if backPos == 0 { return io.EOF // oid was deleted } posCur := dh.Pos err = dh.loadPrevRev(r, backPos) if err != nil { // data record @...: -> (prev rev): data record @...: ... // XXX dup wrt DataHeader.err err = &RecordError{ioname(r), "data record", posCur, "-> (back)", err} } return err } // LoadNext reads and decodes data header for next data record in the same transaction. // // Prerequisite: dh .Pos .DataLen are initialized. // // When there is no more data records: io.EOF is returned. func (dh *DataHeader) LoadNext(r io.ReaderAt, txnh *TxnHeader) error { err := dh.loadNext(r, txnh) if err != nil && err != io.EOF { err = txnh.err(r, "-> (iter data)", err) } return err } func (dh *DataHeader) loadNext(r io.ReaderAt, txnh *TxnHeader) error { // position of txn tail - right after last data record byte txnTailPos := txnh.Pos + txnh.Len - 8 // NOTE we know nextPos does not overlap txnTailPos - it was checked by // previous LoadNext() nextPos := dh.Pos + dh.Len() if nextPos == txnTailPos { return io.EOF } if nextPos + DataHeaderSize > txnTailPos { // XXX dup wrt DataHeader.err return &RecordError{ioname(r), "data record", nextPos, "check", fmt.Errorf("data record header [..., %d] overlaps txn boundary [..., %d)", nextPos + DataHeaderSize, txnTailPos)} } err := dh.Load(r, nextPos) if err != nil { return err } if dh.Tid != txnh.Tid { return checkErr(r, dh, "tid mismatch: %s -> %s", txnh.Tid, dh.Tid) } if dh.TxnPos != txnh.Pos { return checkErr(r, dh, "txn position not pointing back: %d", dh.TxnPos) } if dh.Pos + dh.Len() > txnTailPos { return checkErr(r, dh, "data record [..., %d) overlaps txn boundary [..., %d)", dh.Pos + dh.Len(), txnTailPos) } return nil } // LoadData loads data for the data record taking backpointers into account. // // On success dh state is changed to data header of original data transaction. // // NOTE "deleted" records are indicated via returning buf with .Data=nil without error. func (dh *DataHeader) LoadData(r io.ReaderAt) (*mem.Buf, error) { // scan via backpointers for dh.DataLen == 0 { err := dh.LoadBack(r) if err != nil { if err == io.EOF { return &mem.Buf{Data: nil}, nil // deleted } return nil, err } } // now read actual data buf := mem.BufAlloc64(dh.DataLen) _, err := r.ReadAt(buf.Data, dh.Pos + DataHeaderSize) if err != nil { buf.Release() return nil, dh.err(r, "read data", noEOF(err)) } return buf, nil } // --- raw iteration --- // Iter is combined 2-level iterator over transaction and data records. type Iter struct { R io.ReaderAt Dir IterDir Txnh TxnHeader // current transaction record information Datah DataHeader // current data record information } type IterDir int const ( IterForward IterDir = iota IterBackward ) // NextTxn iterates to next/previous transaction record according to iteration direction. // // The data header is reset to iterate inside transaction record that becomes current. func (it *Iter) NextTxn(flags TxnLoadFlags) error { var err error switch it.Dir { case IterForward: err = it.Txnh.LoadNext(it.R, flags) case IterBackward: err = it.Txnh.LoadPrev(it.R, flags) default: panic("Iter.Dir invalid") } if err != nil { // reset .Datah to be invalid (just in case) it.Datah.Pos = 0 it.Datah.DataLen = 0 } else { // set .Datah to iterate over .Txnh it.Datah.Pos = it.Txnh.DataPos() it.Datah.DataLen = -DataHeaderSize // first iteration will go to first data record } return err } // NextData iterates to next data record header inside current transaction. func (it *Iter) NextData() error { return it.Datah.LoadNext(it.R, &it.Txnh) } // Iterate creates Iter to iterate over r starting from posStart in direction dir. func Iterate(r io.ReaderAt, posStart int64, dir IterDir) *Iter { it := &Iter{R: r, Dir: dir, Txnh: TxnHeader{Pos: posStart}} switch dir { case IterForward: it.Txnh.Len = lenIterStart case IterBackward: it.Txnh.LenPrev = lenIterStart default: panic("dir invalid") } return it } // IterateFile opens file @ path read-only and creates Iter to iterate over it. // // The iteration will use buffering over os.File optimized for sequential access. // You are responsible to eventually close the file after the iteration is done. func IterateFile(path string, dir IterDir) (iter *Iter, file *os.File, err error) { f, err := os.Open(path) if err != nil { return nil, nil, err } // close file in case we return with an error defer func() { if err != nil { f.Close() } }() // use IO optimized for sequential access when iterating fSeq := seqReadAt(f) switch dir { case IterForward: return Iterate(fSeq, txnValidFrom, IterForward), f, nil case IterBackward: // get file size as topPos and start iterating backward from it // XXX half-committed transaction might be there. fi, err := f.Stat() if err != nil { return nil, nil, err } topPos := fi.Size() return Iterate(fSeq, topPos, IterBackward), f, nil default: panic("dir invalid") } }