Commit 887c6a2e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ea36e180
...@@ -80,13 +80,13 @@ package lonet ...@@ -80,13 +80,13 @@ package lonet
// After α establishes OS-level connection to β via main β address, it sends // After α establishes OS-level connection to β via main β address, it sends
// request to further establish lonet connection on top of that: // request to further establish lonet connection on top of that:
// //
// > lonet "<network>" dial <α:portα> <β:portβ> \n // > lonet "<network>" dial <α:portα> <β:portβ>\n
// //
// β checks whether portβ is listening, and if yes, accepts the connection on // β checks whether portβ is listening, and if yes, accepts the connection on
// corresponding on-β listener with giving feedback to α that connection was // corresponding on-β listener with giving feedback to α that connection was
// accepted: // accepted:
// //
// < lonet "<network>" connected <β:portβ'> \n // < lonet "<network>" connected <β:portβ'>\n
// //
// After that connection is considered to be lonet-established and all further // After that connection is considered to be lonet-established and all further
// exchange on it is directly controlled by corresponding lonet-level // exchange on it is directly controlled by corresponding lonet-level
...@@ -94,7 +94,7 @@ package lonet ...@@ -94,7 +94,7 @@ package lonet
// //
// If, on the other hand, lonet-level connection cannot be established, β replies: // If, on the other hand, lonet-level connection cannot be established, β replies:
// //
// < lonet "<networkβ>" E "<error>" \n // < lonet "<networkβ>" E "<error>"\n
// //
// where <error> could be: // where <error> could be:
// //
...@@ -105,6 +105,7 @@ package lonet ...@@ -105,6 +105,7 @@ package lonet
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
...@@ -121,7 +122,12 @@ import ( ...@@ -121,7 +122,12 @@ import (
const NetPrefix = "lonet" // lonet package creates only "lonet*" networks const NetPrefix = "lonet" // lonet package creates only "lonet*" networks
// XXX errors var (
// errNetClosed = errors.New("network connection closed")
// errAddrAlreadyUsed = errors.New("address already in use")
// errAddrNoListen = errors.New("cannot listen on requested address")
errConnRefused = errors.New("connection refused")
)
// Addr represents address of a lonet endpoint. // Addr represents address of a lonet endpoint.
type Addr struct { type Addr struct {
...@@ -204,9 +210,9 @@ type listener struct { ...@@ -204,9 +210,9 @@ type listener struct {
// it comes after OS-level connection was accepted and lonet dial already // it comes after OS-level connection was accepted and lonet dial already
// request parsed locally. // request parsed locally.
type dialReq struct { type dialReq struct {
from Addr from *Addr
osconn net.Conn osconn net.Conn
resp chan Addr // accepted with this local address resp chan *Addr // accepted with this local address
} }
// ---------------------------------------- // ----------------------------------------
...@@ -303,7 +309,9 @@ func (h *Host) Listen(laddr string) (net.Listener, error) { ...@@ -303,7 +309,9 @@ func (h *Host) Listen(laddr string) (net.Listener, error) {
panic("TODO") panic("TODO")
} }
// XXX // serve serves incomming OS-level connection to this subnetwork.
//
// for every accepted connection lonet-handshake is initiated.
func (n *SubNetwork) serve() { // XXX error? func (n *SubNetwork) serve() { // XXX error?
// wait for incoming OS connections and do lonet protocol handshake on them. // wait for incoming OS connections and do lonet protocol handshake on them.
// if successful - route handshaked connection to particular Host's listener. // if successful - route handshaked connection to particular Host's listener.
...@@ -325,37 +333,67 @@ func (n *SubNetwork) serve() { // XXX error? ...@@ -325,37 +333,67 @@ func (n *SubNetwork) serve() { // XXX error?
// loaccept handles incoming OS-level connection. // loaccept handles incoming OS-level connection.
// //
// it performs lonet protocol handshake and if successful further conveys // It performs lonet protocol handshake and if successful further conveys
// accepted connection to lonet-level Accept. // accepted connection to lonet-level Accept.
//
// If handshake is not successfull the connection is closed.
func (n *SubNetwork) loaccept(osconn net.Conn) (err error) { func (n *SubNetwork) loaccept(osconn net.Conn) (err error) {
// XXX also cancel handshake on ctx down
defer func() {
if err != nil {
osconn.Close()
}
}()
defer xerr.Contextf(&err, "lonet %q: handshake", n.network) defer xerr.Contextf(&err, "lonet %q: handshake", n.network)
// read handshake line and parse it // read handshake line and parse it
// XXX cancel handshake on ctx down line, err := readline(osconn, 1024) // limit line length not to cause memory dos
line, err := readline(osconn, maxlen)
if err != nil { if err != nil {
return err return err
} }
// replyf performs formatted reply to osconn.
// the error returned is for result of osconn.Write.
replyf := func(format string, argv ...interface{}) error {
line := fmt.Sprintf("< lonet %q " + format + "\n",
append([]interface{}{n.network}, argv...))
_, err := osconn.Write([]byte(line))
return err
}
// ereply performs error reply to osconn.
// for convenience returned error is the error itself, not the
// error returned from osconn.Write.
ereply := func(err error) error {
replyf("E %q", err) // ignore osconn.Write error
return err
}
// ereplyf performs formatted error reply to osconn.
// for convenience returned error is the error text built, not the
// error returned from osconn.Write.
ereplyf := func(format string, argv ...interface{}) error {
return ereply(fmt.Errorf(format, argv...))
}
var network, src, dst string var network, src, dst string
_, err = fmt.Sscanf(line, "> lonet %q dial %s %s\n", &network, &src, &dst) _, err = fmt.Sscanf(line, "> lonet %q dial %s %s\n", &network, &src, &dst)
if err != nil { if err != nil {
ereply("protocol error") return ereplyf("protocol error")
return err
} }
if network != n.network { if network != n.network {
ereply("network mismatch") return ereplyf("network mismatch")
return fmt.Errorf("network mismatch")
} }
asrc, err := ParseAddr(src) asrc, err := n.parseAddr(src)
if err != nil { if err != nil {
return ereply("src address invalid") return ereplyf("src address invalid")
} }
adst, err := ParseAddr(dst) adst, err := n.parseAddr(dst)
if err != nil { if err != nil {
return ereply("dst address invalid") return ereplyf("dst address invalid")
} }
// check dst host:port in .hostMap // check dst host:port in .hostMap
...@@ -364,37 +402,39 @@ func (n *SubNetwork) loaccept(osconn net.Conn) (err error) { ...@@ -364,37 +402,39 @@ func (n *SubNetwork) loaccept(osconn net.Conn) (err error) {
if host == nil || adst.Port >= len(host.socketv) { if host == nil || adst.Port >= len(host.socketv) {
n.mu.Unlock() n.mu.Unlock()
return ereplyf(errConnRefused) return ereply(errConnRefused)
} }
sk := host.socketv[adst.Port] sk := host.socketv[adst.Port]
if sk == nil || sk.listener == nil { if sk == nil || sk.listener == nil {
n.mu.Unlock() n.mu.Unlock()
return ereplyf(errConnRefused) return ereply(errConnRefused)
} }
// there is listener corresponding to dst - let's connect it // there is listener corresponding to dst - let's connect it
l := sk.listener l := sk.listener
n.mu.Unlock() n.mu.Unlock()
resp := make(chan XXX) resp := make(chan *Addr)
select { select {
//case <-ctx.Done(): //case <-ctx.Done():
// ... // ...
case <-l.down: case <-l.down:
return ereplyf(errConnRefused) return ereply(errConnRefused)
case l.dialq <- dialReq{from: asrc, osconn: osconn, resp: resp}: case l.dialq <- dialReq{from: asrc, osconn: osconn, resp: resp}:
// connection accepted // connection accepted
acceptAddr := <-resp acceptAddr := <-resp
// XXX ignore err - it will be seen at lonet level at further read/write?
replyf("connected %s", acceptAddr) replyf("connected %s", acceptAddr)
return nil
} }
} }
// readline reads 1 line from r up to maxlen bytes. // readline reads 1 line from r up to maxlen bytes.
func readline(r io.Reader, maxlen int) (string, _ error) { func readline(r io.Reader, maxlen int) (string, error) {
buf1 := []byte{0} buf1 := []byte{0}
var line []byte var line []byte
for len(line) < maxlen { for len(line) < maxlen {
...@@ -406,7 +446,7 @@ func readline(r io.Reader, maxlen int) (string, _ error) { ...@@ -406,7 +446,7 @@ func readline(r io.Reader, maxlen int) (string, _ error) {
if err == io.EOF { if err == io.EOF {
err = io.ErrUnexpectedEOF err = io.ErrUnexpectedEOF
} }
return nil, err return string(line), err
} }
line = append(line, buf1...) line = append(line, buf1...)
...@@ -475,7 +515,18 @@ func (sk *socket) addr() *Addr { ...@@ -475,7 +515,18 @@ func (sk *socket) addr() *Addr {
func (a *Addr) Network() string { return a.Net } func (a *Addr) Network() string { return a.Net }
func (a *Addr) String() string { return net.JoinHostPort(a.Host, strconv.Itoa(a.Port)) } func (a *Addr) String() string { return net.JoinHostPort(a.Host, strconv.Itoa(a.Port)) }
// XXX ParseAddr // parseAddr parses addr into lonet address.
func (n *SubNetwork) parseAddr(addr string) (*Addr, error) {
host, portstr, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portstr)
if err != nil || port < 0 {
return nil, &net.AddrError{Err: "invalid port", Addr: addr}
}
return &Addr{Net: n.Network(), Host: host, Port: port}, nil
}
// Addr returns address where listener is accepting incoming connections. // Addr returns address where listener is accepting incoming connections.
func (l *listener) Addr() net.Addr { func (l *listener) Addr() net.Addr {
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
# TODO doc, code # TODO doc, code
# XXX see lonet.go for lonet organization and protocol.
class SQLiteRegistry(object): class SQLiteRegistry(object):
......
...@@ -19,6 +19,13 @@ ...@@ -19,6 +19,13 @@
package lonet package lonet
import (
"context"
"testing"
"lab.nexedi.com/kirr/go123/exc"
)
// TODO test go-go // TODO test go-go
// TODO test go-py // TODO test go-py
...@@ -27,3 +34,19 @@ package lonet ...@@ -27,3 +34,19 @@ package lonet
// - econnrefused (no such host, port not listening) // - econnrefused (no such host, port not listening)
// - network mismatch // - network mismatch
// - invalid request // - invalid request
func TestLonet(t *testing.T) {
X := exc.Raiseif
ctx := context.Background()
subnet, err := Join(ctx, "")
X(err)
// XXX defer shutdown/rm this lonet fully?
, err := subnet.NewHost(ctx, "α")
X(err)
, err := subnet.NewHost(ctx, "β")
X(err)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment