Commit b85f5087 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d38c5c77
......@@ -38,7 +38,6 @@ import (
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/go123/xbytes"
"lab.nexedi.com/kirr/go123/xerr"
)
// NodeLink is a node-node link in NEO
......@@ -59,6 +58,7 @@ import (
type NodeLink struct {
peerLink net.Conn // raw conn to peer
connPool connPool
connMu sync.Mutex
connTab map[uint32]*Conn // connId -> Conn associated with connId
nextConnId uint32 // next connId to use for Conn initiated by us
......@@ -129,6 +129,37 @@ type Conn struct {
closeOnce sync.Once
}
// connPool is free-list for Conn
type connPool struct {
sync.Pool
}
New := func() *Conn {
return &Conn{
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf
}
func (p *connPool) Get() *Conn {
c := p.Pool.Get().(*Conn)
c.reinit()
return c
}
func (c *Conn) reinit() {
c.connId = 0
c.rxqActive = 0
c.rxdownFlag = 0
// XXX rxerr*
// XXX errMsg
// XXX more
}
func (p *connPool) Put(c *Conn) {
p.Pool.Put(c)
}
var ErrLinkClosed = errors.New("node link is closed") // operations on closed NodeLink
var ErrLinkDown = errors.New("node link is down") // e.g. due to IO error
......@@ -358,7 +389,7 @@ func (c *Conn) shutdownRX(errMsg *Error) {
// downRX marks .rxq as no longer operational.
func (c *Conn) downRX(errMsg *Error) {
c.errMsg = errMsg
atomic.StoreInt32(&c.rxdownFlag, 1)
atomic.StoreInt32(&c.rxdownFlag, 1) // XXX cmpxchg and return if already down?
// dequeue all packets already queued in c.rxq
// (once serveRecv sees c.rxdown it won't try to put new packets into
......@@ -388,6 +419,24 @@ loop:
// "connection closed" if a packet comes in with same connID.
var connKeepClosed = 1*time.Minute
// release releases connection to freelist.
//
// No Send or Recv must be in flight.
// The caller must not use c after call to release.
func (c *Conn) release() {
nl := c.link
nl.connMu.Lock()
if nl.connTab != nil {
// XXX find way to keep initiated by us conn as closed for some time (see Conn.Close)
// but timer costs too much...
delete(nl.connTab, c.connId)
}
nl.connMu.Unlock()
// XXX just in case
c.reinit()
}
// CloseRecv closes reading end of connection.
//
// Any blocked Recv*() will be unblocked and return error.
......@@ -1368,9 +1417,9 @@ func (link *NodeLink) Recv1() (Request, error) {
}
// NOTE serveRecv guaranty that when a conn is accepted, there is 1 message in conn.rxq
msg, err := conn.Recv()
msg, err := conn.Recv() // XXX directly from <-rxq
if err != nil {
conn.Close() // XXX -> lclose(conn)
conn.Close() // XXX -> conn.release
return Request{}, err
}
......@@ -1386,17 +1435,21 @@ func (link *NodeLink) Recv1() (Request, error) {
//
// XXX doc
func (req *Request) Reply(resp Msg) error {
err1 := req.conn.Send(resp)
err2 := req.conn.Close()
return xerr.First(err1, err2)
return req.conn.Send(resp)
//err1 := req.conn.Send(resp)
//err2 := req.conn.Close() // XXX no - only Send here?
//return xerr.First(err1, err2)
}
// Close should be called to free request resources for requests without a reply.
// Release must be called to free request resources.
//
// XXX doc
// It is safe to call Close several times.
func (req *Request) Close() error {
return req.conn.Close()
func (req *Request) Release() {
//return req.conn.Close()
// XXX req.Msg.Release() ?
req.Msg = nil
req.conn.release()
req.conn = nil // just in case
}
......@@ -1409,11 +1462,11 @@ func (link *NodeLink) Send1(msg Msg) error {
return err
}
// XXX conn.CloseRecv() ?
conn.downRX(errConnClosed) // FIXME just new conn this way
err1 := conn.Send(msg)
err2 := conn.Close()
return xerr.First(err1, err2)
err = conn.Send(msg)
conn.release()
return err
}
......
......@@ -970,6 +970,8 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) {
b.Fatal(err)
}
}
req.Release()
}
}()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment