Commit 5a7e31ab authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 7729e7d1
...@@ -32,7 +32,7 @@ type FileStorage struct { ...@@ -32,7 +32,7 @@ type FileStorage struct {
file *os.File file *os.File
index *fsIndex // oid -> data record position in transaction which last changed oid index *fsIndex // oid -> data record position in transaction which last changed oid
topPos int64 // position pointing just past last committed transaction topPos int64 // position pointing just past last committed transaction
// (= size(f) when no commit is in progress) // (= size(.file) when no commit is in progress)
// min/max tids committed // min/max tids committed
tidMin, tidMax zodb.Tid tidMin, tidMax zodb.Tid
...@@ -51,15 +51,16 @@ type TxnHeader struct { ...@@ -51,15 +51,16 @@ type TxnHeader struct {
// transaction metadata tself // transaction metadata tself
zodb.TxnInfo zodb.TxnInfo
// underlying memory for header loading and for user/desc/extension // underlying memory for header loading and for user/desc/extension strings
workMem []byte workMem []byte
} }
// DataHeader represents header of a data record // DataHeader represents header of a data record
type DataHeader struct { type DataHeader struct {
Pos int64 // position of data record Pos int64 // position of data record start
Oid zodb.Oid Oid zodb.Oid
Tid zodb.Tid Tid zodb.Tid
// XXX -> .PosPrevRev .PosTxn .LenData
PrevRevPos int64 // position of this oid's previous-revision data record XXX naming PrevRevPos int64 // position of this oid's previous-revision data record XXX naming
TxnPos int64 // position of transaction record this data record belongs to TxnPos int64 // position of transaction record this data record belongs to
//_ uint16 // 2-bytes with zero values. (Was version length.) //_ uint16 // 2-bytes with zero values. (Was version length.)
...@@ -71,13 +72,12 @@ type DataHeader struct { ...@@ -71,13 +72,12 @@ type DataHeader struct {
// XXX include word0 ? // XXX include word0 ?
} }
// on-disk sizes
const ( const (
Magic = "FS21" Magic = "FS21" // every FileStorage file starts with this
TxnHeaderFixSize = 8+8+1+2+2+2 // on-disk size without user/desc/ext strings
txnXHeaderFixSize = 8 + TxnHeaderFixSize // with trail LenPrev from previous record
// on-disk sizes
TxnHeaderFixSize = 8+8+1+2+2+2 // without user/desc/ext strings
txnXHeaderFixSize = 8 + TxnHeaderFixSize // ^^^ with trail LenPrev from previous record
DataHeaderSize = 8+8+8+8+2+8 DataHeaderSize = 8+8+8+8+2+8
// txn/data pos that are < vvv are for sure invalid // txn/data pos that are < vvv are for sure invalid
...@@ -86,7 +86,6 @@ const ( ...@@ -86,7 +86,6 @@ const (
) )
// ErrTxnRecord is returned on transaction record read / decode errors // ErrTxnRecord is returned on transaction record read / decode errors
// XXX merge with ErrDataRecord -> ErrRecord{pos, "transaction|data", "read", err} ?
type ErrTxnRecord struct { type ErrTxnRecord struct {
Pos int64 // position of transaction record Pos int64 // position of transaction record
Subj string // about what .Err is Subj string // about what .Err is
...@@ -97,6 +96,20 @@ func (e *ErrTxnRecord) Error() string { ...@@ -97,6 +96,20 @@ func (e *ErrTxnRecord) Error() string {
return fmt.Sprintf("transaction record @%v: %v: %v", e.Pos, e.Subj, e.Err) return fmt.Sprintf("transaction record @%v: %v: %v", e.Pos, e.Subj, e.Err)
} }
// err creates ErrTxnRecord for transaction located at txnh.Pos
func (txnh *TxnHeader) err(subj string, err error) *ErrTxnRecord {
return &ErrTxnRecord{txnh.Pos, subj, err}
}
// errf is syntatic shortcut for err and fmt.Errorf
func (txnh *TxnHeader) errf(subj, format string, a ...interface{}) *ErrTxnRecord {
return txnh.err(subj, fmt.Errorf(format, a...))
}
func (txnh *TxnHeader) decodeErr(format string, a ...interface{}) *ErrTxnRecord {
return txnh.errf("decode", format, a...)
}
// ErrDataRecord is returned on data record read / decode errors // ErrDataRecord is returned on data record read / decode errors
type ErrDataRecord struct { type ErrDataRecord struct {
Pos int64 // position of data record Pos int64 // position of data record
...@@ -108,11 +121,17 @@ func (e *ErrDataRecord) Error() string { ...@@ -108,11 +121,17 @@ func (e *ErrDataRecord) Error() string {
return fmt.Sprintf("data record @%v: %v: %v", e.Pos, e.Subj, e.Err) return fmt.Sprintf("data record @%v: %v: %v", e.Pos, e.Subj, e.Err)
} }
// err creates ErrDataRecord for data record located at dh.Pos
// XXX add link to containing txn? (check whether we can do it on data access) ?
func (dh *DataHeader) err(subj string, err error) *ErrDataRecord {
return &ErrDataRecord{dh.Pos, subj, err}
}
// TODO errf, decodeErr
// // XXX -> zodb? // // XXX -> zodb?
// var ErrVersionNonZero = errors.New("non-zero version") // var ErrVersionNonZero = errors.New("non-zero version")
var bugPosition = errors.New("software bug: invalid position")
// noEOF returns err, but changes io.EOF -> io.ErrUnexpectedEOF // noEOF returns err, but changes io.EOF -> io.ErrUnexpectedEOF
func noEOF(err error) error { func noEOF(err error) error {
if err == io.EOF { if err == io.EOF {
...@@ -129,6 +148,10 @@ const ( ...@@ -129,6 +148,10 @@ const (
LoadNoStrings = 0x01 // do not load user/desc/ext strings LoadNoStrings = 0x01 // do not load user/desc/ext strings
) )
// it is software bug to pass invalid position to Load*()
// this error will be paniced
var bugPositionInvalid = errors.New("software bug: invalid position")
// Load reads and decodes transaction record header // Load reads and decodes transaction record header
// pos: points to transaction start // pos: points to transaction start
// no requirements are made to previous txnh state // no requirements are made to previous txnh state
...@@ -139,16 +162,13 @@ func (txnh *TxnHeader) Load(r io.ReaderAt /* *os.File */, pos int64, flags TxnLo ...@@ -139,16 +162,13 @@ func (txnh *TxnHeader) Load(r io.ReaderAt /* *os.File */, pos int64, flags TxnLo
} }
work := txnh.workMem[:txnXHeaderFixSize] work := txnh.workMem[:txnXHeaderFixSize]
// XXX recheck rules about error exit
txnh.Pos = pos txnh.Pos = pos
txnh.Len = 0 // XXX recheck rules about error exit txnh.Len = 0
txnh.LenPrev = 0 txnh.LenPrev = 0
if pos < txnValidFrom { if pos < txnValidFrom {
panic(&ErrTxnRecord{pos, "read", bugPosition}) panic(txnh.err("read", bugPositionInvalid))
}
decodeErr := func(format string, a ...interface{}) *ErrTxnRecord {
return &ErrTxnRecord{pos, "decode", fmt.Errorf(format, a...)}
} }
var n int var n int
...@@ -161,7 +181,7 @@ func (txnh *TxnHeader) Load(r io.ReaderAt /* *os.File */, pos int64, flags TxnLo ...@@ -161,7 +181,7 @@ func (txnh *TxnHeader) Load(r io.ReaderAt /* *os.File */, pos int64, flags TxnLo
if n >= 0 { if n >= 0 {
lenPrev := 8 + int64(binary.BigEndian.Uint64(work[8-8:])) lenPrev := 8 + int64(binary.BigEndian.Uint64(work[8-8:]))
if lenPrev < TxnHeaderFixSize { if lenPrev < TxnHeaderFixSize {
return decodeErr("invalid txn prev record length: %v", lenPrev) return txnh.decodeErr("invalid txn prev record length: %v", lenPrev)
} }
txnh.LenPrev = lenPrev txnh.LenPrev = lenPrev
} }
...@@ -178,23 +198,23 @@ func (txnh *TxnHeader) Load(r io.ReaderAt /* *os.File */, pos int64, flags TxnLo ...@@ -178,23 +198,23 @@ func (txnh *TxnHeader) Load(r io.ReaderAt /* *os.File */, pos int64, flags TxnLo
// EOF after txn header is not good - because at least // EOF after txn header is not good - because at least
// redundand lenght should be also there // redundand lenght should be also there
return &ErrTxnRecord{pos, "read", noEOF(err)} return txnh.err("read", noEOF(err))
} }
txnh.Tid = zodb.Tid(binary.BigEndian.Uint64(work[8+0:])) txnh.Tid = zodb.Tid(binary.BigEndian.Uint64(work[8+0:]))
if !txnh.Tid.Valid() { if !txnh.Tid.Valid() {
return decodeErr("invalid tid: %v", txnh.Tid) return txnh.decodeErr("invalid tid: %v", txnh.Tid)
} }
tlen := 8 + int64(binary.BigEndian.Uint64(work[8+8:])) tlen := 8 + int64(binary.BigEndian.Uint64(work[8+8:]))
if tlen < TxnHeaderFixSize { if tlen < TxnHeaderFixSize {
return decodeErr("invalid txn record length: %v", tlen) return txnh.decodeErr("invalid txn record length: %v", tlen)
} }
txnh.Len = tlen txnh.Len = tlen
txnh.Status = zodb.TxnStatus(work[8+16]) txnh.Status = zodb.TxnStatus(work[8+16])
if !txnh.Status.Valid() { if !txnh.Status.Valid() {
return decodeErr("invalid status: %v", txnh.Status) return txnh.decodeErr("invalid status: %v", txnh.Status)
} }
...@@ -210,13 +230,11 @@ func (txnh *TxnHeader) Load(r io.ReaderAt /* *os.File */, pos int64, flags TxnLo ...@@ -210,13 +230,11 @@ func (txnh *TxnHeader) Load(r io.ReaderAt /* *os.File */, pos int64, flags TxnLo
txnh.workMem = txnh.workMem[:lstr] txnh.workMem = txnh.workMem[:lstr]
} }
work = txnh.workMem[:lstr]
// NOTE we encode each x string length into cap(x) // NOTE we encode each x string length into cap(x)
// and set len(x) = 0 to indicate x is not loaded yet // and set len(x) = 0 to indicate x is not loaded yet
txnh.User = work[0:0:luser] txnh.User = txnh.workMem[0:0:luser]
txnh.Description = work[luser:luser:ldesc] txnh.Description = txnh.workMem[luser:luser:ldesc]
txnh.Extension = work[luser+ldesc:luser+ldesc:lext] txnh.Extension = txnh.workMem[luser+ldesc:luser+ldesc:lext]
if flags & LoadNoStrings == 0 { if flags & LoadNoStrings == 0 {
err = txnh.loadStrings(r) err = txnh.loadStrings(r)
...@@ -232,7 +250,7 @@ func (txnh *TxnHeader) loadStrings(r io.ReaderAt /* *os.File */) error { ...@@ -232,7 +250,7 @@ func (txnh *TxnHeader) loadStrings(r io.ReaderAt /* *os.File */) error {
// we rely on Load leaving len(workMem) = sum of all strings length ... // we rely on Load leaving len(workMem) = sum of all strings length ...
_, err := r.ReadAt(txnh.workMem, txnh.Pos + TxnHeaderFixSize) _, err := r.ReadAt(txnh.workMem, txnh.Pos + TxnHeaderFixSize)
if err != nil { if err != nil {
return &ErrTxnRecord{txnh.Pos, "read strings", noEOF(err)} return txnh.err("read strings", noEOF(err))
} }
// ... and presetting x to point to appropriate places in .workMem . // ... and presetting x to point to appropriate places in .workMem .
...@@ -247,17 +265,35 @@ func (txnh *TxnHeader) loadStrings(r io.ReaderAt /* *os.File */) error { ...@@ -247,17 +265,35 @@ func (txnh *TxnHeader) loadStrings(r io.ReaderAt /* *os.File */) error {
// LoadPrev reads and decodes previous transaction record header // LoadPrev reads and decodes previous transaction record header
// txnh should be already initialized by previous call to load() // txnh should be already initialized by previous call to load()
func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error { func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error {
if txnh.LenPrev == 0 { lenPrev := txnh.LenPrev
if lenPrev == 0 {
return io.EOF return io.EOF
} }
return txnh.Load(r, txnh.Pos - txnh.LenPrev, flags) err := txnh.Load(r, txnh.Pos - lenPrev, flags)
if err != nil {
return err
}
if txnh.Len != lenPrev {
return txnh.decodeErr("redundant lengths mismatch: %v, %v", txnh.Len, lenPrev)
}
return nil
} }
// LoadNext reads and decodes next transaction record header // LoadNext reads and decodes next transaction record header
// txnh should be already initialized by previous call to load() // txnh should be already initialized by previous call to load()
func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error { func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
return txnh.Load(r, txnh.Pos + txnh.Len, flags) lenCur := txnh.Len
err := txnh.Load(r, txnh.Pos + txnh.Len, flags)
// TODO XXX
if txnh.LenPrev != lenCur {
}
if err != nil {
return err
}
} }
...@@ -265,7 +301,7 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error { ...@@ -265,7 +301,7 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
// decode reads and decodes data record header // decode reads and decodes data record header
func (dh *DataHeader) load(r io.ReaderAt /* *os.File */, pos int64, tmpBuf *[DataHeaderSize]byte) error { func (dh *DataHeader) load(r io.ReaderAt /* *os.File */, pos int64, tmpBuf *[DataHeaderSize]byte) error {
if pos < dataValidFrom { if pos < dataValidFrom {
panic(&ErrDataRecord{pos, "read", bugPosition}) panic(&ErrDataRecord{pos, "read", bugPositionInvalid})
} }
dh.Pos = pos dh.Pos = pos
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment