Commit e17082f1 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 195b80a3
......@@ -26,7 +26,7 @@ import (
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
// "lab.nexedi.com/kirr/neo/go/xcommon/xnet"
)
// Client talks to NEO cluster and exposes access it via ZODB interfaces
......@@ -98,13 +98,17 @@ func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
// NewClient creates and identifies new client connected to storage over storLink
func NewClient(storLink *neo.NodeLink) (*Client, error) {
func NewClient(masterAddr string) (*Client, error) {
// TODO .myInfo.NodeType = CLIENT
// .clusterName = clusterName
// .net = ...
cli := &Client{}
//return &Client{storLink, storConn}, nil
cli.node.Dial(context.TODO(), neo.MASTER, masterAddr)
panic("TODO")
/*
// XXX move -> Run?
// first identify ourselves to peer
accept, err := neo.IdentifyWith(neo.STORAGE, storLink, cli.node.MyInfo, cli.node.ClusterName)
......@@ -128,11 +132,15 @@ func NewClient(storLink *neo.NodeLink) (*Client, error) {
_ = storConn // XXX temp
return cli, nil
*/
}
// TODO read-only support
func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) {
// XXX for now url is treated as storage node URL
// XXX u.Host -> masterAddr (not storage)
panic("TODO")
/*
// XXX check/use other url fields
net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
storLink, err := neo.DialLink(ctx, net, u.Host) // XXX -> Dial
......@@ -163,7 +171,7 @@ func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) {
case r := <-done:
return r.Client, r.error
}
*/
}
//func Open(...) (*Client, error) {
......
......@@ -31,8 +31,6 @@ import (
"sync"
"sync/atomic"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
)
......@@ -972,202 +970,3 @@ func (c *Conn) Ask(req Msg, resp Msg) error {
return err
}
// ---- Dial & Listen at application level ----
// IdentifyWith identifies local node with remote peer
// it also verifies peer's node type to what caller expects
func IdentifyWith(expectPeerType NodeType, link *NodeLink, myInfo NodeInfo, clusterName string) (accept *AcceptIdentification, err error) {
defer xerr.Contextf(&err, "%s: request identification", link)
conn, err := link.NewConn()
if err != nil {
return nil, err
}
defer func() {
err2 := conn.Close()
err = xerr.First(err, err2)
}()
accept = &AcceptIdentification{}
err = conn.Ask(&RequestIdentification{
NodeType: myInfo.NodeType,
NodeUUID: myInfo.NodeUUID,
Address: myInfo.Address,
ClusterName: clusterName,
IdTimestamp: myInfo.IdTimestamp, // XXX ok?
}, accept)
if err != nil {
return nil, err // XXX err ctx ?
}
if accept.NodeType != expectPeerType {
return nil, fmt.Errorf("accepted, but peer is not %v (identifies as %v)", expectPeerType, accept.NodeType)
}
return accept, nil
}
// Dial connects to address on given network, handshakes and requests identification
// XXX -> meth. of Node
// XXX text
func Dial(ctx context.Context, net xnet.Networker, addr string, idReq *RequestIdentification) (_ *Conn, _ *AcceptIdentification, err error) {
link, err := DialLink(ctx, net, addr)
if err != nil {
return nil, nil, err
}
// XXX vvv = IdentifyWith - use it ?
defer xerr.Contextf(&err, "%s: request identification", link)
// close link on error return
defer func() {
if err != nil {
link.Close()
}
}()
conn, err := link.NewConn()
if err != nil {
return nil, nil, err
}
accept := &AcceptIdentification{}
err = conn.Ask(idReq, accept)
if err != nil {
return nil, nil, err
}
// XXX also check expected peer type here ?
return conn, accept, nil
}
// XXX doc
func Listen(net xnet.Networker, laddr string) (*Listener, error) {
ll, err := ListenLink(net, laddr)
if err != nil {
return nil, err
}
l := &Listener{
l: ll,
acceptq: make(chan accepted),
closed: make(chan struct{}),
}
go l.run()
return l, nil
}
// Listener wraps LinkListener to return link on which identification was correctly requested XXX
// Create via Listen. XXX
type Listener struct {
l *LinkListener
acceptq chan accepted
closed chan struct {}
}
type accepted struct {
conn *Conn
idReq *RequestIdentification
err error
}
func (l *Listener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *Listener) run() {
for {
// stop on close
select {
case <-l.closed:
return
default:
}
// XXX add backpressure on too much incoming connections without client .Accept ?
link, err := l.l.Accept()
go l.accept(link, err)
}
}
func (l *Listener) accept(link *NodeLink, err error) {
res := make(chan accepted, 1)
go func() {
conn, idReq, err := l.accept1(link, err)
res <- accepted{conn, idReq, err}
}()
// wait for accept1 result & resend it to .acceptq
// close link in case of listening cancel or error
//
// the only case when link stays alive is when acceptance was
// successful and link ownership is passed to Accept.
ok := false
select {
case <-l.closed:
case a := <-res:
select {
case l.acceptq <- a:
ok = (a.err == nil)
case <-l.closed:
}
}
if !ok {
link.Close()
}
}
func (l *Listener) accept1(link *NodeLink, err0 error) (_ *Conn, _ *RequestIdentification, err error) {
if err0 != nil {
return nil, nil, err0
}
defer xerr.Context(&err, "identify")
// identify peer
// the first conn must come with RequestIdentification packet
conn, err := link.Accept()
if err != nil {
return nil, nil, err
}
idReq := &RequestIdentification{}
_, err = conn.Expect(idReq)
if err != nil {
// XXX ok to let peer know error as is? e.g. even IO error on Recv?
err2 := conn.Send(&Error{PROTOCOL_ERROR, err.Error()})
err = xerr.Merge(err, err2)
return nil, nil, err
}
return conn, idReq, nil
}
// Accept accepts incoming client connection.
//
// On success the link was handshaked and on returned Conn peer sent us
// RequestIdentification packet which we did not yet answer.
func (l *Listener) Accept() (*Conn, *RequestIdentification, error) {
select{
case <-l.closed:
// we know raw listener is already closed - return proper error about it
_, err := l.l.Accept()
return nil, nil, err
case a := <-l.acceptq:
return a.conn, a.idReq, a.err
}
}
func (l *Listener) Addr() net.Addr {
return l.l.Addr()
}
......@@ -28,6 +28,12 @@ package neo
//go:generate sh -c "go run ../xcommon/tracing/cmd/gotrace/{gotrace,util}.go ."
import (
"context"
"fmt"
"net"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/neo/go/zodb"
)
......@@ -51,16 +57,66 @@ type NodeCommon struct {
Net xnet.Networker // network AP we are sending/receiving on
MasterAddr string // address of master XXX -> Address ?
// XXX + NodeTab (information about nodes in the cluster) ?
// XXX + PartTab (information about data distribution in the cluster) ?
NodeTab NodeTable // information about nodes in the cluster
PartTab PartitionTable // information about data distribution in the cluster
}
// Dial connects to another node in the cluster
//
// It handshakes, requests identification and checks peer type. If successful returned are:
// - primary link connection which carried identification
// - accept identification reply
func (n *NodeCommon) Dial(ctx context.Context, peerType NodeType, addr string) (_ *Conn, _ *AcceptIdentification, err error) {
link, err := DialLink(ctx, n.Net, addr)
if err != nil {
return nil, nil, err
}
defer xerr.Contextf(&err, "%s: request identification", link)
// close link on error return
// FIXME also close link on ctx cancel
defer func() {
if err != nil {
link.Close()
}
}()
conn, err := link.NewConn()
if err != nil {
return nil, nil, err
}
req := &RequestIdentification{
NodeType: n.MyInfo.NodeType,
NodeUUID: n.MyInfo.NodeUUID,
Address: n.MyInfo.Address,
ClusterName: n.ClusterName,
IdTimestamp: n.MyInfo.IdTimestamp, // XXX ok?
}
accept := &AcceptIdentification{}
err = conn.Ask(req, accept)
if err != nil {
return nil, nil, err
}
if accept.NodeType != peerType {
// XXX send Error to peer?
return nil, nil, fmt.Errorf("accepted, but peer is not %v (identifies as %v)", peerType, accept.NodeType)
}
//accept.MyNodeUUID, link // XXX register .NodeTab? (or better LinkTab as NodeTab is driven by M)
//accept.YourNodeUUID // XXX M can tell us to change UUID -> take in effect
return conn, accept, nil
}
// Listen starts listening at node's listening address.
// If the address is empty one new free is automatically selected.
// The node information about where it listens at is appropriately updated.
func (n *NodeCommon) Listen() (*Listener, error) {
// start listening
l, err := Listen(n.Net, n.MyInfo.Address.String()) // XXX ugly
ll, err := ListenLink(n.Net, n.MyInfo.Address.String())
if err != nil {
return nil, err // XXX err ctx
}
......@@ -69,20 +125,139 @@ func (n *NodeCommon) Listen() (*Listener, error) {
// NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and
// listen("tcp6", ":1234") gives l.Addr [::]:1234
// -> host is never empty
addr, err := Addr(l.Addr())
addr, err := Addr(ll.Addr())
if err != nil {
// XXX -> panic here ?
l.Close()
ll.Close()
return nil, err // XXX err ctx
}
n.MyInfo.Address = addr
l := &Listener{
l: ll,
acceptq: make(chan accepted),
closed: make(chan struct{}),
}
go l.run()
return l, nil
}
// XXX func (n *Node) IdentifyWith(...) ?
// XXX better -> Connect() (=Dial, IdentifyWith, process common ID reply ...)
// Listener wraps LinkListener to return link on which identification was correctly requested XXX
// Create via Listen. XXX
type Listener struct {
l *LinkListener
acceptq chan accepted
closed chan struct {}
}
type accepted struct {
conn *Conn
idReq *RequestIdentification
err error
}
func (l *Listener) Close() error {
err := l.l.Close()
close(l.closed)
return err
}
func (l *Listener) run() {
for {
// stop on close
select {
case <-l.closed:
return
default:
}
// XXX add backpressure on too much incoming connections without client .Accept ?
link, err := l.l.Accept()
go l.accept(link, err)
}
}
func (l *Listener) accept(link *NodeLink, err error) {
res := make(chan accepted, 1)
go func() {
conn, idReq, err := l.accept1(link, err)
res <- accepted{conn, idReq, err}
}()
// wait for accept1 result & resend it to .acceptq
// close link in case of listening cancel or error
//
// the only case when link stays alive is when acceptance was
// successful and link ownership is passed to Accept.
ok := false
select {
case <-l.closed:
case a := <-res:
select {
case l.acceptq <- a:
ok = (a.err == nil)
case <-l.closed:
}
}
if !ok {
link.Close()
}
}
func (l *Listener) accept1(link *NodeLink, err0 error) (_ *Conn, _ *RequestIdentification, err error) {
if err0 != nil {
return nil, nil, err0
}
defer xerr.Context(&err, "identify")
// identify peer
// the first conn must come with RequestIdentification packet
conn, err := link.Accept()
if err != nil {
return nil, nil, err
}
idReq := &RequestIdentification{}
_, err = conn.Expect(idReq)
if err != nil {
// XXX ok to let peer know error as is? e.g. even IO error on Recv?
err2 := conn.Send(&Error{PROTOCOL_ERROR, err.Error()})
err = xerr.Merge(err, err2)
return nil, nil, err
}
return conn, idReq, nil
}
// Accept accepts incoming client connection.
//
// On success the link was handshaked and peer sent us RequestIdentification
// packet which we did not yet answer.
//
// On success returned are:
// - primary link connection which carried identification
// - requested identification packet
func (l *Listener) Accept() (*Conn, *RequestIdentification, error) {
select{
case <-l.closed:
// we know raw listener is already closed - return proper error about it
_, err := l.l.Accept()
return nil, nil, err
case a := <-l.acceptq:
return a.conn, a.idReq, a.err
}
}
func (l *Listener) Addr() net.Addr {
return l.l.Addr()
}
// TODO functions to update:
// .PartTab from NotifyPartitionTable msg
......
......@@ -26,7 +26,7 @@ import (
//"sync"
)
// NodeTable represents all nodes in a cluster
// NodeTable represents known nodes in a cluster
//
// Usually Master maintains such table and provides it to other nodes to know
// each other but in general use-cases can be different.
......
......@@ -159,6 +159,7 @@ func (m *Master) Run(ctx context.Context) error {
go func() {
defer wg.Done()
// XXX dup in storage
for serveCtx.Err() != nil {
conn, idReq, err := l.Accept()
if err != nil {
......
......@@ -100,10 +100,26 @@ func (stor *Storage) Run(ctx context.Context) error {
go func() {
defer wg.Done()
// TODO l.Accept() -> nodeCome
// XXX dup from master
for serveCtx.Err() != nil {
conn, idReq, err := l.Accept()
if err != nil {
// TODO log / throttle
continue
}
_ = idReq
// err = Serve(serveCtx, l, stor)
// _ = err // XXX what to do with err ?
select {
//case m.nodeCome <- nodeCome{conn, idReq, nil/*XXX kill*/}:
// // ok
case <-serveCtx.Done():
// shutdown
conn.Link().Close() // XXX log err ?
return
}
}
}()
// connect to master and get commands and updates from it
......@@ -149,7 +165,7 @@ func (stor *Storage) talkMaster(ctx context.Context) error {
// it returns error describing why such cycle had to finish
// XXX distinguish between temporary problems and non-temporary ones?
func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
Mconn, accept, err := neo.Dial(ctx, stor.node.Net, stor.node.MasterAddr)
Mconn, accept, err := stor.node.Dial(ctx, neo.MASTER, stor.node.MasterAddr)
if err != nil {
return err
}
......@@ -162,20 +178,13 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
err = xerr.First(err, err2)
}()
/*
// request identification this way registering our node to master
accept, err := neo.IdentifyWith(neo.MASTER, Mlink, stor.node.MyInfo, stor.node.ClusterName)
if err != nil {
return err
}
*/
// XXX add master UUID -> nodeTab ? or master will notify us with it himself ?
if !(accept.NumPartitions == 1 && accept.NumReplicas == 1) {
return fmt.Errorf("TODO for 1-storage POC: Npt: %v Nreplica: %v", accept.NumPartitions, accept.NumReplicas)
}
// XXX -> node.Dial ?
if accept.YourNodeUUID != stor.node.MyInfo.NodeUUID {
fmt.Printf("stor: %v: master told us to have UUID=%v\n", Mlink, accept.YourNodeUUID)
stor.node.MyInfo.NodeUUID = accept.YourNodeUUID
......
......@@ -2,11 +2,7 @@ package neo
// TODO move me properly -> connection.go
import (
"fmt"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/xerr"
)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment