Commit 515220ef authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e72a5d80
...@@ -36,6 +36,13 @@ var _ zodb.IStorage = (*FileStorage)(nil) ...@@ -36,6 +36,13 @@ var _ zodb.IStorage = (*FileStorage)(nil)
// TxnHeader represents header of a transaction record // TxnHeader represents header of a transaction record
type TxnHeader struct { type TxnHeader struct {
RecLenm8 uint64 // XXX consider reorder with zodb.TxnInfo ?
zodb.TxnInfo
// underlying storage for user/desc/extension
stringsRam []byte
/*
Tid zodb.Tid Tid zodb.Tid
RecLenm8 uint64 RecLenm8 uint64
Status zodb.TxnStatus Status zodb.TxnStatus
...@@ -45,10 +52,25 @@ type TxnHeader struct { ...@@ -45,10 +52,25 @@ type TxnHeader struct {
User []byte // TODO Encode ^^^ User []byte // TODO Encode ^^^
Description []byte Description []byte
Extension []byte Extension []byte
*/
//Datav []DataRec //Datav []DataRec
} }
const txnHeaderFixSize = 8+8+1+2+2+2 // = 23
// ErrTxnRecord is returned on transaction record read / decode errors
// XXX merge with ErrDataRecord -> ErrRecord{pos, "transaction|data", "read", err} ?
type ErrTxnRecord struct {
Pos int64 // position of transaction record
Subj string // about what .Err is
Err error // actual error
}
func (e *ErrTxnRecord) Error() string {
return fmt.Sprintf("transaction record @%v: %v: %v", e.Pos, e.Subj, e.Err)
}
// DataHeader represents header of a data record // DataHeader represents header of a data record
type DataHeader struct { type DataHeader struct {
Oid zodb.Oid Oid zodb.Oid
...@@ -64,9 +86,9 @@ type DataHeader struct { ...@@ -64,9 +86,9 @@ type DataHeader struct {
// XXX include word0 ? // XXX include word0 ?
} }
const DataHeaderSize = 42 const dataHeaderSize = 8+8+8+8+2+8 // = 42
// ErrData is returned on data record read / decode errors // ErrDataRecord is returned on data record read / decode errors
type ErrDataRecord struct { type ErrDataRecord struct {
Pos int64 // position of data record Pos int64 // position of data record
Subj string // about what .Err is Subj string // about what .Err is
...@@ -81,12 +103,56 @@ func (e *ErrDataRecord) Error() string { ...@@ -81,12 +103,56 @@ func (e *ErrDataRecord) Error() string {
var ErrVersionNonZero = errors.New("non-zero version") var ErrVersionNonZero = errors.New("non-zero version")
// decode reads and decodes transactione record header from a readerAt
// XXX io.ReaderAt -> *os.File (if iface conv costly)
func (th *TxnHeader) decode(r io.ReaderAt, pos int64, tmpBuf *[txnHeaderFixSize]byte) (n int, err error) {
n, err = r.ReadAt(tmpBuf[:], pos)
// XXX EOF after txn header is not good
if err != nil {
return 0, &ErrTxnRecord{pos, "read", err}
}
th.Tid = zodb.Tid(binary.BigEndian.Uint64(tmpBuf[0:]))
th.RecLenm8 = uint64(binary.BigEndian.Uint64(tmpBuf[8:]))
th.Status = zodb.TxnStatus(tmpBuf[16])
luser := binary.BigEndian.Uint16(tmpBuf[17:])
ldesc := binary.BigEndian.Uint16(tmpBuf[19:])
lext := binary.BigEndian.Uint16(tmpBuf[21:])
lstr := int(luser) + int(ldesc) + int(lext)
if lstr <= cap(th.stringsRam) {
th.stringsRam = th.stringsRam[:lstr]
} else {
th.stringsRam = make([]byte, lstr)
}
n, err = r.ReadAt(th.stringsRam, pos + txnHeaderFixSize)
// XXX EOF after txn header is not good
if err != nil {
return 0, &ErrTxnRecord{pos, "read", err}
}
th.User = th.stringsRam[0:luser]
th.Description = th.stringsRam[luser:luser+ldesc]
th.Extension = th.stringsRam[luser+ldesc:luser+ldesc+lext]
return txnHeaderFixSize + lstr, nil
}
// XXX do we need Decode when decode() is there?
func (th *TxnHeader) Decode(r io.ReaderAt, pos int64) (n int, err error) {
var tmpBuf [txnHeaderFixSize]byte
return th.decode(r, pos, &tmpBuf)
}
// decode reads and decodes data record header from a readerAt // decode reads and decodes data record header from a readerAt
// XXX io.ReaderAt -> *os.File (if iface conv costly) // XXX io.ReaderAt -> *os.File (if iface conv costly)
func (dh *DataHeader) decode(r io.ReaderAt, pos int64, tmpBuf *[DataHeaderSize]byte) error { func (dh *DataHeader) decode(r io.ReaderAt, pos int64, tmpBuf *[dataHeaderSize]byte) error {
n, err := r.ReadAt(tmpBuf[:], pos) n, err := r.ReadAt(tmpBuf[:], pos)
if n == DataHeaderSize { // XXX vvv if EOF is after header - record is broken
if n == dataHeaderSize {
err = nil // we don't mind if it was EOF after full header read err = nil // we don't mind if it was EOF after full header read
} }
...@@ -110,7 +176,7 @@ func (dh *DataHeader) decode(r io.ReaderAt, pos int64, tmpBuf *[DataHeaderSize]b ...@@ -110,7 +176,7 @@ func (dh *DataHeader) decode(r io.ReaderAt, pos int64, tmpBuf *[DataHeaderSize]b
// XXX do we need Decode when decode() is there? // XXX do we need Decode when decode() is there?
func (dh *DataHeader) Decode(r io.ReaderAt, pos int64) error { func (dh *DataHeader) Decode(r io.ReaderAt, pos int64) error {
var tmpBuf [DataHeaderSize]byte var tmpBuf [dataHeaderSize]byte
return dh.decode(r, pos, &tmpBuf) return dh.decode(r, pos, &tmpBuf)
} }
...@@ -183,7 +249,7 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) ...@@ -183,7 +249,7 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
// if dh.Tid >= prevTid { ... } // if dh.Tid >= prevTid { ... }
// if dh.TxnPos >= dataPos - TxnHeaderSize { ... } // if dh.TxnPos >= dataPos - TxnHeaderSize { ... }
// if dh.PrevDataRecPos >= dh.TxnPos - DataHeaderSize - 8 /* XXX */ { ... } // if dh.PrevDataRecPos >= dh.TxnPos - dataHeaderSize - 8 /* XXX */ { ... }
if dh.Tid < tidBefore { if dh.Tid < tidBefore {
break break
...@@ -210,7 +276,7 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) ...@@ -210,7 +276,7 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
// scan via backpointers // scan via backpointers
for dh.DataLen == 0 { for dh.DataLen == 0 {
var xxx [8]byte // XXX escapes ? var xxx [8]byte // XXX escapes ?
_, err = fs.f.ReadAt(xxx[:], dataPos + DataHeaderSize) _, err = fs.f.ReadAt(xxx[:], dataPos + dataHeaderSize)
if err != nil { if err != nil {
panic(err) // XXX panic(err) // XXX
} }
...@@ -223,7 +289,7 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) ...@@ -223,7 +289,7 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
// now read actual data // now read actual data
data = make([]byte, dh.DataLen) // TODO -> slab ? data = make([]byte, dh.DataLen) // TODO -> slab ?
n, err := fs.f.ReadAt(data, dataPos + DataHeaderSize) n, err := fs.f.ReadAt(data, dataPos + dataHeaderSize)
if n == len(data) { if n == len(data) {
err = nil // we don't mind to get EOF after full data read XXX ok? err = nil // we don't mind to get EOF after full data read XXX ok?
} }
......
...@@ -111,17 +111,38 @@ func TestLoad(t *testing.T) { ...@@ -111,17 +111,38 @@ func TestLoad(t *testing.T) {
txni := zodb.TxnInfo{} txni := zodb.TxnInfo{}
datai := zodb.StorageRecordInformation{} datai := zodb.StorageRecordInformation{}
var stop bool
for { // XXX vvv assumes i < j
ok, err := iter.NextTxn(&txni) // FIXME first tidMin and last tidMax
if err != nil { for k := i; ; k++ {
panic(err) // XXX err stop, err = iter.NextTxn(&txni)
} if stop || err != nil {
if !ok {
break break
} }
dbe = _1fs_dbEntryv[k]
if !reflect.DeepEqual(txni, dbe.Header.TxnInfo) {
t.Errorf("iterating tidMin..tidMac (TODO): step XXX: unexpected txn entry.\nhave: %v\nwant: %v", txni, dbe.Header.TxnInfo)
}
for {
stop, err = txni.Iter.NextData(&datai)
if err != nil {
panic(err) // XXX
}
if stop {
break
}
}
} }
// TODO check err
} }
} }
} }
...@@ -97,19 +97,13 @@ const ( ...@@ -97,19 +97,13 @@ const (
TxnInprogress = 'c' // checkpoint -- a transaction in progress; it's been thru vote() but not finish() TxnInprogress = 'c' // checkpoint -- a transaction in progress; it's been thru vote() but not finish()
) )
// Information about single transaction // Metadata information about single transaction
// XXX -> storage.ITransactionInformation
//type IStorageTransactionInformation interface {
//type StorageTransactionInformation struct {
type TxnInfo struct { type TxnInfo struct {
Tid Tid Tid Tid
Status TxnStatus Status TxnStatus
User []byte User []byte
Description []byte Description []byte
Extension []byte Extension []byte
// TODO iter -> IStorageRecordInformation
Iter IStorageRecordIterator
} }
// Information about single storage record // Information about single storage record
...@@ -157,13 +151,15 @@ type IStorage interface { ...@@ -157,13 +151,15 @@ type IStorage interface {
} }
type IStorageIterator interface { type IStorageIterator interface {
// NextTxn puts information about next storage transaction into *txnInfo. // NextTxn yields information about next database transaction:
// data put into *txnInfo stays valid until next call to NextTxn(). // 1. transaction metadata, and
NextTxn(txnInfo *TxnInfo) (ok bool, err error) // XXX ok -> stop ? // 2. iterator over transaction data records.
// transaction mentadata is put into *txnInfo stays valid until next call to NextTxn().
NextTxn(txnInfo *TxnInfo) (dataIter IStorageRecordIterator, stop bool, err error) // XXX stop -> io.EOF ?
} }
type IStorageRecordIterator interface { // XXX naming -> IRecordIterator type IStorageRecordIterator interface { // XXX naming -> IRecordIterator
// NextData puts information about next storage data record into *dataInfo. // NextData puts information about next storage data record into *dataInfo.
// data put into *dataInfo stays vaild until next call to NextData(). // data put into *dataInfo stays vaild until next call to NextData().
NextData(dataInfo *StorageRecordInformation) (ok bool, err error) NextData(dataInfo *StorageRecordInformation) (stop bool, err error) // XXX stop -> io.EOF ?
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment