Commit 23870baf authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent fe8d33cd
......@@ -67,6 +67,8 @@ type Client struct {
// protected by .node.StateMu
operational bool // XXX <- somehow move to NodeApp?
opReady chan struct{} // reinitialized each time state becomes non-operational
url *url.URL // original URL we were opened with
}
var _ zodb.IStorage = (*Client)(nil)
......@@ -75,6 +77,10 @@ func (c *Client) StorageName() string {
return fmt.Sprintf("neo(%s)", c.node.ClusterName)
}
func (c *Client) URL() string {
return c.url.String()
}
// NewClient creates new client node.
//
// It will connect to master @masterAddr and identify with sepcified cluster name.
......@@ -521,7 +527,9 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zo
// XXX we are not passing ctx to NewClient - right?
// as ctx for open can be done after open finishes - not covering
// whole storage working lifetime.
return NewClient(u.User.Username(), u.Host, net), nil
c := NewClient(u.User.Username(), u.Host, net)
c.url = u // FIXME move this inside NewClient
return c, nil
}
func init() {
......
......@@ -68,6 +68,7 @@ import (
"fmt"
"io"
"log"
"net/url"
"os"
"sync"
......@@ -89,6 +90,8 @@ type FileStorage struct {
// XXX keep loaded with LoadNoStrings ?
txnhMin TxnHeader
txnhMax TxnHeader
url *url.URL // original URL we were opened with
}
// IStorage
......@@ -98,6 +101,10 @@ func (fs *FileStorage) StorageName() string {
return "FileStorage v1"
}
func (fs *FileStorage) URL() string {
return fs.url.String()
}
func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) {
// XXX check we have transactions at all - what to return if not?
......@@ -418,7 +425,7 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.ITxnIterator {
return ziter
}
// --- open + rebuild index --- TODO review completely
// --- open + rebuild index ---
// open opens FileStorage without loading index
func open(path string) (_ *FileStorage, err error) {
......@@ -445,8 +452,6 @@ func open(path string) (_ *FileStorage, err error) {
// FIXME rework opening logic to support case when last txn was committed only partially
// determine topPos from file size
// if it is invalid (e.g. a transaction committed only half-way) we'll catch it
// while loading/recreating index XXX recheck this logic
fi, err := f.Stat()
if err != nil {
return nil, err
......@@ -454,13 +459,13 @@ func open(path string) (_ *FileStorage, err error) {
topPos := fi.Size()
// read tidMin/tidMax
// FIXME support empty file case -> then both txnhMin and txnhMax stays invalid
err = fs.txnhMin.Load(f, txnValidFrom, LoadAll) // XXX txnValidFrom here -> ?
err = fs.txnhMin.Load(f, txnValidFrom, LoadAll)
err = okEOF(err) // e.g. it is EOF when file is empty
if err != nil {
return nil, err
}
err = fs.txnhMax.Load(f, topPos, LoadAll)
// expect EOF but .LenPrev must be good
// expect EOF forward
// FIXME ^^^ it will be no EOF if a txn was committed only partially
if err != io.EOF {
if err == nil {
......@@ -468,15 +473,21 @@ func open(path string) (_ *FileStorage, err error) {
}
return nil, fmt.Errorf("%s: %s", f.Name(), err)
}
if fs.txnhMax.LenPrev <= 0 {
// .LenPrev must be good or EOF backward
switch fs.txnhMax.LenPrev {
case 0:
return nil, fmt.Errorf("%s: could not read LenPrev @%d (last transaction)", f.Name(), fs.txnhMax.Pos)
}
case -1:
// ok - EOF backward
default:
// .LenPrev is ok - read last previous record
err = fs.txnhMax.LoadPrev(f, LoadAll)
if err != nil {
return nil, err
}
}
return fs, nil
}
......@@ -496,59 +507,60 @@ func Open(ctx context.Context, path string) (_ *FileStorage, err error) {
}
}()
// load/rebuild index
err = fs.loadIndex()
// load-verify / rebuild index
err = fs.loadIndex(ctx)
if err != nil {
log.Print(err)
log.Printf("%s: index recompute...", path)
// XXX if !ro -> .reindex() which saves it
fs.index, err = fs.computeIndex(ctx)
if err != nil {
return nil, err
}
}
// TODO verify index is sane / topPos matches
// XXX zodb/py iirc scans 10 transactions back and verifies index against it
// XXX also if index is not good - it has to be just rebuild without open error
if fs.index.TopPos != fs.txnhMax.Pos + fs.txnhMax.Len {
return nil, fmt.Errorf("%s: inconsistent index topPos (TODO rebuild index)", path)
// TODO if opened !ro -> .saveIndex()
}
return fs, nil
}
func (fs *FileStorage) Close() error {
// TODO dump index if !ro
err := fs.file.Close()
if err != nil {
return err
}
fs.file = nil
// TODO if opened !ro -> .saveIndex()
return nil
}
func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) {
// XXX lock?
fsSeq := xbufio.NewSeqReaderAt(fs.file)
return BuildIndex(ctx, fsSeq, nil/*XXX no progress*/)
return BuildIndex(ctx, fsSeq, nil/*no progress; XXX somehow log it? */)
}
// loadIndex loads on-disk index to RAM
func (fs *FileStorage) loadIndex() (err error) {
// loadIndex loads on-disk index to RAM and verifies it against data lightly
func (fs *FileStorage) loadIndex(ctx context.Context) (err error) {
// XXX lock?
// XXX LoadIndexFile already contains "%s: index load"
defer xerr.Contextf(&err, "%s", fs.file.Name())
index, err := LoadIndexFile(fs.file.Name() + ".index")
if err != nil {
return err // XXX err ctx
return err
}
topPos := fs.txnhMax.Pos + fs.txnhMax.Len
if index.TopPos != topPos {
return fmt.Errorf("inconsistent index topPos: data=%d index=%d", topPos, index.TopPos)
}
// XXX here?
// TODO verify index sane / topPos matches
if index.TopPos != fs.txnhMax.Pos + fs.txnhMax.Len {
panic("inconsistent index topPos") // XXX
// quickly verify index sanity for last 100 transactions
fsSeq := xbufio.NewSeqReaderAt(fs.file)
_, err = index.Verify(ctx, fsSeq, 100, nil/*no progress*/)
if err != nil {
return err
}
fs.index = index
......@@ -558,7 +570,7 @@ func (fs *FileStorage) loadIndex() (err error) {
// saveIndex flushes in-RAM index to disk
func (fs *FileStorage) saveIndex() (err error) {
// XXX lock?
defer xerr.Contextf(&err, "%s: index save", fs.file.Name())
defer xerr.Contextf(&err, "%s", fs.file.Name())
err = fs.index.SaveFile(fs.file.Name() + ".index")
if err != nil {
......@@ -568,20 +580,3 @@ func (fs *FileStorage) saveIndex() (err error) {
// XXX fsync here?
return nil
}
// Reindex rebuilds the index
//
// XXX -> not exported @ fs1
func (fs *FileStorage) reindex(ctx context.Context) error {
// XXX lock appends?
index, err := fs.computeIndex(ctx)
if err != nil {
return err
}
fs.index = index
err = fs.saveIndex()
return err // XXX ok?
}
......@@ -39,7 +39,11 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zodb.ISt
return nil, fmt.Errorf("fs1: %s: TODO write mode not implemented", path)
}
return Open(ctx, path)
fs, err := Open(ctx, path)
if fs != nil {
fs.url = u // FIXME move this inside Open
}
return fs, err
}
func init() {
......
......@@ -134,8 +134,11 @@ func (e *ErrXidMissing) Error() string {
// IStorage is the interface provided by ZODB storages
type IStorage interface {
// StorageName returns storage name
StorageName() string
// URL returns URL of this storage
URL() string
// XXX also +StorageName() with storage driver name?
// Close closes storage
Close() error
......
......@@ -214,7 +214,7 @@ func (d *dumper) Dump(ctx context.Context, stor zodb.IStorage, tidMin, tidMax zo
}
if err != nil {
return fmt.Errorf("%s: dumping %v..%v: %v", stor, tidMin, tidMax, err)
return fmt.Errorf("%s: dump %v..%v: %v", stor.URL(), tidMin, tidMax, err)
}
return nil
......
......@@ -38,7 +38,7 @@ type paramFunc func(ctx context.Context, stor zodb.IStorage) (string, error)
var infov = []struct {name string; getParam paramFunc} {
// XXX e.g. stor.LastTid() should return err itself
{"name", func(ctx context.Context, stor zodb.IStorage) (string, error) {
return stor.StorageName(), nil
return stor.URL(), nil
}},
// TODO reenable size
// {"size", func(stor zodb.IStorage) (string, error) { return stor.StorageSize(), nil }},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment