Commit 8a2a5013 authored by Brad Fitzpatrick

net/http: fix Transport race(s) with high GOMAXPROCS

Also adds a new test for GOMAXPROCS=16 explicitly, which now passes
reliably in a stress loop like:

$ go test -c
$ (while ./http.test -test.v -test.run=Concurrency; do echo pass; done ) 2>&1 | tee foo; less foo

(It used to fail very quickly and reliably on at least Linux/amd64)

Fixes #3793

R=golang-dev, adg, r
CC=golang-dev
https://golang.org/cl/6347061
parent 2b4cc6cc
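
The heart of the change is a new closech channel on persistConn: readLoop closes it when the connection dies, and roundTrip selects on it so a caller is never left blocked on a dead connection. The mechanism is Go's close-to-broadcast idiom: closing a channel wakes every receiver at once. As a refresher, here is a minimal standalone sketch of that idiom; it is not part of the commit, and the names are illustrative.

package main

import (
	"fmt"
	"sync"
)

func main() {
	closech := make(chan struct{}) // plays the role of persistConn.closech

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-closech // a receive on a closed channel returns immediately
			fmt.Printf("waiter %d saw the broadcast\n", id)
		}(i)
	}

	close(closech) // a single close releases every waiter at once
	wg.Wait()
}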
src/pkg/net/http/transport.go

@@ -24,6 +24,7 @@ import (
 	"os"
 	"strings"
 	"sync"
+	"time"
 )
 
 // DefaultTransport is the default implementation of Transport and is
@@ -260,6 +261,11 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
 		pconn.close()
 		return false
 	}
+	for _, exist := range t.idleConn[key] {
+		if exist == pconn {
+			log.Fatalf("dup idle pconn %p in freelist", pconn)
+		}
+	}
 	t.idleConn[key] = append(t.idleConn[key], pconn)
 	return true
 }
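
The loop added to putIdleConn is a defensive invariant check rather than part of the fix proper: if the same connection is ever returned to the idle pool twice, log.Fatalf crashes the process on the spot, so any double-put introduced by a race fails loudly under the stress test instead of corrupting the freelist silently.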
@@ -289,7 +295,7 @@ func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) {
 			return
 		}
 	}
-	return
+	panic("unreachable")
 }
 
 func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
@@ -324,6 +330,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
 		conn:    conn,
 		reqch:   make(chan requestAndChan, 50),
 		writech: make(chan writeRequest, 50),
+		closech: make(chan struct{}),
 	}
 
 	switch {
@@ -491,6 +498,7 @@ type persistConn struct {
 	bw      *bufio.Writer       // to conn
 	reqch   chan requestAndChan // written by roundTrip; read by readLoop
 	writech chan writeRequest   // written by roundTrip; read by writeLoop
+	closech chan struct{}       // broadcast close when readLoop (TCP connection) closes
 	isProxy bool
 
 	// mutateHeaderFunc is an optional func to modify extra
@@ -522,6 +530,7 @@ func remoteSideClosed(err error) bool {
 }
 
 func (pc *persistConn) readLoop() {
+	defer close(pc.closech)
 	defer close(pc.writech)
 	alive := true
 	var lastbody io.ReadCloser // last response body, if any, read on this connection
@@ -549,7 +558,11 @@ func (pc *persistConn) readLoop() {
 			lastbody.Close() // assumed idempotent
 			lastbody = nil
 		}
-		resp, err := ReadResponse(pc.br, rc.req)
+
+		var resp *Response
+		if err == nil {
+			resp, err = ReadResponse(pc.br, rc.req)
+		}
 		if err != nil {
 			pc.close()
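
Note the switch from := to = here: resp gets an explicit declaration, and the err consulted by the new guard is evidently one set earlier in the read loop (above this hunk). If the connection has already failed before a request is matched up, ReadResponse is skipped and the existing error falls through to the pc.close() path.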
@@ -578,7 +591,7 @@ func (pc *persistConn) readLoop() {
 		var waitForBodyRead chan bool
 		if hasBody {
 			lastbody = resp.Body
-			waitForBodyRead = make(chan bool)
+			waitForBodyRead = make(chan bool, 1)
 			resp.Body.(*bodyEOFSignal).fn = func() {
 				if alive && !pc.t.putIdleConn(pc) {
 					alive = false
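
Giving waitForBodyRead a buffer of one means the body-completion callback can always deliver its signal without blocking, presumably so the sending goroutine cannot get stuck (and leak) if the receiving side has already bailed out.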
@@ -692,6 +705,8 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
 	pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}
 
 	var re responseAndError
+	var pconnDeadCh = pc.closech
+	var failTicker <-chan time.Time
 WaitResponse:
 	for {
 		select {
@@ -700,6 +715,24 @@ WaitResponse:
 				re = responseAndError{nil, err}
 				break WaitResponse
 			}
+		case <-pconnDeadCh:
+			// The persist connection is dead. This shouldn't
+			// usually happen (only with Connection: close responses
+			// with no response bodies), but if it does happen it
+			// means either a) the remote server hung up on us
+			// prematurely, or b) the readLoop sent us a response &
+			// closed its closech at roughly the same time, and we
+			// selected this case first, in which case a response
+			// might still be coming soon.
+			//
+			// We can't avoid the select race in b) by using an unbuffered
+			// resc channel instead, because then goroutines can
+			// leak if we exit due to other errors.
+			pconnDeadCh = nil                               // avoid spinning
+			failTicker = time.After(100 * time.Millisecond) // arbitrary time to wait for resc
+		case <-failTicker:
+			re = responseAndError{nil, errors.New("net/http: transport closed before response was received")}
+			break WaitResponse
 		case re = <-resc:
 			break WaitResponse
 		}
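
The two new cases combine two select idioms: assigning nil to a channel variable disables its case, because a receive from a nil channel blocks forever, and time.After arms a one-shot fallback timer. Here is a minimal standalone sketch of the combination, separate from the commit; the function and channel names are illustrative.

package main

import (
	"errors"
	"fmt"
	"time"
)

// wait mimics the WaitResponse loop: prefer a response, but once the
// connection-dead signal fires, give a racing response a short grace
// period before giving up.
func wait(resc <-chan string, closech <-chan struct{}) (string, error) {
	deadCh := closech
	var failTimer <-chan time.Time
	for {
		select {
		case <-deadCh:
			deadCh = nil // a nil channel blocks forever, so this case cannot fire again
			failTimer = time.After(100 * time.Millisecond)
		case <-failTimer:
			return "", errors.New("closed before response was received")
		case res := <-resc:
			return res, nil
		}
	}
}

func main() {
	closech := make(chan struct{})
	resc := make(chan string, 1)
	close(closech)  // the connection "dies"...
	resc <- "hello" // ...racing with a response that did arrive
	fmt.Println(wait(resc, closech)) // prints: hello <nil>
}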
@@ -762,6 +795,7 @@ type bodyEOFSignal struct {
 	body     io.ReadCloser
 	fn       func()
 	isClosed bool
+	once     sync.Once
 }
 
 func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
@@ -769,9 +803,8 @@ func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
 	if es.isClosed && n > 0 {
 		panic("http: unexpected bodyEOFSignal Read after Close; see issue 1725")
 	}
-	if err == io.EOF && es.fn != nil {
-		es.fn()
-		es.fn = nil
+	if err == io.EOF {
+		es.condfn()
 	}
 	return
 }
@@ -782,13 +815,18 @@ func (es *bodyEOFSignal) Close() (err error) {
 	}
 	es.isClosed = true
 	err = es.body.Close()
-	if err == nil && es.fn != nil {
-		es.fn()
-		es.fn = nil
+	if err == nil {
+		es.condfn()
 	}
 	return
 }
 
+func (es *bodyEOFSignal) condfn() {
+	if es.fn != nil {
+		es.once.Do(es.fn)
+	}
+}
+
 type readFirstCloseBoth struct {
 	io.ReadCloser
 	io.Closer
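
The condfn refactor exists because the callback is reachable from two paths, a Read that hits io.EOF and an explicit Close, and under high GOMAXPROCS both can race; sync.Once makes the invocation happen at most once, where the old scheme of nil-ing out es.fn after calling it was itself racy. A minimal standalone sketch of the pattern, separate from the commit, with illustrative type and field names:

package main

import (
	"fmt"
	"sync"
)

// eofSignal mirrors the shape of bodyEOFSignal's callback handling.
type eofSignal struct {
	fn   func()
	once sync.Once
}

// condfn conditionally runs fn, at most once, no matter how many
// goroutines race to invoke it.
func (es *eofSignal) condfn() {
	if es.fn != nil {
		es.once.Do(es.fn)
	}
}

func main() {
	es := &eofSignal{fn: func() { fmt.Println("fired exactly once") }}

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // e.g. Read-at-EOF and Close racing
		wg.Add(1)
		go func() {
			defer wg.Done()
			es.condfn()
		}()
	}
	wg.Wait()
}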
src/pkg/net/http/transport_test.go

@@ -857,6 +857,50 @@ func TestIssue3595(t *testing.T) {
 	}
 }
 
+func TestTransportConcurrency(t *testing.T) {
+	const maxProcs = 16
+	const numReqs = 500
+	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(maxProcs))
+	ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
+		fmt.Fprintf(w, "%v", r.FormValue("echo"))
+	}))
+	defer ts.Close()
+	tr := &Transport{}
+	c := &Client{Transport: tr}
+	reqs := make(chan string)
+	defer close(reqs)
+
+	var wg sync.WaitGroup
+	wg.Add(numReqs)
+	for i := 0; i < maxProcs*2; i++ {
+		go func() {
+			for req := range reqs {
+				res, err := c.Get(ts.URL + "/?echo=" + req)
+				if err != nil {
+					t.Errorf("error on req %s: %v", req, err)
+					wg.Done()
+					continue
+				}
+				all, err := ioutil.ReadAll(res.Body)
+				if err != nil {
+					t.Errorf("read error on req %s: %v", req, err)
+					wg.Done()
+					continue
+				}
+				if string(all) != req {
+					t.Errorf("body of req %s = %q; want %q", req, all, req)
+				}
+				wg.Done()
+				res.Body.Close()
+			}
+		}()
+	}
+	for i := 0; i < numReqs; i++ {
+		reqs <- fmt.Sprintf("request-%d", i)
+	}
+	wg.Wait()
+}
+
 type fooProto struct{}
 
 func (fooProto) RoundTrip(req *Request) (*Response, error) {
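
A usage note, not part of the commit: the stress loop from the commit message is one way to shake out this test; on toolchains that include the race detector (Go 1.1 and later), go test -race -run=Concurrency net/http exercises the same code paths with automatic race reporting.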