Commit 9a37b817 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2d01bfb2
...@@ -92,8 +92,8 @@ import ( ...@@ -92,8 +92,8 @@ import (
"sync" "sync"
"time" "time"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/internal/packed" "lab.nexedi.com/kirr/neo/go/internal/packed"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"github.com/someonegg/gocontainer/rbuf" "github.com/someonegg/gocontainer/rbuf"
...@@ -159,7 +159,7 @@ type NodeLink struct { ...@@ -159,7 +159,7 @@ type NodeLink struct {
// creating Head-of-line (HOL) blocking problem. // creating Head-of-line (HOL) blocking problem.
// //
// XXX ^^^ problem reproducible on deco but not on z6001 // XXX ^^^ problem reproducible on deco but not on z6001
const rxghandoff = true // XXX whether to do rxghandoff trick const rxghandoff = true // XXX whether to do rxghandoff trick
// Conn is a connection established over NodeLink. // Conn is a connection established over NodeLink.
// //
...@@ -221,7 +221,7 @@ type LinkError struct { ...@@ -221,7 +221,7 @@ type LinkError struct {
// ConnError is returned by Conn operations. // ConnError is returned by Conn operations.
type ConnError struct { type ConnError struct {
Link *NodeLink Link *NodeLink
ConnId uint32 // NOTE Conn's are reused - cannot use *Conn here ConnId uint32 // NOTE Conn's are reused - cannot use *Conn here
Op string Op string
Err error Err error
} }
...@@ -304,7 +304,7 @@ func (link *NodeLink) connAlloc(connId uint32) *Conn { ...@@ -304,7 +304,7 @@ func (link *NodeLink) connAlloc(connId uint32) *Conn {
// release releases connection to freelist. // release releases connection to freelist.
func (c *Conn) release() { func (c *Conn) release() {
c.reinit() // XXX just in case c.reinit() // XXX just in case
connPool.Put(c) connPool.Put(c)
} }
...@@ -313,25 +313,24 @@ func (c *Conn) reinit() { ...@@ -313,25 +313,24 @@ func (c *Conn) reinit() {
c.link = nil c.link = nil
c.connId = 0 c.connId = 0
// .rxq - set initially; does not change // .rxq - set initially; does not change
c.rxqWrite.Set(0) // XXX store relaxed? c.rxqWrite.Set(0) // XXX store relaxed?
c.rxqRead.Set(0) // ----//---- c.rxqRead.Set(0) // ----//----
c.rxdownFlag.Set(0) // ----//---- c.rxdownFlag.Set(0) // ----//----
c.rxerrOnce = sync.Once{} // XXX ok?
c.rxerrOnce = sync.Once{} // XXX ok?
// XXX vvv not strictly needed for light mode? // XXX vvv not strictly needed for light mode?
// ensureOpen(&c.rxdown) // ensureOpen(&c.rxdown)
c.rxdownOnce = sync.Once{} // XXX ok? c.rxdownOnce = sync.Once{} // XXX ok?
c.rxclosed.Set(0) c.rxclosed.Set(0)
// .txerr - never closed // .txerr - never closed
ensureOpen(&c.txdown) ensureOpen(&c.txdown)
c.txdownOnce = sync.Once{} // XXX ok? c.txdownOnce = sync.Once{} // XXX ok?
c.txclosed.Set(0) c.txclosed.Set(0)
c.closeOnce = sync.Once{} // XXX ok? c.closeOnce = sync.Once{} // XXX ok?
} }
// ensureOpen make sure *ch stays non-closed chan struct{} for signalling. // ensureOpen make sure *ch stays non-closed chan struct{} for signalling.
...@@ -372,7 +371,7 @@ func (link *NodeLink) _NewConn() (*Conn, error) { ...@@ -372,7 +371,7 @@ func (link *NodeLink) _NewConn() (*Conn, error) {
// nextConnId could wrap around uint32 limits - find first free slot to // nextConnId could wrap around uint32 limits - find first free slot to
// not blindly replace existing connection // not blindly replace existing connection
for i := uint32(0) ;; i++ { for i := uint32(0); ; i++ {
_, exists := link.connTab[link.nextConnId] _, exists := link.connTab[link.nextConnId]
if !exists { if !exists {
break break
...@@ -395,7 +394,7 @@ func (link *NodeLink) shutdownAX() { ...@@ -395,7 +394,7 @@ func (link *NodeLink) shutdownAX() {
link.axdown1.Do(func() { link.axdown1.Do(func() {
// close(link.axdown) // close(link.axdown)
link.axdownFlag.Set(1) // XXX cmpxchg and return if already down? link.axdownFlag.Set(1) // XXX cmpxchg and return if already down?
// drain all connections from .acceptq: // drain all connections from .acceptq:
// - something could be already buffered there // - something could be already buffered there
...@@ -588,7 +587,7 @@ func (c *Conn) downRX(errMsg *proto.Error) { ...@@ -588,7 +587,7 @@ func (c *Conn) downRX(errMsg *proto.Error) {
// time to keep record of a closed connection so that we can properly reply // time to keep record of a closed connection so that we can properly reply
// "connection closed" if a packet comes in with same connID. // "connection closed" if a packet comes in with same connID.
var connKeepClosed = 1*time.Minute var connKeepClosed = 1 * time.Minute
// CloseRecv closes reading end of connection. // CloseRecv closes reading end of connection.
// //
...@@ -1311,7 +1310,7 @@ func (c *Conn) err(op string, e error) error { ...@@ -1311,7 +1310,7 @@ func (c *Conn) err(op string, e error) error {
// msgPack allocates pktBuf and encodes msg into it. // msgPack allocates pktBuf and encodes msg into it.
func msgPack(connId uint32, msg proto.Msg) *pktBuf { func msgPack(connId uint32, msg proto.Msg) *pktBuf {
l := msg.NEOMsgEncodedLen() l := msg.NEOMsgEncodedLen()
buf := pktAlloc(proto.PktHeaderLen+l) buf := pktAlloc(proto.PktHeaderLen + l)
h := buf.Header() h := buf.Header()
h.ConnId = packed.Hton32(connId) h.ConnId = packed.Hton32(connId)
...@@ -1370,7 +1369,7 @@ func (link *NodeLink) sendMsg(connId uint32, msg proto.Msg) error { ...@@ -1370,7 +1369,7 @@ func (link *NodeLink) sendMsg(connId uint32, msg proto.Msg) error {
traceMsgSendPre(link, connId, msg) traceMsgSendPre(link, connId, msg)
buf := msgPack(connId, msg) buf := msgPack(connId, msg)
return link.sendPkt(buf) // XXX more context in err? (msg type) return link.sendPkt(buf) // XXX more context in err? (msg type)
// FIXME ^^^ shutdown whole link on error // FIXME ^^^ shutdown whole link on error
} }
...@@ -1505,11 +1504,11 @@ type Request struct { ...@@ -1505,11 +1504,11 @@ type Request struct {
func (link *NodeLink) Recv1() (Request, error) { func (link *NodeLink) Recv1() (Request, error) {
conn, err := link.Accept() conn, err := link.Accept()
if err != nil { if err != nil {
return Request{}, err // XXX or return *Request? (want to avoid alloc) return Request{}, err // XXX or return *Request? (want to avoid alloc)
} }
// NOTE serveRecv guaranty that when a conn is accepted, there is 1 message in conn.rxq // NOTE serveRecv guaranty that when a conn is accepted, there is 1 message in conn.rxq
msg, err := conn.Recv() // XXX better directly from <-rxq ? msg, err := conn.Recv() // XXX better directly from <-rxq ?
if err != nil { if err != nil {
conn.Close() // XXX -> conn.lightClose() conn.Close() // XXX -> conn.lightClose()
return Request{}, err return Request{}, err
...@@ -1539,7 +1538,7 @@ func (req *Request) Reply(resp proto.Msg) error { ...@@ -1539,7 +1538,7 @@ func (req *Request) Reply(resp proto.Msg) error {
// The request object cannot be used any more after call to Close. // The request object cannot be used any more after call to Close.
// //
// See "Lightweight mode" in top-level package doc for overview. // See "Lightweight mode" in top-level package doc for overview.
func (req *Request) Close() { // XXX +error? func (req *Request) Close() { // XXX +error?
// XXX req.Msg.Release() ? // XXX req.Msg.Release() ?
req.Msg = nil req.Msg = nil
req.conn.lightClose() req.conn.lightClose()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment