Commit aba6d115 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a585e05d
......@@ -135,7 +135,7 @@ func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) {
// XXX for now url is treated as storage node URL
// XXX check/use other url fields
net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
storLink, err := neo.Dial(ctx, net, u.Host)
storLink, err := neo.DialLink(ctx, net, u.Host) // XXX -> Dial
if err != nil {
return nil, err
}
......
......@@ -31,6 +31,8 @@ import (
"sync"
"sync/atomic"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
)
......@@ -682,12 +684,10 @@ func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) {
}
// ---- for convenience: Dial & Listen ----
// XXX we also need 1) Dial + request identification & 2) Listen + verify/accept identification
// ---- Dial & Listen at raw NodeLink level ----
// Dial connects to address on given network, handshakes and wraps the connection as NodeLink
func Dial(ctx context.Context, net xnet.Networker, addr string) (nl *NodeLink, err error) {
// DialLink connects to address on given network, handshakes and wraps the connection as NodeLink
func DialLink(ctx context.Context, net xnet.Networker, addr string) (nl *NodeLink, err error) {
peerConn, err := net.Dial(ctx, addr)
if err != nil {
return nil, err
......@@ -696,17 +696,17 @@ func Dial(ctx context.Context, net xnet.Networker, addr string) (nl *NodeLink, e
return Handshake(ctx, peerConn, LinkClient)
}
// Listen starts listening on laddr for incoming connections and wraps them as NodeLink.
// ListenLink starts listening on laddr for incoming connections and wraps them as NodeLink.
// The listener accepts only those connections that pass handshake.
func Listen(net xnet.Networker, laddr string) (*Listener, error) {
func ListenLink(net xnet.Networker, laddr string) (*LinkListener, error) {
rawl, err := net.Listen(laddr)
if err != nil {
return nil, err
}
l := &Listener{
l := &LinkListener{
l: rawl,
acceptq: make(chan accepted),
acceptq: make(chan linkAccepted),
closed: make(chan struct{}),
}
go l.run()
......@@ -714,26 +714,26 @@ func Listen(net xnet.Networker, laddr string) (*Listener, error) {
return l, nil
}
// Listener wraps net.Listener to return handshaked NodeLink on Accept.
// LinkListener wraps net.Listener to return handshaked NodeLink on Accept.
// Create only via Listen.
type Listener struct {
type LinkListener struct {
l net.Listener
acceptq chan accepted
acceptq chan linkAccepted
closed chan struct {}
}
type accepted struct {
type linkAccepted struct {
link *NodeLink
err error
}
func (l *Listener) Close() error {
func (l *LinkListener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *Listener) run() {
func (l *LinkListener) run() {
// context that cancels when listener stops
runCtx, runCancel := context.WithCancel(context.Background())
defer runCancel()
......@@ -752,11 +752,11 @@ func (l *Listener) run() {
}
}
func (l *Listener) accept(ctx context.Context, conn net.Conn, err error) {
func (l *LinkListener) accept(ctx context.Context, conn net.Conn, err error) {
link, err := l.accept1(ctx, conn, err)
select {
case l.acceptq <- accepted{link, err}:
case l.acceptq <- linkAccepted{link, err}:
// ok
case <-l.closed:
......@@ -767,7 +767,7 @@ func (l *Listener) accept(ctx context.Context, conn net.Conn, err error) {
}
}
func (l *Listener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
func (l *LinkListener) accept1(ctx context.Context, conn net.Conn, err error) (*NodeLink, error) {
// XXX err ctx?
if err != nil {
......@@ -783,7 +783,7 @@ func (l *Listener) accept1(ctx context.Context, conn net.Conn, err error) (*Node
return link, nil
}
func (l *Listener) Accept() (*NodeLink, error) {
func (l *LinkListener) Accept() (*NodeLink, error) {
select{
case <-l.closed:
// we know raw listener is already closed - return proper error about it
......@@ -795,7 +795,7 @@ func (l *Listener) Accept() (*NodeLink, error) {
}
}
func (l *Listener) Addr() net.Addr {
func (l *LinkListener) Addr() net.Addr {
return l.l.Addr()
}
......@@ -972,3 +972,202 @@ func (c *Conn) Ask(req Msg, resp Msg) error {
return err
}
// ---- Dial & Listen at application level ----
// IdentifyWith identifies local node with remote peer
// it also verifies peer's node type to what caller expects
func IdentifyWith(expectPeerType NodeType, link *NodeLink, myInfo NodeInfo, clusterName string) (accept *AcceptIdentification, err error) {
defer xerr.Contextf(&err, "%s: request identification", link)
conn, err := link.NewConn()
if err != nil {
return nil, err
}
defer func() {
err2 := conn.Close()
err = xerr.First(err, err2)
}()
accept = &AcceptIdentification{}
err = conn.Ask(&RequestIdentification{
NodeType: myInfo.NodeType,
NodeUUID: myInfo.NodeUUID,
Address: myInfo.Address,
ClusterName: clusterName,
IdTimestamp: myInfo.IdTimestamp, // XXX ok?
}, accept)
if err != nil {
return nil, err // XXX err ctx ?
}
if accept.NodeType != expectPeerType {
return nil, fmt.Errorf("accepted, but peer is not %v (identifies as %v)", expectPeerType, accept.NodeType)
}
return accept, nil
}
// Dial connects to address on given network, handshakes and requests identification
// XXX -> meth. of Node
// XXX text
func Dial(ctx context.Context, net xnet.Networker, addr string, idReq *RequestIdentification) (_ *Conn, _ *AcceptIdentification, err error) {
link, err := DialLink(ctx, net, addr)
if err != nil {
return nil, nil, err
}
// XXX vvv = IdentifyWith - use it ?
defer xerr.Contextf(&err, "%s: request identification", link)
// close link on error return
defer func() {
if err != nil {
link.Close()
}
}()
conn, err := link.NewConn()
if err != nil {
return nil, nil, err
}
accept := &AcceptIdentification{}
err = conn.Ask(idReq, accept)
if err != nil {
return nil, nil, err
}
// XXX also check expected peer type here ?
return conn, accept, nil
}
// XXX doc
func Listen(net xnet.Networker, laddr string) (*Listener, error) {
ll, err := ListenLink(net, laddr)
if err != nil {
return nil, err
}
l := &Listener{
l: ll,
acceptq: make(chan accepted),
closed: make(chan struct{}),
}
go l.run()
return l, nil
}
// Listener wraps LinkListener to return link on which identification was correctly requested XXX
// Create via Listen. XXX
type Listener struct {
l *LinkListener
acceptq chan accepted
closed chan struct {}
}
type accepted struct {
conn *Conn
idReq *RequestIdentification
err error
}
func (l *Listener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *Listener) run() {
for {
// stop on close
select {
case <-l.closed:
return
default:
}
// XXX add backpressure on too much incoming connections without client .Accept ?
link, err := l.l.Accept()
go l.accept(link, err)
}
}
func (l *Listener) accept(link *NodeLink, err error) {
res := make(chan accepted, 1)
go func() {
conn, idReq, err := l.accept1(link, err)
res <- accepted{conn, idReq, err}
}()
// wait for accept1 result & resend it to .acceptq
// close link in case of listening cancel or error
//
// the only case when link stays alive is when acceptance was
// successful and link ownership is passed to Accept.
ok := false
select {
case <-l.closed:
case a := <-res:
select {
case l.acceptq <- a:
ok = (a.err == nil)
case <-l.closed:
}
}
if !ok {
link.Close()
}
}
func (l *Listener) accept1(link *NodeLink, err0 error) (_ *Conn, _ *RequestIdentification, err error) {
if err0 != nil {
return nil, nil, err0
}
defer xerr.Context(&err, "identify")
// identify peer
// the first conn must come with RequestIdentification packet
conn, err := link.Accept()
if err != nil {
return nil, nil, err
}
idReq := &RequestIdentification{}
_, err = conn.Expect(idReq)
if err != nil {
// XXX ok to let peer know error as is? e.g. even IO error on Recv?
err2 := conn.Send(&Error{PROTOCOL_ERROR, err.Error()})
err = xerr.Merge(err, err2)
return nil, nil, err
}
return conn, idReq, nil
}
// Accept accepts incoming client connection.
//
// On success the link was handshaked and on returned Conn peer sent us
// RequestIdentification packet which we did not yet answer.
func (l *Listener) Accept() (*Conn, *RequestIdentification, error) {
select{
case <-l.closed:
// we know raw listener is already closed - return proper error about it
_, err := l.l.Accept()
return nil, nil, err
case a := <-l.acceptq:
return a.conn, a.idReq, a.err
}
}
func (l *Listener) Addr() net.Addr {
return l.l.Addr()
}
......@@ -67,7 +67,7 @@ type Master struct {
// event: node connects
type nodeCome struct {
conn *neo.Conn
idReq neo.RequestIdentification // we received this identification request
idReq *neo.RequestIdentification // we received this identification request
idResp chan neo.Msg // what we reply (AcceptIdentification | Error) XXX kill
}
......@@ -145,7 +145,7 @@ func (m *Master) setClusterState(state neo.ClusterState) {
// Run starts master node and runs it until ctx is cancelled or fatal error
func (m *Master) Run(ctx context.Context) error {
// start listening
l, err := m.node.Listen() // XXX -> Listen
l, err := m.node.Listen()
if err != nil {
return err // XXX err ctx
}
......@@ -767,8 +767,8 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
return
}
idReq := neo.RequestIdentification{}
_, err = conn.Expect(&idReq)
idReq := &neo.RequestIdentification{}
_, err = conn.Expect(idReq)
if err != nil {
logf("identify: %v", err)
// XXX ok to let peer know error as is? e.g. even IO error on Recv?
......
......@@ -25,126 +25,13 @@ package server
import (
// "context"
// "fmt"
"net"
// "net"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/go123/xerr"
)
// Listener wraps neo.Listener to return link on which identification was correctly requested XXX
// Create via Listen. XXX
type Listener struct {
l *neo.Listener
acceptq chan accepted
closed chan struct {}
}
type accepted struct {
conn *neo.Conn
idReq *neo.RequestIdentification
err error
}
func (l *Listener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *Listener) run() {
for {
// stop on close
select {
case <-l.closed:
return
default:
}
// XXX add backpressure on too much incoming connections without client .Accept ?
link, err := l.l.Accept()
go l.accept(link, err)
}
}
func (l *Listener) accept(link *neo.NodeLink, err error) {
res := make(chan accepted, 1)
go func() {
conn, idReq, err := l.accept1(link, err)
res <- accepted{conn, idReq, err}
}()
// wait for accept1 result & resend it to .acceptq
// close link in case of listening cancel or error
//
// the only case when link stays alive is when acceptance was
// successful and link ownership is passed to Accept.
ok := false
select {
case <-l.closed:
case a := <-res:
select {
case l.acceptq <- a:
ok = (a.err == nil)
case <-l.closed:
}
}
if !ok {
link.Close()
}
}
func (l *Listener) accept1(link *neo.NodeLink, err0 error) (_ *neo.Conn, _ *neo.RequestIdentification, err error) {
if err0 != nil {
return nil, nil, err0
}
defer xerr.Context(&err, "identify")
// identify peer
// the first conn must come with RequestIdentification packet
conn, err := link.Accept()
if err != nil {
return nil, nil, err
}
idReq := &neo.RequestIdentification{}
_, err = conn.Expect(idReq)
if err != nil {
// XXX ok to let peer know error as is? e.g. even IO error on Recv?
err2 := conn.Send(&neo.Error{neo.PROTOCOL_ERROR, err.Error()})
err = xerr.Merge(err, err2)
return nil, nil, err
}
return conn, idReq, nil
}
// Accept accepts incoming client connection.
//
// On success the link was handshaked and on returned Conn peer sent us
// RequestIdentification packet which we did not yet answer.
func (l *Listener) Accept() (*neo.Conn, *neo.RequestIdentification, error) {
select{
case <-l.closed:
// we know raw listener is already closed - return proper error about it
_, err := l.l.Accept()
return nil, nil, err
case a := <-l.acceptq:
return a.conn, a.idReq, a.err
}
}
func (l *Listener) Addr() net.Addr {
return l.l.Addr()
}
/*
// Server is an interface that represents networked server
type Server interface {
......
......@@ -149,22 +149,26 @@ func (stor *Storage) talkMaster(ctx context.Context) error {
// it returns error describing why such cycle had to finish
// XXX distinguish between temporary problems and non-temporary ones?
func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
Mlink, err := neo.Dial(ctx, stor.node.Net, stor.node.MasterAddr)
Mconn, accept, err := neo.Dial(ctx, stor.node.Net, stor.node.MasterAddr)
if err != nil {
return err
}
Mlink := Mconn.Link()
// close Mlink on return / cancel
defer func() {
errClose := Mlink.Close()
err = xerr.First(err, errClose)
err2 := Mlink.Close()
err = xerr.First(err, err2)
}()
/*
// request identification this way registering our node to master
accept, err := neo.IdentifyWith(neo.MASTER, Mlink, stor.node.MyInfo, stor.node.ClusterName)
if err != nil {
return err
}
*/
// XXX add master UUID -> nodeTab ? or master will notify us with it himself ?
......@@ -178,7 +182,6 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
}
// now handle notifications and commands from master
var Mconn *neo.Conn
for {
// check if it was context cancel or command from master to shutdown
select {
......@@ -197,10 +200,10 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
//
// XXX or simply use only the first connection and if M decides
// to cancel - close whole nodelink and S reconnects?
if Mconn != nil {
// if Mconn != nil {
Mconn.Close() // XXX err
Mconn = nil
}
// }
Mconn, err = Mlink.Accept()
if err != nil {
......
......@@ -56,9 +56,7 @@ func IdentifyWith(expectPeerType NodeType, link *NodeLink, myInfo NodeInfo, clus
}
defer func() {
err2 := conn.Close()
if err == nil && err2 != nil {
err = err2
}
err = xerr.First(err, err2)
}()
accept = &AcceptIdentification{}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment