Commit e43a68e4 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent b85f5087
...@@ -58,7 +58,6 @@ import ( ...@@ -58,7 +58,6 @@ import (
type NodeLink struct { type NodeLink struct {
peerLink net.Conn // raw conn to peer peerLink net.Conn // raw conn to peer
connPool connPool
connMu sync.Mutex connMu sync.Mutex
connTab map[uint32]*Conn // connId -> Conn associated with connId connTab map[uint32]*Conn // connId -> Conn associated with connId
nextConnId uint32 // next connId to use for Conn initiated by us nextConnId uint32 // next connId to use for Conn initiated by us
...@@ -129,38 +128,6 @@ type Conn struct { ...@@ -129,38 +128,6 @@ type Conn struct {
closeOnce sync.Once closeOnce sync.Once
} }
// connPool is free-list for Conn
type connPool struct {
sync.Pool
}
New := func() *Conn {
return &Conn{
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf
}
func (p *connPool) Get() *Conn {
c := p.Pool.Get().(*Conn)
c.reinit()
return c
}
func (c *Conn) reinit() {
c.connId = 0
c.rxqActive = 0
c.rxdownFlag = 0
// XXX rxerr*
// XXX errMsg
// XXX more
}
func (p *connPool) Put(c *Conn) {
p.Pool.Put(c)
}
var ErrLinkClosed = errors.New("node link is closed") // operations on closed NodeLink var ErrLinkClosed = errors.New("node link is closed") // operations on closed NodeLink
var ErrLinkDown = errors.New("node link is down") // e.g. due to IO error var ErrLinkDown = errors.New("node link is down") // e.g. due to IO error
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections") var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
...@@ -234,16 +201,50 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink { ...@@ -234,16 +201,50 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
return nl return nl
} }
// newConn creates new Conn with id=connId and registers it into connTab. // connPool is freelist for Conn
// Must be called with connMu held. // XXX make it per-link?
func (nl *NodeLink) newConn(connId uint32) *Conn { var connPool = sync.Pool{New: func() interface{} {
c := &Conn{link: nl, return &Conn{
connId: connId,
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
txdown: make(chan struct{}), txdown: make(chan struct{}),
rxdown: make(chan struct{}), rxdown: make(chan struct{}),
} }
}}
// connAlloc allocates Conn from freelist
func connAlloc(link *NodeLink, connId uint32) *Conn {
c := connPool.Get().(*Conn)
c.reinit()
c.link = link
c.connId = connId
return c
}
// release releases connection to freelist
func (c *Conn) release() {
c.reinit() // XXX just in case
connPool.Put(c)
}
// FIXME
// FIXME
// FIXME
func (c *Conn) reinit() {
// FIXME review and put everything here !!!
c.connId = 0
c.rxqActive = 0
c.rxdownFlag = 0
// XXX rxerr*
// XXX errMsg
// XXX more
}
// newConn creates new Conn with id=connId and registers it into connTab.
// Must be called with connMu held.
func (nl *NodeLink) newConn(connId uint32) *Conn {
c := connAlloc(nl, connId)
nl.connTab[connId] = c nl.connTab[connId] = c
return c return c
} }
...@@ -419,11 +420,11 @@ loop: ...@@ -419,11 +420,11 @@ loop:
// "connection closed" if a packet comes in with same connID. // "connection closed" if a packet comes in with same connID.
var connKeepClosed = 1*time.Minute var connKeepClosed = 1*time.Minute
// release releases connection to freelist. // lightClose closes light connection.
// //
// No Send or Recv must be in flight. // No Send or Recv must be in flight.
// The caller must not use c after call to release. // The caller must not use c after call to close - the connection is returned to freelist.
func (c *Conn) release() { func (c *Conn) lightClose() {
nl := c.link nl := c.link
nl.connMu.Lock() nl.connMu.Lock()
if nl.connTab != nil { if nl.connTab != nil {
...@@ -433,8 +434,7 @@ func (c *Conn) release() { ...@@ -433,8 +434,7 @@ func (c *Conn) release() {
} }
nl.connMu.Unlock() nl.connMu.Unlock()
// XXX just in case c.release()
c.reinit()
} }
// CloseRecv closes reading end of connection. // CloseRecv closes reading end of connection.
...@@ -1448,7 +1448,7 @@ func (req *Request) Release() { ...@@ -1448,7 +1448,7 @@ func (req *Request) Release() {
//return req.conn.Close() //return req.conn.Close()
// XXX req.Msg.Release() ? // XXX req.Msg.Release() ?
req.Msg = nil req.Msg = nil
req.conn.release() req.conn.lightClose()
req.conn = nil // just in case req.conn = nil // just in case
} }
...@@ -1480,12 +1480,7 @@ func (link *NodeLink) Ask1(req Msg, resp Msg) (err error) { ...@@ -1480,12 +1480,7 @@ func (link *NodeLink) Ask1(req Msg, resp Msg) (err error) {
return err return err
} }
defer func() { defer conn.lightClose()
err2 := conn.Close()
if err == nil {
err = err2
}
}()
err = conn.Send(req) err = conn.Send(req)
if err != nil { if err != nil {
......
...@@ -992,6 +992,9 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) { ...@@ -992,6 +992,9 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) {
if !(obj.Oid == get.Oid && obj.Serial == get.Serial && obj.DataSerial == get.Tid) { if !(obj.Oid == get.Oid && obj.Serial == get.Serial && obj.DataSerial == get.Tid) {
b.Fatalf("read back: %v ; requested %v", obj, get) b.Fatalf("read back: %v ; requested %v", obj, get)
} }
// XXX must be obj.Release
obj.Data.XRelease()
} }
xclose(l1) xclose(l1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment