Commit ad1c9f61 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent cf2128bf
......@@ -29,6 +29,8 @@ import (
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/go123/xerr"
)
// XXX fmt -> log
......@@ -102,8 +104,8 @@ func (stor *Storage) Run(ctx context.Context) error {
stor.myInfo.Address = addr
// start serving incoming connections
wg := sync.WaitGroup{}
serveCtx, serveCancel := context.WithCancel(ctx)
wg.Add(1)
go func() {
......@@ -112,18 +114,20 @@ func (stor *Storage) Run(ctx context.Context) error {
_ = err // XXX what to do with err ?
}()
// connect to master and get commands and updates from it
err = stor.talkMaster(ctx)
// we are done - shutdown
serveCancel()
wg.Wait()
return err // XXX err ctx
}
// talkMaster connects to master, announces self and receives notifications and commands
// XXX and notifies master about ? (e.g. StartOperation -> NotifyReady)
// talkMaster connects to master, announces self and receives commands and notifications
// it tries to persist master link reconnecting as needed
//
// it always return an error - either due to cancel or commannd from master to shutdown
// it always returns an error - either due to cancel or commannd from master to shutdown
func (stor *Storage) talkMaster(ctx context.Context) error {
// XXX errctx
......@@ -173,83 +177,118 @@ func (stor *Storage) talkMaster1(ctx context.Context) error {
if accept.YourNodeUUID != stor.myInfo.NodeUUID {
fmt.Printf("stor: %v: master told us to have UUID=%v\n", Mlink, accept.YourNodeUUID)
stor.myInfo.NodeUUID = accept.YourNodeUUID
// XXX notify anyone?
}
// now handle notifications and commands from master
for {
// XXX every new connection from master means previous connection was closed
// XXX how to do so and stay compatible to py?
//
// XXX or simply use only the first connection and if M decides
// to cancel - close whole nodelink and S reconnects?
Mconn, err := Mlink.Accept()
if err != nil {
return // XXX ?
return err // XXX ?
}
err = stor.m1initialize(ctx, Mconn)
if err != nil {
panic(err) // XXX
fmt.Println("stor: %v: master: %v", err)
// XXX recheck closing Mconn
continue // retry initializing
}
err = stor.m1serve(ctx, Mconn)
if err != nil {
panic(err) // XXX
}
fmt.Println("stor: %v: master: %v", err)
// XXX check if it was command to shotdown and if so break
continue // retry from initializing
}
return nil // XXX err
}
// m1initialize drives storage by master messages during initialization phase
//
// when it finishes error indicates:
// Initialization includes master retrieving info for cluster recovery and data
// verification before starting operation. Initialization finishes either
// successfully with receiving master commanding to start operation, or
// unsuccessfully with connection closing indicating initialization was
// cancelled or some other error.
//
// return error indicates:
// - nil: initialization was ok and a command came from master to start operation
// - !nil: initialization was cancelled or failed somehow
func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) error {
func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err error) {
defer xerr.Context(&err, "init")
for {
msg, err := Mconn.Recv() // XXX abort on ctx (XXX or upper?)
// XXX abort on ctx (XXX or upper?)
msg, err := Mconn.Recv()
if err != nil {
panic(err) // XXX
return err
}
switch msg.(type) {
case *neo.AskRecovery:
// TODO send M (ptid, backup_tid, truncate_tid)
default:
return fmt.Errorf("unexpected message: %T", msg)
case *neo.StartOperation:
// ok, transition to serve
return nil
case *neo.Recovery:
err = Mconn.Send(&neo.AnswerRecovery{
PTid: 0, // XXX stub
BackupTid: neo.INVALID_TID,
TruncateTid: neo.INVALID_TID})
case *neo.AskPartitionTable:
// TODO read and send M locally-saved PT (ptid, []PtRow)
case *neo.AskLockedTransaction:
// TODO
case *neo.LockedTransactions:
// XXX r/o stub
err = Mconn.Send(&neo.AnswerLockedTransactions{})
case *neo.AskLastIDs:
// TODO send M (last_oid, last_tid)
// TODO AskUnfinishedTransactions
case *neo.LastIDs:
lastTid, zerr1 := stor.zstor.LastTid()
lastOid, zerr2 := stor.zstor.LastOid()
if zerr := xerr.First(zerr1, zerr2); zerr != nil {
return zerr
}
err = Mconn.Send(&neo.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid})
case *neo.NotifyPartitionTable:
// TODO save locally what M told us
case *neo.NotifyClusterInformation:
case *neo.NotifyClusterState:
// TODO .clusterState = ... XXX what to do with it?
case *neo.NotifyNodeInformation:
// XXX check for myUUID and condier it a command (like neo/py) does?
// XXX check for myUUID and consider it a command (like neo/py) does?
// TODO update .nodeTab
case *neo.StartOperation:
return nil // ok
}
default:
// XXX
if err != nil {
return err
}
}
}
// m1serve drives storage by master messages during service hase
//
// XXX err return - document
func (stor *Storage) m1serve(ctx contextContext, Mconn *neo.Conn) error {
// refresh stor.opCtx and cancel it when we finish
// it always returns with an error describing why serve has to be stopped -
// either due to master commanding us to stop, or context cancel or some other
// error.
func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) {
defer xerr.Context(&err, "serve")
// refresh stor.opCtx and cancel it when we finish so that client
// handlers know they need to stop operating
opCtx, opCancel := context.WithCancel(ctx)
stor.opMu.Lock()
stor.opCtx = opCtx
......@@ -257,14 +296,27 @@ func (stor *Storage) m1serve(ctx contextContext, Mconn *neo.Conn) error {
defer opCancel()
// reply M we are ready
err := Mconn.Send(neo.NotifyReady{})
err = Mconn.Send(&neo.NotifyReady{})
if err != nil {
return err // XXX err ctx
return err
}
// TODO handle M notifications and commands
for {
// TODO
// XXX abort on ctx (XXX or upper?)
msg, err := Mconn.Recv()
if err != nil {
return err
}
switch msg.(type) {
default:
return fmt.Errorf("unexpected message: %T", msg)
case *neo.StopOperation:
return fmt.Errorf("stop requested")
// TODO commit related messages
}
}
}
......
......@@ -25,7 +25,6 @@ Format is the same as in fstail/py originally written by Jeremy Hylton:
https://github.com/zopefoundation/ZODB/blob/master/src/ZODB/scripts/fstail.py
https://github.com/zopefoundation/ZODB/commit/551122cc
*/
package main
import (
......
......@@ -20,7 +20,8 @@
// XXX partly based on code from ZODB ?
// TODO link to format in zodb/py
// Package fs1 implements so-called FileStorage v1 ZODB storage. XXX text
// Package fs1 implements so-called FileStorage v1 ZODB storage.
// XXX text
package fs1
import (
......@@ -225,7 +226,8 @@ const (
LoadNoStrings = 0x01 // do not load user/desc/ext strings
)
// Load reads and decodes transaction record header
// Load reads and decodes transaction record header.
//
// pos: points to transaction start
// no prerequisite requirements are made to previous txnh state
// TODO describe what happens at EOF and when .LenPrev is still valid
......@@ -360,7 +362,8 @@ func (txnh *TxnHeader) loadStrings(r io.ReaderAt /* *os.File */) error {
return nil
}
// LoadPrev reads and decodes previous transaction record header
// LoadPrev reads and decodes previous transaction record header.
//
// prerequisite: txnh .Pos, .LenPrev and .Len are initialized: XXX (.Len for .Tid)
// - by successful call to Load() initially XXX but EOF also works
// - by subsequent successful calls to LoadPrev / LoadNext XXX recheck
......@@ -435,7 +438,7 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
// --- Data record ---
// Len returns whole data record length
// Len returns whole data record length.
func (dh *DataHeader) Len() int64 {
dataLen := dh.DataLen
if dataLen == 0 {
......@@ -447,7 +450,7 @@ func (dh *DataHeader) Len() int64 {
}
// load reads and decodes data record header
// Load reads and decodes data record header.
// pos: points to data header start
// no prerequisite requirements are made to previous dh state
func (dh *DataHeader) Load(r io.ReaderAt /* *os.File */, pos int64) error {
......@@ -502,7 +505,7 @@ func (dh *DataHeader) Load(r io.ReaderAt /* *os.File */, pos int64) error {
return nil
}
// LoadPrevRev reads and decodes previous revision data record header
// LoadPrevRev reads and decodes previous revision data record header.
// prerequisite: dh .Oid .Tid .PrevRevPos are initialized:
// - TODO describe how
// when there is no previous revision: io.EOF is returned
......@@ -543,7 +546,7 @@ func (dh *DataHeader) loadPrevRev(r io.ReaderAt /* *os.File */) error {
return nil
}
// LoadBack reads and decodes data header for revision linked via back-pointer
// LoadBack reads and decodes data header for revision linked via back-pointer.
// prerequisite: dh XXX .DataLen == 0
// if link is to zero (means deleted record) io.EOF is returned
func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error {
......@@ -593,7 +596,7 @@ func (dh *DataHeader) LoadBack(r io.ReaderAt /* *os.File */) error {
return err
}
// LoadNext reads and decodes data header for next data record in the same transaction
// LoadNext reads and decodes data header for next data record in the same transaction.
// prerequisite: dh .Pos .DataLen are initialized
// when there is no more data records: io.EOF is returned
func (dh *DataHeader) LoadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) error {
......@@ -637,7 +640,7 @@ func (dh *DataHeader) loadNext(r io.ReaderAt /* *os.File */, txnh *TxnHeader) er
return nil
}
// LoadData loads data for the data record taking backpointers into account
// LoadData loads data for the data record taking backpointers into account.
// Data is loaded into *buf, which, if needed, is reallocated to hold all loading data size XXX
// NOTE on success dh state is changed to data header of original data transaction
// NOTE "deleted" records are indicated via returning *buf=nil
......@@ -723,10 +726,20 @@ func Open(ctx context.Context, path string) (*FileStorage, error) {
func (fs *FileStorage) LastTid() (zodb.Tid, error) {
// XXX check we have transactions at all
// XXX what to return then?
// XXX what to return if not?
// XXX must be under lock
return fs.txnhMax.Tid, nil // XXX error always nil ?
}
func (fs *FileStorage) LastOid() (zodb.Oid, error) {
// XXX check we have objects at all?
// XXX what to return if not?
// XXX must be under lock
// XXX what if an oid was deleted?
lastOid, _ := fs.index.Last() // returns zero-value, if empty
return lastOid, nil // XXX error always nil?
}
// ErrXidLoad is returned when there is an error while loading xid
type ErrXidLoad struct {
Xid zodb.Xid
......
// DO NOT EDIT - AUTOGENERATED (by gen-fsbtree from github.com/cznic/b 93348d0)
// Code generated by gen-fsbtree from github.com/cznic/b 93348d0; DO NOT EDIT.
// KEY=zodb.Oid VALUE=int64
// ---- 8< ----
......
// DO NOT EDIT - AUTOGENERATED (by gen-fsbtree from github.com/cznic/b 93348d0)
// Code generated by gen-fsbtree from github.com/cznic/b 93348d0; DO NOT EDIT.
// ---- 8< ----
package fsb
import (
"bytes"
......
......@@ -36,8 +36,9 @@ make -s -C $Bdir generic |sed \
# also extract dump() routine
out=fsbtree_util.go
echo "// DO NOT EDIT - AUTOGENERATED (by gen-fsbtree from $b $Brev)" >$out
echo "// Code generated by gen-fsbtree from $b $Brev; DO NOT EDIT." >$out
echo "// ---- 8< ----" >>$out
echo >>$out
cat >>$out <<EOF
package fsb
import (
......
......@@ -325,7 +325,7 @@ out:
return 0, nil, &IndexLoadError{xio.Name(r), picklePos, err}
}
// LoadIndexFile loads index from a file
// LoadIndexFile loads index from a file @ path
func LoadIndexFile(path string) (topPos int64, fsi *fsIndex, err error) {
f, err := os.Open(path)
if err != nil {
......
......@@ -139,9 +139,14 @@ type IStorage interface {
// History(oid, size=1)
// LastTid returns the id of the last committed transaction.
// if not transactions have been committed yet, LastTid returns Tid zero value
// if no transactions have been committed yet, LastTid returns Tid zero value
LastTid() (Tid, error)
// LastOid returns highest object id of objects committed to storage.
// if there is no data committed yet, LastOid returns Oid zero value
// XXX ZODB/py does not define this in IStorage
LastOid() (Oid, error)
// LoadSerial and LoadBefore generalized into 1 Load (see Xid for details)
// TODO data []byte -> something allocated from slab ?
// XXX currently deleted data is returned as data=nil -- is it ok?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment