Commit a585e05d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 258372a2
......@@ -684,6 +684,8 @@ func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) {
// ---- for convenience: Dial & Listen ----
// XXX we also need 1) Dial + request identification & 2) Listen + verify/accept identification
// Dial connects to address on given network, handshakes and wraps the connection as NodeLink
func Dial(ctx context.Context, net xnet.Networker, addr string) (nl *NodeLink, err error) {
peerConn, err := net.Dial(ctx, addr)
......@@ -744,7 +746,7 @@ func (l *Listener) run() {
default:
}
// XXX add backpressure on too much incoming connections without client .Accept
// XXX add backpressure on too much incoming connections without client .Accept ?
conn, err := l.l.Accept()
go l.accept(runCtx, conn, err)
}
......@@ -754,16 +756,25 @@ func (l *Listener) accept(ctx context.Context, conn net.Conn, err error) {
link, err := l.accept1(ctx, conn, err)
select {
case <-l.closed:
case l.acceptq <- accepted{link, err}:
// ok
case <-l.closed:
// shutdown
if link != nil {
link.Close()
}
}
}
func (l *Listener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
// XXX err ctx?
if err != nil {
return nil, err
}
// NOTE Handshake closes conn in case of failure
link, err := Handshake(ctx, conn, LinkServer)
if err != nil {
return nil, err
......
......@@ -24,7 +24,7 @@ import (
"context"
"errors"
"fmt"
"math"
// "math"
"sync"
"lab.nexedi.com/kirr/neo/go/neo"
......@@ -66,9 +66,9 @@ type Master struct {
// event: node connects
type nodeCome struct {
link *neo.NodeLink
conn *neo.Conn
idReq neo.RequestIdentification // we received this identification request
idResp chan neo.Msg // what we reply (AcceptIdentification | Error)
idResp chan neo.Msg // what we reply (AcceptIdentification | Error) XXX kill
}
// event: node disconnects
......@@ -108,36 +108,6 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
return m
}
// Run starts master node and runs it until ctx is cancelled or fatal error
func (m *Master) Run(ctx context.Context) error {
// start listening
l, err := m.node.Listen()
if err != nil {
return err // XXX err ctx
}
m.node.MasterAddr = l.Addr().String()
// serve incoming connections
wg := sync.WaitGroup{}
serveCtx, serveCancel := context.WithCancel(ctx)
wg.Add(1)
go func() {
defer wg.Done()
err = Serve(serveCtx, l, m)
_ = err // XXX what to do with err ?
}()
// main driving logic
err = m.runMain(ctx)
serveCancel()
wg.Wait()
return err // XXX errctx
}
// Start requests cluster to eventually transition into running state
// it returns an error if such transition is not currently possible to begin (e.g. partition table is not operational)
// it returns nil if the transition began.
......@@ -172,6 +142,52 @@ func (m *Master) setClusterState(state neo.ClusterState) {
}
// Run starts master node and runs it until ctx is cancelled or fatal error
func (m *Master) Run(ctx context.Context) error {
// start listening
l, err := m.node.Listen() // XXX -> Listen
if err != nil {
return err // XXX err ctx
}
m.node.MasterAddr = l.Addr().String()
// accept incoming connections and pass them to main driver
wg := sync.WaitGroup{}
serveCtx, serveCancel := context.WithCancel(ctx)
wg.Add(1)
go func() {
defer wg.Done()
for serveCtx.Err() != nil {
conn, idReq, err := l.Accept()
if err != nil {
// TODO log / throttle
continue
}
select {
case m.nodeCome <- nodeCome{conn, idReq, nil/*XXX kill*/}:
// ok
case <-serveCtx.Done():
// shutdown
conn.Link().Close() // XXX log err ?
return
}
}
}()
// main driving logic
err = m.runMain(ctx)
serveCancel()
l.Close() // XXX log err ?
wg.Wait()
return err // XXX errctx
}
// runMain is the process which implements main master cluster management logic: node tracking, cluster
// state updates, scheduling data movement between storage nodes etc
func (m *Master) runMain(ctx context.Context) (err error) {
......@@ -695,7 +711,7 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) {
IdTimestamp: monotime(),
}
node = m.nodeTab.Update(nodeInfo, n.link) // NOTE this notifies all nodeTab subscribers
node = m.nodeTab.Update(nodeInfo, n.conn.Link()) // NOTE this notifies all nodeTab subscribers
return node, true
}
......@@ -763,11 +779,10 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
return
}
// convey identification request to master
idRespCh := make(chan neo.Msg)
m.nodeCome <- nodeCome{link, idReq, idRespCh}
idResp := <-idRespCh
// convey identification request to master and we are done here - the master takes on the torch
m.nodeCome <- nodeCome{conn, idReq, nil/*XXX kill*/}
/*
// if master accepted this node - don't forget to notify when it leaves
_, rejected := idResp.(error)
if !rejected {
......@@ -861,6 +876,7 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
// storage:
m.DriveStorage(ctx, link)
*/
}
// ServeClient serves incoming connection on which peer identified itself as client
......
......@@ -23,14 +23,129 @@ package server
// common parts for organizing network servers
import (
"context"
"fmt"
// "context"
// "fmt"
"net"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/go123/xerr"
)
// Listener wraps neo.Listener to return link on which identification was correctly requested XXX
// Create via Listen. XXX
type Listener struct {
l *neo.Listener
acceptq chan accepted
closed chan struct {}
}
type accepted struct {
conn *neo.Conn
idReq *neo.RequestIdentification
err error
}
func (l *Listener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *Listener) run() {
for {
// stop on close
select {
case <-l.closed:
return
default:
}
// XXX add backpressure on too much incoming connections without client .Accept ?
link, err := l.l.Accept()
go l.accept(link, err)
}
}
func (l *Listener) accept(link *neo.NodeLink, err error) {
res := make(chan accepted, 1)
go func() {
conn, idReq, err := l.accept1(link, err)
res <- accepted{conn, idReq, err}
}()
// wait for accept1 result & resend it to .acceptq
// close link in case of listening cancel or error
//
// the only case when link stays alive is when acceptance was
// successful and link ownership is passed to Accept.
ok := false
select {
case <-l.closed:
case a := <-res:
select {
case l.acceptq <- a:
ok = (a.err == nil)
case <-l.closed:
}
}
if !ok {
link.Close()
}
}
func (l *Listener) accept1(link *neo.NodeLink, err0 error) (_ *neo.Conn, _ *neo.RequestIdentification, err error) {
if err0 != nil {
return nil, nil, err0
}
defer xerr.Context(&err, "identify")
// identify peer
// the first conn must come with RequestIdentification packet
conn, err := link.Accept()
if err != nil {
return nil, nil, err
}
idReq := &neo.RequestIdentification{}
_, err = conn.Expect(idReq)
if err != nil {
// XXX ok to let peer know error as is? e.g. even IO error on Recv?
err2 := conn.Send(&neo.Error{neo.PROTOCOL_ERROR, err.Error()})
err = xerr.Merge(err, err2)
return nil, nil, err
}
return conn, idReq, nil
}
// Accept accepts incoming client connection.
//
// On success the link was handshaked and on returned Conn peer sent us
// RequestIdentification packet which we did not yet answer.
func (l *Listener) Accept() (*neo.Conn, *neo.RequestIdentification, error) {
select{
case <-l.closed:
// we know raw listener is already closed - return proper error about it
_, err := l.l.Accept()
return nil, nil, err
case a := <-l.acceptq:
return a.conn, a.idReq, a.err
}
}
func (l *Listener) Addr() net.Addr {
return l.l.Addr()
}
/*
// Server is an interface that represents networked server
type Server interface {
// ServeLink serves already established nodelink (connection) in a blocking way.
......@@ -70,10 +185,12 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
return err
}
// XXX close link when either cancelling or returning?
// XXX only returning with error!
go srv.ServeLink(ctx, link)
}
}
*/
// ----------------------------------------
......
......@@ -99,8 +99,11 @@ func (stor *Storage) Run(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
err = Serve(serveCtx, l, stor)
_ = err // XXX what to do with err ?
// TODO l.Accept() -> nodeCome
// err = Serve(serveCtx, l, stor)
// _ = err // XXX what to do with err ?
}()
// connect to master and get commands and updates from it
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment