Commit cf2128bf authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a702d674
...@@ -177,30 +177,95 @@ func (stor *Storage) talkMaster1(ctx context.Context) error { ...@@ -177,30 +177,95 @@ func (stor *Storage) talkMaster1(ctx context.Context) error {
} }
// now handle notifications and commands from master // now handle notifications and commands from master
// FIXME wrong - either keep conn as one used from identification or accept from listening
conn, err := Mlink.NewConn()
if err != nil { panic(err) } // XXX
for { for {
notify, err := conn.Recv() Mconn, err := Mlink.Accept()
if err != nil { if err != nil {
// XXX TODO return // XXX ?
} }
_ = notify // XXX temp err = stor.m1initialize(ctx, Mconn)
if err != nil {
panic(err) // XXX
} }
err = stor.m1serve(ctx, Mconn)
if err != nil {
panic(err) // XXX
}
}
return nil // XXX err
}
// m1initialize drives storage by master messages during initialization phase
//
// when it finishes error indicates:
// - nil: initialization was ok and a command came from master to start operation
// - !nil: initialization was cancelled or failed somehow
func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) error {
for {
msg, err := Mconn.Recv() // XXX abort on ctx (XXX or upper?)
if err != nil {
panic(err) // XXX
}
switch msg.(type) {
case *neo.AskRecovery:
// TODO send M (ptid, backup_tid, truncate_tid)
case *neo.AskPartitionTable:
// TODO read and send M locally-saved PT (ptid, []PtRow)
case *neo.AskLockedTransaction:
// TODO // TODO
// <- StartOperation
case *neo.AskLastIDs:
// TODO send M (last_oid, last_tid)
case *neo.NotifyPartitionTable:
// TODO save locally what M told us
case *neo.NotifyClusterInformation:
// TODO .clusterState = ... XXX what to do with it?
case *neo.NotifyNodeInformation:
// XXX check for myUUID and condier it a command (like neo/py) does?
// TODO update .nodeTab
case *neo.StartOperation:
return nil // ok
default:
// XXX
}
}
}
// m1serve drives storage by master messages during service hase
//
// XXX err return - document
func (stor *Storage) m1serve(ctx contextContext, Mconn *neo.Conn) error {
// refresh stor.opCtx and cancel it when we finish
opCtx, opCancel := context.WithCancel(ctx) opCtx, opCancel := context.WithCancel(ctx)
stor.opMu.Lock() stor.opMu.Lock()
stor.opCtx = opCtx stor.opCtx = opCtx
stor.opMu.Unlock() stor.opMu.Unlock()
defer opCancel() defer opCancel()
// reply M we are ready
err := Mconn.Send(neo.NotifyReady{})
if err != nil {
return err // XXX err ctx
}
return nil // XXX err // TODO handle M notifications and commands
for {
// TODO
}
} }
// ServeLink serves incoming node-node link connection // ServeLink serves incoming node-node link connection
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment