Commit abed4adb authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 09915e9e
......@@ -23,19 +23,32 @@ import (
"testing"
)
// basic interaction between Client -- Storage
func TestClientStorage(t *testing.T) {
nlC, nlS := nodeLinkPipe()
Cnl, Snl := nodeLinkPipe()
wg := WorkGroup()
ctxS := context.Background()
Sctx, Scancel := context.WithCancel(context.Background())
S := NewStorage(nil) // TODO zodb.storage.mem
//Serve(ctx, l, S)
S.ServeLink(ctxS, nlS) // XXX go
C, err := NewClient(nlC)
//assert err != nil
_ = C
_ = err
wg.Gox(func() {
S.ServeLink(Sctx, Snl)
// XXX + test error return
})
C, err := NewClient(Cnl)
if err != nil {
t.Fatalf("creating/identifying client: %v", err)
}
lastTid, err := C.LastTid()
if !(lastTid == 111 && err == nil) { // XXX 111
t.Fatalf("C.LastTid -> %v, %v ; want %v, nil", lastTid, err, 111, err)
}
// shutdown storage
// XXX wait for S to shutdown + verify shutdown error
Scancel()
xwait(wg)
}
......@@ -157,7 +157,7 @@ func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink {
func (nl *NodeLink) newConn(connId uint32) *Conn {
c := &Conn{nodeLink: nl,
connId: connId,
rxq: make(chan *PktBuf), // TODO buffering
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
down: make(chan struct{}),
}
......@@ -328,6 +328,7 @@ func (nl *NodeLink) serveRecv() {
// pkt.ConnId -> Conn
connId := ntoh32(pkt.Header().ConnId)
accept := false
nl.connMu.Lock()
......@@ -338,23 +339,7 @@ func (nl *NodeLink) serveRecv() {
if nl.acceptq != nil {
// we are accepting new incoming connection
conn = nl.newConn(connId)
select {
case <-nl.down:
// Accept and loop calling it can exit if shutdown was requested
// if so we are also exiting
nl.connMu.Unlock()
// make sure not to leave rx error as nil
nl.errMu.Lock()
nl.errRecv = ErrLinkDown
nl.errMu.Unlock()
return
case nl.acceptq <- conn:
// ok
}
accept = true
}
}
......@@ -369,11 +354,35 @@ func (nl *NodeLink) serveRecv() {
//
// TODO backpressure when Recv is not keeping up with Send on peer side?
// (not to let whole nodelink starve because of one connection)
//
// NOTE rxq must be buffered with at least 1 element so that
// queuing pkt succeeds for incoming connection that is not yet
// there in acceptq.
conn.rxq <- pkt
// keep connMu locked until here: so that ^^^ `conn.rxq <- pkt` can be
// sure conn stays not down e.g. closed by Conn.Close or NodeLink.shutdown
//
// XXX try to release connMu eariler - before `rxq <- pkt`
nl.connMu.Unlock()
if accept {
select {
case <-nl.down:
// Accept and loop calling it can exit if shutdown was requested
// if so we are also exiting
// make sure not to leave rx error as nil
nl.errMu.Lock()
nl.errRecv = ErrLinkDown
nl.errMu.Unlock()
return
case nl.acceptq <- conn:
// ok
}
}
}
}
......
......@@ -43,6 +43,7 @@ func NewStorage(zstor zodb.IStorage) *Storage {
// ServeLink serves incoming node-node link connection
// XXX +error return?
func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
fmt.Printf("stor: %s: serving new node\n", link)
......@@ -98,6 +99,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
}
// ServeClient serves incoming connection on which peer identified itself as client
// XXX +error return?
func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
fmt.Printf("stor: %s: serving new client conn\n", conn)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment