Commit 599d17bc authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent dd3bb8b4
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
"errors" "errors"
"io" "io"
"net" "net"
"sync"
"unsafe" "unsafe"
) )
...@@ -45,16 +46,21 @@ import ( ...@@ -45,16 +46,21 @@ import (
type NodeLink struct { type NodeLink struct {
peerLink net.Conn // raw conn to peer peerLink net.Conn // raw conn to peer
// TODO locking connMu sync.Mutex // TODO -> RW ?
connTab map[uint32]*Conn // msgid -> connection associated with msgid connTab map[uint32]*Conn // connid -> connection associated with connid
nextConnId uint32 // next connId to use for Conn initiated by us
handleNewConn func(conn *Conn) // handler for new connections XXX -> ConnHandler (a-la Handler in net/http) ? handleNewConn func(conn *Conn) // handler for new connections XXX -> ConnHandler (a-la Handler in net/http) ?
// TODO peerLink .LocalAddr() vs .RemoteAddr() -> msgid even/odd ? (XXX vs NAT ?)
txreq chan txReq // tx requests from Conns go here txreq chan txReq // tx requests from Conns go here
// (received pkt go dispatched to connTab[connid].rxq)
closed chan struct{} closed chan struct{}
} }
// Conn is a connection established over NodeLink // Conn is a connection established over NodeLink
// //
// Data can be sent and received over it. // Data can be sent and received over it.
...@@ -63,12 +69,14 @@ type NodeLink struct { ...@@ -63,12 +69,14 @@ type NodeLink struct {
// It is safe to use Conn from multiple goroutines simultaneously. // It is safe to use Conn from multiple goroutines simultaneously.
type Conn struct { type Conn struct {
nodeLink *NodeLink nodeLink *NodeLink
connId uint32
rxq chan *PktBuf rxq chan *PktBuf
txerr chan error // transmit errors go back here txerr chan error // transmit errors go back here
closed chan struct{} closed chan struct{}
} }
// Buffer with packet data // Buffer with packet data
// XXX move me out of here
type PktBuf struct { type PktBuf struct {
Data []byte // whole packet data including all headers XXX -> Buf ? Data []byte // whole packet data including all headers XXX -> Buf ?
} }
...@@ -85,13 +93,36 @@ func (pkt *PktBuf) Payload() []byte { ...@@ -85,13 +93,36 @@ func (pkt *PktBuf) Payload() []byte {
} }
type ConnRole int
const (
ConnServer ConnRole = iota // connection created as server
ConnClient // connection created as client
)
// Make a new NodeLink from already established net.Conn // Make a new NodeLink from already established net.Conn
func NewNodeLink(c net.Conn) *NodeLink { //
// role specifies how to treat conn - either as server or client one.
// The differrence in between client and server roles are in connid % 2 XXX text
//
// Usually server role should be used for connections created via
// net.Listen/net.Accept and client role for connections created via net.Dial.
func NewNodeLink(conn net.Conn, role ConnRole) *NodeLink {
var nextConnId uint32
switch role {
case ConnServer:
nextConnId = 0
case ConnClient:
nextConnId = 1
default:
panic("invalid conn role")
}
nl := NodeLink{ nl := NodeLink{
peerLink: c, peerLink: conn,
connTab: map[uint32]*Conn{}, connTab: map[uint32]*Conn{},
txreq: make(chan txReq), nextConnId: nextConnId,
closed: make(chan struct{}), txreq: make(chan txReq),
closed: make(chan struct{}),
} }
// TODO go nl.serveRecv() // TODO go nl.serveRecv()
// TODO go nl.serveSend() // TODO go nl.serveSend()
...@@ -103,10 +134,18 @@ func NewNodeLink(c net.Conn) *NodeLink { ...@@ -103,10 +134,18 @@ func NewNodeLink(c net.Conn) *NodeLink {
func (nl *NodeLink) Close() error { func (nl *NodeLink) Close() error {
close(nl.closed) close(nl.closed)
err := nl.peerLink.Close() err := nl.peerLink.Close()
// TODO close active Conns // close active Conns
nl.connMu.Lock()
defer nl.connMu.Unlock()
for _, conn := range nl.connTab {
// FIXME it also wants to lock conntab -> conn.close() ?
println("conn", conn.connId, " -> closing ...")
conn.Close() // XXX err
}
nl.connTab = nil // XXX ok? vs panic on NewConn after close ?
// XXX wait for serve{Send,Recv} to complete // XXX wait for serve{Send,Recv} to complete
nl.wg.Wait() //nl.wg.Wait()
return err
} }
// send raw packet to peer // send raw packet to peer
...@@ -171,8 +210,11 @@ func (nl *NodeLink) NewConn() *Conn { ...@@ -171,8 +210,11 @@ func (nl *NodeLink) NewConn() *Conn {
txerr: make(chan error), txerr: make(chan error),
closed: make(chan struct{}), closed: make(chan struct{}),
} }
// XXX locking nl.connMu.Lock()
nl.connTab[0] = c // FIXME 0 -> msgid; XXX also check not a duplicate defer nl.connMu.Unlock()
c.connId = nl.nextConnId
nl.nextConnId += 2
nl.connTab[c.connId] = c
return c return c
} }
...@@ -193,10 +235,10 @@ func (nl *NodeLink) serveRecv() { ...@@ -193,10 +235,10 @@ func (nl *NodeLink) serveRecv() {
panic(err) // XXX err panic(err) // XXX err
} }
// if we don't yet have connection established for pkt.MsgId - // if we don't yet have connection established for pkt.ConnId -
// spawn connection-serving goroutine // spawn connection-serving goroutine
// XXX connTab locking // XXX connTab locking
conn := nl.connTab[ntoh32(pkt.Header().MsgId)] conn := nl.connTab[ntoh32(pkt.Header().ConnId)]
if conn == nil { if conn == nil {
if nl.handleNewConn == nil { if nl.handleNewConn == nil {
// we are not accepting incoming connections - ignore packet // we are not accepting incoming connections - ignore packet
...@@ -231,7 +273,6 @@ func (nl *NodeLink) serveSend() { ...@@ -231,7 +273,6 @@ func (nl *NodeLink) serveSend() {
break break
case txreq := <-nl.txreq: case txreq := <-nl.txreq:
pkt.Header().MsgId = hton32(0) // TODO next msgid, or using same msgid as received
err := nl.sendPkt(txreq.pkt) err := nl.sendPkt(txreq.pkt)
if err != nil { if err != nil {
// XXX also close whole nodeLink since tx framing now can be broken? // XXX also close whole nodeLink since tx framing now can be broken?
...@@ -254,6 +295,10 @@ var ErrClosedConn = errors.New("read/write on closed connection") ...@@ -254,6 +295,10 @@ var ErrClosedConn = errors.New("read/write on closed connection")
// Send packet via connection // Send packet via connection
func (c *Conn) Send(pkt *PktBuf) error { func (c *Conn) Send(pkt *PktBuf) error {
// set pkt connid associated with this connection
// TODO for new Conn - it should be set by serveSend ?
pkt.Header().ConnId = hton32(c.connId)
select { select {
case <-c.closed: case <-c.closed:
return ErrClosedConn return ErrClosedConn
...@@ -291,6 +336,11 @@ func (c *Conn) Close() error { // XXX do we need error here? ...@@ -291,6 +336,11 @@ func (c *Conn) Close() error { // XXX do we need error here?
// TODO
//func Dial(ctx context.Context, network, address string) (*NodeLink, error) // + tls.Config
//func Listen(network, laddr string) (net.Listener, error) // + tls.Config
// ln.Accept -> will return net.Conn wrapped in NodeLink
// ---------------------------------------- // ----------------------------------------
......
...@@ -88,10 +88,10 @@ func xwait(w interface { Wait() error }) { ...@@ -88,10 +88,10 @@ func xwait(w interface { Wait() error }) {
} }
// Prepare PktBuf with content // Prepare PktBuf with content
func mkpkt(msgid uint32, msgcode uint16, payload []byte) *PktBuf { func mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf {
pkt := &PktBuf{make([]byte, PktHeadLen + len(payload))} pkt := &PktBuf{make([]byte, PktHeadLen + len(payload))}
h := pkt.Header() h := pkt.Header()
h.MsgId = hton32(msgid) h.ConnId = hton32(connid)
h.MsgCode = hton16(msgcode) h.MsgCode = hton16(msgcode)
h.Len = hton32(PktHeadLen + 4) h.Len = hton32(PktHeadLen + 4)
copy(pkt.Payload(), payload) copy(pkt.Payload(), payload)
...@@ -99,12 +99,12 @@ func mkpkt(msgid uint32, msgcode uint16, payload []byte) *PktBuf { ...@@ -99,12 +99,12 @@ func mkpkt(msgid uint32, msgcode uint16, payload []byte) *PktBuf {
} }
// Verify PktBuf is as expected // Verify PktBuf is as expected
func xverifyPkt(pkt *PktBuf, msgid uint32, msgcode uint16, payload []byte) { func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) {
errv := xerr.Errorv{} errv := xerr.Errorv{}
h := pkt.Header() h := pkt.Header()
// TODO include caller location // TODO include caller location
if ntoh32(h.MsgId) != msgid { if ntoh32(h.ConnId) != connid {
errv.Appendf("header: unexpected msgid %v (want %v)", ntoh32(h.MsgId), msgid) errv.Appendf("header: unexpected connid %v (want %v)", ntoh32(h.ConnId), connid)
} }
if ntoh16(h.MsgCode) != msgcode { if ntoh16(h.MsgCode) != msgcode {
errv.Appendf("header: unexpected msgcode %v (want %v)", ntoh16(h.MsgCode), msgcode) errv.Appendf("header: unexpected msgcode %v (want %v)", ntoh16(h.MsgCode), msgcode)
...@@ -129,8 +129,8 @@ func tdelay() { ...@@ -129,8 +129,8 @@ func tdelay() {
// create NodeLinks connected via net.Pipe // create NodeLinks connected via net.Pipe
func nodeLinkPipe() (nl1, nl2 *NodeLink) { func nodeLinkPipe() (nl1, nl2 *NodeLink) {
node1, node2 := net.Pipe() node1, node2 := net.Pipe()
nl1 = NewNodeLink(node1) nl1 = NewNodeLink(node1, ConnClient)
nl2 = NewNodeLink(node2) nl2 = NewNodeLink(node2, ConnServer)
return nl1, nl2 return nl1, nl2
} }
...@@ -138,6 +138,7 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) { ...@@ -138,6 +138,7 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) {
func TestNodeLink(t *testing.T) { func TestNodeLink(t *testing.T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup) // TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
println("111")
nl1, nl2 := nodeLinkPipe() nl1, nl2 := nodeLinkPipe()
// Close vs recvPkt // Close vs recvPkt
...@@ -197,6 +198,7 @@ func TestNodeLink(t *testing.T) { ...@@ -197,6 +198,7 @@ func TestNodeLink(t *testing.T) {
// test channels on top of nodelink // test channels on top of nodelink
println("222")
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
// Close vs Recv // Close vs Recv
...@@ -227,6 +229,7 @@ func TestNodeLink(t *testing.T) { ...@@ -227,6 +229,7 @@ func TestNodeLink(t *testing.T) {
xwait(wg) xwait(wg)
// NodeLink.Close vs Conn.Send/Recv // NodeLink.Close vs Conn.Send/Recv
println("333")
c11 := nl1.NewConn() c11 := nl1.NewConn()
c12 := nl1.NewConn() c12 := nl1.NewConn()
wg = WorkGroup() wg = WorkGroup()
...@@ -248,6 +251,7 @@ func TestNodeLink(t *testing.T) { ...@@ -248,6 +251,7 @@ func TestNodeLink(t *testing.T) {
xwait(wg) xwait(wg)
xclose(nl2) // for completeness xclose(nl2) // for completeness
println("444")
/* /*
......
...@@ -127,7 +127,7 @@ type RowInfo struct { ...@@ -127,7 +127,7 @@ type RowInfo struct {
// XXX link request <-> answer ? // XXX link request <-> answer ?
// TODO ensure len(encoded packet header) == 10 // TODO ensure len(encoded packet header) == 10
type PktHead struct { type PktHead struct {
MsgId be32 ConnId be32 // NOTE is .msgid in py
MsgCode be16 MsgCode be16
Len be32 // whole packet length (including header) Len be32 // whole packet length (including header)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment