Commit a01ce812 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 1bbebed5
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package lonet provides TCP network simulated on top of localhost TCP loopback.
//
// For testing distributed systems it is sometimes handy to imitate network of
// several TCP hosts. It is also handy that ports allocated on Dial/Listen/Accept on
// that hosts be predictable - that would help tests to verify network events
// against expected sequence. When whole system could be imitated in 1 OS-level
// process, package lab.nexedi.com/kirr/go123/xnet/pipenet serves the task via
// providing TCP-like synchronous in-memory network of net.Pipes. However
// pipenet cannot be used for cases where tested system consists of 2 or more
// OS-level processes. This is where lonet comes into play:
//
// Similarly to pipenet addresses on lonet are host:port pairs and several
// hosts could be created with different names. A host is xnet.Networker and
// so can be worked with similarly to regular TCP network access-point with
// Dial/Listen/Accept. Host's ports allocation is predictable: ports of a host
// are contiguous integer sequence starting from 1 that are all initially free,
// and whenever autobind is requested the first free port of the host will be
// used.
//
// Internally lonet network maintains registry of hosts so that lonet
// addresses could be resolved to OS-level addresses, for example α:1 and β:1
// to 127.0.0.1:4567 and 127.0.0.1:8765, and once lonet connection is
// established it becomes served by OS-level TCP connection over loopback.
//
// Example:
//
// net, err := lonet.Join(ctx, "mynet")
// hα, err := net.NewHost(ctx, "α")
// hβ, err := net.NewHost(ctx, "β")
//
// // starts listening on address "α:10"
// l, err := hα.Listen(":10")
// go func() {
// csrv, err := l.Accept() // csrv will have LocalAddr "α:11"
// }()
// ccli, err := hβ.Dial(ctx, "α:10") // ccli will be connection between "β:1" - "α:11"
//
// Once again lonet is similar to pipenet, but since it works via OS TCP stack
// it could be handy for testing networked application when there are several
// OS-level processes involved.
//
// See also shipped lonet.py for accessing lonet networks from Python.
package lonet
// Lonet organization
//
// For every lonet network there is a registry with information about hosts
// available on the network, and for each host its OS-level listening address.
// The registry is kept as SQLite database under
//
// /<tmp>/lonet/<network>/registry.db
//
// Whenever host α needs to establish connection to address on host β, it
// queries the registry for β and further talks to β on that address.
// Correspondingly when a host joins the network, it announces itself to the
// registry so that other hosts could see it.
//
//
// Handshake protocol
//
// After α establishes OS-level connection to β via main β address, it sends
// request to further establish lonet connection on top of that:
//
// > lonet "<network>" dial <α:portα> <β:portβ>\n
//
// β checks whether portβ is listening, and if yes, accepts the connection on
// corresponding on-β listener with giving feedback to α that connection was
// accepted:
//
// < lonet "<network>" connected <β:portβ'>\n
//
// After that connection is considered to be lonet-established and all further
// exchange on it is directly controlled by corresponding lonet-level
// Read/Write on α and β.
//
// If, on the other hand, lonet-level connection cannot be established, β replies:
//
// < lonet "<networkβ>" E "<error>"\n
//
// where <error> could be:
//
// - connrefused if <β:portβ> is not listening
// - network mismatch if β thinks it works on different lonet network than α
// - protocol error if β thinks that α send incorrect dial request
// - ...
import (
"context"
stderrors "errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"github.com/pkg/errors"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
)
const NetPrefix = "lonet" // lonet package creates only "lonet*" networks
var (
// errNetClosed = stderrors.New("network connection closed")
errAddrAlreadyUsed = stderrors.New("address already in use")
errAddrNoListen = stderrors.New("cannot listen on requested address")
errConnRefused = stderrors.New("connection refused")
)
// Addr represents address of a lonet endpoint.
type Addr struct {
Net string // full network name, e.g. "lonet"
Host string // name of host access point on the network
Port int // port on host
}
// SubNetwork represents one subnetwork of a lonet network.
//
// Multiple Hosts could be created on one subnetwork.
// There can be other subnetworks in the same process or in another OS-level processes.
//
// Host names are unique through whole lonet network.
type SubNetwork struct {
// name of full network under "lonet" namespace -> e.g. ""
// full network name will be reported as "lonet"+network.
network string
// registry of whole lonet network
registry registry
// OS-level listener of this subnetwork.
// whenever connection to subnet's host is tried to be established it goes here.
oslistener net.Listener
// big subnetwork lock for everything dynamic under SubNetwork
// (e.g. Host.socketv too)
mu sync.Mutex
hostMap map[string]*Host
}
// Host represents named access point on Network XXX
type Host struct {
subnet *SubNetwork
name string
// NOTE protected by subnet.mu
socketv []*socket // port -> listener | conn ; [0] is always nil
}
var _ xnet.Networker = (*Host)(nil)
// socket represents one endpoint entry on Host.
//
// it can be either already connected or listening.
type socket struct {
host *Host // host/port this socket is bound to
port int
conn *conn // connection endpoint is here if != nil
listener *listener // listener is waiting here if != nil
}
// conn represents one endpoint of connection created under lonet network.
type conn struct {
socket *socket // local socket
peerAddr *Addr // lonet address of the remote side of this connection
net.Conn
closeOnce sync.Once
}
// listener implements net.Listener for Host.
type listener struct {
// subnetwork/host/port we are listening on
socket *socket
dialq chan dialReq // Dial requests to our port go here from OS-level listener
down chan struct{} // Close -> down=ready
closeOnce sync.Once
}
// dialReq represents one dial request to listener.
//
// it comes after OS-level connection was accepted and lonet dial already
// request parsed locally.
type dialReq struct {
from *Addr
osconn net.Conn
resp chan *Addr // accepted with this local address
}
// ----------------------------------------
// Join joins or creates new lonet network with given name.
//
// Network is the name of this network under "lonet" namespace, e.g. "α" will
// give full network name "lonetα".
//
// If network is "" new network with random unique name will be created.
//
// Join returns new subnetwork on the joined network.
func Join(ctx context.Context, network string) (_ *SubNetwork, err error) {
defer xerr.Contextf(&err, "lonet: join %q", network)
// create/join registry under /tmp/lonet/<network>/registry.db
lonet := os.TempDir() + "/lonet"
err = os.MkdirAll(lonet, 0777 | os.ModeSticky)
if err != nil {
return nil, err
}
var netdir string
if network != "" {
netdir = lonet + "/" + network
err = os.MkdirAll(netdir, 0700)
} else {
// new with random name
netdir, err = ioutil.TempDir(lonet, "")
network = filepath.Base(netdir)
}
if err != nil {
return nil, err
}
registry, err := openRegistrySQLite(ctx, netdir + "/registry.db", network)
if err != nil {
return nil, err
}
// start OS listener
oslistener, err := net.Listen("tcp4", "127.0.0.1:")
if err != nil {
registry.Close() // XXX lclose?
return nil, err
}
// joined ok
n := &SubNetwork{
network: network,
registry: registry,
oslistener: oslistener,
hostMap: make(map[string]*Host),
}
go n.serve()
return n, nil
}
// NewHost creates new lonet Host with given name.
//
// Serving of created host will be done though SubNetwork via which it was
// created. XXX
//
// XXX errors
// errHostDup if host name was already registered.
func (n *SubNetwork) NewHost(ctx context.Context, name string) (*Host, error) {
// announce host name to registry
err := n.registry.Announce(ctx, name, n.oslistener.Addr().String())
if err != nil {
return nil, err // registry error has enough context
}
// announced ok -> host can be created
n.mu.Lock()
defer n.mu.Unlock()
if n.hostMap[name] != nil {
panic(fmt.Sprintf(
"lonet %q: new host %q: announced to registry but .hostMap already !empty",
n.Network(), name))
}
host := &Host{subnet: n, name: name}
n.hostMap[name] = host
return host, nil
}
// XXX Host.resolveAddr
// Listen starts new listener on the host.
//
// It either allocates free port if laddr is "" or with 0 port, or binds to laddr.
// Once listener is started, Dials could connect to listening address.
// Connection requests created by Dials could be accepted via Accept.
func (h *Host) Listen(laddr string) (_ net.Listener, err error) {
h.subnet.mu.Lock()
defer h.subnet.mu.Unlock()
var sk *socket
if laddr == "" {
laddr = ":0"
}
var netladdr net.Addr
defer func() {
if err != nil {
err = &net.OpError{Op: "listen", Net: h.Network(), Addr: netladdr, Err: err}
}
}()
// NOTE cannot use full resolvaAddr here - it talks to registry and is blocking
a, err := h.subnet.parseAddr(laddr)
return nil, err
// local host if host name omitted
if a.Host == "" {
a.Host = h.name
}
netladdr = a
if a.Host != h.name {
return nil, errAddrNoListen
}
// find first free port if autobind requested
if a.Port == 0 {
sk = h.allocFreeSocket()
// else allocate socket in-place
} else {
// grow if needed
for a.Port >= len(h.socketv) {
h.socketv = append(h.socketv, nil)
}
if h.socketv[a.Port] != nil {
return nil, errAddrAlreadyUsed
}
sk = &socket{host: h, port: a.Port}
h.socketv[a.Port] = sk
}
// create listener under socket
l := &listener{
socket: sk,
dialq: make(chan dialReq),
down: make(chan struct{}),
}
sk.listener = l
return l, nil
}
// serve serves incomming OS-level connection to this subnetwork.
//
// for every accepted connection lonet-handshake is initiated.
func (n *SubNetwork) serve() { // XXX error?
// wait for incoming OS connections and do lonet protocol handshake on them.
// if successful - route handshaked connection to particular Host's listener.
for {
osconn, err := n.oslistener.Accept()
if err != nil {
// XXX mark subnet as down + notify all its hosts
return
}
go func() {
err := n.loaccept(osconn) // XXX + ctx?
if err != nil && errors.Cause(err) != errConnRefused {
log.Print(err) // XXX ok?
}
}()
}
}
// loaccept handles incoming OS-level connection.
//
// It performs lonet protocol handshake and if successful further conveys
// accepted connection to lonet-level Accept.
//
// If handshake is not successfull the connection is closed.
func (n *SubNetwork) loaccept(osconn net.Conn) (err error) {
// XXX also cancel handshake on ctx down
defer func() {
if err != nil {
osconn.Close()
}
}()
defer xerr.Contextf(&err, "lonet %q: accept", n.network)
// read handshake line and parse it
line, err := readline(osconn, 1024) // limit line length not to cause memory dos
if err != nil {
return err
}
// replyf performs formatted reply to osconn.
// the error returned is for result of osconn.Write.
replyf := func(format string, argv ...interface{}) error {
line := fmt.Sprintf("< lonet %q " + format + "\n",
append([]interface{}{n.network}, argv...))
_, err := osconn.Write([]byte(line))
return err
}
// ereply performs error reply to osconn.
// for convenience returned error is the error itself, not the
// error returned from osconn.Write.
ereply := func(err error) error {
replyf("E %q", err) // ignore osconn.Write error
return err
}
// ereplyf performs formatted error reply to osconn.
// for convenience returned error is the error text built, not the
// error returned from osconn.Write.
ereplyf := func(format string, argv ...interface{}) error {
return ereply(fmt.Errorf(format, argv...))
}
var network, src, dst string
_, err = fmt.Sscanf(line, "> lonet %q dial %s %s\n", &network, &src, &dst)
if err != nil {
return ereplyf("protocol error")
}
if network != n.network {
return ereplyf("network mismatch")
}
asrc, err := n.parseAddr(src)
if err != nil {
return ereplyf("src address invalid")
}
adst, err := n.parseAddr(dst)
if err != nil {
return ereplyf("dst address invalid")
}
defer xerr.Contextf(&err, "%s -> %s", src, dst)
// check dst host:port in .hostMap
n.mu.Lock()
host := n.hostMap[adst.Host]
if host == nil || adst.Port >= len(host.socketv) {
n.mu.Unlock()
return ereply(errConnRefused)
}
sk := host.socketv[adst.Port]
if sk == nil || sk.listener == nil {
n.mu.Unlock()
return ereply(errConnRefused)
}
// there is listener corresponding to dst - let's connect it
l := sk.listener
n.mu.Unlock()
resp := make(chan *Addr)
select {
//case <-ctx.Done():
// ...
case <-l.down:
return ereply(errConnRefused)
case l.dialq <- dialReq{from: asrc, osconn: osconn, resp: resp}:
// connection accepted
acceptAddr := <-resp
// XXX ignore err - it will be seen at lonet level at further read/write?
replyf("connected %s", acceptAddr)
return nil
}
}
// readline reads 1 line from r up to maxlen bytes.
func readline(r io.Reader, maxlen int) (string, error) {
buf1 := []byte{0}
var line []byte
for len(line) < maxlen {
n, err := r.Read(buf1)
if n == 1 {
err = nil
}
if err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return string(line), err
}
line = append(line, buf1...)
if buf1[0] == '\n' {
break
}
}
return string(line), nil
}
// XXX
func (h *Host) Dial(ctx context.Context, addr string) (net.Conn, error) {
panic("TODO")
}
// Close closes network endpoint and unregisters conn from Host.
//
// All currently in-flight blocked IO is interrupted with an error
func (c *conn) Close() (err error) {
c.closeOnce.Do(func() {
err = c.Conn.Close()
sk := c.socket
h := sk.host
n := h.subnet
n.mu.Lock()
defer n.mu.Unlock()
sk.conn = nil
if sk.empty() {
h.socketv[sk.port] = nil
}
})
return err
}
// LocalAddr returns address of local end of connection.
func (c *conn) LocalAddr() net.Addr {
return c.socket.addr()
}
// RemoteAddr returns address of remote end of connection.
func (c *conn) RemoteAddr() net.Addr {
return c.peerAddr
}
// ----------------------------------------
// allocFreeSocket finds first free port and allocates socket entry for it.
//
// must be called with SubNetwork.mu held
func (h *Host) allocFreeSocket() *socket {
// find first free port
port := 1 // never allocate port 0 - it is used for autobind on listen only
for ; port < len(h.socketv); port++ {
if h.socketv[port] == nil {
break
}
}
// if all busy it exits with port >= len(h.socketv)
// grow if needed
for port >= len(h.socketv) {
h.socketv = append(h.socketv, nil)
}
sk := &socket{host: h, port: port}
h.socketv[port] = sk
return sk
}
// empty checks whether socket's both conn and listener are all nil.
// XXX recheck we really need this.
func (sk *socket) empty() bool {
return sk.conn == nil && sk.listener == nil
}
// addr returns address corresponding to socket.
func (sk *socket) addr() *Addr {
h := sk.host
return &Addr{Net: h.Network(), Host: h.name, Port: sk.port}
}
func (a *Addr) Network() string { return a.Net }
func (a *Addr) String() string { return net.JoinHostPort(a.Host, strconv.Itoa(a.Port)) }
// parseAddr parses addr into lonet address.
func (n *SubNetwork) parseAddr(addr string) (*Addr, error) {
host, portstr, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portstr)
if err != nil || port < 0 {
return nil, &net.AddrError{Err: "invalid port", Addr: addr}
}
return &Addr{Net: n.Network(), Host: host, Port: port}, nil
}
// Addr returns address where listener is accepting incoming connections.
func (l *listener) Addr() net.Addr {
return l.socket.addr()
}
// Network returns full network name this subnetwork is part of.
func (n *SubNetwork) Network() string { return NetPrefix + n.network }
// Network returns full network name of underlying network.
func (h *Host) Network() string { return h.subnet.Network() }
// Name returns host name.
func (h *Host) Name() string { return h.name }
# Copyright (C) 2018 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
# TODO doc, code
# XXX see lonet.go for lonet organization and protocol.
class SQLiteRegistry(object):
def __init__(self, dburi, network):
# XXX open
self.setup(network)
# TODO
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package lonet
import (
"context"
"fmt"
"io"
"net"
"reflect"
"testing"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc"
)
// FIXME dup from pipenet:
// ---- 8< ----
type mklistener interface {
Listen(string) (net.Listener, error)
}
func xlisten(n mklistener, laddr string) net.Listener {
l, err := n.Listen(laddr)
exc.Raiseif(err)
return l
}
func xaccept(l net.Listener) net.Conn {
c, err := l.Accept()
exc.Raiseif(err)
return c
}
type dialer interface {
Dial(context.Context, string) (net.Conn, error)
}
func xdial(n dialer, addr string) net.Conn {
c, err := n.Dial(context.Background(), addr)
exc.Raiseif(err)
return c
}
func xread(r io.Reader) string {
buf := make([]byte, 4096)
n, err := r.Read(buf)
exc.Raiseif(err)
return string(buf[:n])
}
func xwrite(w io.Writer, data string) {
_, err := w.Write([]byte(data))
exc.Raiseif(err)
}
func xwait(w interface { Wait() error }) {
err := w.Wait()
exc.Raiseif(err)
}
func assertEq(t *testing.T, a, b interface{}) {
t.Helper()
if !reflect.DeepEqual(a, b) {
fmt.Printf("not equal:\nhave: %v\nwant: %v\n", a, b)
t.Errorf("not equal:\nhave: %v\nwant: %v", a, b)
exc.Raise(0)
}
}
// ---- 8< ----
// TODO test go-go
// TODO test go-py
// XXX handshake:
// - ok,
// - econnrefused (no such host, port not listening)
// - network mismatch
// - invalid request
func TestLonet(t *testing.T) {
// FIXME ~100% dup from TestPipeNet
X := exc.Raiseif
ctx := context.Background()
subnet, err := Join(ctx, "")
X(err)
// XXX defer shutdown/rm this lonet fully?
xaddr := func(addr string) *Addr {
a, err := subnet.parseAddr(addr)
X(err)
return a
}
, err := subnet.NewHost(ctx, "α")
X(err)
, err := subnet.NewHost(ctx, "β")
X(err)
assertEq(t, .Network(), subnet.Network())
assertEq(t, .Network(), subnet.Network())
assertEq(t, .Name(), "α")
assertEq(t, .Name(), "β")
l1, err := .Listen("")
X(err)
assertEq(t, l1.Addr(), xaddr("α:1"))
// zero port always stays unused even after autobind
_, err = .Dial(ctx, ":0")
assertEq(t, err, &net.OpError{Op: "dial", Net: subnet.Network(), Addr: xaddr("α:0"), Err: errConnRefused})
wg := &errgroup.Group{}
wg.Go(exc.Funcx(func() {
c1s := xaccept(l1)
assertEq(t, c1s.LocalAddr(), xaddr("α:2"))
assertEq(t, c1s.RemoteAddr(), xaddr("β:1"))
assertEq(t, xread(c1s), "ping")
xwrite(c1s, "pong")
c2s := xaccept(l1)
assertEq(t, c2s.LocalAddr(), xaddr("α:3"))
assertEq(t, c2s.RemoteAddr(), xaddr("β:2"))
assertEq(t, xread(c2s), "hello")
xwrite(c2s, "world")
}))
c1c := xdial(, "α:1")
assertEq(t, c1c.LocalAddr(), xaddr("β:1"))
assertEq(t, c1c.RemoteAddr(), xaddr("α:2"))
xwrite(c1c, "ping")
assertEq(t, xread(c1c), "pong")
c2c := xdial(, "α:1")
assertEq(t, c2c.LocalAddr(), xaddr("β:2"))
assertEq(t, c2c.RemoteAddr(), xaddr("α:3"))
xwrite(c2c, "hello")
assertEq(t, xread(c2c), "world")
xwait(wg)
l2 := xlisten(, ":0") // autobind again
assertEq(t, l2.Addr(), xaddr("α:4"))
}
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package lonet
// registry of network hosts.
import (
"context"
"errors"
"fmt"
)
// registry represents access to lonet network registry.
//
// The registry holds information about hosts available on the network, and
// for each host its OS listening address. Whenever host α needs to establish XXX dup with lonet.go
// connection to address on host β, it queries the registry for β and further
// talks to β on that address. Correspondingly when a host joins the network,
// it announces itself to the registry so that other hosts could see it.
//
// The registry could be implemented in several ways, for example:
//
// - dedicated network server,
// - hosts broadcasting information to each other similar to ARP,
// - shared memory or file,
// - ...
type registry interface {
// XXX + network name?
// Announce announces host to registry.
//
// The host is named as hostname on lonet network and is listening for
// incoming lonet protocol connections on OS-level osladdr address.
//
// Returned error, if !nil, is *registryError with .Err describing the
// error cause:
//
// - errRegistryDown if registry cannot be accessed, XXX (and its underlying cause?)
// - errHostDup if hostname was already announced,
// - some other error indicating e.g. IO problem.
Announce(ctx context.Context, hostname, osladdr string) error
// Query queries registry for host.
//
// If successful - it returns OS-level network address to connect to
// the host via lonet protocol handshake.
//
// Returned error, if !nil, is *registryError with .Err describing the
// error cause:
//
// - errRegistryDown if registry cannot be accessed, XXX ^^^
// - errNoHost if hostname was not announced to registry,
// - some other error indicating e.g. IO problem.
Query(ctx context.Context, hostname string) (osladdr string, _ error)
// Close closes access to registry.
//
// Close interrupts all in-flight Announce and Query requests started
// via closing registry connection. Those interrupted requests will
// return with errRegistryDown error cause.
Close() error
}
var errRegistryDown = errors.New("registry is down")
var errNoHost = errors.New("no such host")
var errHostDup = errors.New("host already registered")
// registryError represents an error of a registry operation.
type registryError struct {
// XXX name of the network? - XXX yes
Registry string // name of the registry
Op string // operation that failed
Args interface{} // operation arguments, if any
Err error // actual error that occurred during the operation
}
func (e *registryError) Error() string {
s := e.Registry + ": " + e.Op
if e.Args != nil {
s += fmt.Sprintf(" %s", e.Args)
}
s += ": " + e.Err.Error()
return s
}
func (e *registryError) Cause() error {
return e.Err
}
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package lonet
// registry implemented as shared SQLite file.
import (
"context"
"errors"
"fmt"
"crawshaw.io/sqlite"
"crawshaw.io/sqlite/sqliteutil"
"lab.nexedi.com/kirr/go123/xerr"
)
// registry schema (keep in sync wrt .setup())
//
// hosts:
// hostname text !null PK
// osladdr text !null
//
// meta:
// name text !null PK
// value text !null
//
// "schemaver" text - version of db schema.
// "network" text - name of lonet network this registry serves.
const schemaVer = "lonet.1"
type sqliteRegistry struct {
dbpool *sqlite.Pool
uri string // URI db was originally opened with
}
// openRegistrySQLite opens SQLite registry located at dburi.
//
// the registry is setup/verified to be serving specified lonet network.
func openRegistrySQLite(ctx context.Context, dburi, network string) (_ *sqliteRegistry, err error) {
r := &sqliteRegistry{uri: dburi}
defer r.regerr(&err, "open")
dbpool, err := sqlite.Open(dburi, 0, /* poolSize= */16) // XXX pool size ok?
if err != nil {
return nil, err
}
r.dbpool = dbpool
// initialize/check db
err = r.setup(ctx, network)
if err != nil {
r.Close()
return nil, err
}
return r, nil
}
// Close implements registry.
func (r *sqliteRegistry) Close() (err error) {
defer r.regerr(&err, "close")
return r.dbpool.Close()
}
// withConn runs f on a dbpool connection.
//
// connection is first allocated from dbpool and put back after call to f.
func (r *sqliteRegistry) withConn(ctx context.Context, f func(*sqlite.Conn) error) error {
conn := r.dbpool.Get(ctx.Done())
if conn == nil {
// either ctx cancel or dbpool close
if ctx.Err() != nil {
return ctx.Err()
}
return errRegistryDown // db closed
}
defer r.dbpool.Put(conn)
return f(conn)
}
var errNoRows = errors.New("query: empty result")
var errManyRows = errors.New("query: multiple results")
// query1 is like sqliteutil.Exec but checks that exactly 1 row is returned.
//
// if query results in no rows - errNoRows is returned.
// if query results in more than 1 row - errManyRows is returned.
func query1(conn *sqlite.Conn, query string, resultf func(stmt *sqlite.Stmt), argv ...interface{}) error {
nrow := 0
err := sqliteutil.Exec(conn, query, func(stmt *sqlite.Stmt) error {
if nrow == 1 {
return errManyRows
}
nrow++
resultf(stmt)
return nil
}, argv...)
if err != nil {
return err
}
if nrow == 0 {
return errNoRows
}
return nil
}
// setup initializes/checks registry database to be of expected schema and configuration.
func (r *sqliteRegistry) setup(ctx context.Context, network string) (err error) {
defer xerr.Contextf(&err, "setup %q", network)
return r.withConn(ctx, func(conn *sqlite.Conn) (err error) {
// NOTE: keep in sync wrt top-level text.
err = sqliteutil.ExecScript(conn, `
CREATE TABLE IF NOT EXISTS hosts (
hostname TEXT NON NULL PRIMARY KEY,
osladdr TEXT NON NULL
);
CREATE TABLE IF NOT EXISTS meta (
name TEXT NON NULL PRIMARY KEY,
value TEXT NON NULL
);
`)
if err != nil {
return err
}
// do whole checks/init under transaction, so that there is
// e.g. no race wrt another process setting config.
defer sqliteutil.Save(conn)(&err)
// check/init schema version
ver, err := r.config(conn, "schemaver")
if err != nil {
return err
}
if ver == "" {
ver = schemaVer
err = r.setConfig(conn, "schemaver", ver)
if err != nil {
return err
}
}
if ver != schemaVer {
return fmt.Errorf("schema version mismatch: want %q; have %q", schemaVer, ver)
}
// check/init network name
dbnetwork, err := r.config(conn, "network")
if err != nil {
return err
}
if dbnetwork == "" {
dbnetwork = network
err = r.setConfig(conn, "network", dbnetwork)
if err != nil {
return err
}
}
if dbnetwork != network {
return fmt.Errorf("network name mismatch: want %q; have %q", network, dbnetwork)
}
return nil
})
}
// config gets one registry configuration value by name.
//
// if there is no record corresponding to name - ("", nil) is returned.
// XXX add ok ret to indicate presence of value?
func (r *sqliteRegistry) config(conn *sqlite.Conn, name string) (value string, err error) {
defer xerr.Contextf(&err, "config: get %q", name)
err = query1(conn, "SELECT value FROM meta WHERE name = ?", func(stmt *sqlite.Stmt) {
value = stmt.ColumnText(0)
}, name)
switch err {
case errNoRows:
return "", nil
case errManyRows:
value = ""
}
return value, err
}
// setConfig sets one registry configuration value by name.
func (r *sqliteRegistry) setConfig(conn *sqlite.Conn, name, value string) (err error) {
defer xerr.Contextf(&err, "config: set %q = %q", name, value)
err = sqliteutil.Exec(conn,
"INSERT OR REPLACE INTO meta (name, value) VALUES (?, ?)", nil,
name, value)
return err
}
// Announce implements registry.
func (r *sqliteRegistry) Announce(ctx context.Context, hostname, osladdr string) (err error) {
defer r.regerr(&err, "announce", hostname, osladdr)
return r.withConn(ctx, func(conn *sqlite.Conn) error {
err := sqliteutil.Exec(conn,
"INSERT INTO hosts (hostname, osladdr) VALUES (?, ?)", nil,
hostname, osladdr)
switch sqlite.ErrCode(err) {
case sqlite.SQLITE_CONSTRAINT_UNIQUE:
err = errHostDup
}
return err
})
}
var errRegDup = errors.New("registry broken: duplicate host entries")
// Query implements registry.
func (r *sqliteRegistry) Query(ctx context.Context, hostname string) (osladdr string, err error) {
defer r.regerr(&err, "query", hostname)
err = r.withConn(ctx, func(conn *sqlite.Conn) error {
err := query1(conn, "SELECT osladdr FROM hosts WHERE hostname = ?",
func (stmt *sqlite.Stmt) {
osladdr = stmt.ColumnText(0)
}, hostname)
switch err {
case errNoRows:
return errNoHost
case errManyRows:
// hostname is PK - we should not get several results
osladdr = ""
return errRegDup
}
return err
})
return osladdr, err
}
// regerr is syntactic sugar to wrap !nil *errp into registryError.
//
// intended too be used like
//
// defer r.regerr(&err, "operation", arg1, arg2, ...)
func (r *sqliteRegistry) regerr(errp *error, op string, args ...interface{}) {
if *errp == nil {
return
}
// XXX name of the network
*errp = &registryError{
Registry: r.uri,
Op: op,
Args: args,
Err: *errp,
}
}
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package lonet
import (
"context"
"fmt"
"io/ioutil"
"os"
"testing"
"lab.nexedi.com/kirr/go123/exc"
)
func TestRegistrySQLite(t *testing.T) {
X := exc.Raiseif
work, err := ioutil.TempDir("", "t-registry-sqlite")
X(err)
defer os.RemoveAll(work)
dbpath := work + "/1.db"
ctx := context.Background()
r, err := openRegistrySQLite(ctx, dbpath, "aaa")
X(err)
// query checks that result of Query(hostname) is as expected.
//
// if expect is error - it checks that Query returns error with cause == expect.
// otherwise expect must be string and it will check that Query
// succeeds and returns osladdr == expect.
query := func(r *sqliteRegistry, hostname string, expect interface{}) {
// XXX ^^^ -> `r registry` (needs .Network() to get network name) ?
t.Helper()
osladdr, err := r.Query(ctx, hostname)
if ewant, iserr := expect.(error); iserr {
// error expected
// XXX construct full registry error around ewant + reflect.DeepCompare?
e, ok := err.(*registryError)
if !(ok && e.Err == ewant && osladdr == "") {
t.Fatalf("%s: query %q:\nwant: \"\", %v\nhave: %q, %v",
r.uri, hostname, ewant, osladdr, err)
}
} else {
// !error expected
laddr := expect.(string)
if !(osladdr == laddr && err == nil) {
t.Fatalf("%s: query %q:\nwant: %q, nil\nhave: %q, %v",
r.uri, hostname, laddr, osladdr, err)
}
}
}
// announce checks that result of Announce(hostname, osladdr) is as expected.
//
// if len(errv) == 1 - it checks that Announce returns error with cause == errv[0].
// otherwise it will check that Announce succeeds and returns nil error.
announce := func(r *sqliteRegistry, hostname, osladdr string, errv ...error) {
t.Helper()
err := r.Announce(ctx, hostname, osladdr)
var ewant error
if len(errv) > 0 {
ewant = errv[0]
if len(errv) > 1 {
panic("only 1 error allowed in announce check")
}
}
if ewant != nil {
// error expected
// XXX construct full registry error around ewant + reflect.DeepCompare?
e, ok := err.(*registryError)
if (!ok && e.Err == ewant) {
t.Fatalf("%s: announce %q %q:\nwant %v\nhave: %v",
r.uri, hostname, osladdr, ewant, err)
}
} else {
// !error expected
if err != nil {
t.Fatalf("%s: announce %q %q: %s", r.uri, hostname, osladdr, err)
}
}
}
ø := errNoHost
// r.Network() == ...
query(r, "α", ø)
announce(r, "α", "alpha:1234")
announce(r, "α", "alpha:1234", errHostDup)
announce(r, "α", "alpha:1235", errHostDup)
query(r, "α", "alpha:1234")
query(r, "β", ø)
r2, err := openRegistrySQLite(ctx, dbpath, "aaa")
// r2.Network() == ...
query(r2, "α", "alpha:1234")
query(r2, "β", ø)
announce(r2, "β", "beta:zzz")
query(r2, "β", "beta:zzz")
query(r, "β", "beta:zzz")
X(r.Close())
query(r, "α", errRegistryDown)
query(r, "β", errRegistryDown)
announce(r, "γ", "gamma:qqq", errRegistryDown)
query(r, "γ", errRegistryDown)
query(r2, "α", "alpha:1234")
X(r2.Close())
query(r2, "α", errRegistryDown)
// verify network mismatch detection works
r3, err := openRegistrySQLite(ctx, dbpath, "bbb")
if !(r3 == nil && err != nil) {
t.Fatalf("network mismatch: not detected")
}
errWant := fmt.Sprintf(`%s: open []: setup "bbb": network name mismatch: want "bbb"; have "aaa"`, dbpath)
if err.Error() != errWant {
t.Fatalf("network mismatch: error:\nhave: %q\nwant: %q", err.Error(), errWant)
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment