Commit 28fb1ee8 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a57954f1
......@@ -20,7 +20,6 @@ package neo
import (
"context"
"fmt"
"net/url"
"../zodb"
......@@ -28,6 +27,16 @@ import (
// Client talks to NEO cluster and exposes access it via ZODB interfaces
type Client struct {
// XXX move -> nodeCommon?
// ---- 8< ----
myInfo NodeInfo // XXX -> only NodeUUID
clusterName string
net Network // network we are sending/receiving on
masterAddr string // address of master XXX -> Address ?
// ---- 8< ----
storLink *NodeLink // link to storage node
storConn *Conn // XXX main connection to storage
}
......@@ -89,15 +98,21 @@ func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
// NewClient creates and identifies new client connected to storage over storLink
func NewClient(storLink *NodeLink) (*Client, error) {
// TODO .myInfo.NodeType = CLIENT
// .clusterName = clusterName
// .net = ...
cli := &Client{}
//return &Client{storLink, storConn}, nil
// XXX move -> Run?
// first identify ourselves to peer
storType, err := IdentifyMe(storLink, CLIENT)
accept, err := IdentifyWith(STORAGE, storLink, cli.myInfo, cli.clusterName)
if err != nil {
return nil, err
}
if storType != STORAGE {
// XXX + "newclient" to err ctx ?
return nil, fmt.Errorf("%v: peer is not storage (identifies as %v)", storLink, storType)
}
// TODO verify accept more
_ = accept
// identification passed
......@@ -110,7 +125,8 @@ func NewClient(storLink *NodeLink) (*Client, error) {
return nil, err // XXX err ctx
}
return &Client{storLink, storConn}, nil
_ = storConn // XXX temp
return cli, nil
}
// TODO read-only support
......
......@@ -20,7 +20,9 @@ package neo
import (
"context"
"fmt"
"net"
"strconv"
"crypto/tls"
"../xcommon/pipenet"
......@@ -98,7 +100,7 @@ func (n *netTLS) Listen(laddr string) (net.Listener, error) {
// String formats Address to canonical host:port form
func (addr Address) String() string {
// XXX in py if .Host == "" -> whole Address is assumed to be empty
net.JoinHostPort(addr.Host, fmt.Sprintf("%d", addr.Port))
return net.JoinHostPort(addr.Host, fmt.Sprintf("%d", addr.Port))
}
// ParseAddress parses networked address of form host:port into NEO Address
......
//go:generate stringer -output proto-str2.go -type ErrorCode proto.go
//go:generate stringer -output proto-str2.go -type ErrorCode,NodeType proto.go
package neo
......
// Code generated by "stringer -output proto-str2.go -type ErrorCode proto.go"; DO NOT EDIT.
// Code generated by "stringer -output proto-str2.go -type ErrorCode,NodeType proto.go"; DO NOT EDIT.
package neo
......@@ -14,3 +14,14 @@ func (i ErrorCode) String() string {
}
return _ErrorCode_name[_ErrorCode_index[i]:_ErrorCode_index[i+1]]
}
const _NodeType_name = "MASTERSTORAGECLIENTADMIN"
var _NodeType_index = [...]uint8{0, 6, 13, 19, 24}
func (i NodeType) String() string {
if i < 0 || i >= NodeType(len(_NodeType_index)-1) {
return fmt.Sprintf("NodeType(%d)", i)
}
return _NodeType_name[_NodeType_index[i]:_NodeType_index[i+1]]
}
......@@ -144,36 +144,40 @@ func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentifi
return req, nil
}
// IdentifyMe identifies local node to remote peer
func IdentifyMe(link *NodeLink, myInfo NodeInfo, clusterName string) (peerType NodeType, err error) {
// IdentifyWith identifies local node with remote peer
// it also verifies peer's node type to what caller expects
func IdentifyWith(expectPeerType NodeType, link *NodeLink, myInfo NodeInfo, clusterName string) (accept *AcceptIdentification, err error) {
defer errcontextf(&err, "%s: request identification", link)
conn, err := link.NewConn()
if err != nil {
return 0, err
return nil, err
}
defer func() {
err2 := conn.Close()
if err == nil && err2 != nil {
err = err2
// XXX also reset peerType
}
}()
resp := AcceptIdentification{}
accept = &AcceptIdentification{}
err = Ask(conn, &RequestIdentification{
NodeType: myInfo.NodeType,
NodeUUID: myInfo.NodeUUID,
Address: myInfo.Address,
ClusterName: clusterName,
IdTimestamp: myInfo.IdTimestamp, // XXX ok?
}, &resp)
}, accept)
if err != nil {
return 0, err
return nil, err // XXX err ctx ?
}
return resp.NodeType, nil
if accept.NodeType != expectPeerType {
return nil, fmt.Errorf("accepted, but peer is not %v (identifies as %v)", expectPeerType, accept.NodeType)
}
return accept, nil
}
// ----------------------------------------
......@@ -193,14 +197,17 @@ func RecvAndDecode(conn *Conn) (NEOEncoder, error) { // XXX NEOEncoder -> interf
msgCode := ntoh16(pkth.MsgCode)
msgType := pktTypeRegistry[msgCode]
if msgType == nil {
return nil, fmt.Errorf("invalid msgCode (%d)", msgCode) // XXX err context
err = fmt.Errorf("invalid msgCode (%d)", msgCode)
// XXX -> ProtoError ?
return nil, &ConnError{Conn: conn, Op: "decode", Err: err}
}
// TODO use free-list for decoded packets + when possible decode in-place
pktObj := reflect.New(msgType).Interface().(NEOCodec)
_, err = pktObj.NEODecode(pkt.Payload())
if err != nil {
return nil, err // XXX err ctx ?
// XXX -> ProtoError ?
return nil, &ConnError{Conn: conn, Op: "decode", Err: err}
}
return pktObj, nil
......@@ -236,6 +243,7 @@ func Ask(conn *Conn, req NEOEncoder, resp NEODecoder) error {
// ProtoError is returned when there waa a protocol error, like receiving
// unexpected packet or packet with wrong header
// XXX -> ConnError{Op: "decode"} ?
type ProtoError struct {
Conn *Conn
Err error
......
......@@ -25,8 +25,8 @@ import (
"io"
"log"
"os"
//"time"
"strings"
"time"
"../zodb"
"../zodb/storage/fs1"
......@@ -36,11 +36,14 @@ import (
// Storage is NEO storage server application
type Storage struct {
// XXX move -> nodeCommon?
// ---- 8< ----
myInfo NodeInfo // XXX -> only Address + NodeUUID ?
clusterName string
net Network // network we are working on
net Network // network we are sending/receiving on
masterAddr string // address of master XXX -> Address ?
// ---- 8< ----
zstor zodb.IStorage // underlying ZODB storage XXX temp ?
}
......@@ -48,8 +51,20 @@ type Storage struct {
// NewStorage creates new storage node that will listen on serveAddr and talk to master on masterAddr
// The storage uses zstor as underlying backend for storing data.
// To actually start running the node - call Run. XXX text
func NewStorage(cluster string, net Network, masterAddr string, serveAddr string, zstor zodb.IStorage) *Storage {
stor := &Storage{clusterName: cluster, net: net, masterAddr: masterAddr, zstor: zstor}
func NewStorage(cluster string, masterAddr string, serveAddr string, net Network, zstor zodb.IStorage) *Storage {
// convert serveAddr into neo format
addr, err := ParseAddress(serveAddr)
if err != nil {
panic(err) // XXX
}
stor := &Storage{
myInfo: NodeInfo{NodeType: STORAGE, Address: addr},
clusterName: cluster,
net: net,
masterAddr: masterAddr,
zstor: zstor,
}
return stor
}
......@@ -59,7 +74,7 @@ func NewStorage(cluster string, net Network, masterAddr string, serveAddr string
// commands it to shutdown.
func (stor *Storage) Run(ctx context.Context) error {
// start listening
l, err := net.Listen(serveAddr)
l, err := stor.net.Listen(stor.myInfo.Address.String()) // XXX ugly
if err != nil {
return err // XXX err ctx
}
......@@ -77,7 +92,7 @@ func (stor *Storage) Run(ctx context.Context) error {
return err // XXX err ctx
}
my.Address = addr
stor.myInfo.Address = addr
go stor.talkMaster(ctx)
......@@ -88,11 +103,15 @@ func (stor *Storage) Run(ctx context.Context) error {
}
// talkMaster connects to master, announces self and receives notifications and commands
// XXX and notifies master about ? (e.g. StartOperation -> NotifyReady)
// it tries to persist master link reconnecting as needed
func (stor *Storage) talkMaster(ctx context.Context) {
for {
stor.talkMaster1(ctx) // XXX err -> log ?
fmt.Printf("stor: master(%v): connecting\n", stor.masterAddr) // XXX info
err := stor.talkMaster1(ctx)
fmt.Printf("stor: master(%v): %v\n", stor.masterAddr, err)
// XXX handle shutdown command from master
// throttle reconnecting / exit on cancel
select {
......@@ -106,18 +125,53 @@ func (stor *Storage) talkMaster(ctx context.Context) {
}
}
func (stor *Storage) talkMaster1(ctx context.Context) {
fmt.Printf("stor: connecting to master %v\n", stor.masterAddr) // XXX info
// talkMaster1 does 1 cycle of connect/talk/disconnect to master
// it returns error describing why such cycle had to finish
// XXX distinguish between temporary problems and non-temporary ones?
func (stor *Storage) talkMaster1(ctx context.Context) error {
Mlink, err := Dial(ctx, stor.net, stor.masterAddr)
if err != nil {
// err: XXX log or return ?
return err
}
// TODO Mlink.Close() on return / cancel
?, err := IdentifyMe(Mlink, stor.myInfo, stor.clusterName)
// TODO
// request identification this way registering our node to master
accept, err := IdentifyWith(MASTER, Mlink, stor.myInfo, stor.clusterName)
if err != nil {
return err
}
// XXX add master UUID -> nodeTab ? or master will notify us with it himself ?
if !(accept.NumPartitions == 1 && accept.NumReplicas == 1) {
return fmt.Errorf("TODO for 1-storage POC: Npt: %v Nreplica: %v", accept.NumPartitions, accept.NumReplicas)
}
if accept.YourNodeUUID != stor.myInfo.NodeUUID {
fmt.Printf("stor: %v: master told us to have UUID=%v\n", Mlink, accept.YourNodeUUID)
stor.myInfo.NodeUUID = accept.YourNodeUUID
// XXX notify anyone?
}
// now handle notifications and commands from master
// FIXME wrong - either keep conn as one used from identification or accept from listening
conn, err := Mlink.NewConn()
if err != nil { panic(err) } // XXX
for {
notify, err := RecvAndDecode(conn)
if err != nil {
// XXX TODO
}
_ = notify // XXX temp
}
}
// ServeLink serves incoming node-node link connection
......@@ -318,13 +372,31 @@ Run NEO storage node.
}
func storageMain(argv []string) {
var bind string
flags := flag.NewFlagSet("", flag.ExitOnError)
flags.Usage = func() { storageUsage(os.Stderr); flags.PrintDefaults() } // XXX prettify
flags.StringVar(&bind, "bind", bind, "address to serve on")
cluster := flags.String("cluster", "", "the cluster name")
masters := flags.String("masters", "", "list of masters")
bind := flags.String("bind", "", "address to serve on")
flags.Parse(argv[1:])
if *cluster == "" {
// XXX vvv -> die or log.Fatalf ?
fmt.Fprintf(os.Stderr, "cluster name must be provided")
os.Exit(2)
}
masterv := strings.Split(*masters, ",")
if len(masterv) == 0 {
fmt.Fprintf(os.Stderr, "master list must be provided")
os.Exit(2)
}
if len(masterv) > 1 {
fmt.Fprintf(os.Stderr, "BUG neo/go POC currently supports only 1 master")
os.Exit(2)
}
master := masterv[0]
argv = flags.Args()
if len(argv) < 1 {
flags.Usage()
......@@ -337,7 +409,9 @@ func storageMain(argv []string) {
log.Fatal(err)
}
storSrv := NewStorage(zstor)
net := NetPlain("tcp") // TODO + TLS; not only "tcp" ?
storSrv := NewStorage(*cluster, master, *bind, net, zstor)
ctx := context.Background()
/*
......@@ -348,8 +422,7 @@ func storageMain(argv []string) {
}()
*/
net := NetPlain("tcp") // TODO + TLS; not only "tcp" ?
//err = ListenAndServe(ctx, net, bind, storSrv)
err = storSrv.Run(ctx)
if err != nil {
log.Fatal(err)
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment