Commit ccd74eb4 authored by Kirill Smelkov's avatar Kirill Smelkov

X merge neo/server & neo/client -> neo

parent 98ac2433
// Copyright (C) 2017 Nexedi SA and Contributors. // Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -17,16 +17,15 @@ ...@@ -17,16 +17,15 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
// Package client provides ZODB storage interface for accessing NEO cluster. package neo
package client
// XXX old: Package client provides ZODB storage interface for accessing NEO cluster.
import ( import (
"context" "context"
"crypto/sha1"
"fmt" "fmt"
"math/rand" "math/rand"
"net/url" "net/url"
"os"
"sync" "sync"
"time" "time"
...@@ -35,7 +34,6 @@ import ( ...@@ -35,7 +34,6 @@ import (
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/neonet" "lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/internal/common" "lab.nexedi.com/kirr/neo/go/neo/internal/common"
...@@ -47,7 +45,7 @@ import ( ...@@ -47,7 +45,7 @@ import (
// Client talks to NEO cluster and exposes access to it via ZODB interfaces. // Client talks to NEO cluster and exposes access to it via ZODB interfaces.
type Client struct { type Client struct {
node *neo.NodeApp node *NodeApp
talkMasterCancel func() talkMasterCancel func()
...@@ -79,7 +77,7 @@ var _ zodb.IStorageDriver = (*Client)(nil) ...@@ -79,7 +77,7 @@ var _ zodb.IStorageDriver = (*Client)(nil)
// It will connect to master @masterAddr and identify with specified cluster name. // It will connect to master @masterAddr and identify with specified cluster name.
func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
cli := &Client{ cli := &Client{
node: neo.NewNodeApp(net, proto.CLIENT, clusterName, masterAddr, ""), node: NewNodeApp(net, proto.CLIENT, clusterName, masterAddr, ""),
mlinkReady: make(chan struct{}), mlinkReady: make(chan struct{}),
operational: false, operational: false,
opReady: make(chan struct{}), opReady: make(chan struct{}),
...@@ -141,7 +139,7 @@ func (c *Client) masterLink(ctx context.Context) (*neonet.NodeLink, error) { ...@@ -141,7 +139,7 @@ func (c *Client) masterLink(ctx context.Context) (*neonet.NodeLink, error) {
// XXX move somehow -> NodeApp? // XXX move somehow -> NodeApp?
func (c *Client) updateOperational() (sendReady func()) { func (c *Client) updateOperational() (sendReady func()) {
// XXX py client does not wait for cluster state = running // XXX py client does not wait for cluster state = running
operational := // c.node.ClusterState == neo.ClusterRunning && operational := // c.node.ClusterState == ClusterRunning &&
c.node.PartTab.OperationalWith(c.node.NodeTab) c.node.PartTab.OperationalWith(c.node.NodeTab)
//fmt.Printf("\nupdateOperatinal: %v\n", operational) //fmt.Printf("\nupdateOperatinal: %v\n", operational)
...@@ -336,7 +334,7 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (er ...@@ -336,7 +334,7 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (er
return err return err
} }
pt := neo.PartTabFromDump(rpt.PTid, rpt.RowList) pt := PartTabFromDump(rpt.PTid, rpt.RowList)
log.Infof(ctx, "master initialized us with next parttab:\n%s", pt) log.Infof(ctx, "master initialized us with next parttab:\n%s", pt)
c.node.StateMu.Lock() c.node.StateMu.Lock()
c.node.PartTab = pt c.node.PartTab = pt
...@@ -348,8 +346,8 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (er ...@@ -348,8 +346,8 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (er
XXX don't need this in init? XXX don't need this in init?
// ask M about last_tid // ask M about last_tid
rlastTxn := neo.AnswerLastTransaction{} rlastTxn := AnswerLastTransaction{}
err = mlink.Ask1(&neo.LastTransaction{}, &rlastTxn) err = mlink.Ask1(&LastTransaction{}, &rlastTxn)
if err != nil { if err != nil {
return err return err
} }
...@@ -398,15 +396,6 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z ...@@ -398,15 +396,6 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
return buf, serial, err return buf, serial, err
} }
// XXX for benchmarking: how much sha1 computation takes time from latency
var xsha1skip bool
func init() {
if os.Getenv("X_NEOGO_SHA1_SKIP") == "y" {
fmt.Fprintln(os.Stderr, "# NEO/go/client: skipping SHA1 checks")
xsha1skip = true
}
}
func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) { func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) {
err := c.withOperational(ctx) err := c.withOperational(ctx)
if err != nil { if err != nil {
...@@ -415,7 +404,7 @@ func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, e ...@@ -415,7 +404,7 @@ func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, e
// here we have cluster state operational and rlocked. Retrieve // here we have cluster state operational and rlocked. Retrieve
// storages we might need to access and release the lock. // storages we might need to access and release the lock.
storv := make([]*neo.Node, 0, 1) storv := make([]*Node, 0, 1)
for _, cell := range c.node.PartTab.Get(xid.Oid) { for _, cell := range c.node.PartTab.Get(xid.Oid) {
if cell.Readable() { if cell.Readable() {
stor := c.node.NodeTab.Get(cell.UUID) stor := c.node.NodeTab.Get(cell.UUID)
...@@ -460,7 +449,7 @@ func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, e ...@@ -460,7 +449,7 @@ func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, e
buf := resp.Data buf := resp.Data
if !xsha1skip { if !xsha1skip {
checksum := sha1.Sum(buf.Data) checksum := sha1Sum(buf.Data)
if checksum != resp.Checksum { if checksum != resp.Checksum {
return nil, 0, fmt.Errorf("data corrupt: checksum mismatch") return nil, 0, fmt.Errorf("data corrupt: checksum mismatch")
} }
...@@ -491,6 +480,9 @@ func (c *Client) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxn ...@@ -491,6 +480,9 @@ func (c *Client) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxn
} }
// ---- ZODB open/url support ----
func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zodb.IStorageDriver, error) { func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (zodb.IStorageDriver, error) {
// neo://name@master1,master2,...,masterN?options // neo://name@master1,master2,...,masterN?options
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package client
// misc utilities
import (
"bytes"
"compress/zlib"
"context"
"io"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
)
// lclose closes c and logs closing error if there was any.
// the error is otherwise ignored
// XXX dup in neo,server
func lclose(ctx context.Context, c io.Closer) {
err := c.Close()
if err != nil {
log.Error(ctx, err)
}
}
// decompress decompresses data according to zlib encoding.
//
// out buffer, if there is enough capacity, is used for decompression destination.
// if out has not enough capacity a new buffer is allocated and used.
//
// return: destination buffer with full decompressed data or error.
func decompress(in []byte, out []byte) (data []byte, err error) {
bin := bytes.NewReader(in)
zr, err := zlib.NewReader(bin)
if err != nil {
return nil, err
}
defer func() {
err2 := zr.Close()
if err2 != nil && err == nil {
err = err2
data = nil
}
}()
bout := bytes.NewBuffer(out)
_, err = io.Copy(bout, zr)
if err != nil {
return nil, err
}
return bout.Bytes(), nil
}
// Copyright (C) 2017 Nexedi SA and Contributors. // Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package server package neo
// test interaction between nodes // test interaction between nodes
//go:generate gotrace gen . //go:generate gotrace gen .
...@@ -31,16 +31,13 @@ import ( ...@@ -31,16 +31,13 @@ import (
//"reflect" //"reflect"
"sync" "sync"
"testing" "testing"
"unsafe"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
//"github.com/kylelemons/godebug/pretty" //"github.com/kylelemons/godebug/pretty"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/neonet" "lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/client"
//"lab.nexedi.com/kirr/neo/go/neo/internal/common" //"lab.nexedi.com/kirr/neo/go/neo/internal/common"
//"lab.nexedi.com/kirr/neo/go/zodb" //"lab.nexedi.com/kirr/neo/go/zodb"
...@@ -293,8 +290,8 @@ type TraceCollector struct { ...@@ -293,8 +290,8 @@ type TraceCollector struct {
pg *tracing.ProbeGroup pg *tracing.ProbeGroup
d *tsync.EventDispatcher d *tsync.EventDispatcher
node2Name map[*neo.NodeApp]string node2Name map[*NodeApp]string
nodeTab2Owner map[*neo.NodeTable]string nodeTab2Owner map[*NodeTable]string
clusterState2Owner map[*proto.ClusterState]string clusterState2Owner map[*proto.ClusterState]string
} }
...@@ -303,13 +300,12 @@ func NewTraceCollector(dispatch *tsync.EventDispatcher) *TraceCollector { ...@@ -303,13 +300,12 @@ func NewTraceCollector(dispatch *tsync.EventDispatcher) *TraceCollector {
pg: &tracing.ProbeGroup{}, pg: &tracing.ProbeGroup{},
d: dispatch, d: dispatch,
node2Name: make(map[*neo.NodeApp]string), node2Name: make(map[*NodeApp]string),
nodeTab2Owner: make(map[*neo.NodeTable]string), nodeTab2Owner: make(map[*NodeTable]string),
clusterState2Owner: make(map[*proto.ClusterState]string), clusterState2Owner: make(map[*proto.ClusterState]string),
} }
} }
//trace:import "lab.nexedi.com/kirr/neo/go/neo"
//trace:import "lab.nexedi.com/kirr/neo/go/neo/neonet" //trace:import "lab.nexedi.com/kirr/neo/go/neo/neonet"
//trace:import "lab.nexedi.com/kirr/neo/go/neo/proto" //trace:import "lab.nexedi.com/kirr/neo/go/neo/proto"
...@@ -319,7 +315,7 @@ func (t *TraceCollector) Attach() { ...@@ -319,7 +315,7 @@ func (t *TraceCollector) Attach() {
//neo_traceMsgRecv_Attach(t.pg, t.traceNeoMsgRecv) //neo_traceMsgRecv_Attach(t.pg, t.traceNeoMsgRecv)
neonet_traceMsgSendPre_Attach(t.pg, t.traceNeoMsgSendPre) neonet_traceMsgSendPre_Attach(t.pg, t.traceNeoMsgSendPre)
proto_traceClusterStateChanged_Attach(t.pg, t.traceClusterState) proto_traceClusterStateChanged_Attach(t.pg, t.traceClusterState)
neo_traceNodeChanged_Attach(t.pg, t.traceNode) traceNodeChanged_Attach(t.pg, t.traceNode)
traceMasterStartReady_Attach(t.pg, t.traceMasterStartReady) traceMasterStartReady_Attach(t.pg, t.traceMasterStartReady)
tracing.Unlock() tracing.Unlock()
} }
...@@ -332,7 +328,7 @@ func (t *TraceCollector) Detach() { ...@@ -332,7 +328,7 @@ func (t *TraceCollector) Detach() {
// //
// This way it can translate e.g. *NodeTable -> owner node name when creating // This way it can translate e.g. *NodeTable -> owner node name when creating
// corresponding event. // corresponding event.
func (t *TraceCollector) RegisterNode(node *neo.NodeApp, name string) { func (t *TraceCollector) RegisterNode(node *NodeApp, name string) {
tracing.Lock() tracing.Lock()
defer tracing.Unlock() defer tracing.Unlock()
...@@ -368,7 +364,7 @@ func (t *TraceCollector) traceClusterState(cs *proto.ClusterState) { ...@@ -368,7 +364,7 @@ func (t *TraceCollector) traceClusterState(cs *proto.ClusterState) {
t.d.Dispatch(&eventClusterState{where, *cs}) t.d.Dispatch(&eventClusterState{where, *cs})
} }
func (t *TraceCollector) traceNode(nt *neo.NodeTable, n *neo.Node) { func (t *TraceCollector) traceNode(nt *NodeTable, n *Node) {
//t.d.Dispatch(&eventNodeTab{unsafe.Pointer(nt), n.NodeInfo}) //t.d.Dispatch(&eventNodeTab{unsafe.Pointer(nt), n.NodeInfo})
where := t.nodeTab2Owner[nt] where := t.nodeTab2Owner[nt]
t.d.Dispatch(&eventNodeTab{where, n.NodeInfo}) t.d.Dispatch(&eventNodeTab{where, n.NodeInfo})
...@@ -476,7 +472,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -476,7 +472,7 @@ func TestMasterStorage(t *testing.T) {
// cluster nodes // cluster nodes
M := NewMaster("abc1", ":1", Mhost) M := NewMaster("abc1", ":1", Mhost)
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs") zstor := xfs1stor("../zodb/storage/fs1/testdata/1.fs")
S := NewStorage("abc1", "m:1", ":1", Shost, zstor) S := NewStorage("abc1", "m:1", ":1", Shost, zstor)
// let tracer know how to map state addresses to node names // let tracer know how to map state addresses to node names
...@@ -625,9 +621,8 @@ func TestMasterStorage(t *testing.T) { ...@@ -625,9 +621,8 @@ func TestMasterStorage(t *testing.T) {
return // XXX temp return // XXX temp
// create client // create client
C := client.NewClient("abc1", "m:1", Chost) C := NewClient("abc1", "m:1", Chost)
Cnode := *(**neo.NodeApp)(unsafe.Pointer(C)) // XXX hack, fragile tracer.RegisterNode(C.node, "c")
tracer.RegisterNode(Cnode, "c")
// trace // trace
...@@ -868,7 +863,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -868,7 +863,7 @@ func TestMasterStorage(t *testing.T) {
/* /*
func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit func(xcload1 func())) { func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit func(xcload1 func())) {
// create test cluster <- XXX factor to utility func // create test cluster <- XXX factor to utility func
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs") zstor := xfs1stor("../zodb/storage/fs1/testdata/1.fs")
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
...@@ -883,7 +878,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f ...@@ -883,7 +878,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
tc := tsync.NewEventChecker(b, tracer.SyncChan) tc := tsync.NewEventChecker(b, tracer.SyncChan)
pg := &tracing.ProbeGroup{} pg := &tracing.ProbeGroup{}
tracing.Lock() tracing.Lock()
pnode := neo_traceNodeChanged_Attach(nil, tracer.traceNode) pnode := traceNodeChanged_Attach(nil, tracer.traceNode)
traceMasterStartReady_Attach(pg, tracer.traceMasterStartReady) traceMasterStartReady_Attach(pg, tracer.traceMasterStartReady)
tracing.Unlock() tracing.Unlock()
......
// Copyright (C) 2017 Nexedi SA and Contributors. // Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package server package neo
// master node // master node
import ( import (
...@@ -33,7 +33,6 @@ import ( ...@@ -33,7 +33,6 @@ import (
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/neonet" "lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -45,7 +44,7 @@ import ( ...@@ -45,7 +44,7 @@ import (
// Master is a node overseeing and managing how whole NEO cluster works // Master is a node overseeing and managing how whole NEO cluster works
type Master struct { type Master struct {
node *neo.NodeApp node *NodeApp
// master manages node and partition tables and broadcast their updates // master manages node and partition tables and broadcast their updates
// to all nodes in cluster // to all nodes in cluster
...@@ -75,7 +74,7 @@ type Master struct { ...@@ -75,7 +74,7 @@ type Master struct {
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master { func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
m := &Master{ m := &Master{
node: neo.NewNodeApp(net, proto.MASTER, clusterName, serveAddr, serveAddr), node: NewNodeApp(net, proto.MASTER, clusterName, serveAddr, serveAddr),
ctlStart: make(chan chan error), ctlStart: make(chan chan error),
ctlStop: make(chan chan struct{}), ctlStop: make(chan chan struct{}),
...@@ -274,8 +273,8 @@ func (m *Master) runMain(ctx context.Context) (err error) { ...@@ -274,8 +273,8 @@ func (m *Master) runMain(ctx context.Context) (err error) {
// storRecovery is result of 1 storage node passing recovery phase // storRecovery is result of 1 storage node passing recovery phase
type storRecovery struct { type storRecovery struct {
stor *neo.Node stor *Node
partTab *neo.PartitionTable partTab *PartitionTable
err error err error
// XXX + backup_tid, truncate_tid ? // XXX + backup_tid, truncate_tid ?
...@@ -463,13 +462,13 @@ loop2: ...@@ -463,13 +462,13 @@ loop2:
// if we are starting for new cluster - create partition table // if we are starting for new cluster - create partition table
if m.node.PartTab.PTid == 0 { if m.node.PartTab.PTid == 0 {
// XXX -> m.nodeTab.StorageList(State > DOWN) // XXX -> m.nodeTab.StorageList(State > DOWN)
storv := []*neo.Node{} storv := []*Node{}
for _, stor := range m.node.NodeTab.StorageList() { for _, stor := range m.node.NodeTab.StorageList() {
if stor.State > proto.DOWN { if stor.State > proto.DOWN {
storv = append(storv, stor) storv = append(storv, stor)
} }
} }
m.node.PartTab = neo.MakePartTab(1 /* XXX hardcoded */, storv) m.node.PartTab = MakePartTab(1 /* XXX hardcoded */, storv)
m.node.PartTab.PTid = 1 m.node.PartTab.PTid = 1
log.Infof(ctx, "creating new partition table: %s", m.node.PartTab) log.Infof(ctx, "creating new partition table: %s", m.node.PartTab)
} }
...@@ -479,7 +478,7 @@ loop2: ...@@ -479,7 +478,7 @@ loop2:
// storCtlRecovery drives a storage node during cluster recovering state // storCtlRecovery drives a storage node during cluster recovering state
// it retrieves various ids and partition table from as stored on the storage // it retrieves various ids and partition table from as stored on the storage
func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery) { func storCtlRecovery(ctx context.Context, stor *Node, res chan storRecovery) {
var err error var err error
defer func() { defer func() {
if err == nil { if err == nil {
...@@ -507,7 +506,7 @@ func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery) ...@@ -507,7 +506,7 @@ func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery)
} }
// reconstruct partition table from response // reconstruct partition table from response
pt := neo.PartTabFromDump(resp.PTid, resp.RowList) pt := PartTabFromDump(resp.PTid, resp.RowList)
res <- storRecovery{stor: stor, partTab: pt} res <- storRecovery{stor: stor, partTab: pt}
} }
...@@ -675,14 +674,14 @@ loop2: ...@@ -675,14 +674,14 @@ loop2:
// storVerify is result of a storage node passing verification phase // storVerify is result of a storage node passing verification phase
type storVerify struct { type storVerify struct {
stor *neo.Node stor *Node
lastOid zodb.Oid lastOid zodb.Oid
lastTid zodb.Tid lastTid zodb.Tid
err error err error
} }
// storCtlVerify drives a storage node during cluster verifying (= starting) state // storCtlVerify drives a storage node during cluster verifying (= starting) state
func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable, res chan storVerify) { func storCtlVerify(ctx context.Context, stor *Node, pt *PartitionTable, res chan storVerify) {
// XXX link.Close on err // XXX link.Close on err
// XXX cancel on ctx // XXX cancel on ctx
...@@ -739,7 +738,7 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable, ...@@ -739,7 +738,7 @@ func storCtlVerify(ctx context.Context, stor *neo.Node, pt *neo.PartitionTable,
// serviceDone is the error returned after service-phase node handling is finished // serviceDone is the error returned after service-phase node handling is finished
type serviceDone struct { type serviceDone struct {
node *neo.Node node *Node
err error err error
} }
...@@ -846,7 +845,7 @@ loop: ...@@ -846,7 +845,7 @@ loop:
} }
// storCtlService drives a storage node during cluster service state // storCtlService drives a storage node during cluster service state
func storCtlService(ctx context.Context, stor *neo.Node) (err error) { func storCtlService(ctx context.Context, stor *Node) (err error) {
slink := stor.Link() slink := stor.Link()
defer task.Runningf(&ctx, "%s: stor service", slink.RemoteAddr())(&err) defer task.Runningf(&ctx, "%s: stor service", slink.RemoteAddr())(&err)
...@@ -893,7 +892,7 @@ func storCtlService(ctx context.Context, stor *neo.Node) (err error) { ...@@ -893,7 +892,7 @@ func storCtlService(ctx context.Context, stor *neo.Node) (err error) {
} }
// serveClient serves incoming client link // serveClient serves incoming client link
func (m *Master) serveClient(ctx context.Context, cli *neo.Node) (err error) { func (m *Master) serveClient(ctx context.Context, cli *Node) (err error) {
clink := cli.Link() clink := cli.Link()
defer task.Runningf(&ctx, "%s: client service", clink.RemoteAddr())(&err) defer task.Runningf(&ctx, "%s: client service", clink.RemoteAddr())(&err)
...@@ -1026,7 +1025,7 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er ...@@ -1026,7 +1025,7 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er
// If node identification is accepted .nodeTab is updated and corresponding node entry is returned. // If node identification is accepted .nodeTab is updated and corresponding node entry is returned.
// Response message is constructed but not send back not to block the caller - it is // Response message is constructed but not send back not to block the caller - it is
// the caller responsibility to send the response to node which requested identification. // the caller responsibility to send the response to node which requested identification.
func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp proto.Msg) { func (m *Master) identify(ctx context.Context, n nodeCome) (node *Node, resp proto.Msg) {
// XXX also verify ? : // XXX also verify ? :
// - NodeType valid // - NodeType valid
// - IdTime ? // - IdTime ?
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package server package neo
import ( import (
"context" "context"
......
...@@ -29,7 +29,7 @@ import ( ...@@ -29,7 +29,7 @@ import (
"lab.nexedi.com/kirr/go123/prog" "lab.nexedi.com/kirr/go123/prog"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo/server" "lab.nexedi.com/kirr/neo/go/neo"
) )
const masterSummary = "run master node" const masterSummary = "run master node"
...@@ -64,7 +64,7 @@ func masterMain(argv []string) { ...@@ -64,7 +64,7 @@ func masterMain(argv []string) {
net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ? net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
masterSrv := server.NewMaster(*cluster, *bind, net) masterSrv := neo.NewMaster(*cluster, *bind, net)
ctx := context.Background() ctx := context.Background()
/* /*
......
...@@ -31,7 +31,7 @@ import ( ...@@ -31,7 +31,7 @@ import (
"lab.nexedi.com/kirr/go123/prog" "lab.nexedi.com/kirr/go123/prog"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo/server" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1" "lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
) )
...@@ -93,7 +93,7 @@ func storageMain(argv []string) { ...@@ -93,7 +93,7 @@ func storageMain(argv []string) {
net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ? net := xnet.NetPlain("tcp") // TODO + TLS; not only "tcp" ?
storSrv := server.NewStorage(*cluster, master, *bind, net, zstor) storSrv := neo.NewStorage(*cluster, master, *bind, net, zstor)
ctx := context.Background() ctx := context.Background()
/* /*
......
...@@ -17,8 +17,7 @@ ...@@ -17,8 +17,7 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
// Package server provides servers side of NEO. package neo
package server
// common parts for organizing network servers // common parts for organizing network servers
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package server
// misc utilities
import (
"context"
"io"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
)
// lclose closes c and logs closing error if there was any.
// the error is otherwise ignored
// XXX dup in neo, client
func lclose(ctx context.Context, c io.Closer) {
err := c.Close()
if err != nil {
log.Error(ctx, err)
}
}
// Code generated by lab.nexedi.com/kirr/go123/tracing/cmd/gotrace; DO NOT EDIT.
package server
// code generated for tracepoints
import (
"lab.nexedi.com/kirr/go123/tracing"
"unsafe"
)
// traceevent: traceMasterStartReady(m *Master, ready bool)
type _t_traceMasterStartReady struct {
tracing.Probe
probefunc func(m *Master, ready bool)
}
var _traceMasterStartReady *_t_traceMasterStartReady
func traceMasterStartReady(m *Master, ready bool) {
if _traceMasterStartReady != nil {
_traceMasterStartReady_run(m, ready)
}
}
func _traceMasterStartReady_run(m *Master, ready bool) {
for p := _traceMasterStartReady; p != nil; p = (*_t_traceMasterStartReady)(unsafe.Pointer(p.Next())) {
p.probefunc(m, ready)
}
}
func traceMasterStartReady_Attach(pg *tracing.ProbeGroup, probe func(m *Master, ready bool)) *tracing.Probe {
p := _t_traceMasterStartReady{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceMasterStartReady)), &p.Probe)
return &p.Probe
}
// trace export signature
func _trace_exporthash_1002eef247af7731924a09f42e9c3f4131f5bccb() {}
...@@ -17,20 +17,17 @@ ...@@ -17,20 +17,17 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package server package neo
// storage node // storage node
import ( import (
"context" "context"
"crypto/sha1"
"fmt" "fmt"
"os"
"sync" "sync"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/neonet" "lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/internal/common" "lab.nexedi.com/kirr/neo/go/neo/internal/common"
...@@ -47,7 +44,7 @@ import ( ...@@ -47,7 +44,7 @@ import (
// Storage is NEO node that keeps data and provides read/write access to it via network. // Storage is NEO node that keeps data and provides read/write access to it via network.
type Storage struct { type Storage struct {
node *neo.NodeApp node *NodeApp
// context for providing operational service // context for providing operational service
// it is renewed every time master tells us StartOpertion, so users // it is renewed every time master tells us StartOpertion, so users
...@@ -80,7 +77,7 @@ type Storage struct { ...@@ -80,7 +77,7 @@ type Storage struct {
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, zstor *fs1.FileStorage) *Storage { func NewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, zstor *fs1.FileStorage) *Storage {
stor := &Storage{ stor := &Storage{
node: neo.NewNodeApp(net, proto.STORAGE, clusterName, masterAddr, serveAddr), node: NewNodeApp(net, proto.STORAGE, clusterName, masterAddr, serveAddr),
zstor: zstor, zstor: zstor,
} }
...@@ -531,23 +528,6 @@ func (stor *Storage) serveClient(ctx context.Context, req neonet.Request) { ...@@ -531,23 +528,6 @@ func (stor *Storage) serveClient(ctx context.Context, req neonet.Request) {
} }
} }
// XXX for benchmarking: how much sha1 computation takes time from latency
var xsha1skip bool
func init() {
if os.Getenv("X_NEOGO_SHA1_SKIP") == "y" {
fmt.Fprintln(os.Stderr, "# NEO/go/storage: skipping SHA1 computations")
xsha1skip = true
}
}
func sha1Sum(b []byte) [sha1.Size]byte {
if !xsha1skip {
return sha1.Sum(b)
}
return [sha1.Size]byte{} // all 0
}
// serveClient1 prepares response for 1 request from client // serveClient1 prepares response for 1 request from client
func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Msg) { func (stor *Storage) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Msg) {
switch req := req.(type) { switch req := req.(type) {
......
...@@ -17,9 +17,8 @@ ...@@ -17,9 +17,8 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package server package neo
// time related utilities // time related utilities
// XXX -> neo ?
import ( import (
"time" "time"
......
...@@ -20,18 +20,68 @@ ...@@ -20,18 +20,68 @@
package neo package neo
import ( import (
"bytes"
"compress/zlib"
"context" "context"
"crypto/sha1"
"fmt"
"io" "io"
"os"
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
) )
// lclose closes c and logs closing error if there was any. // lclose closes c and logs closing error if there was any.
// the error is otherwise ignored // the error is otherwise ignored
// XXX dup in server, client
func lclose(ctx context.Context, c io.Closer) { func lclose(ctx context.Context, c io.Closer) {
err := c.Close() err := c.Close()
if err != nil { if err != nil {
log.Error(ctx, err) log.Error(ctx, err)
} }
} }
// decompress decompresses data according to zlib encoding.
//
// out buffer, if there is enough capacity, is used for decompression destination.
// if out has not enough capacity a new buffer is allocated and used.
//
// return: destination buffer with full decompressed data or error.
func decompress(in []byte, out []byte) (data []byte, err error) {
bin := bytes.NewReader(in)
zr, err := zlib.NewReader(bin)
if err != nil {
return nil, err
}
defer func() {
err2 := zr.Close()
if err2 != nil && err == nil {
err = err2
data = nil
}
}()
bout := bytes.NewBuffer(out)
_, err = io.Copy(bout, zr)
if err != nil {
return nil, err
}
return bout.Bytes(), nil
}
// XXX for benchmarking: how much sha1 computation takes time from latency
var xsha1skip bool
func init() {
if os.Getenv("X_NEOGO_SHA1_SKIP") == "y" {
fmt.Fprintf(os.Stderr, "# NEO/go (%s): skipping SHA1 computations\n", os.Args[0])
xsha1skip = true
}
}
func sha1Sum(b []byte) [sha1.Size]byte {
if !xsha1skip {
return sha1.Sum(b)
}
return [sha1.Size]byte{} // all 0
}
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
package client package neo
import ( import (
"testing" "testing"
......
...@@ -8,6 +8,33 @@ import ( ...@@ -8,6 +8,33 @@ import (
"unsafe" "unsafe"
) )
// traceevent: traceMasterStartReady(m *Master, ready bool)
type _t_traceMasterStartReady struct {
tracing.Probe
probefunc func(m *Master, ready bool)
}
var _traceMasterStartReady *_t_traceMasterStartReady
func traceMasterStartReady(m *Master, ready bool) {
if _traceMasterStartReady != nil {
_traceMasterStartReady_run(m, ready)
}
}
func _traceMasterStartReady_run(m *Master, ready bool) {
for p := _traceMasterStartReady; p != nil; p = (*_t_traceMasterStartReady)(unsafe.Pointer(p.Next())) {
p.probefunc(m, ready)
}
}
func traceMasterStartReady_Attach(pg *tracing.ProbeGroup, probe func(m *Master, ready bool)) *tracing.Probe {
p := _t_traceMasterStartReady{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceMasterStartReady)), &p.Probe)
return &p.Probe
}
// traceevent: traceNodeChanged(nt *NodeTable, n *Node) // traceevent: traceNodeChanged(nt *NodeTable, n *Node)
type _t_traceNodeChanged struct { type _t_traceNodeChanged struct {
...@@ -36,4 +63,4 @@ func traceNodeChanged_Attach(pg *tracing.ProbeGroup, probe func(nt *NodeTable, n ...@@ -36,4 +63,4 @@ func traceNodeChanged_Attach(pg *tracing.ProbeGroup, probe func(nt *NodeTable, n
} }
// trace export signature // trace export signature
func _trace_exporthash_3520b2da37a17b902760c32971b0fd9ccb6d2ddb() {} func _trace_exporthash_ee76c0bfa710c94614a1fd0fe7a79e9cb723a340() {}
// Code generated by lab.nexedi.com/kirr/go123/tracing/cmd/gotrace; DO NOT EDIT. // Code generated by lab.nexedi.com/kirr/go123/tracing/cmd/gotrace; DO NOT EDIT.
package server package neo
// code generated for tracepoints // code generated for tracepoints
import ( import (
"lab.nexedi.com/kirr/go123/tracing" "lab.nexedi.com/kirr/go123/tracing"
_ "unsafe" _ "unsafe"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/neonet" "lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
) )
// traceimport: "lab.nexedi.com/kirr/neo/go/neo"
// rerun "gotrace gen" if you see link failure ↓↓↓
//go:linkname neo_trace_exporthash lab.nexedi.com/kirr/neo/go/neo._trace_exporthash_3520b2da37a17b902760c32971b0fd9ccb6d2ddb
func neo_trace_exporthash()
func init() { neo_trace_exporthash() }
//go:linkname neo_traceNodeChanged_Attach lab.nexedi.com/kirr/neo/go/neo.traceNodeChanged_Attach
func neo_traceNodeChanged_Attach(*tracing.ProbeGroup, func(nt *neo.NodeTable, n *neo.Node)) *tracing.Probe
// traceimport: "lab.nexedi.com/kirr/neo/go/neo/neonet" // traceimport: "lab.nexedi.com/kirr/neo/go/neo/neonet"
// rerun "gotrace gen" if you see link failure ↓↓↓ // rerun "gotrace gen" if you see link failure ↓↓↓
......
...@@ -29,5 +29,5 @@ package wks ...@@ -29,5 +29,5 @@ package wks
import ( import (
_ "lab.nexedi.com/kirr/neo/go/zodb/storage/fs1" _ "lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
_ "lab.nexedi.com/kirr/neo/go/neo/client" _ "lab.nexedi.com/kirr/neo/go/neo"
) )
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment