Commit 4a915205 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent b912f3d6
...@@ -287,7 +287,7 @@ func (txnh *TxnHeader) loadStrings(r io.ReaderAt /* *os.File */) error { ...@@ -287,7 +287,7 @@ func (txnh *TxnHeader) loadStrings(r io.ReaderAt /* *os.File */) error {
} }
// LoadPrev reads and decodes previous transaction record header // LoadPrev reads and decodes previous transaction record header
// prerequisite: txnh .Pos and .LenPrev should be already initialized: // prerequisite: txnh .Pos and .LenPrev are initialized:
// - by successful call to Load() initially XXX but EOF also works // - by successful call to Load() initially XXX but EOF also works
// - by subsequent successful calls to LoadPrev / LoadNext XXX recheck // - by subsequent successful calls to LoadPrev / LoadNext XXX recheck
func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error { func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error {
...@@ -340,7 +340,9 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error { ...@@ -340,7 +340,9 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
// decode reads and decodes data record header // load reads and decodes data record header
// pos: points to data header start
// no prerequisite requirements are made to previous dh state
func (dh *DataHeader) load(r io.ReaderAt /* *os.File */, pos int64, tmpBuf *[DataHeaderSize]byte) error { func (dh *DataHeader) load(r io.ReaderAt /* *os.File */, pos int64, tmpBuf *[DataHeaderSize]byte) error {
dh.Pos = pos dh.Pos = pos
// XXX .Len = 0 = read error ? // XXX .Len = 0 = read error ?
...@@ -351,7 +353,7 @@ func (dh *DataHeader) load(r io.ReaderAt /* *os.File */, pos int64, tmpBuf *[Dat ...@@ -351,7 +353,7 @@ func (dh *DataHeader) load(r io.ReaderAt /* *os.File */, pos int64, tmpBuf *[Dat
_, err := r.ReadAt(tmpBuf[:], pos) _, err := r.ReadAt(tmpBuf[:], pos)
if err != nil { if err != nil {
return &ErrDataRecord{pos, "read", noEOF(err)} return dh.err("read", noEOF(err))
} }
// XXX also check oid.Valid() ? // XXX also check oid.Valid() ?
...@@ -363,9 +365,6 @@ func (dh *DataHeader) load(r io.ReaderAt /* *os.File */, pos int64, tmpBuf *[Dat ...@@ -363,9 +365,6 @@ func (dh *DataHeader) load(r io.ReaderAt /* *os.File */, pos int64, tmpBuf *[Dat
dh.PrevRevPos = int64(binary.BigEndian.Uint64(tmpBuf[16:])) dh.PrevRevPos = int64(binary.BigEndian.Uint64(tmpBuf[16:]))
dh.TxnPos = int64(binary.BigEndian.Uint64(tmpBuf[24:])) dh.TxnPos = int64(binary.BigEndian.Uint64(tmpBuf[24:]))
if dh.PrevRevPos < dataValidFrom {
return decodeErr(dh, "invalid prev oid data position: %v", dh.PrevRevPos)
}
if dh.TxnPos < txnValidFrom { if dh.TxnPos < txnValidFrom {
return decodeErr(dh, "invalid txn position: %v", dh.TxnPos) return decodeErr(dh, "invalid txn position: %v", dh.TxnPos)
} }
...@@ -373,12 +372,15 @@ func (dh *DataHeader) load(r io.ReaderAt /* *os.File */, pos int64, tmpBuf *[Dat ...@@ -373,12 +372,15 @@ func (dh *DataHeader) load(r io.ReaderAt /* *os.File */, pos int64, tmpBuf *[Dat
if dh.TxnPos + TxnHeaderFixSize > pos { if dh.TxnPos + TxnHeaderFixSize > pos {
return decodeErr(dh, "txn position not decreasing: %v", dh.TxnPos) return decodeErr(dh, "txn position not decreasing: %v", dh.TxnPos)
} }
if dh.PrevRevPos + DataHeaderSize > dh.TxnPos - 8 { if dh.PrevRevPos != 0 { // zero means there is no previous revision
return decodeErr(dh, "prev oid data position (%v) overlaps with txn (%v)", dh.PrevRevPos, dh.TxnPos) // XXX wording if dh.PrevRevPos < dataValidFrom {
return decodeErr(dh, "invalid prev revision position: %v", dh.PrevRevPos)
}
if dh.PrevRevPos + DataHeaderSize > dh.TxnPos - 8 {
return decodeErr(dh, "prev revision position (%v) overlaps with txn (%v)", dh.PrevRevPos, dh.TxnPos)
}
} }
// XXX check PrevRevPos vs TxnPos overlap
verlen := binary.BigEndian.Uint16(tmpBuf[32:]) verlen := binary.BigEndian.Uint16(tmpBuf[32:])
if verlen != 0 { if verlen != 0 {
return decodeErr(dh, "non-zero version: #%v", verlen) return decodeErr(dh, "non-zero version: #%v", verlen)
...@@ -399,6 +401,35 @@ func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error { ...@@ -399,6 +401,35 @@ func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
return dh.load(r, pos, &tmpBuf) return dh.load(r, pos, &tmpBuf)
} }
// LoadPrevRev reads and decodes previous revision data record
// prerequisite: dh .Oid .Tid .PrevRevPos are initialized:
// - TODO describe how
// when there is no previous revision: io.EOF is returned
func (dh *DataHeader) LoadPrevRev(r io.ReaderAt /* *os.File */) error {
if dh.PrevRevPos == 0 { // XXX -> -1 ?
return io.EOF // no more previous revisions
}
err = dh.Load(r, dh.PrevRevPos)
if err != nil {
return err
}
if dh.Oid != oid {
// data record @...: while loading as prev rev for data record @...: oid mismatch ...
return ...
}
if dh.Tid >= tid {
// data record @...: while loading as prev rev for data record @...: tid not decreasing: ...
return ...
}
return nil
}
func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error {
}
func Open(path string) (*FileStorage, error) { func Open(path string) (*FileStorage, error) {
...@@ -477,42 +508,22 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) ...@@ -477,42 +508,22 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
return nil, zodb.Tid(0), &ErrXidLoad{xid, zodb.ErrOidMissing{Oid: xid.Oid}} return nil, zodb.Tid(0), &ErrXidLoad{xid, zodb.ErrOidMissing{Oid: xid.Oid}}
} }
dh := DataHeader{Tid: zodb.TidMax} dh := DataHeader{Oid: xid.Oid, Tid: zodb.TidMax, PrevRevPos: dataPos}
tidBefore := xid.XTid.Tid tidBefore := xid.XTid.Tid
if !xid.XTid.TidBefore { if !xid.XTid.TidBefore {
tidBefore++ // XXX recheck this is ok wrt overflow tidBefore++ // XXX recheck this is ok wrt overflow
} }
// search backwards for when we first have data record with tid satisfying xid.XTid // search backwards for when we first have data record with tid satisfying xid.XTid
for { for dh.Tid >= tidBefore {
//prevTid := dh.Tid err = dh.LoadPrevRev(fs.file)
err = dh.Load(fs.file, dataPos)
if err != nil { if err != nil {
if err == io.EOF {
// no such oid revision
err = &zodb.ErrXidMissing{Xid: xid}
}
return nil, zodb.Tid(0), &ErrXidLoad{xid, err} return nil, zodb.Tid(0), &ErrXidLoad{xid, err}
} }
// TODO -> LoadPrev()
// check data record consistency
// TODO reenable
// if dh.Oid != oid {
// // ... header invalid:
// return nil, zodb.Tid(0), &ErrXidLoad{xid, &ErrDataRecord{dataPos, "consistency check", "TODO unexpected oid")}
// }
// if dh.Tid >= prevTid { ... }
// if dh.TxnPos >= dataPos - TxnHeaderSize { ... }
// if dh.PrevDataRecPos >= dh.TxnPos - DataHeaderSize - 8 /* XXX */ { ... }
if dh.Tid < tidBefore {
break
}
// continue search
dataPos = dh.PrevRevPos
if dataPos == 0 {
// no such oid revision
return nil, zodb.Tid(0), &ErrXidLoad{xid, &zodb.ErrXidMissing{Xid: xid}}
}
} }
// found dh.Tid < tidBefore; check it really satisfies xid.XTid // found dh.Tid < tidBefore; check it really satisfies xid.XTid
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment