Commit 2a5d30fb authored by Russ Cox's avatar Russ Cox

io: revised Pipe implementation

* renamed channels to say what gets sent
* use channel closed status instead of racy check of boolean

R=nigeltao_golang
CC=golang-dev
https://golang.org/cl/196065
parent 5db5f68d
...@@ -2,8 +2,8 @@ ...@@ -2,8 +2,8 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
// Pipe adapter to connect code expecting an io.Read // Pipe adapter to connect code expecting an io.Reader
// with code expecting an io.Write. // with code expecting an io.Writer.
package io package io
...@@ -12,11 +12,6 @@ import ( ...@@ -12,11 +12,6 @@ import (
"sync" "sync"
) )
type pipeReturn struct {
n int
err os.Error
}
// Shared pipe structure. // Shared pipe structure.
type pipe struct { type pipe struct {
rclosed bool // Read end closed? rclosed bool // Read end closed?
...@@ -25,41 +20,35 @@ type pipe struct { ...@@ -25,41 +20,35 @@ type pipe struct {
werr os.Error // Error supplied to CloseWriter werr os.Error // Error supplied to CloseWriter
wpend []byte // Written data waiting to be read. wpend []byte // Written data waiting to be read.
wtot int // Bytes consumed so far in current write. wtot int // Bytes consumed so far in current write.
cr chan []byte // Write sends data here... cw chan []byte // Write sends data here...
cw chan pipeReturn // ... and reads the n, err back from here. cr chan bool // ... and reads a done notification from here.
} }
func (p *pipe) Read(data []byte) (n int, err os.Error) { func (p *pipe) Read(data []byte) (n int, err os.Error) {
if p == nil || p.rclosed { if p.rclosed {
return 0, os.EINVAL return 0, os.EINVAL
} }
// Wait for next write block if necessary. // Wait for next write block if necessary.
if p.wpend == nil { if p.wpend == nil {
if !p.wclosed { if !closed(p.cw) {
p.wpend = <-p.cr p.wpend = <-p.cw
} }
if p.wclosed { if closed(p.cw) {
return 0, p.werr return 0, p.werr
} }
p.wtot = 0 p.wtot = 0
} }
// Read from current write block. // Read from current write block.
n = len(data) n = copy(data, p.wpend)
if n > len(p.wpend) {
n = len(p.wpend)
}
for i := 0; i < n; i++ {
data[i] = p.wpend[i]
}
p.wtot += n p.wtot += n
p.wpend = p.wpend[n:] p.wpend = p.wpend[n:]
// If write block is done, finish the write. // If write block is done, finish the write.
if len(p.wpend) == 0 { if len(p.wpend) == 0 {
p.wpend = nil p.wpend = nil
p.cw <- pipeReturn{p.wtot, nil} p.cr <- true
p.wtot = 0 p.wtot = 0
} }
...@@ -67,58 +56,52 @@ func (p *pipe) Read(data []byte) (n int, err os.Error) { ...@@ -67,58 +56,52 @@ func (p *pipe) Read(data []byte) (n int, err os.Error) {
} }
func (p *pipe) Write(data []byte) (n int, err os.Error) { func (p *pipe) Write(data []byte) (n int, err os.Error) {
if p == nil || p.wclosed { if p.wclosed {
return 0, os.EINVAL return 0, os.EINVAL
} }
if p.rclosed { if closed(p.cr) {
return 0, p.rerr return 0, p.rerr
} }
// Send data to reader. // Send write to reader.
p.cr <- data p.cw <- data
// Wait for reader to finish copying it. // Wait for reader to finish copying it.
res := <-p.cw <-p.cr
return res.n, res.err if closed(p.cr) {
_, _ = <-p.cw // undo send if reader is gone
return 0, p.rerr
}
return len(data), nil
} }
func (p *pipe) CloseReader(rerr os.Error) os.Error { func (p *pipe) CloseReader(rerr os.Error) os.Error {
if p == nil || p.rclosed { if p.rclosed {
return os.EINVAL return os.EINVAL
} }
// Stop any future writes.
p.rclosed = true p.rclosed = true
// Wake up writes.
if rerr == nil { if rerr == nil {
rerr = os.EPIPE rerr = os.EPIPE
} }
p.rerr = rerr p.rerr = rerr
close(p.cr)
// Stop the current write.
if !p.wclosed {
p.cw <- pipeReturn{p.wtot, rerr}
}
return nil return nil
} }
func (p *pipe) CloseWriter(werr os.Error) os.Error { func (p *pipe) CloseWriter(werr os.Error) os.Error {
if werr == nil { if p.wclosed {
werr = os.EOF
}
if p == nil || p.wclosed {
return os.EINVAL return os.EINVAL
} }
// Stop any future reads.
p.wclosed = true p.wclosed = true
p.werr = werr
// Stop the current read. // Wake up reads.
if !p.rclosed { if werr == nil {
p.cr <- nil werr = os.EOF
} }
p.werr = werr
close(p.cw)
return nil return nil
} }
...@@ -212,12 +195,9 @@ func (w *PipeWriter) finish() { w.Close() } ...@@ -212,12 +195,9 @@ func (w *PipeWriter) finish() { w.Close() }
// Reads on one end are matched with writes on the other, // Reads on one end are matched with writes on the other,
// copying data directly between the two; there is no internal buffering. // copying data directly between the two; there is no internal buffering.
func Pipe() (*PipeReader, *PipeWriter) { func Pipe() (*PipeReader, *PipeWriter) {
p := new(pipe) p := &pipe{
p.cr = make(chan []byte, 1) cw: make(chan []byte, 1),
p.cw = make(chan pipeReturn, 1) cr: make(chan bool, 1),
r := new(PipeReader) }
r.p = p return &PipeReader{p: p}, &PipeWriter{p: p}
w := new(PipeWriter)
w.p = p
return r, w
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment