Commit 95d0183c authored by Kirill Smelkov

.

parent d77ccb51
@@ -136,7 +136,7 @@ func (nt *NodeTable) GetByLink(link *NodeLink) *Node {
}

// XXX doc
-func (nt *NodeTable) SetNodeState(node *Node, state neo.NodeState) {
+func (nt *NodeTable) SetNodeState(node *Node, state NodeState) {
	node.NodeState = state
	nt.notify(node.NodeInfo)
}
...
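Note: SetNodeState above follows a set-then-notify pattern — mutate the table entry first, then push the updated info to subscribers, so observers see changes in the order they were made. A standalone sketch of that pattern (hypothetical, simplified types; not the actual NodeTable API):

package main

import "fmt"

type NodeState int

const (
	DOWN NodeState = iota
	RUNNING
)

type Node struct {
	State NodeState
}

type NodeTable struct {
	subscribers []chan NodeState // hypothetical; NEO pushes NodeInfo updates here
}

// SetNodeState mutates the node and then notifies every subscriber.
func (nt *NodeTable) SetNodeState(node *Node, state NodeState) {
	node.State = state
	for _, ch := range nt.subscribers {
		ch <- state
	}
}

func main() {
	nt := &NodeTable{}
	ch := make(chan NodeState, 1)
	nt.subscribers = append(nt.subscribers, ch)

	nt.SetNodeState(&Node{}, RUNNING)
	fmt.Println(<-ch) // 1 (RUNNING)
}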
@@ -22,7 +22,7 @@ package server
import (
	"context"
-	"errors"
+	stderrors "errors"
	"fmt"
	// "math"
	"sync"
@@ -31,6 +31,8 @@ import (
	"lab.nexedi.com/kirr/neo/go/neo"
	"lab.nexedi.com/kirr/neo/go/zodb"
+	"lab.nexedi.com/kirr/neo/go/xcommon/log"
+	"lab.nexedi.com/kirr/neo/go/xcommon/xcontext"
	"lab.nexedi.com/kirr/neo/go/xcommon/xnet"

	"lab.nexedi.com/kirr/go123/xerr"
@@ -150,7 +152,7 @@ func (m *Master) Run(ctx context.Context) error {
	if err != nil {
		return err	// XXX err ctx
	}
-	m.logf("serving on %s ...", l.Addr())
+	log.Infof(ctx, "serving on %s ...", l.Addr())
	m.node.MasterAddr = l.Addr().String()
@@ -194,7 +196,7 @@ func (m *Master) Run(ctx context.Context) error {
// runMain is the process which implements main master cluster management logic: node tracking, cluster
// state updates, scheduling data movement between storage nodes etc
func (m *Master) runMain(ctx context.Context) (err error) {
-	//defer xerr.Context(&err, "master: run")
+	defer running(&ctx, "run")(&err)	// XXX needed?

	// NOTE Run's goroutine is the only mutator of nodeTab, partTab and other cluster state
@@ -207,7 +209,7 @@ func (m *Master) runMain(ctx context.Context) (err error) {
		// a command came to us to start the cluster.
		err := m.recovery(ctx)
		if err != nil {
-			m.log(err)
+			log.Error(ctx, err)
			return err // recovery cancelled
		}
@@ -215,14 +217,14 @@ func (m *Master) runMain(ctx context.Context) (err error) {
		// case previously it was unclean shutdown.
		err = m.verify(ctx)
		if err != nil {
-			m.log(err)
+			log.Error(ctx, err)
			continue // -> recovery
		}

		// provide service as long as partition table stays operational
		err = m.service(ctx)
		if err != nil {
-			m.log(err)
+			log.Error(ctx, err)
			continue // -> recovery
		}
@@ -231,7 +233,7 @@ func (m *Master) runMain(ctx context.Context) (err error) {
		// XXX shutdown ?
	}

-	return errors.WithMessage(ctx.Err(), "master: run")
+	return ctx.Err()
}
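Note: runMain's overall shape — loop recovery, then verify, then service, falling back to recovery on error, until the context is cancelled — can be sketched standalone roughly like this (simplified; not the actual Master type):

package main

import (
	"context"
	"errors"
	"fmt"
)

// phase stands in for m.recovery / m.verify / m.service.
type phase func(ctx context.Context) error

// runMain loops through the cluster phases until ctx is cancelled.
// A recovery error is treated as fatal (recovery only fails on cancel);
// verify/service errors send the cluster back to recovery.
func runMain(ctx context.Context, recovery, verify, service phase) error {
	for ctx.Err() == nil {
		if err := recovery(ctx); err != nil {
			return err // recovery cancelled
		}
		if err := verify(ctx); err != nil {
			continue // -> recovery
		}
		if err := service(ctx); err != nil {
			continue // -> recovery
		}
	}
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	calls := 0
	ok := func(ctx context.Context) error { return nil }
	flaky := func(ctx context.Context) error {
		calls++
		if calls >= 3 {
			cancel() // stop after a few service rounds
		}
		return errors.New("service interrupted")
	}
	fmt.Println(runMain(ctx, ok, ok, flaky)) // context canceled
}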
@@ -246,7 +248,7 @@ func (m *Master) runMain(ctx context.Context) (err error) {
// storRecovery is result of 1 storage node passing recovery phase
type storRecovery struct {
-	node *Node
+	node *neo.Node
	partTab neo.PartitionTable
	// XXX + backup_tid, truncate_tid ?

	err error
@@ -260,12 +262,8 @@ type storRecovery struct {
func (m *Master) recovery(ctx context.Context) (err error) {
	defer running(&ctx, "recovery")(&err)
-	ctx = task.Running(ctx, "recovery")
-	defer task.ErrContext(&err, ctx)
-	log.Infof(ctx, "")	// XXX automatically log in task.Running?
	m.setClusterState(neo.ClusterRecovering)
-	rctx, rcancel := context.WithCancel(ctx)
+	ctx, rcancel := context.WithCancel(ctx)
	defer rcancel()

	recovery := make(chan storRecovery)
@@ -278,7 +276,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
			wg.Add(1)
			go func() {
				defer wg.Done()
-				storCtlRecovery(rctx, stor, recovery)
+				storCtlRecovery(ctx, stor, recovery)
			}()
		}
	}
@@ -296,7 +294,7 @@ loop:
				defer wg.Done()
				if node == nil {
-					m.reject(n.conn, resp)
+					m.reject(ctx, n.conn, resp)
					return
				}
@@ -308,7 +306,7 @@ loop:
				}

				// start recovery
-				storCtlRecovery(rctx, node, recovery)
+				storCtlRecovery(ctx, node, recovery)
			}()
		/*
@@ -325,18 +323,18 @@ loop:
		// ptid ↑ and if so we should take partition table from there
		case r := <-recovery:
			if r.err != nil {
-				logf(ctx, "%v", r.err)
+				log.Error(ctx, r.err)

				if !xcontext.Canceled(errors.Cause(r.err)) {
-					logf(ctx, "%v: closing link", r.node.Link)
+					log.Infof(ctx, "%v: closing link", r.node.Link)

					// close stor link / update .nodeTab
					err := r.node.Link.Close()
					if err != nil {
-						logf(ctx, "master: %v", err)
+						log.Error(ctx, err)
					}

-					m.nodeTab.SetNodeState(r.node, DOWN)
+					m.nodeTab.SetNodeState(r.node, neo.DOWN)
				}
			} else {
@@ -393,7 +391,7 @@ loop:
	for {
		select {
		// XXX <-m.nodeLeave ?
-		case <-recovery:
+		case r := <-recovery:
			// we do not care errors here - they are either cancelled or IO errors
			// we just log them and return - in case it is IO error
			// on link it will be caught on next send/recv XXX
@@ -421,9 +419,9 @@ func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery)
		// on error provide feedback to storRecovery chan
		res <- storRecovery{node: stor, err: err}
	}()
-	defer xerr.Contextf(&err, "%s: stor recovery", stor.link)
+	defer runningf(&ctx, "%s: stor recovery", stor.Link)(&err)

-	conn, err := stor.link.NewConn()
+	conn, err := stor.Link.NewConn()
	if err != nil {
		return
	}
@@ -468,8 +466,8 @@ func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery)
}

-var errStopRequested = errors.New("stop requested")
-var errClusterDegraded = errors.New("cluster became non-operatonal")
+var errStopRequested = stderrors.New("stop requested")
+var errClusterDegraded = stderrors.New("cluster became non-operatonal")
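Note: the stderrors alias is needed because the unqualified errors in this file appears to be github.com/pkg/errors (errors.Cause is used in the recovery loop above), so the standard library package has to come in under another name. A minimal illustration of that import pattern:

package main

import (
	stderrors "errors"
	"fmt"

	"github.com/pkg/errors"
)

var errStopRequested = stderrors.New("stop requested")

func main() {
	// wrap the sentinel the way a caller up the stack might
	err := errors.Wrap(errStopRequested, "service")

	// errors.Cause unwraps back to the original sentinel value
	fmt.Println(errors.Cause(err) == errStopRequested) // true
}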
// Cluster Verification (data recovery)
@@ -492,8 +490,7 @@ var errClusterDegraded = errors.New("cluster became non-operatonal")
//
// prerequisite for start: .partTab is operational wrt .nodeTab
func (m *Master) verify(ctx context.Context) (err error) {
-	m.log("verify")
-	defer xerr.Context(&err, "master: verify")
+	defer running(&ctx, "verify")(&err)

	m.setClusterState(neo.ClusterVerifying)
	vctx, vcancel := context.WithCancel(ctx)
@@ -512,7 +509,7 @@ func (m *Master) verify(ctx context.Context) (err error) {
	for _, stor := range m.nodeTab.StorageList() {
		if stor.NodeState > neo.DOWN {	// XXX state cmp ok ? XXX or stor.Link != nil ?
			inprogress++
-			go storCtlVerify(vctx, stor.Link, verify)
+			go storCtlVerify(vctx, stor, verify)
		}
	}
@@ -520,7 +517,9 @@ loop:
	for inprogress > 0 {
		select {
		case n := <-m.nodeCome:
-			node, ok := m.identify(n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
+			node, resp := m.identify(n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
+			// XXX handle resp ^^^ like in recover
+			_, ok := resp.(*neo.AcceptIdentification)
			if !ok {
				break
			}
@@ -528,10 +527,10 @@
			// new storage arrived - start verification on it too
			// XXX ok? or it must first go through recovery check?
			inprogress++
-			go storCtlVerify(vctx, node.Link, verify)
+			go storCtlVerify(vctx, node, verify)

		case n := <-m.nodeLeave:
-			m.nodeTab.UpdateLinkDown(n.link)
+			m.nodeTab.SetNodeState(n.node, neo.DOWN)

			// if cluster became non-operational - we cancel verification
			if !m.partTab.OperationalWith(&m.nodeTab) {
@@ -548,11 +547,10 @@ loop:
			inprogress--

			if v.err != nil {
-				m.logf("verify: %v", v.err)
+				log.Error(ctx, v.err)

				// mark storage as non-working in nodeTab
-				// FIXME better -> v.node.setState(DOWN) ?
-				m.nodeTab.UpdateLinkDown(v.link)
+				m.nodeTab.SetNodeState(v.node, neo.DOWN)

				// check partTab is still operational
				// if not -> cancel to go back to recovery
@@ -597,7 +595,7 @@ loop:
// storVerify is result of a storage node passing verification phase
type storVerify struct {
-	node *Node
+	node *neo.Node
	lastOid zodb.Oid
	lastTid zodb.Tid
	// link *neo.NodeLink	// XXX -> Node
@@ -605,20 +603,20 @@ type storVerify struct {
}

// storCtlVerify drives a storage node during cluster verifying (= starting) state
-func storCtlVerify(ctx context.Context, link *neo.NodeLink, res chan storVerify) {
+func storCtlVerify(ctx context.Context, stor *neo.Node, res chan storVerify) {
	// XXX link.Close on err
	// XXX cancel on ctx

	var err error
	defer func() {
		if err != nil {
-			res <- storVerify{link: link, err: err}
+			res <- storVerify{node: stor, err: err}
		}
	}()
-	defer xerr.Contextf(&err, "%s: verify", link)
+	defer runningf(&ctx, "%s: stor verify", stor.Link)(&err)

	// FIXME stub
-	conn, _ := link.NewConn()
+	conn, _ := stor.Link.NewConn()

	// XXX NotifyPT (so storages save locally recovered PT)
@@ -641,7 +639,7 @@ func storCtlVerify(ctx context.Context, link *neo.NodeLink, res chan storVerify)
	}

	// send results to driver
-	res <- storVerify{link: link, lastOid: last.LastOid, lastTid: last.LastTid}
+	res <- storVerify{node: stor, lastOid: last.LastOid, lastTid: last.LastTid}
}
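Note: storCtlRecovery and storCtlVerify both follow the same driver pattern — one goroutine per storage node, each reporting a result struct (node plus data or error) on a channel, while the master counts how many are still in progress. A generic, stdlib-only sketch of that pattern (hypothetical names, not the real NEO types):

package main

import (
	"context"
	"fmt"
)

// storResult stands in for storRecovery / storVerify.
type storResult struct {
	node string
	err  error
}

// storCtl stands in for storCtlRecovery / storCtlVerify: it drives one
// storage node and always reports back on res, success or failure.
func storCtl(ctx context.Context, node string, res chan<- storResult) {
	// ... talk to the storage node here ...
	res <- storResult{node: node}
}

func main() {
	ctx := context.Background()
	nodes := []string{"S1", "S2", "S3"}

	res := make(chan storResult)
	inprogress := 0
	for _, n := range nodes {
		inprogress++
		go storCtl(ctx, n, res)
	}

	// driver loop: collect results until every worker has reported
	for inprogress > 0 {
		r := <-res
		inprogress--
		if r.err != nil {
			fmt.Println("error from", r.node, ":", r.err)
			continue
		}
		fmt.Println("done:", r.node)
	}
}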
@@ -661,8 +659,7 @@ func storCtlVerify(ctx context.Context, link *neo.NodeLink, res chan storVerify)
//
// prerequisite for start: .partTab is operational wrt .nodeTab and verification passed
func (m *Master) service(ctx context.Context) (err error) {
-	m.log("service")
-	defer xerr.Context(&err, "master: service")
+	defer running(&ctx, "service")(&err)

	// XXX we also need to tell storages StartOperation first
	m.setClusterState(neo.ClusterRunning)
@@ -674,10 +671,11 @@ loop:
		select {
		// a node connected and requests identification
		case n := <-m.nodeCome:
-			node, resp, ok := m.identify(n, /* XXX accept everyone */)
-			state := m.ClusterState
+			node, resp := m.identify(n, /* XXX accept everyone */)
+			//state := m.clusterState
+			_ = node
+			_ = resp
			/*
			wg.Add(1)
			go func() {
@@ -784,7 +782,7 @@ loop:
			*/

		case n := <-m.nodeLeave:
-			m.nodeTab.UpdateLinkDown(n.link)
+			m.nodeTab.SetNodeState(n.node, neo.DOWN)

			// if cluster became non-operational - cancel service
			if !m.partTab.OperationalWith(&m.nodeTab) {
@@ -795,12 +793,13 @@ loop:
		// XXX what else ? (-> txn control at least)

+		/*
		case ech := <-m.ctlStart:
-			switch m.ClusterState {
-			case ClusterVerifying, ClusterRunning:
+			switch m.clusterState {
+			case neo.ClusterVerifying, neo.ClusterRunning:
				ech <- nil // we are already started

-			case ClusterRecovering:
+			case neo.ClusterRecovering:
				if m.partTab.OperationalWith(&m.nodeTab) {
					// reply "ok to start" after whole recovery finishes
@@ -820,6 +819,7 @@ loop:
			// XXX case ClusterStopping:
			}
+		*/

		case ech := <-m.ctlStop:
			close(ech) // ok
@@ -907,13 +907,16 @@ func (m *Master) identify(n nodeCome) (node *neo.Node, resp neo.Msg) {
}

// reject sends rejective identification response and closes associated link
-func (m *Master) reject(conn *neo.Conn, resp neo.Msg) {
+func (m *Master) reject(ctx context.Context, conn *neo.Conn, resp neo.Msg) {
	// XXX cancel on ctx?
	// XXX log?
	err1 := conn.Send(resp)
	err2 := conn.Close()
	err3 := conn.Link().Close()
-	log xerr.Merge(err1, err2, err3)
+	err := xerr.Merge(err1, err2, err3)
+	if err != nil {
+		log.Error(ctx, "reject:", err)
+	}
}
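Note: the rewritten reject collapses the three teardown errors with xerr.Merge and only logs when something actually failed. Assuming go123/xerr's Merge returns nil when all inputs are nil and a combined error otherwise, the shape is roughly:

package main

import (
	"fmt"
	"io"

	"lab.nexedi.com/kirr/go123/xerr"
)

// closeAll tears down several resources and reports a single merged error.
// (assumption: xerr.Merge(nil, nil, ...) == nil; non-nil errors are combined)
func closeAll(closers ...io.Closer) error {
	var errv []error
	for _, c := range closers {
		errv = append(errv, c.Close())
	}
	return xerr.Merge(errv...)
}

func main() {
	r1 := io.NopCloser(nil) // Close always returns nil
	r2 := io.NopCloser(nil)
	if err := closeAll(r1, r2); err != nil {
		fmt.Println("teardown:", err)
	} else {
		fmt.Println("clean teardown")
	}
}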
// accept sends acceptive identification response and closes conn
@@ -1255,35 +1258,3 @@ func (m *Master) ServeAdmin(ctx context.Context, conn *neo.Conn) {
func (m *Master) ServeMaster(ctx context.Context, conn *neo.Conn) {
	// TODO (for elections)
}
-
-// ---- utils ----
-
-// -> Logger
-//func (m *Master) log(v interface{}) {
-//	XXX
-//}
-
-func (m *Master) logf(format string, argv ...interface{}) {
-	// TODO support custom log.Logger
-	log.Output(2, fmt.Sprintf("master(%v): " + format, append([]string{m.MasterAddr}, argv...)...))
-}
-
-func (m *Master) vlogf(format string, argv ...interface{}) {
-	// XXX -> verbose settings
-	if false {
-		return
-	}
-	m.log(format, argv)
-}
-
-func (m *Master) errctx(errp *error, context string) {
-	if *errp == nil {
-		return
-	}
-	xerr.Contextf(errp, "master(%v): %s", m.node.MasterAddr, context)
-}
@@ -79,7 +79,8 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
}
*/

-/*
+// FIXME kill vvv
+///*
// ----------------------------------------
// XXX goes away? (we need a func to make sure to recv RequestIdentification
@@ -129,4 +130,4 @@ func IdentifyPeer(link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.Req
	return req, nil
}
-*/
+//*/
@@ -28,6 +28,7 @@ import (
	"lab.nexedi.com/kirr/neo/go/neo"
	"lab.nexedi.com/kirr/neo/go/zodb"
+	"lab.nexedi.com/kirr/neo/go/xcommon/log"
	"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
	"lab.nexedi.com/kirr/neo/go/xcommon/xcontext"
@@ -92,7 +93,7 @@ func (stor *Storage) Run(ctx context.Context) error {
	if err != nil {
		return err	// XXX err ctx
	}
-	stor.logf("serving on %s ...", l.Addr())
+	log.Infof(ctx, "serving on %s ...", l.Addr())

	// start serving incoming connections
	wg := sync.WaitGroup{}
@@ -140,13 +141,13 @@ func (stor *Storage) Run(ctx context.Context) error {
// it tries to persist master link reconnecting as needed
//
// it always returns an error - either due to cancel or command from master to shutdown
-func (stor *Storage) talkMaster(ctx context.Context) error {
-	// XXX errctx
+func (stor *Storage) talkMaster(ctx context.Context) (err error) {
+	defer runningf(&ctx, "talk master(%v)", stor.node.MasterAddr)(&err)

	for {
-		stor.vlogf("master(%v): connecting ...", stor.node.MasterAddr)
+		log.Info(ctx, "connecting ...")
		err := stor.talkMaster1(ctx)
-		stor.logf("master(%v): %v", stor.node.MasterAddr, err)
+		log.Error(ctx, err)

		// TODO if err = shutdown -> return
@@ -187,7 +188,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
	// XXX -> node.Dial ?
	if accept.YourNodeUUID != stor.node.MyInfo.NodeUUID {
-		stor.logf("%v: master told us to have UUID=%v", Mlink, accept.YourNodeUUID)
+		log.Infof(ctx, "%v: master told us to have UUID=%v", Mlink, accept.YourNodeUUID)
		stor.node.MyInfo.NodeUUID = accept.YourNodeUUID
	}
@@ -225,13 +226,13 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
		// let master initialize us. If successful this ends with StartOperation command.
		err = stor.m1initialize(ctx, Mconn)
		if err != nil {
-			stor.logf("%v: master: %v", Mconn, err)
+			log.Error(ctx, err)
			continue // retry initializing
		}

		// we got StartOperation command. Let master drive us during servicing phase.
		err = stor.m1serve(ctx, Mconn)
-		stor.logf("%v: master: %v", Mconn, err)
+		log.Error(ctx, err)
		continue // retry from initializing
	}
@@ -250,7 +251,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// - nil: initialization was ok and a command came from master to start operation
// - !nil: initialization was cancelled or failed somehow
func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err error) {
-	defer xerr.Context(&err, "init")
+	defer runningf(&ctx, "init %v", Mconn)(&err)

	for {
		msg, err := Mconn.Recv()
@@ -319,7 +320,7 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
// either due to master commanding us to stop, or context cancel or some other
// error.
func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) {
-	defer xerr.Context(&err, "serve")
+	defer runningf(&ctx, "serve %v", Mconn)(&err)

	// refresh stor.opCtx and cancel it when we finish so that client
	// handlers know they need to stop operating as master told us to do so.
@@ -359,7 +360,7 @@ func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) {
// ServeLink serves incoming node-node link connection
// XXX +error return?
func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
-	stor.logf("%s: serving new node", link)
+	log.Infof(ctx, "%s: serving new node", link)	// XXX -> running?

	// close link when either cancelling or returning (e.g. due to an error)
	// ( when cancelling - link.Close will signal to all current IO to
@@ -374,7 +375,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
			// XXX ret err = ctx.Err()
		case <-retch:
		}
-		stor.logf("%v: closing link", link)
+		log.Info(ctx, "%v: closing link", link)
		link.Close()	// XXX err
	}()
@@ -382,7 +383,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
	// XXX only accept when operational (?)
	nodeInfo, err := IdentifyPeer(link, neo.STORAGE)
	if err != nil {
-		stor.logf("%v", err)
+		log.Error(ctx, err)
		return
	}
@@ -393,7 +394,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
	default:
		// XXX vvv should be reply to peer
-		stor.logf("%v: unexpected peer type: %v", link, nodeInfo.NodeType)
+		log.Errorf(ctx, "%v: unexpected peer type: %v", link, nodeInfo.NodeType)
		return
	}
@@ -401,7 +402,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
	for {
		conn, err := link.Accept()
		if err != nil {
-			stor.logf("%v", err)	// XXX err ctx
+			log.Error(ctx, err)
			break
		}
@@ -427,7 +428,7 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context,
// the connection is closed when serveClient returns
// XXX +error return?
func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
-	stor.logf("%s: serving new client conn", conn)
+	log.Infof(ctx, "%s: serving new client conn", conn)	// XXX -> running?

	// rederive ctx to be also cancelled if M tells us StopOperation
	ctx, cancel := stor.withWhileOperational(ctx)
@@ -458,9 +459,9 @@ func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
	case err = <-done:
	}

-	stor.logf("%v: %v", conn, err)
+	log.Infof(ctx, "%v: %v", conn, err)

	// XXX vvv -> defer ?
-	stor.logf("%v: closing client conn", conn)
+	log.Infof(ctx, "%v: closing client conn", conn)
	conn.Close()	// XXX err
}
...
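Note: serveClient above runs the actual request handling elsewhere and then waits on either the operational context being cancelled or the handler finishing; ServeLink does the same around link teardown. A stdlib-only sketch of that wait-for-either pattern (hypothetical handler, not the real Storage code):

package main

import (
	"context"
	"fmt"
	"time"
)

// serveConn runs handle in its own goroutine and returns as soon as either
// the handler finishes or ctx is cancelled (e.g. master sent StopOperation).
// NOTE in this sketch the handler goroutine is simply abandoned on cancel;
// the diff above instead closes the link/conn so pending IO gets unblocked.
func serveConn(ctx context.Context, handle func() error) error {
	done := make(chan error, 1)
	go func() { done <- handle() }()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-done:
		return err
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// handler that would run "forever"; the cancel below interrupts the wait
	slow := func() error { time.Sleep(time.Hour); return nil }

	go func() { time.Sleep(10 * time.Millisecond); cancel() }()
	fmt.Println(serveConn(ctx, slow)) // context canceled
}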
@@ -19,3 +19,39 @@
package server

// misc utilities

+import (
+	"context"
+	"fmt"
+
+	"lab.nexedi.com/kirr/neo/go/xcommon/task"
+	"lab.nexedi.com/kirr/neo/go/xcommon/log"
+)
+
+// running is syntatic sugar to push new task to operational stack, log it and
+// adjust error return with task prefix.
+//
+// use like this:
+//
+//	defer running(&ctx, "my task")(&err)
+func running(ctxp *context.Context, name string) func(*error) {
+	return _running(ctxp, name)
+}
+
+// runningf is running cousing with formatting support
+func runningf(ctxp *context.Context, format string, argv ...interface{}) func(*error) {
+	return _running(ctxp, fmt.Sprintf(format, argv...))
+}
+
+func _running(ctxp *context.Context, name string) func(*error) {
+	ctx := task.Running(*ctxp, name)
+	*ctxp = ctx
+	log.Depth(2).Info(ctx)	// XXX level = ok?
+	return func(errp *error) {
+		// NOTE not *ctxp here - as context pointed by ctxp could be
+		// changed when this deferred function is run
+		task.ErrContext(errp, ctx)
+		// XXX also log task stop here?
+	}
+}
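Note: with util.go in place, a function only needs the single defer shown in its doc comment — running/runningf pushes a task onto the context, logs it, and prefixes whatever error the function returns. A usage sketch under the same package layout (connect is a hypothetical example function, not part of the diff):

package server

import (
	"context"
	"fmt"
)

// connect illustrates the intended calling convention: the one defer both
// logs "connect <addr>" on entry and turns a returned error into
// "connect <addr>: <original error>".
func connect(ctx context.Context, addr string) (err error) {
	defer runningf(&ctx, "connect %s", addr)(&err)

	// from here on ctx carries the "connect <addr>" task, so nested
	// log calls and nested running() tasks inherit the prefix
	return fmt.Errorf("%s: connection refused", addr)
}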
@@ -23,30 +23,79 @@
package log

import (
+	"context"
+	"fmt"
+
	"github.com/golang/glog"

	"lab.nexedi.com/kirr/neo/go/xcommon/task"
)

+/*
// taskPrefix returns prefix associated to current operational task stack to put to logs
-func taskPrefix(ctx) string {
+func taskPrefix(ctx context.Context) string {
	s := task.Current(ctx).String()
	if s != "" {
		s += ": "
	}
	return s
}
+*/

+// withTask prepends string describing current operational task stack to argv and returns it
+// handy to use this way:
+//
+//	func info(ctx, argv ...interface{}) {
+//		glog.Info(withTask(ctx, argv...)...)
+//	}
+//
+// see https://golang.org/issues/21388
+func withTask(ctx context.Context, argv ...interface{}) []interface{} {
+	task := task.Current(ctx).String()
+	if task == "" {
+		return argv
+	}
+
+	if len(argv) != 0 {
+		task += ":"
+	}
+
+	return append([]interface{}{task}, argv...)
+}

type Depth int

+func (d Depth) Info(ctx context.Context, argv ...interface{}) {
+	// XXX avoid task formatting if logging severity disabled
+	glog.InfoDepth(int(d+1), withTask(ctx, argv...)...)
+}
+
func (d Depth) Infof(ctx context.Context, format string, argv ...interface{}) {
-	// XXX avoid formatting if logging severity disables info
-	glog.InfoDepth(d+1, taskPrefix(ctx) + fmt.Sprintf(format, argv)
+	// XXX avoid formatting if logging severity disabled
+	glog.InfoDepth(int(d+1), withTask(ctx, fmt.Sprintf(format, argv))...)
}

+func (d Depth) Error(ctx context.Context, argv ...interface{}) {
+	glog.ErrorDepth(int(d+1), withTask(ctx, argv...)...)
+}
+
+func (d Depth) Errorf(ctx context.Context, format string, argv ...interface{}) {
+	glog.ErrorDepth(int(d+1), withTask(ctx, fmt.Sprintf(format, argv))...)
+}

+func Info(ctx context.Context, argv ...interface{})  { Depth(1).Info(ctx, argv...) }
+func Error(ctx context.Context, argv ...interface{}) { Depth(1).Error(ctx, argv...) }

func Infof(ctx context.Context, format string, argv ...interface{}) {
-	Depth(1).Infof(ctx, format, argv)
+	Depth(1).Infof(ctx, format, argv...)
}

+func Errorf(ctx context.Context, format string, argv ...interface{}) {
+	Depth(1).Errorf(ctx, format, argv...)
+}
...
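Note: the withTask trick exists because glog's *Depth functions take ...interface{} — prepending the task-stack string as the first argument gives "task: message" output without paying for an extra fmt.Sprintf when logging is disabled (the golang.org/issues/21388 reference above). A small sketch of how the pieces compose, assuming the task and log packages exactly as in this diff:

package main

import (
	"context"

	"lab.nexedi.com/kirr/neo/go/xcommon/log"
	"lab.nexedi.com/kirr/neo/go/xcommon/task"
)

func main() {
	ctx := context.Background()

	// no task on the context yet: the message goes out as-is
	log.Info(ctx, "serving on 127.0.0.1:5551 ...")

	// nested tasks: task.Current(ctx).String() is now
	// "recovery: S1: stor recovery", and withTask prepends it
	// to the arguments of the line below
	ctx = task.Running(ctx, "recovery")
	ctx = task.Runningf(ctx, "%s: stor recovery", "S1")
	log.Info(ctx, "closing link")
}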
@@ -22,6 +22,9 @@ package task
import (
	"context"
+	"fmt"
+
+	"lab.nexedi.com/kirr/go123/xerr"
)

// Task represents currently running operation
@@ -34,13 +37,18 @@ type taskKey struct{}
// Running creates new task and returns new context with that task set to current
func Running(ctx context.Context, name string) context.Context {
-	context.WithValue(&Task{Parent: Current(ctx), Name: name})
+	return context.WithValue(ctx, taskKey{}, &Task{Parent: Current(ctx), Name: name})
+}
+
+// Runningf is Running cousing with formatting support
+func Runningf(ctx context.Context, format string, argv ...interface{}) context.Context {
+	return Running(ctx, fmt.Sprintf(format, argv...))
}

// Current returns current task represented by context.
// if there is no current task - it returns nil.
-func Current(ctx Context) *Task {
-	task, _ := ctx.Value(taskKey).(*Task)
+func Current(ctx context.Context) *Task {
+	task, _ := ctx.Value(taskKey{}).(*Task)
	return task
}
@@ -49,16 +57,16 @@ func Current(ctx Context) *Task {
//
//	func myfunc(ctx, ...) (..., err error) {
//		ctx = task.Running("doing something")
//		defer task.ErrContext(&err, ctx)
//		...
//
// Please see lab.nexedi.com/kirr/go123/xerr.Context for semantic details
-func ErrContext(errp *error, ctx Context) {
+func ErrContext(errp *error, ctx context.Context) {
	task := Current(ctx)
	if task == nil {
		return
	}

-	return xerr.Context(errp, task.Name)
+	xerr.Context(errp, task.Name)
}

// String returns string representing whole operational stack.
@@ -68,11 +76,11 @@ func ErrContext(errp *error, ctx Context) {
//
// nil Task is represented as ""
func (t *Task) String() string {
-	if o == nil {
+	if t == nil {
		return ""
	}

-	prefix := Parent.String()
+	prefix := t.Parent.String()
	if prefix != "" {
		prefix += ": "
	}
...
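Note: task.Task is just a linked list hung off the context — Running pushes a node, String() walks the parents to build the "parent: child" prefix, and ErrContext reuses the innermost name to prefix a returned error. A direct usage sketch of this API:

package main

import (
	"context"
	"errors"
	"fmt"

	"lab.nexedi.com/kirr/neo/go/xcommon/task"
)

// doVerify shows the calling convention from the ErrContext doc comment:
// push a task, then let the defer prefix whatever error is returned.
func doVerify(ctx context.Context) (err error) {
	ctx = task.Running(ctx, "verify")
	defer task.ErrContext(&err, ctx)

	fmt.Println(task.Current(ctx)) // "master: verify" - the whole operational stack
	return errors.New("cluster became non-operational")
}

func main() {
	ctx := task.Running(context.Background(), "master")
	fmt.Println(doVerify(ctx)) // "verify: cluster became non-operational"
}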