Commit 39377013 authored by Daniel Morsing, committed by Brad Fitzpatrick

net/http: handle close/response race more gracefully

There was a logical race in Transport.RoundTrip where a round trip with
a pending response would race against the channel that signals the
connection closing. This usually happened for responses with
Connection: close and no body.

We handled this race by reading the close channel, setting a 100ms
timer, and returning an error if no response had arrived before the
timer fired.

This put a lower bound on how fast a request could fail: no request
could fail in less than 100ms.
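
A minimal, standalone sketch of that old strategy (the names mirror the
pconnDeadCh and failTicker variables removed in the diff below, but this
is illustrative plumbing, not the Transport's actual code):

package main

import (
    "errors"
    "fmt"
    "time"
)

type responseAndError struct {
    resp string
    err  error
}

var errClosed = errors.New("transport closed before response was received")

func main() {
    resc := make(chan responseAndError, 1) // response channel
    closech := make(chan struct{})         // closed when the connection goes away

    // Simulate a server that hangs up without ever delivering a response.
    start := time.Now()
    go func() { close(closech) }()

    deadch := closech
    var failTimer <-chan time.Time
    for {
        select {
        case re := <-resc:
            fmt.Println("response:", re.resp)
            return
        case <-deadch:
            // The connection is gone, but a response might still be on its
            // way, so arm a timer instead of failing immediately.
            deadch = nil // a nil channel blocks forever, so this case stops firing
            failTimer = time.After(100 * time.Millisecond)
        case <-failTimer:
            // Even a connection that is already dead takes at least 100ms to fail.
            fmt.Printf("failed after %v: %v\n",
                time.Since(start).Round(time.Millisecond), errClosed)
            return
        }
    }
}

A connection that dies without ever producing a response can only be
reported as failed once the arbitrary 100ms timer fires.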

Reordering the channel operations gets rid of the logical race. If the
readLoop causes the connection to be closed, it has already put its
response into the return channel, so roundTrip can fetch it with a
non-blocking receive.
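
The same idea in isolation, again as a sketch with illustrative names
rather than the Transport's actual code: because the reading side sends
on the buffered response channel before closing the close-signal
channel, the waiting side can always drain a pending response with a
non-blocking receive, even when the close case wins the select.

package main

import (
    "errors"
    "fmt"
)

type responseAndError struct {
    resp string
    err  error
}

var errClosed = errors.New("transport closed before response was received")

func main() {
    resc := make(chan responseAndError, 1) // buffered: the send below never blocks
    closech := make(chan struct{})

    // Reading side: deliver the response first, then announce the close.
    go func() {
        resc <- responseAndError{resp: "200 OK"}
        close(closech)
    }()

    // Waiting side: even if the close case wins the outer select, the
    // response is already buffered, so a non-blocking receive finds it.
    var re responseAndError
    select {
    case re = <-resc:
    case <-closech:
        select {
        case re = <-resc:
        default:
            re = responseAndError{err: errClosed}
        }
    }
    fmt.Println("response:", re.resp, "err:", re.err)
}

Because the send is sequenced before close() in the sending goroutine,
any goroutine that sees closech closed is guaranteed to find the
response already buffered in resc; that is the ordering the reordered
readLoop below provides.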

Change-Id: Idf09e48d7a0453d7de0120d3055d0ce5893a5428
Reviewed-on: https://go-review.googlesource.com/1787
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
parent a2d12201
src/net/http/export_test.go
@@ -78,6 +78,10 @@ func (t *Transport) PutIdleTestConn() bool {
     })
 }

+func SetInstallConnClosedHook(f func()) {
+    testHookPersistConnClosedGotRes = f
+}
+
 func NewTestTimeoutHandler(handler Handler, ch <-chan time.Time) Handler {
     f := func() <-chan time.Time {
         return ch
src/net/http/transport.go
@@ -914,14 +914,22 @@ func (pc *persistConn) readLoop() {
             }
         }

+        // The connection might be going away when we put the
+        // idleConn below. When that happens, we close the response channel to signal
+        // to roundTrip that the connection is gone. roundTrip waits for
+        // both closing and a response in a select, so it might choose
+        // the close channel, rather than the response.
+        // We send the response first so that roundTrip can check
+        // if there is a pending one with a non-blocking select
+        // on the response channel before erroring out.
+        rc.ch <- responseAndError{resp, err}
+
         if alive && !hasBody {
             alive = !pc.sawEOF &&
                 pc.wroteRequest() &&
                 pc.t.putIdleConn(pc)
         }

-        rc.ch <- responseAndError{resp, err}
-
         // Wait for the just-returned response body to be fully consumed
         // before we race and peek on the underlying bufio reader.
         if waitForBodyRead != nil {
@@ -1028,6 +1036,8 @@ func (e *httpError) Temporary() bool { return true }
 var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true}
 var errClosed error = &httpError{err: "net/http: transport closed before response was received"}

+var testHookPersistConnClosedGotRes func() // nil except for tests
+
 func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
     pc.t.setReqCanceler(req.Request, pc.cancelRequest)
     pc.lk.Lock()
@@ -1078,8 +1088,6 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err
     pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}

     var re responseAndError
-    var pconnDeadCh = pc.closech
-    var failTicker <-chan time.Time
     var respHeaderTimer <-chan time.Time
 WaitResponse:
     for {
@@ -1095,23 +1103,25 @@ WaitResponse:
                 defer timer.Stop() // prevent leaks
                 respHeaderTimer = timer.C
             }
-        case <-pconnDeadCh:
+        case <-pc.closech:
             // The persist connection is dead. This shouldn't
             // usually happen (only with Connection: close responses
             // with no response bodies), but if it does happen it
             // means either a) the remote server hung up on us
             // prematurely, or b) the readLoop sent us a response &
             // closed its closech at roughly the same time, and we
-            // selected this case first, in which case a response
-            // might still be coming soon.
-            //
-            // We can't avoid the select race in b) by using a unbuffered
-            // resc channel instead, because then goroutines can
-            // leak if we exit due to other errors.
-            pconnDeadCh = nil // avoid spinning
-            failTicker = time.After(100 * time.Millisecond) // arbitrary time to wait for resc
-        case <-failTicker:
-            re = responseAndError{err: errClosed}
+            // selected this case first. If we got a response, readLoop makes sure
+            // to send it before it puts the conn and closes the channel.
+            // That way, we can fetch the response, if there is one,
+            // with a non-blocking receive.
+            select {
+            case re = <-resc:
+                if fn := testHookPersistConnClosedGotRes; fn != nil {
+                    fn()
+                }
+            default:
+                re = responseAndError{err: errClosed}
+            }
             break WaitResponse
         case <-respHeaderTimer:
             pc.close()
src/net/http/transport_test.go
@@ -2275,6 +2275,52 @@ func TestTransportRangeAndGzip(t *testing.T) {
     res.Body.Close()
 }

+
+// Previously, we used to handle a logical race within RoundTrip by waiting for 100ms
+// in the case of an error. Changing the order of the channel operations got rid of this
+// race.
+//
+// In order to test that the channel op reordering works, we install a hook into the
+// roundTrip function which gets called if we saw the connection go away and
+// we subsequently received a response.
+func TestTransportResponseCloseRace(t *testing.T) {
+    if testing.Short() {
+        t.Skip("skipping in short mode")
+    }
+    defer afterTest(t)
+    ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
+    }))
+    defer ts.Close()
+    sawRace := false
+    SetInstallConnClosedHook(func() {
+        sawRace = true
+    })
+    defer SetInstallConnClosedHook(nil)
+    tr := &Transport{
+        DisableKeepAlives: true,
+    }
+    req, err := NewRequest("GET", ts.URL, nil)
+    if err != nil {
+        t.Fatal(err)
+    }
+    // selects are not deterministic, so do this a bunch
+    // and see if we handle the logical race at least once.
+    for i := 0; i < 10000; i++ {
+        resp, err := tr.RoundTrip(req)
+        if err != nil {
+            t.Fatalf("unexpected error: %s", err)
+            continue
+        }
+        resp.Body.Close()
+        if sawRace {
+            break
+        }
+    }
+    if !sawRace {
+        t.Errorf("didn't see response/connection going away race")
+    }
+}
+
 func wantBody(res *http.Response, err error, want string) error {
     if err != nil {
         return err