Commit da2025b4 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e43a68e4
...@@ -35,9 +35,8 @@ import ( ...@@ -35,9 +35,8 @@ import (
"github.com/someonegg/gocontainer/rbuf" "github.com/someonegg/gocontainer/rbuf"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/go123/xbytes" "lab.nexedi.com/kirr/go123/xbytes"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
) )
// NodeLink is a node-node link in NEO // NodeLink is a node-node link in NEO
...@@ -78,8 +77,8 @@ type NodeLink struct { ...@@ -78,8 +77,8 @@ type NodeLink struct {
errMu sync.Mutex errMu sync.Mutex
errRecv error // error got from recvPkt on shutdown errRecv error // error got from recvPkt on shutdown
axclosed int32 // whether CloseAccept was called axclosed atomic32 // whether CloseAccept was called
closed int32 // whether Close was called closed atomic32 // whether Close was called
rxbuf rbuf.RingBuf // buffer for reading from peerLink rxbuf rbuf.RingBuf // buffer for reading from peerLink
} }
...@@ -95,18 +94,12 @@ type Conn struct { ...@@ -95,18 +94,12 @@ type Conn struct {
connId uint32 connId uint32
rxq chan *PktBuf // received packets for this Conn go here rxq chan *PktBuf // received packets for this Conn go here
rxqActive int32 // atomic: 1 while serveRecv is doing `rxq <- ...` rxqActive atomic32 // 1 while serveRecv is doing `rxq <- ...`
rxdownFlag int32 // atomic: 1 when RX is marked no longer operational rxdownFlag atomic32 // 1 when RX is marked no longer operational
rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed XXX !light? rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed XXX !light?
errMsg *Error // error message for peer if rx is down XXX try to do without it errMsg *Error // error message for peer if rx is down XXX try to do without it
txerr chan error // transmit results for this Conn go back here
txdown chan struct{} // ready when Conn TX is marked as no longer operational
txdownOnce sync.Once // tx shutdown may be called by both Close and nodelink.shutdown
txclosed int32 // whether CloseSend was called
// there are two modes a Conn could be used: // there are two modes a Conn could be used:
// - full mode - where full Conn functionality is working, and // - full mode - where full Conn functionality is working, and
// - light mode - where only subset functionality is working // - light mode - where only subset functionality is working
...@@ -119,7 +112,13 @@ type Conn struct { ...@@ -119,7 +112,13 @@ type Conn struct {
rxdown chan struct{} // ready when RX is marked no longer operational rxdown chan struct{} // ready when RX is marked no longer operational
rxdownOnce sync.Once // ----//---- XXX review rxdownOnce sync.Once // ----//---- XXX review
rxclosed int32 // whether CloseRecv was called rxclosed atomic32 // whether CloseRecv was called
txerr chan error // transmit results for this Conn go back here
txdown chan struct{} // ready when Conn TX is marked as no longer operational
txdownOnce sync.Once // tx shutdown may be called by both Close and nodelink.shutdown
txclosed atomic32 // whether CloseSend was called
// closing Conn is shutdown + some cleanup work to remove it from // closing Conn is shutdown + some cleanup work to remove it from
...@@ -143,7 +142,8 @@ type LinkError struct { ...@@ -143,7 +142,8 @@ type LinkError struct {
// ConnError is returned by Conn operations // ConnError is returned by Conn operations
type ConnError struct { type ConnError struct {
Conn *Conn // XXX Conn's are reused -> connId/link explicitly? Link *NodeLink
ConnId uint32 // NOTE Conn's are reused - cannot use *Conn here
Op string Op string
Err error Err error
} }
...@@ -213,7 +213,7 @@ var connPool = sync.Pool{New: func() interface{} { ...@@ -213,7 +213,7 @@ var connPool = sync.Pool{New: func() interface{} {
}} }}
// connAlloc allocates Conn from freelist // connAlloc allocates Conn from freelist
func connAlloc(link *NodeLink, connId uint32) *Conn { func (link *NodeLink) connAlloc(connId uint32) *Conn {
c := connPool.Get().(*Conn) c := connPool.Get().(*Conn)
c.reinit() c.reinit()
c.link = link c.link = link
...@@ -227,24 +227,48 @@ func (c *Conn) release() { ...@@ -227,24 +227,48 @@ func (c *Conn) release() {
connPool.Put(c) connPool.Put(c)
} }
// FIXME // reinit reinitializes connection after reallocating it from freelist
// FIXME
// FIXME
func (c *Conn) reinit() { func (c *Conn) reinit() {
// FIXME review and put everything here !!! // .link - already set XXX set =nil ?
// .connId - already set XXX set =0 ?
// .rxq - set initially; does not change
c.connId = 0 c.connId = 0
c.rxqActive = 0 c.rxqActive.Set(0) // XXX store relaxed?
c.rxdownFlag = 0 c.rxdownFlag.Set(0) // ----//----
// XXX rxerr*
// XXX errMsg c.rxerrOnce = sync.Once{} // XXX ok?
c.errMsg = nil // XXX what here?
// XXX more // XXX vvv not strictly needed for light mode?
ensureOpen(&c.rxdown)
c.rxdownOnce = sync.Once{} // XXX ok?
c.rxclosed.Set(0)
// .txerr - never closed
ensureOpen(&c.txdown)
c.txdownOnce = sync.Once{} // XXX ok?
c.txclosed.Set(0)
c.closeOnce = sync.Once{} // XXX ok?
}
// ensureOpen make sure *ch stays non-closed chan struct{} for signalling.
// if it is already closed, the channel is remade.
func ensureOpen(ch *chan struct{}) {
select {
case <-*ch:
*ch = make(chan struct{})
default:
// not closed - nothing to do
}
} }
// newConn creates new Conn with id=connId and registers it into connTab. // newConn creates new Conn with id=connId and registers it into connTab.
// Must be called with connMu held. // Must be called with connMu held.
func (nl *NodeLink) newConn(connId uint32) *Conn { func (nl *NodeLink) newConn(connId uint32) *Conn {
c := connAlloc(nl, connId) c := nl.connAlloc(connId)
nl.connTab[connId] = c nl.connTab[connId] = c
return c return c
} }
...@@ -254,7 +278,7 @@ func (nl *NodeLink) NewConn() (*Conn, error) { ...@@ -254,7 +278,7 @@ func (nl *NodeLink) NewConn() (*Conn, error) {
nl.connMu.Lock() nl.connMu.Lock()
defer nl.connMu.Unlock() defer nl.connMu.Unlock()
if nl.connTab == nil { if nl.connTab == nil {
if atomic.LoadInt32(&nl.closed) != 0 { if nl.closed.Get() != 0 {
return nil, nl.err("newconn", ErrLinkClosed) return nil, nl.err("newconn", ErrLinkClosed)
} }
return nil, nl.err("newconn", ErrLinkDown) return nil, nl.err("newconn", ErrLinkDown)
...@@ -349,7 +373,7 @@ func (nl *NodeLink) shutdown() { ...@@ -349,7 +373,7 @@ func (nl *NodeLink) shutdown() {
// //
// It is safet to call CloseAccept several times. // It is safet to call CloseAccept several times.
func (link *NodeLink) CloseAccept() { func (link *NodeLink) CloseAccept() {
atomic.StoreInt32(&link.axclosed, 1) link.axclosed.Set(1)
link.shutdownAX() link.shutdownAX()
} }
...@@ -360,8 +384,8 @@ func (link *NodeLink) CloseAccept() { ...@@ -360,8 +384,8 @@ func (link *NodeLink) CloseAccept() {
// Underlying raw connection is closed. // Underlying raw connection is closed.
// It is safe to call Close several times. // It is safe to call Close several times.
func (nl *NodeLink) Close() error { func (nl *NodeLink) Close() error {
atomic.StoreInt32(&nl.axclosed, 1) nl.axclosed.Set(1)
atomic.StoreInt32(&nl.closed, 1) nl.closed.Set(1)
nl.shutdown() nl.shutdown()
nl.downWg.Wait() nl.downWg.Wait()
return nl.err("close", nl.errClose) return nl.err("close", nl.errClose)
...@@ -390,7 +414,7 @@ func (c *Conn) shutdownRX(errMsg *Error) { ...@@ -390,7 +414,7 @@ func (c *Conn) shutdownRX(errMsg *Error) {
// downRX marks .rxq as no longer operational. // downRX marks .rxq as no longer operational.
func (c *Conn) downRX(errMsg *Error) { func (c *Conn) downRX(errMsg *Error) {
c.errMsg = errMsg c.errMsg = errMsg
atomic.StoreInt32(&c.rxdownFlag, 1) // XXX cmpxchg and return if already down? c.rxdownFlag.Set(1) // XXX cmpxchg and return if already down?
// dequeue all packets already queued in c.rxq // dequeue all packets already queued in c.rxq
// (once serveRecv sees c.rxdown it won't try to put new packets into // (once serveRecv sees c.rxdown it won't try to put new packets into
...@@ -403,7 +427,7 @@ loop: ...@@ -403,7 +427,7 @@ loop:
i++ i++
default: default:
if atomic.LoadInt32(&c.rxqActive) == 0 && len(c.rxq) == 0 { if c.rxqActive.Get() == 0 && len(c.rxq) == 0 {
break loop break loop
} }
} }
...@@ -444,7 +468,7 @@ func (c *Conn) lightClose() { ...@@ -444,7 +468,7 @@ func (c *Conn) lightClose() {
// //
// It is safe to call CloseRecv several times. // It is safe to call CloseRecv several times.
func (c *Conn) CloseRecv() { func (c *Conn) CloseRecv() {
atomic.StoreInt32(&c.rxclosed, 1) c.rxclosed.Set(1)
c.shutdownRX(errConnClosed) c.shutdownRX(errConnClosed)
} }
...@@ -459,8 +483,8 @@ func (c *Conn) CloseRecv() { ...@@ -459,8 +483,8 @@ func (c *Conn) CloseRecv() {
func (c *Conn) Close() error { func (c *Conn) Close() error {
nl := c.link nl := c.link
c.closeOnce.Do(func() { c.closeOnce.Do(func() {
atomic.StoreInt32(&c.rxclosed, 1) c.rxclosed.Set(1)
atomic.StoreInt32(&c.txclosed, 1) c.txclosed.Set(1)
c.shutdown() c.shutdown()
// adjust link.connTab // adjust link.connTab
...@@ -509,10 +533,10 @@ func (c *Conn) Close() error { ...@@ -509,10 +533,10 @@ func (c *Conn) Close() error {
// errAcceptShutdownAX returns appropriate error when link.axdown is found ready in Accept // errAcceptShutdownAX returns appropriate error when link.axdown is found ready in Accept
func (link *NodeLink) errAcceptShutdownAX() error { func (link *NodeLink) errAcceptShutdownAX() error {
switch { switch {
case atomic.LoadInt32(&link.closed) != 0: case link.closed.Get() != 0:
return ErrLinkClosed return ErrLinkClosed
case atomic.LoadInt32(&link.axclosed) != 0: case link.axclosed.Get() != 0:
return ErrLinkNoListen return ErrLinkNoListen
default: default:
...@@ -545,10 +569,10 @@ func (nl *NodeLink) Accept(/*ctx context.Context*/) (*Conn, error) { ...@@ -545,10 +569,10 @@ func (nl *NodeLink) Accept(/*ctx context.Context*/) (*Conn, error) {
// errRecvShutdown returns appropriate error when c.rxdown is found ready in recvPkt // errRecvShutdown returns appropriate error when c.rxdown is found ready in recvPkt
func (c *Conn) errRecvShutdown() error { func (c *Conn) errRecvShutdown() error {
switch { switch {
case atomic.LoadInt32(&c.rxclosed) != 0: case c.rxclosed.Get() != 0:
return ErrClosedConn return ErrClosedConn
case atomic.LoadInt32(&c.link.closed) != 0: case c.link.closed.Get() != 0:
return ErrLinkClosed return ErrLinkClosed
default: default:
...@@ -643,12 +667,12 @@ func (nl *NodeLink) serveRecv() { ...@@ -643,12 +667,12 @@ func (nl *NodeLink) serveRecv() {
// NOTE rxq must be buffered with at least 1 element so that // NOTE rxq must be buffered with at least 1 element so that
// queuing pkt succeeds for incoming connection that is not yet // queuing pkt succeeds for incoming connection that is not yet
// there in acceptq. // there in acceptq.
atomic.StoreInt32(&conn.rxqActive, 1) conn.rxqActive.Set(1)
rxdown := atomic.LoadInt32(&conn.rxdownFlag) != 0 rxdown := conn.rxdownFlag.Get() != 0
if !rxdown { if !rxdown {
conn.rxq <- pkt conn.rxq <- pkt
} }
atomic.StoreInt32(&conn.rxqActive, 0) conn.rxqActive.Set(0)
/* /*
// XXX goes away in favour of .rxdownFlag; reasons // XXX goes away in favour of .rxdownFlag; reasons
...@@ -753,14 +777,14 @@ type txReq struct { ...@@ -753,14 +777,14 @@ type txReq struct {
// errSendShutdown returns appropriate error when c.txdown is found ready in Send // errSendShutdown returns appropriate error when c.txdown is found ready in Send
func (c *Conn) errSendShutdown() error { func (c *Conn) errSendShutdown() error {
switch { switch {
case atomic.LoadInt32(&c.txclosed) != 0: case c.txclosed.Get() != 0:
return ErrClosedConn return ErrClosedConn
// the only other error possible besides Conn being .Close()'ed is that // the only other error possible besides Conn being .Close()'ed is that
// NodeLink was closed/shutdowned itself - on actual IO problems corresponding // NodeLink was closed/shutdowned itself - on actual IO problems corresponding
// error is delivered to particular Send that caused it. // error is delivered to particular Send that caused it.
case atomic.LoadInt32(&c.link.closed) != 0: case c.link.closed.Get() != 0:
return ErrLinkClosed return ErrLinkClosed
default: default:
...@@ -1253,7 +1277,7 @@ func (e *LinkError) Error() string { ...@@ -1253,7 +1277,7 @@ func (e *LinkError) Error() string {
} }
func (e *ConnError) Error() string { func (e *ConnError) Error() string {
return fmt.Sprintf("%s: %s: %s", e.Conn, e.Op, e.Err) return fmt.Sprintf("%s .%d: %s: %s", e.Link, e.ConnId, e.Op, e.Err)
} }
func (e *LinkError) Cause() error { return e.Err } func (e *LinkError) Cause() error { return e.Err }
...@@ -1270,7 +1294,7 @@ func (c *Conn) err(op string, e error) error { ...@@ -1270,7 +1294,7 @@ func (c *Conn) err(op string, e error) error {
if e == nil { if e == nil {
return nil return nil
} }
return &ConnError{Conn: c, Op: op, Err: e} return &ConnError{Link: c.link, ConnId: c.connId, Op: op, Err: e}
} }
...@@ -1296,7 +1320,7 @@ func (c *Conn) Recv() (Msg, error) { ...@@ -1296,7 +1320,7 @@ func (c *Conn) Recv() (Msg, error) {
if msgType == nil { if msgType == nil {
err = fmt.Errorf("invalid msgCode (%d)", msgCode) err = fmt.Errorf("invalid msgCode (%d)", msgCode)
// XXX "decode" -> "recv: decode"? // XXX "decode" -> "recv: decode"?
return nil, &ConnError{Conn: c, Op: "decode", Err: err} return nil, c.err("decode", err)
} }
// TODO use free-list for decoded messages + when possible decode in-place // TODO use free-list for decoded messages + when possible decode in-place
...@@ -1306,7 +1330,7 @@ func (c *Conn) Recv() (Msg, error) { ...@@ -1306,7 +1330,7 @@ func (c *Conn) Recv() (Msg, error) {
_, err = msg.neoMsgDecode(pkt.Payload()) _, err = msg.neoMsgDecode(pkt.Payload())
if err != nil { if err != nil {
return nil, &ConnError{Conn: c, Op: "decode", Err: err} // XXX "decode:" is already in ErrDecodeOverflow return nil, c.err("decode", err) // XXX "decode:" is already in ErrDecodeOverflow
} }
traceConnRecv(c, msg) traceConnRecv(c, msg)
...@@ -1355,7 +1379,7 @@ func (c *Conn) Expect(msgv ...Msg) (which int, err error) { ...@@ -1355,7 +1379,7 @@ func (c *Conn) Expect(msgv ...Msg) (which int, err error) {
if msg.neoMsgCode() == msgCode { if msg.neoMsgCode() == msgCode {
_, err = msg.neoMsgDecode(pkt.Payload()) _, err = msg.neoMsgDecode(pkt.Payload())
if err != nil { if err != nil {
return -1, &ConnError{Conn: c, Op: "decode", Err: err} return -1, c.err("decode", err)
} }
return i, nil return i, nil
} }
...@@ -1364,11 +1388,11 @@ func (c *Conn) Expect(msgv ...Msg) (which int, err error) { ...@@ -1364,11 +1388,11 @@ func (c *Conn) Expect(msgv ...Msg) (which int, err error) {
// unexpected message // unexpected message
msgType := msgTypeRegistry[msgCode] msgType := msgTypeRegistry[msgCode]
if msgType == nil { if msgType == nil {
return -1, &ConnError{c, "decode", fmt.Errorf("invalid msgCode (%d)", msgCode)} return -1, c.err("decode", fmt.Errorf("invalid msgCode (%d)", msgCode))
} }
// XXX also add which messages were expected ? // XXX also add which messages were expected ?
return -1, &ConnError{c, "recv", fmt.Errorf("unexpected message: %v", msgType)} return -1, c.err("recv", fmt.Errorf("unexpected message: %v", msgType))
} }
// Ask sends request and receives response. // Ask sends request and receives response.
...@@ -1502,3 +1526,18 @@ func (link *NodeLink) Ask1(req Msg, resp Msg) (err error) { ...@@ -1502,3 +1526,18 @@ func (link *NodeLink) Ask1(req Msg, resp Msg) (err error) {
func (req *Request) Link() *NodeLink { func (req *Request) Link() *NodeLink {
return req.conn.Link() return req.conn.Link()
} }
// ---- misc ----
// syntax sugar for atomic load/store to raise signal/noise in logic
type atomic32 struct {
v int32 // struct member so `var a atomic32; if a == 0 ...` does not work
}
func (a *atomic32) Get() int32 {
return atomic.LoadInt32(&a.v)
}
func (a *atomic32) Set(v int32) {
atomic.StoreInt32(&a.v, v)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment