Commit badeb060 authored by Kirill Smelkov's avatar Kirill Smelkov

X identify chat draftly works

parent cb23e144
...@@ -53,7 +53,8 @@ func (c *Client) LastTid() (zodb.Tid, error) { ...@@ -53,7 +53,8 @@ func (c *Client) LastTid() (zodb.Tid, error) {
reply, err := RecvAndDecode(c.storConn) reply, err := RecvAndDecode(c.storConn)
if err != nil { if err != nil {
return 0, err // XXX err context // XXX err context (e.g. peer resetting connection -> currently only EOF)
return 0, err
} }
switch reply := reply.(type) { switch reply := reply.(type) {
...@@ -87,13 +88,21 @@ func openClientByURL(u *url.URL) (zodb.IStorage, error) { ...@@ -87,13 +88,21 @@ func openClientByURL(u *url.URL) (zodb.IStorage, error) {
return nil, err return nil, err
} }
// identify ourselves via conn
storType, err := IdentifyMe(storLink, CLIENT)
if err != nil {
return nil, err // XXX err ctx
}
if storType != STORAGE {
storLink.Close() // XXX err
return nil, fmt.Errorf("%v: peer is not storage (identifies as %v)", storLink, storType)
}
conn, err := storLink.NewConn() conn, err := storLink.NewConn()
if err != nil { if err != nil {
return nil, err // XXX err ctx ? return nil, err // XXX err ctx ?
} }
// TODO identify ourselves via conn
return &Client{storLink, conn}, nil return &Client{storLink, conn}, nil
} }
......
...@@ -467,7 +467,7 @@ func (nl *NodeLink) serveSend() { ...@@ -467,7 +467,7 @@ func (nl *NodeLink) serveSend() {
// sendPkt sends raw packet to peer // sendPkt sends raw packet to peer
// tx error, if any, is returned as is and is analyzed in serveSend // tx error, if any, is returned as is and is analyzed in serveSend
func (nl *NodeLink) sendPkt(pkt *PktBuf) error { func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
if false { if true {
// XXX -> log // XXX -> log
fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt) fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
//defer fmt.Printf("\t-> sendPkt err: %v\n", err) //defer fmt.Printf("\t-> sendPkt err: %v\n", err)
...@@ -524,7 +524,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -524,7 +524,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
} }
} }
if false { if true {
// XXX -> log // XXX -> log
fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt) fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
} }
......
...@@ -80,10 +80,10 @@ func ListenAndServe(ctx context.Context, net_, laddr string, srv Server) error { ...@@ -80,10 +80,10 @@ func ListenAndServe(ctx context.Context, net_, laddr string, srv Server) error {
// ---------------------------------------- // ----------------------------------------
// Identify identifies peer on the link // IdentifyPeer identifies peer on the link
// it expects peer to send RequestIdentification packet and replies with AcceptIdentification if identification parameters are ok. // it expects peer to send RequestIdentification packet and replies with AcceptIdentification if identification passes.
// returns information about identified node or error. // returns information about identified node or error.
func Identify(link *NodeLink) (nodeInfo RequestIdentification /*TODO -> NodeInfo*/, err error) { func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentification /*TODO -> NodeInfo*/, err error) {
// the first conn must come with RequestIdentification packet // the first conn must come with RequestIdentification packet
conn, err := link.Accept() conn, err := link.Accept()
if err != nil { if err != nil {
...@@ -92,7 +92,7 @@ func Identify(link *NodeLink) (nodeInfo RequestIdentification /*TODO -> NodeInfo ...@@ -92,7 +92,7 @@ func Identify(link *NodeLink) (nodeInfo RequestIdentification /*TODO -> NodeInfo
defer func() { defer func() {
err2 := conn.Close() err2 := conn.Close()
if err == nil { if err == nil {
err = err2 err = err2 // XXX err ctx
// XXX also clear nodeInfo ? // XXX also clear nodeInfo ?
} }
}() }()
...@@ -106,6 +106,8 @@ func Identify(link *NodeLink) (nodeInfo RequestIdentification /*TODO -> NodeInfo ...@@ -106,6 +106,8 @@ func Identify(link *NodeLink) (nodeInfo RequestIdentification /*TODO -> NodeInfo
default: default:
return nodeInfo, fmt.Errorf("expected RequestIdentification ; got %T", pkt) return nodeInfo, fmt.Errorf("expected RequestIdentification ; got %T", pkt)
// XXX also handle Error
case *RequestIdentification: case *RequestIdentification:
if pkt.ProtocolVersion != PROTOCOL_VERSION { if pkt.ProtocolVersion != PROTOCOL_VERSION {
// TODO also tell peer with Error // TODO also tell peer with Error
...@@ -115,7 +117,7 @@ func Identify(link *NodeLink) (nodeInfo RequestIdentification /*TODO -> NodeInfo ...@@ -115,7 +117,7 @@ func Identify(link *NodeLink) (nodeInfo RequestIdentification /*TODO -> NodeInfo
// TODO (.NodeType, .UUID, .Address, .Name, .IdTimestamp) -> check + register to NM // TODO (.NodeType, .UUID, .Address, .Name, .IdTimestamp) -> check + register to NM
err = EncodeAndSend(conn, &AcceptIdentification{ err = EncodeAndSend(conn, &AcceptIdentification{
NodeType: pkt.NodeType, NodeType: myNodeType,
MyUUID: 0, // XXX MyUUID: 0, // XXX
NumPartitions: 0, // XXX NumPartitions: 0, // XXX
NumReplicas: 0, // XXX NumReplicas: 0, // XXX
...@@ -134,6 +136,50 @@ func Identify(link *NodeLink) (nodeInfo RequestIdentification /*TODO -> NodeInfo ...@@ -134,6 +136,50 @@ func Identify(link *NodeLink) (nodeInfo RequestIdentification /*TODO -> NodeInfo
return nodeInfo, nil return nodeInfo, nil
} }
// IdentifyMe identifies local node to remote peer
func IdentifyMe(link *NodeLink, nodeType NodeType /*XXX*/) (peerType NodeType, err error) {
conn, err := link.NewConn()
if err != nil {
return peerType, err
}
defer func() {
err2 := conn.Close()
if err == nil && err2 != nil {
err = err2 // XXX err ctx
// XXX also reset peerType
}
}()
err = EncodeAndSend(conn, &RequestIdentification{
ProtocolVersion: PROTOCOL_VERSION,
NodeType: nodeType,
UUID: 0, // XXX
Address: Address{}, // XXX
Name: "", // XXX cluster name ?
IdTimestamp: 0, // XXX
})
if err != nil {
return peerType, err
}
pkt, err := RecvAndDecode(conn)
if err != nil {
return peerType, err
}
switch pkt := pkt.(type) {
default:
return peerType, fmt.Errorf("expected AcceptIdentification ; got %T", pkt)
// XXX also handle Error
case *AcceptIdentification:
return pkt.NodeType, nil
}
}
// ---------------------------------------- // ----------------------------------------
// XXX place = ok ? not ok -> move out of here // XXX place = ok ? not ok -> move out of here
// XXX naming for RecvAndDecode and EncodeAndSend // XXX naming for RecvAndDecode and EncodeAndSend
......
...@@ -62,7 +62,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -62,7 +62,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
link.Close() // XXX err link.Close() // XXX err
}() }()
nodeInfo, err := Identify(link) nodeInfo, err := IdentifyPeer(link, STORAGE)
if err != nil { if err != nil {
fmt.Printf("peer identification failed: %v\n", err) fmt.Printf("peer identification failed: %v\n", err)
return return
...@@ -83,7 +83,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -83,7 +83,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
conn, err := link.Accept() conn, err := link.Accept()
if err != nil { if err != nil {
fmt.Printf("accept: %v\n", err) // XXX err ctx fmt.Printf("accept: %v\n", err) // XXX err ctx
continue break
} }
// XXX adjust ctx ? // XXX adjust ctx ?
...@@ -91,6 +91,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -91,6 +91,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
go serveConn(ctx, conn) go serveConn(ctx, conn)
} }
// TODO wait all spawned serveConn
} }
// connAddr returns string describing conn XXX text, naming // connAddr returns string describing conn XXX text, naming
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment