Commit 35b5e962 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent df5aca5b
......@@ -47,7 +47,7 @@ import (
// send/receive exchange will be happening in between those 2 connections.
//
// For a node to be able to accept new incoming connection it has to have
// "server" role - see newNodeLink() for details.
// "server" role - see newNodeLink() for details. XXX might change to everyone is able to accept.
//
// A NodeLink has to be explicitly closed, once it is no longer needed.
//
......@@ -92,7 +92,8 @@ type Conn struct {
downOnce sync.Once // shutdown may be called by both Close and nodelink.shutdown
rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed
closed uint32 // whether Close was called
closed uint32 // 1 if Close was called
// 2 if this is temp. Conn created to reply "connection refused"
}
......@@ -101,6 +102,10 @@ var ErrLinkDown = errors.New("node link is down") // e.g. due to IO error
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
var ErrClosedConn = errors.New("connection is closed")
// XXX unify LinkError & ConnError -> NetError?
// (think from point of view how user should be handling errors)
// XXX or it is good to be able to distinguish between only conn error vs whole-link error?
// LinkError is usually returned by NodeLink operations
type LinkError struct {
Link *NodeLink
......@@ -276,14 +281,16 @@ func (c *Conn) shutdown() {
// background on the wire not to break node-node link framing.
//
// It is safe to call Close several times.
//
// TODO Close on one end must make Recv/Send on another end fail
// (UC: sending []txn-info)
func (c *Conn) Close() error {
// adjust nodeLink.connTab
// (if nodelink was already shut down and connTab=nil - delete will be noop)
// adjust nodeLink.connTab to have special entry noting this connection is closed.
// this way serveRecv knows to reply errConnClosed over network if peer
// sends us something over this conn.
c.nodeLink.connMu.Lock()
delete(c.nodeLink.connTab, c.connId)
if c.nodeLink.connTab != nil {
cc := c.nodeLink.newConn(c.connId)
atomic.StoreUint32(&cc.closed, 1)
// note cc.down stays not closed so that send can work
}
c.nodeLink.connMu.Unlock()
atomic.StoreUint32(&c.closed, 1)
......@@ -291,6 +298,8 @@ func (c *Conn) Close() error {
return nil
}
// ---- receive ----
// Accept waits for and accepts incoming connection on top of node-node link
func (nl *NodeLink) Accept() (c *Conn, err error) {
defer func() {
......@@ -394,9 +403,25 @@ func (nl *NodeLink) serveRecv() {
}
}
// we have not accepted incoming connection - ignore packet
// connection not accepted - reply "connection refused"
if conn == nil {
// XXX also log / increment counter?
conn = nl.newConn(connId)
atomic.StoreUint32(&conn.closed, 2)
// NOTE conn.down stays not closed so that Send can work
nl.connMu.Unlock()
go conn.replyNoConn(errConnRefused)
continue
}
switch atomic.LoadUint32(&conn.closed) {
case 1:
// connection closed - reply "connection closed"
nl.connMu.Unlock()
go conn.replyNoConn(errConnClosed)
continue
case 2:
// "connection refused" reply is already in progress
nl.connMu.Unlock()
continue
}
......@@ -437,6 +462,24 @@ func (nl *NodeLink) serveRecv() {
}
}
// ---- network replies for closed / refused connections ----
var errConnClosed = &Error{PROTOCOL_ERROR, "connection closed"}
var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"}
// replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection
func (c *Conn) replyNoConn(e Msg) {
c.Send(e) // ignore errors
// remove connTab entry if it was temporary conn created only to send errConnRefused
if e == errConnRefused {
c.nodeLink.connMu.Lock()
delete(c.nodeLink.connTab, c.connId)
c.nodeLink.connMu.Unlock()
}
}
// ---- transmit ----
// txReq is request to transmit a packet. Result error goes back to errch
type txReq struct {
......
......@@ -141,8 +141,10 @@ type NodeUUID int32
// ErrDecodeOverflow is the error returned by neoMsgDecode when decoding hit buffer overflow
var ErrDecodeOverflow = errors.New("decode: bufer overflow")
// Msg is the interface implemented by NEO messages to marshal/unmarshal them into/from wire format
// Msg is the interface implemented by all NEO messages.
type Msg interface {
// marshal/unmarshal into/from wire format:
// neoMsgCode returns message code needed to be used for particular message type
// on the wire
neoMsgCode() uint16
......
......@@ -205,6 +205,9 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
type accepted struct {conn *neo.Conn; err error}
acceptq := make(chan accepted)
go func () {
// XXX (temp ?) disabled not to let S accept new connections
return
for {
conn, err := Mlink.Accept()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment