Commit 193d09a6 authored by James Robinson's avatar James Robinson Committed by Nigel Tao

compress/flate: add Reset() to allow reusing large buffers to compress multiple buffers

This adds a Reset() to compress/flate's decompressor and plumbs that through
to compress/zlib and compress/gzip's Readers so callers can avoid large
allocations when performing many inflate operations. In particular this
preserves the allocation of the decompressor.hist buffer, which is 32kb and
overwritten as needed while inflating.

On the benchmark described in issue 6317, produces the following speedup on
my 2.3ghz Intel Core i7 MBP with go version devel +6b696a34e0af Sun Aug 03
15:14:59 2014 -0700 darwin/amd64:

blocked.text w/out patch vs blocked.text w/ patch:
benchmark           old ns/op      new ns/op      delta
BenchmarkGunzip     8371577533     7927917687     -5.30%

benchmark           old allocs     new allocs     delta
BenchmarkGunzip     176818         148519         -16.00%

benchmark           old bytes     new bytes     delta
BenchmarkGunzip     292184936     12739528      -95.64%

flat.text vs blocked.text w/patch:
benchmark           old ns/op      new ns/op      delta
BenchmarkGunzip     7939447827     7927917687     -0.15%

benchmark           old allocs     new allocs     delta
BenchmarkGunzip     90702          148519         +63.74%

benchmark           old bytes     new bytes     delta
BenchmarkGunzip     9959528       12739528      +27.91%

Similar speedups to those bradfitz saw in  https://golang.org/cl/13416045.

Fixes #6317.
Fixes #7950.

LGTM=nigeltao
R=golang-codereviews, bradfitz, dan.kortschak, adg, nigeltao, jamesr
CC=golang-codereviews
https://golang.org/cl/97140043
parent 18c7fbdf
...@@ -56,6 +56,15 @@ func (e *WriteError) Error() string { ...@@ -56,6 +56,15 @@ func (e *WriteError) Error() string {
return "flate: write error at offset " + strconv.FormatInt(e.Offset, 10) + ": " + e.Err.Error() return "flate: write error at offset " + strconv.FormatInt(e.Offset, 10) + ": " + e.Err.Error()
} }
// Resetter resets a ReadCloser returned by NewReader or NewReaderDict to
// to switch to a new underlying Reader. This permits reusing a ReadCloser
// instead of allocating a new one.
type Resetter interface {
// Reset discards any buffered data and resets the Resetter as if it was
// newly initialized with the given reader.
Reset(r io.Reader, dict []byte) error
}
// Note that much of the implementation of huffmanDecoder is also copied // Note that much of the implementation of huffmanDecoder is also copied
// into gen.go (in package main) for the purpose of precomputing the // into gen.go (in package main) for the purpose of precomputing the
// fixed huffman tables so they can be included statically. // fixed huffman tables so they can be included statically.
...@@ -679,12 +688,28 @@ func makeReader(r io.Reader) Reader { ...@@ -679,12 +688,28 @@ func makeReader(r io.Reader) Reader {
return bufio.NewReader(r) return bufio.NewReader(r)
} }
func (f *decompressor) Reset(r io.Reader, dict []byte) error {
*f = decompressor{
r: makeReader(r),
bits: f.bits,
codebits: f.codebits,
hist: f.hist,
step: (*decompressor).nextBlock,
}
if dict != nil {
f.setDict(dict)
}
return nil
}
// NewReader returns a new ReadCloser that can be used // NewReader returns a new ReadCloser that can be used
// to read the uncompressed version of r. // to read the uncompressed version of r.
// If r does not also implement io.ByteReader, // If r does not also implement io.ByteReader,
// the decompressor may read more data than necessary from r. // the decompressor may read more data than necessary from r.
// It is the caller's responsibility to call Close on the ReadCloser // It is the caller's responsibility to call Close on the ReadCloser
// when finished reading. // when finished reading.
//
// The ReadCloser returned by NewReader also implements Resetter.
func NewReader(r io.Reader) io.ReadCloser { func NewReader(r io.Reader) io.ReadCloser {
var f decompressor var f decompressor
f.bits = new([maxLit + maxDist]int) f.bits = new([maxLit + maxDist]int)
...@@ -700,6 +725,8 @@ func NewReader(r io.Reader) io.ReadCloser { ...@@ -700,6 +725,8 @@ func NewReader(r io.Reader) io.ReadCloser {
// the uncompressed data stream started with the given dictionary, // the uncompressed data stream started with the given dictionary,
// which has already been read. NewReaderDict is typically used // which has already been read. NewReaderDict is typically used
// to read data compressed by NewWriterDict. // to read data compressed by NewWriterDict.
//
// The ReadCloser returned by NewReader also implements Resetter.
func NewReaderDict(r io.Reader, dict []byte) io.ReadCloser { func NewReaderDict(r io.Reader, dict []byte) io.ReadCloser {
var f decompressor var f decompressor
f.r = makeReader(r) f.r = makeReader(r)
......
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package flate
import (
"bytes"
"io"
"testing"
)
func TestReset(t *testing.T) {
ss := []string{
"lorem ipsum izzle fo rizzle",
"the quick brown fox jumped over",
}
deflated := make([]bytes.Buffer, 2)
for i, s := range ss {
w, _ := NewWriter(&deflated[i], 1)
w.Write([]byte(s))
w.Close()
}
inflated := make([]bytes.Buffer, 2)
f := NewReader(&deflated[0])
io.Copy(&inflated[0], f)
f.(Resetter).Reset(&deflated[1], nil)
io.Copy(&inflated[1], f)
f.Close()
for i, s := range ss {
if s != inflated[i].String() {
t.Errorf("inflated[%d]:\ngot %q\nwant %q", i, inflated[i], s)
}
}
}
...@@ -208,7 +208,11 @@ func (z *Reader) readHeader(save bool) error { ...@@ -208,7 +208,11 @@ func (z *Reader) readHeader(save bool) error {
} }
z.digest.Reset() z.digest.Reset()
z.decompressor = flate.NewReader(z.r) if z.decompressor == nil {
z.decompressor = flate.NewReader(z.r)
} else {
z.decompressor.(flate.Resetter).Reset(z.r, nil)
}
return nil return nil
} }
......
...@@ -51,10 +51,21 @@ type reader struct { ...@@ -51,10 +51,21 @@ type reader struct {
scratch [4]byte scratch [4]byte
} }
// NewReader creates a new io.ReadCloser. // Resetter resets a ReadCloser returned by NewReader or NewReaderDict to
// Reads from the returned io.ReadCloser read and decompress data from r. // to switch to a new underlying Reader. This permits reusing a ReadCloser
// instead of allocating a new one.
type Resetter interface {
// Reset discards any buffered data and resets the Resetter as if it was
// newly initialized with the given reader.
Reset(r io.Reader, dict []byte) error
}
// NewReader creates a new ReadCloser.
// Reads from the returned ReadCloser read and decompress data from r.
// The implementation buffers input and may read more data than necessary from r. // The implementation buffers input and may read more data than necessary from r.
// It is the caller's responsibility to call Close on the ReadCloser when done. // It is the caller's responsibility to call Close on the ReadCloser when done.
//
// The ReadCloser returned by NewReader also implements Resetter.
func NewReader(r io.Reader) (io.ReadCloser, error) { func NewReader(r io.Reader) (io.ReadCloser, error) {
return NewReaderDict(r, nil) return NewReaderDict(r, nil)
} }
...@@ -62,35 +73,14 @@ func NewReader(r io.Reader) (io.ReadCloser, error) { ...@@ -62,35 +73,14 @@ func NewReader(r io.Reader) (io.ReadCloser, error) {
// NewReaderDict is like NewReader but uses a preset dictionary. // NewReaderDict is like NewReader but uses a preset dictionary.
// NewReaderDict ignores the dictionary if the compressed data does not refer to it. // NewReaderDict ignores the dictionary if the compressed data does not refer to it.
// If the compressed data refers to a different dictionary, NewReaderDict returns ErrDictionary. // If the compressed data refers to a different dictionary, NewReaderDict returns ErrDictionary.
//
// The ReadCloser returned by NewReaderDict also implements Resetter.
func NewReaderDict(r io.Reader, dict []byte) (io.ReadCloser, error) { func NewReaderDict(r io.Reader, dict []byte) (io.ReadCloser, error) {
z := new(reader) z := new(reader)
if fr, ok := r.(flate.Reader); ok { err := z.Reset(r, dict)
z.r = fr
} else {
z.r = bufio.NewReader(r)
}
_, err := io.ReadFull(z.r, z.scratch[0:2])
if err != nil { if err != nil {
return nil, err return nil, err
} }
h := uint(z.scratch[0])<<8 | uint(z.scratch[1])
if (z.scratch[0]&0x0f != zlibDeflate) || (h%31 != 0) {
return nil, ErrHeader
}
if z.scratch[1]&0x20 != 0 {
_, err = io.ReadFull(z.r, z.scratch[0:4])
if err != nil {
return nil, err
}
checksum := uint32(z.scratch[0])<<24 | uint32(z.scratch[1])<<16 | uint32(z.scratch[2])<<8 | uint32(z.scratch[3])
if checksum != adler32.Checksum(dict) {
return nil, ErrDictionary
}
z.decompressor = flate.NewReaderDict(z.r, dict)
} else {
z.decompressor = flate.NewReader(z.r)
}
z.digest = adler32.New()
return z, nil return z, nil
} }
...@@ -131,3 +121,41 @@ func (z *reader) Close() error { ...@@ -131,3 +121,41 @@ func (z *reader) Close() error {
z.err = z.decompressor.Close() z.err = z.decompressor.Close()
return z.err return z.err
} }
func (z *reader) Reset(r io.Reader, dict []byte) error {
if fr, ok := r.(flate.Reader); ok {
z.r = fr
} else {
z.r = bufio.NewReader(r)
}
_, err := io.ReadFull(z.r, z.scratch[0:2])
if err != nil {
return err
}
h := uint(z.scratch[0])<<8 | uint(z.scratch[1])
if (z.scratch[0]&0x0f != zlibDeflate) || (h%31 != 0) {
return ErrHeader
}
haveDict := z.scratch[1]&0x20 != 0
if haveDict {
_, err = io.ReadFull(z.r, z.scratch[0:4])
if err != nil {
return err
}
checksum := uint32(z.scratch[0])<<24 | uint32(z.scratch[1])<<16 | uint32(z.scratch[2])<<8 | uint32(z.scratch[3])
if checksum != adler32.Checksum(dict) {
return ErrDictionary
}
}
if z.decompressor == nil {
if haveDict {
z.decompressor = flate.NewReaderDict(z.r, dict)
} else {
z.decompressor = flate.NewReader(z.r)
}
} else {
z.decompressor.(flate.Resetter).Reset(z.r, dict)
}
z.digest = adler32.New()
return nil
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment