Commit 6fd0c9be authored by Kirill Smelkov's avatar Kirill Smelkov

X connection: Adding context to errors from NodeLink and Conn operations

parent 3dab1cf0
...@@ -92,12 +92,24 @@ type Conn struct { ...@@ -92,12 +92,24 @@ type Conn struct {
} }
// XXX include actual op (read/write/accept/connect) when there is an error ?
var ErrLinkClosed = errors.New("node link is closed") // operations on closed NodeLink var ErrLinkClosed = errors.New("node link is closed") // operations on closed NodeLink
var ErrLinkDown = errors.New("node link is down") // e.g. due to IO error var ErrLinkDown = errors.New("node link is down") // e.g. due to IO error
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections") var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
var ErrClosedConn = errors.New("read/write on closed connection") var ErrClosedConn = errors.New("connection is closed")
// LinkError is usually returned by NodeLink operations
type LinkError struct {
Link *NodeLink
Op string
Err error
}
// ConnError is usually returned by Conn operations
type ConnError struct {
Conn *Conn
Op string
Err error
}
// LinkRole is a role an end of NodeLink is intended to play // LinkRole is a role an end of NodeLink is intended to play
type LinkRole int type LinkRole int
...@@ -176,9 +188,9 @@ func (nl *NodeLink) NewConn() (*Conn, error) { ...@@ -176,9 +188,9 @@ func (nl *NodeLink) NewConn() (*Conn, error) {
defer nl.connMu.Unlock() defer nl.connMu.Unlock()
if nl.connTab == nil { if nl.connTab == nil {
if atomic.LoadUint32(&nl.closed) != 0 { if atomic.LoadUint32(&nl.closed) != 0 {
return nil, ErrLinkClosed return nil, nl.err("newconn", ErrLinkClosed)
} }
return nil, ErrLinkDown return nil, nl.err("newconn", ErrLinkDown)
} }
c := nl.newConn(nl.nextConnId) c := nl.newConn(nl.nextConnId)
nl.nextConnId += 2 nl.nextConnId += 2
...@@ -225,7 +237,7 @@ func (nl *NodeLink) Close() error { ...@@ -225,7 +237,7 @@ func (nl *NodeLink) Close() error {
atomic.StoreUint32(&nl.closed, 1) atomic.StoreUint32(&nl.closed, 1)
nl.shutdown() nl.shutdown()
nl.downWg.Wait() nl.downWg.Wait()
return nl.errClose return nl.err("close", nl.errClose)
} }
// shutdown marks connection as no longer operational // shutdown marks connection as no longer operational
...@@ -256,7 +268,13 @@ func (c *Conn) Close() error { ...@@ -256,7 +268,13 @@ func (c *Conn) Close() error {
} }
// Accept waits for and accepts incoming connection on top of node-node link // Accept waits for and accepts incoming connection on top of node-node link
func (nl *NodeLink) Accept() (*Conn, error) { func (nl *NodeLink) Accept() (c *Conn, err error) {
defer func() {
if err != nil {
err = nl.err("accept", err)
}
}()
// this node link is not accepting connections // this node link is not accepting connections
if nl.acceptq == nil { if nl.acceptq == nil {
return nil, ErrLinkNoListen return nil, ErrLinkNoListen
...@@ -304,7 +322,7 @@ func (c *Conn) errRecvShutdown() error { ...@@ -304,7 +322,7 @@ func (c *Conn) errRecvShutdown() error {
func (c *Conn) Recv() (*PktBuf, error) { func (c *Conn) Recv() (*PktBuf, error) {
select { select {
case <-c.down: case <-c.down:
return nil, c.errRecvShutdown() return nil, c.err("recv", c.errRecvShutdown())
case pkt := <-c.rxq: case pkt := <-c.rxq:
return pkt, nil return pkt, nil
...@@ -318,6 +336,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -318,6 +336,7 @@ func (nl *NodeLink) serveRecv() {
defer nl.serveWg.Done() defer nl.serveWg.Done()
for { for {
// receive 1 packet // receive 1 packet
// XXX if nl.peerLink was just closed by tx->shutdown we'll get ErrNetClosing
pkt, err := nl.recvPkt() pkt, err := nl.recvPkt()
//fmt.Printf("recvPkt -> %v, %v\n", pkt, err) //fmt.Printf("recvPkt -> %v, %v\n", pkt, err)
if err != nil { if err != nil {
...@@ -419,6 +438,11 @@ func (c *Conn) errSendShutdown() error { ...@@ -419,6 +438,11 @@ func (c *Conn) errSendShutdown() error {
// Send sends packet via connection // Send sends packet via connection
func (c *Conn) Send(pkt *PktBuf) error { func (c *Conn) Send(pkt *PktBuf) error {
err := c.send(pkt)
return c.err("send", err)
}
func (c *Conn) send(pkt *PktBuf) error {
// set pkt connId associated with this connection // set pkt connId associated with this connection
pkt.Header().ConnId = hton32(c.connId) pkt.Header().ConnId = hton32(c.connId)
var err error var err error
...@@ -467,6 +491,7 @@ func (nl *NodeLink) serveSend() { ...@@ -467,6 +491,7 @@ func (nl *NodeLink) serveSend() {
return return
case txreq := <-nl.txq: case txreq := <-nl.txq:
// XXX if n.peerLink was just closed by rx->shutdown we'll get ErrNetClosing
err := nl.sendPkt(txreq.pkt) err := nl.sendPkt(txreq.pkt)
//fmt.Printf("sendPkt -> %v\n", err) //fmt.Printf("sendPkt -> %v\n", err)
...@@ -501,7 +526,6 @@ func (nl *NodeLink) sendPkt(pkt *PktBuf) error { ...@@ -501,7 +526,6 @@ func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
return err return err
} }
var ErrPktTooSmall = errors.New("packet too small")
var ErrPktTooBig = errors.New("packet too big") var ErrPktTooBig = errors.New("packet too big")
// recvPkt receives raw packet from peer // recvPkt receives raw packet from peer
...@@ -568,7 +592,18 @@ func Handshake(ctx context.Context, conn net.Conn, role LinkRole) (nl *NodeLink, ...@@ -568,7 +592,18 @@ func Handshake(ctx context.Context, conn net.Conn, role LinkRole) (nl *NodeLink,
return newNodeLink(conn, role), nil return newNodeLink(conn, role), nil
} }
// handshake is worker for Handshake // HandshakeError is returned when there is an error while performing handshake
type HandshakeError struct {
// XXX just keep .Conn? (but .Conn can be closed)
LocalAddr net.Addr
RemoteAddr net.Addr
Err error
}
func (e *HandshakeError) Error() string {
return fmt.Sprintf("%s - %s: handshake: %s", e.LocalAddr, e.RemoteAddr, e.Err.Error())
}
func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) { func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) {
errch := make(chan error, 2) errch := make(chan error, 2)
...@@ -637,21 +672,10 @@ func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) { ...@@ -637,21 +672,10 @@ func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) {
return nil return nil
} }
type HandshakeError struct {
// XXX just keep .Conn? (but .Conn can be closed)
LocalAddr net.Addr
RemoteAddr net.Addr
Err error
}
func (e *HandshakeError) Error() string {
return fmt.Sprintf("%s - %s: handshake: %s", e.LocalAddr, e.RemoteAddr, e.Err.Error())
}
// ---- for convenience: Dial ---- // ---- for convenience: Dial ----
// Dial connects to address on named network and wrap the connection as NodeLink // Dial connects to address on named network, handshakes and wraps the connection as NodeLink
// TODO +tls.Config // TODO +tls.Config
func Dial(ctx context.Context, network, address string) (nl *NodeLink, err error) { func Dial(ctx context.Context, network, address string) (nl *NodeLink, err error) {
d := net.Dialer{} d := net.Dialer{}
...@@ -697,10 +721,11 @@ func Listen(network, laddr string) (*Listener, error) { ...@@ -697,10 +721,11 @@ func Listen(network, laddr string) (*Listener, error) {
// ---- for convenience: String ---- // ---- for convenience: String / Error ----
func (nl *NodeLink) String() string { func (nl *NodeLink) String() string {
s := fmt.Sprintf("%s - %s", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr()) s := fmt.Sprintf("%s - %s", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr())
return s // XXX add "(closed)" if nl is closed ? return s // XXX add "(closed)" if nl is closed ?
// XXX other flags e.g. (down) ?
} }
func (c *Conn) String() string { func (c *Conn) String() string {
...@@ -708,6 +733,28 @@ func (c *Conn) String() string { ...@@ -708,6 +733,28 @@ func (c *Conn) String() string {
return s // XXX add "(closed)" if c is closed ? return s // XXX add "(closed)" if c is closed ?
} }
func (e *LinkError) Error() string {
return fmt.Sprintf("%s: %s: %s", e.Link, e.Op, e.Err)
}
func (e *ConnError) Error() string {
return fmt.Sprintf("%s: %s: %s", e.Conn, e.Op, e.Err)
}
func (nl *NodeLink) err(op string, e error) error {
if e == nil {
return nil
}
return &LinkError{Link: nl, Op: op, Err: e}
}
func (c *Conn) err(op string, e error) error {
if e == nil {
return nil
}
return &ConnError{Conn: c, Op: op, Err: e}
}
// ---------------------------------------- // ----------------------------------------
......
...@@ -104,6 +104,24 @@ func xhandshake(ctx context.Context, c net.Conn, version uint32) { ...@@ -104,6 +104,24 @@ func xhandshake(ctx context.Context, c net.Conn, version uint32) {
exc.Raiseif(err) exc.Raiseif(err)
} }
// xlinkError verifies that err is *LinkError and returns err.Err
func xlinkError(err error) error {
le, ok := err.(*LinkError)
if !ok {
exc.Raisef("%#v is not *LinkError", err)
}
return le.Err
}
// xconnError verifies that err is *ConnError and returns err.Err
func xconnError(err error) error {
ce, ok := err.(*ConnError)
if !ok {
exc.Raisef("%#v is not *ConnError", err)
}
return ce.Err
}
// Prepare PktBuf with content // Prepare PktBuf with content
func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf { func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf {
pkt := &PktBuf{make([]byte, PktHeadLen + len(payload))} pkt := &PktBuf{make([]byte, PktHeadLen + len(payload))}
...@@ -206,13 +224,13 @@ func TestNodeLink(t *testing.T) { ...@@ -206,13 +224,13 @@ func TestNodeLink(t *testing.T) {
xclose(nl2) xclose(nl2)
}) })
c, err := nl2.Accept() c, err := nl2.Accept()
if !(c == nil && err == ErrLinkClosed) { if !(c == nil && xlinkError(err) == ErrLinkClosed) {
t.Fatalf("NodeLink.Accept() after close: conn = %v, err = %v", c, err) t.Fatalf("NodeLink.Accept() after close: conn = %v, err = %v", c, err)
} }
// nl1 is not accepting connections - because it has LinkClient role // nl1 is not accepting connections - because it has LinkClient role
// check Accept behaviour. // check Accept behaviour.
c, err = nl1.Accept() c, err = nl1.Accept()
if !(c == nil && err == ErrLinkNoListen) { if !(c == nil && xlinkError(err) == ErrLinkNoListen) {
t.Fatalf("NodeLink.Accept() on non-listening node link: conn = %v, err = %v", c, err) t.Fatalf("NodeLink.Accept() on non-listening node link: conn = %v, err = %v", c, err)
} }
xclose(nl1) xclose(nl1)
...@@ -288,7 +306,7 @@ func TestNodeLink(t *testing.T) { ...@@ -288,7 +306,7 @@ func TestNodeLink(t *testing.T) {
xclose(c) xclose(c)
}) })
pkt, err = c.Recv() pkt, err = c.Recv()
if !(pkt == nil && err == ErrClosedConn) { if !(pkt == nil && xconnError(err) == ErrClosedConn) {
t.Fatalf("Conn.Recv() after close: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv() after close: pkt = %v err = %v", pkt, err)
} }
xwait(wg) xwait(wg)
...@@ -305,7 +323,7 @@ func TestNodeLink(t *testing.T) { ...@@ -305,7 +323,7 @@ func TestNodeLink(t *testing.T) {
}) })
pkt = &PktBuf{[]byte("data")} pkt = &PktBuf{[]byte("data")}
err = c.Send(pkt) err = c.Send(pkt)
if err != ErrClosedConn { if xconnError(err) != ErrClosedConn {
t.Fatalf("Conn.Send() after close: err = %v", err) t.Fatalf("Conn.Send() after close: err = %v", err)
} }
xwait(wg) xwait(wg)
...@@ -316,14 +334,14 @@ func TestNodeLink(t *testing.T) { ...@@ -316,14 +334,14 @@ func TestNodeLink(t *testing.T) {
wg = WorkGroup() wg = WorkGroup()
wg.Gox(func() { wg.Gox(func() {
pkt, err := c11.Recv() pkt, err := c11.Recv()
if !(pkt == nil && err == ErrLinkClosed) { if !(pkt == nil && xconnError(err) == ErrLinkClosed) {
exc.Raisef("Conn.Recv() after NodeLink close: pkt = %v err = %v", pkt, err) exc.Raisef("Conn.Recv() after NodeLink close: pkt = %v err = %v", pkt, err)
} }
}) })
wg.Gox(func() { wg.Gox(func() {
pkt := &PktBuf{[]byte("data")} pkt := &PktBuf{[]byte("data")}
err := c12.Send(pkt) err := c12.Send(pkt)
if err != ErrLinkClosed { if xconnError(err) != ErrLinkClosed {
exc.Raisef("Conn.Send() after NodeLink close: err = %v", err) exc.Raisef("Conn.Send() after NodeLink close: err = %v", err)
} }
}) })
...@@ -345,24 +363,25 @@ func TestNodeLink(t *testing.T) { ...@@ -345,24 +363,25 @@ func TestNodeLink(t *testing.T) {
pkt, err := c21.Recv() pkt, err := c21.Recv()
want1 := io.EOF // if recvPkt wakes up due to peer close want1 := io.EOF // if recvPkt wakes up due to peer close
want2 := io.ErrClosedPipe // if recvPkt wakes up due to sendPkt wakes up first and closes nl1 want2 := io.ErrClosedPipe // if recvPkt wakes up due to sendPkt wakes up first and closes nl1
if !(pkt == nil && (err == want1 || err == want2)) { cerr := xconnError(err)
if !(pkt == nil && (cerr == want1 || cerr == want2)) {
exc.Raisef("Conn.Recv after peer NodeLink shutdown: pkt = %v err = %v", pkt, err) exc.Raisef("Conn.Recv after peer NodeLink shutdown: pkt = %v err = %v", pkt, err)
} }
errRecv = err errRecv = cerr
}) })
wg.Gox(func() { wg.Gox(func() {
pkt := &PktBuf{[]byte("data")} pkt := &PktBuf{[]byte("data")}
err := c22.Send(pkt) err := c22.Send(pkt)
want := io.ErrClosedPipe // always this in both due to peer close or recvPkt waking up and closing nl2 want := io.ErrClosedPipe // always this in both due to peer close or recvPkt waking up and closing nl2
if err != want { if xconnError(err) != want {
exc.Raisef("Conn.Send after peer NodeLink shutdown: %v", err) exc.Raisef("Conn.Send after peer NodeLink shutdown: %v", err)
} }
}) })
wg.Gox(func() { wg.Gox(func() {
conn, err := nl2.Accept() conn, err := nl2.Accept()
if !(conn == nil && err == ErrLinkDown) { if !(conn == nil && xlinkError(err) == ErrLinkDown) {
exc.Raisef("Accept after peer NodeLink shutdown: conn = %v err = %v", conn, err) exc.Raisef("Accept after peer NodeLink shutdown: conn = %v err = %v", conn, err)
} }
}) })
...@@ -374,64 +393,64 @@ func TestNodeLink(t *testing.T) { ...@@ -374,64 +393,64 @@ func TestNodeLink(t *testing.T) {
// NewConn after NodeLink shutdown // NewConn after NodeLink shutdown
c, err = nl2.NewConn() c, err = nl2.NewConn()
if err != ErrLinkDown { if xlinkError(err) != ErrLinkDown {
t.Fatalf("NewConn after NodeLink shutdown: %v", err) t.Fatalf("NewConn after NodeLink shutdown: %v", err)
} }
// Accept after NodeLink shutdown // Accept after NodeLink shutdown
c, err = nl2.Accept() c, err = nl2.Accept()
if err != ErrLinkDown { if xlinkError(err) != ErrLinkDown {
t.Fatalf("Accept after NodeLink shutdown: conn = %v err = %v", c, err) t.Fatalf("Accept after NodeLink shutdown: conn = %v err = %v", c, err)
} }
// Recv/Send on another Conn // Recv/Send on another Conn
pkt, err = c23.Recv() pkt, err = c23.Recv()
if !(pkt == nil && err == errRecv) { if !(pkt == nil && xconnError(err) == errRecv) {
t.Fatalf("Conn.Recv 2 after peer NodeLink shutdown: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv 2 after peer NodeLink shutdown: pkt = %v err = %v", pkt, err)
} }
err = c23.Send(&PktBuf{[]byte("data")}) err = c23.Send(&PktBuf{[]byte("data")})
if err != ErrLinkDown { if xconnError(err) != ErrLinkDown {
t.Fatalf("Conn.Send 2 after peer NodeLink shutdown: %v", err) t.Fatalf("Conn.Send 2 after peer NodeLink shutdown: %v", err)
} }
// Recv/Send error on second call // Recv/Send error on second call
pkt, err = c21.Recv() pkt, err = c21.Recv()
if !(pkt == nil && err == ErrLinkDown) { if !(pkt == nil && xconnError(err) == ErrLinkDown) {
t.Fatalf("Conn.Recv after NodeLink shutdown: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv after NodeLink shutdown: pkt = %v err = %v", pkt, err)
} }
err = c22.Send(&PktBuf{[]byte("data")}) err = c22.Send(&PktBuf{[]byte("data")})
if err != ErrLinkDown { if xconnError(err) != ErrLinkDown {
t.Fatalf("Conn.Send after NodeLink shutdown: %v", err) t.Fatalf("Conn.Send after NodeLink shutdown: %v", err)
} }
xclose(c23) xclose(c23)
// Recv/Send on closed Conn but not closed NodeLink // Recv/Send on closed Conn but not closed NodeLink
pkt, err = c23.Recv() pkt, err = c23.Recv()
if !(pkt == nil && err == ErrClosedConn) { if !(pkt == nil && xconnError(err) == ErrClosedConn) {
t.Fatalf("Conn.Recv after close but only stopped NodeLink: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv after close but only stopped NodeLink: pkt = %v err = %v", pkt, err)
} }
err = c23.Send(&PktBuf{[]byte("data")}) err = c23.Send(&PktBuf{[]byte("data")})
if err != ErrClosedConn { if xconnError(err) != ErrClosedConn {
t.Fatalf("Conn.Send after close but only stopped NodeLink: %v", err) t.Fatalf("Conn.Send after close but only stopped NodeLink: %v", err)
} }
xclose(nl2) xclose(nl2)
// Recv/Send NewConn/Accept error after NodeLink close // Recv/Send NewConn/Accept error after NodeLink close
pkt, err = c21.Recv() pkt, err = c21.Recv()
if !(pkt == nil && err == ErrLinkClosed) { if !(pkt == nil && xconnError(err) == ErrLinkClosed) {
t.Fatalf("Conn.Recv after NodeLink shutdown: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv after NodeLink shutdown: pkt = %v err = %v", pkt, err)
} }
err = c22.Send(&PktBuf{[]byte("data")}) err = c22.Send(&PktBuf{[]byte("data")})
if err != ErrLinkClosed { if xconnError(err) != ErrLinkClosed {
t.Fatalf("Conn.Send after NodeLink shutdown: %v", err) t.Fatalf("Conn.Send after NodeLink shutdown: %v", err)
} }
c, err = nl2.NewConn() c, err = nl2.NewConn()
if err != ErrLinkClosed { if xlinkError(err) != ErrLinkClosed {
t.Fatalf("NewConn after NodeLink close: %v", err) t.Fatalf("NewConn after NodeLink close: %v", err)
} }
c, err = nl2.Accept() c, err = nl2.Accept()
if err != ErrLinkClosed { if xlinkError(err) != ErrLinkClosed {
t.Fatalf("Accept after NodeLink close: %v", err) t.Fatalf("Accept after NodeLink close: %v", err)
} }
...@@ -440,11 +459,11 @@ func TestNodeLink(t *testing.T) { ...@@ -440,11 +459,11 @@ func TestNodeLink(t *testing.T) {
xclose(c22) xclose(c22)
// Recv/Send error after Close & NodeLink shutdown // Recv/Send error after Close & NodeLink shutdown
pkt, err = c21.Recv() pkt, err = c21.Recv()
if !(pkt == nil && err == ErrClosedConn) { if !(pkt == nil && xconnError(err) == ErrClosedConn) {
t.Fatalf("Conn.Recv after close and NodeLink close: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.Recv after close and NodeLink close: pkt = %v err = %v", pkt, err)
} }
err = c22.Send(&PktBuf{[]byte("data")}) err = c22.Send(&PktBuf{[]byte("data")})
if err != ErrClosedConn { if xconnError(err) != ErrClosedConn {
t.Fatalf("Conn.Send after close and NodeLink close: %v", err) t.Fatalf("Conn.Send after close and NodeLink close: %v", err)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment