Commit 6d541706 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 17cc053b
......@@ -137,7 +137,7 @@ const (
LinkListen LinkFlags = 1 << iota
// for testing:
linkNoRecvSend LinkRole = 1 << 16 // do not spawn serveRecv & serveSend
linkNoRecvSend LinkFlags = 1 << 16 // do not spawn serveRecv & serveSend
)
// newNodeLink makes a new NodeLink from already established net.Conn
......@@ -168,7 +168,7 @@ func newNodeLink(conn net.Conn, role LinkRole, flags LinkFlags) *NodeLink {
panic("invalid conn role")
}
if flags & LinkListen {
if flags&LinkListen != 0 {
acceptq = make(chan *Conn) // accept queue; TODO use backlog
}
......@@ -606,14 +606,14 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// Handshake performs NEO protocol handshake just after raw connection between 2 nodes was established.
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
func Handshake(ctx context.Context, conn net.Conn, role LinkRole) (nl *NodeLink, err error) {
func Handshake(ctx context.Context, conn net.Conn, role LinkRole, flags LinkFlags) (nl *NodeLink, err error) {
err = handshake(ctx, conn, PROTOCOL_VERSION)
if err != nil {
return nil, err
}
// handshake ok -> NodeLink
return newNodeLink(conn, role), nil
return newNodeLink(conn, role, flags), nil
}
// HandshakeError is returned when there is an error while performing handshake
......@@ -711,13 +711,13 @@ func DialLink(ctx context.Context, net xnet.Networker, addr string) (nl *NodeLin
// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
// The listener accepts only those connections that pass handshake.
func ListenLink(net xnet.Networker, laddr string) (*LinkListener, error) {
func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) {
rawl, err := net.Listen(laddr)
if err != nil {
return nil, err
}
l := &LinkListener{
l := &linkListener{
l: rawl,
acceptq: make(chan linkAccepted),
closed: make(chan struct{}),
......@@ -727,9 +727,16 @@ func ListenLink(net xnet.Networker, laddr string) (*LinkListener, error) {
return l, nil
}
// LinkListener wraps net.Listener to return handshaked NodeLink on Accept.
// Create only via Listen.
type LinkListener struct {
// LinkListener is net.Listener adapted to return handshaked NodeLink on Accept.
type LinkListener interface {
net.Listener
// Accept returns new incoming connection wrapped into NodeLink.
// It accepts only those connections which pass handshake.
Accept() (*NodeLink, error)
}
type linkListener struct {
l net.Listener
acceptq chan linkAccepted
closed chan struct {}
......@@ -740,13 +747,13 @@ type linkAccepted struct {
err error
}
func (l *LinkListener) Close() error {
func (l *linkListener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *LinkListener) run() {
func (l *linkListener) run() {
// context that cancels when listener stops
runCtx, runCancel := context.WithCancel(context.Background())
defer runCancel()
......@@ -765,7 +772,7 @@ func (l *LinkListener) run() {
}
}
func (l *LinkListener) accept(ctx context.Context, conn net.Conn, err error) {
func (l *linkListener) accept(ctx context.Context, conn net.Conn, err error) {
link, err := l.accept1(ctx, conn, err)
select {
......@@ -780,7 +787,7 @@ func (l *LinkListener) accept(ctx context.Context, conn net.Conn, err error) {
}
}
func (l *LinkListener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
func (l *linkListener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
// XXX err ctx?
if err != nil {
......@@ -796,7 +803,7 @@ func (l *LinkListener) accept1(ctx context.Context, conn net.Conn, err error) (*
return link, nil
}
func (l *LinkListener) Accept() (*NodeLink, error) {
func (l *linkListener) Accept() (*NodeLink, error) {
select{
case <-l.closed:
// we know raw listener is already closed - return proper error about it
......@@ -808,7 +815,7 @@ func (l *LinkListener) Accept() (*NodeLink, error) {
}
}
func (l *LinkListener) Addr() net.Addr {
func (l *linkListener) Addr() net.Addr {
return l.l.Addr()
}
......
......@@ -114,7 +114,7 @@ func (n *NodeCommon) Dial(ctx context.Context, peerType NodeType, addr string) (
// Listen starts listening at node's listening address.
// If the address is empty one new free is automatically selected.
// The node information about where it listens at is appropriately updated.
func (n *NodeCommon) Listen() (*Listener, error) {
func (n *NodeCommon) Listen() (Listener, error) {
// start listening
ll, err := ListenLink(n.Net, n.MyInfo.Address.String())
if err != nil {
......@@ -134,7 +134,7 @@ func (n *NodeCommon) Listen() (*Listener, error) {
n.MyInfo.Address = addr
l := &Listener{
l := &listener{
l: ll,
acceptq: make(chan accepted),
closed: make(chan struct{}),
......@@ -144,9 +144,22 @@ func (n *NodeCommon) Listen() (*Listener, error) {
return l, nil
}
// Listener wraps LinkListener to return link on which identification was correctly requested XXX
// Create via Listen. XXX
type Listener struct {
// Listener is LinkListener adapted to return NodeLink with requested identification on Accept.
type Listener interface {
LinkListener
// Accept accepts incoming client connection.
//
// On success the link was handshaked and peer sent us RequestIdentification
// packet which we did not yet answer.
//
// On success returned are:
// - primary link connection which carried identification
// - requested identification packet
Accept() (*Conn, *RequestIdentification, error)
}
type listener struct {
l *LinkListener
acceptq chan accepted
closed chan struct {}
......@@ -158,13 +171,13 @@ type accepted struct {
err error
}
func (l *Listener) Close() error {
func (l *listener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *Listener) run() {
func (l *listener) run() {
for {
// stop on close
select {
......@@ -179,7 +192,7 @@ func (l *Listener) run() {
}
}
func (l *Listener) accept(link *NodeLink, err error) {
func (l *listener) accept(link *NodeLink, err error) {
res := make(chan accepted, 1)
go func() {
conn, idReq, err := l.accept1(link, err)
......@@ -209,7 +222,7 @@ func (l *Listener) accept(link *NodeLink, err error) {
}
}
func (l *Listener) accept1(link *NodeLink, err0 error) (_ *Conn, _ *RequestIdentification, err error) {
func (l *listener) accept1(link *NodeLink, err0 error) (_ *Conn, _ *RequestIdentification, err error) {
if err0 != nil {
return nil, nil, err0
}
......@@ -223,6 +236,10 @@ func (l *Listener) accept1(link *NodeLink, err0 error) (_ *Conn, _ *RequestIdent
return nil, nil, err
}
// NOTE NodeLink currently guarantees that after link.Accept() there is
// at least 1 packet in accepted conn. This way the following won't
// block/deadlock if packets with other ConnID comes.
// Still it is a bit fragile.
idReq := &RequestIdentification{}
_, err = conn.Expect(idReq)
if err != nil {
......@@ -235,15 +252,7 @@ func (l *Listener) accept1(link *NodeLink, err0 error) (_ *Conn, _ *RequestIdent
return conn, idReq, nil
}
// Accept accepts incoming client connection.
//
// On success the link was handshaked and peer sent us RequestIdentification
// packet which we did not yet answer.
//
// On success returned are:
// - primary link connection which carried identification
// - requested identification packet
func (l *Listener) Accept() (*Conn, *RequestIdentification, error) {
func (l *listener) Accept() (*Conn, *RequestIdentification, error) {
select{
case <-l.closed:
// we know raw listener is already closed - return proper error about it
......@@ -255,7 +264,7 @@ func (l *Listener) Accept() (*Conn, *RequestIdentification, error) {
}
}
func (l *Listener) Addr() net.Addr {
func (l *listener) Addr() net.Addr {
return l.l.Addr()
}
......
......@@ -288,6 +288,7 @@ loop:
// new connection comes in
case n := <-m.nodeCome:
node, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
// XXX set node.State = PENDING
// if new storage arrived - start recovery on it too
wg.Add(1)
......
......@@ -220,6 +220,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
Mconn = nil
// }
// XXX must be in background - accept -> close prevConn
Mconn, err = Mlink.Accept()
if err != nil {
return err // XXX ?
......
......@@ -63,6 +63,7 @@ class AdministrationHandler(MasterHandler):
raise ProtocolError('Cannot exit recovery without any '
'storage node')
for node in storage_list:
# XXX note vvv ^^^
assert node.isPending(), node
if node.getConnection().isPending():
# XXX: It's wrong to use ProtocolError here. We must reply
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment