Commit bebc5e77 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 35d32ba6
......@@ -821,10 +821,17 @@ func (nl *NodeLink) serveRecv() {
// NOTE if nl.peerLink was just closed by tx->shutdown we'll get ErrNetClosing
pkt, err := nl.recvPkt()
//fmt.Printf("\n%p recvPkt -> %v, %v\n", nl, pkt, err)
if err != nil {
// pkt.ConnId -> Conn
var connId uint32
if err == nil {
connId, _, _, err = pktDecodeHead(nl.enc, pkt)
}
// on IO error framing over peerLink becomes broken
// so we shut down node link and all connections over it.
// Same if we cannot decode packet header.
if err != nil {
nl.errMu.Lock()
nl.errRecv = err
nl.errMu.Unlock()
......@@ -833,8 +840,7 @@ func (nl *NodeLink) serveRecv() {
return
}
// pkt.ConnId -> Conn
connId := packed.Ntoh32(pkt.Header().ConnId)
accept := false
nl.connMu.Lock()
......@@ -1155,11 +1161,17 @@ func (nl *NodeLink) serveSend() {
// sendPktDirect sends raw packet with appropriate connection ID directly via link.
func (c *Conn) sendPktDirect(pkt *pktBuf) error {
// set pkt connId associated with this connection
pkt.Header().ConnId = packed.Hton32(c.connId)
// connId must be set to one associated with this connection
connID, _, _, err := pktDecodeHead(c.link.enc, pkt)
if err != nil {
panic(fmt.Sprintf("Conn.sendPkt: bad packet: %s", err))
}
if connID != c.connId {
panic("Conn.sendPkt: connId wrong")
}
// NOTE if n.peerLink was just closed by rx->shutdown we'll get ErrNetClosing
err := c.link.sendPkt(pkt)
err = c.link.sendPkt(pkt)
//fmt.Printf("sendPkt -> %v\n", err)
// on IO error framing over peerLink becomes broken
......@@ -1555,13 +1567,15 @@ func (c *Conn) Expect(msgv ...proto.Msg) (which int, err error) {
}
defer pkt.Free()
// XXX encN-specific
pkth := pkt.Header()
msgCode := packed.Ntoh16(pkth.MsgCode)
// decode packet
_, msgCode, payload, err := pktDecodeHead(c.link.enc, pkt)
if err != nil {
return -1, err
}
for i, msg := range msgv {
if proto.MsgCode(msg) == msgCode {
_, err := c.link.enc.NEOMsgDecode(msg, pkt.Payload())
_, err := c.link.enc.NEOMsgDecode(msg, payload)
if err != nil {
return -1, c.err("decode", err)
}
......
......@@ -640,9 +640,9 @@ func _TestNodeLink(t *T) {
gox(wg, func(_ context.Context) {
pkt := xrecvPkt(c)
// XXX encN-specific
n := packed.Ntoh16(pkt.Header().MsgCode)
x := replyOrder[n]
_, msgCode, _, err := pktDecodeHead(t.enc, pkt)
exc.Raiseif(err)
x := replyOrder[msgCode]
// wait before it is our turn & echo pkt back
<-x.start
......
......@@ -39,7 +39,7 @@ type pktBuf struct {
}
// HeaderN returns pointer to packet header in 'N'-encoding.
func (pkt *pktBuf) Header() *proto.PktHeader { return pkt.HeaderN() } // XXX kill
//func (pkt *pktBuf) Header() *proto.PktHeader { return pkt.HeaderN() } // XXX kill
func (pkt *pktBuf) HeaderN() *proto.PktHeader {
// NOTE no need to check len(.data) < PktHeader:
// .data is always allocated with cap >= PktHeaderLen.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment