Commit 7025320e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ad1c9f61
...@@ -29,6 +29,7 @@ import ( ...@@ -29,6 +29,7 @@ import (
"lab.nexedi.com/kirr/neo/go/neo" "lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet" "lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/neo/go/xcommon/xcontext"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
) )
...@@ -48,7 +49,7 @@ type Storage struct { ...@@ -48,7 +49,7 @@ type Storage struct {
// context for providing operational service // context for providing operational service
// it is renewed every time master tells us StartOpertion, so users // it is renewed every time master tells us StartOpertion, so users
// must read it initially only once under opMu // must read it initially only once under opMu via opCtxRead
opMu sync.Mutex opMu sync.Mutex
opCtx context.Context opCtx context.Context
...@@ -160,7 +161,11 @@ func (stor *Storage) talkMaster1(ctx context.Context) error { ...@@ -160,7 +161,11 @@ func (stor *Storage) talkMaster1(ctx context.Context) error {
return err return err
} }
defer func() {
errClose := Mlink.Close()
err = xerr.First(err, errClose)
// TODO Mlink.Close() on return / cancel // TODO Mlink.Close() on return / cancel
}()
// request identification this way registering our node to master // request identification this way registering our node to master
accept, err := neo.IdentifyWith(neo.MASTER, Mlink, stor.myInfo, stor.clusterName) accept, err := neo.IdentifyWith(neo.MASTER, Mlink, stor.myInfo, stor.clusterName)
...@@ -180,13 +185,20 @@ func (stor *Storage) talkMaster1(ctx context.Context) error { ...@@ -180,13 +185,20 @@ func (stor *Storage) talkMaster1(ctx context.Context) error {
} }
// now handle notifications and commands from master // now handle notifications and commands from master
var Mconn *neo.Conn
for { for {
// accept next connection from master. only 1 connection is served at any given time
// XXX every new connection from master means previous connection was closed // XXX every new connection from master means previous connection was closed
// XXX how to do so and stay compatible to py? // XXX how to do so and stay compatible to py?
// //
// XXX or simply use only the first connection and if M decides // XXX or simply use only the first connection and if M decides
// to cancel - close whole nodelink and S reconnects? // to cancel - close whole nodelink and S reconnects?
Mconn, err := Mlink.Accept() if Mconn != nil {
Mconn.Close() // XXX err
Mconn = nil
}
Mconn, err = Mlink.Accept()
if err != nil { if err != nil {
return err // XXX ? return err // XXX ?
} }
...@@ -200,7 +212,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) error { ...@@ -200,7 +212,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) error {
err = stor.m1serve(ctx, Mconn) err = stor.m1serve(ctx, Mconn)
fmt.Println("stor: %v: master: %v", err) fmt.Println("stor: %v: master: %v", err)
// XXX check if it was command to shotdown and if so break // XXX check if it was command to shutdown and if so break
continue // retry from initializing continue // retry from initializing
} }
...@@ -320,6 +332,13 @@ func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) { ...@@ -320,6 +332,13 @@ func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) {
} }
} }
// XXX naming -> withOpCtx? withUntilOperational?
func (stor *Storage) opCtxRead() context.Context {
stor.opMu.Lock()
defer stor.opMu.Unlock()
return stor.opCtx
}
// ServeLink serves incoming node-node link connection // ServeLink serves incoming node-node link connection
// XXX +error return? // XXX +error return?
func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) { func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
...@@ -382,36 +401,18 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) { ...@@ -382,36 +401,18 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) { func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) {
fmt.Printf("stor: %s: serving new client conn\n", conn) fmt.Printf("stor: %s: serving new client conn\n", conn)
// rederive ctx from ctx and .operationCtx (which is cancelled when M tells us StopOperation) // rederive ctx to be also cancelled if M tells us StopOperation
// XXX -> xcontext ? ctx, cancel := xcontext.Merge(ctx, stor.opCtxRead())
ctx, opCancel := context.WithCancel(ctx) //ctx, cancel := stor.withWhileOperational(ctx)
go func() { defer cancel()
// cancel ctx when global operation context is cancelled
stor.opMu.Lock()
opCtx := stor.opCtx
stor.opMu.Unlock()
select {
case <-opCtx.Done():
opCancel()
case <-ctx.Done():
// noop - to avoid goroutine leak
}
}()
// close connection when either cancelling or returning (e.g. due to an error) // close connection when either cancelling or returning (e.g. due to an error)
// ( when cancelling - conn.Close will signal to current IO to // ( when cancelling - conn.Close will signal to current IO to
// terminate with an error ) // terminate with an error )
// XXX dup -> utility
retch := make(chan struct{})
defer func() { close(retch) }()
go func() { go func() {
select { <-ctx.Done()
case <-ctx.Done(): // XXX tell client if we are shutting down?
// XXX tell client we are shutting down?
// XXX ret err = cancelled ? // XXX ret err = cancelled ?
case <-retch:
}
fmt.Printf("stor: %v: closing client conn\n", conn) fmt.Printf("stor: %v: closing client conn\n", conn)
conn.Close() // XXX err conn.Close() // XXX err
}() }()
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xcontext provides addons to std package context.
package xcontext
import (
"context"
"sync"
"time"
)
// Merge merges 2 contexts into 1.
//
// The result context:
// - is done when ctx1 or ctx2 is done, or cancel called, whichever happens first,
// - has deadline = min(ctx1.Deadline, ctx2.Deadline),
// - has associated values merged from ctx1 and ctx2.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func Merge(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
mc := &mergeCtx{
ctx1: ctx1,
ctx2: ctx2,
done: make(chan struct{}),
cancelCh: make(chan struct{}),
}
go mc.wait()
return mc, mc.cancel
}
type mergeCtx struct {
ctx1, ctx2 context.Context
done chan struct{}
doneErr error
cancelCh chan struct{}
cancelOnce sync.Once
}
// wait waits when .ctx1 or .ctx2 is done and then mark mergeCtx as done
func (mc *mergeCtx) wait() {
select {
case <-mc.ctx1.Done():
mc.doneErr = mc.ctx1.Err()
case <-mc.ctx2.Done():
mc.doneErr = mc.ctx2.Err()
case <-mc.cancelCh:
mc.doneErr = context.Canceled
}
close(mc.done)
}
// cancel sends signal to wait to shutdown
// cancel is the context.CancelFunc returned for mergeCtx by Merge
func (mc *mergeCtx) cancel() {
mc.cancelOnce.Do(func() {
close(mc.cancelCh)
})
}
func (mc *mergeCtx) Done() <-chan struct{} {
return mc.done
}
func (mc *mergeCtx) Err() error {
// synchronize on .done to avoid .doneErr read races
select {
case <-mc.done:
default:
// done not yet closed
return nil
}
// .done closed; .doneErr was set before - no race
return mc.doneErr
}
func (mc *mergeCtx) Deadline() (time.Time, bool) {
d1, ok1 := mc.ctx1.Deadline()
d2, ok2 := mc.ctx2.Deadline()
switch {
case !ok1:
return d2, ok2
case !ok2:
return d1, ok1
case d1.Before(d2):
return d1, true
default:
return d2, true
}
}
func (mc *mergeCtx) Value(key interface{}) interface{} {
v := mc.ctx1.Value(key)
if v != nil {
return v
}
return mc.ctx2.Value(key)
}
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xcontext provides addons to std package context.
package xcontext
import (
"context"
"testing"
"time"
)
func TestMerge(t *testing.T) {
bg := context.Background()
ctx1, cancel1 := context.WithCancel(bg)
ctx2, cancel2 := context.WithCancel(bg)
ctx1 = context.WithValue(ctx1, 1, "hello")
ctx2 = context.WithValue(ctx2, 2, "world")
mc, _ := Merge(ctx1, ctx2)
assertEq := func(a, b interface{}) {
t.Helper()
if a != b {
t.Fatalf("%v != %v", a, b)
}
}
assertEq(mc.Value(1), "hello")
assertEq(mc.Value(2), "world")
assertEq(mc.Value(3), nil)
t0 := time.Time{}
d, ok := mc.Deadline()
if !(d == t0 && ok == false) {
t.Fatal("deadline must be unset")
}
assertEq(mc.Err(), nil)
select {
case <-mc.Done():
t.Fatal("done before any parent done")
default:
}
cancel2()
<-mc.Done()
assertEq(mc.Err(), context.Canceled)
////////
mc, _ = Merge(ctx1, bg)
assertEq(mc.Value(1), "hello")
assertEq(mc.Value(2), nil)
assertEq(mc.Value(3), nil)
d, ok = mc.Deadline()
if !(d == t0 && ok == false) {
t.Fatal("deadline must be unset")
}
assertEq(mc.Err(), nil)
select {
case <-mc.Done():
t.Fatal("done before any parent done")
default:
}
cancel1()
<-mc.Done()
assertEq(mc.Err(), context.Canceled)
////////
t1 := t0.AddDate(7777, 1, 1)
t2 := t0.AddDate(9999, 1, 1)
ctx1, _ = context.WithDeadline(bg, t1)
ctx2, _ = context.WithDeadline(bg, t2)
checkDeadline := func(a, b context.Context, tt time.Time) {
t.Helper()
m, _ := Merge(a, b)
d, ok := m.Deadline()
if !ok {
t.Fatal("no deadline returned")
}
if d != tt {
t.Fatalf("incorrect deadline: %v ; want %v", d, tt)
}
}
checkDeadline(ctx1, bg, t1)
checkDeadline(bg, ctx2, t2)
checkDeadline(ctx1, ctx2, t1)
checkDeadline(ctx2, ctx1, t1)
////////
mc, mcancel := Merge(bg, bg)
select {
case <-mc.Done():
t.Fatal("done before any parent done")
default:
}
mcancel()
mcancel()
<-mc.Done()
assertEq(mc.Err(), context.Canceled)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment