Commit 2a04321a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d3713e72
......@@ -296,7 +296,7 @@ func TestMasterStorage(t *testing.T) {
}
// basic interaction between Client -- Storage
func TestClientStorage(t *testing.T) {
func _TestClientStorage(t *testing.T) {
// XXX temp disabled
return
......
......@@ -154,7 +154,6 @@ func (m *Master) Run(ctx context.Context) (err error) {
}
defer runningf(&ctx, "master(%v)", l.Addr())(&err)
log.Info(ctx, "serving ...")
m.node.MasterAddr = l.Addr().String()
......@@ -288,7 +287,7 @@ loop:
select {
// new connection comes in
case n := <-m.nodeCome:
node, resp := m.identify(n, /* XXX only accept storages -> PENDING */)
node, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
// if new storage arrived - start recovery on it too
wg.Add(1)
......@@ -509,7 +508,7 @@ loop:
for inprogress > 0 {
select {
case n := <-m.nodeCome:
node, resp := m.identify(n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
node, resp := m.identify(ctx, n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
// XXX handle resp ^^^ like in recover
_, ok := resp.(*neo.AcceptIdentification)
if !ok {
......@@ -663,7 +662,7 @@ loop:
select {
// a node connected and requests identification
case n := <-m.nodeCome:
node, resp := m.identify(n, /* XXX accept everyone */)
node, resp := m.identify(ctx, n, /* XXX accept everyone */)
//state := m.clusterState
_ = node
......@@ -832,40 +831,51 @@ loop:
// If node identification is accepted .nodeTab is updated and corresponding node entry is returned.
// Response message is constructed but not send back not to block the caller - it is
// the caller responsibility to send the response to node which requested identification.
func (m *Master) identify(n nodeCome) (node *neo.Node, resp neo.Msg) {
// TODO log node accept/rejected
func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp neo.Msg) {
// XXX also verify ? :
// - NodeType valid
// - IdTimestamp ?
if n.idReq.ClusterName != m.node.ClusterName {
return nil, &neo.Error{neo.PROTOCOL_ERROR, "cluster name mismatch"}
}
uuid := n.idReq.NodeUUID
nodeType := n.idReq.NodeType
uuid := n.idReq.NodeUUID
if uuid == 0 {
uuid = m.allocUUID(nodeType)
}
// XXX uuid < 0 (temporary) -> reallocate if conflict ?
err := func() *neo.Error {
if n.idReq.ClusterName != m.node.ClusterName {
return &neo.Error{neo.PROTOCOL_ERROR, "cluster name mismatch"}
}
node = m.nodeTab.Get(uuid)
if node != nil {
// reject - uuid is already occupied by someone else
// XXX check also for down state - it could be the same node reconnecting
return nil, &neo.Error{neo.PROTOCOL_ERROR, "uuid %v already used by another node" /*XXX*/}
}
if uuid == 0 {
uuid = m.allocUUID(nodeType)
}
// XXX uuid < 0 (temporary) -> reallocate if conflict ?
// XXX accept only certain kind of nodes depending on .clusterState, e.g.
switch nodeType {
case neo.CLIENT:
return nil, &neo.Error{neo.NOT_READY, "cluster not operational"}
// XXX check uuid matches NodeType
node = m.nodeTab.Get(uuid)
if node != nil {
// reject - uuid is already occupied by someone else
// XXX check also for down state - it could be the same node reconnecting
return &neo.Error{neo.PROTOCOL_ERROR, fmt.Sprintf("uuid %v already used by another node", uuid)}
}
// XXX ...
// XXX accept only certain kind of nodes depending on .clusterState, e.g.
switch nodeType {
case neo.CLIENT:
return &neo.Error{neo.NOT_READY, "cluster not operational"}
// XXX ...
}
return nil
}()
subj := fmt.Sprintf("identify: %s (%s)", n.conn.Link().RemoteAddr(), n.idReq.NodeUUID)
if err != nil {
log.Infof(ctx, "%s: rejecting: %s", subj, err)
return nil, err
}
log.Infof(ctx, "%s: accepting as %s", subj, uuid)
accept := &neo.AcceptIdentification{
NodeType: neo.MASTER,
......
......@@ -95,7 +95,6 @@ func (stor *Storage) Run(ctx context.Context) error {
}
defer runningf(&ctx, "storage(%v)", l.Addr())(&err)
log.Infof(ctx, "serving on %s ...", l.Addr())
// start serving incoming connections
wg := sync.WaitGroup{}
......
......@@ -64,7 +64,7 @@ func (d Depth) Info(ctx context.Context, argv ...interface{}) {
func (d Depth) Infof(ctx context.Context, format string, argv ...interface{}) {
// XXX avoid formatting if logging severity disabled
glog.InfoDepth(int(d+1), withTask(ctx, fmt.Sprintf(format, argv))...)
glog.InfoDepth(int(d+1), withTask(ctx, fmt.Sprintf(format, argv...))...)
}
func (d Depth) Error(ctx context.Context, argv ...interface{}) {
......@@ -72,7 +72,7 @@ func (d Depth) Error(ctx context.Context, argv ...interface{}) {
}
func (d Depth) Errorf(ctx context.Context, format string, argv ...interface{}) {
glog.ErrorDepth(int(d+1), withTask(ctx, fmt.Sprintf(format, argv))...)
glog.ErrorDepth(int(d+1), withTask(ctx, fmt.Sprintf(format, argv...))...)
}
......
......@@ -66,7 +66,7 @@ func (st *SyncTracer) Trace1(event interface{}) {
// The consumer, after dealing with the message, must send back an ack.
func (st *SyncTracer) Get1() *SyncTraceMsg {
msg := <-st.tracech
fmt.Printf("trace: get1: %T %v\n", msg.Event, msg.Event)
//fmt.Printf("trace: get1: %T %v\n", msg.Event, msg.Event)
return msg
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment