Commit 09915e9e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 56999c26
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
package neo package neo
// access to NEO database via ZODB interfaces // client node
import ( import (
"context" "context"
...@@ -26,6 +26,7 @@ import ( ...@@ -26,6 +26,7 @@ import (
"../zodb" "../zodb"
) )
// Client talks to NEO cluster and exposes access it via ZODB interfaces
type Client struct { type Client struct {
storLink *NodeLink // link to storage node storLink *NodeLink // link to storage node
storConn *Conn // XXX main connection to storage storConn *Conn // XXX main connection to storage
...@@ -34,7 +35,7 @@ type Client struct { ...@@ -34,7 +35,7 @@ type Client struct {
var _ zodb.IStorage = (*Client)(nil) var _ zodb.IStorage = (*Client)(nil)
func (c *Client) StorageName() string { func (c *Client) StorageName() string {
return "neo" // TODO more specific return "neo" // TODO more specific (+ cluster name, ...)
} }
func (c *Client) Close() error { func (c *Client) Close() error {
...@@ -119,22 +120,14 @@ func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -119,22 +120,14 @@ func (c *Client) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
} }
// TODO read-only support // NewClient creates and identifies new client connected to storage over storLink
func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) { func NewClient(storLink *NodeLink) (*Client, error) {
// XXX for now url is treated as storage node URL // first identify ourselves to peer
// XXX check/use other url fields
storLink, err := Dial(ctx, "tcp", u.Host)
if err != nil {
return nil, err
}
// first identify ourselves via conn
storType, err := IdentifyMe(storLink, CLIENT) storType, err := IdentifyMe(storLink, CLIENT)
if err != nil { if err != nil {
return nil, err // XXX err ctx return nil, err // XXX err ctx
} }
if storType != STORAGE { if storType != STORAGE {
storLink.Close() // XXX err
return nil, fmt.Errorf("%v: peer is not storage (identifies as %v)", storLink, storType) return nil, fmt.Errorf("%v: peer is not storage (identifies as %v)", storLink, storType)
} }
...@@ -144,13 +137,47 @@ func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) { ...@@ -144,13 +137,47 @@ func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) {
// asking storage in parallel. At the same time creating new conn for // asking storage in parallel. At the same time creating new conn for
// every request is ok? -> not so good to create new goroutine per 1 object read // every request is ok? -> not so good to create new goroutine per 1 object read
// XXX -> server could reuse goroutines -> so not so bad ? // XXX -> server could reuse goroutines -> so not so bad ?
conn, err := storLink.NewConn() storConn, err := storLink.NewConn()
if err != nil { if err != nil {
storLink.Close() // XXX err
return nil, err // XXX err ctx ? return nil, err // XXX err ctx ?
} }
return &Client{storLink, conn}, nil return &Client{storLink, storConn}, nil
}
// TODO read-only support
func openClientByURL(ctx context.Context, u *url.URL) (zodb.IStorage, error) {
// XXX for now url is treated as storage node URL
// XXX check/use other url fields
storLink, err := Dial(ctx, "tcp", u.Host)
if err != nil {
return nil, err
}
// close storLink on error or ctx cancel
defer func() {
if err != nil {
storLink.Close()
}
}()
// XXX try to prettify this
type Result struct {*Client; error}
done := make(chan Result, 1)
go func() {
client, err := NewClient(storLink)
done <- Result{client, err}
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
case r := <-done:
return r.Client, r.error
}
} }
//func Open(...) (*Client, error) { //func Open(...) (*Client, error) {
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Open Source Initiative approved licenses and Convey
// the resulting work. Corresponding source of such a combination shall include
// the source code for all other software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
package neo
// test interaction between nodes
import (
"context"
"testing"
)
func TestClientStorage(t *testing.T) {
nlC, nlS := nodeLinkPipe()
ctxS := context.Background()
S := NewStorage(nil) // TODO zodb.storage.mem
//Serve(ctx, l, S)
S.ServeLink(ctxS, nlS) // XXX go
C, err := NewClient(nlC)
//assert err != nil
_ = C
_ = err
}
...@@ -47,6 +47,7 @@ func Serve(ctx context.Context, l *Listener, srv Server) error { ...@@ -47,6 +47,7 @@ func Serve(ctx context.Context, l *Listener, srv Server) error {
go func() { go func() {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// XXX err = cancelled
case <-retch: case <-retch:
} }
l.Close() // XXX err l.Close() // XXX err
......
...@@ -56,6 +56,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -56,6 +56,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// XXX tell peers we are shutting down? // XXX tell peers we are shutting down?
// XXX ret err = cancelled ?
case <-retch: case <-retch:
} }
fmt.Printf("stor: %v: closing link\n", link) fmt.Printf("stor: %v: closing link\n", link)
...@@ -110,6 +111,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) { ...@@ -110,6 +111,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// XXX tell client we are shutting down? // XXX tell client we are shutting down?
// XXX ret err = cancelled ?
case <-retch: case <-retch:
} }
fmt.Printf("stor: %v: closing client conn\n", conn) fmt.Printf("stor: %v: closing client conn\n", conn)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment