Commit 4732ff17 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 49f0c8b9
...@@ -180,7 +180,7 @@ func (nl *NodeLink) Close() error { ...@@ -180,7 +180,7 @@ func (nl *NodeLink) Close() error {
// sendPkt sends raw packet to peer // sendPkt sends raw packet to peer
// tx error, if any, is returned as is and is analyzed in serveSend // tx error, if any, is returned as is and is analyzed in serveSend
func (nl *NodeLink) sendPkt(pkt *PktBuf) error { func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
if true { if false {
// XXX -> log // XXX -> log
fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt) fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
//defer fmt.Printf("\t-> sendPkt err: %v\n", err) //defer fmt.Printf("\t-> sendPkt err: %v\n", err)
...@@ -237,7 +237,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -237,7 +237,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
} }
} }
if true { if false {
// XXX -> log // XXX -> log
fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt) fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
} }
...@@ -376,6 +376,7 @@ func (nl *NodeLink) serveSend() { ...@@ -376,6 +376,7 @@ func (nl *NodeLink) serveSend() {
case txreq := <-nl.txq: case txreq := <-nl.txq:
err = nl.sendPkt(txreq.pkt) err = nl.sendPkt(txreq.pkt)
fmt.Printf("sendPkt -> %v\n", err)
if err != nil { if err != nil {
// on IO error framing over peerLink becomes broken // on IO error framing over peerLink becomes broken
...@@ -391,6 +392,15 @@ func (nl *NodeLink) serveSend() { ...@@ -391,6 +392,15 @@ func (nl *NodeLink) serveSend() {
txreq.errch <- err // XXX recheck wakeup logic for err case txreq.errch <- err // XXX recheck wakeup logic for err case
// XXX we need to first wait till _both_ serveRecv & serveSend complete
// and only then close all Conns. Reason: e.g. when remote shutdowns
// both sendPkt and recvPkt get error. If recvPkt was
// first serveRecv will be first to mark connections as
// closed and even though sendPkt will return proper IO
// error it won't be delivered as Conn.Send waiting for
// it already waked up on c.closed without seeing
// txreq.errch being ready.
if err != nil { if err != nil {
// XXX use errMu to lock vvv if needed // XXX use errMu to lock vvv if needed
// nl.sendErr = err // nl.sendErr = err
...@@ -440,9 +450,17 @@ func (c *Conn) Send(pkt *PktBuf) error { ...@@ -440,9 +450,17 @@ func (c *Conn) Send(pkt *PktBuf) error {
// NOTE after we return straight here serveSend won't be later // NOTE after we return straight here serveSend won't be later
// blocked on c.txerr<- because that backchannel is a non-blocking one. // blocked on c.txerr<- because that backchannel is a non-blocking one.
case <-c.closed: case <-c.closed:
// XXX also poll c.txerr
return ErrClosedConn // also poll c.txerr here because: when there is TX error,
// return errClosedConn(c.nodeLink.sendErr) // XXX locking ? // serveSend sends to c.txerr _and_ closes c.closed .
// We still want to return actual transmission error to caller.
select {
case err = <-c.txerr:
return err
default:
return ErrClosedConn
// return errClosedConn(c.nodeLink.sendErr) // XXX locking ?
}
case err = <-c.txerr: case err = <-c.txerr:
//fmt.Printf("%v <- c.txerr\n", err) //fmt.Printf("%v <- c.txerr\n", err)
...@@ -450,7 +468,7 @@ func (c *Conn) Send(pkt *PktBuf) error { ...@@ -450,7 +468,7 @@ func (c *Conn) Send(pkt *PktBuf) error {
} }
} }
return err // return err
} }
// Receive packet from connection // Receive packet from connection
......
...@@ -158,7 +158,7 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) { ...@@ -158,7 +158,7 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) {
func TestNodeLink(t *testing.T) { func TestNodeLink(t *testing.T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup) // TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
///* /*
// Close vs recvPkt // Close vs recvPkt
nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg := WorkGroup() wg := WorkGroup()
...@@ -323,13 +323,13 @@ func TestNodeLink(t *testing.T) { ...@@ -323,13 +323,13 @@ func TestNodeLink(t *testing.T) {
xclose(c11) xclose(c11)
xclose(c12) xclose(c12)
xclose(nl2) xclose(nl2)
//*/ */
// NodeLink.Close vs Conn.Send/Recv on another side TODO // NodeLink.Close vs Conn.Send/Recv on another side TODO
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend) nl1, nl2 := _nodeLinkPipe(0, linkNoRecvSend)
c11 = nl1.NewConn() c11 := nl1.NewConn()
c12 = nl1.NewConn() c12 := nl1.NewConn()
wg = WorkGroup() wg := WorkGroup()
wg.Gox(func() { wg.Gox(func() {
println(">>> RECV START") println(">>> RECV START")
pkt, err := c11.Recv() pkt, err := c11.Recv()
...@@ -344,25 +344,23 @@ func TestNodeLink(t *testing.T) { ...@@ -344,25 +344,23 @@ func TestNodeLink(t *testing.T) {
println(">>> SEND START") println(">>> SEND START")
err := c12.Send(pkt) err := c12.Send(pkt)
println(">>> send wakeup") println(">>> send wakeup")
if err != io.ErrClosedPipe { // XXX we are here but what the error should be? if want := io.ErrClosedPipe; err != want {// XXX we are here but what the error should be?
exc.Raisef("Conn.Send() after peer NodeLink shutdown: err = %v", err) exc.Raisef("Conn.Send() after peer NodeLink shutdown: unexpected err\nhave: %v\nwant: %v", err, want)
} }
println(">>> SEND OK") println(">>> SEND OK")
}) })
tdelay() tdelay()
println("NL2.Close")
xclose(nl2) xclose(nl2)
println("111")
xwait(wg) xwait(wg)
println("222") // TODO check Recv/Send error on second call
xclose(c11) xclose(c11)
println("aaa")
xclose(c12) xclose(c12)
println("bbb") // TODO check Recv/Send error after Close
xclose(nl1) xclose(nl1)
println("333")
///* /*
// Conn accept + exchange // Conn accept + exchange
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
wg = WorkGroup() wg = WorkGroup()
...@@ -448,5 +446,5 @@ func TestNodeLink(t *testing.T) { ...@@ -448,5 +446,5 @@ func TestNodeLink(t *testing.T) {
xclose(c2) xclose(c2)
xclose(nl1) xclose(nl1)
xclose(nl2) xclose(nl2)
//*/ */
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment