Commit 6cf05c5e authored by Kirill Smelkov

.

parent c8be2150
@@ -870,8 +870,19 @@ func (m *Master) service(ctx context.Context) (err error) {
 			ctx, cancel := xxcontext.Merge/*Cancel*/(ctx, peerCtx)
 			defer cancel()
+			switch node.Type {
+			case proto.STORAGE:
+				err = storCtlService(ctx, node)
+			case proto.CLIENT:
+				err = m.serveClient(ctx, node)
+			// XXX ADMIN
+			}
+
 			// XXX do we need vvv ?
 			ack := make(chan struct{})
-			servedq <- storRecovery{stor: stor, partTab: pt, err: err, ack: ack}
+			servedq <- serviceDone{stor: stor, err: err, ack: ack}
 			<-ack

 			// canceled service does not necessarily mean we should down the peer
@@ -882,15 +893,12 @@ func (m *Master) service(ctx context.Context) (err error) {
 		})
 	}

-	// spawn per-storage service driver
-	for _, stor := range m.node.State.NodeTab.StorageList() {
-		if stor.State == proto.RUNNING { // XXX note PENDING - not adding to service; ok?
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				err := storCtlService(ctx, stor)
-				serviced <- serviceDone{node: stor, err: err}
-			}()
+	// spawn peer serve driver (it should be only storages on entry here?)
+	for _, peer := range m.peerTab() {
+		// XXX clients? other nodes?
+		// XXX note PENDING - not adding to service; ok?
+		if peer.node.Type == proto.STORAGE && peer.node.State == proto.RUNNING {
+			goServe(peer)
 		}
 	}
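
Side note on the removed loop: the old `go func() { ... storCtlService(ctx, stor) ... }()` closed over the range variable stor, so before Go 1.22 every spawned goroutine could observe the last iteration's value. The new goServe(peer) call passes the peer explicitly, which is safe on any Go version. A minimal standalone illustration (not NEO code):

	package main

	import (
		"fmt"
		"sync"
	)

	func main() {
		var wg sync.WaitGroup

		// Before Go 1.22 the closure captures the loop variable itself,
		// so all goroutines may print the final element ("S3").
		for _, v := range []string{"S1", "S2", "S3"} {
			wg.Add(1)
			go func() { defer wg.Done(); fmt.Println("captured:", v) }()
		}

		// Passing the value as an argument (as goServe(peer) does) pins
		// each goroutine to its own copy.
		for _, v := range []string{"S1", "S2", "S3"} {
			wg.Add(1)
			go func(v string) { defer wg.Done(); fmt.Println("explicit:", v) }(v)
		}

		wg.Wait()
	}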
@@ -915,6 +923,7 @@ loop:
 			m.disconnectPeer(ctx, n.peer)

 			// if cluster became non-operational - cancel service
+			// XXX cancel() ?
 			if !m.node.State.PartTab.OperationalWith(m.node.State.NodeTab) {
 				err = errClusterDegraded
 				break loop
@@ -927,28 +936,7 @@ loop:
				break
			}

-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-
-				err = m.accept(node, state0, n.req, resp)
-				if err != nil {
-					serviced <- serviceDone{node: node, err: err}
-					return
-				}
-
-				switch node.Type {
-				case proto.STORAGE:
-					err = storCtlService(ctx, node)
-				case proto.CLIENT:
-					err = m.serveClient(ctx, node)
-				// XXX ADMIN
-				}
-
-				serviced <- serviceDone{node: node, err: err}
-			}()
+			goServe(peer)

		case d := <-servedq:
			// TODO if S goes away -> check partTab still operational -> if not - recovery
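
For context on the TODO above: in NEO the cluster counts as operational only while every partition still has at least one readable cell on a running storage node, so losing a storage can force a fall back to recovery. A rough standalone sketch of such a check (the names and types here are assumptions for illustration, not the xneo partition-table API):

	package main

	import "fmt"

	// operationalWith reports whether every partition keeps at least one
	// replica on a storage node that is currently running.
	func operationalWith(partTab [][]int, running map[int]bool) bool {
		for _, cells := range partTab {
			ok := false
			for _, nid := range cells {
				if running[nid] {
					ok = true
					break
				}
			}
			if !ok {
				return false
			}
		}
		return true
	}

	func main() {
		partTab := [][]int{{1, 2}, {2, 3}} // partition -> replica node IDs
		running := map[int]bool{1: true, 2: true, 3: true}
		fmt.Println(operationalWith(partTab, running)) // true

		running[2] = false
		running[3] = false
		fmt.Println(operationalWith(partTab, running)) // false: partition 1 lost all replicas
	}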
@@ -967,9 +955,9 @@
 }

 // storCtlService drives a storage node during cluster service state
-func storCtlService(ctx context.Context, stor *xneo.PeerNode) (err error) {
-	slink := stor.Link()
+func storCtlService(ctx context.Context, stor *_MasteredPeer) (err error) {
+	defer task.Runningf(&ctx, "%s: stor service", stor.node.NID)(&err)
+	slink := stor.node.Link()

 	// XXX current neo/py does StartOperation / NotifyReady as separate
 	// sends, not exchange on the same conn. - py draftly fixed: see
@@ -1012,8 +1000,8 @@ func storCtlService(ctx context.Context, stor *xneo.PeerNode) (err error) {
 // serveClient serves incoming client link.
 func (m *Master) serveClient(ctx context.Context, cli *_MasteredPeer) (err error) {
-	clink := cli.Link()
 	defer task.Runningf(&ctx, "%s: client service", cli.node.NID)(&err)
+	clink := cli.Link()

 	// wg, ctx := errgroup.WithContext(ctx)  // XXX -> sync.WorkGroup
 	defer xio.CloseWhenDone(ctx, clink)()    // XXX -> cli.ResetLink? (better not here)
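
Pieced together, the change replaces the two ad-hoc spawn sites with one goServe driver per peer: dispatch on the node type, report the result over servedq, and wait for an ack so the main loop consumes results one at a time. Below is a minimal self-contained model of that pattern; the peer and serviceDone stubs and the no-op service functions are placeholders, not the NEO types:

	package main

	import (
		"context"
		"fmt"
		"sync"
	)

	// illustrative stand-ins for the types used in the diff above
	type peer struct{ typ, name string }

	type serviceDone struct {
		peer *peer
		err  error
		ack  chan struct{} // main loop closes this after handling the result
	}

	func storCtlService(ctx context.Context, p *peer) error { return nil }
	func serveClient(ctx context.Context, p *peer) error    { return nil }

	func main() {
		servedq := make(chan serviceDone)
		var wg sync.WaitGroup
		ctx := context.Background()

		// analogue of goServe: one serve-driver goroutine per peer
		goServe := func(p *peer) {
			wg.Add(1)
			go func() {
				defer wg.Done()
				var err error
				switch p.typ { // dispatch by node type, as in the new switch
				case "STORAGE":
					err = storCtlService(ctx, p)
				case "CLIENT":
					err = serveClient(ctx, p)
				}
				ack := make(chan struct{})
				servedq <- serviceDone{peer: p, err: err, ack: ack}
				<-ack // block until the main loop has processed the result
			}()
		}

		peers := []*peer{{"STORAGE", "S1"}, {"CLIENT", "C1"}}
		for _, p := range peers {
			goServe(p)
		}

		// main loop: the ack handshake serializes result processing
		for range peers {
			d := <-servedq
			fmt.Printf("%s/%s done: err=%v\n", d.peer.typ, d.peer.name, d.err)
			close(d.ack)
		}
		wg.Wait()
	}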