Commit cf3fbe5b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent cb061733
......@@ -11,6 +11,7 @@
// See COPYING file for full licensing terms.
//
// XXX partly based on code from ZODB ?
// TODO link to format in zodb/py
// FileStorage v1. XXX text
package fs1
......@@ -25,9 +26,11 @@ import (
"../../zodb"
)
// FileStorage is a ZODB storage which stores data in simple append-only file
// organized as transactional log.
type FileStorage struct {
f *os.File // XXX naming -> file ?
index *fsIndex
file *os.File
index *fsIndex // oid -> data record position in transaction which last changed oid
topPos int64 // position pointing just past last committed transaction
// (= size(f) when no commit is in progress)
......@@ -35,15 +38,13 @@ type FileStorage struct {
tidMin, tidMax zodb.Tid
}
// IStorage
// IStorage XXX move ?
var _ zodb.IStorage = (*FileStorage)(nil)
// TxnHeader represents header of a transaction record
type TxnHeader struct {
// XXX -> TxnPos, TxnLen, PrevTxnLen ?
Pos int64 // position of transaction start
PrevLen int64 // whole previous transaction record length
LenPrev int64 // whole previous transaction record length
// (0 if there is no previous txn record)
Len int64 // whole transaction record length
......@@ -54,32 +55,15 @@ type TxnHeader struct {
workMem []byte
}
const (
txnHeaderFixSize = 8+8+1+2+2+2 // = 23
txnXHeaderFixSize = 8 + txnHeaderFixSize // with trail lengthm8 from previous record
)
// ErrTxnRecord is returned on transaction record read / decode errors
// XXX merge with ErrDataRecord -> ErrRecord{pos, "transaction|data", "read", err} ?
type ErrTxnRecord struct {
Pos int64 // position of transaction record
Subj string // about what .Err is
Err error // actual error
}
func (e *ErrTxnRecord) Error() string {
return fmt.Sprintf("transaction record @%v: %v: %v", e.Pos, e.Subj, e.Err)
}
// DataHeader represents header of a data record
type DataHeader struct {
Pos int64 // position of data record
Oid zodb.Oid
Tid zodb.Tid
PrevRevPos int64 // position of this oid's previous-revision data record
PrevRevPos int64 // position of this oid's previous-revision data record XXX naming
TxnPos int64 // position of transaction record this data record belongs to
//_ uint16 // 2-bytes with zero values. (Was version length.)
DataLen int64 // length of following data. if 0 -> following = 8 bytes backpointer XXX -> validate in code
DataLen int64 // length of following data. if 0 -> following = 8 bytes backpointer
// if backpointer == 0 -> oid deleted
//Data []byte
//DataRecPos uint64 // if Data == nil -> byte position of data record containing data
......@@ -87,7 +71,31 @@ type DataHeader struct {
// XXX include word0 ?
}
const dataHeaderSize = 8+8+8+8+2+8 // = 42
// on-disk sizes
const (
Magic = "FS21"
TxnHeaderFixSize = 8+8+1+2+2+2 // on-disk size without user/desc/ext strings
txnXHeaderFixSize = 8 + TxnHeaderFixSize // with trail LenPrev from previous record
DataHeaderSize = 8+8+8+8+2+8
// txn/data pos that are < vvv are for sure invalid
txnValidFrom = int64(len(Magic))
dataValidFrom = txnValidFrom + TxnHeaderFixSize
)
// ErrTxnRecord is returned on transaction record read / decode errors
// XXX merge with ErrDataRecord -> ErrRecord{pos, "transaction|data", "read", err} ?
type ErrTxnRecord struct {
Pos int64 // position of transaction record
Subj string // about what .Err is
Err error // actual error
}
func (e *ErrTxnRecord) Error() string {
return fmt.Sprintf("transaction record @%v: %v: %v", e.Pos, e.Subj, e.Err)
}
// ErrDataRecord is returned on data record read / decode errors
type ErrDataRecord struct {
......@@ -103,62 +111,92 @@ func (e *ErrDataRecord) Error() string {
// XXX -> zodb?
var ErrVersionNonZero = errors.New("non-zero version")
var errPositionBug = errors.New("software bug: invalid position")
// noEOF returns err, but changes io.EOF -> io.ErrUnexpectedEOF
func noEOF(err error) error {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return err
}
// flags for TxnHeader.Load
type TxnLoadFlags int
const (
LoadAll TxnLoadFlags = 0x00
LoadNoStrings = 0x01
LoadAll TxnLoadFlags = 0x00 // load whole transaction header
LoadNoStrings = 0x01 // do not load user/desc/ext strings
)
// Load reads and decodes transaction record header from a readerAt
// Load reads and decodes transaction record header
// pos: points to transaction start
// no requirements are made to previous txnh state
// XXX io.ReaderAt -> *os.File (if iface conv costly)
func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error {
// TODO describe what happens at EOF and when .LenPrev is still valid
func (txnh *TxnHeader) Load(r io.ReaderAt /* *os.File */, pos int64, flags TxnLoadFlags) error {
if cap(txnh.workMem) < txnXHeaderFixSize {
txnh.workMem = make([]byte, txnXHeaderFixSize) // XXX or 0, ... ?
}
work := txnh.workMem[:txnXHeaderFixSize] // XXX name
work := txnh.workMem[:txnXHeaderFixSize]
txnh.Pos = pos
txnh.Len = 0 // XXX recheck rules about error exit
txnh.LenPrev = 0
if pos < txnValidFrom {
panic(&ErrTxnRecord{pos, "read", errPositionBug})
}
decodeErr := func(format string, a ...interface{}) *ErrTxnRecord {
return &ErrTxnRecord{pos, "decode", fmt.Errorf(format, a...)}
}
var n int
var err error
if pos > 4 + 8 { // XXX -> magic
if pos - 8 >= txnValidFrom {
// read together with previous's txn record redundand length
n, err = r.ReadAt(work, pos - 8)
n -= 8 // relative to pos
if n >= 0 {
txnh.PrevLen = 8 + int64(binary.BigEndian.Uint64(work[8-8:]))
if txnh.PrevLen < txnHeaderFixSize {
panic("too small txn prev record length") // XXX
lenPrev := 8 + int64(binary.BigEndian.Uint64(work[8-8:]))
if lenPrev < TxnHeaderFixSize {
return decodeErr("invalid txn prev record length: %v", lenPrev)
}
txnh.LenPrev = lenPrev
}
} else {
// read only current txn without previous record length
n, err = r.ReadAt(work[8:], pos)
txnh.PrevLen = 0
}
if err != nil {
if err == io.EOF {
if n == 0 {
return err // end of stream
}
// EOF after txn header is not good
err = io.ErrUnexpectedEOF
if err == io.EOF && n == 0 {
return err // end of stream
}
return n, &ErrTxnRecord{pos, "read", err}
// EOF after txn header is not good - because at least
// redundand lenght should be also there
return &ErrTxnRecord{pos, "read", noEOF(err)}
}
txnh.Tid = zodb.Tid(binary.BigEndian.Uint64(work[8+0:]))
txnh.Len = 8 + int64(binary.BigEndian.Uint64(work[8+8:]))
txnh.Status = zodb.TxnStatus(work[8+16])
if !txnh.Tid.Valid() {
return decodeErr("invalid tid: %v", txnh.Tid)
}
if txnh.Len < txnHeaderFixSize {
panic("too small txn record length") // XXX
tlen := 8 + int64(binary.BigEndian.Uint64(work[8+8:]))
if tlen < TxnHeaderFixSize {
return decodeErr("invalid txn record length: %v", tlen)
}
txnh.Len = tlen
txnh.Status = zodb.TxnStatus(work[8+16])
if !txnh.Status.Valid() {
return decodeErr("invalid status: %v", txnh.Status)
}
luser := binary.BigEndian.Uint16(work[8+17:])
ldesc := binary.BigEndian.Uint16(work[8+19:])
......@@ -180,21 +218,21 @@ func (txnh *TxnHeader) Load(r io.ReaderAt, pos int64, flags TxnLoadFlags) error
txnh.Description = work[luser:luser:ldesc]
txnh.Extension = work[luser+ldesc:luser+ldesc:lext]
if flags & LoadNoString == 0 {
txnh.loadString()
if flags & LoadNoStrings == 0 {
err = txnh.loadStrings(r)
}
return txnHeaderFixSize + lstr, nil
return err
}
// loadStrings makes sure strings that are part of transaction header are loaded
func (txnh *TxnHeader) loadStrings(r io.ReaderAt) error {
func (txnh *TxnHeader) loadStrings(r io.ReaderAt /* *os.File */) error {
// XXX make it no-op if strings are already loaded?
// we rely on Load leaving len(workMem) = sum of all strings length ...
nstr, err = r.ReadAt(txnh.workMem, txnh.Pos + txnHeaderFixSize)
_, err := r.ReadAt(txnh.workMem, txnh.Pos + TxnHeaderFixSize)
if err != nil {
return 0, &ErrTxnRecord{txnh.Pos, "read", noEof(err)} // XXX -> "read strings" ?
return &ErrTxnRecord{txnh.Pos, "read strings", noEOF(err)}
}
// ... and presetting x to point to appropriate places in .workMem .
......@@ -202,19 +240,21 @@ func (txnh *TxnHeader) loadStrings(r io.ReaderAt) error {
txnh.User = txnh.User[:cap(txnh.User)]
txnh.Description = txnh.Description[:cap(txnh.Description)]
txnh.Extension = txnh.Extension[:cap(txnh.Extension)]
return nil
}
// LoadPrev reads and decodes previous transaction record header from a readerAt
// LoadPrev reads and decodes previous transaction record header
// txnh should be already initialized by previous call to load()
func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error {
if txnh.PrevLen == 0 {
if txnh.LenPrev == 0 {
return io.EOF
}
return txnh.Load(r, txnh.Pos - txnh.PrevLen, flags)
return txnh.Load(r, txnh.Pos - txnh.LenPrev, flags)
}
// LoadNext reads and decodes next transaction record header from a readerAt
// LoadNext reads and decodes next transaction record header
// txnh should be already initialized by previous call to load()
func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
return txnh.Load(r, txnh.Pos + txnh.Len, flags)
......@@ -222,51 +262,70 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
// XXX -> Load ?
// decode reads and decodes data record header from a readerAt
// XXX io.ReaderAt -> *os.File (if iface conv costly)
func (dh *DataHeader) decode(r io.ReaderAt, pos int64, tmpBuf *[dataHeaderSize]byte) error {
n, err := r.ReadAt(tmpBuf[:], pos)
// decode reads and decodes data record header
func (dh *DataHeader) load(r io.ReaderAt /* *os.File */, pos int64, tmpBuf *[DataHeaderSize]byte) error {
if pos < dataValidFrom {
panic(&ErrDataRecord{pos, "read", errPositionBug})
}
_, err := r.ReadAt(tmpBuf[:], pos)
if err != nil {
return &ErrDataRecord{pos, "read", noEof(err)}
return &ErrDataRecord{pos, "read", noEOF(err)}
}
decodeErr := func(format string, a ...interface{}) *ErrDataRecord {
return &ErrDataRecord{pos, "decode", fmt.Errorf(format, a...)}
}
// XXX also check oid.Valid() ?
dh.Oid = zodb.Oid(binary.BigEndian.Uint64(tmpBuf[0:])) // XXX -> zodb.Oid.Decode() ?
dh.Tid = zodb.Tid(binary.BigEndian.Uint64(tmpBuf[8:])) // XXX -> zodb.Tid.Decode() ?
dh.PrevDataRecPos = int64(binary.BigEndian.Uint64(tmpBuf[16:]))
if !dh.Tid.Valid() {
return decodeErr("invalid tid: %v", dh.Tid)
}
// XXX check prev data pos:
// < current pos
// > ... (valid)
dh.PrevRevPos = int64(binary.BigEndian.Uint64(tmpBuf[16:]))
// XXX txnPos < current pos
// XXX > ... valid
dh.TxnPos = int64(binary.BigEndian.Uint64(tmpBuf[24:]))
verlen := binary.BigEndian.Uint16(tmpBuf[32:])
dh.DataLen = binary.BigEndian.Uint64(tmpBuf[34:])
verlen := binary.BigEndian.Uint16(tmpBuf[32:])
if verlen != 0 {
return &ErrDataRecord{pos, "invalid header", ErrVersionNonZero}
}
// XXX check DataLen >= 0
dh.DataLen = int64(binary.BigEndian.Uint64(tmpBuf[34:]))
return nil
}
// XXX do we need Decode when decode() is there?
func (dh *DataHeader) Decode(r io.ReaderAt, pos int64) error {
var tmpBuf [dataHeaderSize]byte
return dh.decode(r, pos, &tmpBuf)
// XXX do we need Load when load() is there?
func (dh *DataHeader) Load(r io.ReaderAt, pos int64) error {
var tmpBuf [DataHeaderSize]byte
return dh.load(r, pos, &tmpBuf)
}
func OpenFileStorage(path string) (*FileStorage, error) {
func Open(path string) (*FileStorage, error) {
f, err := os.Open(path) // XXX opens in O_RDONLY
if err != nil {
return nil, err // XXX err more context ?
}
// check file magic
var xxx [4]byte
var xxx [len(Magic)]byte
_, err = f.ReadAt(xxx[:], 0)
if err != nil {
return nil, err // XXX err more context
}
if string(xxx[:]) != "FS21" {
if string(xxx[:]) != Magic {
return nil, fmt.Errorf("%s: invalid magic %q", path, xxx) // XXX err?
}
......@@ -280,24 +339,24 @@ func OpenFileStorage(path string) (*FileStorage, error) {
// read tidMin/tidMax
// FIXME support empty file case
var txnhMin, txnhMax TxnHeader
err = txnhMin.Load(f, 4)
err = txnhMin.Load(f, txnValidFrom, LoadAll) // XXX txnValidFrom here -> ?
if err != nil {
return nil, err // XXX +context
}
err = txnhMax.Load(f, topPos)
err = txnhMax.Load(f, topPos, LoadAll)
// XXX expect EOF but .PrevLen must be good
if err != nil {
return nil, err // XXX +context
}
err = txhhMax.LoadPrev(f)
err = txnhMax.LoadPrev(f, LoadAll)
if err != nil {
// XXX
}
return &FileStorage{
f: f,
file: f,
index: index,
topPos: topPos,
tidMin: txnhMin.Tid,
......@@ -307,7 +366,9 @@ func OpenFileStorage(path string) (*FileStorage, error) {
func (fs *FileStorage) LastTid() zodb.Tid {
panic("TODO")
// XXX check we have transactions at all
// XXX what to return then?
return fs.tidMax
}
// ErrXidLoad is returned when there is an error while loading xid
......@@ -337,11 +398,12 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
// search backwards for when we first have data record with tid satisfying xid.XTid
for {
//prevTid := dh.Tid
err = dh.Decode(fs.f, dataPos)
err = dh.Load(fs.file, dataPos)
if err != nil {
return nil, zodb.Tid(0), &ErrXidLoad{xid, err}
}
// TODO -> LoadPrev()
// check data record consistency
// TODO reenable
// if dh.Oid != oid {
......@@ -351,14 +413,14 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
// if dh.Tid >= prevTid { ... }
// if dh.TxnPos >= dataPos - TxnHeaderSize { ... }
// if dh.PrevDataRecPos >= dh.TxnPos - dataHeaderSize - 8 /* XXX */ { ... }
// if dh.PrevDataRecPos >= dh.TxnPos - DataHeaderSize - 8 /* XXX */ { ... }
if dh.Tid < tidBefore {
break
}
// continue search
dataPos = dh.PrevDataRecPos
dataPos = dh.PrevRevPos
if dataPos == 0 {
// no such oid revision
return nil, zodb.Tid(0), &ErrXidLoad{xid, &zodb.ErrXidMissing{Xid: xid}}
......@@ -377,13 +439,16 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
// scan via backpointers
for dh.DataLen == 0 {
// XXX -> LoadBack() ?
var xxx [8]byte // XXX escapes ?
_, err = fs.f.ReadAt(xxx[:], dataPos + dataHeaderSize)
_, err = fs.file.ReadAt(xxx[:], dataPos + DataHeaderSize)
if err != nil {
panic(err) // XXX
}
dataPos = int64(binary.BigEndian.Uint64(xxx[:]))
err = dh.Decode(fs.f, dataPos)
// XXX check dataPos < dh.Pos
// XXX >= dataValidFrom
err = dh.Load(fs.file, dataPos)
if err != nil {
panic(err) // XXX
}
......@@ -391,7 +456,7 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
// now read actual data
data = make([]byte, dh.DataLen) // TODO -> slab ?
n, err := fs.f.ReadAt(data, dataPos + dataHeaderSize)
n, err := fs.file.ReadAt(data, dataPos + DataHeaderSize)
if n == len(data) {
err = nil // we don't mind to get EOF after full data read XXX ok?
}
......@@ -404,11 +469,11 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
func (fs *FileStorage) Close() error {
// TODO dump index
err := fs.f.Close()
err := fs.file.Close()
if err != nil {
return err
}
fs.f = nil
fs.file = nil
return nil
}
......@@ -418,7 +483,7 @@ func (fs *FileStorage) StorageName() string {
type forwardIter struct {
//Pos int64 // current transaction position
fs *FileStorage
Txnh TxnHeader // current transaction information
TidMax zodb.Tid // iterate up to tid <= tidMax
......@@ -426,7 +491,7 @@ type forwardIter struct {
func (fi *forwardIter) NextTxn(flags TxnLoadFlags) error {
// XXX from what we start? how to yield 1st elem?
err := fi.Txnh.LoadNext(flags)
err := fi.Txnh.LoadNext(fi.fs.file, flags)
if err != nil {
return err
}
......@@ -449,17 +514,18 @@ type FileStorageIterator struct {
func (fsi *FileStorageIterator) NextTxn(txnInfo *zodb.TxnInfo) (dataIter zodb.IStorageRecordIterator, stop bool, err error) {
err = fsi.forwardIter.NextTxn(LoadAll)
if err != nil {
return nil, err // XXX recheck
return nil, false, err // XXX recheck
}
*txnInfo = fsi.forwardIter.Txnh.TxnInfo
// TODO set dataIter
return dataIter, nil
return dataIter, false, nil
}
func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
/*
if tidMin < fs.tidMin {
tidMin = fs.tidMin
}
......@@ -470,7 +536,7 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
(tidMin - fs.TidMin) vs (fs.TidMax - tidMin)
if forward {
iter = forwardIter{4, tidMin}
iter = forwardIter{len(Magic), tidMin}
} else {
iter = backwardIter{fs.topPos, tidMin}
}
......@@ -484,4 +550,6 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
if t
return &FileStorageIterator{-1, tidMin, tidMax} // XXX -1 ok ?
*/
return nil
}
// XXX license/copyright
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 2, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// FileStorage v1. Tests XXX text
package fs1
import (
......
......@@ -11,6 +11,7 @@
// See COPYING file for full licensing terms.
//
// XXX partly based on code from ZODB ?
// TODO link to format in zodb/py
// FileStorage v1. Index
package fs1
......@@ -33,8 +34,8 @@ import (
"lab.nexedi.com/kirr/go123/mem"
)
// fsIndex is Oid -> Tid's position mapping used to associate Oid with latest
// transaction which changed it.
// fsIndex is Oid -> Data record position mapping used to associate Oid with
// Data record in latest transaction which changed it. XXX text
type fsIndex struct {
*fsb.Tree
}
......
......@@ -36,6 +36,15 @@ const (
//Oid0 Oid = 0 // XXX -> simply Oid(0)
)
func (tid Tid) Valid() bool {
// XXX if Tid becomes signed also check wrt 0
if tid <= TidMax {
return true
} else {
return false
}
}
func (tid Tid) String() string {
// XXX also print "tid:" prefix ?
return fmt.Sprintf("%016x", uint64(tid))
......@@ -97,6 +106,17 @@ const (
TxnInprogress = 'c' // checkpoint -- a transaction in progress; it's been thru vote() but not finish()
)
// Valid returns true if transaction status value is well-known and valid
func (ts TxnStatus) Valid() bool {
switch ts {
case TxnComplete, TxnPacked, TxnInprogress:
return true
default:
return false
}
}
// Metadata information about single transaction
type TxnInfo struct {
Tid Tid
......@@ -154,12 +174,12 @@ type IStorageIterator interface {
// NextTxn yields information about next database transaction:
// 1. transaction metadata, and
// 2. iterator over transaction data records.
// transaction mentadata is put into *txnInfo stays valid until next call to NextTxn().
// transaction metadata is put into *txnInfo and stays valid until next call to NextTxn().
NextTxn(txnInfo *TxnInfo) (dataIter IStorageRecordIterator, stop bool, err error) // XXX stop -> io.EOF ?
}
type IStorageRecordIterator interface { // XXX naming -> IRecordIterator
// NextData puts information about next storage data record into *dataInfo.
// data put into *dataInfo stays vaild until next call to NextData().
// data put into *dataInfo stays valid until next call to NextData().
NextData(dataInfo *StorageRecordInformation) (stop bool, err error) // XXX stop -> io.EOF ?
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment