Commit 8cb4e9c0 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent badeb060
...@@ -464,10 +464,12 @@ func (nl *NodeLink) serveSend() { ...@@ -464,10 +464,12 @@ func (nl *NodeLink) serveSend() {
// ---- raw IO ---- // ---- raw IO ----
const dumpio = true
// sendPkt sends raw packet to peer // sendPkt sends raw packet to peer
// tx error, if any, is returned as is and is analyzed in serveSend // tx error, if any, is returned as is and is analyzed in serveSend
func (nl *NodeLink) sendPkt(pkt *PktBuf) error { func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
if true { if dumpio {
// XXX -> log // XXX -> log
fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt) fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
//defer fmt.Printf("\t-> sendPkt err: %v\n", err) //defer fmt.Printf("\t-> sendPkt err: %v\n", err)
...@@ -524,7 +526,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -524,7 +526,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
} }
} }
if true { if dumpio {
// XXX -> log // XXX -> log
fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt) fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
} }
...@@ -572,6 +574,17 @@ func Listen(network, laddr string) (*Listener, error) { ...@@ -572,6 +574,17 @@ func Listen(network, laddr string) (*Listener, error) {
// ---- for convenience: String ----
func (nl *NodeLink) String() string {
s := fmt.Sprintf("%s - %s", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr())
return s // XXX add "(closed)" if nl is closed ?
}
func (c *Conn) String() string {
s := fmt.Sprintf("%s .%d", c.nodeLink, c.connId)
return s // XXX add "(closed)" if c is closed ?
}
// ---------------------------------------- // ----------------------------------------
......
...@@ -44,7 +44,7 @@ func NewStorage(zstor zodb.IStorage) *Storage { ...@@ -44,7 +44,7 @@ func NewStorage(zstor zodb.IStorage) *Storage {
// ServeLink serves incoming node-node link connection // ServeLink serves incoming node-node link connection
func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
fmt.Printf("stor: serving new node %s <-> %s\n", link.peerLink.LocalAddr(), link.peerLink.RemoteAddr()) fmt.Printf("stor: %s: serving new node\n", link)
// close link when either cancelling or returning (e.g. due to an error) // close link when either cancelling or returning (e.g. due to an error)
// ( when cancelling - link.Close will signal to all current IO to // ( when cancelling - link.Close will signal to all current IO to
...@@ -58,12 +58,13 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -58,12 +58,13 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
// XXX tell peers we are shutting down? // XXX tell peers we are shutting down?
case <-retch: case <-retch:
} }
fmt.Printf("stor: closing link to %s\n", link.peerLink.RemoteAddr()) fmt.Printf("stor: %v: closing link\n", link)
link.Close() // XXX err link.Close() // XXX err
}() }()
nodeInfo, err := IdentifyPeer(link, STORAGE) nodeInfo, err := IdentifyPeer(link, STORAGE)
if err != nil { if err != nil {
// XXX include link here or in IdentifyPeer ?
fmt.Printf("peer identification failed: %v\n", err) fmt.Printf("peer identification failed: %v\n", err)
return return
} }
...@@ -74,7 +75,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -74,7 +75,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
serveConn = stor.ServeClient serveConn = stor.ServeClient
default: default:
fmt.Printf("unexpected peer type: %v\n", nodeInfo.NodeType) fmt.Printf("stor: %v: unexpected peer type: %v\n", link, nodeInfo.NodeType)
return return
} }
...@@ -82,7 +83,8 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -82,7 +83,8 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
for { for {
conn, err := link.Accept() conn, err := link.Accept()
if err != nil { if err != nil {
fmt.Printf("accept: %v\n", err) // XXX err ctx // XXX both link and accept op should be generated in link.Accept
fmt.Printf("stor: %v: accept: %v\n", link, err) // XXX err ctx
break break
} }
...@@ -94,14 +96,9 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -94,14 +96,9 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
// TODO wait all spawned serveConn // TODO wait all spawned serveConn
} }
// connAddr returns string describing conn XXX text, naming
func connAddr(conn *Conn) string {
return fmt.Sprintf("%s .%d", conn.nodeLink.peerLink.RemoteAddr(), conn.connId)
}
// ServeClient serves incoming connection on which peer identified itself as client // ServeClient serves incoming connection on which peer identified itself as client
func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) { func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
fmt.Printf("stor: serving new client conn %s\n", connAddr(conn)) fmt.Printf("stor: %s: serving new client conn\n", conn)
// close connection when either cancelling or returning (e.g. due to an error) // close connection when either cancelling or returning (e.g. due to an error)
// ( when cancelling - conn.Close will signal to current IO to // ( when cancelling - conn.Close will signal to current IO to
...@@ -115,7 +112,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) { ...@@ -115,7 +112,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
// XXX tell client we are shutting down? // XXX tell client we are shutting down?
case <-retch: case <-retch:
} }
fmt.Printf("stor: closing client conn %s\n", connAddr(conn)) fmt.Printf("stor: %v: closing client conn\n", conn)
conn.Close() // XXX err conn.Close() // XXX err
}() }()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment