Commit 0430db7a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 339da2c1
......@@ -50,9 +50,6 @@ import (
// new connection can be accepted via .Accept(), and all further communication
// send/receive exchange will be happening in between those 2 connections.
//
// For a node to be able to accept new incoming connection it has to have
// "server" role - see newNodeLink() for details. XXX might change to everyone is able to accept.
//
// A NodeLink has to be explicitly closed, once it is no longer needed.
//
// It is safe to use NodeLink from multiple goroutines simultaneously.
......@@ -65,10 +62,13 @@ type NodeLink struct {
serveWg sync.WaitGroup // for serve{Send,Recv}
acceptq chan *Conn // queue of incoming connections for Accept
// = nil if NodeLink is not accepting connections
// = nil if NodeLink is not accepting connections <- XXX no
txq chan txReq // tx requests from Conns go via here
// (rx packets are routed to Conn.rxq)
axdown chan struct{} // ready when accept is marked as no longer operational
axdown1 sync.Once // CloseAccept may be called severall times
down chan struct{} // ready when NodeLink is marked as no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error
downWg sync.WaitGroup // for activities at shutdown
......@@ -77,7 +77,8 @@ type NodeLink struct {
errMu sync.Mutex
errRecv error // error got from recvPkt on shutdown
closed uint32 // whether Close was called
axclosed int32 // whether CloseAccept was called
closed int32 // whether Close was called
}
// Conn is a connection established over NodeLink
......@@ -101,7 +102,11 @@ type Conn struct {
rxclosed int32 // whether CloseRecv was called
txclosed int32 // whether CloseSend was called
errMsg *Error // error message for replyNoConn
errMsg *Error // error message for peer if rx is down
// after Close Conn is kept for some time in link.connTab so peer could
// receive "connection closed" and then GC'ed
gcOnce sync.Once
}
......@@ -136,26 +141,6 @@ const (
linkFlagsMask LinkRole = (1<<32 - 1) << 16
)
/*
// LinkFlags allow to customize NodeLink behaviour
type LinkFlags int
const (
// LinkListen tells link to accept incoming connections.
//
// NOTE it is valid use-case even for link originating through DialLink
// to accept incoming connections over established channel.
//
// NOTE listen put to flags - not e.g. link.Listen() call - because
// otherwise e.g. for client originated links if after DialLink client
// calls link.Listen() there is a race window: before Listen is called
// in which peer could start connecting to our side.
LinkListen LinkFlags = 1 << iota
// for testing:
linkNoRecvSend LinkFlags = 1 << 16 // do not spawn serveRecv & serveSend
)
*/
// newNodeLink makes a new NodeLink from already established net.Conn
//
// Role specifies how to treat our role on the link - either as client or
......@@ -172,14 +157,11 @@ const (
// users should always use Handshake which performs protocol handshaking first.
func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
var nextConnId uint32
var acceptq chan *Conn
switch role &^ linkFlagsMask {
case LinkServer:
nextConnId = 0 // all initiated by us connId will be even
acceptq = make(chan *Conn) // accept queue; TODO use backlog
nextConnId = 0 // all initiated by us connId will be even
case LinkClient:
nextConnId = 1 // ----//---- odd
acceptq = nil // not accepting incoming connections
default:
panic("invalid conn role")
}
......@@ -188,8 +170,9 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
peerLink: conn,
connTab: map[uint32]*Conn{},
nextConnId: nextConnId,
acceptq: acceptq, // XXX reenable make(chan *Conn), // accepting initially
acceptq: make(chan *Conn), // XXX +buf
txq: make(chan txReq),
axdown: make(chan struct{}),
down: make(chan struct{}),
}
if role&linkNoRecvSend == 0 {
......@@ -205,7 +188,7 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
func (nl *NodeLink) newConn(connId uint32) *Conn {
c := &Conn{nodeLink: nl,
connId: connId,
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
txdown: make(chan struct{}),
rxdown: make(chan struct{}),
......@@ -214,12 +197,12 @@ func (nl *NodeLink) newConn(connId uint32) *Conn {
return c
}
// NewConn creates new connection on top of node-node link
// NewConn creates new connection on top of node-node link.
func (nl *NodeLink) NewConn() (*Conn, error) {
nl.connMu.Lock()
defer nl.connMu.Unlock()
if nl.connTab == nil {
if atomic.LoadUint32(&nl.closed) != 0 {
if atomic.LoadInt32(&nl.closed) != 0 {
return nil, nl.err("newconn", ErrLinkClosed)
}
return nil, nl.err("newconn", ErrLinkDown)
......@@ -245,9 +228,38 @@ func (nl *NodeLink) NewConn() (*Conn, error) {
return c, nil
}
// shutdownAX marks acceptq as no longer operational
func (link *NodeLink) shutdownAX() {
link.axdown1.Do(func() {
close(link.axdown)
// dequeue all connections already queued in link.acceptq
// (once serveRecvs sees link.axdown it won't try to put new connections into
// link.acceptq, but something finite could be already there)
loop:
for {
select {
case conn := <-link.acceptq:
// serveRecv already put at least 1 packet into conn.rxq before putting
// conn into .acceptq - shutting it down will send the error to peer.
conn.shutdownRX(errConnRefused)
// XXX vvv -> better conn.Close() ?
link.connMu.Lock()
delete(link.connTab, conn.connId)
link.connMu.Unlock()
default:
break loop
}
}
})
}
// shutdown closes raw link to peer and marks NodeLink as no longer operational.
// it also shutdowns all opened connections over this node link.
func (nl *NodeLink) shutdown() {
nl.shutdownAX()
nl.downOnce.Do(func() {
close(nl.down)
......@@ -265,18 +277,31 @@ func (nl *NodeLink) shutdown() {
// to return last error from sendPkt to requester.
nl.serveWg.Wait()
// clear + mark down .connTab + shutdown all connections
nl.connMu.Lock()
for _, conn := range nl.connTab {
// NOTE anything waking up on Conn.down must not lock
// connMu - else it will deadlock.
connTab := nl.connTab
nl.connTab = nil
nl.connMu.Unlock()
// conn.shutdown() outside of link.connMu lock
for _, conn := range connTab {
conn.shutdown()
}
nl.connTab = nil // clear + mark down
nl.connMu.Unlock()
}()
})
}
// CloseAccept instructs node link to not accept incoming conections anymore.
//
// Any blocked Accept() will be unblocked and return error.
// The peer will receive "connection refused" if it tries to connect after.
//
// It is safet to call CloseAccept several times.
func (link *NodeLink) CloseAccept() {
atomic.StoreInt32(&link.axclosed, 1)
link.shutdownAX()
}
// Close closes node-node link.
//
// All blocking operations - Accept and IO on associated connections
......@@ -284,7 +309,8 @@ func (nl *NodeLink) shutdown() {
// Underlying raw connection is closed.
// It is safe to call Close several times.
func (nl *NodeLink) Close() error {
atomic.StoreUint32(&nl.closed, 1)
atomic.StoreInt32(&nl.axclosed, 1)
atomic.StoreInt32(&nl.closed, 1)
nl.shutdown()
nl.downWg.Wait()
return nl.err("close", nl.errClose)
......@@ -302,10 +328,31 @@ func (c *Conn) shutdownTX() {
})
}
// shutdownRX marks .rxq as no loner operational
func (c *Conn) shutdownRX(errMsg *Error) {
c.rxdownOnce.Do(func() {
c.errMsg = errMsg
close(c.rxdown)
// dequeue all packets already queued in c.rxq
// (once serveRecv sees c.rxdown it won't try to put new packets into
// c.rxq, but something finite could be already there)
i := 0
loop:
for {
select {
case <-c.rxq:
i++
default:
break loop
}
}
// if something was queued already there - reply "connection closed"
if i != 0 {
go c.replyNoConn()
}
})
}
......@@ -323,28 +370,6 @@ var connKeepClosed = 1*time.Minute
func (c *Conn) CloseRecv() {
atomic.StoreInt32(&c.rxclosed, 1)
c.shutdownRX(errConnClosed)
// FIXME vvv should be active on Close path too and under shutdown() called from link shutdown
// dequeue all packets already queued in c.rxq
// (once serveRecv sees c.rxdown it won't try to put new packets into
// c.rxq, but something finite could be already there)
i := 0
loop:
for {
select {
case <-c.rxq:
i++
default:
break loop
}
}
// if something was queued already there - reply "connection closed"
if i != 0 {
go c.replyNoConn()
}
}
// Close closes connection.
......@@ -358,6 +383,7 @@ loop:
func (c *Conn) Close() error {
nl := c.nodeLink
/*
// adjust nodeLink.connTab
nl.connMu.Lock()
if nl.connTab != nil {
......@@ -370,12 +396,10 @@ func (c *Conn) Close() error {
// "closed" connection into connTab entry for some time to reply
// "connection closed" if another packet comes to it.
} else {
// XXX we do not need to create new connection - enough to put our
// connection into proper state and delete it after some time - right?
cc := nl.newConn(c.connId)
cc.shutdownRX(errConnClosed)
// // cc.closed=1 so that cc is not freed by replyNoConn
// // NOTE cc.down stays not closed so Send could work
// atomic.StoreInt32(&cc.closed, 1)
// cc.errMsg = errConnClosed
time.AfterFunc(connKeepClosed, func() {
nl.connMu.Lock()
delete(nl.connTab, cc.connId)
......@@ -387,34 +411,72 @@ func (c *Conn) Close() error {
}
nl.connMu.Unlock()
*/
atomic.StoreInt32(&c.rxclosed, 1)
atomic.StoreInt32(&c.txclosed, 1)
c.shutdown()
// adjust link.connTab
keep := false
nl.connMu.Lock()
if nl.connTab != nil {
// connection was initiated by us - simply delete - we always
// know if a packet comes to such connection - it is closed.
//
// XXX checking vvv should be possible without connMu lock
if c.connId == nl.nextConnId % 2 {
delete(nl.connTab, c.connId)
// connection was initiated by peer which we accepted.
// it is already shutted down.
// keep connTab entry for it for some time to reply
// "connection closed" if another packet comes to it.
} else {
keep = true
}
}
nl.connMu.Unlock()
if keep {
c.gcOnce.Do(func() {
time.AfterFunc(connKeepClosed, func() {
nl.connMu.Lock()
delete(nl.connTab, c.connId)
nl.connMu.Unlock()
})
})
}
return nil
}
// ---- receive ----
// Accept waits for and accepts incoming connection on top of node-node link.
func (nl *NodeLink) Accept(/*ctx context.Context*/) (c *Conn, err error) {
defer func() {
if err != nil {
err = nl.err("accept", err)
}
}()
// errAcceptShutdownAX returns appropriate error when link.axdown is found ready in Accept
func (link *NodeLink) errAcceptShutdownAX() error {
switch {
case atomic.LoadInt32(&link.closed) != 0:
return ErrLinkClosed
case atomic.LoadInt32(&link.axclosed) != 0:
return ErrLinkNoListen
// this node link is not accepting connections
if nl.acceptq == nil {
return nil, ErrLinkNoListen
default:
// XXX ok? - recheck
return ErrLinkDown
}
}
// Accept waits for and accepts incoming connection on top of node-node link.
func (nl *NodeLink) Accept(/*ctx context.Context*/) (*Conn, error) {
select {
case <-nl.down:
if atomic.LoadUint32(&nl.closed) != 0 {
return nil, ErrLinkClosed
}
return nil, ErrLinkDown
case <-nl.axdown:
return nil, nl.err("accept", nl.errAcceptShutdownAX())
case c := <-nl.acceptq:
return c, nil
// XXX for long-lived links - better to propagate ctx cancel to link.Close to
// lower cases that are run at every select.
......@@ -425,9 +487,6 @@ func (nl *NodeLink) Accept(/*ctx context.Context*/) (c *Conn, err error) {
case <-ctx.Done():
return nil, ctx.Err()
*/
case c := <-nl.acceptq:
return c, nil
}
}
......@@ -437,7 +496,7 @@ func (c *Conn) errRecvShutdown() error {
case atomic.LoadInt32(&c.rxclosed) != 0:
return ErrClosedConn
case atomic.LoadUint32(&c.nodeLink.closed) != 0:
case atomic.LoadInt32(&c.nodeLink.closed) != 0:
return ErrLinkClosed
default:
......@@ -500,6 +559,7 @@ func (nl *NodeLink) serveRecv() {
// resetting it waits for us to finish.
conn := nl.connTab[connId]
tmpclosed := false
if conn == nil {
// "new" connection will be needed in all cases - e.g.
// even temporarily to reply "connection refused"
......@@ -507,31 +567,42 @@ func (nl *NodeLink) serveRecv() {
// message with connid that should be initiated by us
if connId % 2 == nl.nextConnId % 2 {
conn.shutdownRX(errConnClosed)
tmpclosed = true
delete(nl.connTab, conn.connId)
//errTempDown = errConnClosed
// message with connid for a stream initiated by peer
} else {
if nl.acceptq == nil {
conn.shutdownRX(errConnRefused)
/*
if nl.acceptq == nil { // XXX != nil anymore
errTempDown = errConnRefused
} else {
// we are accepting new incoming connection
accept = true
}
*/
accept = true
}
/*
// delete temporary conn from .connTab - this way the
// connection will be automatically garbage-collected
// after its final use.
if !accept {
delete(nl.connTab, conn.connId)
}
*/
}
nl.connMu.Unlock()
if tmpclosed {
conn.shutdownRX(errConnClosed)
}
// don't even try to `conn.rxq <- ...` if .rxdown is ready
// ( else since select is picking random ready variant Recv/serveRecv
// could receive something on rxdown Conn half sometimes )
// could receive something on rxdown Conn sometimes )
rxdown := false
select {
case <-conn.rxdown:
......@@ -566,6 +637,40 @@ func (nl *NodeLink) serveRecv() {
if accept {
// don't even try to `link.acceptq <- ...` if .axdown is ready
// ( else since select is picking random ready variant Accept/serveRecv
// could receive something on axdown Link sometimes )
axdown := false
select {
case <-nl.axdown:
axdown = true
default:
// ok
}
// put conn to .acceptq
if !axdown {
select {
case <-nl.axdown:
axdown = true
case nl.acceptq <- conn:
// ok
}
}
// we are not accepting the connection
if axdown {
conn.shutdownRX(errConnRefused)
nl.connMu.Lock()
delete(nl.connTab, conn.connId)
nl.connMu.Unlock()
}
/*
select {
case <-nl.down:
// Accept and loop calling it can exit if shutdown was requested
......@@ -581,6 +686,7 @@ func (nl *NodeLink) serveRecv() {
case nl.acceptq <- conn:
// ok
}
*/
}
}
}
......@@ -613,7 +719,7 @@ func (c *Conn) errSendShutdown() error {
// NodeLink was closed/shutdowned itself - on actual IO problems corresponding
// error is delivered to particular Send that caused it.
case atomic.LoadUint32(&c.nodeLink.closed) != 0:
case atomic.LoadInt32(&c.nodeLink.closed) != 0:
return ErrLinkClosed
default:
......
......@@ -168,6 +168,8 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) {
func TestNodeLink(t *testing.T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
println("000")
// Close vs recvPkt
nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg := &xsync.WorkGroup{}
......@@ -182,6 +184,8 @@ func TestNodeLink(t *testing.T) {
xwait(wg)
xclose(nl2)
println("222")
// Close vs sendPkt
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &xsync.WorkGroup{}
......@@ -204,18 +208,23 @@ func TestNodeLink(t *testing.T) {
tdelay()
xclose(nl2)
})
println("222 + 1")
c, err := nl2.Accept()
if !(c == nil && xlinkError(err) == ErrLinkClosed) {
t.Fatalf("NodeLink.Accept() after close: conn = %v, err = %v", c, err)
}
println("222 + 2")
// nl1 is not accepting connections - because it has LinkClient role
// check Accept behaviour.
c, err = nl1.Accept()
if !(c == nil && xlinkError(err) == ErrLinkNoListen) {
t.Fatalf("NodeLink.Accept() on non-listening node link: conn = %v, err = %v", c, err)
}
println("222 + 3")
xclose(nl1)
println("333")
// Close vs recvPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &xsync.WorkGroup{}
......
......@@ -160,6 +160,8 @@ type Listener interface {
// On success returned are:
// - primary link connection which carried identification
// - requested identification packet
//
// XXX Conn, RequestIdentification -> Request
Accept(ctx context.Context) (*Conn, *RequestIdentification, error)
}
......
......@@ -368,6 +368,20 @@ func TestMasterStorage(t *testing.T) {
YourUUID: neo.UUID(neo.CLIENT, 1),
}))
// XXX C <- M NotifyNodeInformation C1,M1,S1
// C asks M about PT
tc.Expect(conntx("c:1", "m:3", 3, &neo.AskPartitionTable{}))
tc.Expect(conntx("m:3", "c:1", 3, &neo.AnswerPartitionTable{
PTid: 1,
RowList: []neo.RowInfo{
{0, []neo.CellInfo{{neo.UUID(neo.STORAGE, 1), neo.UP_TO_DATE}}},
},
}))
_ = C
......
......@@ -51,10 +51,12 @@ type Master struct {
// master manages node and partition tables and broadcast their updates
// to all nodes in cluster
// XXX dup from .node - kill here
///*
stateMu sync.RWMutex // XXX recheck: needed ?
nodeTab *neo.NodeTable
partTab *neo.PartitionTable // XXX ^ is also in node
partTab *neo.PartitionTable
clusterState neo.ClusterState
//*/
......@@ -196,6 +198,20 @@ func (m *Master) Run(ctx context.Context) (err error) {
continue
}
// for storages the only incoming connection is for RequestIdentification
// and then master only drives it. So close accept as noone will be
// listening for it on your side anymore.
switch idReq.NodeType {
case neo.CLIENT:
// ok
case neo.STORAGE:
fallthrough
default:
l.CloseAccept()
}
// handover to main driver
select {
case m.nodeCome <- nodeCome{conn, idReq}:
// ok
......@@ -318,7 +334,6 @@ loop:
// new connection comes in
case n := <-m.nodeCome:
node, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
// XXX set node.State = PENDING
if node == nil {
goreject(ctx, wg, n.conn, resp)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment