Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
G
go
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
go
Commits
b07af158
Commit
b07af158
authored
Jul 14, 2009
by
Rob Pike
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
improve rpc code. more robust. more tests.
R=rsc DELTA=186 (133 added, 20 deleted, 33 changed) OCL=31611 CL=31616
parent
bc2fda9c
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
158 additions
and
45 deletions
+158
-45
src/pkg/gob/decoder.go
src/pkg/gob/decoder.go
+1
-1
src/pkg/rpc/client.go
src/pkg/rpc/client.go
+28
-12
src/pkg/rpc/server.go
src/pkg/rpc/server.go
+55
-17
src/pkg/rpc/server_test.go
src/pkg/rpc/server_test.go
+74
-15
No files found.
src/pkg/gob/decoder.go
View file @
b07af158
...
@@ -73,7 +73,7 @@ func (dec *Decoder) Decode(e interface{}) os.Error {
...
@@ -73,7 +73,7 @@ func (dec *Decoder) Decode(e interface{}) os.Error {
// TODO(r): need to make the decoder work correctly if the wire type is compatible
// TODO(r): need to make the decoder work correctly if the wire type is compatible
// but not equal to the local type (e.g, extra fields).
// but not equal to the local type (e.g, extra fields).
if
info
.
wire
.
name
()
!=
dec
.
seen
[
id
]
.
name
()
{
if
info
.
wire
.
name
()
!=
dec
.
seen
[
id
]
.
name
()
{
dec
.
state
.
err
=
os
.
ErrorString
(
"gob decode: incorrect type for wire value
"
);
dec
.
state
.
err
=
os
.
ErrorString
(
"gob decode: incorrect type for wire value
: want "
+
info
.
wire
.
name
()
+
"; received "
+
dec
.
seen
[
id
]
.
name
()
);
return
dec
.
state
.
err
return
dec
.
state
.
err
}
}
...
...
src/pkg/rpc/client.go
View file @
b07af158
...
@@ -7,6 +7,7 @@ package rpc
...
@@ -7,6 +7,7 @@ package rpc
import
(
import
(
"gob"
;
"gob"
;
"io"
;
"io"
;
"log"
;
"os"
;
"os"
;
"rpc"
;
"rpc"
;
"sync"
;
"sync"
;
...
@@ -25,6 +26,7 @@ type Call struct {
...
@@ -25,6 +26,7 @@ type Call struct {
// Client represents an RPC Client.
// Client represents an RPC Client.
type
Client
struct
{
type
Client
struct
{
sync
.
Mutex
;
// protects pending, seq
sync
.
Mutex
;
// protects pending, seq
closed
bool
;
sending
sync
.
Mutex
;
sending
sync
.
Mutex
;
seq
uint64
;
seq
uint64
;
conn
io
.
ReadWriteCloser
;
conn
io
.
ReadWriteCloser
;
...
@@ -49,29 +51,35 @@ func (client *Client) send(c *Call) {
...
@@ -49,29 +51,35 @@ func (client *Client) send(c *Call) {
client
.
enc
.
Encode
(
request
);
client
.
enc
.
Encode
(
request
);
err
:=
client
.
enc
.
Encode
(
c
.
Args
);
err
:=
client
.
enc
.
Encode
(
c
.
Args
);
if
err
!=
nil
{
if
err
!=
nil
{
panicln
(
"
client encode error:"
,
err
)
panicln
(
"
rpc: client encode error:"
,
err
);
}
}
client
.
sending
.
Unlock
();
client
.
sending
.
Unlock
();
}
}
func
(
client
*
Client
)
serve
()
{
func
(
client
*
Client
)
serve
()
{
for
{
var
err
os
.
Error
;
for
err
==
nil
{
response
:=
new
(
Response
);
response
:=
new
(
Response
);
err
:=
client
.
dec
.
Decode
(
response
);
err
=
client
.
dec
.
Decode
(
response
);
if
err
!=
nil
{
if
err
==
os
.
EOF
{
break
;
}
break
;
}
seq
:=
response
.
Seq
;
seq
:=
response
.
Seq
;
client
.
Lock
();
client
.
Lock
();
c
:=
client
.
pending
[
seq
];
c
:=
client
.
pending
[
seq
];
client
.
pending
[
seq
]
=
c
,
false
;
client
.
pending
[
seq
]
=
c
,
false
;
client
.
Unlock
();
client
.
Unlock
();
client
.
dec
.
Decode
(
c
.
Reply
);
err
=
client
.
dec
.
Decode
(
c
.
Reply
);
if
err
!=
nil
{
panicln
(
"client decode error:"
,
err
)
}
c
.
Error
=
os
.
ErrorString
(
response
.
Error
);
c
.
Error
=
os
.
ErrorString
(
response
.
Error
);
// We don't want to block here
, i
t is the caller's responsibility to make
// We don't want to block here
. I
t is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in
Start
().
// sure the channel has enough buffer space. See comment in
Go
().
doNotBlock
:=
c
.
Done
<-
c
;
doNotBlock
:=
c
.
Done
<-
c
;
}
}
client
.
closed
=
true
;
log
.
Stderr
(
"client protocol error:"
,
err
);
}
}
// NewClient returns a new Client to handle requests to the
// NewClient returns a new Client to handle requests to the
...
@@ -86,9 +94,9 @@ func NewClient(conn io.ReadWriteCloser) *Client {
...
@@ -86,9 +94,9 @@ func NewClient(conn io.ReadWriteCloser) *Client {
return
client
;
return
client
;
}
}
//
Start
invokes the function asynchronously. It returns the Call structure representing
//
Go
invokes the function asynchronously. It returns the Call structure representing
// the invocation.
// the invocation.
func
(
client
*
Client
)
Start
(
serviceMethod
string
,
args
interface
{},
reply
interface
{},
done
chan
*
Call
)
*
Call
{
func
(
client
*
Client
)
Go
(
serviceMethod
string
,
args
interface
{},
reply
interface
{},
done
chan
*
Call
)
*
Call
{
c
:=
new
(
Call
);
c
:=
new
(
Call
);
c
.
ServiceMethod
=
serviceMethod
;
c
.
ServiceMethod
=
serviceMethod
;
c
.
Args
=
args
;
c
.
Args
=
args
;
...
@@ -102,12 +110,20 @@ func (client *Client) Start(serviceMethod string, args interface{}, reply interf
...
@@ -102,12 +110,20 @@ func (client *Client) Start(serviceMethod string, args interface{}, reply interf
// RPCs that will be using that channel.
// RPCs that will be using that channel.
}
}
c
.
Done
=
done
;
c
.
Done
=
done
;
if
client
.
closed
{
c
.
Error
=
os
.
EOF
;
doNotBlock
:=
c
.
Done
<-
c
;
return
c
;
}
client
.
send
(
c
);
client
.
send
(
c
);
return
c
;
return
c
;
}
}
// Call invokes the named function, waits for it to complete, and returns its error status.
// Call invokes the named function, waits for it to complete, and returns its error status.
func
(
client
*
Client
)
Call
(
serviceMethod
string
,
args
interface
{},
reply
interface
{})
os
.
Error
{
func
(
client
*
Client
)
Call
(
serviceMethod
string
,
args
interface
{},
reply
interface
{})
os
.
Error
{
call
:=
<-
client
.
Start
(
serviceMethod
,
args
,
reply
,
nil
)
.
Done
;
if
client
.
closed
{
return
os
.
EOF
}
call
:=
<-
client
.
Go
(
serviceMethod
,
args
,
reply
,
nil
)
.
Done
;
return
call
.
Error
;
return
call
.
Error
;
}
}
src/pkg/rpc/server.go
View file @
b07af158
...
@@ -8,6 +8,7 @@ import (
...
@@ -8,6 +8,7 @@ import (
"gob"
;
"gob"
;
"log"
;
"log"
;
"io"
;
"io"
;
"net"
;
"os"
;
"os"
;
"reflect"
;
"reflect"
;
"strings"
;
"strings"
;
...
@@ -16,6 +17,8 @@ import (
...
@@ -16,6 +17,8 @@ import (
"utf8"
;
"utf8"
;
)
)
import
"fmt"
// TODO DELETE
// Precompute the reflect type for os.Error. Can't use os.Error directly
// Precompute the reflect type for os.Error. Can't use os.Error directly
// because Typeof takes an empty interface value. This is annoying.
// because Typeof takes an empty interface value. This is annoying.
var
unusedError
*
os
.
Error
;
var
unusedError
*
os
.
Error
;
...
@@ -137,31 +140,43 @@ func (server *Server) Add(rcvr interface{}) os.Error {
...
@@ -137,31 +140,43 @@ func (server *Server) Add(rcvr interface{}) os.Error {
return
nil
;
return
nil
;
}
}
// A value to be sent as a placeholder for the response when we receive invalid request.
type
InvalidRequest
struct
{
marker
int
}
var
invalidRequest
=
InvalidRequest
{
1
}
func
_new
(
t
*
reflect
.
PtrType
)
*
reflect
.
PtrValue
{
func
_new
(
t
*
reflect
.
PtrType
)
*
reflect
.
PtrValue
{
v
:=
reflect
.
MakeZero
(
t
)
.
(
*
reflect
.
PtrValue
);
v
:=
reflect
.
MakeZero
(
t
)
.
(
*
reflect
.
PtrValue
);
v
.
PointTo
(
reflect
.
MakeZero
(
t
.
Elem
()));
v
.
PointTo
(
reflect
.
MakeZero
(
t
.
Elem
()));
return
v
;
return
v
;
}
}
func
(
s
*
service
)
call
(
sending
*
sync
.
Mutex
,
function
*
reflect
.
FuncValue
,
req
*
Request
,
argv
,
replyv
reflect
.
Value
,
enc
*
gob
.
Encoder
)
{
func
(
s
*
service
)
sendResponse
(
sending
*
sync
.
Mutex
,
req
*
Request
,
reply
interface
{},
enc
*
gob
.
Encoder
,
errmsg
string
)
{
// Invoke the method, providing a new value for the reply.
returnValues
:=
function
.
Call
([]
reflect
.
Value
{
s
.
rcvr
,
argv
,
replyv
});
// The return value for the method is an os.Error.
err
:=
returnValues
[
0
]
.
Interface
();
resp
:=
new
(
Response
);
resp
:=
new
(
Response
);
if
err
!=
nil
{
resp
.
Error
=
err
.
(
os
.
Error
)
.
String
();
}
// Encode the response header
// Encode the response header
sending
.
Lock
();
sending
.
Lock
();
resp
.
ServiceMethod
=
req
.
ServiceMethod
;
resp
.
ServiceMethod
=
req
.
ServiceMethod
;
resp
.
Error
=
errmsg
;
resp
.
Seq
=
req
.
Seq
;
resp
.
Seq
=
req
.
Seq
;
enc
.
Encode
(
resp
);
enc
.
Encode
(
resp
);
// Encode the reply value.
// Encode the reply value.
enc
.
Encode
(
reply
v
.
Interface
()
);
enc
.
Encode
(
reply
);
sending
.
Unlock
();
sending
.
Unlock
();
}
}
func
(
s
*
service
)
call
(
sending
*
sync
.
Mutex
,
function
*
reflect
.
FuncValue
,
req
*
Request
,
argv
,
replyv
reflect
.
Value
,
enc
*
gob
.
Encoder
)
{
// Invoke the method, providing a new value for the reply.
returnValues
:=
function
.
Call
([]
reflect
.
Value
{
s
.
rcvr
,
argv
,
replyv
});
// The return value for the method is an os.Error.
errInter
:=
returnValues
[
0
]
.
Interface
();
errmsg
:=
""
;
if
errInter
!=
nil
{
errmsg
=
errInter
.
(
os
.
Error
)
.
String
();
}
s
.
sendResponse
(
sending
,
req
,
replyv
.
Interface
(),
enc
,
errmsg
);
}
func
(
server
*
Server
)
serve
(
conn
io
.
ReadWriteCloser
)
{
func
(
server
*
Server
)
serve
(
conn
io
.
ReadWriteCloser
)
{
dec
:=
gob
.
NewDecoder
(
conn
);
dec
:=
gob
.
NewDecoder
(
conn
);
enc
:=
gob
.
NewEncoder
(
conn
);
enc
:=
gob
.
NewEncoder
(
conn
);
...
@@ -171,33 +186,56 @@ func (server *Server) serve(conn io.ReadWriteCloser) {
...
@@ -171,33 +186,56 @@ func (server *Server) serve(conn io.ReadWriteCloser) {
req
:=
new
(
Request
);
req
:=
new
(
Request
);
err
:=
dec
.
Decode
(
req
);
err
:=
dec
.
Decode
(
req
);
if
err
!=
nil
{
if
err
!=
nil
{
panicln
(
"can't handle decode error yet"
,
err
.
String
());
log
.
Stderr
(
"rpc: server cannot decode request:"
,
err
);
break
;
}
}
serviceMethod
:=
strings
.
Split
(
req
.
ServiceMethod
,
"."
,
0
);
serviceMethod
:=
strings
.
Split
(
req
.
ServiceMethod
,
"."
,
0
);
if
len
(
serviceMethod
)
!=
2
{
if
len
(
serviceMethod
)
!=
2
{
panicln
(
"service/Method request ill-formed:"
,
req
.
ServiceMethod
);
log
.
Stderr
(
"rpc: service/Method request ill-formed:"
,
req
.
ServiceMethod
);
break
;
}
}
// Look up the request.
// Look up the request.
service
,
ok
:=
server
.
serviceMap
[
serviceMethod
[
0
]];
service
,
ok
:=
server
.
serviceMap
[
serviceMethod
[
0
]];
if
!
ok
{
if
!
ok
{
panicln
(
"can't find service"
,
serviceMethod
[
0
]);
s
:=
"rpc: can't find service "
+
req
.
ServiceMethod
;
service
.
sendResponse
(
sending
,
req
,
invalidRequest
,
enc
,
s
);
break
;
}
}
mtype
,
ok
:=
service
.
method
[
serviceMethod
[
1
]];
mtype
,
ok
:=
service
.
method
[
serviceMethod
[
1
]];
if
!
ok
{
if
!
ok
{
panicln
(
"can't find method"
,
serviceMethod
[
1
]);
s
:=
"rpc: can't find method "
+
req
.
ServiceMethod
;
service
.
sendResponse
(
sending
,
req
,
invalidRequest
,
enc
,
s
);
break
;
}
}
method
:=
mtype
.
method
;
method
:=
mtype
.
method
;
// Decode the argument value.
// Decode the argument value.
argv
:=
_new
(
mtype
.
argType
);
argv
:=
_new
(
mtype
.
argType
);
replyv
:=
_new
(
mtype
.
replyType
);
err
=
dec
.
Decode
(
argv
.
Interface
());
err
=
dec
.
Decode
(
argv
.
Interface
());
if
err
!=
nil
{
if
err
!=
nil
{
panicln
(
"can't handle payload decode error yet"
,
err
.
String
());
log
.
Stderr
(
"tearing down connection:"
,
err
);
service
.
sendResponse
(
sending
,
req
,
replyv
.
Interface
(),
enc
,
err
.
String
());
break
;
}
}
go
service
.
call
(
sending
,
method
.
Func
,
req
,
argv
,
_new
(
mtype
.
replyType
)
,
enc
);
go
service
.
call
(
sending
,
method
.
Func
,
req
,
argv
,
replyv
,
enc
);
}
}
conn
.
Close
();
}
}
// Serve runs the server on the connection.
// ServeConn runs the server on a single connection. When the connection
func
(
server
*
Server
)
Serve
(
conn
io
.
ReadWriteCloser
)
{
// completes, service terminates.
func
(
server
*
Server
)
ServeConn
(
conn
io
.
ReadWriteCloser
)
{
go
server
.
serve
(
conn
)
go
server
.
serve
(
conn
)
}
}
// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func
(
server
*
Server
)
Accept
(
lis
net
.
Listener
)
{
for
{
conn
,
addr
,
err
:=
lis
.
Accept
();
if
err
!=
nil
{
log
.
Exit
(
"rpc.Serve: accept:"
,
err
.
String
());
// TODO(r): exit?
}
go
server
.
ServeConn
(
conn
);
}
}
src/pkg/rpc/server_test.go
View file @
b07af158
...
@@ -11,8 +11,10 @@ import (
...
@@ -11,8 +11,10 @@ import (
"io"
;
"io"
;
"log"
;
"log"
;
"net"
;
"net"
;
"once"
;
"os"
;
"os"
;
"rpc"
;
"rpc"
;
"strings"
;
"testing"
;
"testing"
;
)
)
...
@@ -53,15 +55,6 @@ func (t *Arith) Error(args *Args, reply *Reply) os.Error {
...
@@ -53,15 +55,6 @@ func (t *Arith) Error(args *Args, reply *Reply) os.Error {
panicln
(
"ERROR"
);
panicln
(
"ERROR"
);
}
}
func
run
(
server
*
Server
,
l
net
.
Listener
)
{
conn
,
addr
,
err
:=
l
.
Accept
();
if
err
!=
nil
{
println
(
"accept:"
,
err
.
String
());
os
.
Exit
(
1
);
}
server
.
Serve
(
conn
);
}
func
startServer
()
{
func
startServer
()
{
server
:=
new
(
Server
);
server
:=
new
(
Server
);
server
.
Add
(
new
(
Arith
));
server
.
Add
(
new
(
Arith
));
...
@@ -72,14 +65,13 @@ func startServer() {
...
@@ -72,14 +65,13 @@ func startServer() {
}
}
serverAddr
=
l
.
Addr
();
serverAddr
=
l
.
Addr
();
log
.
Stderr
(
"Test RPC server listening on "
,
serverAddr
);
log
.
Stderr
(
"Test RPC server listening on "
,
serverAddr
);
// go http.Serve(l, nil);
go
server
.
Accept
(
l
);
go
run
(
server
,
l
);
}
}
func
TestRPC
(
t
*
testing
.
T
)
{
func
TestRPC
(
t
*
testing
.
T
)
{
var
i
int
;
var
i
int
;
startServer
(
);
once
.
Do
(
startServer
);
conn
,
err
:=
net
.
Dial
(
"tcp"
,
""
,
serverAddr
);
conn
,
err
:=
net
.
Dial
(
"tcp"
,
""
,
serverAddr
);
if
err
!=
nil
{
if
err
!=
nil
{
...
@@ -106,9 +98,9 @@ func TestRPC(t *testing.T) {
...
@@ -106,9 +98,9 @@ func TestRPC(t *testing.T) {
// Out of order.
// Out of order.
args
=
&
Args
{
7
,
8
};
args
=
&
Args
{
7
,
8
};
mulReply
:=
new
(
Reply
);
mulReply
:=
new
(
Reply
);
mulCall
:=
client
.
Start
(
"Arith.Mul"
,
args
,
mulReply
,
nil
);
mulCall
:=
client
.
Go
(
"Arith.Mul"
,
args
,
mulReply
,
nil
);
addReply
:=
new
(
Reply
);
addReply
:=
new
(
Reply
);
addCall
:=
client
.
Start
(
"Arith.Add"
,
args
,
addReply
,
nil
);
addCall
:=
client
.
Go
(
"Arith.Add"
,
args
,
addReply
,
nil
);
<-
addCall
.
Done
;
<-
addCall
.
Done
;
if
addReply
.
C
!=
args
.
A
+
args
.
B
{
if
addReply
.
C
!=
args
.
A
+
args
.
B
{
...
@@ -126,6 +118,73 @@ func TestRPC(t *testing.T) {
...
@@ -126,6 +118,73 @@ func TestRPC(t *testing.T) {
err
=
client
.
Call
(
"Arith.Div"
,
args
,
reply
);
err
=
client
.
Call
(
"Arith.Div"
,
args
,
reply
);
// expect an error: zero divide
// expect an error: zero divide
if
err
==
nil
{
if
err
==
nil
{
t
.
Errorf
(
"Div: expected error"
);
t
.
Error
(
"Div: expected error"
);
}
else
if
err
.
String
()
!=
"divide by zero"
{
t
.
Error
(
"Div: expected divide by zero error; got"
,
err
);
}
}
func
TestCheckUnknownService
(
t
*
testing
.
T
)
{
var
i
int
;
once
.
Do
(
startServer
);
conn
,
err
:=
net
.
Dial
(
"tcp"
,
""
,
serverAddr
);
if
err
!=
nil
{
t
.
Fatal
(
"dialing:"
,
err
)
}
client
:=
NewClient
(
conn
);
args
:=
&
Args
{
7
,
8
};
reply
:=
new
(
Reply
);
err
=
client
.
Call
(
"Unknown.Add"
,
args
,
reply
);
if
err
==
nil
{
t
.
Error
(
"expected error calling unknown service"
);
}
else
if
strings
.
Index
(
err
.
String
(),
"service"
)
<
0
{
t
.
Error
(
"expected error about service; got"
,
err
);
}
}
func
TestCheckUnknownMethod
(
t
*
testing
.
T
)
{
var
i
int
;
once
.
Do
(
startServer
);
conn
,
err
:=
net
.
Dial
(
"tcp"
,
""
,
serverAddr
);
if
err
!=
nil
{
t
.
Fatal
(
"dialing:"
,
err
)
}
client
:=
NewClient
(
conn
);
args
:=
&
Args
{
7
,
8
};
reply
:=
new
(
Reply
);
err
=
client
.
Call
(
"Arith.Unknown"
,
args
,
reply
);
if
err
==
nil
{
t
.
Error
(
"expected error calling unknown service"
);
}
else
if
strings
.
Index
(
err
.
String
(),
"method"
)
<
0
{
t
.
Error
(
"expected error about method; got"
,
err
);
}
}
func
TestCheckBadType
(
t
*
testing
.
T
)
{
var
i
int
;
once
.
Do
(
startServer
);
conn
,
err
:=
net
.
Dial
(
"tcp"
,
""
,
serverAddr
);
if
err
!=
nil
{
t
.
Fatal
(
"dialing:"
,
err
)
}
client
:=
NewClient
(
conn
);
reply
:=
new
(
Reply
);
err
=
client
.
Call
(
"Arith.Add"
,
reply
,
reply
);
// args, reply would be the correct thing to use
if
err
==
nil
{
t
.
Error
(
"expected error calling Arith.Add with wrong arg type"
);
}
else
if
strings
.
Index
(
err
.
String
(),
"type"
)
<
0
{
t
.
Error
(
"expected error about type; got"
,
err
);
}
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment