Commit 4d45dd32 authored by Rob Pike's avatar Rob Pike

first part of networked channels.

limitations:
	poor error handling
	teardown not done
	exporter must send, importer must receive
	testing is rudimentary at best

R=rsc
CC=golang-dev
https://golang.org/cl/186234
parent 75c6dc9f
# Copyright 2009 The Go Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.
include ../../Make.$(GOARCH)
TARG=netchan
GOFILES=\
common.go\
export.go\
import.go\
include ../../Make.pkg
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package netchan
import (
"gob"
"log"
"net"
"os"
"sync"
)
type Dir int
const (
Recv Dir = iota
Send
)
// Mutex-protected encoder and decoder pair
type encDec struct {
decLock sync.Mutex
dec *gob.Decoder
encLock sync.Mutex
enc *gob.Encoder
}
func newEncDec(conn net.Conn) *encDec {
return &encDec{
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(conn),
}
}
func (ed *encDec) decode(e interface{}) os.Error {
ed.decLock.Lock()
defer ed.decLock.Unlock()
err := ed.dec.Decode(e)
if err != nil {
log.Stderr("exporter decode:", err)
// TODO: tear down connection
return err
}
return nil
}
func (ed *encDec) encode(e0, e1 interface{}) os.Error {
ed.encLock.Lock()
defer ed.encLock.Unlock()
err := ed.enc.Encode(e0)
if err == nil && e1 != nil {
err = ed.enc.Encode(e1)
}
if err != nil {
log.Stderr("exporter encode:", err)
// TODO: tear down connection?
return err
}
return nil
}
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
/*
The netchan package implements type-safe networked channels:
it allows the two ends of a channel to appear on different
computers connected by a network. It does this by transporting
data sent to a channel on one machine so it can be recovered
by a receive of a channel of the same type on the other.
An exporter publishes a set of channels by name. An importer
connects to the exporting machine and imports the channels
by name. After importing the channels, the two machines can
use the channels in the usual way.
Networked channels are not synchronized; they always behave
as if there is a buffer of at least one element between the
two machines.
TODO: at the moment, the exporting machine must send and
the importing machine must receive. This restriction will
be lifted soon.
*/
package netchan
import (
"log"
"net"
"os"
"reflect"
"sync"
)
// Export
// A channel and its associated information: a direction
type exportChan struct {
ch *reflect.ChanValue
dir Dir
}
// An Exporter allows a set of channels to be published on a single
// network port. A single machine may have multiple Exporters
// but they must use different ports.
type Exporter struct {
listener net.Listener
chanLock sync.Mutex // protects access to channel map
chans map[string]*exportChan
}
type expClient struct {
*encDec
exp *Exporter
}
func newClient(exp *Exporter, conn net.Conn) *expClient {
client := new(expClient)
client.exp = exp
client.encDec = newEncDec(conn)
return client
}
// TODO: ASSUMES EXPORT MEANS SEND
// Sent once per channel from importer to exporter to report that it's listening to a channel
type request struct {
name string
dir Dir
count int
}
// Reply to request, sent from exporter to importer on each send.
type response struct {
name string
error string
}
// Wait for incoming connections, start a new runner for each
func (exp *Exporter) listen() {
for {
conn, err := exp.listener.Accept()
if err != nil {
log.Stderr("exporter.listen:", err)
break
}
log.Stderr("accepted call from", conn.RemoteAddr())
client := newClient(exp, conn)
go client.run()
}
}
// Send a single client all its data. For each request, this will launch
// a serveRecv goroutine to deliver the data for that channel.
func (client *expClient) run() {
req := new(request)
for {
if err := client.decode(req); err != nil {
log.Stderr("error decoding client request:", err)
// TODO: tear down connection
break
}
log.Stderrf("export request: %+v", req)
if req.dir == Recv {
go client.serveRecv(req)
} else {
log.Stderr("export request: can't handle channel direction", req.dir)
resp := new(response)
resp.name = req.name
resp.error = "export request: can't handle channel direction"
client.encode(resp, nil)
break
}
}
}
// Send all the data on a single channel to a client asking for a Recv
func (client *expClient) serveRecv(req *request) {
exp := client.exp
resp := new(response)
resp.name = req.name
var ok bool
exp.chanLock.Lock()
ech, ok := exp.chans[req.name]
exp.chanLock.Unlock()
if !ok {
resp.error = "no such channel: " + req.name
log.Stderr("export:", resp.error)
client.encode(resp, nil) // ignore any encode error, hope client gets it
return
}
for {
if ech.dir != Send {
log.Stderr("TODO: recv export unimplemented")
break
}
val := ech.ch.Recv()
if err := client.encode(resp, val.Interface()); err != nil {
log.Stderr("error encoding client response:", err)
break
}
if req.count > 0 {
req.count--
if req.count == 0 {
break
}
}
}
}
// NewExporter creates a new Exporter to export channels
// on the network and local address defined as in net.Listen.
func NewExporter(network, localaddr string) (*Exporter, os.Error) {
listener, err := net.Listen(network, localaddr)
if err != nil {
return nil, err
}
e := &Exporter{
listener: listener,
chans: make(map[string]*exportChan),
}
go e.listen()
return e, nil
}
// Addr returns the Exporter's local network address.
func (exp *Exporter) Addr() net.Addr { return exp.listener.Addr() }
func checkChan(chT interface{}, dir Dir) (*reflect.ChanValue, os.Error) {
chanType, ok := reflect.Typeof(chT).(*reflect.ChanType)
if !ok {
return nil, os.ErrorString("not a channel")
}
if dir != Send && dir != Recv {
return nil, os.ErrorString("unknown channel direction")
}
switch chanType.Dir() {
case reflect.BothDir:
case reflect.SendDir:
if dir != Recv {
return nil, os.ErrorString("to import/export with Send, must provide <-chan")
}
case reflect.RecvDir:
if dir != Send {
return nil, os.ErrorString("to import/export with Recv, must provide chan<-")
}
}
return reflect.NewValue(chT).(*reflect.ChanValue), nil
}
// Export exports a channel of a given type and specified direction. The
// channel to be exported is provided in the call and may be of arbitrary
// channel type.
// Despite the literal signature, the effective signature is
// Export(name string, chT chan T, dir Dir)
// where T must be a struct, pointer to struct, etc.
func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error {
ch, err := checkChan(chT, dir)
if err != nil {
return err
}
exp.chanLock.Lock()
defer exp.chanLock.Unlock()
_, present := exp.chans[name]
if present {
return os.ErrorString("channel name already being exported:" + name)
}
exp.chans[name] = &exportChan{ch, dir}
return nil
}
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package netchan
import (
"log"
"net"
"os"
"reflect"
"sync"
)
// Import
// A channel and its associated information: a template value, direction and a count
type importChan struct {
ch *reflect.ChanValue
dir Dir
ptr *reflect.PtrValue // a pointer value we can point at each new item
count int
}
// An Importer allows a set of channels to be imported from a single
// remote machine/network port. A machine may have multiple
// importers, even from the same machine/network port.
type Importer struct {
*encDec
conn net.Conn
chanLock sync.Mutex // protects access to channel map
chans map[string]*importChan
}
// TODO: ASSUMES IMPORT MEANS RECEIVE
// NewImporter creates a new Importer object to import channels
// from an Exporter at the network and remote address as defined in net.Dial.
// The Exporter must be available and serving when the Importer is
// created.
func NewImporter(network, remoteaddr string) (*Importer, os.Error) {
conn, err := net.Dial(network, "", remoteaddr)
if err != nil {
return nil, err
}
imp := new(Importer)
imp.encDec = newEncDec(conn)
imp.conn = conn
imp.chans = make(map[string]*importChan)
go imp.run()
return imp, nil
}
// Handle the data from a single imported data stream, which will
// have the form
// (response, data)*
// The response identifies by name which channel is receiving data.
// TODO: allow an importer to send.
func (imp *Importer) run() {
// Loop on responses; requests are sent by ImportNValues()
resp := new(response)
for {
if err := imp.decode(resp); err != nil {
log.Stderr("importer response decode:", err)
break
}
if resp.error != "" {
log.Stderr("importer response error:", resp.error)
// TODO: tear down connection
break
}
imp.chanLock.Lock()
ich, ok := imp.chans[resp.name]
imp.chanLock.Unlock()
if !ok {
log.Stderr("unknown name in request:", resp.name)
break
}
if ich.dir != Recv {
log.Stderr("TODO: import send unimplemented")
break
}
// Create a new value for each received item.
val := reflect.MakeZero(ich.ptr.Type().(*reflect.PtrType).Elem())
ich.ptr.PointTo(val)
if err := imp.decode(ich.ptr.Interface()); err != nil {
log.Stderr("importer value decode:", err)
return
}
ich.ch.Send(val)
}
}
// Import imports a channel of the given type and specified direction.
// It is equivalent to ImportNValues with a count of 0, meaning unbounded.
func (imp *Importer) Import(name string, chT interface{}, dir Dir, pT interface{}) os.Error {
return imp.ImportNValues(name, chT, dir, pT, 0)
}
// ImportNValues imports a channel of the given type and specified direction
// and then receives or transmits up to n values on that channel. A value of
// n==0 implies an unbounded number of values. The channel to be bound to
// the remote site's channel is provided in the call and may be of arbitrary
// channel type.
// Despite the literal signature, the effective signature is
// ImportNValues(name string, chT chan T, dir Dir, pT T)
// where T must be a struct, pointer to struct, etc. pT may be more indirect
// than the value type of the channel (e.g. chan T, pT *T) but it must be a
// pointer.
// Example usage:
// imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234")
// if err != nil { log.Exit(err) }
// ch := make(chan myType)
// err := imp.ImportNValues("name", ch, Recv, new(myType), 1)
// if err != nil { log.Exit(err) }
// fmt.Printf("%+v\n", <-ch)
// (TODO: Can we eliminate the need for pT?)
func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, pT interface{}, n int) os.Error {
ch, err := checkChan(chT, dir)
if err != nil {
return err
}
// Make sure pT is a pointer (to a pointer...) to a struct.
rt := reflect.Typeof(pT)
if _, ok := rt.(*reflect.PtrType); !ok {
return os.ErrorString("not a pointer:" + rt.String())
}
if _, ok := reflect.Indirect(reflect.NewValue(pT)).(*reflect.StructValue); !ok {
return os.ErrorString("not a pointer to a struct:" + rt.String())
}
imp.chanLock.Lock()
defer imp.chanLock.Unlock()
_, present := imp.chans[name]
if present {
return os.ErrorString("channel name already being imported:" + name)
}
ptr := reflect.MakeZero(reflect.Typeof(pT)).(*reflect.PtrValue)
imp.chans[name] = &importChan{ch, dir, ptr, n}
// Tell the other side about this channel.
req := new(request)
req.name = name
req.dir = dir
req.count = n
if err := imp.encode(req, nil); err != nil {
log.Stderr("importer request encode:", err)
return err
}
return nil
}
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package netchan
import (
"fmt"
"testing"
)
type value struct {
i int
s string
}
func exportSend(exp *Exporter, t *testing.T) {
c := make(chan value)
err := exp.Export("name", c, Send)
if err != nil {
t.Fatal("export:", err)
}
c <- value{23, "hello"}
}
func importReceive(imp *Importer, t *testing.T) {
ch := make(chan value)
err := imp.ImportNValues("name", ch, Recv, new(value), 1)
if err != nil {
t.Fatal("import:", err)
}
v := <-ch
fmt.Printf("%v\n", v)
if v.i != 23 || v.s != "hello" {
t.Errorf("bad value: expected 23, hello; got %+v\n", v)
}
}
func TestBabyStep(t *testing.T) {
exp, err := NewExporter("tcp", ":0")
if err != nil {
t.Fatal("new exporter:", err)
}
go exportSend(exp, t)
imp, err := NewImporter("tcp", exp.Addr().String())
if err != nil {
t.Fatal("new importer:", err)
}
importReceive(imp, t)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment