Commit c6dda6c2 authored by Kirill Smelkov

.

parent 6312399d
@@ -728,27 +728,3 @@ func (c *Conn) err(op string, e error) error {
 	}
 	return &ConnError{Conn: c, Op: op, Err: e}
 }
-
-// ----------------------------------------
-
-// XXX ^^^ original description about notify/ask/answer
-// All packets are classified to be of one of the following kind:
-// - notify: a packet is sent without expecting any reply
-// - ask:    a packet is sent and reply is expected
-// - answer: a packet replying to previous ask
-//
-// At any time there can be several Asks packets issued by both nodes.
-// For an Ask packet a single Answer reply is expected	XXX vs protocol where there is one request and list of replies ?
-//
-// XXX -> multiple subconnection explicitly closed with ability to chat
-// multiple packets without spawning goroutines? And only single answer
-// expected implemented that after only ask-send / answer-receive the
-// (sub-)connection is explicitly closed ?
-//
-// XXX it is maybe better to try to avoid multiplexing by hand and let the OS do it?
-//
-// A reply to particular Ask packet, once received, will be delivered to
-// corresponding goroutine which originally issued Ask	XXX this can be put into interface
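For reference, the comment block removed above describes a one-ask/one-answer exchange carried over a dedicated sub-connection that is closed right after the exchange. A minimal sketch of that pattern, assuming for illustration that `Conn` exposes `Send`, `Recv` and `Close` and that packets travel as some `Pkt` type; the actual Conn API in this package may differ:

```go
// ask sends one request on a fresh sub-connection, waits for the single
// expected answer, and explicitly closes the sub-connection afterwards.
// Conn.Send/Recv/Close and the Pkt type are assumptions for this sketch.
func ask(link *NodeLink, req *Pkt) (*Pkt, error) {
	conn, err := link.NewConn()
	if err != nil {
		return nil, err
	}
	defer conn.Close() // one exchange only - then the sub-connection goes away

	if err := conn.Send(req); err != nil {
		return nil, err
	}
	return conn.Recv() // exactly one answer is expected for an ask
}
```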
@@ -31,6 +31,8 @@ import (
 	"sync"

 	"../zodb"
+
+	"lab.nexedi.com/kirr/go123/xerr"
 )

 // Master is a node overseeing and managing how whole NEO cluster works
@@ -175,7 +177,7 @@ func (m *Master) run(ctx context.Context) {
 // - !nil: recovery was cancelled
 func (m *Master) recovery(ctx context.Context) (err error) {
 	fmt.Println("master: recovery")
-	defer errcontextf(&err, "master: recovery")
+	defer xerr.Context(&err, "master: recovery")

 	m.setClusterState(ClusterRecovering)
 	rctx, rcancel := context.WithCancel(ctx)
@@ -299,7 +301,7 @@ func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery)
 		link.Close()
 		*/
 	}()
-	defer errcontextf(&err, "%s: stor recovery", link)
+	defer xerr.Contextf(&err, "%s: stor recovery", link)

 	conn, err := link.NewConn() // FIXME bad
 	if err != nil {
@@ -362,7 +364,7 @@ var errClusterDegraded = errors.New("cluster became non-operatonal")
 // prerequisite for start: .partTab is operational wrt .nodeTab
 func (m *Master) verify(ctx context.Context) (err error) {
 	fmt.Println("master: verify")
-	defer errcontextf(&err, "master: verify")
+	defer xerr.Context(&err, "master: verify")

 	m.setClusterState(ClusterVerifying)
 	vctx, vcancel := context.WithCancel(ctx)
@@ -481,7 +483,7 @@ func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) {
 			res <- storVerify{link: link, err: err}
 		}
 	}()
-	defer errcontextf(&err, "%s: verify", link)
+	defer xerr.Contextf(&err, "%s: verify", link)

 	// FIXME stub
 	conn, _ := link.NewConn()
@@ -525,7 +527,7 @@ func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) {
 // prerequisite for start: .partTab is operational wrt .nodeTab and verification passed
 func (m *Master) service(ctx context.Context) (err error) {
 	fmt.Println("master: service")
-	defer errcontextf(&err, "master: service")
+	defer xerr.Context(&err, "master: service")

 	m.setClusterState(ClusterRunning)
@@ -804,6 +806,8 @@ func (m *Master) ServeClient(ctx context.Context, link *NodeLink) {

 // ---- internal requests for storage driver ----

+// XXX goes away
+/*
 // storageRecovery asks storage driver to extract cluster recovery information from storage
 type storageRecovery struct {
 	resp chan PartitionTable // XXX +err ?
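The types being commented out here follow the usual Go request/response-channel pattern: each request struct carries the channel on which the driver is to reply. A hedged sketch of how such a request would be issued; only `storageRecovery` and its `resp` field come from the diff, the surrounding plumbing (`reqCh`, the helper itself) is assumed:

```go
// requestRecovery sends a storageRecovery request to the storage driver and
// blocks until the driver answers on the embedded resp channel.
// reqCh and this helper are illustrative; they are not part of the commit.
func requestRecovery(reqCh chan<- storageRecovery) PartitionTable {
	resp := make(chan PartitionTable)
	reqCh <- storageRecovery{resp: resp}
	return <-resp // driver replies on the channel the request carried
}
```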
@@ -823,6 +827,7 @@ type storageStartOperation struct {
 type storageStopOperation struct {
 	resp chan error
 }
+*/

 // DriveStorage serves incoming connection on which peer identified itself as storage
 //
@@ -96,6 +96,7 @@ func (n *netTLS) Listen(laddr string) (net.Listener, error) {
 	return tls.NewListener(l, n.config), nil
 }

+// ----------------------------------------

 // String formats Address to canonical host:port form
 func (addr Address) String() string {
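Canonical host:port formatting in Go is usually delegated to net.JoinHostPort, which also brackets IPv6 literals. A sketch of that approach, with hypothetical host/port parameters since the Address struct fields are not shown in this hunk:

```go
import (
	"net"
	"strconv"
)

// addrString renders host and port in canonical form, e.g. "127.0.0.1:5552",
// or "[::1]:5552" for an IPv6 host. The host/port split is an assumption
// about how Address is laid out.
func addrString(host string, port uint16) string {
	return net.JoinHostPort(host, strconv.Itoa(int(port)))
}
```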
@@ -206,11 +206,11 @@ func (nt *NodeTable) Subscribe() (ch chan NodeInfo, unsubscribe func()) {
 	return ch, unsubscribe
 }

-// SubscribeBufferred subscribes to NodeTable updates without blocking updater
+// SubscribeBuffered subscribes to NodeTable updates without blocking updater
 // it returns a channel via which updates are delivered and unsubscribe function
 // the updates will be sent to destination in non-blocking way - if destination
-// channel is not ready they will be bufferred.
-// it is the caller reponsibility to make sure such buffering does not grow up
+// channel is not ready they will be buffered.
+// it is the caller responsibility to make sure such buffering does not grow up
 // to infinity - via e.g. detecting stuck connections and unsubscribing on shutdown
 //
 // XXX locking: client for subscribe/unsubscribe XXX ok?
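The buffering contract described in this doc comment - never block the updater, queue updates while the subscriber is slow - maps onto a well-known Go select idiom with a nil-channel send case. A sketch under those assumptions; names are illustrative, not the actual SubscribeBuffered implementation:

```go
// bufferedForward pumps updates from in to out without ever blocking the
// sender on in: while out is not ready, updates accumulate in buf. Keeping
// buf bounded is the caller's responsibility, as the comment above notes.
func bufferedForward(in <-chan NodeInfo, out chan<- NodeInfo) {
	var buf []NodeInfo
	for {
		var sendCh chan<- NodeInfo // stays nil unless something is queued
		var next NodeInfo
		if len(buf) > 0 {
			sendCh, next = out, buf[0]
		}
		select {
		case u, ok := <-in:
			if !ok {
				return // updater closed in - unsubscribed
			}
			buf = append(buf, u)
		case sendCh <- next: // a send on a nil channel never fires
			buf = buf[1:]
		}
	}
}
```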
@@ -96,7 +96,7 @@ package neo
 //	S2		S2
 //	S3	→	S2
 //
-// Np thus is always multiple of Ns and with furter reorderings (if needed)
+// Np thus is always multiple of Ns and with further reorderings (if needed)
 // could be reduced directly to Ns.
 //
 // Usually Master maintains partition table, plans partition updates and tells
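As a toy illustration of the reduction described in the changed comment: with round-robin placement, partition p is served by storage node p mod Ns, so any table whose Np is a multiple of Ns collapses to a repeating pattern of length Ns. The assignment scheme below is assumed for illustration only:

```go
// assign maps Np partitions onto Ns storage nodes round-robin.
// e.g. Np=4, Ns=2 yields [0 1 0 1] - the length-2 pattern repeated twice.
func assign(Np, Ns int) []int {
	nodeOf := make([]int, Np)
	for p := range nodeOf {
		nodeOf[p] = p % Ns
	}
	return nodeOf
}
```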
@@ -149,7 +149,7 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
 	cellLoop:
		for _, cell := range ptEntry {
			switch cell.CellState {
-			case UP_TO_DATE, FEEDING: // XXX cell.isReadble in py
+			case UP_TO_DATE, FEEDING: // XXX cell.isReadable in py
				// cell says it is readable. let's check whether corresponding node is up
				// FIXME checking whether it is up is not really enough -
				// - what is needed to check is that data on that node is up
@@ -18,7 +18,7 @@ func (e *Error) Error() string {
 }

-const nodeTypeChar = "MSCA4567"
+const nodeTypeChar = "MSCA4567" // keep in sync with NodeType constants

 func (nodeUUID NodeUUID) String() string {
 	// return ex 'S1', 'M2', ...
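The comment added to nodeTypeChar hints at how the short form works: the string is indexed by node type, and the per-type number follows. A hedged sketch of that encoding; direct indexing by NodeType is an assumption here, and the real String method is not shown in full in this hunk:

```go
// uuidShortString renders e.g. "S1" for storage #1 or "M2" for master #2,
// assuming NodeType values index straight into nodeTypeChar ("MSCA4567").
func uuidShortString(typ NodeType, num int) string {
	return fmt.Sprintf("%c%d", nodeTypeChar[typ], num)
}
```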
@@ -23,6 +23,8 @@ import (
 	"fmt"
 	"net"
 	"reflect"
+
+	"lab.nexedi.com/kirr/go123/xerr"
 )

 // Server is an interface that represents networked server
@@ -76,6 +78,7 @@ func Serve(ctx context.Context, l net.Listener, srv Server) error {
 }

 // ListenAndServe listens on network address and then calls Serve to handle incoming connections
+// XXX unused -> goes away ?
 func ListenAndServe(ctx context.Context, net Network, laddr string, srv Server) error {
 	l, err := net.Listen(laddr)
 	if err != nil {
@@ -88,23 +91,11 @@ func ListenAndServe(ctx context.Context, net Network, laddr string, srv Server)
 // ----------------------------------------

-// errcontextf adds formatted prefix context to *errp
-// must be called under defer
-func errcontextf(errp *error, format string, argv ...interface{}) {
-	if *errp == nil {
-		return
-	}
-
-	format += ": %s"
-	argv = append(argv, *errp)
-
-	*errp = fmt.Errorf(format, argv...)
-}
-
 // IdentifyPeer identifies peer on the link
 // it expects peer to send RequestIdentification packet and replies with AcceptIdentification if identification passes.
 // returns information about identified node or error.
 func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentification /*TODO -> NodeInfo*/, err error) {
-	defer errcontextf(&err, "%s: identify", link)
+	defer xerr.Contextf(&err, "%s: identify", link)

 	// the first conn must come with RequestIdentification packet
 	conn, err := link.Accept()
@@ -147,7 +138,7 @@ func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentifi
 // IdentifyWith identifies local node with remote peer
 // it also verifies peer's node type to what caller expects
 func IdentifyWith(expectPeerType NodeType, link *NodeLink, myInfo NodeInfo, clusterName string) (accept *AcceptIdentification, err error) {
-	defer errcontextf(&err, "%s: request identification", link)
+	defer xerr.Contextf(&err, "%s: request identification", link)

 	conn, err := link.NewConn()
 	if err != nil {
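The removed errcontextf helper and its go123 replacement do the same job: called under defer, they prefix a non-nil returned error with context, yielding "context: original error" (that format is visible in the removed body above). A small usage sketch; loadConfig and its error are hypothetical, while xerr.Context/Contextf are exactly what the hunks above switch to:

```go
import (
	"errors"

	"lab.nexedi.com/kirr/go123/xerr"
)

// loadConfig demonstrates the deferred error-decoration idiom used throughout
// this commit: loadConfig("/etc/neo.conf") returns the error
// `load config /etc/neo.conf: open failed`.
func loadConfig(path string) (err error) {
	defer xerr.Contextf(&err, "load config %s", path)

	return errors.New("open failed") // decorated by the deferred xerr.Contextf
}
```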
@@ -196,6 +196,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
 		link.Close() // XXX err
 	}()

+	// XXX recheck identification logic here
 	nodeInfo, err := IdentifyPeer(link, STORAGE)
 	if err != nil {
 		fmt.Printf("stor: %v\n", err)