Commit 964b6cf3 authored by Rob Pike's avatar Rob Pike

add HTTP support

R=rsc
DELTA=159  (110 added, 29 deleted, 20 changed)
OCL=31646
CL=31652
parent b6e66639
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"gob"; "gob";
"io"; "io";
"log"; "log";
"net";
"os"; "os";
"rpc"; "rpc";
"sync"; "sync";
...@@ -94,6 +95,25 @@ func NewClient(conn io.ReadWriteCloser) *Client { ...@@ -94,6 +95,25 @@ func NewClient(conn io.ReadWriteCloser) *Client {
return client; return client;
} }
// Dial connects to an HTTP RPC server at the specified network address.
func DialHTTP(network, address string) (*Client, os.Error) {
conn, err := net.Dial(network, "", address);
if err != nil {
return nil, err
}
io.WriteString(conn, "GET " + rpcPath + " HTTP/1.0\n\n");
return NewClient(conn), nil;
}
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, os.Error) {
conn, err := net.Dial(network, "", address);
if err != nil {
return nil, err
}
return NewClient(conn), nil;
}
// Go invokes the function asynchronously. It returns the Call structure representing // Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. // the invocation.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
......
...@@ -6,6 +6,7 @@ package rpc ...@@ -6,6 +6,7 @@ package rpc
import ( import (
"gob"; "gob";
"http";
"log"; "log";
"io"; "io";
"net"; "net";
...@@ -17,8 +18,6 @@ import ( ...@@ -17,8 +18,6 @@ import (
"utf8"; "utf8";
) )
import "fmt" // TODO DELETE
// Precompute the reflect type for os.Error. Can't use os.Error directly // Precompute the reflect type for os.Error. Can't use os.Error directly
// because Typeof takes an empty interface value. This is annoying. // because Typeof takes an empty interface value. This is annoying.
var unusedError *os.Error; var unusedError *os.Error;
...@@ -50,25 +49,22 @@ type Response struct { ...@@ -50,25 +49,22 @@ type Response struct {
Error string; Error string;
} }
// Server represents the set of services available to an RPC client. type serverType struct {
// The zero type for Server is ready to have services added.
type Server struct {
serviceMap map[string] *service; serviceMap map[string] *service;
} }
// This variable is a global whose "public" methods are really private methods
// called from the global functions of this package: rpc.Add, rpc.ServeConn, etc.
// For example, rpc.Add() calls server.add().
var server = &serverType{ make(map[string] *service) }
// Is this a publicly vislble - upper case - name? // Is this a publicly vislble - upper case - name?
func isPublic(name string) bool { func isPublic(name string) bool {
rune, wid_ := utf8.DecodeRuneInString(name); rune, wid_ := utf8.DecodeRuneInString(name);
return unicode.IsUpper(rune) return unicode.IsUpper(rune)
} }
// Add publishes in the server the set of methods of the func (server *serverType) add(rcvr interface{}) os.Error {
// recevier value that satisfy the following conditions:
// - public method
// - two arguments, both pointers to structs
// - one return value of type os.Error
// It returns an error if the receiver is not suitable.
func (server *Server) Add(rcvr interface{}) os.Error {
if server.serviceMap == nil { if server.serviceMap == nil {
server.serviceMap = make(map[string] *service); server.serviceMap = make(map[string] *service);
} }
...@@ -80,7 +76,7 @@ func (server *Server) Add(rcvr interface{}) os.Error { ...@@ -80,7 +76,7 @@ func (server *Server) Add(rcvr interface{}) os.Error {
log.Exit("rpc: no service name for type", s.typ.String()) log.Exit("rpc: no service name for type", s.typ.String())
} }
if !isPublic(sname) { if !isPublic(sname) {
s := "rpc server.Add: type " + sname + " is not public"; s := "rpc Add: type " + sname + " is not public";
log.Stderr(s); log.Stderr(s);
return os.ErrorString(s); return os.ErrorString(s);
} }
...@@ -132,7 +128,7 @@ func (server *Server) Add(rcvr interface{}) os.Error { ...@@ -132,7 +128,7 @@ func (server *Server) Add(rcvr interface{}) os.Error {
} }
if len(s.method) == 0 { if len(s.method) == 0 {
s := "rpc server.Add: type " + sname + " has no public methods of suitable type"; s := "rpc Add: type " + sname + " has no public methods of suitable type";
log.Stderr(s); log.Stderr(s);
return os.ErrorString(s); return os.ErrorString(s);
} }
...@@ -177,7 +173,7 @@ func (s *service) call(sending *sync.Mutex, function *reflect.FuncValue, req *Re ...@@ -177,7 +173,7 @@ func (s *service) call(sending *sync.Mutex, function *reflect.FuncValue, req *Re
s.sendResponse(sending, req, replyv.Interface(), enc, errmsg); s.sendResponse(sending, req, replyv.Interface(), enc, errmsg);
} }
func (server *Server) serve(conn io.ReadWriteCloser) { func (server *serverType) serve(conn io.ReadWriteCloser) {
dec := gob.NewDecoder(conn); dec := gob.NewDecoder(conn);
enc := gob.NewEncoder(conn); enc := gob.NewEncoder(conn);
sending := new(sync.Mutex); sending := new(sync.Mutex);
...@@ -222,20 +218,69 @@ func (server *Server) serve(conn io.ReadWriteCloser) { ...@@ -222,20 +218,69 @@ func (server *Server) serve(conn io.ReadWriteCloser) {
conn.Close(); conn.Close();
} }
func (server *serverType) accept(lis net.Listener) {
for {
conn, addr, err := lis.Accept();
if err != nil {
log.Exit("rpc.Serve: accept:", err.String()); // TODO(r): exit?
}
go server.serve(conn);
}
}
// Add publishes in the server the set of methods of the
// receiver value that satisfy the following conditions:
// - public method
// - two arguments, both pointers to structs
// - one return value of type os.Error
// It returns an error if the receiver is not suitable.
func Add(rcvr interface{}) os.Error {
return server.add(rcvr)
}
// ServeConn runs the server on a single connection. When the connection // ServeConn runs the server on a single connection. When the connection
// completes, service terminates. // completes, service terminates.
func (server *Server) ServeConn(conn io.ReadWriteCloser) { func ServeConn(conn io.ReadWriteCloser) {
go server.serve(conn) go server.serve(conn)
} }
// Accept accepts connections on the listener and serves requests // Accept accepts connections on the listener and serves requests
// for each incoming connection. // for each incoming connection.
func (server *Server) Accept(lis net.Listener) { func Accept(lis net.Listener) {
for { server.accept(lis)
conn, addr, err := lis.Accept(); }
if err != nil {
log.Exit("rpc.Serve: accept:", err.String()); // TODO(r): exit? type bufRWC struct {
} r io.Reader;
go server.ServeConn(conn); w io.Writer;
c io.Closer;
}
func (b *bufRWC) Read(p []byte) (n int, err os.Error) {
return b.r.Read(p);
}
func (b *bufRWC) Write(p []byte) (n int, err os.Error) {
return b.w.Write(p);
}
func (b *bufRWC) Close() os.Error {
return b.c.Close();
}
func serveHTTP(c *http.Conn, req *http.Request) {
conn, buf, err := c.Hijack();
if err != nil {
log.Stderr("rpc hijacking ", c.RemoteAddr, ": ", err.String());
return;
} }
server.serve(&bufRWC{buf, conn, conn});
}
var rpcPath string = "/_goRPC_"
// HandleHTTP registers an HTTP handler for RPC messages.
// It is still necessary to call http.Serve().
func HandleHTTP() {
http.Handle(rpcPath, http.HandlerFunc(serveHTTP));
} }
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
package rpc package rpc
import ( import (
"fmt";
"gob"; "gob";
"http"; "http";
"io"; "io";
...@@ -19,6 +18,7 @@ import ( ...@@ -19,6 +18,7 @@ import (
) )
var serverAddr string var serverAddr string
var httpServerAddr string
const second = 1e9 const second = 1e9
...@@ -56,30 +56,35 @@ func (t *Arith) Error(args *Args, reply *Reply) os.Error { ...@@ -56,30 +56,35 @@ func (t *Arith) Error(args *Args, reply *Reply) os.Error {
} }
func startServer() { func startServer() {
server := new(Server); rpc.Add(new(Arith));
server.Add(new(Arith));
l, e := net.Listen("tcp", ":0"); // any available address l, e := net.Listen("tcp", ":0"); // any available address
if e != nil { if e != nil {
log.Stderrf("net.Listen tcp :0: %v", e); log.Exitf("net.Listen tcp :0: %v", e);
os.Exit(1);
} }
serverAddr = l.Addr(); serverAddr = l.Addr();
log.Stderr("Test RPC server listening on ", serverAddr); log.Stderr("Test RPC server listening on ", serverAddr);
go server.Accept(l); go rpc.Accept(l);
HandleHTTP();
l, e = net.Listen("tcp", ":0"); // any available address
if e != nil {
log.Stderrf("net.Listen tcp :0: %v", e);
os.Exit(1);
}
httpServerAddr = l.Addr();
log.Stderr("Test HTTP RPC server listening on ", httpServerAddr);
go http.Serve(l, nil);
} }
func TestRPC(t *testing.T) { func TestRPC(t *testing.T) {
var i int;
once.Do(startServer); once.Do(startServer);
conn, err := net.Dial("tcp", "", serverAddr); client, err := Dial("tcp", serverAddr);
if err != nil { if err != nil {
t.Fatal("dialing:", err) t.Fatal("dialing", err);
} }
client := NewClient(conn);
// Synchronous calls // Synchronous calls
args := &Args{7,8}; args := &Args{7,8};
reply := new(Reply); reply := new(Reply);
...@@ -124,9 +129,24 @@ func TestRPC(t *testing.T) { ...@@ -124,9 +129,24 @@ func TestRPC(t *testing.T) {
} }
} }
func TestCheckUnknownService(t *testing.T) { func TestHTTPRPC(t *testing.T) {
var i int; once.Do(startServer);
client, err := DialHTTP("tcp", httpServerAddr);
if err != nil {
t.Fatal("dialing", err);
}
// Synchronous calls
args := &Args{7,8};
reply := new(Reply);
err = client.Call("Arith.Add", args, reply);
if reply.C != args.A + args.B {
t.Errorf("Add: expected %d got %d", reply.C, args.A + args.B);
}
}
func TestCheckUnknownService(t *testing.T) {
once.Do(startServer); once.Do(startServer);
conn, err := net.Dial("tcp", "", serverAddr); conn, err := net.Dial("tcp", "", serverAddr);
...@@ -147,8 +167,6 @@ func TestCheckUnknownService(t *testing.T) { ...@@ -147,8 +167,6 @@ func TestCheckUnknownService(t *testing.T) {
} }
func TestCheckUnknownMethod(t *testing.T) { func TestCheckUnknownMethod(t *testing.T) {
var i int;
once.Do(startServer); once.Do(startServer);
conn, err := net.Dial("tcp", "", serverAddr); conn, err := net.Dial("tcp", "", serverAddr);
...@@ -169,8 +187,6 @@ func TestCheckUnknownMethod(t *testing.T) { ...@@ -169,8 +187,6 @@ func TestCheckUnknownMethod(t *testing.T) {
} }
func TestCheckBadType(t *testing.T) { func TestCheckBadType(t *testing.T) {
var i int;
once.Do(startServer); once.Do(startServer);
conn, err := net.Dial("tcp", "", serverAddr); conn, err := net.Dial("tcp", "", serverAddr);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment