Commit 082ba218 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 42fd27c5
...@@ -30,7 +30,6 @@ package neo ...@@ -30,7 +30,6 @@ package neo
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"net" "net"
"sync" "sync"
...@@ -80,7 +79,7 @@ func NewNodeApp(net xnet.Networker, typ NodeType, clusterName, masterAddr, serve ...@@ -80,7 +79,7 @@ func NewNodeApp(net xnet.Networker, typ NodeType, clusterName, masterAddr, serve
} }
app := &NodeApp{ app := &NodeApp{
MyInfo: NodeInfo{Type: typ, Addr: addr, IdTimestamp: math.NaN()}, MyInfo: NodeInfo{Type: typ, Addr: addr, IdTime: 0},
ClusterName: clusterName, ClusterName: clusterName,
Net: net, Net: net,
MasterAddr: masterAddr, MasterAddr: masterAddr,
...@@ -129,7 +128,7 @@ func (app *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_ ...@@ -129,7 +128,7 @@ func (app *NodeApp) Dial(ctx context.Context, peerType NodeType, addr string) (_
UUID: app.MyInfo.UUID, UUID: app.MyInfo.UUID,
Address: app.MyInfo.Addr, Address: app.MyInfo.Addr,
ClusterName: app.ClusterName, ClusterName: app.ClusterName,
IdTimestamp: app.MyInfo.IdTimestamp, // XXX ok? IdTime: app.MyInfo.IdTime, // XXX ok?
} }
accept := &AcceptIdentification{} accept := &AcceptIdentification{}
// FIXME error if peer sends us something with another connID // FIXME error if peer sends us something with another connID
...@@ -332,17 +331,17 @@ func (l *listener) Addr() net.Addr { ...@@ -332,17 +331,17 @@ func (l *listener) Addr() net.Addr {
// UpdateNodeTab applies updates to .NodeTab from message and logs changes appropriately. // UpdateNodeTab applies updates to .NodeTab from message and logs changes appropriately.
func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *NotifyNodeInformation) { func (app *NodeApp) UpdateNodeTab(ctx context.Context, msg *NotifyNodeInformation) {
// XXX msg.IdTimestamp ? // XXX msg.IdTime ?
for _, nodeInfo := range msg.NodeList { for _, nodeInfo := range msg.NodeList {
log.Infof(ctx, "rx node update: %v", nodeInfo) log.Infof(ctx, "rx node update: %v", nodeInfo)
app.NodeTab.Update(nodeInfo) app.NodeTab.Update(nodeInfo)
// XXX we have to provide IdTimestamp when requesting identification to other peers // XXX we have to provide IdTime when requesting identification to other peers
// (e.g. Spy checks this is what master broadcast them and if not replis "unknown by master") // (e.g. Spy checks this is what master broadcast them and if not replis "unknown by master")
if nodeInfo.UUID == app.MyInfo.UUID { if nodeInfo.UUID == app.MyInfo.UUID {
// XXX recheck locking // XXX recheck locking
// XXX do .MyInfo = nodeInfo ? // XXX do .MyInfo = nodeInfo ?
app.MyInfo.IdTimestamp = nodeInfo.IdTimestamp app.MyInfo.IdTime = nodeInfo.IdTime
} }
} }
......
...@@ -178,7 +178,11 @@ func (nt *NodeTable) String() string { ...@@ -178,7 +178,11 @@ func (nt *NodeTable) String() string {
// XXX also for .storv // XXX also for .storv
for _, n := range nt.nodev { for _, n := range nt.nodev {
// XXX recheck output // XXX recheck output
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\n", n.UUID, n.Type, n.State, n.Addr) fmt.Fprintf(&buf, "%s (%s)\t%s\t%s", n.UUID, n.Type, n.State, n.Addr)
if n.IdTime.Valid() {
fmt.Fprintf(&buf, "\t@ %v", n.IdTime)
}
fmt.Fprintf(&buf, "\n")
} }
return buf.String() return buf.String()
......
...@@ -27,6 +27,7 @@ import ( ...@@ -27,6 +27,7 @@ import (
"net" "net"
"strconv" "strconv"
"strings" "strings"
"time"
) )
// XXX or better translate to some other errors ? // XXX or better translate to some other errors ?
...@@ -47,51 +48,96 @@ func (cs *ClusterState) Set(v ClusterState) { ...@@ -47,51 +48,96 @@ func (cs *ClusterState) Set(v ClusterState) {
traceClusterStateChanged(cs) traceClusterStateChanged(cs)
} }
const nodeTypeChar = "MSCA4567" // keep in sync with NodeType constants //const nodeTypeChar = "MSCA????" // keep in sync with NodeType constants
const nodeTypeChar = "SMCA" // XXX neo/py does this out of sync with NodeType constants
// String returns string representation of a node uuid. // String returns string representation of a node uuid.
// It returns ex 'S1', 'M2', ... // It returns ex 'S1', 'M2', ...
func (nodeUUID NodeUUID) String() string { func (nodeUUID NodeUUID) String() string {
if nodeUUID == 0 { if nodeUUID == 0 {
return "?0" return "?(0)0"
} }
typ := nodeUUID >> 24
num := nodeUUID & (1<<24 - 1) num := nodeUUID & (1<<24 - 1)
temp := typ&(1 << 7) != 0 // XXX UUID_NAMESPACES description does not match neo/py code
typ &= 1<<7 - 1 //typ := nodeUUID >> 24
//temp := typ&(1 << 7) != 0
//typ &= 1<<7 - 1
//nodeType := typ >> 4
typ := uint8(-int8(nodeUUID >> 24)) >> 4
nodeType := NodeType(typ >> 4)
s := fmt.Sprintf("%c%d", nodeTypeChar[nodeType], num)
if typ < 4 {
return fmt.Sprintf("%c%d", nodeTypeChar[typ], num)
}
return fmt.Sprintf("?(%d)%d", typ, num)
/*
// 's1', 'm2', for temporary nids // 's1', 'm2', for temporary nids
if temp { if temp {
s = strings.ToLower(s) s = strings.ToLower(s)
} }
return s return s
*/
} }
// XXX goes out of sync wrt NodeType constants
var nodeTypeNum = [...]int8 {
STORAGE: 0x00,
MASTER: -0x10,
CLIENT: -0x20,
ADMIN: -0x30,
}
// UUID creates node uuid from node type and number. // UUID creates node uuid from node type and number.
// XXX test
func UUID(typ NodeType, num int32) NodeUUID { func UUID(typ NodeType, num int32) NodeUUID {
// XXX neo/py does not what UUID_NAMESPACES describes
/*
temp := uint32(0) temp := uint32(0)
if num < 0 { if num < 0 {
temp = 1 temp = 1
num = -num num = -num
} }
*/
if int(typ) >= len(nodeTypeNum) {
panic("typ invalid")
}
typn := nodeTypeNum[typ]
if num >> 24 != 0 { if (num < 0) || num >> 24 != 0 {
panic("node number out of range") panic("node number out of range")
} }
uuid := temp << (7 + 3*8) | uint32(typ) << (4 + 3*8) | uint32(num) //uuid := temp << (7 + 3*8) | uint32(typ) << (4 + 3*8) | uint32(num)
uuid := uint32(uint8(typn)) << (3*8) | uint32(num)
return NodeUUID(uuid) return NodeUUID(uuid)
} }
// ---------------------------------------- // ----------------------------------------
// Valid returns whether t was initialized
func (t IdTime) Valid() bool {
return t != 0
}
func (t IdTime) String() string {
if !t.Valid() {
return "ø"
}
sec := int64(t)
nsec := int64((float64(t) - float64(sec)) * 1E9)
return time.Unix(sec, nsec).String()
}
// ----------------------------------------
// Addr converts network address string into NEO Address // Addr converts network address string into NEO Address
// TODO make neo.Address just string without host:port split // TODO make neo.Address just string without host:port split
func AddrString(network, addr string) (Address, error) { func AddrString(network, addr string) (Address, error) {
......
...@@ -269,6 +269,35 @@ type Checksum [20]byte ...@@ -269,6 +269,35 @@ type Checksum [20]byte
// Zero value means "invalid id" (<-> None in py.PPTID) // Zero value means "invalid id" (<-> None in py.PPTID)
type PTid uint64 type PTid uint64
// IdTime represents time of identification
type IdTime float64
func (t IdTime) neoEncodedLen() int {
return 8
}
func (t IdTime) neoEncode(b []byte) int {
// use 0 as value for no data (NaN != NaN -> hard to use NaN in tests)
// NOTE neo/py uses None for "no data"; we use 0 for "no data" to avoid pointer
tt := float64(t)
if tt == 0 {
tt = math.NaN()
}
float64_neoEncode(b, tt)
return 8
}
func (t *IdTime) neoDecode(data []byte) (uint32, bool) {
if len(data) < 8 {
return 0, false
}
tt := float64_neoDecode(data)
if math.IsNaN(tt) {
tt = 0
}
*t = IdTime(tt)
return 8, true
}
// NodeInfo is information about a node // NodeInfo is information about a node
//neo:proto typeonly //neo:proto typeonly
...@@ -277,7 +306,7 @@ type NodeInfo struct { ...@@ -277,7 +306,7 @@ type NodeInfo struct {
Addr Address // serving address Addr Address // serving address
UUID NodeUUID UUID NodeUUID
State NodeState State NodeState
IdTimestamp float64 // FIXME clarify semantic where it is used IdTime IdTime // FIXME clarify semantic where it is used
} }
//neo:proto typeonly //neo:proto typeonly
...@@ -311,7 +340,7 @@ type RequestIdentification struct { ...@@ -311,7 +340,7 @@ type RequestIdentification struct {
UUID NodeUUID UUID NodeUUID
Address Address // where requesting node is also accepting connections Address Address // where requesting node is also accepting connections
ClusterName string ClusterName string
IdTimestamp float64 IdTime IdTime
} }
//neo:proto answer //neo:proto answer
...@@ -352,7 +381,7 @@ type NotPrimaryMaster struct { ...@@ -352,7 +381,7 @@ type NotPrimaryMaster struct {
// Notify information about one or more nodes. PM -> Any. // Notify information about one or more nodes. PM -> Any.
type NotifyNodeInformation struct { type NotifyNodeInformation struct {
// XXX in py this is monotonic_time() of call to broadcastNodesInformation() & friends // XXX in py this is monotonic_time() of call to broadcastNodesInformation() & friends
IdTimestamp float64 IdTime IdTime
NodeList []NodeInfo NodeList []NodeInfo
} }
...@@ -1042,7 +1071,7 @@ func bool2byte(b bool) byte { ...@@ -1042,7 +1071,7 @@ func bool2byte(b bool) byte {
// NOTE py.None encodes as '\xff' * 8 (-> we use NaN for None) // NOTE py.None encodes as '\xff' * 8 (-> we use NaN for None)
// NOTE '\xff' * 8 represents FP NaN but many other NaN bits representations exist // NOTE '\xff' * 8 represents FP NaN but many other NaN bits representations exist
func float64_NEOEncode(b []byte, f float64) { func float64_neoEncode(b []byte, f float64) {
var fu uint64 var fu uint64
if !math.IsNaN(f) { if !math.IsNaN(f) {
fu = math.Float64bits(f) fu = math.Float64bits(f)
...@@ -1054,7 +1083,7 @@ func float64_NEOEncode(b []byte, f float64) { ...@@ -1054,7 +1083,7 @@ func float64_NEOEncode(b []byte, f float64) {
binary.BigEndian.PutUint64(b, fu) binary.BigEndian.PutUint64(b, fu)
} }
func float64_NEODecode(b []byte) float64 { func float64_neoDecode(b []byte) float64 {
fu := binary.BigEndian.Uint64(b) fu := binary.BigEndian.Uint64(b)
return math.Float64frombits(fu) return math.Float64frombits(fu)
} }
......
...@@ -244,7 +244,7 @@ func TestMsgMarshal(t *testing.T) { ...@@ -244,7 +244,7 @@ func TestMsgMarshal(t *testing.T) {
u32(7) + u32(4) + u32(1) + u32(3) + u32(9) + u32(4), u32(7) + u32(4) + u32(1) + u32(3) + u32(9) + u32(4),
}, },
// uint32, Address, string, float64 // uint32, Address, string, IdTime
{&RequestIdentification{CLIENT, 17, Address{"localhost", 7777}, "myname", 0.12345678}, {&RequestIdentification{CLIENT, 17, Address{"localhost", 7777}, "myname", 0.12345678},
u32(2) + u32(17) + u32(9) + u32(2) + u32(17) + u32(9) +
...@@ -253,22 +253,21 @@ func TestMsgMarshal(t *testing.T) { ...@@ -253,22 +253,21 @@ func TestMsgMarshal(t *testing.T) {
hex("3fbf9add1091c895"), hex("3fbf9add1091c895"),
}, },
// float64, empty Address, int32 // IdTime, empty Address, int32
{&NotifyNodeInformation{1504466245.926185, []NodeInfo{ {&NotifyNodeInformation{1504466245.926185, []NodeInfo{
{CLIENT, Address{}, UUID(CLIENT, 1), RUNNING, 1504466245.925599}}}, {CLIENT, Address{}, UUID(CLIENT, 1), RUNNING, 1504466245.925599}}},
hex("41d66b15517b469d") + u32(1) + hex("41d66b15517b469d") + u32(1) +
u32(2) + u32(0) /* <- ø Address */ + hex("20000001") + u32(2) + u32(2) + u32(0) /* <- ø Address */ + hex("e0000001") + u32(2) +
hex("41d66b15517b3d04"), hex("41d66b15517b3d04"),
}, },
// empty IdTime
{&NotifyNodeInformation{0, []NodeInfo{}}, hex("ffffffffffffffff") + hex("00000000")},
// TODO we need tests for: // TODO we need tests for:
// []varsize + trailing // []varsize + trailing
// map[]varsize + trailing // map[]varsize + trailing
// TODO special cases for:
// - float64 (+ nan !nan ...)
} }
for _, tt := range testv { for _, tt := range testv {
...@@ -294,3 +293,52 @@ func TestMsgMarshalAllOverflowLightly(t *testing.T) { ...@@ -294,3 +293,52 @@ func TestMsgMarshalAllOverflowLightly(t *testing.T) {
testMsgMarshal(t, msg, string(zerol)) testMsgMarshal(t, msg, string(zerol))
} }
} }
func TestUUID(t *testing.T) {
var testv = []struct{typ NodeType; num int32; uuid uint32; str string}{
{STORAGE, 1, 0x00000001, "S1"},
{MASTER, 2, 0xf0000002, "M2"},
{CLIENT, 3, 0xe0000003, "C3"},
{ADMIN, 4, 0xd0000004, "A4"},
}
for _, tt := range testv {
uuid := UUID(tt.typ, tt.num)
if uint32(uuid) != tt.uuid {
t.Errorf("%v: uuid=%08x ; want %08x", tt, uuid, tt.uuid)
}
if uuids := uuid.String(); uuids != tt.str {
t.Errorf("%v: str(uuid): %q ; want %q", tt, uuids, tt.str)
}
}
}
func TestUUIDDecode(t *testing.T) {
var testv = []struct{uuid uint32; str string}{
{0, "?(0)0"},
{0x00000001, "S1"},
{0xf0000002, "M2"},
{0xe0000003, "C3"},
{0xd0000004, "A4"},
{0xc0000005, "?(4)5"},
{0xb0000006, "?(5)6"},
{0xa0000007, "?(6)7"},
{0x90000008, "?(7)8"},
{0x80000009, "?(8)9"},
{0x7000000a, "?(9)10"},
{0x6000000b, "?(10)11"},
{0x5000000c, "?(11)12"},
{0x4000000d, "?(12)13"},
{0x3000000e, "?(13)14"},
{0x2000000f, "?(14)15"},
{0x10000010, "?(15)16"},
{0x00000011, "S17"},
}
for _, tt := range testv {
str := NodeUUID(tt.uuid).String()
if str != tt.str {
t.Errorf("%08x -> %q ; want %q", tt.uuid, str, tt.str)
}
}
}
...@@ -390,7 +390,7 @@ var basicTypes = map[types.BasicKind]basicCodec{ ...@@ -390,7 +390,7 @@ var basicTypes = map[types.BasicKind]basicCodec{
types.Uint32: {4, "binary.BigEndian.PutUint32(%v, %v)", "binary.BigEndian.Uint32(%v)"}, types.Uint32: {4, "binary.BigEndian.PutUint32(%v, %v)", "binary.BigEndian.Uint32(%v)"},
types.Uint64: {8, "binary.BigEndian.PutUint64(%v, %v)", "binary.BigEndian.Uint64(%v)"}, types.Uint64: {8, "binary.BigEndian.PutUint64(%v, %v)", "binary.BigEndian.Uint64(%v)"},
types.Float64: {8, "float64_NEOEncode(%v, %v)", "float64_NEODecode(%v)"}, types.Float64: {8, "float64_neoEncode(%v, %v)", "float64_neoDecode(%v)"},
} }
// does a type have fixed wire size and, if yes, what it is? // does a type have fixed wire size and, if yes, what it is?
......
...@@ -229,7 +229,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -229,7 +229,7 @@ func TestMasterStorage(t *testing.T) {
gwg := &xsync.WorkGroup{} gwg := &xsync.WorkGroup{}
// start master // start master
Mclock := &vclock{} Mclock := &vclock{100}
M := NewMaster("abc1", ":1", Mhost) M := NewMaster("abc1", ":1", Mhost)
M.monotime = Mclock.monotime M.monotime = Mclock.monotime
Mctx, Mcancel := context.WithCancel(bg) Mctx, Mcancel := context.WithCancel(bg)
...@@ -241,7 +241,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -241,7 +241,7 @@ func TestMasterStorage(t *testing.T) {
// M starts listening // M starts listening
tc.Expect(netlisten("m:1")) tc.Expect(netlisten("m:1"))
tc.Expect(node(M.node, "m:1", neo.MASTER, 1, neo.RUNNING, 0.0)) tc.Expect(node(M.node, "m:1", neo.MASTER, 1, neo.RUNNING, 0))
tc.Expect(clusterState(&M.node.ClusterState, neo.ClusterRecovering)) tc.Expect(clusterState(&M.node.ClusterState, neo.ClusterRecovering))
// TODO create C; C tries connect to master - rejected ("not yet operational") // TODO create C; C tries connect to master - rejected ("not yet operational")
...@@ -269,7 +269,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -269,7 +269,7 @@ func TestMasterStorage(t *testing.T) {
IdTimestamp: 0, IdTimestamp: 0,
})) }))
tc.Expect(node(M.node, "s:1", neo.STORAGE, 1, neo.PENDING, 0.01)) tc.Expect(node(M.node, "s:1", neo.STORAGE, 1, neo.PENDING, 100.01))
tc.Expect(conntx("m:2", "s:2", 1, &neo.AcceptIdentification{ tc.Expect(conntx("m:2", "s:2", 1, &neo.AcceptIdentification{
NodeType: neo.MASTER, NodeType: neo.MASTER,
...@@ -306,7 +306,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -306,7 +306,7 @@ func TestMasterStorage(t *testing.T) {
exc.Raiseif(err) exc.Raiseif(err)
}) })
tc.Expect(node(M.node, "s:1", neo.STORAGE, 1, neo.RUNNING, 0.01)) tc.Expect(node(M.node, "s:1", neo.STORAGE, 1, neo.RUNNING, 100.01))
xwait(wg) xwait(wg)
// XXX M.partTab <- S1 // XXX M.partTab <- S1
...@@ -367,7 +367,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -367,7 +367,7 @@ func TestMasterStorage(t *testing.T) {
IdTimestamp: 0, IdTimestamp: 0,
})) }))
tc.Expect(node(M.node, "", neo.CLIENT, 1, neo.RUNNING, 0.02)) tc.Expect(node(M.node, "", neo.CLIENT, 1, neo.RUNNING, 100.02))
tc.Expect(conntx("m:3", "c:1", 1, &neo.AcceptIdentification{ tc.Expect(conntx("m:3", "c:1", 1, &neo.AcceptIdentification{
NodeType: neo.MASTER, NodeType: neo.MASTER,
...@@ -392,16 +392,16 @@ func TestMasterStorage(t *testing.T) { ...@@ -392,16 +392,16 @@ func TestMasterStorage(t *testing.T) {
tc.Expect(conntx("m:3", "c:1", 0, &neo.NotifyNodeInformation{ tc.Expect(conntx("m:3", "c:1", 0, &neo.NotifyNodeInformation{
IdTimestamp: 0, // XXX ? IdTimestamp: 0, // XXX ?
NodeList: []neo.NodeInfo{ NodeList: []neo.NodeInfo{
nodei("m:1", neo.MASTER, 1, neo.RUNNING, 0.00), nodei("m:1", neo.MASTER, 1, neo.RUNNING, 0),
nodei("s:1", neo.STORAGE, 1, neo.RUNNING, 0.01), nodei("s:1", neo.STORAGE, 1, neo.RUNNING, 100.01),
nodei("", neo.CLIENT, 1, neo.RUNNING, 0.02), nodei("", neo.CLIENT, 1, neo.RUNNING, 100.02),
}, },
})) }))
Cnode := *(**neo.NodeApp)(unsafe.Pointer(C)) // XXX hack, fragile Cnode := *(**neo.NodeApp)(unsafe.Pointer(C)) // XXX hack, fragile
tc.Expect(node(Cnode, "m:1", neo.MASTER, 1, neo.RUNNING, 0.00)) tc.Expect(node(Cnode, "m:1", neo.MASTER, 1, neo.RUNNING, 0))
tc.Expect(node(Cnode, "s:1", neo.STORAGE, 1, neo.RUNNING, 0.01)) tc.Expect(node(Cnode, "s:1", neo.STORAGE, 1, neo.RUNNING, 100.01))
tc.Expect(node(Cnode, "", neo.CLIENT, 1, neo.RUNNING, 0.02)) tc.Expect(node(Cnode, "", neo.CLIENT, 1, neo.RUNNING, 100.02))
// C asks M about last tid XXX better master sends it itself on new client connected // C asks M about last tid XXX better master sends it itself on new client connected
......
...@@ -208,8 +208,8 @@ gensqlite() { ...@@ -208,8 +208,8 @@ gensqlite() {
#neopylite #neopylite
neopysql neopysql
#time demo-zbigarray read neo://$cluster@$Mbind #time demo-zbigarray read neo://$cluster@$Mbind
./zsha1.py neo://$cluster@$Mbind #./zsha1.py neo://$cluster@$Mbind
go run zsha1.go neo://$cluster@$Mbind go run zsha1.go -logtostderr neo://$cluster@$Mbind
xneoctl set cluster stopping xneoctl set cluster stopping
xmysql -e "SHUTDOWN" xmysql -e "SHUTDOWN"
......
...@@ -9,7 +9,7 @@ import ( ...@@ -9,7 +9,7 @@ import (
"log" "log"
"flag" "flag"
"fmt" "fmt"
"os" //"os"
"time" "time"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -20,7 +20,7 @@ import ( ...@@ -20,7 +20,7 @@ import (
func main() { func main() {
flag.Parse() flag.Parse()
url := os.Args[1] // XXX dirty url := flag.Args()[0] // XXX dirty
bg := context.Background() bg := context.Background()
stor, err := zodb.OpenStorageURL(bg, url) stor, err := zodb.OpenStorageURL(bg, url)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment