Commit e0fdc0e7 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3a76054d
......@@ -17,17 +17,12 @@
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xcontext provides addons to std package context.
//
// XXX docs:
// - Canceled
// - WhenDone
// Package xcontext is staging place for go123/xcontext.
package xcontext
import (
"context"
"errors"
"io"
)
// Cancelled reports whether an error is due to a canceled context.
......@@ -51,6 +46,7 @@ func Canceled(err error) bool {
}
/*
// WhenDone arranges for f to be called either when ctx is cancelled or
// surrounding function returns.
//
......@@ -59,7 +55,7 @@ func Canceled(err error) bool {
// func myfunc(ctx, ...) {
// defer xcontext.WhenDone(ctx, func() { ... })()
//
// XXX -> use WithCloseOnErrCancel instead?
// XXX -> use WithCloseOn{Err,Ret}Cancel instead?
func WhenDone(ctx context.Context, f func()) func() {
done := make(chan struct{})
go func() {
......@@ -77,41 +73,4 @@ func WhenDone(ctx context.Context, f func()) func() {
close(done)
}
}
// WithCloseOnErrCancel closes c on ctx cancel while f is run, or if f returns with an error.
//
// It is usually handy to propagate cancellation to interrupt IO. XXX when f creates/leaves link alive.
// XXX naming?
// XXX don't close on f return?
func WithCloseOnErrCancel(ctx context.Context, c io.Closer, f func() error) (err error) {
closed := false
fdone := make(chan error)
defer func() {
<-fdone // wait for f to complete
if err != nil {
if !closed {
c.Close() // XXX log err?
}
}
}()
go func() (err error) {
defer func() {
fdone <- err
close(fdone)
}()
return f()
}()
select {
case <-ctx.Done():
c.Close() // interrupt IO XXX log err?
closed = true
return ctx.Err()
case err := <-fdone:
return err
}
}
*/
......@@ -17,15 +17,16 @@
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xio provides addons to standard package io.
// Package xio is staging place for go123/xio.
package xio
import (
"context"
"errors"
"io"
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/xcontext"
// "lab.nexedi.com/kirr/neo/go/internal/xcontext"
)
......@@ -45,6 +46,7 @@ func EOFok(err error) error {
return err
}
/*
// CloseWhenDone arranges for c to be closed either when ctx is cancelled or
// surrounding function returns.
//
......@@ -64,6 +66,7 @@ func CloseWhenDone(ctx context.Context, c io.Closer) func() {
}
})
}
*/
// LClose closes c and logs closing error if there was any.
// the error is otherwise ignored
......@@ -75,3 +78,65 @@ func LClose(ctx context.Context, c io.Closer) {
log.Error(ctx, err)
}
}
// WithCloseOnErrCancel closes c on ctx cancel while f is run, or if f returns with an error.
//
// It is usually handy to propagate cancellation to interrupt IO.
func WithCloseOnErrCancel(ctx context.Context, c io.Closer, f func() error) (err error) {
closed := false
fdone := make(chan error)
defer func() {
errf, ok := <-fdone // wait for f to complete XXX return f's error
if ok {
// it was ctx cancel and `return ctx.Err()` vvv
// -> change return to be what f returned
err = errf
}
if err != nil {
if !closed {
LClose(ctx, c)
}
}
}()
go func() (err error) {
defer func() {
fdone <- err
close(fdone)
}()
return f()
}()
select {
case <-ctx.Done():
LClose(ctx, c) // interrupt IO
closed = true
return ctx.Err()
case err := <-fdone:
return err
}
}
// WithCloseOnRetCancel closes c on ctx cancel while f is run, or when f returns.
//
// It is usually handy to propagate cancellation to interrupt IO.
func WithCloseOnRetCancel(ctx context.Context, c io.Closer, f func() error) error {
var errf error
err := WithCloseOnErrCancel(ctx, c, func() error {
errf = f()
e := errf
if e == nil {
e = retOK // force c close
}
return e
})
if err == retOK {
err = errf
}
return err
}
var retOK = errors.New("ok")
......@@ -32,6 +32,7 @@ import (
"github.com/golang/glog"
"github.com/pkg/errors"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
......@@ -98,10 +99,8 @@ func (c *Client) Close() (err error) {
// close networker if configured to do so
if c.ownNet {
err2 := c.node.Net.Close()
if err == nil {
err = err2
}
__ := c.node.Net.Close()
err = xerr.First(err, __)
}
return err
}
......
......@@ -30,6 +30,7 @@ import (
"github.com/soheilhy/cmux"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/internal/log"
......@@ -90,27 +91,30 @@ func neoMatch(r io.Reader) bool {
// default HTTP mux.
//
// Default HTTP mux can be assumed to contain /debug/pprof and the like.
func listenAndServe(ctx context.Context, net xnet.Networker, laddr string, serve func(ctx context.Context, l xnet.Listener) error) error {
func listenAndServe(ctx context.Context, net xnet.Networker, laddr string, serve func(ctx context.Context, l xnet.Listener) error) (err error) {
l, err := net.Listen(ctx, laddr)
if err != nil {
return err
}
// XXX who closes l?
defer func() { // just in case if mux.Close does not close l
__ := l.Close()
err = xerr.First(err, __)
}()
log.Infof(ctx, "listening at %s ...", l.Addr())
log.Flush() // XXX ok?
log.Flush()
mux := cmux.New(xnet.BindCtxL(l, ctx))
neoL := mux.Match(neoMatch)
httpL := mux.Match(cmux.HTTP1(), cmux.HTTP2()) // XXX verify http2 works
httpL := mux.Match(cmux.HTTP1(), cmux.HTTP2())
miscL := mux.Match(cmux.Any())
wg := xsync.NewWorkGroup(ctx)
wg.Go(func(ctx context.Context) error {
// XXX shutdown serve on ctx cancel (-> WithCloseOnErrCancel(mux); cmux recently added mux.Close)
return mux.Serve()
return xio.WithCloseOnRetCancel(ctx, &noErrCloser{mux},
mux.Serve,
)
})
wg.Go(func(ctx context.Context) error {
......@@ -118,12 +122,15 @@ func listenAndServe(ctx context.Context, net xnet.Networker, laddr string, serve
})
wg.Go(func(ctx context.Context) error {
// XXX shutdown http on ctx cancel
return http.Serve(httpL, nil)
srv := &http.Server{}
return xio.WithCloseOnRetCancel(ctx, srv, func() error {
// TODO ^^^ better Shutdown instead of Close
return srv.Serve(httpL)
})
})
wg.Go(func(ctx context.Context) error {
// XXX shutdown on ctx cancel
return xio.WithCloseOnRetCancel(ctx, miscL, func() error {
for {
conn, err := miscL.Accept()
if err != nil {
......@@ -140,13 +147,22 @@ func listenAndServe(ctx context.Context, net xnet.Networker, laddr string, serve
if n > 0 {
serr = fmt.Sprintf("peer sent %q", b[:n])
}
log.Infof(ctx, "%s: %s", subj, serr)
log.Warningf(ctx, "%s: %s", subj, serr)
xio.LClose(ctx, conn)
}
})
})
return wg.Wait()
}
err = wg.Wait()
return err
// noErrCloser turns `Close()` -> `Close() err` that alwasys returns nil.
type noErrCloser struct {
c interface { Close() }
}
func (c *noErrCloser) Close() error {
c.c.Close()
return nil
}
......@@ -54,6 +54,7 @@ import (
"time"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
......@@ -232,6 +233,10 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
// wrap listener with link / identification hello checker
lli := xneo.NewListener(neonet.NewLinkListener(l))
defer func() {
__ := lli.Close()
err = xerr.First(err, __)
}()
// accept: accept incoming connections and pass them to main driver
m.mainWG.Go(func(ctx context.Context) (err error) {
......@@ -286,8 +291,6 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
err = ctx.Err()
}
xio.LClose(ctx, lli) // XXX here ok? (probably not)
return err
}
......@@ -1007,9 +1010,8 @@ func (m *Master) serveClient(ctx context.Context, cli *_MasteredPeer) (err error
defer task.Runningf(&ctx, "%s: serve client", cli.node.NID)(&err)
clink := cli.node.Link()
defer xio.CloseWhenDone(ctx, clink)()
// M <- C requests handler
return xio.WithCloseOnRetCancel(ctx, clink, func() error {
for {
req, err := clink.Recv1()
if err != nil {
......@@ -1023,6 +1025,7 @@ func (m *Master) serveClient(ctx context.Context, cli *_MasteredPeer) (err error
return err
}
}
})
}
// serveClient1 prepares response for 1 request from client.
......@@ -1121,9 +1124,8 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
if err != nil {
log.Infof(ctx, "%s: rejecting: %s", subj, err)
m.mainWG.Go(func(ctx context.Context) error {
xxcontext.WithCloseOnErrCancel(ctx, link, func() error {
n.req.Reply(err)
return fmt.Errorf("X") // to close link
xio.WithCloseOnRetCancel(ctx, link, func() error {
return n.req.Reply(err)
})
return nil // not to cancel main by a failing reject
})
......@@ -1200,7 +1202,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
})
// XXX compensate a bit for lack of ctx handling in Send/Recv
return xxcontext.WithCloseOnErrCancel(ctx, link, func() error {
return xio.WithCloseOnErrCancel(ctx, link, func() error {
// send accept and indicate to run that initial acceptance is done
err := peer.accept(ctx)
......
......@@ -31,7 +31,7 @@ import (
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
"lab.nexedi.com/kirr/neo/go/internal/xcontext"
"lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/xneo"
......@@ -149,8 +149,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
return err
}
// XXX close on RetCancel
return xcontext.WithCloseOnErrCancel(ctx, mlink, func() (err error) {
return xio.WithCloseOnRetCancel(ctx, mlink, func() (err error) {
if accept.YourNID != node.MyInfo.NID {
log.Infof(ctx, "master %s told us to be %s", accept.MyNID, accept.YourNID)
node.MyInfo.NID = accept.YourNID // XXX locking ? -> opMu ?
......
......@@ -32,7 +32,6 @@ import (
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/internal/xcontext"
"lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/neo/internal/tneonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
......@@ -114,7 +113,7 @@ func _handshakeClient(ctx context.Context, conn net.Conn, version uint32, encPre
rxbuf = newXBufReader(conn, /*any non-small limit*/1024)
var peerEnc proto.Encoding
err = xcontext.WithCloseOnErrCancel(ctx, conn, func() error {
err = xio.WithCloseOnErrCancel(ctx, conn, func() error {
// tx client hello
err := txHello("tx hello", conn, version, encPrefer)
if err != nil {
......@@ -154,7 +153,7 @@ func _handshakeServer(ctx context.Context, conn net.Conn, version uint32) (enc p
rxbuf = newXBufReader(conn, /*any non-small limit*/1024)
var peerEnc proto.Encoding
err = xcontext.WithCloseOnErrCancel(ctx, conn, func() error {
err = xio.WithCloseOnErrCancel(ctx, conn, func() error {
// rx client hello
var peerVer uint32
var err error
......
......@@ -87,9 +87,18 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
// wrap listener with link / identificaton hello checker
stor.lli = xneo.NewListener(neonet.NewLinkListener(l))
defer func() {
__ := stor.lli.Close()
err = xerr.First(err, __)
}()
defer func() {
__ := stor.back.Close()
err = xerr.First(err, __)
}()
// connect to master and let it drive us via commands and updates
err = stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error {
return stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error {
// XXX move -> SetNumReplicas handler
// // NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
// if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
......@@ -105,15 +114,6 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
// we got StartOperation command. Let master drive us during service phase.
return stor.m1serve(ctx, mlink, reqStart)
})
// XXX should Storage do it, or should it leave back non-closed?
// TODO -> Storage should not close backend.
err2 := stor.back.Close()
if err == nil {
err = err2
}
return err
}
// m1initialize drives storage by master messages during initialization phase
......@@ -286,7 +286,9 @@ func (stor *Storage) serve(ctx context.Context) (err error) {
wg.Add(1)
go func() {
defer wg.Done()
err := stor.serveLink(ctx, req, idReq)
err := xio.WithCloseOnRetCancel(ctx, req.Link(), func() error {
return stor.serveLink(ctx, req, idReq)
})
if err == nil {
if ctx.Err() == nil {
// the error is not due to serve cancel
......@@ -334,7 +336,6 @@ func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, *
func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq *proto.RequestIdentification) (err error) {
link := req.Link()
defer task.Runningf(&ctx, "serve %s", idReq.NID)(&err)
defer xio.CloseWhenDone(ctx, link)()
// first process identification
// TODO -> .Accept() that would listen, handshake and identify a node
......
......@@ -30,7 +30,7 @@ import (
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
"lab.nexedi.com/kirr/neo/go/internal/xcontext"
"lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto"
)
......@@ -75,7 +75,7 @@ func (l *listener) Accept(ctx context.Context) (_ *neonet.Request, msgID *proto.
// the first conn must come with RequestIdentification packet
defer xerr.Context(&err, "identify")
var req neonet.Request
err = xcontext.WithCloseOnErrCancel(ctx, link, func() error {
err = xio.WithCloseOnErrCancel(ctx, link, func() error {
var err error
req, err = link.Recv1(/*XXX ctx*/)
if err != nil {
......@@ -123,7 +123,7 @@ func Dial(ctx context.Context, typ proto.NodeType, net xnet.Networker, addr stri
defer xerr.Contextf(&err, "%s: request identification", link)
accept := &proto.AcceptIdentification{}
err = xcontext.WithCloseOnErrCancel(ctx, link, func() error {
err = xio.WithCloseOnErrCancel(ctx, link, func() error {
// FIXME error if peer sends us something with another connID
// (currently we ignore and serveRecv will deadlock)
//
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment