Commit 2b8c87d4 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent b23c7153
......@@ -58,13 +58,16 @@ type nodeCome struct {
// node disconnects
type nodeLeave struct {
// TODO
link *NodeLink
// XXX TODO
}
// storage node passed recovery phase
type storRecovery struct {
partTab PartitionTable
// XXX + lastOid, lastTid, backup_tid, truncate_tid ?
// XXX + err ?
}
func NewMaster(clusterName string) *Master {
......@@ -203,7 +206,7 @@ func (m *Master) accept(n nodeCome) (nodeInfo NodeInfo, ok bool) {
return nodeInfo, true
}
// storCtlRecovery drives a storage node during cluster recoving state
// storCtlRecovery drives a storage node during cluster recovering state
// TODO text
func (m *Master) storCtlRecovery(ctx context.Context, link *NodeLink) {
var err error
......@@ -212,6 +215,8 @@ func (m *Master) storCtlRecovery(ctx context.Context, link *NodeLink) {
return
}
// XXX on err still provide feedback to storRecovery chan ?
fmt.Printf("master: %v", err)
// this must interrupt everything connected to stor node and
......@@ -280,9 +285,11 @@ func (m *Master) allocUUID(nodeType NodeType) NodeUUID {
// ServeLink serves incoming node-node link connection
// XXX +error return?
func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
fmt.Printf("master: %s: serving new node\n", link)
logf := func(format string, argv ...interface{}) {
fmt.Printf("master: %s: " + format + "\n", append([]interface{}{link}, argv...))
}
//var node *Node
logf("serving new node")
// close link when either cancelling or returning (e.g. due to an error)
// ( when cancelling - link.Close will signal to all current IO to
......@@ -297,34 +304,59 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
// XXX ret err = ctx.Err()
case <-retch:
}
fmt.Printf("master: %v: closing link\n", link)
logf("closing link")
link.Close() // XXX err
}()
// identify
// XXX -> change to use nodeCome
nodeInfo, err := IdentifyPeer(link, MASTER)
// identify peer
// the first conn must come with RequestIdentification packet
conn, err := link.Accept()
if err != nil {
fmt.Printf("master: %v\n", err)
logf("identify: %v", err)
return
}
// TODO get connNotify as conn left after identification
connNotify, err := link.NewConn()
idReq := RequestIdentification{}
err = Expect(conn, &idReq)
if err != nil {
logf("identify: %v", err)
// XXX ok to let peer know error as is? e.g. even IO error on Recv?
err = EncodeAndSend(conn, &Error{PROTOCOL_ERROR, err.Error()})
if err != nil {
panic("TODO") // XXX
logf("failed to send error: %v", err)
}
return
}
// notify main logic node connects/disconnects
_ = nodeInfo
/*
node = &Node{nodeInfo, link}
m.nodeq <- node
// convey identification request to master
idRespCh := make(chan NEOEncoder)
m.nodeCome <- nodeCome{link, idReq, idRespCh}
idResp := <-idRespCh
// if master accepted this node - don't forget to notify when it leaves
_, noaccept := idResp.(error)
if !noaccept {
defer func() {
node.state = DOWN
m.nodeq <- node
m.nodeLeave <- nodeLeave{link}
}()
*/
}
// let the peer know identification result
err = EncodeAndSend(conn, idResp)
if err != nil {
return
}
// nothing to do more here if identification was not accepted
if noaccept {
logf("identify: %v", idResp)
return
}
logf("identify: accepted")
// ----------------------------------------
// XXX recheck vvv
// subscribe to nodeTab/partTab/clusterState and notify peer with updates
......
......@@ -121,18 +121,12 @@ func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentifi
}
}()
pkt, err := RecvAndDecode(conn)
req := RequestIdentification{}
err = Expect(conn, &req)
if err != nil {
return nodeInfo, err
}
switch pkt := pkt.(type) {
default:
return nodeInfo, fmt.Errorf("unexpected request: %T", pkt)
// XXX also handle Error
case *RequestIdentification:
// TODO (.NodeType, .UUID, .Address, .Name, .IdTimestamp) -> check + register to NM
// TODO hook here in logic to check identification request, assign nodeID etc
......@@ -140,19 +134,16 @@ func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentifi
err = EncodeAndSend(conn, &AcceptIdentification{
NodeType: myNodeType,
MyNodeUUID: 0, // XXX
NumPartitions: 0, // XXX
NumReplicas: 0, // XXX
YourNodeUUID: pkt.NodeUUID,
NumPartitions: 1, // XXX
NumReplicas: 1, // XXX
YourNodeUUID: req.NodeUUID,
})
if err != nil {
return nodeInfo, err
}
nodeInfo = *pkt
}
return nodeInfo, nil
return req, nil
}
// IdentifyMe identifies local node to remote peer
......@@ -240,6 +231,13 @@ func Ask(conn *Conn, req NEOEncoder, resp NEODecoder) error {
return err
}
err = Expect(conn, resp)
return err
}
// Expect receives 1 packet and expects it to be exactly of msg type
// XXX naming (-> Recv1 ?)
func Expect(conn *Conn, msg NEODecoder) error {
pkt, err := conn.Recv()
if err != nil {
return err
......@@ -253,7 +251,7 @@ func Ask(conn *Conn, req NEOEncoder, resp NEODecoder) error {
return fmt.Errorf("invalid msgCode (%d)", msgCode) // XXX err ctx
}
if msgType != reflect.TypeOf(resp) {
if msgType != reflect.TypeOf(msg) {
// Error response
if msgType == reflect.TypeOf(Error{}) {
errResp := Error{}
......@@ -265,10 +263,10 @@ func Ask(conn *Conn, req NEOEncoder, resp NEODecoder) error {
return errDecode(&errResp) // XXX err ctx
}
return fmt.Errorf("unexpected reply: %T", msgType) // XXX err ctx
return fmt.Errorf("unexpected packet: %T", msgType) // XXX err ctx
}
_, err = resp.NEODecode(pkt.Payload())
_, err = msg.NEODecode(pkt.Payload())
if err != nil {
return err // XXX err ctx
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment