Commit 07acc02a authored by Russ Cox's avatar Russ Cox

compress/flate: do not use background goroutines

Programs expect that Read and Write are synchronous.
The background goroutines make the implementation
a little easier, but they introduce asynchrony that
trips up calling code.  Remove them.

R=golang-dev, krasin
CC=golang-dev
https://golang.org/cl/4548079
parent 422abf3b
This diff is collapsed.
...@@ -57,7 +57,7 @@ var deflateInflateTests = []*deflateInflateTest{ ...@@ -57,7 +57,7 @@ var deflateInflateTests = []*deflateInflateTest{
&deflateInflateTest{[]byte{0x11, 0x12}}, &deflateInflateTest{[]byte{0x11, 0x12}},
&deflateInflateTest{[]byte{0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11}}, &deflateInflateTest{[]byte{0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11}},
&deflateInflateTest{[]byte{0x11, 0x10, 0x13, 0x41, 0x21, 0x21, 0x41, 0x13, 0x87, 0x78, 0x13}}, &deflateInflateTest{[]byte{0x11, 0x10, 0x13, 0x41, 0x21, 0x21, 0x41, 0x13, 0x87, 0x78, 0x13}},
&deflateInflateTest{getLargeDataChunk()}, &deflateInflateTest{largeDataChunk()},
} }
var reverseBitsTests = []*reverseBitsTest{ var reverseBitsTests = []*reverseBitsTest{
...@@ -71,23 +71,22 @@ var reverseBitsTests = []*reverseBitsTest{ ...@@ -71,23 +71,22 @@ var reverseBitsTests = []*reverseBitsTest{
&reverseBitsTest{29, 5, 23}, &reverseBitsTest{29, 5, 23},
} }
func getLargeDataChunk() []byte { func largeDataChunk() []byte {
result := make([]byte, 100000) result := make([]byte, 100000)
for i := range result { for i := range result {
result[i] = byte(int64(i) * int64(i) & 0xFF) result[i] = byte(i * i & 0xFF)
} }
return result return result
} }
func TestDeflate(t *testing.T) { func TestDeflate(t *testing.T) {
for _, h := range deflateTests { for _, h := range deflateTests {
buffer := bytes.NewBuffer(nil) var buf bytes.Buffer
w := NewWriter(buffer, h.level) w := NewWriter(&buf, h.level)
w.Write(h.in) w.Write(h.in)
w.Close() w.Close()
if bytes.Compare(buffer.Bytes(), h.out) != 0 { if !bytes.Equal(buf.Bytes(), h.out) {
t.Errorf("buffer is wrong; level = %v, buffer.Bytes() = %v, expected output = %v", t.Errorf("Deflate(%d, %x) = %x, want %x", h.level, h.in, buf.Bytes(), h.out)
h.level, buffer.Bytes(), h.out)
} }
} }
} }
......
...@@ -195,9 +195,8 @@ type Reader interface { ...@@ -195,9 +195,8 @@ type Reader interface {
// Decompress state. // Decompress state.
type decompressor struct { type decompressor struct {
// Input/output sources. // Input source.
r Reader r Reader
w io.Writer
roffset int64 roffset int64
woffset int64 woffset int64
...@@ -220,38 +219,79 @@ type decompressor struct { ...@@ -220,38 +219,79 @@ type decompressor struct {
// Temporary buffer (avoids repeated allocation). // Temporary buffer (avoids repeated allocation).
buf [4]byte buf [4]byte
// Next step in the decompression,
// and decompression state.
step func(*decompressor)
final bool
err os.Error
toRead []byte
hl, hd *huffmanDecoder
copyLen int
copyDist int
} }
func (f *decompressor) inflate() (err os.Error) { func (f *decompressor) nextBlock() {
final := false if f.final {
for err == nil && !final { if f.hw != f.hp {
for f.nb < 1+2 { f.flush((*decompressor).nextBlock)
if err = f.moreBits(); err != nil { return
return
}
} }
final = f.b&1 == 1 f.err = os.EOF
f.b >>= 1 return
typ := f.b & 3 }
f.b >>= 2 for f.nb < 1+2 {
f.nb -= 1 + 2 if f.err = f.moreBits(); f.err != nil {
switch typ { return
case 0: }
err = f.dataBlock() }
case 1: f.final = f.b&1 == 1
// compressed, fixed Huffman tables f.b >>= 1
err = f.decodeBlock(&fixedHuffmanDecoder, nil) typ := f.b & 3
case 2: f.b >>= 2
// compressed, dynamic Huffman tables f.nb -= 1 + 2
if err = f.readHuffman(); err == nil { switch typ {
err = f.decodeBlock(&f.h1, &f.h2) case 0:
} f.dataBlock()
default: case 1:
// 3 is reserved. // compressed, fixed Huffman tables
err = CorruptInputError(f.roffset) f.hl = &fixedHuffmanDecoder
f.hd = nil
f.huffmanBlock()
case 2:
// compressed, dynamic Huffman tables
if f.err = f.readHuffman(); f.err != nil {
break
} }
f.hl = &f.h1
f.hd = &f.h2
f.huffmanBlock()
default:
// 3 is reserved.
f.err = CorruptInputError(f.roffset)
} }
return }
func (f *decompressor) Read(b []byte) (int, os.Error) {
for {
if len(f.toRead) > 0 {
n := copy(b, f.toRead)
f.toRead = f.toRead[n:]
return n, nil
}
if f.err != nil {
return 0, f.err
}
f.step(f)
}
panic("unreachable")
}
func (f *decompressor) Close() os.Error {
if f.err == os.EOF {
return nil
}
return f.err
} }
// RFC 1951 section 3.2.7. // RFC 1951 section 3.2.7.
...@@ -356,11 +396,12 @@ func (f *decompressor) readHuffman() os.Error { ...@@ -356,11 +396,12 @@ func (f *decompressor) readHuffman() os.Error {
// hl and hd are the Huffman states for the lit/length values // hl and hd are the Huffman states for the lit/length values
// and the distance values, respectively. If hd == nil, using the // and the distance values, respectively. If hd == nil, using the
// fixed distance encoding associated with fixed Huffman blocks. // fixed distance encoding associated with fixed Huffman blocks.
func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error { func (f *decompressor) huffmanBlock() {
for { for {
v, err := f.huffSym(hl) v, err := f.huffSym(f.hl)
if err != nil { if err != nil {
return err f.err = err
return
} }
var n uint // number of bits extra var n uint // number of bits extra
var length int var length int
...@@ -369,13 +410,15 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error { ...@@ -369,13 +410,15 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error {
f.hist[f.hp] = byte(v) f.hist[f.hp] = byte(v)
f.hp++ f.hp++
if f.hp == len(f.hist) { if f.hp == len(f.hist) {
if err = f.flush(); err != nil { // After the flush, continue this loop.
return err f.flush((*decompressor).huffmanBlock)
} return
} }
continue continue
case v == 256: case v == 256:
return nil // Done with huffman block; read next block.
f.step = (*decompressor).nextBlock
return
// otherwise, reference to older data // otherwise, reference to older data
case v < 265: case v < 265:
length = v - (257 - 3) length = v - (257 - 3)
...@@ -402,7 +445,8 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error { ...@@ -402,7 +445,8 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error {
if n > 0 { if n > 0 {
for f.nb < n { for f.nb < n {
if err = f.moreBits(); err != nil { if err = f.moreBits(); err != nil {
return err f.err = err
return
} }
} }
length += int(f.b & uint32(1<<n-1)) length += int(f.b & uint32(1<<n-1))
...@@ -411,18 +455,20 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error { ...@@ -411,18 +455,20 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error {
} }
var dist int var dist int
if hd == nil { if f.hd == nil {
for f.nb < 5 { for f.nb < 5 {
if err = f.moreBits(); err != nil { if err = f.moreBits(); err != nil {
return err f.err = err
return
} }
} }
dist = int(reverseByte[(f.b&0x1F)<<3]) dist = int(reverseByte[(f.b&0x1F)<<3])
f.b >>= 5 f.b >>= 5
f.nb -= 5 f.nb -= 5
} else { } else {
if dist, err = f.huffSym(hd); err != nil { if dist, err = f.huffSym(f.hd); err != nil {
return err f.err = err
return
} }
} }
...@@ -430,14 +476,16 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error { ...@@ -430,14 +476,16 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error {
case dist < 4: case dist < 4:
dist++ dist++
case dist >= 30: case dist >= 30:
return CorruptInputError(f.roffset) f.err = CorruptInputError(f.roffset)
return
default: default:
nb := uint(dist-2) >> 1 nb := uint(dist-2) >> 1
// have 1 bit in bottom of dist, need nb more. // have 1 bit in bottom of dist, need nb more.
extra := (dist & 1) << nb extra := (dist & 1) << nb
for f.nb < nb { for f.nb < nb {
if err = f.moreBits(); err != nil { if err = f.moreBits(); err != nil {
return err f.err = err
return
} }
} }
extra |= int(f.b & uint32(1<<nb-1)) extra |= int(f.b & uint32(1<<nb-1))
...@@ -448,12 +496,14 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error { ...@@ -448,12 +496,14 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error {
// Copy history[-dist:-dist+length] into output. // Copy history[-dist:-dist+length] into output.
if dist > len(f.hist) { if dist > len(f.hist) {
return InternalError("bad history distance") f.err = InternalError("bad history distance")
return
} }
// No check on length; encoding can be prescient. // No check on length; encoding can be prescient.
if !f.hfull && dist > f.hp { if !f.hfull && dist > f.hp {
return CorruptInputError(f.roffset) f.err = CorruptInputError(f.roffset)
return
} }
p := f.hp - dist p := f.hp - dist
...@@ -465,9 +515,11 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error { ...@@ -465,9 +515,11 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error {
f.hp++ f.hp++
p++ p++
if f.hp == len(f.hist) { if f.hp == len(f.hist) {
if err = f.flush(); err != nil { // After flush continue copying out of history.
return err f.copyLen = length - (i + 1)
} f.copyDist = dist
f.flush((*decompressor).copyHuff)
return
} }
if p == len(f.hist) { if p == len(f.hist) {
p = 0 p = 0
...@@ -477,8 +529,33 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error { ...@@ -477,8 +529,33 @@ func (f *decompressor) decodeBlock(hl, hd *huffmanDecoder) os.Error {
panic("unreached") panic("unreached")
} }
func (f *decompressor) copyHuff() {
length := f.copyLen
dist := f.copyDist
p := f.hp - dist
if p < 0 {
p += len(f.hist)
}
for i := 0; i < length; i++ {
f.hist[f.hp] = f.hist[p]
f.hp++
p++
if f.hp == len(f.hist) {
f.copyLen = length - (i + 1)
f.flush((*decompressor).copyHuff)
return
}
if p == len(f.hist) {
p = 0
}
}
// Continue processing Huffman block.
f.huffmanBlock()
}
// Copy a single uncompressed data block from input to output. // Copy a single uncompressed data block from input to output.
func (f *decompressor) dataBlock() os.Error { func (f *decompressor) dataBlock() {
// Uncompressed. // Uncompressed.
// Discard current half-byte. // Discard current half-byte.
f.nb = 0 f.nb = 0
...@@ -488,21 +565,30 @@ func (f *decompressor) dataBlock() os.Error { ...@@ -488,21 +565,30 @@ func (f *decompressor) dataBlock() os.Error {
nr, err := io.ReadFull(f.r, f.buf[0:4]) nr, err := io.ReadFull(f.r, f.buf[0:4])
f.roffset += int64(nr) f.roffset += int64(nr)
if err != nil { if err != nil {
return &ReadError{f.roffset, err} f.err = &ReadError{f.roffset, err}
return
} }
n := int(f.buf[0]) | int(f.buf[1])<<8 n := int(f.buf[0]) | int(f.buf[1])<<8
nn := int(f.buf[2]) | int(f.buf[3])<<8 nn := int(f.buf[2]) | int(f.buf[3])<<8
if uint16(nn) != uint16(^n) { if uint16(nn) != uint16(^n) {
return CorruptInputError(f.roffset) f.err = CorruptInputError(f.roffset)
return
} }
if n == 0 { if n == 0 {
// 0-length block means sync // 0-length block means sync
return f.flush() f.flush((*decompressor).nextBlock)
return
} }
// Read len bytes into history, f.copyLen = n
// writing as history fills. f.copyData()
}
func (f *decompressor) copyData() {
// Read f.dataLen bytes into history,
// pausing for reads as history fills.
n := f.copyLen
for n > 0 { for n > 0 {
m := len(f.hist) - f.hp m := len(f.hist) - f.hp
if m > n { if m > n {
...@@ -511,17 +597,18 @@ func (f *decompressor) dataBlock() os.Error { ...@@ -511,17 +597,18 @@ func (f *decompressor) dataBlock() os.Error {
m, err := io.ReadFull(f.r, f.hist[f.hp:f.hp+m]) m, err := io.ReadFull(f.r, f.hist[f.hp:f.hp+m])
f.roffset += int64(m) f.roffset += int64(m)
if err != nil { if err != nil {
return &ReadError{f.roffset, err} f.err = &ReadError{f.roffset, err}
return
} }
n -= m n -= m
f.hp += m f.hp += m
if f.hp == len(f.hist) { if f.hp == len(f.hist) {
if err = f.flush(); err != nil { f.copyLen = n
return err f.flush((*decompressor).copyData)
} return
} }
} }
return nil f.step = (*decompressor).nextBlock
} }
func (f *decompressor) setDict(dict []byte) { func (f *decompressor) setDict(dict []byte) {
...@@ -577,17 +664,8 @@ func (f *decompressor) huffSym(h *huffmanDecoder) (int, os.Error) { ...@@ -577,17 +664,8 @@ func (f *decompressor) huffSym(h *huffmanDecoder) (int, os.Error) {
} }
// Flush any buffered output to the underlying writer. // Flush any buffered output to the underlying writer.
func (f *decompressor) flush() os.Error { func (f *decompressor) flush(step func(*decompressor)) {
if f.hw == f.hp { f.toRead = f.hist[f.hw:f.hp]
return nil
}
n, err := f.w.Write(f.hist[f.hw:f.hp])
if n != f.hp-f.hw && err == nil {
err = io.ErrShortWrite
}
if err != nil {
return &WriteError{f.woffset, err}
}
f.woffset += int64(f.hp - f.hw) f.woffset += int64(f.hp - f.hw)
f.hw = f.hp f.hw = f.hp
if f.hp == len(f.hist) { if f.hp == len(f.hist) {
...@@ -595,7 +673,7 @@ func (f *decompressor) flush() os.Error { ...@@ -595,7 +673,7 @@ func (f *decompressor) flush() os.Error {
f.hw = 0 f.hw = 0
f.hfull = true f.hfull = true
} }
return nil f.step = step
} }
func makeReader(r io.Reader) Reader { func makeReader(r io.Reader) Reader {
...@@ -605,30 +683,15 @@ func makeReader(r io.Reader) Reader { ...@@ -605,30 +683,15 @@ func makeReader(r io.Reader) Reader {
return bufio.NewReader(r) return bufio.NewReader(r)
} }
// decompress reads DEFLATE-compressed data from r and writes
// the uncompressed data to w.
func (f *decompressor) decompress(r io.Reader, w io.Writer) os.Error {
f.r = makeReader(r)
f.w = w
f.woffset = 0
if err := f.inflate(); err != nil {
return err
}
if err := f.flush(); err != nil {
return err
}
return nil
}
// NewReader returns a new ReadCloser that can be used // NewReader returns a new ReadCloser that can be used
// to read the uncompressed version of r. It is the caller's // to read the uncompressed version of r. It is the caller's
// responsibility to call Close on the ReadCloser when // responsibility to call Close on the ReadCloser when
// finished reading. // finished reading.
func NewReader(r io.Reader) io.ReadCloser { func NewReader(r io.Reader) io.ReadCloser {
var f decompressor var f decompressor
pr, pw := io.Pipe() f.r = makeReader(r)
go func() { pw.CloseWithError(f.decompress(r, pw)) }() f.step = (*decompressor).nextBlock
return pr return &f
} }
// NewReaderDict is like NewReader but initializes the reader // NewReaderDict is like NewReader but initializes the reader
...@@ -639,7 +702,7 @@ func NewReader(r io.Reader) io.ReadCloser { ...@@ -639,7 +702,7 @@ func NewReader(r io.Reader) io.ReadCloser {
func NewReaderDict(r io.Reader, dict []byte) io.ReadCloser { func NewReaderDict(r io.Reader, dict []byte) io.ReadCloser {
var f decompressor var f decompressor
f.setDict(dict) f.setDict(dict)
pr, pw := io.Pipe() f.r = makeReader(r)
go func() { pw.CloseWithError(f.decompress(r, pw)) }() f.step = (*decompressor).nextBlock
return pr return &f
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment