Commit cd68d92f authored by Kirill Smelkov

.

parent 72ee34f1
@@ -55,7 +55,7 @@ type Master struct {
 	// channels controlling main driver
 	ctlStart    chan chan error    // request to start cluster
-	ctlStop     chan chan error    // request to stop cluster
+	ctlStop     chan chan struct{} // request to stop cluster
 	ctlShutdown chan chan error    // request to shutdown cluster XXX with ctx ?
 	// channels from workers directly serving peers to main driver
@@ -92,7 +92,7 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
 		masterAddr: serveAddr, // XXX ok?
 		ctlStart:    make(chan chan error),
-		ctlStop:     make(chan chan error),
+		ctlStop:     make(chan chan struct{}),
 		ctlShutdown: make(chan chan error),
 		nodeCome: make(chan nodeCome),
@@ -116,8 +116,8 @@ func (m *Master) Run(ctx context.Context) error {
 	m.masterAddr = l.Addr().String()
+	// serve incoming connections
 	wg := sync.WaitGroup{}
 	serveCtx, serveCancel := context.WithCancel(ctx)
 	wg.Add(1)
 	go func() {
@@ -126,7 +126,9 @@ func (m *Master) Run(ctx context.Context) error {
 		_ = err // XXX what to do with err ?
 	}()
+	// main driving logic
 	err = m.runMain(ctx)
 	serveCancel()
 	wg.Wait()
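For readers skimming the diff, here is a minimal, self-contained sketch of the shape Run has after this change: the accept/serve loop runs in a background goroutine bound to its own cancellable context, the main driving logic runs in the foreground, and on return serving is cancelled and waited for. All names here (serve, runMain, run) are illustrative stand-ins, not the actual NEO code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// serve stands in for the accept loop serving incoming connections.
func serve(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// runMain stands in for the main driving logic.
func runMain(ctx context.Context) error {
	select {
	case <-time.After(100 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func run(ctx context.Context) error {
	wg := sync.WaitGroup{}

	// serve incoming connections in the background, on a context that we
	// can cancel independently of ctx
	serveCtx, serveCancel := context.WithCancel(ctx)
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := serve(serveCtx)
		_ = err // what to do with this error is still an open question above
	}()

	// main driving logic in the foreground
	err := runMain(ctx)

	// stop serving and wait for the serve goroutine to finish
	serveCancel()
	wg.Wait()
	return err
}

func main() {
	fmt.Println(run(context.Background()))
}
```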
@@ -147,9 +149,8 @@ func (m *Master) Start() error {
 }
 // Stop requests cluster to eventually transition into recovery state
-// XXX should be always possible ?
-func (m *Master) Stop() error {
-	ech := make(chan error)
+func (m *Master) Stop() {
+	ech := make(chan struct{})
 	m.ctlStop <- ech
-	return <-ech
+	<-ech
 }
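The switch from `chan chan error` to `chan chan struct{}` plus `close(ech)` turns Stop into a pure request/acknowledge handshake: no error value travels back, the caller only waits for the acknowledgement. A minimal sketch of that pattern, with illustrative names only:

```go
package main

import "fmt"

func main() {
	// the driver side owns ctlStop; callers send an acknowledgement channel
	ctlStop := make(chan chan struct{})

	go func() {
		ech := <-ctlStop
		// ... switch the state machine towards recovery here ...
		close(ech) // ok; the acknowledgement carries no error value
	}()

	// caller side, as in Stop(): block until the driver has acknowledged
	ech := make(chan struct{})
	ctlStop <- ech
	<-ech
	fmt.Println("stop acknowledged")
}
```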
@@ -177,20 +178,27 @@ func (m *Master) runMain(ctx context.Context) (err error) {
 	// NOTE Run's goroutine is the only mutator of nodeTab, partTab and other cluster state
 	for ctx.Err() == nil {
+		// recover partition table from storages and wait till enough
+		// storages connect to us so that we can see the partition table
+		// can be operational.
+		//
+		// Successful recovery means all ^^^ preconditions are met and
+		// a command came to us to start the cluster.
 		err := m.recovery(ctx)
 		if err != nil {
 			fmt.Println(err)
 			return err // recovery cancelled
 		}
-		// successful recovery -> verify
+		// make sure transactions on storages are properly finished, in
+		// case previously it was an unclean shutdown.
 		err = m.verify(ctx)
 		if err != nil {
 			fmt.Println(err)
 			continue // -> recovery
 		}
-		// successful verify -> service
+		// provide service as long as the partition table stays operational
 		err = m.service(ctx)
 		if err != nil {
 			fmt.Println(err)
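The comments added above describe runMain as a phase loop: recovery, then verify, then service, falling back to recovery on failure and exiting only when the context is cancelled or recovery itself is cancelled. A minimal sketch of that loop shape, with stub phase functions standing in for the real NEO ones:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errStopRequested = errors.New("stop requested")

// runMain keeps cycling recovery -> verify -> service until ctx is cancelled;
// a failed verify or service drops back to recovery.
func runMain(ctx context.Context, recovery, verify, service func(context.Context) error) error {
	for ctx.Err() == nil {
		// recovery: wait until enough storages are connected and the
		// partition table can be operational
		if err := recovery(ctx); err != nil {
			return err // recovery cancelled
		}

		// verify: make sure transactions on storages are properly finished
		if err := verify(ctx); err != nil {
			fmt.Println(err)
			continue // -> recovery
		}

		// service: provide service while the partition table stays operational
		if err := service(ctx); err != nil {
			fmt.Println(err)
			continue // -> recovery (loop exits once ctx is cancelled)
		}
	}
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ok := func(context.Context) error { return nil }
	stop := func(context.Context) error { cancel(); return errStopRequested }

	fmt.Println(runMain(ctx, ok, ok, stop))
}
```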
@@ -301,7 +309,7 @@ loop:
 			ech <- fmt.Errorf("start: cluster is non-operational")
 		case ech := <-m.ctlStop:
-			ech <- nil // we are already recovering
+			close(ech) // ok; we are already recovering
 		case <-ctx.Done():
 			err = ctx.Err()
@@ -491,7 +499,7 @@ loop:
 			ech <- nil // we are already starting
 		case ech := <-m.ctlStop:
-			ech <- nil // ok
+			close(ech) // ok
 			err = errStopRequested
 			break loop
@@ -577,6 +585,8 @@ func (m *Master) service(ctx context.Context) (err error) {
 	// XXX we also need to tell storages StartOperation first
 	m.setClusterState(neo.ClusterRunning)
+	// XXX spawn per-storage driver about nodetab
 loop:
 	for {
 		select {
@@ -604,8 +614,9 @@ loop:
 			ech <- nil // we are already started
 		case ech := <-m.ctlStop:
-			ech <- nil // ok
+			close(ech) // ok
 			err = fmt.Errorf("stop requested")
+			// XXX tell storages to stop
 			break loop
 		case <-ctx.Done():
@@ -34,9 +34,7 @@ import (
 	"lab.nexedi.com/kirr/go123/xerr"
 )
-// XXX fmt -> log
-// Storage is NEO storage server application
+// Storage is NEO node that keeps data and provides read/write access to it
 type Storage struct {
 	neo.NodeCommon
@@ -60,6 +58,7 @@ type Storage struct {
 // Use Run to actually start running the node.
 func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor zodb.IStorage) *Storage {
 	// convert serveAddr into neo format
+	// XXX -> new.NewNode() ?
 	addr, err := neo.AddrString(net.Network(), serveAddr)
 	if err != nil {
 		panic(err) // XXX
@@ -86,7 +85,7 @@ func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor
 // commands it to shutdown.
 func (stor *Storage) Run(ctx context.Context) error {
 	// start listening
-	l, err := stor.Listen()
+	l, err := stor.node.Listen()
 	if err != nil {
 		return err // XXX err ctx
 	}
@@ -112,7 +111,7 @@ func (stor *Storage) Run(ctx context.Context) error {
 	return err // XXX err ctx
 }
-// --- channel with master directing us ---
+// --- connect to master and let it direct us ---
 // talkMaster connects to master, announces self and receives commands and notifications.
 // it tries to persist master link reconnecting as needed
@@ -293,6 +292,10 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
 // m1serve drives storage by master messages during service phase
 //
+// Service is the regular phase serving requests from clients to load/save objects,
+// handling transaction commit (with master) and syncing data with other
+// storage nodes (XXX correct?).
+//
 // it always returns with an error describing why serve has to be stopped -
 // either due to master commanding us to stop, or context cancel or some other
 // error.