Commit 5dd372bd authored by Blake Gentry's avatar Blake Gentry Committed by Brad Fitzpatrick

net/http: retry idempotent HTTP reqs on dead reused conns

If we try to reuse a connection that the server is in the process of
closing, we may end up successfully writing out our request (or a
portion of our request) only to find a connection error when we try to
read from (or finish writing to) the socket. This manifests as an EOF
returned from the Transport's RoundTrip.

The issue, among others, is described in #4677.

This change follows some of the Chromium guidelines for retrying
idempotent requests only when the connection has been already been used
successfully and no header data has yet been received for the response.

As part of this change, an unexported error was defined for
errMissingHost, which was previously defined inline. errMissingHost is
the only non-network error returned from a Request's Write() method.

Additionally, this breaks TestLinuxSendfile because its test server
explicitly triggers the type of scenario this change is meant to retry
on. Because that test server stops accepting conns on the test listener
before the retry, the test would time out. To fix this, the test was
altered to use a non-idempotent test type (POST).

Change-Id: I1ca630b944f0ed7ec1d3d46056a50fb959481a16
Reviewed-on: https://go-review.googlesource.com/3210Reviewed-by: default avatarBrad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
parent 7c20ea93
...@@ -125,6 +125,11 @@ func SetPendingDialHooks(before, after func()) { ...@@ -125,6 +125,11 @@ func SetPendingDialHooks(before, after func()) {
prePendingDial, postPendingDial = before, after prePendingDial, postPendingDial = before, after
} }
// SetRetriedHook sets the hook that runs when an idempotent retry occurs.
func SetRetriedHook(hook func()) {
retried = hook
}
var ExportServerNewConn = (*Server).newConn var ExportServerNewConn = (*Server).newConn
var ExportCloseWriteAndWait = (*conn).closeWriteAndWait var ExportCloseWriteAndWait = (*conn).closeWriteAndWait
......
...@@ -946,7 +946,7 @@ func TestLinuxSendfile(t *testing.T) { ...@@ -946,7 +946,7 @@ func TestLinuxSendfile(t *testing.T) {
res.Body.Close() res.Body.Close()
// Force child to exit cleanly. // Force child to exit cleanly.
Get(fmt.Sprintf("http://%s/quit", ln.Addr())) Post(fmt.Sprintf("http://%s/quit", ln.Addr()), "", nil)
child.Wait() child.Wait()
rx := regexp.MustCompile(`sendfile(64)?\(\d+,\s*\d+,\s*NULL,\s*\d+\)\s*=\s*\d+\s*\n`) rx := regexp.MustCompile(`sendfile(64)?\(\d+,\s*\d+,\s*NULL,\s*\d+\)\s*=\s*\d+\s*\n`)
......
...@@ -367,6 +367,10 @@ func (r *Request) WriteProxy(w io.Writer) error { ...@@ -367,6 +367,10 @@ func (r *Request) WriteProxy(w io.Writer) error {
return r.write(w, true, nil, nil) return r.write(w, true, nil, nil)
} }
// errMissingHost is returned by Write when there is no Host or URL present in
// the Request.
var errMissingHost = errors.New("http: Request.Write on Request with no Host or URL set")
// extraHeaders may be nil // extraHeaders may be nil
// waitForContinue may be nil // waitForContinue may be nil
func (req *Request) write(w io.Writer, usingProxy bool, extraHeaders Header, waitForContinue func() bool) error { func (req *Request) write(w io.Writer, usingProxy bool, extraHeaders Header, waitForContinue func() bool) error {
...@@ -377,7 +381,7 @@ func (req *Request) write(w io.Writer, usingProxy bool, extraHeaders Header, wai ...@@ -377,7 +381,7 @@ func (req *Request) write(w io.Writer, usingProxy bool, extraHeaders Header, wai
host := cleanHost(req.Host) host := cleanHost(req.Host)
if host == "" { if host == "" {
if req.URL == nil { if req.URL == nil {
return errors.New("http: Request.Write on Request with no Host or URL set") return errMissingHost
} }
host = cleanHost(req.URL.Host) host = cleanHost(req.URL.Host)
} }
...@@ -1042,3 +1046,11 @@ func (r *Request) closeBody() { ...@@ -1042,3 +1046,11 @@ func (r *Request) closeBody() {
r.Body.Close() r.Body.Close()
} }
} }
func (r *Request) isReplayable() bool {
return r.Body == nil &&
(r.Method == "GET" ||
r.Method == "HEAD" ||
r.Method == "OPTIONS" ||
r.Method == "TRACE")
}
...@@ -244,28 +244,79 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) { ...@@ -244,28 +244,79 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) {
req.closeBody() req.closeBody()
return nil, errors.New("http: no Host in request URL") return nil, errors.New("http: no Host in request URL")
} }
treq := &transportRequest{Request: req}
cm, err := t.connectMethodForRequest(treq) for {
if err != nil { // treq gets modified by roundTrip, so we need to recreate for each retry.
req.closeBody() treq := &transportRequest{Request: req}
return nil, err cm, err := t.connectMethodForRequest(treq)
if err != nil {
req.closeBody()
return nil, err
}
// Get the cached or newly-created connection to either the
// host (for http or https), the http proxy, or the http proxy
// pre-CONNECTed to https server. In any case, we'll be ready
// to send it requests.
pconn, err := t.getConn(req, cm)
if err != nil {
t.setReqCanceler(req, nil)
req.closeBody()
return nil, err
}
var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq)
}
if err == nil {
return resp, nil
}
if err := checkTransportResend(err, req, pconn); err != nil {
return nil, err
}
if retried != nil {
retried()
}
} }
}
// Get the cached or newly-created connection to either the // checkTransportResend checks whether a failed HTTP request can be
// host (for http or https), the http proxy, or the http proxy // resent on a new connection. The non-nil input error is the error from
// pre-CONNECTed to https server. In any case, we'll be ready // roundTrip, which might be wrapped in a beforeRespHeaderError error.
// to send it requests. //
pconn, err := t.getConn(req, cm) // The return value is err or the unwrapped error inside a
if err != nil { // beforeRespHeaderError.
t.setReqCanceler(req, nil) func checkTransportResend(err error, req *Request, pconn *persistConn) error {
req.closeBody() brhErr, ok := err.(beforeRespHeaderError)
return nil, err if !ok {
return err
} }
if pconn.alt != nil { err = brhErr.error // unwrap the custom error in case we return it
// HTTP/2 path. if err != errMissingHost && pconn.isReused() && req.isReplayable() {
return pconn.alt.RoundTrip(req) // If we try to reuse a connection that the server is in the process of
// closing, we may end up successfully writing out our request (or a
// portion of our request) only to find a connection error when we try to
// read from (or finish writing to) the socket.
// There can be a race between the socket pool checking whether a socket
// is still connected, receiving the FIN, and sending/reading data on a
// reused socket. If we receive the FIN between the connectedness check
// and writing/reading from the socket, we may first learn the socket is
// disconnected when we get a ERR_SOCKET_NOT_CONNECTED. This will most
// likely happen when trying to retrieve its IP address. See
// http://crbug.com/105824 for more details.
// We resend a request only if we reused a keep-alive connection and did
// not yet receive any header data. This automatically prevents an
// infinite resend loop because we'll run out of the cached keep-alive
// connections eventually.
return nil
} }
return pconn.roundTrip(treq) return err
} }
// ErrSkipAltProtocol is a sentinel error value defined by Transport.RegisterProtocol. // ErrSkipAltProtocol is a sentinel error value defined by Transport.RegisterProtocol.
...@@ -408,6 +459,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool { ...@@ -408,6 +459,7 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
if max == 0 { if max == 0 {
max = DefaultMaxIdleConnsPerHost max = DefaultMaxIdleConnsPerHost
} }
pconn.markReused()
t.idleMu.Lock() t.idleMu.Lock()
waitingDialer := t.idleConnCh[key] waitingDialer := t.idleConnCh[key]
...@@ -539,7 +591,7 @@ func (t *Transport) dial(network, addr string) (c net.Conn, err error) { ...@@ -539,7 +591,7 @@ func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
} }
// Testing hooks: // Testing hooks:
var prePendingDial, postPendingDial func() var prePendingDial, postPendingDial, retried func()
// getConn dials and creates a new persistConn to the target as // getConn dials and creates a new persistConn to the target as
// specified in the connectMethod. This includes doing a proxy CONNECT // specified in the connectMethod. This includes doing a proxy CONNECT
...@@ -879,6 +931,7 @@ type persistConn struct { ...@@ -879,6 +931,7 @@ type persistConn struct {
closed bool // whether conn has been closed closed bool // whether conn has been closed
broken bool // an error has happened on this connection; marked broken so it's not reused. broken bool // an error has happened on this connection; marked broken so it's not reused.
canceled bool // whether this conn was broken due a CancelRequest canceled bool // whether this conn was broken due a CancelRequest
reused bool // whether conn has had successful request/response and is being reused.
// mutateHeaderFunc is an optional func to modify extra // mutateHeaderFunc is an optional func to modify extra
// headers on each outbound request before it's written. (the // headers on each outbound request before it's written. (the
// original Request given to RoundTrip is not modified) // original Request given to RoundTrip is not modified)
...@@ -900,6 +953,14 @@ func (pc *persistConn) isCanceled() bool { ...@@ -900,6 +953,14 @@ func (pc *persistConn) isCanceled() bool {
return pc.canceled return pc.canceled
} }
// isReused reports whether this connection is in a known broken state.
func (pc *persistConn) isReused() bool {
pc.lk.Lock()
r := pc.reused
pc.lk.Unlock()
return r
}
func (pc *persistConn) cancelRequest() { func (pc *persistConn) cancelRequest() {
pc.lk.Lock() pc.lk.Lock()
defer pc.lk.Unlock() defer pc.lk.Unlock()
...@@ -922,6 +983,9 @@ func (pc *persistConn) readLoop() { ...@@ -922,6 +983,9 @@ func (pc *persistConn) readLoop() {
alive := true alive := true
for alive { for alive {
pb, err := pc.br.Peek(1) pb, err := pc.br.Peek(1)
if err != nil {
err = beforeRespHeaderError{err}
}
pc.lk.Lock() pc.lk.Lock()
if pc.numExpectedResponses == 0 { if pc.numExpectedResponses == 0 {
...@@ -1004,7 +1068,13 @@ func (pc *persistConn) readLoop() { ...@@ -1004,7 +1068,13 @@ func (pc *persistConn) readLoop() {
// on t from this persistConn while the Transport // on t from this persistConn while the Transport
// potentially spins up a different persistConn for the // potentially spins up a different persistConn for the
// caller's subsequent request. // caller's subsequent request.
pc.t.setReqCanceler(rc.req, nil) //
// If this request will be retried, don't clear the reqCanceler
// yet or else roundTrip thinks it's been canceled.
if err == nil ||
checkTransportResend(err, rc.req, pc) != nil {
pc.t.setReqCanceler(rc.req, nil)
}
} }
pc.lk.Lock() pc.lk.Lock()
...@@ -1186,6 +1256,12 @@ var ( ...@@ -1186,6 +1256,12 @@ var (
testHookReadLoopBeforeNextRead func() testHookReadLoopBeforeNextRead func()
) )
// beforeRespHeaderError is used to indicate when an IO error has occurred before
// any header data was received.
type beforeRespHeaderError struct {
error
}
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
if hook := testHookEnterRoundTrip; hook != nil { if hook := testHookEnterRoundTrip; hook != nil {
hook() hook()
...@@ -1267,7 +1343,7 @@ WaitResponse: ...@@ -1267,7 +1343,7 @@ WaitResponse:
} }
} }
if err != nil { if err != nil {
re = responseAndError{nil, err} re = responseAndError{nil, beforeRespHeaderError{err}}
pc.close() pc.close()
break WaitResponse break WaitResponse
} }
...@@ -1293,7 +1369,7 @@ WaitResponse: ...@@ -1293,7 +1369,7 @@ WaitResponse:
fn() fn()
} }
default: default:
re = responseAndError{err: errClosed} re = responseAndError{err: beforeRespHeaderError{errClosed}}
if pc.isCanceled() { if pc.isCanceled() {
re = responseAndError{err: errRequestCanceled} re = responseAndError{err: errRequestCanceled}
} }
...@@ -1326,6 +1402,14 @@ func (pc *persistConn) markBroken() { ...@@ -1326,6 +1402,14 @@ func (pc *persistConn) markBroken() {
pc.broken = true pc.broken = true
} }
// markReused marks this connection as having been successfully used for a
// request and response.
func (pc *persistConn) markReused() {
pc.lk.Lock()
pc.reused = true
pc.lk.Unlock()
}
func (pc *persistConn) close() { func (pc *persistConn) close() {
pc.lk.Lock() pc.lk.Lock()
defer pc.lk.Unlock() defer pc.lk.Unlock()
......
...@@ -2410,6 +2410,80 @@ type closerFunc func() error ...@@ -2410,6 +2410,80 @@ type closerFunc func() error
func (f closerFunc) Close() error { return f() } func (f closerFunc) Close() error { return f() }
// Issue 4677. If we try to reuse a connection that the server is in the
// process of closing, we may end up successfully writing out our request (or a
// portion of our request) only to find a connection error when we try to read
// from (or finish writing to) the socket.
//
// NOTE: we resend a request only if the request is idempotent, we reused a
// keep-alive connection, and we haven't yet received any header data. This
// automatically prevents an infinite resend loop because we'll run out of the
// cached keep-alive connections eventually.
func TestRetryIdempotentRequestsOnError(t *testing.T) {
defer afterTest(t)
ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
}))
defer ts.Close()
tr := &Transport{}
c := &Client{Transport: tr}
const N = 2
retryc := make(chan struct{}, N)
SetRetriedHook(func() {
retryc <- struct{}{}
})
defer SetRetriedHook(nil)
for n := 0; n < 100; n++ {
// open 2 conns
errc := make(chan error, N)
for i := 0; i < N; i++ {
// start goroutines, send on errc
go func() {
res, err := c.Get(ts.URL)
if err == nil {
res.Body.Close()
}
errc <- err
}()
}
for i := 0; i < N; i++ {
if err := <-errc; err != nil {
t.Fatal(err)
}
}
ts.CloseClientConnections()
for i := 0; i < N; i++ {
go func() {
res, err := c.Get(ts.URL)
if err == nil {
res.Body.Close()
}
errc <- err
}()
}
for i := 0; i < N; i++ {
if err := <-errc; err != nil {
t.Fatal(err)
}
}
for i := 0; i < N; i++ {
select {
case <-retryc:
// we triggered a retry, test was successful
t.Logf("finished after %d runs\n", n)
return
default:
}
}
}
t.Fatal("did not trigger any retries")
}
// Issue 6981 // Issue 6981
func TestTransportClosesBodyOnError(t *testing.T) { func TestTransportClosesBodyOnError(t *testing.T) {
defer afterTest(t) defer afterTest(t)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment