Commit 47da6c8e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 6d541706
......@@ -120,8 +120,13 @@ type LinkRole int
const (
LinkServer LinkRole = iota // link created as server
LinkClient // link created as client
// for testing:
linkNoRecvSend LinkRole = 1 << 16 // do not spawn serveRecv & serveSend
linkFlagsMask LinkRole = (1<<32 - 1) << 16
)
/*
// LinkFlags allow to customize NodeLink behaviour
type LinkFlags int
const (
......@@ -139,6 +144,7 @@ const (
// for testing:
linkNoRecvSend LinkFlags = 1 << 16 // do not spawn serveRecv & serveSend
)
*/
// newNodeLink makes a new NodeLink from already established net.Conn
//
......@@ -149,38 +155,34 @@ const (
// there is no conflict in identifiers if one side always allocates them as
// even (server) and its peer as odd (client).
//
// Flags allows to customize link behaviour.
//
// Usually server role should be used for connections created via
// net.Listen/net.Accept and client role for connections created via net.Dial.
//
// Though it is possible to wrap just-established raw connection into NodeLink,
// users should always use Handshake which performs protocol handshaking first.
func newNodeLink(conn net.Conn, role LinkRole, flags LinkFlags) *NodeLink {
func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
var nextConnId uint32
var acceptq chan *Conn // not accepting incoming connections by default
switch role {
var acceptq chan *Conn
switch role &^ linkFlagsMask {
case LinkServer:
nextConnId = 0 // all initiated by us connId will be even
acceptq = make(chan *Conn) // accept queue; TODO use backlog?
case LinkClient:
nextConnId = 1 // ----//---- odd
acceptq = nil // not accepting incoming connections
default:
panic("invalid conn role")
}
if flags&LinkListen != 0 {
acceptq = make(chan *Conn) // accept queue; TODO use backlog
}
nl := &NodeLink{
peerLink: conn,
connTab: map[uint32]*Conn{},
nextConnId: nextConnId,
acceptq: acceptq,
acceptq: acceptq, // XXX reenable make(chan *Conn), // accepting initially
txq: make(chan txReq),
down: make(chan struct{}),
}
if flags&linkNoRecvSend == 0 {
if role&linkNoRecvSend == 0 {
nl.serveWg.Add(2)
go nl.serveRecv()
go nl.serveSend()
......@@ -606,14 +608,14 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// Handshake performs NEO protocol handshake just after raw connection between 2 nodes was established.
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
func Handshake(ctx context.Context, conn net.Conn, role LinkRole, flags LinkFlags) (nl *NodeLink, err error) {
func Handshake(ctx context.Context, conn net.Conn, role LinkRole) (nl *NodeLink, err error) {
err = handshake(ctx, conn, PROTOCOL_VERSION)
if err != nil {
return nil, err
}
// handshake ok -> NodeLink
return newNodeLink(conn, role, flags), nil
return newNodeLink(conn, role), nil
}
// HandshakeError is returned when there is an error while performing handshake
......@@ -729,7 +731,9 @@ func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) {
// LinkListener is net.Listener adapted to return handshaked NodeLink on Accept.
type LinkListener interface {
net.Listener
// from net.Listener:
Close() error
Addr() net.Addr
// Accept returns new incoming connection wrapped into NodeLink.
// It accepts only those connections which pass handshake.
......
......@@ -146,7 +146,9 @@ func (n *NodeCommon) Listen() (Listener, error) {
// Listener is LinkListener adapted to return NodeLink with requested identification on Accept.
type Listener interface {
LinkListener
// from LinkListener:
Close() error
Addr() net.Addr
// Accept accepts incoming client connection.
//
......@@ -160,7 +162,7 @@ type Listener interface {
}
type listener struct {
l *LinkListener
l LinkListener
acceptq chan accepted
closed chan struct {}
}
......
......@@ -163,7 +163,7 @@ func (stor *Storage) talkMaster(ctx context.Context) (err error) {
}
}
// talkMaster1 does 1 cycle of connect/talk/disconnect to master
// talkMaster1 does 1 cycle of connect/talk/disconnect to master.
// it returns error describing why such cycle had to finish
// XXX distinguish between temporary problems and non-temporary ones?
func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
......@@ -196,49 +196,65 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
stor.node.MyInfo.NodeUUID = accept.YourNodeUUID
}
// accept next connection from master. only 1 connection is served at any given time.
// every new connection from master means talk over previous connection is done/cancelled.
// XXX check compatibility with py
type accepted struct {
conn *Conn
err error
}
acceptq := make(chan accepted, 1)
go func () {
for {
conn, err := Mlink.Accept()
acceptq <- accepted{conn, err}
if err != nil {
break
}
}
}()
// now handle notifications and commands from master
talkq := make(chan error, 1)
loop:
for {
// check if it was context cancel or command from master to shutdown
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// main worker which talks with master over Mconn
// puts error after talk finishes -> talkq
go func() {
err := func() error {
// let master initialize us. If successful this ends with StartOperation command.
err := stor.m1initialize(ctx, Mconn)
if err != nil {
log.Error(ctx, err)
return err
}
if err != nil /* TODO .IsShutdown(...) */ { // TODO
return err
}
// we got StartOperation command. Let master drive us during servicing phase.
err = stor.m1serve(ctx, Mconn)
log.Error(ctx, err)
return err
}()
talkq <- err
}()
// accept next connection from master. only 1 connection is served at any given time
// XXX every new connection from master means previous connection was closed
// XXX how to do so and stay compatible to py?
//
// XXX or simply use only the first connection and if M decides
// to cancel - close whole nodelink and S reconnects?
// if Mconn != nil {
Mconn.Close() // XXX err
Mconn = nil
// }
// XXX must be in background - accept -> close prevConn
Mconn, err = Mlink.Accept()
if err != nil {
return err // XXX ?
}
// talk finished / next connection / cancel
select {
case err = <-talkq:
// XXX check for shutdown command
continue loop // retry from initializing
// XXX close Mconn on ctx cancel so m1initialize or m1serve wake up
case MnextConn, err := <-acceptq:
lclose(ctx, Mconn) // wakeup/cancel current talk
if err != nil {
return err
}
<-talkq
Mconn = MnextConn
// let master initialize us. If successful this ends with StartOperation command.
err = stor.m1initialize(ctx, Mconn)
if err != nil {
log.Error(ctx, err)
continue // retry initializing
case <-ctx.Done():
return ctx.Err()
}
// we got StartOperation command. Let master drive us during servicing phase.
err = stor.m1serve(ctx, Mconn)
log.Error(ctx, err)
continue // retry from initializing
}
return nil // XXX err
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment