Commit 9c047ee1 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 7badf3b1
...@@ -28,7 +28,6 @@ import ( ...@@ -28,7 +28,6 @@ import (
"os" "os"
"strings" "strings"
"sync" "sync"
// "time"
"github.com/pkg/errors" "github.com/pkg/errors"
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
...@@ -37,9 +36,7 @@ import ( ...@@ -37,9 +36,7 @@ import (
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync" "lab.nexedi.com/kirr/go123/xsync"
// "lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task" "lab.nexedi.com/kirr/neo/go/internal/task"
// "lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/internal/xurl" "lab.nexedi.com/kirr/neo/go/internal/xurl"
"lab.nexedi.com/kirr/neo/go/internal/xzlib" "lab.nexedi.com/kirr/neo/go/internal/xzlib"
"lab.nexedi.com/kirr/neo/go/internal/xzodb" "lab.nexedi.com/kirr/neo/go/internal/xzodb"
...@@ -54,6 +51,7 @@ import ( ...@@ -54,6 +51,7 @@ import (
type Client struct { type Client struct {
node *_MasteredNode node *_MasteredNode
// Run is run under:
runWG *xsync.WorkGroup runWG *xsync.WorkGroup
runCancel func() runCancel func()
...@@ -89,16 +87,33 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { ...@@ -89,16 +87,33 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
return c return c
} }
// Close implements zodb.IStorageDriver.
func (c *Client) Close() (err error) {
c.runCancel()
err = c.runWG.Wait()
if errors.Is(err, context.Canceled) {
err = nil // we canceled it
}
// close networker if configured to do so
if c.ownNet {
err2 := c.node.Net.Close()
if err == nil {
err = err2
}
}
return err
}
// Run starts client node and runs it until either ctx is canceled or master // Run starts client node and runs it until either ctx is canceled or master
// commands it to shutdown. (TODO verify M->shutdown) // commands it to shutdown.
func (c *Client) Run(ctx context.Context) (err error) { func (c *Client) Run(ctx context.Context) (err error) {
// run process which performs master talk // run process which performs master talk
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
c.runCancel = cancel c.runCancel = cancel
c.runWG.Go(func(runCtx context.Context) error { c.runWG.Go(func(runCtx context.Context) error {
ctx, cancel := xcontext.Merge(ctx, runCtx) // TODO -> MergeCancel ctx, cancel := xcontext.Merge(ctx, runCtx)
defer cancel() defer cancel()
return c.node.TalkMaster(ctx, func(ctx context.Context, mlink *neonet.NodeLink) error { return c.node.TalkMaster(ctx, func(ctx context.Context, mlink *neonet.NodeLink) error {
...@@ -127,66 +142,9 @@ func (c *Client) Run(ctx context.Context) (err error) { ...@@ -127,66 +142,9 @@ func (c *Client) Run(ctx context.Context) (err error) {
return c.runWG.Wait() return c.runWG.Wait()
} }
// Close implements zodb.IStorageDriver.
func (c *Client) Close() (err error) {
c.runCancel()
err = c.runWG.Wait()
if errors.Is(err, context.Canceled) {
err = nil // we canceled it
}
// close networker if configured to do so
if c.ownNet {
err2 := c.node.Net.Close()
if err == nil {
err = err2
}
}
return err
}
// syncMaster asks M for DB head right after identification.
func (c *Client) syncMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
defer task.Running(&ctx, "sync0")(&err) // XXX unify with Sync ?
// query last_tid
lastTxn := proto.AnswerLastTransaction{}
err = mlink.Ask1(&proto.LastTransaction{}, &lastTxn)
if err != nil {
return err
}
if c.at0Initialized {
if lastTxn.Tid != c.head0 {
return fmt.Errorf("new transactions were committed while we were disconnected from master (%s -> %s)", c.head0, lastTxn.Tid)
}
} else {
// since we read lastTid, in separate protocol exchange there is a
// chance, that by the time when lastTid was read, some new transactions
// were committed. This way lastTid will be > than some first
// transactions received by watcher via "invalidateObjects" server
// notification.
//
// filter-out first < at0 messages for this reason.
//
// TODO change NEO protocol so that when C connects to M, M sends it
// current head and guarantees to send only followup invalidation
// updates.
c.at0Mu.Lock()
c.at0 = lastTxn.Tid
c.at0Initialized = true
c.flushEventq0()
c.at0Mu.Unlock()
close(c.at0Ready)
}
return nil
}
// recvMaster receives and handles notifications from master. // recvMaster receives and handles notifications from master.
func (c *Client) recvMaster(ctx context.Context) (err error) { func (c *Client) recvMaster(ctx context.Context) (err error) {
defer task.Running(&ctx, "rx")(&err) // XXX recheck vs talkMaster defer task.Running(&ctx, "rx")(&err)
for { for {
req, err := c.node.RecvM1() req, err := c.node.RecvM1()
...@@ -249,6 +207,44 @@ func (c *Client) invalidateObjects(msg *proto.InvalidateObjects) error { ...@@ -249,6 +207,44 @@ func (c *Client) invalidateObjects(msg *proto.InvalidateObjects) error {
return nil return nil
} }
// syncMaster asks M for DB head right after identification.
func (c *Client) syncMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
defer task.Running(&ctx, "sync0")(&err) // XXX unify with Sync ?
// query last_tid
lastTxn := proto.AnswerLastTransaction{}
err = mlink.Ask1(&proto.LastTransaction{}, &lastTxn)
if err != nil {
return err
}
if c.at0Initialized {
if lastTxn.Tid != c.head0 {
return fmt.Errorf("new transactions were committed while we were disconnected from master (%s -> %s)", c.head0, lastTxn.Tid)
}
} else {
// since we read lastTid, in separate protocol exchange there is a
// chance, that by the time when lastTid was read, some new transactions
// were committed. This way lastTid will be > than some first
// transactions received by watcher via "invalidateObjects" server
// notification.
//
// filter-out first < at0 messages for this reason.
//
// TODO change NEO protocol so that when C connects to M, M sends it
// current head and guarantees to send only followup invalidation
// updates.
c.at0Mu.Lock()
c.at0 = lastTxn.Tid
c.at0Initialized = true
c.flushEventq0()
c.at0Mu.Unlock()
close(c.at0Ready)
}
return nil
}
// flushEventq0 flushes events queued in c.eventq0. // flushEventq0 flushes events queued in c.eventq0.
// must be called under .at0Mu // must be called under .at0Mu
func (c *Client) flushEventq0() { func (c *Client) flushEventq0() {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment