Commit feb8d0b2 authored by Rob Pike's avatar Rob Pike

netchan: make fields public for pending gob change

R=rsc
CC=golang-dev
https://golang.org/cl/3882042
parent fee3aca2
...@@ -43,22 +43,22 @@ const ( ...@@ -43,22 +43,22 @@ const (
// A header is sent as a prefix to every transmission. It will be followed by // A header is sent as a prefix to every transmission. It will be followed by
// a request structure, an error structure, or an arbitrary user payload structure. // a request structure, an error structure, or an arbitrary user payload structure.
type header struct { type header struct {
name string Name string
payloadType int PayloadType int
seqNum int64 SeqNum int64
} }
// Sent with a header once per channel from importer to exporter to report // Sent with a header once per channel from importer to exporter to report
// that it wants to bind to a channel with the specified direction for count // that it wants to bind to a channel with the specified direction for count
// messages. If count is -1, it means unlimited. // messages. If count is -1, it means unlimited.
type request struct { type request struct {
count int64 Count int64
dir Dir Dir Dir
} }
// Sent with a header to report an error. // Sent with a header to report an error.
type error struct { type error struct {
error string Error string
} }
// Used to unify management of acknowledgements for import and export. // Used to unify management of acknowledgements for import and export.
...@@ -111,7 +111,7 @@ func (ed *encDec) decode(value reflect.Value) os.Error { ...@@ -111,7 +111,7 @@ func (ed *encDec) decode(value reflect.Value) os.Error {
// Encode a header and payload onto the connection. // Encode a header and payload onto the connection.
func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) os.Error { func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) os.Error {
ed.encLock.Lock() ed.encLock.Lock()
hdr.payloadType = payloadType hdr.PayloadType = payloadType
err := ed.enc.Encode(hdr) err := ed.enc.Encode(hdr)
if err == nil { if err == nil {
if payload != nil { if payload != nil {
......
...@@ -67,7 +67,7 @@ func newClient(exp *Exporter, conn net.Conn) *expClient { ...@@ -67,7 +67,7 @@ func newClient(exp *Exporter, conn net.Conn) *expClient {
func (client *expClient) sendError(hdr *header, err string) { func (client *expClient) sendError(hdr *header, err string) {
error := &error{err} error := &error{err}
expLog("sending error to client:", error.error) expLog("sending error to client:", error.Error)
client.encode(hdr, payError, error) // ignore any encode error, hope client gets it client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
client.mu.Lock() client.mu.Lock()
client.errored = true client.errored = true
...@@ -77,14 +77,14 @@ func (client *expClient) sendError(hdr *header, err string) { ...@@ -77,14 +77,14 @@ func (client *expClient) sendError(hdr *header, err string) {
func (client *expClient) getChan(hdr *header, dir Dir) *chanDir { func (client *expClient) getChan(hdr *header, dir Dir) *chanDir {
exp := client.exp exp := client.exp
exp.mu.Lock() exp.mu.Lock()
ech, ok := exp.chans[hdr.name] ech, ok := exp.chans[hdr.Name]
exp.mu.Unlock() exp.mu.Unlock()
if !ok { if !ok {
client.sendError(hdr, "no such channel: "+hdr.name) client.sendError(hdr, "no such channel: "+hdr.Name)
return nil return nil
} }
if ech.dir != dir { if ech.dir != dir {
client.sendError(hdr, "wrong direction for channel: "+hdr.name) client.sendError(hdr, "wrong direction for channel: "+hdr.Name)
return nil return nil
} }
return ech return ech
...@@ -106,24 +106,24 @@ func (client *expClient) run() { ...@@ -106,24 +106,24 @@ func (client *expClient) run() {
expLog("error decoding client header:", err) expLog("error decoding client header:", err)
break break
} }
switch hdr.payloadType { switch hdr.PayloadType {
case payRequest: case payRequest:
*req = request{} *req = request{}
if err := client.decode(reqValue); err != nil { if err := client.decode(reqValue); err != nil {
expLog("error decoding client request:", err) expLog("error decoding client request:", err)
break break
} }
switch req.dir { switch req.Dir {
case Recv: case Recv:
go client.serveRecv(*hdr, req.count) go client.serveRecv(*hdr, req.Count)
case Send: case Send:
// Request to send is clear as a matter of protocol // Request to send is clear as a matter of protocol
// but not actually used by the implementation. // but not actually used by the implementation.
// The actual sends will have payload type payData. // The actual sends will have payload type payData.
// TODO: manage the count? // TODO: manage the count?
default: default:
error.error = "request: can't handle channel direction" error.Error = "request: can't handle channel direction"
expLog(error.error, req.dir) expLog(error.Error, req.Dir)
client.encode(hdr, payError, error) client.encode(hdr, payError, error)
} }
case payData: case payData:
...@@ -132,19 +132,19 @@ func (client *expClient) run() { ...@@ -132,19 +132,19 @@ func (client *expClient) run() {
client.serveClosed(*hdr) client.serveClosed(*hdr)
case payAck: case payAck:
client.mu.Lock() client.mu.Lock()
if client.ackNum != hdr.seqNum-1 { if client.ackNum != hdr.SeqNum-1 {
// Since the sequence number is incremented and the message is sent // Since the sequence number is incremented and the message is sent
// in a single instance of locking client.mu, the messages are guaranteed // in a single instance of locking client.mu, the messages are guaranteed
// to be sent in order. Therefore receipt of acknowledgement N means // to be sent in order. Therefore receipt of acknowledgement N means
// all messages <=N have been seen by the recipient. We check anyway. // all messages <=N have been seen by the recipient. We check anyway.
expLog("sequence out of order:", client.ackNum, hdr.seqNum) expLog("sequence out of order:", client.ackNum, hdr.SeqNum)
} }
if client.ackNum < hdr.seqNum { // If there has been an error, don't back up the count. if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count.
client.ackNum = hdr.seqNum client.ackNum = hdr.SeqNum
} }
client.mu.Unlock() client.mu.Unlock()
default: default:
log.Exit("netchan export: unknown payload type", hdr.payloadType) log.Exit("netchan export: unknown payload type", hdr.PayloadType)
} }
} }
client.exp.delClient(client) client.exp.delClient(client)
...@@ -171,7 +171,7 @@ func (client *expClient) serveRecv(hdr header, count int64) { ...@@ -171,7 +171,7 @@ func (client *expClient) serveRecv(hdr header, count int64) {
// number, not one beyond. // number, not one beyond.
client.mu.Lock() client.mu.Lock()
client.seqNum++ client.seqNum++
hdr.seqNum = client.seqNum hdr.SeqNum = client.seqNum
client.seqLock.Lock() // guarantee ordering of messages client.seqLock.Lock() // guarantee ordering of messages
client.mu.Unlock() client.mu.Unlock()
err := client.encode(&hdr, payData, val.Interface()) err := client.encode(&hdr, payData, val.Interface())
......
...@@ -78,7 +78,7 @@ func (imp *Importer) run() { ...@@ -78,7 +78,7 @@ func (imp *Importer) run() {
imp.shutdown() imp.shutdown()
return return
} }
switch hdr.payloadType { switch hdr.PayloadType {
case payData: case payData:
// done lower in loop // done lower in loop
case payError: case payError:
...@@ -86,25 +86,25 @@ func (imp *Importer) run() { ...@@ -86,25 +86,25 @@ func (imp *Importer) run() {
impLog("error:", e) impLog("error:", e)
return return
} }
if err.error != "" { if err.Error != "" {
impLog("response error:", err.error) impLog("response error:", err.Error)
if sent := imp.errors <- os.ErrorString(err.error); !sent { if sent := imp.errors <- os.ErrorString(err.Error); !sent {
imp.shutdown() imp.shutdown()
return return
} }
continue // errors are not acknowledged. continue // errors are not acknowledged.
} }
case payClosed: case payClosed:
ich := imp.getChan(hdr.name) ich := imp.getChan(hdr.Name)
if ich != nil { if ich != nil {
ich.ch.Close() ich.ch.Close()
} }
continue // closes are not acknowledged. continue // closes are not acknowledged.
default: default:
impLog("unexpected payload type:", hdr.payloadType) impLog("unexpected payload type:", hdr.PayloadType)
return return
} }
ich := imp.getChan(hdr.name) ich := imp.getChan(hdr.Name)
if ich == nil { if ich == nil {
continue continue
} }
...@@ -113,8 +113,8 @@ func (imp *Importer) run() { ...@@ -113,8 +113,8 @@ func (imp *Importer) run() {
return return
} }
// Acknowledge receipt // Acknowledge receipt
ackHdr.name = hdr.name ackHdr.Name = hdr.Name
ackHdr.seqNum = hdr.seqNum ackHdr.SeqNum = hdr.SeqNum
imp.encode(ackHdr, payAck, nil) imp.encode(ackHdr, payAck, nil)
// Create a new value for each received item. // Create a new value for each received item.
value := reflect.MakeZero(ich.ch.Type().(*reflect.ChanType).Elem()) value := reflect.MakeZero(ich.ch.Type().(*reflect.ChanType).Elem())
...@@ -178,8 +178,8 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, n int) ...@@ -178,8 +178,8 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, n int)
} }
imp.chans[name] = &chanDir{ch, dir} imp.chans[name] = &chanDir{ch, dir}
// Tell the other side about this channel. // Tell the other side about this channel.
hdr := &header{name: name} hdr := &header{Name: name}
req := &request{count: int64(n), dir: dir} req := &request{Count: int64(n), Dir: dir}
if err = imp.encode(hdr, payRequest, req); err != nil { if err = imp.encode(hdr, payRequest, req); err != nil {
impLog("request encode:", err) impLog("request encode:", err)
return err return err
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment