Commit 9fbd20fb authored by Kirill Smelkov

.

parent 67f98d72
@@ -26,39 +26,73 @@ import (
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/zodb"
// "lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
)
// Client talks to NEO cluster and exposes access it via ZODB interfaces
// Client talks to NEO cluster and exposes access to it via ZODB interfaces
type Client struct {
node neo.NodeCommon
storLink *neo.NodeLink // link to storage node
storConn *neo.Conn // XXX main connection to storage
// storLink *neo.NodeLink // link to storage node
// storConn *neo.Conn // XXX main connection to storage
}
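// compile-time check that *Client implements the zodb.IStorage interface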
var _ zodb.IStorage = (*Client)(nil)
func (c *Client) StorageName() string {
return "neo" // TODO more specific (+ cluster name, ...)
return "neo"
}
// XXX loading cache (+ singleflight)
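// An illustrative sketch of such a loading cache (not part of this commit;
// all names below are hypothetical). golang.org/x/sync/singleflight collapses
// concurrent loads of the same object so only one request reaches the
// storage; assumes imports "fmt", "sync" and "golang.org/x/sync/singleflight".
//
//	type loadCache struct {
//		mu   sync.Mutex
//		data map[zodb.Xid][]byte // previously loaded objects
//		sf   singleflight.Group  // deduplicates in-flight loads
//	}
//
//	func newLoadCache() *loadCache {
//		return &loadCache{data: make(map[zodb.Xid][]byte)}
//	}
//
//	func (c *loadCache) load(xid zodb.Xid, fetch func() ([]byte, error)) ([]byte, error) {
//		c.mu.Lock()
//		data, ok := c.data[xid]
//		c.mu.Unlock()
//		if ok {
//			return data, nil
//		}
//		// not cached - fetch, with concurrent identical requests sharing one fetch
//		v, err, _ := c.sf.Do(fmt.Sprintf("%v", xid), func() (interface{}, error) {
//			return fetch()
//		})
//		if err != nil {
//			return nil, err
//		}
//		data = v.([]byte)
//		c.mu.Lock()
//		c.data[xid] = data
//		c.mu.Unlock()
//		return data, nil
//	}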
// NewClient creates a new client node.
// It will connect to master @masterAddr and identify with the specified cluster name.
func NewClient(clusterName, masterAddr string, net xnet.Networker) (*Client, error) {
cli := &Client{
node: neo.NodeCommon{
MyInfo: neo.NodeInfo{Type: neo.CLIENT, Addr: neo.Address{}},
ClusterName: clusterName,
Net: net,
MasterAddr: masterAddr,
//NodeTab: &neo.NodeTable{},
//PartTab: &neo.PartitionTable{},
},
}
// XXX -> background
cli.node.Dial(context.TODO(), neo.MASTER, masterAddr)
panic("TODO")
}
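// Hedged usage sketch (assumes xnet.NetPlain exists in xcommon/xnet; the
// cluster name and master address are made up):
//
//	net := xnet.NetPlain("tcp")
//	c, err := NewClient("mycluster", "127.0.0.1:5551", net)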
func (c *Client) Close() error {
// NOTE this will abort all currently in-flight IO and close all connections over storLink
err := c.storLink.Close()
// XXX also wait for some goroutines to finish ?
return err
panic("TODO")
// // NOTE this will abort all currently in-flight IO and close all connections over storLink
// err := c.storLink.Close()
// // XXX also wait for some goroutines to finish ?
// return err
}
func (c *Client) LastTid() (zodb.Tid, error) {
panic("TODO")
/*
c.Mlink // XXX check we are connected
conn, err := c.Mlink.NewConn()
if err != nil {
// XXX
}
// XXX defer conn.Close
// FIXME do not use global conn (see comment in openClientByURL)
// XXX open new conn for this particular req/reply ?
reply := neo.AnswerLastTransaction{}
err := c.storConn.Ask(&neo.LastTransaction{}, &reply)
err := conn.Ask(&neo.LastTransaction{}, &reply)
if err != nil {
return 0, err // XXX err ctx
}
return reply.Tid, nil
*/
}
func (c *Client) LastOid() (zodb.Oid, error) {
@@ -67,6 +101,8 @@ func (c *Client) LastOid() (zodb.Oid, error) {
}
func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
panic("TODO")
/*
// FIXME do not use global conn (see comment in openClientByURL)
req := neo.GetObject{Oid: xid.Oid}
if xid.TidBefore {
@@ -89,6 +125,7 @@ func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
// reply.NextSerial
// reply.DataSerial
return resp.Data, resp.Serial, nil
*/
}
func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
@@ -97,44 +134,6 @@ func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
}
// NewClient creates and identifies new client connected to storage over storLink
func NewClient(masterAddr string) (*Client, error) {
// TODO .myInfo.NodeType = CLIENT
// .clusterName = clusterName
// .net = ...
cli := &Client{}
//return &Client{storLink, storConn}, nil
cli.node.Dial(context.TODO(), neo.MASTER, masterAddr)
panic("TODO")
/*
// XXX move -> Run?
// first identify ourselves to peer
accept, err := neo.IdentifyWith(neo.STORAGE, storLink, cli.node.MyInfo, cli.node.ClusterName)
if err != nil {
return nil, err
}
// TODO verify accept more
_ = accept
// identification passed
// XXX only one conn is not appropriate for multiple goroutines/threads
// asking storage in parallel. At the same time creating new conn for
// every request is ok? -> not so good to create new goroutine per 1 object read
// XXX -> server could reuse goroutines -> so not so bad ?
storConn, err := storLink.NewConn()
if err != nil {
return nil, err // XXX err ctx
}
_ = storConn // XXX temp
return cli, nil
*/
}
// TODO read-only support
func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) {
// XXX u.Host -> masterAddr (not storage)
@@ -174,9 +173,6 @@ func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) {
*/
}
//func Open(...) (*Client, error) {
//}
func init() {
zodb.RegisterStorage("neo", openClientByURL)
}
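// With the "neo" scheme registered, a storage can then be opened by URL
// through the zodb package; a hedged sketch (the exact helper name and URL
// form are assumptions, not confirmed by this commit):
//
//	stor, err := zodb.OpenStorageURL(ctx, "neo://127.0.0.1:5551")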
@@ -88,6 +88,8 @@ type Node struct {
NodeInfo
// XXX have Node point to -> NodeTable?
// XXX decouple vvv from Node ?
// link to this node; =nil if not connected
Link *NodeLink
......
@@ -20,7 +20,11 @@
package neo
// partition table
import "fmt"
import (
"fmt"
"lab.nexedi.com/kirr/neo/go/zodb"
)
// PartitionTable represents object space partitioning in a cluster
//
@@ -111,13 +115,13 @@ import "fmt"
type PartitionTable struct {
// XXX do we need sync.Mutex here for updates ?
tab [][]PartitionCell // [#Np] pid -> []Cell
tab [][]Cell // [#Np] pid -> []Cell
PTid PTid // ↑ for versioning XXX -> ver ? XXX move out of here?
}
// PartitionCell describes one storage in a pid entry in partition table
type PartitionCell struct {
// Cell describes one storage in a pid entry in partition table
type Cell struct {
CellInfo
// XXX ? + .haveUpToTid associated node has data up to such tid
@@ -132,17 +136,26 @@ type PartitionCell struct {
//
}
// Get returns the cells the oid is associated with
func (pt *PartitionTable) Get(oid zodb.Oid) []Cell {
if len(pt.tab) == 0 {
return nil
}
pid := uint64(oid) % uint64(len(pt.tab))
return pt.tab[pid]
}
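// Example: with len(pt.tab) = 4 partitions, oid 6 maps to pid 6 % 4 = 2, so
// Get returns pt.tab[2] - the cells of the storages serving partition 2.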
// MakePartTab creates a new partition table with nodes distributed uniformly.
// The partition table created will be of len=np.
// FIXME R=1 hardcoded
func MakePartTab(np int, nodev []*Node) *PartitionTable {
// XXX stub, not tested
tab := make([][]PartitionCell, np)
tab := make([][]Cell, np)
for i, j := 0, 0; i < np; i, j = i+1, (j+1)%len(nodev) {
node := nodev[j]
// XXX assert node.State > DOWN
fmt.Printf("tab[%d] <- %v\n", i, node.UUID)
tab[i] = []PartitionCell{{CellInfo: CellInfo{node.UUID, UP_TO_DATE /*XXX ok?*/}}}
tab[i] = []Cell{{CellInfo: CellInfo{node.UUID, UP_TO_DATE /*XXX ok?*/}}}
}
return &PartitionTable{tab: tab}
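// e.g. MakePartTab(4, nodev) over 2 nodes assigns partitions round-robin:
// tab[0], tab[2] -> nodev[0]; tab[1], tab[3] -> nodev[1] (one replica, R=1).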
@@ -157,7 +170,7 @@ func MakePartTab(np int, nodev []*Node) *PartitionTable {
//
// information about nodes being up or down is obtained from supplied NodeTable
//
// XXX or keep not only NodeUUID in PartitionCell - add *Node ?
// XXX or keep not only NodeUUID in Cell - add *Node ?
func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
for _, ptEntry := range pt.tab {
if len(ptEntry) == 0 {
@@ -218,12 +231,12 @@ func PartTabFromDump(ptid PTid, rowv []RowInfo) *PartitionTable {
for _, row := range rowv {
i := row.Offset
for i >= uint32(len(pt.tab)) {
pt.tab = append(pt.tab, []PartitionCell{})
pt.tab = append(pt.tab, []Cell{})
}
//pt.tab[i] = append(pt.tab[i], row.CellList...)
for _, cell := range row.CellList {
pt.tab[i] = append(pt.tab[i], PartitionCell{cell})
pt.tab[i] = append(pt.tab[i], Cell{cell})
}
}
......
@@ -304,7 +304,7 @@ func TestMasterStorage(t *testing.T) {
tc.Expect(conntx("m:2", "s:2", 1, &neo.NotifyPartitionTable{
PTid: 1,
RowList: []neo.RowInfo{
{0, []neo.CellInfo{{S.node.MyInfo.UUID, neo.UP_TO_DATE}}},
{0, []neo.CellInfo{{neo.UUID(neo.STORAGE, 1), neo.UP_TO_DATE}}},
},
}))
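// neo.UUID(neo.STORAGE, 1) builds the node UUID from node type + number, so
// the expectation no longer depends on S's runtime-assigned node info.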
......
@@ -825,6 +825,9 @@ loop:
}
}()
case d := <-serviced:
// TODO if S goes away -> check partTab still operational -> if not - recovery
// XXX who sends here?
case n := <-m.nodeLeave:
m.nodeTab.SetNodeState(n.node, neo.DOWN)
@@ -857,7 +860,6 @@ loop:
}
// storCtlService drives a storage node during cluster service state
// XXX text
func storCtlService(ctx context.Context, stor *neo.Node, done chan serviceDone) {
err := storCtlService1(ctx, stor)
done <- serviceDone{node: stor, err: err}
@@ -893,6 +895,8 @@ func storCtlService1(ctx context.Context, stor *neo.Node) (err error) {
}
}
// ----------------------------------------
// identify processes identification request of just connected node and either accepts or declines it.
// If node identification is accepted .nodeTab is updated and corresponding node entry is returned.
// Response message is constructed but not sent back, so as not to block the caller - it is
@@ -991,7 +995,7 @@ func reject(ctx context.Context, conn *neo.Conn, resp neo.Msg) {
func goreject(ctx context.Context, wg *sync.WaitGroup, conn *neo.Conn, resp neo.Msg) {
wg.Add(1)
defer wg.Done()
reject(ctx, conn, resp)
go reject(ctx, conn, resp)
}
// accept sends acceptive identification response and closes conn
......
@@ -171,6 +171,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
log.Info(ctx, "connecting ...")
Mconn, accept, err := stor.node.Dial(ctx, neo.MASTER, stor.node.MasterAddr)
if err != nil {
// FIXME it is not only identification - e.g. ECONNREFUSED
log.Info(ctx, "identification rejected") // XXX ok here? (err is logged above)
return err
}
......
@@ -37,7 +37,7 @@ type XTid struct {
TidBefore bool // XXX merge into Tid itself (high bit) ?
}
// Xid is "extended" oid = oid + serial/beforeTid, completely specifying object address.
// Xid is "extended" oid = oid + serial/beforeTid, completely specifying object address query.
type Xid struct {
XTid
Oid
......
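// Example: Xid{XTid{Tid: tid, TidBefore: true}, oid} queries the object
// state as of just before transaction tid, while with TidBefore=false it
// queries the revision with serial exactly equal to tid.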
@@ -18,15 +18,16 @@ def main():
#'master_nodes': '[2001:67c:1254:e:20::3977]:2051', # M on webr-wneo-*1*
#'name': 'woelfel-munich-clone',
'master_nodes': '[2001:67c:1254:e:21::ffa]:2051', # M on webr-wneo-2
#'master_nodes': '[2001:67c:1254:e:21::ffa]:2051', # M on webr-wneo-2
'master_nodes': '[::1]:2051', # M on webr-wneo-2
'name': 'woelfel-munich-clone-backup-comp-2591',
'read_only': True,
'logfile': 'x.log',
'ca': etc1 + '/ca.crt',
'cert': etc1 + '/neo.crt',
'key': etc1 + '/neo.key',
#'ca': etc1 + '/ca.crt',
#'cert': etc1 + '/neo.crt',
#'key': etc1 + '/neo.key',
}
print 'aaa'
......