Commit 5440bfc2 authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http/httputil: rewrite flushing code, disable on Server-Sent Events

* Rewrite the flushing code to not use a persistent goroutine, which
  also simplifies testing.
* Define the meaning of a negative flush interval. Its meaning doesn't
  change, but now it's locked in, and then we can use it to optimize
  the performance of the non-buffered case to avoid use of an AfterFunc.
* Support (internal-only) special casing of FlushInterval values per
  request/response.
* For now, treat Server-Sent Event responses as unbuffered. (or rather,
  immediately flushed from the buffer per-write)

Fixes #27816

Change-Id: Ie0f975c997daa3db539504137c741a96d7022665
Reviewed-on: https://go-review.googlesource.com/c/137335
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarDmitri Shuralyov <dmitshur@golang.org>
parent ffc7bc55
...@@ -18,10 +18,6 @@ import ( ...@@ -18,10 +18,6 @@ import (
"time" "time"
) )
// onExitFlushLoop is a callback set by tests to detect the state of the
// flushLoop() goroutine.
var onExitFlushLoop func()
// ReverseProxy is an HTTP Handler that takes an incoming request and // ReverseProxy is an HTTP Handler that takes an incoming request and
// sends it to another server, proxying the response back to the // sends it to another server, proxying the response back to the
// client. // client.
...@@ -42,6 +38,12 @@ type ReverseProxy struct { ...@@ -42,6 +38,12 @@ type ReverseProxy struct {
// to flush to the client while copying the // to flush to the client while copying the
// response body. // response body.
// If zero, no periodic flushing is done. // If zero, no periodic flushing is done.
// A negative value means to flush immediately
// after each write to the client.
// The FlushInterval is ignored when ReverseProxy
// recognizes a response as a streaming response;
// for such reponses, writes are flushed to the client
// immediately.
FlushInterval time.Duration FlushInterval time.Duration
// ErrorLog specifies an optional logger for errors // ErrorLog specifies an optional logger for errors
...@@ -271,7 +273,7 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { ...@@ -271,7 +273,7 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
fl.Flush() fl.Flush()
} }
} }
err = p.copyResponse(rw, res.Body) err = p.copyResponse(rw, res.Body, p.flushInterval(req, res))
if err != nil { if err != nil {
defer res.Body.Close() defer res.Body.Close()
// Since we're streaming the response, if we run into an error all we can do // Since we're streaming the response, if we run into an error all we can do
...@@ -332,15 +334,28 @@ func removeConnectionHeaders(h http.Header) { ...@@ -332,15 +334,28 @@ func removeConnectionHeaders(h http.Header) {
} }
} }
func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader) error { // flushInterval returns the p.FlushInterval value, conditionally
if p.FlushInterval != 0 { // overriding its value for a specific request/response.
func (p *ReverseProxy) flushInterval(req *http.Request, res *http.Response) time.Duration {
resCT := res.Header.Get("Content-Type")
// For Server-Sent Events responses, flush immediately.
// The MIME type is defined in https://www.w3.org/TR/eventsource/#text-event-stream
if resCT == "text/event-stream" {
return -1 // negative means immediately
}
// TODO: more specific cases? e.g. res.ContentLength == -1?
return p.FlushInterval
}
func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader, flushInterval time.Duration) error {
if flushInterval != 0 {
if wf, ok := dst.(writeFlusher); ok { if wf, ok := dst.(writeFlusher); ok {
mlw := &maxLatencyWriter{ mlw := &maxLatencyWriter{
dst: wf, dst: wf,
latency: p.FlushInterval, latency: flushInterval,
done: make(chan bool),
} }
go mlw.flushLoop()
defer mlw.stop() defer mlw.stop()
dst = mlw dst = mlw
} }
...@@ -403,34 +418,44 @@ type writeFlusher interface { ...@@ -403,34 +418,44 @@ type writeFlusher interface {
type maxLatencyWriter struct { type maxLatencyWriter struct {
dst writeFlusher dst writeFlusher
latency time.Duration latency time.Duration // non-zero; negative means to flush immediately
mu sync.Mutex // protects Write + Flush mu sync.Mutex // protects t, flushPending, and dst.Flush
done chan bool t *time.Timer
flushPending bool
} }
func (m *maxLatencyWriter) Write(p []byte) (int, error) { func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
return m.dst.Write(p) n, err = m.dst.Write(p)
if m.latency < 0 {
m.dst.Flush()
return
}
if m.flushPending {
return
}
if m.t == nil {
m.t = time.AfterFunc(m.latency, m.delayedFlush)
} else {
m.t.Reset(m.latency)
}
m.flushPending = true
return
} }
func (m *maxLatencyWriter) flushLoop() { func (m *maxLatencyWriter) delayedFlush() {
t := time.NewTicker(m.latency) m.mu.Lock()
defer t.Stop() defer m.mu.Unlock()
for { m.dst.Flush()
select { m.flushPending = false
case <-m.done:
if onExitFlushLoop != nil {
onExitFlushLoop()
}
return
case <-t.C:
m.mu.Lock()
m.dst.Flush()
m.mu.Unlock()
}
}
} }
func (m *maxLatencyWriter) stop() { m.done <- true } func (m *maxLatencyWriter) stop() {
m.mu.Lock()
defer m.mu.Unlock()
if m.t != nil {
m.t.Stop()
}
}
...@@ -297,10 +297,6 @@ func TestReverseProxyFlushInterval(t *testing.T) { ...@@ -297,10 +297,6 @@ func TestReverseProxyFlushInterval(t *testing.T) {
proxyHandler := NewSingleHostReverseProxy(backendURL) proxyHandler := NewSingleHostReverseProxy(backendURL)
proxyHandler.FlushInterval = time.Microsecond proxyHandler.FlushInterval = time.Microsecond
done := make(chan bool)
onExitFlushLoop = func() { done <- true }
defer func() { onExitFlushLoop = nil }()
frontend := httptest.NewServer(proxyHandler) frontend := httptest.NewServer(proxyHandler)
defer frontend.Close() defer frontend.Close()
...@@ -314,13 +310,6 @@ func TestReverseProxyFlushInterval(t *testing.T) { ...@@ -314,13 +310,6 @@ func TestReverseProxyFlushInterval(t *testing.T) {
if bodyBytes, _ := ioutil.ReadAll(res.Body); string(bodyBytes) != expected { if bodyBytes, _ := ioutil.ReadAll(res.Body); string(bodyBytes) != expected {
t.Errorf("got body %q; expected %q", bodyBytes, expected) t.Errorf("got body %q; expected %q", bodyBytes, expected)
} }
select {
case <-done:
// OK
case <-time.After(5 * time.Second):
t.Error("maxLatencyWriter flushLoop() never exited")
}
} }
func TestReverseProxyCancelation(t *testing.T) { func TestReverseProxyCancelation(t *testing.T) {
...@@ -946,3 +935,48 @@ func TestReverseProxy_PanicBodyError(t *testing.T) { ...@@ -946,3 +935,48 @@ func TestReverseProxy_PanicBodyError(t *testing.T) {
req, _ := http.NewRequest("GET", "http://foo.tld/", nil) req, _ := http.NewRequest("GET", "http://foo.tld/", nil)
rproxy.ServeHTTP(httptest.NewRecorder(), req) rproxy.ServeHTTP(httptest.NewRecorder(), req)
} }
func TestSelectFlushInterval(t *testing.T) {
tests := []struct {
name string
p *ReverseProxy
req *http.Request
res *http.Response
want time.Duration
}{
{
name: "default",
res: &http.Response{},
p: &ReverseProxy{FlushInterval: 123},
want: 123,
},
{
name: "server-sent events overrides non-zero",
res: &http.Response{
Header: http.Header{
"Content-Type": {"text/event-stream"},
},
},
p: &ReverseProxy{FlushInterval: 123},
want: -1,
},
{
name: "server-sent events overrides zero",
res: &http.Response{
Header: http.Header{
"Content-Type": {"text/event-stream"},
},
},
p: &ReverseProxy{FlushInterval: 0},
want: -1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.p.flushInterval(tt.req, tt.res)
if got != tt.want {
t.Errorf("flushLatency = %v; want %v", got, tt.want)
}
})
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment