Commit dc6474fb authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 6c7ce736
...@@ -57,8 +57,16 @@ type NodeLink struct { ...@@ -57,8 +57,16 @@ type NodeLink struct {
acceptq chan *Conn // queue of incoming connections for Accept acceptq chan *Conn // queue of incoming connections for Accept
// = nil if NodeLink is not accepting connections // = nil if NodeLink is not accepting connections
txreq chan txReq // tx requests from Conns go via here txq chan txReq // tx requests from Conns go via here
closed chan struct{}
// errMu sync.Mutex -> use connMu
// sendErr error // error got from sendPkt, if any
recvErr error // error got from recvPkt, if any
// once because: NodeLink has to be explicitly closed by user; it can also
// be "closed" by IO errors on peerLink
closeOnce sync.Once
closed chan struct{} // XXX text
} }
// Conn is a connection established over NodeLink // Conn is a connection established over NodeLink
...@@ -93,7 +101,7 @@ const ( ...@@ -93,7 +101,7 @@ const (
// NewNodeLink makes a new NodeLink from already established net.Conn // NewNodeLink makes a new NodeLink from already established net.Conn
// //
// Role specifies how to treat our role on the link - either as client or // Role specifies how to treat our role on the link - either as client or
// server one. The difference in between client and server roles are in: // server. The difference in between client and server roles are in:
// //
// 1. how connection ids are allocated for connections initiated at our side: // 1. how connection ids are allocated for connections initiated at our side:
// there is no conflict in identifiers if one side always allocates them as // there is no conflict in identifiers if one side always allocates them as
...@@ -122,7 +130,7 @@ func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink { ...@@ -122,7 +130,7 @@ func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink {
connTab: map[uint32]*Conn{}, connTab: map[uint32]*Conn{},
nextConnId: nextConnId, nextConnId: nextConnId,
acceptq: acceptq, acceptq: acceptq,
txreq: make(chan txReq), txq: make(chan txReq),
closed: make(chan struct{}), closed: make(chan struct{}),
} }
if role&linkNoRecvSend == 0 { if role&linkNoRecvSend == 0 {
...@@ -133,20 +141,29 @@ func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink { ...@@ -133,20 +141,29 @@ func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink {
return nl return nl
} }
// worker for Close & friends. Must be called with connMu held.
// marks all active Conns and NodeLink itself as closed
func (nl *NodeLink) close() {
nl.closeOnce.Do(func() {
for _, conn := range nl.connTab {
conn.close() // XXX explicitly pass error here ?
}
nl.connTab = nil // clear + mark closed
close(nl.closed)
})
}
// Close closes node-node link. // Close closes node-node link.
// IO on connections established over it is automatically interrupted with an error. // IO on connections established over it is automatically interrupted with an error.
func (nl *NodeLink) Close() error { func (nl *NodeLink) Close() error {
// mark all active Conns as closed
nl.connMu.Lock() nl.connMu.Lock()
defer nl.connMu.Unlock() defer nl.connMu.Unlock()
for _, conn := range nl.connTab {
conn.close() nl.close()
}
nl.connTab = nil // clear + mark closed
// close actual link to peer // close actual link to peer
// this will wakeup serve{Send,Recv} // this will wakeup serve{Send,Recv}
close(nl.closed)
err := nl.peerLink.Close() err := nl.peerLink.Close()
// wait for serve{Send,Recv} to complete // wait for serve{Send,Recv} to complete
...@@ -161,7 +178,11 @@ func (nl *NodeLink) sendPkt(pkt *PktBuf) error { ...@@ -161,7 +178,11 @@ func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
// XXX -> log // XXX -> log
fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt) fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
} }
// XXX if nl is closed peerLink will return "io on closed xxx" but
// maybe better to check explicitly and return ErrClosedLink
_, err := nl.peerLink.Write(pkt.Data) // FIXME write Data in full _, err := nl.peerLink.Write(pkt.Data) // FIXME write Data in full
//defer fmt.Printf("\t-> sendPkt err: %v\n", err)
if err != nil { if err != nil {
// XXX do we need to retry if err is temporary? // XXX do we need to retry if err is temporary?
// TODO data could be written partially and thus the message stream is now broken // TODO data could be written partially and thus the message stream is now broken
...@@ -172,6 +193,9 @@ func (nl *NodeLink) sendPkt(pkt *PktBuf) error { ...@@ -172,6 +193,9 @@ func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
// recvPkt receives raw packet from peer // recvPkt receives raw packet from peer
func (nl *NodeLink) recvPkt() (*PktBuf, error) { func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// XXX if nl is closed peerLink will return "io on closed xxx" but
// maybe better to check explicitly and return ErrClosedLink
// TODO organize rx buffers management (freelist etc) // TODO organize rx buffers management (freelist etc)
// TODO cleanup lots of ntoh32(...) // TODO cleanup lots of ntoh32(...)
// XXX do we need to retry if err is temporary? // XXX do we need to retry if err is temporary?
...@@ -277,28 +301,42 @@ func (nl *NodeLink) serveRecv() { ...@@ -277,28 +301,42 @@ func (nl *NodeLink) serveRecv() {
for { for {
// receive 1 packet // receive 1 packet
pkt, err := nl.recvPkt() pkt, err := nl.recvPkt()
fmt.Printf("recvPkt -> %v, %v\n", pkt, err)
if err != nil { if err != nil {
// this might be just error on close - simply stop in such case // on IO error framing over peerLink becomes broken
// so we are marking node link and all connections as closed
println("\tzzz")
nl.connMu.Lock()
println("\tzzz 2")
defer nl.connMu.Unlock()
println("\tqqq")
select { select {
case <-nl.closed: case <-nl.closed:
// XXX check err actually what is on interrupt? // error due to closing NodeLink
return nl.recvErr = ErrLinkClosed
default:
nl.recvErr = err
} }
panic(err) // XXX err -> if !temporary -> nl.closeWithError(err)
println("\trrr")
// wake-up all conns & mark node link as closed
nl.close()
println("\tsss")
return
} }
// pkt.ConnId -> Conn // pkt.ConnId -> Conn
connId := ntoh32(pkt.Header().ConnId) connId := ntoh32(pkt.Header().ConnId)
accept := false
nl.connMu.Lock() nl.connMu.Lock()
conn := nl.connTab[connId] conn := nl.connTab[connId]
if conn == nil { if conn == nil {
if nl.acceptq != nil { if nl.acceptq != nil {
// we are accepting new incoming connection // we are accepting new incoming connection
conn = nl.newConn(connId) conn = nl.newConn(connId)
// XXX what if Accept exited because of just recently close(nl.closed)? accept = true
// -> check nl.closed here too ?
nl.acceptq <- conn
} }
} }
nl.connMu.Unlock() nl.connMu.Unlock()
...@@ -309,6 +347,12 @@ func (nl *NodeLink) serveRecv() { ...@@ -309,6 +347,12 @@ func (nl *NodeLink) serveRecv() {
continue continue
} }
if accept {
// XXX what if Accept exited because of just recently close(nl.closed)?
// -> check nl.closed here too ?
nl.acceptq <- conn
}
// route packet to serving goroutine handler // route packet to serving goroutine handler
// XXX what if Conn.Recv exited because of just recently close(nl.closed) ? // XXX what if Conn.Recv exited because of just recently close(nl.closed) ?
// -> check nl.closed here too ? // -> check nl.closed here too ?
...@@ -317,7 +361,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -317,7 +361,7 @@ func (nl *NodeLink) serveRecv() {
} }
// request to transmit a packet. Result error goes back to errch // txReq is request to transmit a packet. Result error goes back to errch
type txReq struct { type txReq struct {
pkt *PktBuf pkt *PktBuf
errch chan error errch chan error
...@@ -327,27 +371,54 @@ type txReq struct { ...@@ -327,27 +371,54 @@ type txReq struct {
// serially executes them over associated node link. // serially executes them over associated node link.
func (nl *NodeLink) serveSend() { func (nl *NodeLink) serveSend() {
defer nl.serveWg.Done() defer nl.serveWg.Done()
runloop: var err error
for { for {
select { select {
case <-nl.closed: case <-nl.closed:
break runloop return
case txreq := <-nl.txq:
err = nl.sendPkt(txreq.pkt)
case txreq := <-nl.txreq:
err := nl.sendPkt(txreq.pkt)
if err != nil { if err != nil {
// XXX also close whole nodeLink since tx framing now can be broken? // on IO error framing over peerLink becomes broken
// -> not here - this logic should be in sendPkt // so we are marking node link and all connections as closed
select {
case <-nl.closed:
// error due to closing NodeLink
err = ErrLinkClosed
default:
} }
}
txreq.errch <- err txreq.errch <- err
if err != nil {
nl.connMu.Lock()
defer nl.connMu.Unlock()
// nl.sendErr = err
// wake-up all conns & mark node link as closed
nl.close()
}
} }
} }
} }
// ErrClosedConn is the error indicated for read/write operations on closed Conn // ErrClosedConn is the error indicated for read/write operations on closed Conn
var ErrClosedConn = errors.New("read/write on closed connection") var ErrClosedConn = errors.New("read/write on closed connection")
func errClosedConn(err error) error {
if err != nil {
return err
}
return ErrClosedConn
}
// Send packet via connection // Send packet via connection
func (c *Conn) Send(pkt *PktBuf) error { func (c *Conn) Send(pkt *PktBuf) error {
// set pkt connId associated with this connection // set pkt connId associated with this connection
...@@ -357,8 +428,9 @@ func (c *Conn) Send(pkt *PktBuf) error { ...@@ -357,8 +428,9 @@ func (c *Conn) Send(pkt *PktBuf) error {
select { select {
case <-c.closed: case <-c.closed:
return ErrClosedConn return ErrClosedConn
// return errClosedConn(c.nodeLink.sendErr) // XXX locking ?
case c.nodeLink.txreq <- txReq{pkt, c.txerr}: case c.nodeLink.txq <- txReq{pkt, c.txerr}:
select { select {
// tx request was sent to serveSend and is being transmitted on the wire. // tx request was sent to serveSend and is being transmitted on the wire.
// the transmission may block for indefinitely long though and // the transmission may block for indefinitely long though and
...@@ -371,23 +443,13 @@ func (c *Conn) Send(pkt *PktBuf) error { ...@@ -371,23 +443,13 @@ func (c *Conn) Send(pkt *PktBuf) error {
// NOTE after we return straight here serveSend won't be later // NOTE after we return straight here serveSend won't be later
// blocked on c.txerr<- because that backchannel is a non-blocking one. // blocked on c.txerr<- because that backchannel is a non-blocking one.
case <-c.closed: case <-c.closed:
// XXX also poll c.txerr
return ErrClosedConn return ErrClosedConn
// return errClosedConn(c.nodeLink.sendErr) // XXX locking ?
case err = <-c.txerr: case err = <-c.txerr:
} //fmt.Printf("%v <- c.txerr\n", err)
} return err
// if we got transmission error chances are it was due to underlying NodeLink
// being closed. If our Conn was also requested to be closed adjust err
// to ErrClosedConn along the way.
//
// ( reaching here is theoretically possible if both c.closed and
// c.txerr are ready above )
if err != nil {
select {
case <-c.closed:
err = ErrClosedConn
default:
} }
} }
...@@ -398,10 +460,12 @@ func (c *Conn) Send(pkt *PktBuf) error { ...@@ -398,10 +460,12 @@ func (c *Conn) Send(pkt *PktBuf) error {
func (c *Conn) Recv() (*PktBuf, error) { func (c *Conn) Recv() (*PktBuf, error) {
select { select {
case <-c.closed: case <-c.closed:
return nil, ErrClosedConn // XXX get err from c.nodeLink.recvErr
// XXX if nil -> ErrClosedConn ?
return nil, ErrClosedConn // XXX -> EOF ?
case pkt := <-c.rxq: case pkt := <-c.rxq:
return pkt, nil return pkt, nil // XXX error = ?
} }
} }
...@@ -488,15 +552,3 @@ func Listen(network, laddr string) (*Listener, error) { ...@@ -488,15 +552,3 @@ func Listen(network, laddr string) (*Listener, error) {
// //
// A reply to particular Ask packet, once received, will be delivered to // A reply to particular Ask packet, once received, will be delivered to
// corresponding goroutine which originally issued Ask XXX this can be put into interface // corresponding goroutine which originally issued Ask XXX this can be put into interface
// // Send notify packet to peer
// func (c *NodeLink) Notify(pkt XXX) error {
// // TODO
// }
//
// // Send packet and wait for replied answer packet
// func (c *NodeLink) Ask(pkt XXX) (answer Pkt, err error) {
// // TODO
// }
...@@ -158,6 +158,7 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) { ...@@ -158,6 +158,7 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) {
func TestNodeLink(t *testing.T) { func TestNodeLink(t *testing.T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup) // TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
/*
// Close vs recvPkt // Close vs recvPkt
nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg := WorkGroup() wg := WorkGroup()
...@@ -206,6 +207,35 @@ func TestNodeLink(t *testing.T) { ...@@ -206,6 +207,35 @@ func TestNodeLink(t *testing.T) {
} }
xclose(nl1) xclose(nl1)
// Close vs recvPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = WorkGroup()
wg.Gox(func() {
tdelay()
xclose(nl2)
})
pkt, err = nl1.recvPkt()
if !(pkt == nil && err == io.EOF) { // NOTE io.EOF on Read per io.Pipe
t.Fatalf("NodeLink.recvPkt() after peer shutdown: pkt = %v err = %v", pkt, err)
}
xwait(wg)
xclose(nl1)
// Close vs sendPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = WorkGroup()
wg.Gox(func() {
tdelay()
xclose(nl2)
})
pkt = &PktBuf{[]byte("data")}
err = nl1.sendPkt(pkt)
if err != io.ErrClosedPipe { // NOTE io.ErrClosedPipe on Write per io.Pipe
t.Fatalf("NodeLink.sendPkt() after peer shutdown: pkt = %v err = %v", pkt, err)
}
xwait(wg)
xclose(nl1)
// raw exchange // raw exchange
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
...@@ -277,14 +307,14 @@ func TestNodeLink(t *testing.T) { ...@@ -277,14 +307,14 @@ func TestNodeLink(t *testing.T) {
wg.Gox(func() { wg.Gox(func() {
pkt, err := c11.Recv() pkt, err := c11.Recv()
if !(pkt == nil && err == ErrClosedConn) { if !(pkt == nil && err == ErrClosedConn) {
exc.Raisef("Conn.Recv() after NodeLink.close: pkt = %v err = %v", pkt, err) exc.Raisef("Conn.Recv() after NodeLink close: pkt = %v err = %v", pkt, err)
} }
}) })
wg.Gox(func() { wg.Gox(func() {
pkt := &PktBuf{[]byte("data")} pkt := &PktBuf{[]byte("data")}
err := c12.Send(pkt) err := c12.Send(pkt)
if err != ErrClosedConn { if err != ErrClosedConn {
exc.Raisef("Conn.Send() after close: err = %v", err) exc.Raisef("Conn.Send() after NodeLink close: err = %v", err)
} }
}) })
tdelay() tdelay()
...@@ -293,7 +323,46 @@ func TestNodeLink(t *testing.T) { ...@@ -293,7 +323,46 @@ func TestNodeLink(t *testing.T) {
xclose(c11) xclose(c11)
xclose(c12) xclose(c12)
xclose(nl2) xclose(nl2)
*/
// NodeLink.Close vs Conn.Send/Recv on another side TODO
nl1, nl2 := _nodeLinkPipe(0, linkNoRecvSend)
c11 := nl1.NewConn()
c12 := nl1.NewConn()
wg := WorkGroup()
wg.Gox(func() {
println(">>> RECV START")
pkt, err := c11.Recv()
println(">>> recv wakeup")
if !(pkt == nil && err == ErrClosedConn) { // XXX -> EOF ?
exc.Raisef("Conn.Recv after peer NodeLink shutdown: pkt = %v err = %v", pkt, err)
}
println("recv ok")
})
wg.Gox(func() {
pkt := &PktBuf{[]byte("data")}
println(">>> SEND START")
err := c12.Send(pkt)
println(">>> send wakeup")
if err != io.ErrClosedPipe { // XXX we are here but what the error should be?
exc.Raisef("Conn.Send() after peer NodeLink shutdown: err = %v", err)
}
println(">>> SEND OK")
})
tdelay()
xclose(nl2)
println("111")
xwait(wg)
println("222")
xclose(c11)
println("aaa")
xclose(c12)
println("bbb")
xclose(nl1)
println("333")
/*
// Conn accept + exchange // Conn accept + exchange
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
wg = WorkGroup() wg = WorkGroup()
...@@ -379,4 +448,5 @@ func TestNodeLink(t *testing.T) { ...@@ -379,4 +448,5 @@ func TestNodeLink(t *testing.T) {
xclose(c2) xclose(c2)
xclose(nl1) xclose(nl1)
xclose(nl2) xclose(nl2)
*/
} }
...@@ -100,7 +100,7 @@ func connAddr(conn *Conn) string { ...@@ -100,7 +100,7 @@ func connAddr(conn *Conn) string {
// ServeClient serves incoming connection on which peer identified itself as client // ServeClient serves incoming connection on which peer identified itself as client
func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) { func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
fmt.Printf("stor: serving new client conn %s\n", connAddr(conn) fmt.Printf("stor: serving new client conn %s\n", connAddr(conn))
// close connection when either cancelling or returning (e.g. due to an error) // close connection when either cancelling or returning (e.g. due to an error)
// ( when cancelling - conn.Close will signal to current IO to // ( when cancelling - conn.Close will signal to current IO to
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment