Commit cf78ad88 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 03077c10
......@@ -44,6 +44,12 @@ type Storage struct {
masterAddr string // address of master
// ---- 8< ----
// context for providing operational service
// it is renewed every time master tells us StartOpertion, so users
// must read it initially only once under opMu
opMu sync.Mutex
opCtx context.Context
zstor zodb.IStorage // underlying ZODB storage XXX temp ?
}
......@@ -65,6 +71,11 @@ func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor
zstor: zstor,
}
// operational context is initially done (no service should be provided)
noOpCtx, cancel := context.WithCancel(context.Background())
stor.opCtx = noOpCtx
cancel()
return stor
}
......@@ -120,8 +131,8 @@ func (stor *Storage) talkMaster(ctx context.Context) error {
fmt.Printf("stor: master(%v): connecting\n", stor.masterAddr) // XXX info
err := stor.talkMaster1(ctx)
fmt.Printf("stor: master(%v): %v\n", stor.masterAddr, err)
// TODO if err = shutdown -> return
// TODO if err = shutdown -> return
// XXX handle shutdown command from master
// throttle reconnecting / exit on cancel
......@@ -207,7 +218,8 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
link.Close() // XXX err
}()
// XXX recheck identification logic here
// XXX only accept clients
// XXX only accept when operational (?)
nodeInfo, err := IdentifyPeer(link, neo.STORAGE)
if err != nil {
fmt.Printf("stor: %v\n", err)
......@@ -220,6 +232,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
serveConn = stor.ServeClient
default:
// XXX vvv should be reply to peer
fmt.Printf("stor: %v: unexpected peer type: %v\n", link, nodeInfo.NodeType)
return
}
......@@ -232,7 +245,6 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
break
}
// XXX adjust ctx ?
// XXX wrap conn close to happen here, not in ServeClient ?
go serveConn(ctx, conn)
}
......@@ -241,24 +253,28 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
}
func (stor *Storage) ServeMaster(ctx context.Context, conn *neo.Conn) {
// state changes:
//
// - Recovery
// - StartOperation
// - StopOperation
// ? NotifyClusterInformation
// - NotifyNodeInformation (e.g. M tells us we are RUNNING)
// ? NotifyPartitionTable
}
// ServeClient serves incoming connection on which peer identified itself as client
// XXX +error return?
func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) {
fmt.Printf("stor: %s: serving new client conn\n", conn)
// rederive ctx from ctx and .operationCtx (which is cancelled when M tells us StopOperation)
// XXX -> xcontext ?
ctx, opCancel := context.WithCancel(ctx)
go func() {
// cancel ctx when global operation context is cancelled
stor.opMu.Lock()
opCtx := stor.opCtx
stor.opMu.Unlock()
select {
case <-opCtx.Done():
opCancel()
case <-ctx.Done():
// noop - to avoid goroutine leak
}
}()
// close connection when either cancelling or returning (e.g. due to an error)
// ( when cancelling - conn.Close will signal to current IO to
// terminate with an error )
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment