Commit 48e38f91 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2c0345b9
......@@ -949,7 +949,7 @@ func (c *Conn) Expect(msgv ...Msg) (which int, err error) {
return -1, &ConnError{c, "recv", fmt.Errorf("unexpected message: %v", msgType)}
}
// Ask sends request and receives response
// Ask sends request and receives response.
// It expects response to be exactly of resp type and errors otherwise
// XXX clarify error semantic (when Error is decoded)
// XXX do the same as Expect wrt respv ?
......
......@@ -27,6 +27,8 @@ import (
// "math"
"sync"
"github.com/pkg/errors"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
......@@ -150,6 +152,7 @@ func (m *Master) Run(ctx context.Context) error {
}
m.node.MasterAddr = l.Addr().String()
m.logf("serving on %s ...", m.node.MasterAddr)
// accept incoming connections and pass them to main driver
wg := sync.WaitGroup{}
......@@ -204,7 +207,7 @@ func (m *Master) runMain(ctx context.Context) (err error) {
// a command came to us to start the cluster.
err := m.recovery(ctx)
if err != nil {
fmt.Println(err)
m.log(err)
return err // recovery cancelled
}
......@@ -212,14 +215,14 @@ func (m *Master) runMain(ctx context.Context) (err error) {
// case previously it was unclean shutdown.
err = m.verify(ctx)
if err != nil {
fmt.Println(err)
m.log(err)
continue // -> recovery
}
// provide service as long as partition table stays operational
err = m.service(ctx)
if err != nil {
fmt.Println(err)
m.log(err)
continue // -> recovery
}
......@@ -255,7 +258,7 @@ type storRecovery struct {
// - nil: recovery was ok and a command came for cluster to start
// - !nil: recovery was cancelled
func (m *Master) recovery(ctx context.Context) (err error) {
fmt.Println("master: recovery")
m.log("recovery")
defer xerr.Context(&err, "master: recovery")
m.setClusterState(neo.ClusterRecovering)
......@@ -282,7 +285,7 @@ loop:
select {
// new connection comes in
case n := <-m.nodeCome:
node, resp := m.accept(n, /* XXX only accept storages -> PENDING */)
node, resp := m.identify(n, /* XXX only accept storages -> PENDING */)
// if new storage arrived - start recovery on it too
wg.Add(1)
......@@ -294,10 +297,10 @@ loop:
return
}
err := m.welcome(n.conn, resp)
err := m.accept(n.conn, resp)
if err != nil {
// XXX better m.nodeLeave <- nodeLeave{node, err} ?
// XXX move this m.nodeLeave <- to welcome() ?
// XXX move this m.nodeLeave <- to accept() ?
recovery <- storRecovery{node: node, err: err}
}
......@@ -305,36 +308,47 @@ loop:
storCtlRecovery(rctx, node, recovery)
}()
// XXX calrify who sends here
/*
case n := <-m.nodeLeave:
// XXX close n.node.Link ?
// TODO close n.node.Link
//
// XXX dup wrt recovery err != nil -> func
m.nodeTab.SetNodeState(n.node, DOWN)
// XXX update something indicating cluster currently can be operational or not ?
*/
// a storage node came through recovery - let's see whether
// ptid ↑ and if so we should take partition table from there
case r := <-recovery:
// inprogress--
if r.err != nil {
// XXX err ctx?
// XXX log here or in producer?
fmt.Printf("master: %v\n", r.err)
m.logf("%v", r.err)
// XXX close stor link / update .nodeTab
break
}
// XXX only if not cancelled
m.logf("master: %v: closing link", r.node.Link)
// close stor link / update .nodeTab
err := r.node.Link.Close()
if err != nil {
m.logf("master: %v\n", r.node.Link)
}
m.nodeTab.SetNodeState(r.node, DOWN)
// we are interested in latest partTab
// NOTE during recovery no one must be subscribed to
// partTab so it is ok to simply change whole m.partTab
if r.partTab.PTid > m.partTab.PTid {
m.partTab = r.partTab
} else {
// we are interested in latest partTab
// NOTE during recovery no one must be subscribed to
// partTab so it is ok to simply change whole m.partTab
if r.partTab.PTid > m.partTab.PTid {
m.partTab = r.partTab
}
}
// XXX update something indicating cluster currently can be operational or not
// XXX handle case of new cluster - when no storage reports valid parttab
// XXX -> create new parttab
// XXX update something indicating cluster currently can be operational or not ?
// request to start the cluster - if ok we exit replying ok
......@@ -365,12 +379,7 @@ loop:
}
}
// // consume left recovery responses (which should come without delay since it was cancelled)
// for ; inprogress > 0; inprogress-- {
// <-recovery
// }
// wait all workers finish (which should come without delay since it was cancelled)
// wait all workers to finish (which should come without delay since it was cancelled)
done := make(chan struct{})
go func() {
wg.Wait()
......@@ -379,7 +388,18 @@ loop:
for {
select {
// XXX <-m.nodeLeave ?
case <-recovery:
// we do not care errors here - they are either cancelled or IO errors
// we just log them and return - in case it is IO error
// on link it will be caught on next send/recv XXX
switch errors.Cause(r.err) {
case context.Canceled, context.DeadlineExceeded:
// ok
default:
// XXX not so ok
}
case <-done:
break
}
......@@ -392,29 +412,25 @@ loop:
// it retrieves various ids and partition table from as stored on the storage
func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery) {
var err error
// XXX where this close link on error should be ?
defer func() {
if err == nil {
return
}
// XXX on err still provide feedback to storRecovery chan ?
// on error provide feedback to storRecovery chan
res <- storRecovery{node: stor, err: err}
/*
fmt.Printf("master: %v", err)
// this must interrupt everything connected to stor node and
// thus eventually result in nodeLeave event to main driver
link.Close()
*/
}()
defer xerr.Contextf(&err, "%s: stor recovery", stor.link)
conn, err := stor.link.NewConn() // FIXME bad -> not bad
conn, err := stor.link.NewConn()
if err != nil {
return
}
defer func() {
err2 := conn.Close()
err = xerr.First(err, err2)
}()
// XXX cancel on ctx
recovery := neo.AnswerRecovery{}
......@@ -475,7 +491,7 @@ var errClusterDegraded = errors.New("cluster became non-operatonal")
//
// prerequisite for start: .partTab is operational wrt .nodeTab
func (m *Master) verify(ctx context.Context) (err error) {
fmt.Println("master: verify")
m.log("verify")
defer xerr.Context(&err, "master: verify")
m.setClusterState(neo.ClusterVerifying)
......@@ -503,7 +519,7 @@ loop:
for inprogress > 0 {
select {
case n := <-m.nodeCome:
node, ok := m.accept(n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
node, ok := m.identify(n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
if !ok {
break
}
......@@ -531,7 +547,7 @@ loop:
inprogress--
if v.err != nil {
fmt.Printf("master: verify: %v\n", v.err)
m.logf("verify: %v", v.err)
// mark storage as non-working in nodeTab
// FIXME better -> v.node.setState(DOWN) ?
......@@ -644,7 +660,7 @@ func storCtlVerify(ctx context.Context, link *neo.NodeLink, res chan storVerify)
//
// prerequisite for start: .partTab is operational wrt .nodeTab and verification passed
func (m *Master) service(ctx context.Context) (err error) {
fmt.Println("master: service")
m.log("service")
defer xerr.Context(&err, "master: service")
// XXX we also need to tell storages StartOperation first
......@@ -657,7 +673,7 @@ loop:
select {
// a node connected and requests identification
case n := <-m.nodeCome:
node, resp, ok := m.accept(n, /* XXX accept everyone */)
node, resp, ok := m.identify(n, /* XXX accept everyone */)
state := m.ClusterState
......@@ -671,7 +687,7 @@ loop:
return
}
err = welcome(n.conn, resp)
err = accept(n.conn, resp)
if err {
// XXX
}
......@@ -702,7 +718,7 @@ loop:
if r.err != nil {
// XXX err ctx?
// XXX log here or in producer?
fmt.Printf("master: %v\n", r.err)
m.logf("%v", r.err)
// TODO close stor link / update .nodeTab
break
......@@ -740,7 +756,7 @@ loop:
// XXX was "stop verification if so"
case v := <-verify:
if v.err != nil {
fmt.Printf("master: verify: %v\n", v.err)
m.logf("verify: %v", v.err)
// mark storage as non-working in nodeTab
// FIXME better -> v.node.setState(DOWN) ?
......@@ -819,11 +835,11 @@ loop:
return err
}
// accept processes identification request of just connected node and either accepts or declines it.
// If node identification is accepted nodeTab is updated and corresponding node entry is returned.
// identify processes identification request of just connected node and either accepts or declines it.
// If node identification is accepted .nodeTab is updated and corresponding node entry is returned.
// Response message is constructed but not send back not to block the caller - it is
// the caller responsibility to send the response to node which requested identification.
func (m *Master) accept(n nodeCome) (node *neo.Node, resp neo.Msg) {
func (m *Master) identify(n nodeCome) (node *neo.Node, resp neo.Msg) {
// TODO log node accept/rejected
// XXX also verify ? :
......@@ -899,8 +915,10 @@ func (m *Master) reject(conn *neo.Conn, resp neo.Msg) {
log xerr.Merge(err1, err2, err3)
}
// welcome sends acceptive identification response and closes conn
func (m *Master) welcome(conn *neo.Conn, resp neo.Msg) error {
// accept sends acceptive identification response and closes conn
// XXX if problem -> .nodeLeave
// XXX spawn ping goroutine from here?
func (m *Master) accept(conn *neo.Conn, resp neo.Msg) error {
// XXX cancel on ctx
err1 := conn.Send(resp)
err2 := conn.Close()
......@@ -930,7 +948,7 @@ func (m *Master) allocUUID(nodeType neo.NodeType) neo.NodeUUID {
// XXX +error return?
func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
logf := func(format string, argv ...interface{}) {
fmt.Printf("master: %s: " + format + "\n", append([]interface{}{link}, argv...)...)
m.logf("%s: " + format + "\n", append([]interface{}{link}, argv...)...)
}
logf("serving new node")
......@@ -1236,3 +1254,26 @@ func (m *Master) ServeAdmin(ctx context.Context, conn *neo.Conn) {
func (m *Master) ServeMaster(ctx context.Context, conn *neo.Conn) {
// TODO (for elections)
}
// ---- utils ----
// -> Logger
//func (m *Master) log(v interface{}) {
// XXX
//}
func (m *Master) logf(format string, argv ...interface{}) {
// TODO support custom log.Logger
log.Output(2, fmt.Sprintf("master @%v: " + format, append([]string{m.MasterAddr}, argv...)...))
}
func (m *Master) vlogf(format string, argv ...interface{}) {
// XXX -> verbose settings
if false {
return
}
m.log(format, argv)
}
......@@ -143,9 +143,9 @@ func (stor *Storage) talkMaster(ctx context.Context) error {
// XXX errctx
for {
fmt.Printf("stor: master(%v): connecting\n", stor.node.MasterAddr) // XXX info
stor.vlogf("master(%v): connecting ...", stor.node.MasterAddr)
err := stor.talkMaster1(ctx)
fmt.Printf("stor: master(%v): %v\n", stor.node.MasterAddr, err)
stor.logf("master(%v): %v", stor.node.MasterAddr, err)
// TODO if err = shutdown -> return
......@@ -186,7 +186,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// XXX -> node.Dial ?
if accept.YourNodeUUID != stor.node.MyInfo.NodeUUID {
fmt.Printf("stor: %v: master told us to have UUID=%v\n", Mlink, accept.YourNodeUUID)
stor.logf("%v: master told us to have UUID=%v", Mlink, accept.YourNodeUUID)
stor.node.MyInfo.NodeUUID = accept.YourNodeUUID
}
......@@ -224,13 +224,13 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// let master initialize us. If successful this ends with StartOperation command.
err = stor.m1initialize(ctx, Mconn)
if err != nil {
fmt.Println("stor: %v: master: %v", err)
stor.logf("%v: master: %v", Mconn, err)
continue // retry initializing
}
// we got StartOperation command. Let master drive us during servicing phase.
err = stor.m1serve(ctx, Mconn)
fmt.Println("stor: %v: master: %v", err)
stor.logf("%v: master: %v", Mconn, err)
continue // retry from initializing
}
......@@ -358,7 +358,7 @@ func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) {
// ServeLink serves incoming node-node link connection
// XXX +error return?
func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
fmt.Printf("stor: %s: serving new node\n", link)
stor.logf("%s: serving new node", link)
// close link when either cancelling or returning (e.g. due to an error)
// ( when cancelling - link.Close will signal to all current IO to
......@@ -373,7 +373,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
// XXX ret err = ctx.Err()
case <-retch:
}
fmt.Printf("stor: %v: closing link\n", link)
stor.logf("%v: closing link", link)
link.Close() // XXX err
}()
......@@ -381,7 +381,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
// XXX only accept when operational (?)
nodeInfo, err := IdentifyPeer(link, neo.STORAGE)
if err != nil {
fmt.Printf("stor: %v\n", err)
stor.logf("%v", err)
return
}
......@@ -392,7 +392,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
default:
// XXX vvv should be reply to peer
fmt.Printf("stor: %v: unexpected peer type: %v\n", link, nodeInfo.NodeType)
stor.logf("%v: unexpected peer type: %v", link, nodeInfo.NodeType)
return
}
......@@ -400,7 +400,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
for {
conn, err := link.Accept()
if err != nil {
fmt.Printf("stor: %v\n", err) // XXX err ctx
stor.logf("%v", err) // XXX err ctx
break
}
......@@ -426,7 +426,7 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context,
// the connection is closed when serveClient returns
// XXX +error return?
func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
fmt.Printf("stor: %s: serving new client conn\n", conn)
stor.logf("%s: serving new client conn", conn)
// rederive ctx to be also cancelled if M tells us StopOperation
ctx, cancel := stor.withWhileOperational(ctx)
......@@ -457,9 +457,9 @@ func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
case err = <-done:
}
fmt.Printf("stor: %v: %v\n", conn, err)
stor.logf("%v: %v", conn, err)
// XXX vvv -> defer ?
fmt.Printf("stor: %v: closing client conn\n", conn)
stor.logf("%v: closing client conn", conn)
conn.Close() // XXX err
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment