connection.go 38.4 KB
Newer Older
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1 2
// Copyright (C) 2016-2017  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
19 20

package neo
21
// Connection management
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
22 23

import (
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
24
	"context"
25
	"encoding/binary"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
26
	"errors"
27
	"fmt"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
28
	"io"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
29
	"math"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
30
	"net"
31
	"reflect"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
32
	"sync"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
33
	"sync/atomic"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
34
	"time"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
35

36 37
	"github.com/someonegg/gocontainer/rbuf"

38
	"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
39

40
	"lab.nexedi.com/kirr/go123/xbytes"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
41 42
)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
43
// NodeLink is a node-node link in NEO
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
44
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
45 46 47 48
// A node-node link represents bidirectional symmetrical communication
// channel in between 2 NEO nodes. The link provides service for packets
// exchange and for multiplexing several communication connections on
// top of the node-node link.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
49
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
50 51
// New connection can be created with .NewConn() . Once connection is
// created and data is sent over it, on peer's side another corresponding
52 53
// new connection can be accepted via .Accept(), and all further communication
// send/receive exchange will be happening in between those 2 connections.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
54
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
55
// A NodeLink has to be explicitly closed, once it is no longer needed.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
56
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
57
// It is safe to use NodeLink from multiple goroutines simultaneously.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
58
type NodeLink struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
59
	peerLink net.Conn // raw conn to peer
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
60

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
61
	connPool   connPool
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
62
	connMu     sync.Mutex
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
63 64
	connTab    map[uint32]*Conn // connId -> Conn associated with connId
	nextConnId uint32           // next connId to use for Conn initiated by us
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
65

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
66 67
	serveWg sync.WaitGroup	// for serve{Send,Recv}
	acceptq chan *Conn	// queue of incoming connections for Accept
68 69
	txq	chan txReq	// tx requests from Conns go via here
				// (rx packets are routed to Conn.rxq)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
70

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
71 72 73
	axdown  chan struct{}	// ready when accept is marked as no longer operational
	axdown1 sync.Once	// CloseAccept may be called severall times

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
74 75 76 77
	down     chan struct{}  // ready when NodeLink is marked as no longer operational
	downOnce sync.Once      // shutdown may be due to both Close and IO error
	downWg   sync.WaitGroup // for activities at shutdown
	errClose error          // error got from peerLink.Close
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
78

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
79 80
	errMu    sync.Mutex
	errRecv	 error		// error got from recvPkt on shutdown
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
81

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
82 83
	axclosed int32		// whether CloseAccept was called
	closed   int32		// whether Close was called
84 85

	rxbuf    rbuf.RingBuf	// buffer for reading from peerLink
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
86 87
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
88 89 90 91 92
// Conn is a connection established over NodeLink
//
// Data can be sent and received over it.
// Once connection is no longer needed it has to be closed.
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
93
// It is safe to use Conn from multiple goroutines simultaneously.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
94
type Conn struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
95
	link      *NodeLink
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
96
	connId    uint32
97 98 99 100 101

	rxq	   chan *PktBuf	 // received packets for this Conn go here
	rxqActive  int32	 // atomic: 1 while serveRecv is doing `rxq <- ...`
	rxdownFlag int32	 // atomic: 1 when RX is marked no longer operational

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
102 103
	rxerrOnce sync.Once     // rx error is reported only once - then it is link down or closed XXX !light?
	errMsg	  *Error	// error message for peer if rx is down	XXX try to do without it
104

105
	txerr     chan error	// transmit results for this Conn go back here
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
106

107 108 109
	txdown     chan struct{} // ready when Conn TX is marked as no longer operational
	txdownOnce sync.Once	 // tx shutdown may be called by both Close and nodelink.shutdown
	txclosed  int32		// whether CloseSend was called
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
110

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
	// there are two modes a Conn could be used:
	// - full mode - where full Conn functionality is working, and
	// - light mode - where only subset functionality is working
	//
	// the light mode is used to implement Recv1 & friends - there any
	// connection is used max to send and/or receive only 1 packet and then
	// has to be reused for efficiency ideally without reallocating anything.
	//
	// everything below is used during !light mode only.

	rxdown     chan struct{} // ready when RX is marked no longer operational
	rxdownOnce sync.Once	 // ----//----	XXX review
	rxclosed  int32		 // whether CloseRecv was called


126 127 128 129
	// closing Conn is shutdown + some cleanup work to remove it from
	// link.connTab including arming timers etc. Let this work be spawned only once.
	// (for Conn.Close to be valid called several times)
	closeOnce sync.Once
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
130 131
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
// connPool is free-list for Conn
type connPool struct {
	sync.Pool
}

New := func() *Conn {
	return &Conn{
		rxq:    make(chan *PktBuf, 1),	// NOTE non-blocking - see serveRecv XXX +buf

}

func (p *connPool) Get() *Conn {
	c := p.Pool.Get().(*Conn)
	c.reinit()
	return c
}

func (c *Conn) reinit() {
	c.connId = 0
	c.rxqActive = 0
	c.rxdownFlag = 0
	// XXX rxerr*
	// XXX errMsg

	// XXX more
}

func (p *connPool) Put(c *Conn) {
	p.Pool.Put(c)
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
163

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
164 165
// Errors reported by NodeLink and Conn operations.
var (
	// ErrLinkClosed is reported for operations on a closed NodeLink.
	ErrLinkClosed = errors.New("node link is closed")
	// ErrLinkDown is reported when the link went down, e.g. due to IO error.
	ErrLinkDown = errors.New("node link is down")
	// ErrLinkNoListen is reported by Accept after CloseAccept.
	ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
	// ErrLinkManyConn is reported by NewConn when no free connId is left.
	ErrLinkManyConn = errors.New("too many opened connections")
	// ErrClosedConn is reported for operations on a closed connection.
	ErrClosedConn = errors.New("connection is closed")
)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
169

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
170
// LinkError is returned by NodeLink operations
171 172 173 174 175 176
type LinkError struct {
	Link *NodeLink
	Op   string
	Err  error
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
177
// ConnError is returned by Conn operations
178
type ConnError struct {
179
	Conn *Conn	// XXX Conn's are reused -> connId/link explicitly?
180 181 182
	Op   string
	Err  error
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
183

184
// LinkRole is a role an end of NodeLink is intended to play.
//
// The role proper (LinkServer or LinkClient) lives in the low 16 bits.
// Bits from 16 upwards carry flags such as linkNoRecvSend; they are stripped
// with linkFlagsMask before the role is interpreted.
type LinkRole int

const (
	LinkServer LinkRole = iota // link created as server
	LinkClient                 // link created as client

	// for testing:
	linkNoRecvSend LinkRole = 1 << 16 // do not spawn serveRecv & serveSend

	// linkFlagsMask covers every bit from 16 upwards.
	//
	// It is spelled as ^0 << 16 rather than (1<<32 - 1) << 16: the latter
	// does not fit into LinkRole (int) on 32-bit platforms and fails to
	// compile there.
	linkFlagsMask LinkRole = ^LinkRole(0) << 16
)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
195
// newNodeLink makes a new NodeLink from already established net.Conn
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
196
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
197
// Role specifies how to treat our role on the link - either as client or
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
198
// server. The difference in between client and server roles is in:
199
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
200
//    how connection ids are allocated for connections initiated at our side:
201 202 203
//    there is no conflict in identifiers if one side always allocates them as
//    even (server) and its peer as odd (client).
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
204 205
// Usually server role should be used for connections created via
// net.Listen/net.Accept and client role for connections created via net.Dial.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
206 207 208
//
// Though it is possible to wrap just-established raw connection into NodeLink,
// users should always use Handshake which performs protocol handshaking first.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
209
func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
210
	var nextConnId uint32
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
211
	switch role &^ linkFlagsMask {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
212
	case LinkServer:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
213
		nextConnId = 0 // all initiated by us connId will be even
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
214
	case LinkClient:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
215
		nextConnId = 1 // ----//---- odd
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
216 217 218 219
	default:
		panic("invalid conn role")
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
220
	nl := &NodeLink{
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
221 222 223
		peerLink:   conn,
		connTab:    map[uint32]*Conn{},
		nextConnId: nextConnId,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
224
		acceptq:    make(chan *Conn),	// XXX +buf
225
		txq:        make(chan txReq),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
226
		axdown:     make(chan struct{}),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
227
		down:       make(chan struct{}),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
228
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
229
	if role&linkNoRecvSend == 0 {
230
		nl.serveWg.Add(2)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
231
		go nl.serveRecv()
232
		go nl.serveSend()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
233
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
234
	return nl
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
235 236
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
237 238 239
// newConn creates new Conn with id=connId and registers it into connTab.
// Must be called with connMu held.
func (nl *NodeLink) newConn(connId uint32) *Conn {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
240
	c := &Conn{link: nl,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
241
		connId: connId,
Kirill Smelkov's avatar
Kirill Smelkov committed
242
		rxq:    make(chan *PktBuf, 1),	// NOTE non-blocking - see serveRecv XXX +buf
243 244
		txerr:  make(chan error, 1),	// NOTE non-blocking - see Conn.Send
		txdown: make(chan struct{}),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
245
		rxdown: make(chan struct{}),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
246 247 248 249 250
	}
	nl.connTab[connId] = c
	return c
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
251
// NewConn creates new connection on top of node-node link.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
252
func (nl *NodeLink) NewConn() (*Conn, error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
253 254 255
	nl.connMu.Lock()
	defer nl.connMu.Unlock()
	if nl.connTab == nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
256
		if atomic.LoadInt32(&nl.closed) != 0 {
257
			return nil, nl.err("newconn", ErrLinkClosed)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
258
		}
259
		return nil, nl.err("newconn", ErrLinkDown)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
260
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275

	// nextConnId could wrap around uint32 limits - find first free slot to
	// not blindly replace existing connection
	for i := uint32(0) ;; i++ {
		_, exists := nl.connTab[nl.nextConnId]
		if !exists {
			break
		}
		nl.nextConnId += 2

		if i > math.MaxUint32 / 2 {
			return nil, nl.err("newconn", ErrLinkManyConn)
		}
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
276 277
	c := nl.newConn(nl.nextConnId)
	nl.nextConnId += 2
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
278

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
279
	return c, nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
280 281
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
// shutdownAX marks acceptq as no longer operational.
//
// Only the first call has effect. It closes axdown, then refuses every
// connection already sitting in acceptq: such a connection gets its RX shut
// down with errConnRefused and is removed from connTab.
func (nl *NodeLink) shutdownAX() {
	nl.axdown1.Do(func() {
		close(nl.axdown)

		// Once serveRecv sees axdown closed it won't put new connections
		// into acceptq, but something finite could already be there.
		for {
			select {
			case conn := <-nl.acceptq:
				// serveRecv put at least 1 packet into conn.rxq before
				// queueing conn into acceptq - shutting its RX down will
				// send the error to peer.
				conn.shutdownRX(errConnRefused)

				nl.connMu.Lock()
				delete(nl.connTab, conn.connId)
				nl.connMu.Unlock()

			default:
				return // nothing more queued
			}
		}
	})
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
309
// shutdown closes raw link to peer and marks NodeLink as no longer operational.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
310
// it also shutdowns all opened connections over this node link.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
311
func (nl *NodeLink) shutdown() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
312
	nl.shutdownAX()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
313
	nl.downOnce.Do(func() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
314
		close(nl.down)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
315

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
316 317
		// close actual link to peer. this will wakeup {send,recv}Pkt
		// NOTE we need it here so that e.g. aborting on error in serveSend wakes up serveRecv
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
318
		nl.errClose = nl.peerLink.Close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
319

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
320
		nl.downWg.Add(1)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
321
		go func() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
322
			defer nl.downWg.Done()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
323

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
324 325 326 327
			// wait for serve{Send,Recv} to complete before shutting connections down
			//
			// we have to do it so that e.g. serveSend has chance
			// to return last error from sendPkt to requester.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
328 329
			nl.serveWg.Wait()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
330
			// clear + mark down .connTab + shutdown all connections
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
331
			nl.connMu.Lock()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
332 333 334 335 336 337
			connTab := nl.connTab
			nl.connTab = nil
			nl.connMu.Unlock()

			// conn.shutdown() outside of link.connMu lock
			for _, conn := range connTab {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
338
				conn.shutdown()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
339 340
			}
		}()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
341 342 343
	})
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
344 345 346 347 348 349 350 351 352 353 354
// CloseAccept instructs node link to not accept incoming connections anymore.
//
// Any blocked Accept() will be unblocked and return an error.
// The peer will receive "connection refused" if it tries to connect after.
//
// It is safe to call CloseAccept several times.
func (nl *NodeLink) CloseAccept() {
	// mark first, so that Accept woken up via axdown reports ErrLinkNoListen
	atomic.StoreInt32(&nl.axclosed, 1)
	nl.shutdownAX()
}

355
// Close closes node-node link.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
356
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
357 358
// All blocking operations - Accept and IO on associated connections
// established over node link - are automatically interrupted with an error.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
359
// Underlying raw connection is closed.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
360
// It is safe to call Close several times.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
361
func (nl *NodeLink) Close() error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
362 363
	atomic.StoreInt32(&nl.axclosed, 1)
	atomic.StoreInt32(&nl.closed, 1)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
364
	nl.shutdown()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
365
	nl.downWg.Wait()
366
	return nl.err("close", nl.errClose)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
367 368
}

369
// shutdown marks connection as no longer operational and interrupts Send and Recv.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
370
func (c *Conn) shutdown() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
371 372 373 374 375
	c.shutdownTX()
	c.shutdownRX(errConnClosed)
}

func (c *Conn) shutdownTX() {
376 377 378
	c.txdownOnce.Do(func() {
		close(c.txdown)
	})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
379 380
}

381
// shutdownRX marks .rxq as no loner operational and interrupts Recv.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
382 383
func (c *Conn) shutdownRX(errMsg *Error) {
	c.rxdownOnce.Do(func() {
384
		close(c.rxdown)	// wakeup Conn.Recv
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
385
		c.downRX(errMsg)
386 387 388
	})
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
389 390 391
// downRX marks .rxq as no longer operational.
func (c *Conn) downRX(errMsg *Error) {
	c.errMsg = errMsg
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
392
	atomic.StoreInt32(&c.rxdownFlag, 1)	// XXX cmpxchg and return if already down?
393 394 395 396 397

	// dequeue all packets already queued in c.rxq
	// (once serveRecv sees c.rxdown it won't try to put new packets into
	//  c.rxq, but something finite could be already there)
	i := 0
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
398
loop:
399 400 401 402
	for {
		select {
		case <-c.rxq:
			i++
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
403

404 405
		default:
			if atomic.LoadInt32(&c.rxqActive) == 0 && len(c.rxq) == 0 {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
406 407 408
				break loop
			}
		}
409
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
410

411 412 413 414
	// if something was queued already there - reply "connection closed"
	if i != 0 {
		go c.replyNoConn()
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
415 416
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
417

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
418
// connKeepClosed is how long a record of a closed connection is kept, so
// that "connection closed" can be properly replied if a packet comes in with
// the same connId afterwards.
var connKeepClosed = time.Minute

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
// release releases connection to freelist.
//
// It removes c from its link's connTab (unless the link is already down and
// connTab is nil) and then resets c via reinit.
//
// NOTE(review): c is not actually put back into nl.connPool here - TODO
// confirm whether that is intended.
//
// No Send or Recv must be in flight.
// The caller must not use c after call to release.
func (c *Conn) release() {
	nl := c.link

	nl.connMu.Lock()
	// XXX find way to keep initiated by us conn as closed for some time
	// (see Conn.Close) but timer costs too much...
	if tab := nl.connTab; tab != nil {
		delete(tab, c.connId)
	}
	nl.connMu.Unlock()

	c.reinit() // XXX just in case
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
440 441 442
// CloseRecv closes reading end of connection.
//
// Any blocked Recv*() will be unblocked and return error.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
443
// The peer will receive "connection closed" if it tries to send anything after.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
444 445 446
//
// It is safe to call CloseRecv several times.
func (c *Conn) CloseRecv() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
447 448
	atomic.StoreInt32(&c.rxclosed, 1)
	c.shutdownRX(errConnClosed)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
449 450
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
451
// Close closes connection.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
452 453
//
// Any blocked Send*() or Recv*() will be unblocked and return error.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
454 455
//
// NOTE for Send() - once transmission was started - it will complete in the
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
456
// background on the wire not to break node-node link framing.
457
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
458
// It is safe to call Close several times.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
459
func (c *Conn) Close() error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
460
	nl := c.link
461 462
	c.closeOnce.Do(func() {
		atomic.StoreInt32(&c.rxclosed, 1)
463
		atomic.StoreInt32(&c.txclosed, 1)
464
		c.shutdown()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
465

466 467 468 469 470 471 472 473 474 475
		// adjust link.connTab
		var tmpclosed *Conn
		nl.connMu.Lock()
		if nl.connTab != nil {
			// connection was initiated by us - simply delete - we always
			// know if a packet comes to such connection - it is closed.
			//
			// XXX checking vvv should be possible without connMu lock
			if c.connId == nl.nextConnId % 2 {
				delete(nl.connTab, c.connId)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
476

477 478 479 480 481 482 483
			// connection was initiated by peer which we accepted - put special
			// "closed" connection into connTab entry for some time to reply
			// "connection closed" if another packet comes to it.
			//
			// ( we cannot reuse same connection since after it is marked as
			//   closed Send refuses to work )
			} else {
484 485 486 487 488
				// delete(nl.connTab, c.connId)
				// XXX vvv was temp. disabled - costs a lot in 1req=1conn model

				// c implicitly goes away from connTab
				tmpclosed = nl.newConn(c.connId)
489
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
490
		}
491
		nl.connMu.Unlock()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
492

493 494
		if tmpclosed != nil {
			tmpclosed.shutdownRX(errConnClosed)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
495 496 497 498 499 500

			time.AfterFunc(connKeepClosed, func() {
				nl.connMu.Lock()
				delete(nl.connTab, c.connId)
				nl.connMu.Unlock()
			})
501 502
		}
	})
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
503

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
504
	return nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
505 506
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
507 508
// ---- receive ----

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
509 510 511 512 513 514 515 516
// errAcceptShutdownAX returns appropriate error when link.axdown is found ready in Accept
func (link *NodeLink) errAcceptShutdownAX() error {
	switch {
	case atomic.LoadInt32(&link.closed) != 0:
		return ErrLinkClosed

	case atomic.LoadInt32(&link.axclosed) != 0:
		return ErrLinkNoListen
517

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
518 519 520
	default:
		// XXX ok? - recheck
		return ErrLinkDown
521
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
522
}
523

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
524 525
// Accept waits for and accepts incoming connection on top of node-node link.
func (nl *NodeLink) Accept(/*ctx context.Context*/) (*Conn, error) {
526
	select {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
527 528 529 530 531
	case <-nl.axdown:
		return nil, nl.err("accept", nl.errAcceptShutdownAX())

	case c := <-nl.acceptq:
		return c, nil
532

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
533 534 535 536
// XXX for long-lived links - better to propagate ctx cancel to link.Close to
// lower cases that are run at every select.
//
// XXX see xio.CloseWhenDone() for helper for this.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
537
/*
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
538 539 540
	// XXX ctx cancel tests
	case <-ctx.Done():
		return nil, ctx.Err()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
541
*/
542 543 544
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
545
// errRecvShutdown returns appropriate error when c.rxdown is found ready in recvPkt
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
546
func (c *Conn) errRecvShutdown() error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
547
	switch {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
548
	case atomic.LoadInt32(&c.rxclosed) != 0:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
549 550
		return ErrClosedConn

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
551
	case atomic.LoadInt32(&c.link.closed) != 0:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
552 553 554
		return ErrLinkClosed

	default:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
555 556
		// we have to check what was particular RX error on nodelink shutdown
		// only do that once - after reporting RX error the first time
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
557 558 559
		// tell client the node link is no longer operational.
		var err error
		c.rxerrOnce.Do(func() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
560 561 562
			c.link.errMu.Lock()
			err = c.link.errRecv
			c.link.errMu.Unlock()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
563 564
		})
		if err == nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
565
			err = ErrLinkDown
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
566 567 568 569 570
		}
		return err
	}
}

571 572
// recvPkt receives raw packet from connection
func (c *Conn) recvPkt() (*PktBuf, error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
573
	select {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
574
	case <-c.rxdown:
575
		return nil, c.err("recv", c.errRecvShutdown())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
576

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
577 578
	case pkt := <-c.rxq:
		return pkt, nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
579 580
	}
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
581

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
582
// serveRecv handles incoming packets routing them to either appropriate
// already-established connection or, if node link is accepting incoming
// connections, to new connection put to accept queue.
//
// It loops until the first RX error. That error is recorded in nl.errRecv
// (under nl.errMu), the whole node link is shut down and serveRecv returns.
// nl.serveWg.Done is called on exit.
func (nl *NodeLink) serveRecv() {
	defer nl.serveWg.Done()
	for {
		// receive 1 packet
		// XXX if nl.peerLink was just closed by tx->shutdown we'll get ErrNetClosing
		pkt, err := nl.recvPkt()
		//fmt.Printf("recvPkt -> %v, %v\n", pkt, err)
		if err != nil {
			// on IO error framing over peerLink becomes broken
			// so we shut down node link and all connections over it.

			// remember the error: Conn's RX-shutdown error reporting reads
			// link.errRecv under errMu to tell the client why the link died.
			nl.errMu.Lock()
			nl.errRecv = err
			nl.errMu.Unlock()

			nl.shutdown()
			return
		}

		// pkt.ConnId -> Conn
		connId := ntoh32(pkt.Header().ConnId)
		accept := false // set when pkt opens a new peer-initiated connection

		nl.connMu.Lock()

		// connTab is never nil here - because shutdown before
		// resetting it waits for us to finish.
		conn := nl.connTab[connId]

		// tmpclosed: conn was created only to answer "connection closed"
		// and has already been removed from connTab again.
		tmpclosed := false
		if conn == nil {
			// "new" connection will be needed in all cases - e.g.
			// even temporarily to reply "connection refused"
			conn = nl.newConn(connId)

			// message with connid that should be initiated by us
			// (same parity as nl.nextConnId): there is no such live
			// connection, so treat it as closed. newConn appears to
			// register conn in connTab, hence the delete.
			if connId % 2 == nl.nextConnId % 2 {
				tmpclosed = true
				delete(nl.connTab, conn.connId)

			// message with connid for a stream initiated by peer
			// it will be considered to be accepted (not if .axdown)
			} else {
				accept = true
			}
		}

		nl.connMu.Unlock()

		if tmpclosed {
			conn.shutdownRX(errConnClosed)
		}

		// route packet to serving goroutine handler
		//
		// TODO backpressure when Recv is not keeping up with Send on peer side?
		//      (not to let whole link starve because of one connection)
		//
		// NOTE rxq must be buffered with at least 1 element so that
		// queuing pkt succeeds for incoming connection that is not yet
		// there in acceptq.
		//
		// rxqActive is raised for the duration of the enqueue.
		// NOTE(review): presumably shutdownRX sets rxdownFlag and then
		// checks rxqActive to decide whether rxq must be drained -
		// confirm against shutdownRX, which is outside this view.
		atomic.StoreInt32(&conn.rxqActive, 1)
		rxdown := atomic.LoadInt32(&conn.rxdownFlag) != 0
		if !rxdown {
			conn.rxq <- pkt // blocks while rxq is full
		}
		atomic.StoreInt32(&conn.rxqActive, 0)

/*
		// XXX goes away in favour of .rxdownFlag; reasons
		// - no need to reallocate rxdown for light conn
		// - no select

		// don't even try `conn.rxq <- ...` if conn.rxdown is ready
		// ( else since select is picking random ready variant Recv/serveRecv
		//   could receive something on rxdown Conn sometimes )
		rxdown := false
		select {
		case <-conn.rxdown:
			rxdown = true
		default:
			// ok
		}

		// route packet to serving goroutine handler
		//
		// TODO backpressure when Recv is not keeping up with Send on peer side?
		//      (not to let whole link starve because of one connection)
		//
		// NOTE rxq must be buffered with at least 1 element so that
		// queuing pkt succeeds for incoming connection that is not yet
		// there in acceptq.
		if !rxdown {
			// XXX can avoid select here: if conn closer cares to drain rxq (?)
			select {
			case <-conn.rxdown:
				rxdown = true

			case conn.rxq <- pkt:
				// ok
			}
		}
*/

		// we are not accepting packet in any way:
		// tell the peer asynchronously (replyNoConn sends conn.errMsg).
		// NOTE(review): pkt is dropped here without pkt.Free() - confirm
		// whether it should be returned to the packet pool.
		if rxdown {
			go conn.replyNoConn()
			continue
		}


		// this packet established new connection - try to accept it
		if accept {
			// don't even try `link.acceptq <- ...` if link.axdown is ready
			// ( else since select is picking random ready variant Accept/serveRecv
			//   could receive something on axdown Link sometimes )
			axdown := false
			select {
			case <-nl.axdown:
				axdown = true

			default:
				// ok
			}

			// put conn to .acceptq
			if !axdown {
				// XXX can avoid select here if shutdownAX cares to drain acceptq (?)
				select {
				case <-nl.axdown:
					axdown = true

				case nl.acceptq <- conn:
					// ok
				}
			}

			// we are not accepting the connection:
			// shut its RX down with "connection refused" and forget it.
			if axdown {
				conn.shutdownRX(errConnRefused)
				nl.connMu.Lock()
				delete(nl.connTab, conn.connId)
				nl.connMu.Unlock()
			}
		}
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
733 734 735 736 737 738
// ---- network replies for closed / refused connections ----

var errConnClosed  = &Error{PROTOCOL_ERROR, "connection closed"}
var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"}

// replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
739
func (c *Conn) replyNoConn() {
740
	//fmt.Printf("%v: -> replyNoConn %v\n", c, c.errMsg)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
741
	c.Send(c.errMsg) // ignore errors
742
	//fmt.Printf("%v: replyNoConn(%v) -> %v\n", c, c.errMsg, err)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
743 744 745
}

// ---- transmit ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
746

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
747
// txReq is request to transmit a packet. Result error goes back to errch
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
748
type txReq struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
749
	pkt   *PktBuf
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
750 751 752
	errch chan error
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
753
// errSendShutdown returns appropriate error when c.txdown is found ready in Send
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
754
func (c *Conn) errSendShutdown() error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
755
	switch {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
756
	case atomic.LoadInt32(&c.txclosed) != 0:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
757 758 759
		return ErrClosedConn

	// the only other error possible besides Conn being .Close()'ed is that
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
760
	// NodeLink was closed/shutdowned itself - on actual IO problems corresponding
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
761 762
	// error is delivered to particular Send that caused it.

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
763
	case atomic.LoadInt32(&c.link.closed) != 0:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
764 765 766
		return ErrLinkClosed

	default:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
767
		return ErrLinkDown
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
768 769 770
	}
}

Kirill Smelkov's avatar
Kirill Smelkov committed
771 772 773
// sendPkt sends raw packet via connection.
//
// on success pkt is freed.
774 775
func (c *Conn) sendPkt(pkt *PktBuf) error {
	err := c.sendPkt2(pkt)
776 777 778
	return c.err("send", err)
}

779
// XXX serveSend is not needed - Conn.Write already can be used by multiple
Kirill Smelkov's avatar
Kirill Smelkov committed
780 781 782 783
// goroutines simultaneously and works atomically; (same for Conn.Read etc - see poll.FD)
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/net/net.go#L109
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/internal/poll/fd_unix.go#L14

784
/*
Kirill Smelkov's avatar
Kirill Smelkov committed
785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800
func (c *Conn) sendPkt2(pkt *PktBuf) error {
	// set pkt connId associated with this connection
	pkt.Header().ConnId = hton32(c.connId)

	// XXX if n.peerLink was just closed by rx->shutdown we'll get ErrNetClosing
	err := c.link.sendPkt(pkt)
	//fmt.Printf("sendPkt -> %v\n", err)

	// on IO error framing over peerLink becomes broken
	// so we shut down node link and all connections over it.
	if err != nil {
		c.link.shutdown()
	}

	return err
}
801
*/
Kirill Smelkov's avatar
Kirill Smelkov committed
802

803
///*
804
func (c *Conn) sendPkt2(pkt *PktBuf) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
805 806 807 808 809
	// set pkt connId associated with this connection
	pkt.Header().ConnId = hton32(c.connId)
	var err error

	select {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
810
	case <-c.txdown:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
811
		return c.errSendShutdown()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
812

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
813
	case c.link.txq <- txReq{pkt, c.txerr}:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
814 815 816 817
		select {
		// tx request was sent to serveSend and is being transmitted on the wire.
		// the transmission may block for indefinitely long though and
		// we cannot interrupt it as the only way to interrupt is
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
818
		// .link.Close() which will close all other Conns.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
819
		//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
820
		// That's why we are also checking for c.txdown while waiting
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
821 822 823 824
		// for reply from serveSend (and leave pkt to finish transmitting).
		//
		// NOTE after we return straight here serveSend won't be later
		// blocked on c.txerr<- because that backchannel is a non-blocking one.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
825
		case <-c.txdown:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
826 827

			// also poll c.txerr here because: when there is TX error,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
828
			// serveSend sends to c.txerr _and_ closes c.txdown .
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
829 830 831
			// We still want to return actual transmission error to caller.
			select {
			case err = <-c.txerr:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
832
				return err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
833
			default:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
834
				return c.errSendShutdown()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
835 836 837
			}

		case err = <-c.txerr:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
838
			return err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
839 840 841 842
		}
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
843 844 845
// serveSend handles requests to transmit packets from client connections and
// serially executes them over associated node link.
func (nl *NodeLink) serveSend() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
846
	defer nl.serveWg.Done()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
847 848
	for {
		select {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
849
		case <-nl.down:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
850 851 852
			return

		case txreq := <-nl.txq:
853
			// XXX if n.peerLink was just closed by rx->shutdown we'll get ErrNetClosing
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
854
			err := nl.sendPkt(txreq.pkt)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
855
			//fmt.Printf("sendPkt -> %v\n", err)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
856

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
857 858 859
			// FIXME if several goroutines call conn.Send
			// simultaneously - c.txerr even if buffered(1) will be
			// overflown and thus deadlock here.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
860
			txreq.errch <- err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
861

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
862 863
			// on IO error framing over peerLink becomes broken
			// so we shut down node link and all connections over it.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
864
			if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
865
				nl.shutdown()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
866 867
				return
			}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
868 869 870
		}
	}
}
871
//*/
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
872

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
873

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
874
// ---- raw IO ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
875

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
876
const dumpio = false
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
877

Kirill Smelkov's avatar
Kirill Smelkov committed
878 879 880 881 882
// sendPkt sends raw packet to peer.
//
// tx error, if any, is returned as is and is analyzed in serveSend.
//
// XXX pkt should be freed always or only on error?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
883
func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
884
	if dumpio {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
885
		// XXX -> log
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
886
		fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
887 888
		//defer fmt.Printf("\t-> sendPkt err: %v\n", err)
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
889

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
890 891
	// NOTE Write writes data in full, or it is error
	_, err := nl.peerLink.Write(pkt.Data)
Kirill Smelkov's avatar
Kirill Smelkov committed
892
	pkt.Free()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
893 894
	return err
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
895

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
896
var ErrPktTooBig = errors.New("packet too big")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
897

Kirill Smelkov's avatar
Kirill Smelkov committed
898 899
// recvPkt receives raw packet from peer.
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
900 901
// rx error, if any, is returned as is and is analyzed in serveRecv
func (nl *NodeLink) recvPkt() (*PktBuf, error) {
Kirill Smelkov's avatar
Kirill Smelkov committed
902
	pkt := pktAlloc(4096)
903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921
	// len=4K but cap can be more since pkt is from pool - use all space to buffer reads
	// XXX vvv -> pktAlloc() ?
	data := pkt.Data[:cap(pkt.Data)]

	n := 0 // number of pkt bytes obtained so far

	// next packet could be already prefetched in part by previous read
	if nl.rxbuf.Len() > 0 {
		δn, _ := nl.rxbuf.Read(data[:pktHeaderLen])
		n += δn
	}

	// first read to read pkt header and hopefully rest of packet in 1 syscall
	if n < pktHeaderLen {
		δn, err := io.ReadAtLeast(nl.peerLink, data[n:], pktHeaderLen - n)
		if err != nil {
			return nil, err
		}
		n += δn
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
922 923
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
924
	pkth := pkt.Header()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
925

926
	pktLen := int(pktHeaderLen + ntoh32(pkth.MsgLen)) // whole packet length
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
927
	if pktLen > pktMaxSize {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
928 929
		return nil, ErrPktTooBig
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
930

931 932 933 934 935 936 937 938 939
	// resize data if we don't have enough room in it
	data = xbytes.Resize(data, pktLen)
	data = data[:cap(data)]

	// we might have more data already prefetched in rxbuf
	if nl.rxbuf.Len() > 0 {
		δn, _ := nl.rxbuf.Read(data[n:pktLen])
		n += δn
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
940

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
941
	// read rest of pkt data, if we need to
942 943
	if n < pktLen {
		δn, err := io.ReadAtLeast(nl.peerLink, data[n:], pktLen - n)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
944 945 946
		if err != nil {
			return nil, err
		}
947
		n += δn
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
948
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
949

950 951 952 953 954 955 956 957 958
	// put overread data into rxbuf for next reader
	if n > pktLen {
		nl.rxbuf.Write(data[pktLen:n])
	}

	// fixup data/pkt
	data = data[:n]
	pkt.Data = data

Kirill Smelkov's avatar
Kirill Smelkov committed
959
	if /* XXX temp show only tx */ true && dumpio {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
960
		// XXX -> log
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
961
		fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
962 963 964
	}

	return pkt, nil
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
965 966 967
}


Kirill Smelkov's avatar
Kirill Smelkov committed
968 969
// ---- Handshake ----

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
970 971 972
// Handshake performs NEO protocol handshake just after raw connection between 2 nodes was established.
// On success raw connection is returned wrapped into NodeLink.
// On error raw connection is closed.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
973
func Handshake(ctx context.Context, conn net.Conn, role LinkRole) (nl *NodeLink, err error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
974
	err = handshake(ctx, conn, ProtocolVersion)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
975 976 977 978 979
	if err != nil {
		return nil, err
	}

	// handshake ok -> NodeLink
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
980
	return newNodeLink(conn, role), nil
Kirill Smelkov's avatar
Kirill Smelkov committed
981 982
}

983 984 985
// HandshakeError is returned when there is an error while performing handshake.
type HandshakeError struct {
	// XXX just keep .Conn? (but .Conn can be closed)
	LocalAddr  net.Addr
	RemoteAddr net.Addr
	Err        error
}

// Error formats the error as "<local> - <remote>: handshake: <cause>".
//
// A nil Err is rendered as "<nil>" instead of panicking.
func (e *HandshakeError) Error() string {
	return fmt.Sprintf("%s - %s: handshake: %v", e.LocalAddr, e.RemoteAddr, e.Err)
}

// Cause returns the underlying handshake error, the same way
// LinkError.Cause and ConnError.Cause do.
func (e *HandshakeError) Cause() error { return e.Err }

// Unwrap returns the underlying handshake error so that errors.Is / errors.As
// can see through HandshakeError.
func (e *HandshakeError) Unwrap() error { return e.Err }

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
995
// handshake exchanges 4-byte big-endian protocol version words with the peer
// over conn and checks that the peer's version equals version.
//
// Sending and receiving run in two goroutines concurrently. Each one reports
// exactly once into errch, which is buffered for both, so neither goroutine
// can block on reporting even after handshake has returned.
//
// On error (including ctx cancellation) conn is closed and the error is
// wrapped into *HandshakeError.
func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) {
	errch := make(chan error, 2)

	// tx handshake word
	txWg := sync.WaitGroup{}
	txWg.Add(1)
	go func() {
		var b [4]byte
		binary.BigEndian.PutUint32(b[:], version) // XXX -> hton32 ?
		_, err := conn.Write(b[:])
		// XXX EOF -> ErrUnexpectedEOF ?
		errch <- err
		txWg.Done()
	}()


	// rx handshake word
	go func() {
		var b [4]byte
		_, err := io.ReadFull(conn, b[:])
		if err == io.EOF {
			err = io.ErrUnexpectedEOF // can be returned with n = 0
		}
		if err == nil {
			peerVersion := binary.BigEndian.Uint32(b[:]) // XXX -> ntoh32 ?
			if peerVersion != version {
				err = fmt.Errorf("protocol version mismatch: peer = %08x  ; our side = %08x", peerVersion, version)
			}
		}
		errch <- err
	}()

	// connClosed records that the ctx-cancel path already closed conn,
	// so the deferred cleanup does not close it a second time.
	connClosed := false
	defer func() {
		// make sure our version is always sent on the wire, if possible,
		// so that peer does not see just closed connection when on rx we see version mismatch
		//
		// NOTE if cancelled tx goroutine will wake up without delay
		txWg.Wait()

		// don't forget to close conn if returning with error + add handshake err context
		// (closing conn also unblocks the rx goroutine if it is still reading)
		if err != nil {
			err = &HandshakeError{conn.LocalAddr(), conn.RemoteAddr(), err}
			if !connClosed {
				conn.Close()
			}
		}
	}()


	// wait for both tx and rx results; the first error wins
	for i := 0; i < 2; i++ {
		select {
		case <-ctx.Done():
			conn.Close() // interrupt IO
			connClosed = true
			return ctx.Err()

		case err = <-errch:
			if err != nil {
				return err
			}
		}
	}

	// handshaked ok
	return nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1063

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1064
// ---- Dial & Listen at raw NodeLink level ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1065

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1066 1067
// DialLink connects to address on given network, handshakes and wraps the connection as NodeLink
func DialLink(ctx context.Context, net xnet.Networker, addr string) (nl *NodeLink, err error) {
1068
	peerConn, err := net.Dial(ctx, addr)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1069 1070 1071
	if err != nil {
		return nil, err
	}
Kirill Smelkov's avatar
Kirill Smelkov committed
1072

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1073
	return Handshake(ctx, peerConn, LinkClient)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1074 1075
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1076
// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1077
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1078
// The listener accepts only those connections that pass handshake.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1079
func ListenLink(net xnet.Networker, laddr string) (LinkListener, error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1080 1081 1082 1083 1084
	rawl, err := net.Listen(laddr)
	if err != nil {
		return nil, err
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1085
	l := &linkListener{
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1086
		l:        rawl,
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1087
		acceptq:  make(chan linkAccepted),
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1088 1089 1090 1091 1092 1093 1094
		closed:   make(chan struct{}),
	}
	go l.run()

	return l, nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1095 1096
// LinkListener is net.Listener adapted to return handshaked NodeLink on Accept.
type LinkListener interface {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1097 1098 1099
	// from net.Listener:
	Close() error
	Addr() net.Addr
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1100 1101 1102 1103 1104 1105 1106

	// Accept returns new incoming connection wrapped into NodeLink.
	// It accepts only those connections which pass handshake.
	Accept() (*NodeLink, error)
}

type linkListener struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1107
	l       net.Listener
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1108
	acceptq chan linkAccepted
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1109 1110 1111
	closed  chan struct {}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1112
type linkAccepted struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1113 1114 1115 1116
	link  *NodeLink
	err   error
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1117
func (l *linkListener) Close() error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1118
	err := l.l.Close()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1119 1120 1121 1122
	close(l.closed)
	return err
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1123
func (l *linkListener) run() {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135
	// context that cancels when listener stops
	runCtx, runCancel := context.WithCancel(context.Background())
	defer runCancel()

	for {
		// stop on close
		select {
		case <-l.closed:
			return
		default:
		}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1136
		// XXX add backpressure on too much incoming connections without client .Accept ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1137
		conn, err := l.l.Accept()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1138 1139 1140 1141
		go l.accept(runCtx, conn, err)
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1142
func (l *linkListener) accept(ctx context.Context, conn net.Conn, err error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1143 1144 1145
	link, err := l.accept1(ctx, conn, err)

	select {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1146
	case l.acceptq <- linkAccepted{link, err}:
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1147 1148 1149 1150 1151 1152 1153
		// ok

	case <-l.closed:
		// shutdown
		if link != nil {
			link.Close()
		}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1154 1155 1156
	}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1157
func (l *linkListener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1158 1159
	// XXX err ctx?

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1160 1161 1162
	if err != nil {
		return nil, err
	}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1163

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1164
	// NOTE Handshake closes conn in case of failure
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1165 1166 1167 1168 1169 1170 1171 1172
	link, err := Handshake(ctx, conn, LinkServer)
	if err != nil {
		return nil, err
	}

	return link, nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1173
func (l *linkListener) Accept() (*NodeLink, error) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1174 1175 1176
	select{
	case <-l.closed:
		// we know raw listener is already closed - return proper error about it
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1177
		_, err := l.l.Accept()
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1178 1179 1180 1181 1182 1183
		return nil, err

	case a := <-l.acceptq:
		return a.link, a.err
	}
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1184

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1185
func (l *linkListener) Addr() net.Addr {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1186 1187 1188
	return l.l.Addr()
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215
/*
XXX do if this is needed in a second place besides talkMaster1
// ---- Listen for single Conn over NodeLink use-cases ----

// XXX
func ListenSingleConn(link *NodeLink) ConnListener {
	l := &listen1conn{link}
	// XXX go ...
	return l
}

// ConnListener XXX ...
type ConnListener interface {
	// XXX +Close, Addr ?

	// Accept returns new connection multiplexed over NodeLink
	Accept() (*Conn, error)
}

type listen1conn struct {
	link *NodeLink
}

func ...
*/


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229
// ---- for convenience: Conn -> NodeLink & local/remote link addresses  ----

// LocalAddr returns local address of the underlying link to peer.
func (nl *NodeLink) LocalAddr() net.Addr {
	return nl.peerLink.LocalAddr()
}

// RemoteAddr returns remote address of the underlying link to peer.
func (nl *NodeLink) RemoteAddr() net.Addr {
	return nl.peerLink.RemoteAddr()
}

// Link returns underlying NodeLink of this connection.
func (c *Conn) Link() *NodeLink {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1230
	return c.link
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1231 1232 1233 1234 1235 1236 1237 1238
}

// ConnID returns connection identifier used for the connection.
func (c *Conn) ConnID() uint32 {
	return c.connId
}


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1239
// ---- for convenience: String / Error / Cause ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1240
func (nl *NodeLink) String() string {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1241
	s := fmt.Sprintf("%s - %s", nl.LocalAddr(), nl.RemoteAddr())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1242
	return s	// XXX add "(closed)" if nl is closed ?
1243
			// XXX other flags e.g. (down) ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1244 1245 1246
}

func (c *Conn) String() string {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1247
	s := fmt.Sprintf("%s .%d", c.link, c.connId)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1248 1249 1250
	return s	// XXX add "(closed)" if c is closed ?
}

1251 1252 1253 1254 1255 1256 1257 1258
func (e *LinkError) Error() string {
	return fmt.Sprintf("%s: %s: %s", e.Link, e.Op, e.Err)
}

func (e *ConnError) Error() string {
	return fmt.Sprintf("%s: %s: %s", e.Conn, e.Op, e.Err)
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1259 1260 1261
func (e *LinkError) Cause() error { return e.Err }
func (e *ConnError) Cause() error { return e.Err }

1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274
// err wraps a non-nil e into a *LinkError for operation op on nl.
// For e == nil it returns an untyped nil error (never a typed-nil
// *LinkError), so callers can return its result directly.
func (nl *NodeLink) err(op string, e error) error {
	if e != nil {
		return &LinkError{Link: nl, Op: op, Err: e}
	}
	return nil
}

// err wraps a non-nil e into a *ConnError for operation op on c.
// For e == nil it returns an untyped nil error (never a typed-nil
// *ConnError), so callers can return its result directly.
func (c *Conn) err(op string, e error) error {
	if e != nil {
		return &ConnError{Conn: c, Op: op, Err: e}
	}
	return nil
}
Kirill Smelkov's avatar
Kirill Smelkov committed
1275 1276 1277 1278


// ---- exchange of messages ----

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1279 1280 1281
//trace:event traceConnRecv(c *Conn, msg Msg)
//trace:event traceConnSendPre(c *Conn, msg Msg)
// XXX do we also need traceConnSend?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1282

Kirill Smelkov's avatar
Kirill Smelkov committed
1283 1284 1285 1286 1287 1288 1289
// Recv receives message
// it receives packet and decodes message from it
func (c *Conn) Recv() (Msg, error) {
	pkt, err := c.recvPkt()
	if err != nil {
		return nil, err
	}
Kirill Smelkov's avatar
Kirill Smelkov committed
1290
	defer pkt.Free()
Kirill Smelkov's avatar
Kirill Smelkov committed
1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303

	// decode packet
	pkth := pkt.Header()
	msgCode := ntoh16(pkth.MsgCode)
	msgType := msgTypeRegistry[msgCode]
	if msgType == nil {
		err = fmt.Errorf("invalid msgCode (%d)", msgCode)
		// XXX "decode" -> "recv: decode"?
		return nil, &ConnError{Conn: c, Op: "decode", Err: err}
	}

	// TODO use free-list for decoded messages + when possible decode in-place
	msg := reflect.New(msgType).Interface().(Msg)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1304 1305 1306
//	msg := reflect.NewAt(msgType, bufAlloc(msgType.Size())


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1307
	_, err = msg.neoMsgDecode(pkt.Payload())
Kirill Smelkov's avatar
Kirill Smelkov committed
1308
	if err != nil {
Kirill Smelkov's avatar
Kirill Smelkov committed
1309
		return nil, &ConnError{Conn: c, Op: "decode", Err: err}	// XXX "decode:" is already in ErrDecodeOverflow
Kirill Smelkov's avatar
Kirill Smelkov committed
1310 1311
	}

1312
	traceConnRecv(c, msg)
Kirill Smelkov's avatar
Kirill Smelkov committed
1313 1314 1315
	return msg, nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1316 1317
// Send sends message.
//
Kirill Smelkov's avatar
Kirill Smelkov committed
1318 1319
// it encodes message into packet and sends it
func (c *Conn) Send(msg Msg) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1320
	traceConnSendPre(c, msg)
1321

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1322
	l := msg.neoMsgEncodedLen()
Kirill Smelkov's avatar
Kirill Smelkov committed
1323
	buf := pktAlloc(pktHeaderLen+l)
Kirill Smelkov's avatar
Kirill Smelkov committed
1324 1325

	h := buf.Header()
1326
	// h.ConnId will be set by conn.sendPkt
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1327
	h.MsgCode = hton16(msg.neoMsgCode())
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1328
	h.MsgLen = hton32(uint32(l)) // XXX casting: think again
Kirill Smelkov's avatar
Kirill Smelkov committed
1329

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1330
	msg.neoMsgEncode(buf.Payload())
Kirill Smelkov's avatar
Kirill Smelkov committed
1331 1332

	// XXX more context in err? (msg type)
Kirill Smelkov's avatar
Kirill Smelkov committed
1333
	return c.sendPkt(buf)
Kirill Smelkov's avatar
Kirill Smelkov committed
1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348
}


// Expect receives message and checks it is one of expected types
//
// if verification is successful the message is decoded inplace and returned
// which indicates index of received message.
//
// on error (-1, err) is returned
func (c *Conn) Expect(msgv ...Msg) (which int, err error) {
	// XXX a bit dup wrt Recv
	pkt, err := c.recvPkt()
	if err != nil {
		return -1, err
	}
Kirill Smelkov's avatar
Kirill Smelkov committed
1349
	defer pkt.Free()
Kirill Smelkov's avatar
Kirill Smelkov committed
1350 1351 1352 1353 1354

	pkth := pkt.Header()
	msgCode := ntoh16(pkth.MsgCode)

	for i, msg := range msgv {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1355 1356
		if msg.neoMsgCode() == msgCode {
			_, err = msg.neoMsgDecode(pkt.Payload())
Kirill Smelkov's avatar
Kirill Smelkov committed
1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373
			if err != nil {
				return -1, &ConnError{Conn: c, Op: "decode", Err: err}
			}
			return i, nil
		}
	}

	// unexpected message
	msgType := msgTypeRegistry[msgCode]
	if msgType == nil {
		return -1, &ConnError{c, "decode", fmt.Errorf("invalid msgCode (%d)", msgCode)}
	}

	// XXX also add which messages were expected ?
	return -1, &ConnError{c, "recv", fmt.Errorf("unexpected message: %v", msgType)}
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1374
// Ask sends request and receives response.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1375
//
Kirill Smelkov's avatar
Kirill Smelkov committed
1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395
// It expects response to be exactly of resp type and errors otherwise
// XXX clarify error semantic (when Error is decoded)
// XXX do the same as Expect wrt respv ?
func (c *Conn) Ask(req Msg, resp Msg) error {
	err := c.Send(req)
	if err != nil {
		return err
	}

	nerr := &Error{}
	which, err := c.Expect(resp, nerr)
	switch which {
	case 0:
		return nil
	case 1:
		return ErrDecode(nerr)
	}

	return err
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1396 1397

// ---- exchange of 1-1 request-reply ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1398 1399 1400
// (impedance matcher for the current neo/py implementation)

// TODO Recv1/Reply/Send1/Ask1 tests
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415

// Request is a message received from the link + connection handle to make a reply.
//
// Request represents 1 request - 0|1 reply interaction model XXX
//
// A Request is produced by NodeLink.Recv1. It can be answered with Reply, and
// Release must be called to free its resources.
type Request struct {
	Msg  Msg   // the received, decoded message
	conn *Conn // connection the message arrived on; Reply sends over it
}

// Recv1 receives message from link and wraps it into Request.
//
// XXX doc
func (link *NodeLink) Recv1() (Request, error) {
	conn, err := link.Accept(/*context.TODO()*/)	// XXX remove context?
	if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1416
		return Request{}, err	// XXX or return *Request? (want to avoid alloc)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1417 1418 1419
	}

	// NOTE serveRecv guaranty that when a conn is accepted, there is 1 message in conn.rxq
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1420
	msg, err := conn.Recv()		// XXX directly from <-rxq
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1421
	if err != nil {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1422
		conn.Close() // XXX -> conn.release
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1423
		return Request{}, err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1424 1425
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1426
	// noone will be reading from conn anymore - mark rx down so that if
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1427 1428
	// peer sends any another packet with same .ConnID serveRecv does not
	// deadlock trying to put it to conn.rxq.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1429
	conn.downRX(errConnClosed)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1430 1431 1432 1433 1434 1435 1436

	return Request{Msg: msg, conn: conn}, nil
}

// Reply sends response to request.
//
// XXX doc
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1437
func (req *Request) Reply(resp Msg) error {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1438 1439 1440 1441
	return req.conn.Send(resp)
	//err1 := req.conn.Send(resp)
	//err2 := req.conn.Close()	// XXX no - only Send here?
	//return xerr.First(err1, err2)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1442 1443
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1444
// Release must be called to free request resources.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1445 1446
//
// XXX doc
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1447 1448 1449 1450 1451 1452
func (req *Request) Release() {
	//return req.conn.Close()
	// XXX req.Msg.Release() ?
	req.Msg = nil
	req.conn.release()
	req.conn = nil // just in case
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1453 1454 1455
}


1456
// Send1 sends one message over link.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1457 1458 1459 1460 1461 1462 1463 1464
//
// XXX doc
func (link *NodeLink) Send1(msg Msg) error {
	conn, err := link.NewConn()
	if err != nil {
		return err
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1465
	conn.downRX(errConnClosed)	// FIXME just new conn this way
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1466

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1467 1468 1469
	err = conn.Send(msg)
	conn.release()
	return err
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505
}


// Ask1 sends req and receives the response into resp over a newly created
// connection.	XXX in 1-1 model
//
// See Conn.Ask for semantic details. The connection is closed on return. A
// close error is reported only when the exchange itself succeeded.
// XXX doc
func (link *NodeLink) Ask1(req Msg, resp Msg) (err error) {
	conn, err := link.NewConn()
	if err != nil {
		return err
	}

	defer func() {
		if cerr := conn.Close(); err == nil {
			err = cerr
		}
	}()

	return conn.Ask(req, resp)
}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1506 1507 1508 1509

// Link reports the NodeLink over which the request was received.
func (req *Request) Link() *NodeLink {
	return req.conn.link
}