Commit fc667f6c authored by Kirill Smelkov

.

parent eeb1f958
@@ -38,8 +38,6 @@ import (
"lab.nexedi.com/kirr/neo/go/xcommon/xcontext"
"lab.nexedi.com/kirr/neo/go/xcommon/xio"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/go123/xerr"
)
// Master is a node overseeing and managing how whole NEO cluster works
@@ -62,24 +60,13 @@ type Master struct {
// channels from workers directly serving peers to main driver
nodeCome chan nodeCome // node connected XXX -> acceptq?
nodeLeave chan nodeLeave // node disconnected XXX -> don't need
// nodeLeave chan nodeLeave // node disconnected XXX -> don't need
// so tests could override
monotime func() float64
}
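
The monotime field above is the test seam the comment refers to: tests substitute a deterministic clock for the real one. A minimal white-box sketch, assuming the test file lives in the same package (newTestMaster is a hypothetical helper constructing a Master for tests):

import "testing"

func TestVirtualClock(t *testing.T) {
	m := newTestMaster(t) // hypothetical: builds a Master wired for testing
	now := 0.0
	m.monotime = func() float64 {
		now += 0.1 // each call advances virtual time by 100ms
		return now
	}
	// ... drive m; time-dependent logic now follows the virtual clock ...
}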
// event: node connects
type nodeCome struct {
req *neo.Request
idReq *neo.RequestIdentification // we received this identification request
}
// event: node disconnects
type nodeLeave struct {
node *neo.Node
}
// NewMaster creates new master node that will listen on serveAddr.
// Use Run to actually start running the node.
func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
@@ -91,7 +78,7 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
ctlShutdown: make(chan chan error),
nodeCome: make(chan nodeCome),
nodeLeave: make(chan nodeLeave),
// nodeLeave: make(chan nodeLeave),
monotime: monotime,
}
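
For context, the lifecycle around this constructor could look as follows. This is a sketch, not code from this commit; xnet.NetPlain("tcp") is assumed to yield a plain-TCP xnet.Networker:

m := NewMaster("abc1", "localhost:5551", xnet.NetPlain("tcp"))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
	err := m.Run(ctx) // runs until ctx is cancelled or a fatal error occurs
	log.Error(ctx, err)
}()

// ... later, ask the master to stop the cluster:
m.Stop()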
@@ -118,14 +105,14 @@ func (m *Master) Stop() {
<-ech
}
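
Stop blocks on <-ech above: the ctl* channels declared in the constructor (ctlShutdown: make(chan chan error)) implement a chan-chan-error handshake between control callers and the main driver. The idiom, sketched with illustrative loop-side names:

// caller side: hand the main loop a reply channel, block for the answer
ech := make(chan error)
m.ctlShutdown <- ech
err := <-ech

// main-loop side, inside its select (illustrative):
//	case ech := <-m.ctlShutdown:
//		... perform the requested transition ...
//		ech <- nil // acknowledge back to the caller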
// Shutdown requests all known nodes in the cluster to stop
// Shutdown requests all known nodes in the cluster to stop.
// XXX + master's run to finish ?
func (m *Master) Shutdown() error {
panic("TODO")
}
// setClusterState sets .clusterState and notifies subscribers
// setClusterState sets .clusterState and notifies subscribers.
func (m *Master) setClusterState(state neo.ClusterState) {
m.node.ClusterState.Set(state)
@@ -133,7 +120,7 @@ func (m *Master) setClusterState(state neo.ClusterState) {
}
// Run starts master node and runs it until ctx is cancelled or fatal error
// Run starts master node and runs it until ctx is cancelled or fatal error.
func (m *Master) Run(ctx context.Context) (err error) {
// start listening
l, err := m.node.Listen()
@@ -167,14 +154,19 @@ func (m *Master) Run(ctx context.Context) (err error) {
wg := sync.WaitGroup{}
serveCtx, serveCancel := context.WithCancel(ctx)
wg.Add(1)
go func() {
go func(ctx context.Context) (err error) {
defer wg.Done()
defer task.Running(&ctx, "accept")(&err)
// XXX dup in storage
for serveCtx.Err() == nil {
req, idReq, err := l.Accept(serveCtx)
for {
if ctx.Err() != nil {
return ctx.Err()
}
req, idReq, err := l.Accept(ctx)
if err != nil {
// TODO log / throttle
log.Error(ctx, err) // XXX throttle?
continue
}
@@ -196,13 +188,13 @@
case m.nodeCome <- nodeCome{req, idReq}:
// ok
case <-serveCtx.Done():
case <-ctx.Done():
// shutdown
lclose(serveCtx, req.Link())
return
lclose(ctx, req.Link())
continue
}
}
}()
}(serveCtx)
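
Note the shape of the refactoring above: the goroutine now takes serveCtx as an explicit parameter and declares a named err result, so that defer task.Running(&ctx, "accept")(&err) can rebind the local ctx and decorate the returned error. The general idiom, as inferred from its use here:

func doTask(ctx context.Context) (err error) {
	// task.Running rebinds ctx to carry the task name and, via the
	// deferred call, annotates *err when the function returns.
	defer task.Running(&ctx, "mytask")(&err)

	// ... work using the rebound ctx ...
	return nil
}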
// main driving logic
err = m.runMain(ctx)
@@ -330,7 +322,7 @@ loop:
go func() {
defer wg.Done()
err := m.accept(ctx, n.req, resp)
err := accept(ctx, n.req, resp)
if err != nil {
recovery <- storRecovery{stor: node, err: err}
return
@@ -577,7 +569,7 @@ loop:
go func() {
defer wg.Done()
err := m.accept(ctx, n.req, resp)
err := accept(ctx, n.req, resp)
if err != nil {
verify <- storVerify{stor: node, err: err}
return
@@ -586,6 +578,7 @@ loop:
storCtlVerify(ctx, node, m.node.PartTab, verify)
}()
/*
case n := <-m.nodeLeave:
n.node.SetState(neo.DOWN)
@@ -597,6 +590,7 @@ loop:
err = errClusterDegraded
break loop
}
*/
// a storage node came through verification - adjust our last{Oid,Tid} if ok
// on error check - whether cluster became non-operational and stop verification if so
@@ -783,7 +777,7 @@ loop:
go func() {
defer wg.Done()
err = m.accept(ctx, n.req, resp)
err = accept(ctx, n.req, resp)
if err != nil {
serviced <- serviceDone{node: node, err: err}
return
@@ -806,6 +800,7 @@ loop:
// TODO if S goes away -> check partTab still operational -> if not - recovery
_ = d
/*
// XXX who sends here?
case n := <-m.nodeLeave:
n.node.SetState(neo.DOWN)
@@ -815,6 +810,7 @@ loop:
err = errClusterDegraded
break loop
}
*/
// XXX what else ? (-> txn control at least)
@@ -1106,36 +1102,6 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp
return node, accept
}
// reject sends rejective identification response and closes associated link
func reject(ctx context.Context, req *neo.Request, resp neo.Msg) {
// XXX cancel on ctx?
// XXX log?
err1 := req.Reply(resp)
err2 := req.Link().Close()
err := xerr.Merge(err1, err2)
if err != nil {
log.Error(ctx, "reject:", err)
}
}
// goreject spawns reject in separate goroutine properly added/done on wg
func goreject(ctx context.Context, wg *sync.WaitGroup, req *neo.Request, resp neo.Msg) {
wg.Add(1)
defer wg.Done()
go reject(ctx, req, resp)
}
// accept replies with acceptive identification response
// XXX if problem -> .nodeLeave
// XXX spawn ping goroutine from here?
func (m *Master) accept(ctx context.Context, req *neo.Request, resp neo.Msg) error {
// XXX cancel on ctx
err1 := req.Reply(resp)
return err1 // XXX while trying to work on single conn
//err2 := conn.Close()
//return xerr.First(err1, err2)
}
// allocUUID allocates new node uuid for a node of kind nodeType
// XXX it is bad idea for master to assign uuid to coming node
// -> better nodes generate really unique UUID themselves and always show with them
@@ -26,8 +26,10 @@ import (
"context"
// "fmt"
// "net"
"sync"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/go123/xerr"
)
@@ -115,3 +117,50 @@ func IdentifyPeer(ctx context.Context, link *neo.NodeLink, myNodeType neo.NodeTy
return req, nil
}
// ----------------------------------------
// event: node connects
type nodeCome struct {
req *neo.Request
idReq *neo.RequestIdentification // we received this identification request
}
/*
// event: node disconnects
type nodeLeave struct {
node *neo.Node
}
*/
// reject sends rejective identification response and closes associated link
func reject(ctx context.Context, req *neo.Request, resp neo.Msg) {
// XXX cancel on ctx?
// XXX log?
err1 := req.Reply(resp)
err2 := req.Link().Close()
err := xerr.Merge(err1, err2)
if err != nil {
log.Error(ctx, "reject:", err)
}
}
// goreject spawns reject in a separate goroutine, properly accounted on wg
func goreject(ctx context.Context, wg *sync.WaitGroup, req *neo.Request, resp neo.Msg) {
	wg.Add(1)
	go func() {
		defer wg.Done() // Done must fire when reject finishes, not when goreject returns
		reject(ctx, req, resp)
	}()
}
// accept replies with acceptive identification response
// XXX spawn ping goroutine from here?
func accept(ctx context.Context, req *neo.Request, resp neo.Msg) error {
// XXX cancel on ctx
err1 := req.Reply(resp)
return err1 // XXX while trying to work on single conn
//err2 := conn.Close()
//return xerr.First(err1, err2)
}
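
Note the asymmetry between the helpers: reject replies and closes the link, while accept only replies, leaving the link open for the exchange that follows (closing is deferred while everything still runs over a single conn, per the XXX above). A caller-side sketch inside an accept loop, with validate standing in for whatever per-peer check applies:

resp, ok := validate(idReq) // hypothetical identification check
if !ok {
	goreject(ctx, &wg, req, resp) // reject closes the link for us
	continue
}
err := accept(ctx, req, resp)
if err != nil {
	lclose(ctx, req.Link()) // accept leaves closing the link to us
	continue
}
// proceed to serve the peer over req.Link()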
@@ -54,6 +54,8 @@ type Storage struct {
// 2 ? (data.fs)
// 3 packed/ (deltified objects)
zstor zodb.IStorage // underlying ZODB storage XXX -> directly work with fs1 & friends
//nodeCome chan nodeCome // node connected
}
// NewStorage creates new storage node that will listen on serveAddr and talk to master on masterAddr.
@@ -89,30 +91,46 @@ func (stor *Storage) Run(ctx context.Context) error {
wg := sync.WaitGroup{}
serveCtx, serveCancel := context.WithCancel(ctx)
wg.Add(1)
go func() {
go func(ctx context.Context) (err error) {
defer wg.Done()
defer task.Running(&ctx, "serve")(&err) // XXX or "accept" ?
// XXX dup from master
for serveCtx.Err() == nil {
conn, idReq, err := l.Accept(serveCtx)
for {
if ctx.Err() != nil {
return ctx.Err()
}
req, idReq, err := l.Accept(ctx)
if err != nil {
// TODO log / throttle
log.Error(ctx, err) // XXX throttle?
continue
}
_ = idReq
resp, ok := stor.identify(idReq)
if !ok {
goreject(ctx, &wg, req, resp)
continue
}
wg.Add(1)
go func() {
defer wg.Done()
}()
// handover to main driver
select {
//case m.nodeCome <- nodeCome{conn, idReq, nil/*XXX kill*/}:
//case stor.nodeCome <- nodeCome{req, idReq}:
// // ok
case <-serveCtx.Done():
case <-ctx.Done():
// shutdown
lclose(serveCtx, conn.Link())
return
lclose(ctx, req.Link())
continue
}
}
}()
}(serveCtx)
// connect to master and get commands and updates from it
err = stor.talkMaster(ctx)
@@ -154,6 +172,7 @@ func (stor *Storage) talkMaster(ctx context.Context) (err error) {
}
// talkMaster1 does 1 cycle of connect/talk/disconnect to master.
//
// it returns error describing why such cycle had to finish
// XXX distinguish between temporary problems and non-temporary ones?
func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
@@ -190,100 +209,13 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
err = stor.m1serve(ctx, reqStart)
log.Error(ctx, err)
return err
/*
// accept next connection from master. only 1 connection is served at any given time.
// every new connection from master means talk over previous connection is cancelled.
// XXX recheck compatibility with py
type accepted struct {conn *neo.Conn; err error}
acceptq := make(chan accepted)
go func () {
// XXX (temp ?) disabled not to let S accept new connections
// reason: not (yet ?) clear how to allow listen on dialed link without
// missing immediate sends or deadlocks if peer does not follow
// expected protocol exchange (2 receive paths: Recv & Accept)
return
for {
conn, err := Mlink.Accept()
select {
case acceptq <- accepted{conn, err}:
case <-retch:
return
}
if err != nil {
log.Error(ctx, err)
return
}
}
}()
// handle notifications and commands from master
talkq := make(chan error, 1)
for {
// wait for next connection from master if talk over previous one finished.
// XXX rafactor all this into SingleTalker ? (XXX ServeSingle ?)
if Mconn == nil {
select {
case a := <-acceptq:
if a.err != nil {
return a.err
}
Mconn = a.conn
case <-ctx.Done():
return ctx.Err()
}
}
// one talk cycle for master to drive us
// puts error after talk finishes -> talkq
talk := func() error {
// let master initialize us. If successful this ends with StartOperation command.
err := stor.m1initialize(ctx, Mconn)
if err != nil {
log.Error(ctx, err)
return err
}
// we got StartOperation command. Let master drive us during servicing phase.
err = stor.m1serve(ctx, Mconn)
log.Error(ctx, err)
return err
}
go func() {
talkq <- talk()
}()
// next connection / talk finished / cancel
select {
case a := <-acceptq:
lclose(ctx, Mconn) // wakeup/cancel current talk
<-talkq // wait till it finish
if a.err != nil {
return a.err
}
Mconn = a.conn // proceed next cycle on accepted conn
case err = <-talkq:
// XXX check for shutdown command
lclose(ctx, Mconn)
Mconn = nil // now wait for accept to get next Mconn
case <-ctx.Done():
return ctx.Err()
}
}
*/
}
// m1initialize drives storage by master messages during initialization phase
//
// Initialization includes master retrieving info for cluster recovery and data
// verification before starting operation. Initialization finishes either
// successfully with receiving master commanding to start operation, or
// successfully with receiving master command to start operation, or
// unsuccessfully with connection closing indicating initialization was
// cancelled or some other error.
//
@@ -411,6 +343,29 @@ func (stor *Storage) m1serve(ctx context.Context, reqStart *neo.Request) (err er
// --- serve incoming connections from other nodes ---
// identify processes identification request from connected peer.
func (stor *Storage) identify(idReq *neo.RequestIdentification) (neo.Msg, bool) {
// XXX stub: we accept clients and don't care about their UUID
if idReq.NodeType != neo.CLIENT {
return &neo.Error{neo.PROTOCOL_ERROR, "only clients are accepted"}, false
}
if idReq.ClusterName != stor.node.ClusterName {
return &neo.Error{neo.PROTOCOL_ERROR, "cluster name mismatch"}, false
}
// XXX check operational?
return &neo.AcceptIdentification{
NodeType: stor.node.MyInfo.Type,
MyUUID: stor.node.MyInfo.UUID, // XXX lock wrt update
NumPartitions: 1, // XXX
NumReplicas: 1, // XXX
YourUUID: idReq.UUID,
}, true
}
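
A quick illustration of this stub's contract, assuming a storage built with cluster name "abc1" (values hypothetical; only the fields identify inspects are filled in):

resp, ok := stor.identify(&neo.RequestIdentification{
	NodeType:    neo.CLIENT,
	ClusterName: "abc1",
})
// ok == true and resp is *neo.AcceptIdentification echoing our UUID;
// any other NodeType or ClusterName yields *neo.Error(PROTOCOL_ERROR)
// and ok == false.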
// ServeLink serves incoming node-node link connection
func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) (err error) {
defer task.Runningf(&ctx, "serve %s", link)(&err)
@@ -473,6 +428,7 @@ func (stor *Storage) serveClient(ctx context.Context, req neo.Request) {
//log.Infof(ctx, "%s: serving new client conn", conn) // XXX -> running?
// rederive ctx to be also cancelled if M tells us StopOperation
// XXX level up
ctx, cancel := stor.withWhileOperational(ctx)
defer cancel()
@@ -499,52 +455,6 @@ func (stor *Storage) serveClient(ctx context.Context, req neo.Request) {
}
}
// serveClient serves incoming connection on which peer identified itself as client
// the connection is closed when serveClient returns
// XXX +error return?
//
// XXX version that keeps 1 goroutine per 1 Conn
// XXX unusable until Conn.Close signals peer
/*
func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
log.Infof(ctx, "%s: serving new client conn", conn) // XXX -> running?
// rederive ctx to be also cancelled if M tells us StopOperation
ctx, cancel := stor.withWhileOperational(ctx)
defer cancel()
// main work to serve
done := make(chan error, 1)
go func() {
for {
err := stor.serveClient1(conn)
if err != nil {
done <- err
break
}
}
}()
// close connection when either cancelling or returning (e.g. due to an error)
// ( when cancelling - conn.Close will signal to current IO to
// terminate with an error )
var err error
select {
case <-ctx.Done():
// XXX tell client we are shutting down?
// XXX should we also wait for main work to finish?
err = ctx.Err()
case err = <-done:
}
log.Infof(ctx, "%v: %v", conn, err)
// XXX vvv -> defer ?
log.Infof(ctx, "%v: closing client conn", conn)
conn.Close() // XXX err
}
*/
// serveClient1 prepares response for 1 request from client
func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Msg) {
switch req := req.(type) {
@@ -593,3 +503,53 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms
//req.Put(...)
}
// ----------------------------------------
// serveClient serves incoming connection on which peer identified itself as client
// the connection is closed when serveClient returns
// XXX +error return?
//
// XXX version that keeps 1 goroutine per 1 Conn
// XXX unusable until Conn.Close signals peer
/*
func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
log.Infof(ctx, "%s: serving new client conn", conn) // XXX -> running?
// rederive ctx to be also cancelled if M tells us StopOperation
ctx, cancel := stor.withWhileOperational(ctx)
defer cancel()
// main work to serve
done := make(chan error, 1)
go func() {
for {
err := stor.serveClient1(conn)
if err != nil {
done <- err
break
}
}
}()
// close connection when either cancelling or returning (e.g. due to an error)
// ( when cancelling - conn.Close will signal to current IO to
// terminate with an error )
var err error
select {
case <-ctx.Done():
// XXX tell client we are shutting down?
// XXX should we also wait for main work to finish?
err = ctx.Err()
case err = <-done:
}
log.Infof(ctx, "%v: %v", conn, err)
// XXX vvv -> defer ?
log.Infof(ctx, "%v: closing client conn", conn)
conn.Close() // XXX err
}
*/
@@ -55,28 +55,27 @@ func Merge(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
// ----//---- same for ctx2?
*/
/*
XXX do we need vvv?
// if src ctx is already cancelled - make mc cancelled right after creation
//
// this saves goroutine spawn and makes
//
// ctx = Merge(ctx1, ctx2); ctx.Err != nil
//
// check possible.
select {
case <-ctx1.Done():
mc.done = ctx1.Done()
close(mc.done)
mc.doneErr = ctx1.Err()
case <-ctx2.Done():
mc.done = ctx2.Done()
close(mc.done)
mc.doneErr = ctx2.Err()
// src ctx not canceled - spawn ctx{1,2}.done merger.
default:
done := make(chan struct{})
mc.done = done
go mc.wait(done)
// src ctx not canceled - spawn ctx{1,2}.done merger.
go mc.wait()
}
*/
go mc.wait()
return mc, mc.cancel
}
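
From the code above, Merge's contract is: the returned context is cancelled as soon as either ctx1 or ctx2 is cancelled (mc.wait watches both), or when the returned CancelFunc is called. A caller-side sketch:

// serve only while both the listener ctx and the "operational" ctx live
ctx, cancel := xcontext.Merge(serveCtx, opCtx)
defer cancel()

select {
case <-ctx.Done():
	// either serveCtx or opCtx was cancelled, or cancel was called
}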