Commit 78681a6e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 529c4666
...@@ -100,7 +100,8 @@ func (c *Client) LastOid() (zodb.Oid, error) { ...@@ -100,7 +100,8 @@ func (c *Client) LastOid() (zodb.Oid, error) {
} }
func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
// XXX check pt is operational first? // XXX check pt is operational first? -> no if there is no data - we'll
// just won't find ready cell
cellv := c.node.PartTab.Get(xid.Oid) cellv := c.node.PartTab.Get(xid.Oid)
// XXX cellv = filter(cellv, UP_TO_DATE) // XXX cellv = filter(cellv, UP_TO_DATE)
cell := cellv[rand.Intn(len(cellv))] cell := cellv[rand.Intn(len(cellv))]
...@@ -110,15 +111,7 @@ func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) { ...@@ -110,15 +111,7 @@ func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
} }
// XXX check stor.State == RUNNING // XXX check stor.State == RUNNING
//Slink := c.Connect(stor) // single-flight Dial; puts result into stor.Link (XXX ok?) Sconn, err := stor.Conn()
//Slink := stor.Connect() // single-flight Dial; puts result into stor.Link (XXX ok?)
Slink := stor.Link // XXX stub
// TODO maintain conn pool so every new GetObject request does not
// spawn new goroutine on server
// Sconn = stor.GetConn()
// XXX defer if ok stor.PutConn(Sconn)
Sconn, err := Slink.NewConn()
if err != nil { if err != nil {
panic(0) // XXX panic(0) // XXX
} }
......
...@@ -111,14 +111,14 @@ var ErrClosedConn = errors.New("connection is closed") ...@@ -111,14 +111,14 @@ var ErrClosedConn = errors.New("connection is closed")
// (think from point of view how user should be handling errors) // (think from point of view how user should be handling errors)
// XXX or it is good to be able to distinguish between only conn error vs whole-link error? // XXX or it is good to be able to distinguish between only conn error vs whole-link error?
// LinkError is usually returned by NodeLink operations // LinkError is returned by NodeLink operations
type LinkError struct { type LinkError struct {
Link *NodeLink Link *NodeLink
Op string Op string
Err error Err error
} }
// ConnError is usually returned by Conn operations // ConnError is returned by Conn operations
type ConnError struct { type ConnError struct {
Conn *Conn Conn *Conn
Op string Op string
...@@ -277,10 +277,11 @@ func (nl *NodeLink) shutdown() { ...@@ -277,10 +277,11 @@ func (nl *NodeLink) shutdown() {
} }
// Close closes node-node link. // Close closes node-node link.
//
// All blocking operations - Accept and IO on associated connections // All blocking operations - Accept and IO on associated connections
// established over node link - are automatically interrupted with an error. // established over node link - are automatically interrupted with an error.
// Underlying raw connection is closed. // Underlying raw connection is closed.
// It is safe to call Close several times // It is safe to call Close several times.
func (nl *NodeLink) Close() error { func (nl *NodeLink) Close() error {
atomic.StoreUint32(&nl.closed, 1) atomic.StoreUint32(&nl.closed, 1)
nl.shutdown() nl.shutdown()
...@@ -344,7 +345,7 @@ func (c *Conn) Close() error { ...@@ -344,7 +345,7 @@ func (c *Conn) Close() error {
// ---- receive ---- // ---- receive ----
// Accept waits for and accepts incoming connection on top of node-node link // Accept waits for and accepts incoming connection on top of node-node link.
func (nl *NodeLink) Accept() (c *Conn, err error) { func (nl *NodeLink) Accept() (c *Conn, err error) {
defer func() { defer func() {
if err != nil { if err != nil {
......
...@@ -79,8 +79,6 @@ type NodeTable struct { ...@@ -79,8 +79,6 @@ type NodeTable struct {
//storv []*Node // storages //storv []*Node // storages
nodev []*Node // all other nodes -> *Peer nodev []*Node // all other nodes -> *Peer
notifyv []chan NodeInfo // subscribers notifyv []chan NodeInfo // subscribers
//ver int // ↑ for versioning XXX do we need this?
} }
...@@ -92,27 +90,29 @@ const δtRedial = 3 * time.Second ...@@ -92,27 +90,29 @@ const δtRedial = 3 * time.Second
// Peer represents a peer node in the cluster. // Peer represents a peer node in the cluster.
type Peer struct { type Peer struct {
NodeInfo // .uuid, .addr, ... NodeInfo // .type, .addr, .uuid, ... XXX also protect by mu?
// link to this peer
linkMu sync.Mutex linkMu sync.Mutex
link *NodeLink // link to peer or nil if not connected link *NodeLink // link to this peer; nil if not connected
dialT time.Time // dialing finished at this time dialT time.Time // last dial finished at this time
// dialer notifies waiters via this; reinitialized at each redial; nil while not dialing // dialer notifies waiters via this; reinitialized at each redial; nil while not dialing
// //
// NOTE duplicates .link to have the following properties: // NOTE duplicates .link to have the following properties:
// //
// 1. all waiters of current in-progress dial wakup immediately after // 1. all waiters of current in-progress dial wakeup immediately after
// dial completes and get link/error from dial result. // dial completes and get link/error from dial result.
// //
// 2. any .Connect() that sees .link=nil starts new redial with throttle // 2. any .Link() that sees .link=nil starts new redial with throttle
// to make sure peer is dialed not faster than δtRedial. // to make sure peer is dialed not faster than δtRedial.
// //
// (if we do not have dialing.link waiter will need to relock // (if we do not have dialing.link waiter will need to relock
// peer.linkMu and for some waiters chances are another .Connect() // peer.linkMu and for some waiters chances are another .Link()
// already started redialing and they will have to wait again) // already started redialing and they will have to wait again)
dialing *dialed dialing *dialed
// // live connection pool that user provided back here via .PutConn()
// connPool []*Conn
} }
// dialed is result of dialing a peer. // dialed is result of dialing a peer.
...@@ -122,12 +122,18 @@ type dialed struct { ...@@ -122,12 +122,18 @@ type dialed struct {
ready chan struct{} ready chan struct{}
} }
// Connect returns link to this peer. XXX -> DialLink ? // Link returns link to the peer.
// //
// If the link was not yet established Connect dials the peer appropriately, // If the link was not yet established Link dials the peer appropriately,
// handshakes, requests identification and checks that identification reply is // handshakes, requests identification and checks that identification reply is
// as expected. // as expected.
func (p *Peer) Connect(ctx context.Context) (*NodeLink, error) { //
// Several Link calls may be done in parallel - in any case only 1 link-level
// dial will be made and others will share established link.
//
// In case Link returns an error - future Link will attempt to reconnect with
// "don't reconnect too fast" throttling.
func (p *Peer) Link(ctx context.Context) (*NodeLink, error) {
p.linkMu.Lock() p.linkMu.Lock()
// ok if already connected // ok if already connected
...@@ -193,6 +199,59 @@ func (p *Peer) Connect(ctx context.Context) (*NodeLink, error) { ...@@ -193,6 +199,59 @@ func (p *Peer) Connect(ctx context.Context) (*NodeLink, error) {
return dialing.link, dialing.err return dialing.link, dialing.err
} }
// Conn returns conn to the peer.
//
// If there is no link established - conn first dials peer (see Link).
//
// For established link Conn either creates new connection over the link,
// XXX (currently inactive) or gets one from the pool of unused connections (see PutConn).
func (p *Peer) Conn(ctx context.Context) (*Conn, error) {
var err error
/*
p.linkMu.Lock()
if l := len(p.connPool); l > 0 {
conn := p.connPool[l-1]
p.connPool = p.connPool[:l-1]
p.linkMu.Unlock()
return conn, nil
}
*/
// connection poll is empty - let's create new connection from .link
link := p.link
p.linkMu.Unlock()
// we might need to (re)dial
if link == nil {
link, err = p.Link(ctx)
if err != nil {
return nil, err
}
}
return link.NewConn()
}
/*
// PutConn saves c in the pool of unused connections.
//
// Since connections saved into pool can be reused by other code, after
// PutConn call the caller must not use the connection directly.
//
// PutConn ignores connections not created for current peer link.
func (p *Peer) PutConn(c *Conn) {
p.linkMu.Lock()
// NOTE we can't panic on p.link != c.Link() - reason is: p.link can change on redial
if p.link == c.Link() {
p.connPool = append(p.connPool, c)
}
p.linkMu.Unlock()
}
*/
// XXX dial does low-level work to dial peer // XXX dial does low-level work to dial peer
// XXX p.* reading without lock - ok? // XXX p.* reading without lock - ok?
...@@ -210,11 +269,13 @@ func (p *Peer) dial(ctx context.Context) (*NodeLink, error) { ...@@ -210,11 +269,13 @@ func (p *Peer) dial(ctx context.Context) (*NodeLink, error) {
switch { switch {
case accept.NodeType != p.Type: case accept.NodeType != p.Type:
err = fmt.Errorf("connected, but peer is not %v (identifies as %v)", p.Type, accept.NodeType) err = fmt.Errorf("connected, but peer is not %v (identifies as %v)", p.Type, accept.NodeType)
case accept.MyUUID != p.UUID: case accept.MyUUID != p.UUID:
err = fmt.Errorf("connected, but peer's uuid is not %v (identifies as %v)", p.UUID, accept.MyUUID) err = fmt.Errorf("connected, but peer's uuid is not %v (identifies as %v)", p.UUID, accept.MyUUID)
case accept.YourUUID != me.MyInfo.UUID: case accept.YourUUID != me.MyInfo.UUID:
err = fmt.Errorf("connected, but peer gives us uuid %v (our is %v)", accept.YourUUID, me.MyInfo.UUID) err = fmt.Errorf("connected, but peer gives us uuid %v (our is %v)", accept.YourUUID, me.MyInfo.UUID)
case !(accept.NumPartitions == 1 && accept.NumReplicas == 1): case !(accept.NumPartitions == 1 && accept.NumReplicas == 1):
err = fmt.Errorf("connected but TODO peer works with ! 1x1 partition table.") err = fmt.Errorf("connected but TODO peer works with ! 1x1 partition table.")
} }
...@@ -230,6 +291,21 @@ func (p *Peer) dial(ctx context.Context) (*NodeLink, error) { ...@@ -230,6 +291,21 @@ func (p *Peer) dial(ctx context.Context) (*NodeLink, error) {
/* XXX closing .link on .state = DOWN
func (p *Peer) SetState(state NodeState) {
// XXX lock?
p.State = state
traceNodeChanged(nt, node)
if state == DOWN {
if p.link != nil {
lclose(ctx, p.link)
p.link = nil
// XXX clear p.connPool
}
}
nt.notify(node.NodeInfo)
}
*/
......
...@@ -476,6 +476,33 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context, ...@@ -476,6 +476,33 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context,
return xcontext.Merge(ctx, opCtx) return xcontext.Merge(ctx, opCtx)
} }
func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
log.Infof(ctx, "%s: serving new client conn", conn) // XXX -> running?
// rederive ctx to be also cancelled if M tells us StopOperation
ctx, cancel := stor.withWhileOperational(ctx)
defer cancel()
link := conn.Link()
for {
err := stor.serveClient1(ctx, conn)
if err != nil {
return err
}
lclose(conn)
// keep on going in the same goroutine to avoid goroutine creation overhead
// TODO Accept += timeout, go away if inactive
conn, err = link.Accept(ctx)
if err != nil {
// lclose(link) XXX ?
return err
}
}
}
// serveClient serves incoming connection on which peer identified itself as client // serveClient serves incoming connection on which peer identified itself as client
// the connection is closed when serveClient returns // the connection is closed when serveClient returns
// XXX +error return? // XXX +error return?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment