Commit b00a904e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 35b5e962
...@@ -26,10 +26,12 @@ import ( ...@@ -26,10 +26,12 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"math"
"net" "net"
"reflect" "reflect"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet" "lab.nexedi.com/kirr/neo/go/xcommon/xnet"
) )
...@@ -92,14 +94,17 @@ type Conn struct { ...@@ -92,14 +94,17 @@ type Conn struct {
downOnce sync.Once // shutdown may be called by both Close and nodelink.shutdown downOnce sync.Once // shutdown may be called by both Close and nodelink.shutdown
rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed
closed uint32 // 1 if Close was called closed int32 // 1 if Close was called or "connection closed" entry
// 2 if this is temp. Conn created to reply "connection refused" // incremented during every replyNoConn() in progress
errMsg *Error // error message for replyNoConn
} }
var ErrLinkClosed = errors.New("node link is closed") // operations on closed NodeLink var ErrLinkClosed = errors.New("node link is closed") // operations on closed NodeLink
var ErrLinkDown = errors.New("node link is down") // e.g. due to IO error var ErrLinkDown = errors.New("node link is down") // e.g. due to IO error
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections") var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
var ErrLinkManyConn = errors.New("too many opened connections")
var ErrClosedConn = errors.New("connection is closed") var ErrClosedConn = errors.New("connection is closed")
// XXX unify LinkError & ConnError -> NetError? // XXX unify LinkError & ConnError -> NetError?
...@@ -218,8 +223,24 @@ func (nl *NodeLink) NewConn() (*Conn, error) { ...@@ -218,8 +223,24 @@ func (nl *NodeLink) NewConn() (*Conn, error) {
} }
return nil, nl.err("newconn", ErrLinkDown) return nil, nl.err("newconn", ErrLinkDown)
} }
// nextConnId could wrap around uint32 limits - find first free slot to
// not blindly replace existing connection
for i := uint32(0) ;; i++ {
_, exists := nl.connTab[nl.nextConnId]
if !exists {
break
}
nl.nextConnId += 2
if i > math.MaxUint32 / 2 {
return nil, nl.err("newconn", ErrLinkManyConn)
}
}
c := nl.newConn(nl.nextConnId) c := nl.newConn(nl.nextConnId)
nl.nextConnId += 2 nl.nextConnId += 2
return c, nil return c, nil
} }
...@@ -274,6 +295,9 @@ func (c *Conn) shutdown() { ...@@ -274,6 +295,9 @@ func (c *Conn) shutdown() {
}) })
} }
var connKeepClosed = 1*time.Minute
// Close closes connection. // Close closes connection.
// Any blocked Send*() or Recv*() will be unblocked and return error // Any blocked Send*() or Recv*() will be unblocked and return error
// //
...@@ -282,18 +306,37 @@ func (c *Conn) shutdown() { ...@@ -282,18 +306,37 @@ func (c *Conn) shutdown() {
// //
// It is safe to call Close several times. // It is safe to call Close several times.
func (c *Conn) Close() error { func (c *Conn) Close() error {
// adjust nodeLink.connTab to have special entry noting this connection is closed. nl := c.nodeLink
// this way serveRecv knows to reply errConnClosed over network if peer
// sends us something over this conn. // adjust nodeLink.connTab
c.nodeLink.connMu.Lock() nl.connMu.Lock()
if c.nodeLink.connTab != nil { if nl.connTab != nil {
cc := c.nodeLink.newConn(c.connId) // connection was initiated by us - simply delete - we always
atomic.StoreUint32(&cc.closed, 1) // know if a packet comes to such connection it is closed.
// note cc.down stays not closed so that send can work if c.connId == nl.nextConnId % 2 {
delete(nl.connTab, c.connId)
// connection was initiated by peer which we accepted - put special
// "closed" connection into connTab entry for some time to reply
// "connection closed" if another packet comes to it.
} else {
cc := nl.newConn(c.connId)
// 1 so that cc is not freed by replyNoConn
atomic.StoreInt32(&cc.closed, 1)
// NOTE cc.down stays not closed so Send could work
time.AfterFunc(connKeepClosed, func() {
nl.connMu.Lock()
delete(nl.connTab, cc.connId)
nl.connMu.Unlock()
cc.shutdown()
})
}
} }
c.nodeLink.connMu.Unlock() nl.connMu.Unlock()
atomic.StoreUint32(&c.closed, 1) atomic.StoreInt32(&c.closed, 1)
c.shutdown() c.shutdown()
return nil return nil
} }
...@@ -328,7 +371,7 @@ func (nl *NodeLink) Accept() (c *Conn, err error) { ...@@ -328,7 +371,7 @@ func (nl *NodeLink) Accept() (c *Conn, err error) {
// errRecvShutdown returns appropriate error when c.down is found ready in recvPkt // errRecvShutdown returns appropriate error when c.down is found ready in recvPkt
func (c *Conn) errRecvShutdown() error { func (c *Conn) errRecvShutdown() error {
switch { switch {
case atomic.LoadUint32(&c.closed) != 0: case atomic.LoadInt32(&c.closed) != 0:
return ErrClosedConn return ErrClosedConn
case atomic.LoadUint32(&c.nodeLink.closed) != 0: case atomic.LoadUint32(&c.nodeLink.closed) != 0:
...@@ -393,36 +436,39 @@ func (nl *NodeLink) serveRecv() { ...@@ -393,36 +436,39 @@ func (nl *NodeLink) serveRecv() {
// connTab is never nil here - because shutdown before // connTab is never nil here - because shutdown before
// resetting it waits for us to finish. // resetting it waits for us to finish.
conn := nl.connTab[connId] conn := nl.connTab[connId]
if conn == nil {
// XXX check connId is proper for peer originated streams
if nl.acceptq != nil {
// we are accepting new incoming connection
conn = nl.newConn(connId)
accept = true
}
}
// connection not accepted - reply "connection refused" fmt.Printf("RX .%d -> %v\n", connId, conn)
if conn == nil { if conn == nil {
// "new" connection will be needed in all cases - e.g.
// temporarily to reply "connection refused"
conn = nl.newConn(connId) conn = nl.newConn(connId)
atomic.StoreUint32(&conn.closed, 2)
// NOTE conn.down stays not closed so that Send can work
nl.connMu.Unlock()
go conn.replyNoConn(errConnRefused)
continue
}
switch atomic.LoadUint32(&conn.closed) { fmt.Printf("connId: %d (%d)\n", connId, connId % 2)
case 1: fmt.Printf("nextConnId: %d (%d)\n", nl.nextConnId, nl.nextConnId % 2)
// connection closed - reply "connection closed"
nl.connMu.Unlock() // message with connid that should be initiated by us
go conn.replyNoConn(errConnClosed) if connId % 2 == nl.nextConnId % 2 {
continue conn.errMsg = errConnClosed
// message with connid for a stream initiated by peer
} else {
if nl.acceptq == nil {
conn.errMsg = errConnRefused
} else {
// we are accepting new incoming connection
accept = true
}
fmt.Println("ZZZ", conn.errMsg, accept)
}
}
case 2: // we are not accepting packet in any way
// "connection refused" reply is already in progress if conn.errMsg != nil {
fmt.Printf(".%d EMSG: %v\n", connId, conn.errMsg)
atomic.AddInt32(&conn.closed, 1)
nl.connMu.Unlock() nl.connMu.Unlock()
go conn.replyNoConn()
continue continue
} }
...@@ -468,15 +514,18 @@ var errConnClosed = &Error{PROTOCOL_ERROR, "connection closed"} ...@@ -468,15 +514,18 @@ var errConnClosed = &Error{PROTOCOL_ERROR, "connection closed"}
var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"} var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"}
// replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection // replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection
func (c *Conn) replyNoConn(e Msg) { // and removes connection from nodeLink connTab if ekeep==false.
c.Send(e) // ignore errors //func (c *Conn) replyNoConn(e Msg, ekeep bool) {
func (c *Conn) replyNoConn() {
c.Send(c.errMsg) // ignore errors
// remove connTab entry if it was temporary conn created only to send errConnRefused // remove connTab entry - if all users of this temporary conn created
if e == errConnRefused { // only to send the error are now gone.
c.nodeLink.connMu.Lock() c.nodeLink.connMu.Lock()
if atomic.AddInt32(&c.closed, -1) == 0 {
delete(c.nodeLink.connTab, c.connId) delete(c.nodeLink.connTab, c.connId)
c.nodeLink.connMu.Unlock()
} }
c.nodeLink.connMu.Unlock()
} }
// ---- transmit ---- // ---- transmit ----
...@@ -490,7 +539,7 @@ type txReq struct { ...@@ -490,7 +539,7 @@ type txReq struct {
// errSendShutdown returns appropriate error when c.down is found ready in Send // errSendShutdown returns appropriate error when c.down is found ready in Send
func (c *Conn) errSendShutdown() error { func (c *Conn) errSendShutdown() error {
switch { switch {
case atomic.LoadUint32(&c.closed) != 0: case atomic.LoadInt32(&c.closed) != 0:
return ErrClosedConn return ErrClosedConn
// the only other error possible besides Conn being .Close()'ed is that // the only other error possible besides Conn being .Close()'ed is that
...@@ -586,7 +635,7 @@ const dumpio = true ...@@ -586,7 +635,7 @@ const dumpio = true
func (nl *NodeLink) sendPkt(pkt *PktBuf) error { func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
if dumpio { if dumpio {
// XXX -> log // XXX -> log
fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt) fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt.Dump())
//defer fmt.Printf("\t-> sendPkt err: %v\n", err) //defer fmt.Printf("\t-> sendPkt err: %v\n", err)
} }
...@@ -639,7 +688,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -639,7 +688,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
if dumpio { if dumpio {
// XXX -> log // XXX -> log
fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt) fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt.Dump())
} }
return pkt, nil return pkt, nil
......
...@@ -32,6 +32,8 @@ import ( ...@@ -32,6 +32,8 @@ import (
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"github.com/kylelemons/godebug/pretty"
) )
func xclose(c io.Closer) { func xclose(c io.Closer) {
...@@ -121,12 +123,20 @@ func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) { ...@@ -121,12 +123,20 @@ func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) {
errv.Appendf("header: unexpected msglen %v (want %v)", ntoh32(h.MsgLen), len(payload)) errv.Appendf("header: unexpected msglen %v (want %v)", ntoh32(h.MsgLen), len(payload))
} }
if !bytes.Equal(pkt.Payload(), payload) { if !bytes.Equal(pkt.Payload(), payload) {
errv.Appendf("payload differ") errv.Appendf("payload differ:\n%s",
pretty.Compare(string(payload), string(pkt.Payload())))
} }
exc.Raiseif( errv.Err() ) exc.Raiseif( errv.Err() )
} }
// Verify PktBuf to match expected message
func xverifyMsg(pkt *PktBuf, connid uint32, msg Msg) {
data := make([]byte, msg.neoMsgEncodedLen())
msg.neoMsgEncode(data)
xverifyPkt(pkt, connid, msg.neoMsgCode(), data)
}
// delay a bit // delay a bit
// needed e.g. to test Close interaction with waiting read or write // needed e.g. to test Close interaction with waiting read or write
// (we cannot easily sync and make sure e.g. read is started and became asleep) // (we cannot easily sync and make sure e.g. read is started and became asleep)
...@@ -266,7 +276,7 @@ func TestNodeLink(t *testing.T) { ...@@ -266,7 +276,7 @@ func TestNodeLink(t *testing.T) {
xwait(wgclose) xwait(wgclose)
// Test connections on top of nodelink // ---- connections on top of nodelink ----
// Close vs recvPkt // Close vs recvPkt
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
...@@ -439,6 +449,8 @@ func TestNodeLink(t *testing.T) { ...@@ -439,6 +449,8 @@ func TestNodeLink(t *testing.T) {
} }
println("\n---------------------\n")
// Conn accept + exchange // Conn accept + exchange
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
wg = &xsync.WorkGroup{} wg = &xsync.WorkGroup{}
...@@ -457,7 +469,24 @@ func TestNodeLink(t *testing.T) { ...@@ -457,7 +469,24 @@ func TestNodeLink(t *testing.T) {
xsendPkt(c, mkpkt(36, []byte("pong2"))) xsendPkt(c, mkpkt(36, []byte("pong2")))
xclose(c) xclose(c)
println("B.111")
// "connection refused" when trying to connect to not-listening peer
c = xnewconn(nl2) // XXX should get error here?
xsendPkt(c, mkpkt(38, []byte("pong3")))
pkt = xrecvPkt(c)
xverifyMsg(pkt, c.connId, errConnRefused)
println("B.222")
xsendPkt(c, mkpkt(40, []byte("pong4"))) // once again
pkt = xrecvPkt(c)
xverifyMsg(pkt, c.connId, errConnRefused)
println("B.333")
xclose(c)
}) })
println("A.111")
c = xnewconn(nl1) c = xnewconn(nl1)
xsendPkt(c, mkpkt(33, []byte("ping"))) xsendPkt(c, mkpkt(33, []byte("ping")))
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
...@@ -467,6 +496,22 @@ func TestNodeLink(t *testing.T) { ...@@ -467,6 +496,22 @@ func TestNodeLink(t *testing.T) {
xverifyPkt(pkt, c.connId, 36, []byte("pong2")) xverifyPkt(pkt, c.connId, 36, []byte("pong2"))
xwait(wg) xwait(wg)
println()
println()
println("A.222")
// "connection closed" after peer closed its end
xsendPkt(c, mkpkt(37, []byte("ping3")))
println("A.qqq")
pkt = xrecvPkt(c)
xverifyMsg(pkt, c.connId, errConnClosed)
println("A.zzz")
xsendPkt(c, mkpkt(39, []byte("ping4"))) // once again
pkt = xrecvPkt(c)
xverifyMsg(pkt, c.connId, errConnClosed)
// XXX also should get EOF on recv
println("A.333")
xclose(c) xclose(c)
xclose(nl1) xclose(nl1)
xclose(nl2) xclose(nl2)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment