Commit fbaf881c authored by Russ Cox's avatar Russ Cox

net/http: fix Transport.MaxConnsPerHost limits & idle pool races

There were at least three races in the implementation of the pool of
idle HTTP connections before this CL.

The first race is that HTTP/2 connections can be shared for many
requests, but each requesting goroutine would take the connection out
of the pool and then immediately return it before using it; this
created unnecessary, tiny little race windows during which another
goroutine might dial a second connection instead of reusing the first.
This CL changes the idle pool to just leave the HTTP/2 connection in
the pool permanently (until there is reason to close it), instead of
doing the take-it-out-put-it-back dance race.

The second race is that “is there an idle connection?” and
“register to wait for an idle connection” were implemented as two
separate steps, in different critical sections. So a client could end
up registered to wait for an idle connection and be waiting or perhaps
dialing, not having noticed the idle connection sitting in the pool
that arrived between the two steps.

The third race is that t.getIdleConnCh assumes that the inability to
send on the channel means the client doesn't need the result, when it
could mean that the client has not yet entered the select.
That is, the main dial does:

	idleConnCh := t.getIdleConnCh(cm)
	select {
	case v := <-dialc:
		...
	case pc := <-idleConnCh
		...
	...
	}

But then tryPutIdleConn does:

	waitingDialer := t.idleConnCh[key] // what getIdleConnCh(cm) returned
	select {
	case waitingDialer <- pconn:
		// We're done ...
		return nil
	default:
		if waitingDialer != nil {
			// They had populated this, but their dial won
			// first, so we can clean up this map entry.
			delete(t.idleConnCh, key)
		}
	}

If the client has returned from getIdleConnCh but not yet reached the
select, tryPutIdleConn will be unable to do the send, incorrectly
conclude that the client does not care anymore, and put the connection
in the idle pool instead, again leaving the client dialing unnecessarily
while a connection sits in the idle pool.

(It's also odd that the success case does not clean up the map entry,
and also that the map has room for only a single waiting goroutine for
a given host.)

None of these races mattered too much before Go 1.11: at most they
meant that connections were not reused quite as promptly as possible,
or a few more than necessary would be created. But Go 1.11 added
Transport.MaxConnsPerHost, which limited the number of connections
created for a given host. The default is 0 (unlimited), but if a user
did explicitly impose a low limit (2 is common), all these misplaced
conns could easily add up to the entire limit, causing a deadlock.
This was causing intermittent timeouts in TestTransportMaxConnsPerHost.

The addition of the MaxConnsPerHost support added its own races.

For example, here t.incHostConnCount could increment the count
and return a channel ready for receiving, and then the client would
not receive from it nor ever issue the decrement, because the select
need not evaluate these two cases in order:

	select {
	case <-t.incHostConnCount(cmKey):
		// count below conn per host limit; proceed
	case pc := <-t.getIdleConnCh(cm):
		if trace != nil && trace.GotConn != nil {
			trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
		}
		return pc, nil
	...
	}

Obviously, unmatched increments are another way to get to a deadlock.
TestTransportMaxConnsPerHost deadlocked approximately 100% of
the time with a small random sleep added between incHostConnCount
and the select:

	ch := t.incHostConnCount(cmKey):
	time.Sleep(time.Duration(rand.Intn(10))*time.Millisecond)
	select {
	case <-ch
		// count below conn per host limit; proceed
	case pc := <-t.getIdleConnCh(cm):
		...
	}

The limit also did not properly apply to HTTP/2, because of the
decrement being attached to the underlying net.Conn.Close
and net/http not having access to the underlying HTTP/2 conn.
The alternate decrements for HTTP/2 may also have introduced
spurious decrements (discussion in #29889). Perhaps those
spurious decrements or other races caused the other intermittent
non-deadlock failures in TestTransportMaxConnsPerHost,
in which the HTTP/2 phase created too many connections (#31982).

This CL replaces the buggy, racy code with new code that is hopefully
neither buggy nor racy.

Fixes #29889.
Fixes #31982.
Fixes #32336.

Change-Id: I0dfac3a6fe8a6cdf5f0853722781fe2ec071ac97
Reviewed-on: https://go-review.googlesource.com/c/go/+/184262
Run-TryBot: Russ Cox <rsc@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarBryan C. Mills <bcmills@google.com>
parent ddc8439b
...@@ -166,30 +166,40 @@ func (t *Transport) IdleConnCountForTesting(scheme, addr string) int { ...@@ -166,30 +166,40 @@ func (t *Transport) IdleConnCountForTesting(scheme, addr string) int {
return 0 return 0
} }
func (t *Transport) IdleConnChMapSizeForTesting() int { func (t *Transport) IdleConnWaitMapSizeForTesting() int {
t.idleMu.Lock() t.idleMu.Lock()
defer t.idleMu.Unlock() defer t.idleMu.Unlock()
return len(t.idleConnCh) return len(t.idleConnWait)
} }
func (t *Transport) IsIdleForTesting() bool { func (t *Transport) IsIdleForTesting() bool {
t.idleMu.Lock() t.idleMu.Lock()
defer t.idleMu.Unlock() defer t.idleMu.Unlock()
return t.wantIdle return t.closeIdle
} }
func (t *Transport) RequestIdleConnChForTesting() { func (t *Transport) QueueForIdleConnForTesting() {
t.getIdleConnCh(connectMethod{nil, "http", "example.com", false}) t.queueForIdleConn(nil)
} }
// PutIdleTestConn reports whether it was able to insert a fresh
// persistConn for scheme, addr into the idle connection pool.
func (t *Transport) PutIdleTestConn(scheme, addr string) bool { func (t *Transport) PutIdleTestConn(scheme, addr string) bool {
c, _ := net.Pipe() c, _ := net.Pipe()
key := connectMethodKey{"", scheme, addr, false} key := connectMethodKey{"", scheme, addr, false}
select {
case <-t.incHostConnCount(key): if t.MaxConnsPerHost > 0 {
default: // Transport is tracking conns-per-host.
return false // Increment connection count to account
// for new persistConn created below.
t.connsPerHostMu.Lock()
if t.connsPerHost == nil {
t.connsPerHost = make(map[connectMethodKey]int)
}
t.connsPerHost[key]++
t.connsPerHostMu.Unlock()
} }
return t.tryPutIdleConn(&persistConn{ return t.tryPutIdleConn(&persistConn{
t: t, t: t,
conn: c, // dummy conn: c, // dummy
......
...@@ -2407,6 +2407,7 @@ func TestTimeoutHandlerRace(t *testing.T) { ...@@ -2407,6 +2407,7 @@ func TestTimeoutHandlerRace(t *testing.T) {
} }
// See issues 8209 and 8414. // See issues 8209 and 8414.
// Both issues involved panics in the implementation of TimeoutHandler.
func TestTimeoutHandlerRaceHeader(t *testing.T) { func TestTimeoutHandlerRaceHeader(t *testing.T) {
setParallel(t) setParallel(t)
defer afterTest(t) defer afterTest(t)
...@@ -2434,7 +2435,9 @@ func TestTimeoutHandlerRaceHeader(t *testing.T) { ...@@ -2434,7 +2435,9 @@ func TestTimeoutHandlerRaceHeader(t *testing.T) {
defer func() { <-gate }() defer func() { <-gate }()
res, err := c.Get(ts.URL) res, err := c.Get(ts.URL)
if err != nil { if err != nil {
t.Error(err) // We see ECONNRESET from the connection occasionally,
// and that's OK: this test is checking that the server does not panic.
t.Log(err)
return return
} }
defer res.Body.Close() defer res.Body.Close()
...@@ -5507,19 +5510,23 @@ func TestServerSetKeepAlivesEnabledClosesConns(t *testing.T) { ...@@ -5507,19 +5510,23 @@ func TestServerSetKeepAlivesEnabledClosesConns(t *testing.T) {
if a1 != a2 { if a1 != a2 {
t.Fatal("expected first two requests on same connection") t.Fatal("expected first two requests on same connection")
} }
var idle0 int addr := strings.TrimPrefix(ts.URL, "http://")
if !waitCondition(2*time.Second, 10*time.Millisecond, func() bool {
idle0 = tr.IdleConnKeyCountForTesting() // The two requests should have used the same connection,
return idle0 == 1 // and there should not have been a second connection that
}) { // was created by racing dial against reuse.
t.Fatalf("idle count before SetKeepAlivesEnabled called = %v; want 1", idle0) // (The first get was completed when the second get started.)
n := tr.IdleConnCountForTesting("http", addr)
if n != 1 {
t.Fatalf("idle count for %q after 2 gets = %d, want 1", addr, n)
} }
// SetKeepAlivesEnabled should discard idle conns.
ts.Config.SetKeepAlivesEnabled(false) ts.Config.SetKeepAlivesEnabled(false)
var idle1 int var idle1 int
if !waitCondition(2*time.Second, 10*time.Millisecond, func() bool { if !waitCondition(2*time.Second, 10*time.Millisecond, func() bool {
idle1 = tr.IdleConnKeyCountForTesting() idle1 = tr.IdleConnCountForTesting("http", addr)
return idle1 == 0 return idle1 == 0
}) { }) {
t.Fatalf("idle count after SetKeepAlivesEnabled called = %v; want 0", idle1) t.Fatalf("idle count after SetKeepAlivesEnabled called = %v; want 0", idle1)
......
...@@ -57,15 +57,6 @@ var DefaultTransport RoundTripper = &Transport{ ...@@ -57,15 +57,6 @@ var DefaultTransport RoundTripper = &Transport{
// MaxIdleConnsPerHost. // MaxIdleConnsPerHost.
const DefaultMaxIdleConnsPerHost = 2 const DefaultMaxIdleConnsPerHost = 2
// connsPerHostClosedCh is a closed channel used by MaxConnsPerHost
// for the property that receives from a closed channel return the
// zero value.
var connsPerHostClosedCh = make(chan struct{})
func init() {
close(connsPerHostClosedCh)
}
// Transport is an implementation of RoundTripper that supports HTTP, // Transport is an implementation of RoundTripper that supports HTTP,
// HTTPS, and HTTP proxies (for either HTTP or HTTPS with CONNECT). // HTTPS, and HTTP proxies (for either HTTP or HTTPS with CONNECT).
// //
...@@ -102,11 +93,11 @@ func init() { ...@@ -102,11 +93,11 @@ func init() {
// request is treated as idempotent but the header is not sent on the // request is treated as idempotent but the header is not sent on the
// wire. // wire.
type Transport struct { type Transport struct {
idleMu sync.Mutex idleMu sync.Mutex
wantIdle bool // user has requested to close all idle conns closeIdle bool // user has requested to close all idle conns
idleConn map[connectMethodKey][]*persistConn // most recently used at end idleConn map[connectMethodKey][]*persistConn // most recently used at end
idleConnCh map[connectMethodKey]chan *persistConn idleConnWait map[connectMethodKey]wantConnQueue // waiting getConns
idleLRU connLRU idleLRU connLRU
reqMu sync.Mutex reqMu sync.Mutex
reqCanceler map[*Request]func(error) reqCanceler map[*Request]func(error)
...@@ -114,9 +105,9 @@ type Transport struct { ...@@ -114,9 +105,9 @@ type Transport struct {
altMu sync.Mutex // guards changing altProto only altMu sync.Mutex // guards changing altProto only
altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme
connCountMu sync.Mutex connsPerHostMu sync.Mutex
connPerHostCount map[connectMethodKey]int connsPerHost map[connectMethodKey]int
connPerHostAvailable map[connectMethodKey]chan struct{} connsPerHostWait map[connectMethodKey]wantConnQueue // waiting getConns
// Proxy specifies a function to return a proxy for a given // Proxy specifies a function to return a proxy for a given
// Request. If the function returns a non-nil error, the // Request. If the function returns a non-nil error, the
...@@ -203,11 +194,6 @@ type Transport struct { ...@@ -203,11 +194,6 @@ type Transport struct {
// active, and idle states. On limit violation, dials will block. // active, and idle states. On limit violation, dials will block.
// //
// Zero means no limit. // Zero means no limit.
//
// For HTTP/2, this currently only controls the number of new
// connections being created at a time, instead of the total
// number. In practice, hosts using HTTP/2 only have about one
// idle connection, though.
MaxConnsPerHost int MaxConnsPerHost int
// IdleConnTimeout is the maximum amount of time an idle // IdleConnTimeout is the maximum amount of time an idle
...@@ -543,7 +529,6 @@ func (t *Transport) roundTrip(req *Request) (*Response, error) { ...@@ -543,7 +529,6 @@ func (t *Transport) roundTrip(req *Request) (*Response, error) {
var resp *Response var resp *Response
if pconn.alt != nil { if pconn.alt != nil {
// HTTP/2 path. // HTTP/2 path.
t.putOrCloseIdleConn(pconn)
t.setReqCanceler(req, nil) // not cancelable with CancelRequest t.setReqCanceler(req, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req) resp, err = pconn.alt.RoundTrip(req)
} else { } else {
...@@ -554,7 +539,6 @@ func (t *Transport) roundTrip(req *Request) (*Response, error) { ...@@ -554,7 +539,6 @@ func (t *Transport) roundTrip(req *Request) (*Response, error) {
} }
if http2isNoCachedConnError(err) { if http2isNoCachedConnError(err) {
t.removeIdleConn(pconn) t.removeIdleConn(pconn)
t.decHostConnCount(cm.key()) // clean up the persistent connection
} else if !pconn.shouldRetryRequest(req, err) { } else if !pconn.shouldRetryRequest(req, err) {
// Issue 16465: return underlying net.Conn.Read error from peek, // Issue 16465: return underlying net.Conn.Read error from peek,
// as we've historically done. // as we've historically done.
...@@ -665,8 +649,7 @@ func (t *Transport) CloseIdleConnections() { ...@@ -665,8 +649,7 @@ func (t *Transport) CloseIdleConnections() {
t.idleMu.Lock() t.idleMu.Lock()
m := t.idleConn m := t.idleConn
t.idleConn = nil t.idleConn = nil
t.idleConnCh = nil t.closeIdle = true // close newly idle connections
t.wantIdle = true
t.idleLRU = connLRU{} t.idleLRU = connLRU{}
t.idleMu.Unlock() t.idleMu.Unlock()
for _, conns := range m { for _, conns := range m {
...@@ -762,7 +745,7 @@ func (cm *connectMethod) proxyAuth() string { ...@@ -762,7 +745,7 @@ func (cm *connectMethod) proxyAuth() string {
var ( var (
errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled") errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
errConnBroken = errors.New("http: putIdleConn: connection is in bad state") errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
errWantIdle = errors.New("http: putIdleConn: CloseIdleConnections was called") errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
errTooManyIdle = errors.New("http: putIdleConn: too many idle connections") errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host") errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
errCloseIdleConns = errors.New("http: CloseIdleConnections called") errCloseIdleConns = errors.New("http: CloseIdleConnections called")
...@@ -821,29 +804,56 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error { ...@@ -821,29 +804,56 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
return errConnBroken return errConnBroken
} }
pconn.markReused() pconn.markReused()
key := pconn.cacheKey
t.idleMu.Lock() t.idleMu.Lock()
defer t.idleMu.Unlock() defer t.idleMu.Unlock()
waitingDialer := t.idleConnCh[key] // HTTP/2 (pconn.alt != nil) connections do not come out of the idle list,
select { // because multiple goroutines can use them simultaneously.
case waitingDialer <- pconn: // If this is an HTTP/2 connection being “returned,” we're done.
// We're done with this pconn and somebody else is if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
// currently waiting for a conn of this type (they're
// actively dialing, but this conn is ready
// first). Chrome calls this socket late binding. See
// https://insouciant.org/tech/connection-management-in-chromium/
return nil return nil
default: }
if waitingDialer != nil {
// They had populated this, but their dial won // Deliver pconn to goroutine waiting for idle connection, if any.
// first, so we can clean up this map entry. // (They may be actively dialing, but this conn is ready first.
delete(t.idleConnCh, key) // Chrome calls this socket late binding.
// See https://insouciant.org/tech/connection-management-in-chromium/.)
key := pconn.cacheKey
if q, ok := t.idleConnWait[key]; ok {
done := false
if pconn.alt == nil {
// HTTP/1.
// Loop over the waiting list until we find a w that isn't done already, and hand it pconn.
for q.len() > 0 {
w := q.popFront()
if w.tryDeliver(pconn, nil) {
done = true
break
}
}
} else {
// HTTP/2.
// Can hand the same pconn to everyone in the waiting list,
// and we still won't be done: we want to put it in the idle
// list unconditionally, for any future clients too.
for q.len() > 0 {
w := q.popFront()
w.tryDeliver(pconn, nil)
}
}
if q.len() == 0 {
delete(t.idleConnWait, key)
} else {
t.idleConnWait[key] = q
}
if done {
return nil
} }
} }
if t.wantIdle {
return errWantIdle if t.closeIdle {
return errCloseIdle
} }
if t.idleConn == nil { if t.idleConn == nil {
t.idleConn = make(map[connectMethodKey][]*persistConn) t.idleConn = make(map[connectMethodKey][]*persistConn)
...@@ -864,71 +874,86 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error { ...@@ -864,71 +874,86 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
oldest.close(errTooManyIdle) oldest.close(errTooManyIdle)
t.removeIdleConnLocked(oldest) t.removeIdleConnLocked(oldest)
} }
if t.IdleConnTimeout > 0 {
// Set idle timer, but only for HTTP/1 (pconn.alt == nil).
// The HTTP/2 implementation manages the idle timer itself
// (see idleConnTimeout in h2_bundle.go).
if t.IdleConnTimeout > 0 && pconn.alt == nil {
if pconn.idleTimer != nil { if pconn.idleTimer != nil {
pconn.idleTimer.Reset(t.IdleConnTimeout) pconn.idleTimer.Reset(t.IdleConnTimeout)
} else { } else {
// idleTimer does not apply to HTTP/2 pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
if pconn.alt == nil {
pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
}
} }
} }
pconn.idleAt = time.Now() pconn.idleAt = time.Now()
return nil return nil
} }
// getIdleConnCh returns a channel to receive and return idle // queueForIdleConn queues w to receive the next idle connection for w.cm.
// persistent connection for the given connectMethod. // As an optimization hint to the caller, queueForIdleConn reports whether
// It may return nil, if persistent connections are not being used. // it successfully delivered an already-idle connection.
func (t *Transport) getIdleConnCh(cm connectMethod) chan *persistConn { func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
if t.DisableKeepAlives { if t.DisableKeepAlives {
return nil return false
} }
key := cm.key()
t.idleMu.Lock() t.idleMu.Lock()
defer t.idleMu.Unlock() defer t.idleMu.Unlock()
t.wantIdle = false
if t.idleConnCh == nil { // Stop closing connections that become idle - we might want one.
t.idleConnCh = make(map[connectMethodKey]chan *persistConn) // (That is, undo the effect of t.CloseIdleConnections.)
} t.closeIdle = false
ch, ok := t.idleConnCh[key]
if !ok { if w == nil {
ch = make(chan *persistConn) // Happens in test hook.
t.idleConnCh[key] = ch return false
} }
return ch
}
func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn, idleSince time.Time) { // Look for most recently-used idle connection.
key := cm.key() if list, ok := t.idleConn[w.key]; ok {
t.idleMu.Lock() stop := false
defer t.idleMu.Unlock() delivered := false
for { for len(list) > 0 && !stop {
pconns, ok := t.idleConn[key] pconn := list[len(list)-1]
if !ok { if pconn.isBroken() {
return nil, time.Time{} // persistConn.readLoop has marked the connection broken,
// but Transport.removeIdleConn has not yet removed it from the idle list.
// Drop on floor on behalf of Transport.removeIdleConn.
list = list[:len(list)-1]
continue
}
delivered = w.tryDeliver(pconn, nil)
if delivered {
if pconn.alt != nil {
// HTTP/2: multiple clients can share pconn.
// Leave it in the list.
} else {
// HTTP/1: only one client can use pconn.
// Remove it from the list.
t.idleLRU.remove(pconn)
list = list[:len(list)-1]
}
}
stop = true
} }
if len(pconns) == 1 { if len(list) > 0 {
pconn = pconns[0] t.idleConn[w.key] = list
delete(t.idleConn, key)
} else { } else {
// 2 or more cached connections; use the most delete(t.idleConn, w.key)
// recently used one at the end.
pconn = pconns[len(pconns)-1]
t.idleConn[key] = pconns[:len(pconns)-1]
} }
t.idleLRU.remove(pconn) if stop {
if pconn.isBroken() { return delivered
// There is a tiny window where this is
// possible, between the connecting dying and
// the persistConn readLoop calling
// Transport.removeIdleConn. Just skip it and
// carry on.
continue
} }
return pconn, pconn.idleAt
} }
// Register to receive next connection that becomes idle.
if t.idleConnWait == nil {
t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.idleConnWait[w.key]
q.pushBack(w)
t.idleConnWait[w.key] = q
return false
} }
// removeIdleConn marks pconn as dead. // removeIdleConn marks pconn as dead.
...@@ -1015,20 +1040,147 @@ func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, e ...@@ -1015,20 +1040,147 @@ func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, e
return zeroDialer.DialContext(ctx, network, addr) return zeroDialer.DialContext(ctx, network, addr)
} }
// A wantConn records state about a wanted connection
// (that is, an active call to getConn).
// The conn may be gotten by dialing or by finding an idle connection,
// or a cancellation may make the conn no longer wanted.
// These three options are racing against each other and use
// wantConn to coordinate and agree about the winning outcome.
type wantConn struct {
cm connectMethod
key connectMethodKey // cm.key()
ctx context.Context // context for dial
ready chan struct{} // closed when pc, err pair is delivered
// hooks for testing to know when dials are done
// beforeDial is called in the getConn goroutine when the dial is queued.
// afterDial is called when the dial is completed or cancelled.
beforeDial func()
afterDial func()
mu sync.Mutex // protects pc, err, close(ready)
pc *persistConn
err error
}
// waiting reports whether w is still waiting for an answer (connection or error).
func (w *wantConn) waiting() bool {
select {
case <-w.ready:
return false
default:
return true
}
}
// tryDeliver attempts to deliver pc, err to w and reports whether it succeeded.
func (w *wantConn) tryDeliver(pc *persistConn, err error) bool {
w.mu.Lock()
defer w.mu.Unlock()
if w.pc != nil || w.err != nil {
return false
}
w.pc = pc
w.err = err
if w.pc == nil && w.err == nil {
panic("net/http: internal error: misuse of tryDeliver")
}
close(w.ready)
return true
}
// cancel marks w as no longer wanting a result (for example, due to cancellation).
// If a connection has been delivered already, cancel returns it with t.putOrCloseIdleConn.
func (w *wantConn) cancel(t *Transport, err error) {
w.mu.Lock()
if w.pc == nil && w.err == nil {
close(w.ready) // catch misbehavior in future delivery
}
pc := w.pc
w.pc = nil
w.err = err
w.mu.Unlock()
if pc != nil {
t.putOrCloseIdleConn(pc)
}
}
// A wantConnQueue is a queue of wantConns.
type wantConnQueue struct {
// This is a queue, not a deque.
// It is split into two stages - head[headPos:] and tail.
// popFront is trivial (headPos++) on the first stage, and
// pushBack is trivial (append) on the second stage.
// If the first stage is empty, popFront can swap the
// first and second stages to remedy the situation.
//
// This two-stage split is analogous to the use of two lists
// in Okasaki's purely functional queue but without the
// overhead of reversing the list when swapping stages.
head []*wantConn
headPos int
tail []*wantConn
}
// len returns the number of items in the queue.
func (q *wantConnQueue) len() int {
return len(q.head) - q.headPos + len(q.tail)
}
// pushBack adds w to the back of the queue.
func (q *wantConnQueue) pushBack(w *wantConn) {
q.tail = append(q.tail, w)
}
// popFront removes and returns the w at the front of the queue.
func (q *wantConnQueue) popFront() *wantConn {
if q.headPos >= len(q.head) {
if len(q.tail) == 0 {
return nil
}
// Pick up tail as new head, clear tail.
q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
}
w := q.head[q.headPos]
q.head[q.headPos] = nil
q.headPos++
return w
}
// getConn dials and creates a new persistConn to the target as // getConn dials and creates a new persistConn to the target as
// specified in the connectMethod. This includes doing a proxy CONNECT // specified in the connectMethod. This includes doing a proxy CONNECT
// and/or setting up TLS. If this doesn't return an error, the persistConn // and/or setting up TLS. If this doesn't return an error, the persistConn
// is ready to write requests to. // is ready to write requests to.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) { func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
req := treq.Request req := treq.Request
trace := treq.trace trace := treq.trace
ctx := req.Context() ctx := req.Context()
if trace != nil && trace.GetConn != nil { if trace != nil && trace.GetConn != nil {
trace.GetConn(cm.addr()) trace.GetConn(cm.addr())
} }
if pc, idleSince := t.getIdleConn(cm); pc != nil {
w := &wantConn{
cm: cm,
key: cm.key(),
ctx: ctx,
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
defer func() {
if err != nil {
w.cancel(t, err)
}
}()
// Queue for idle connection.
if delivered := t.queueForIdleConn(w); delivered {
pc := w.pc
if trace != nil && trace.GotConn != nil { if trace != nil && trace.GotConn != nil {
trace.GotConn(pc.gotIdleConnTrace(idleSince)) trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
} }
// set request canceler to some non-nil function so we // set request canceler to some non-nil function so we
// can detect whether it was cleared between now and when // can detect whether it was cleared between now and when
...@@ -1037,108 +1189,44 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistC ...@@ -1037,108 +1189,44 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistC
return pc, nil return pc, nil
} }
type dialRes struct {
pc *persistConn
err error
}
dialc := make(chan dialRes)
cmKey := cm.key()
// Copy these hooks so we don't race on the postPendingDial in
// the goroutine we launch. Issue 11136.
testHookPrePendingDial := testHookPrePendingDial
testHookPostPendingDial := testHookPostPendingDial
handlePendingDial := func() {
testHookPrePendingDial()
go func() {
if v := <-dialc; v.err == nil {
t.putOrCloseIdleConn(v.pc)
} else {
t.decHostConnCount(cmKey)
}
testHookPostPendingDial()
}()
}
cancelc := make(chan error, 1) cancelc := make(chan error, 1)
t.setReqCanceler(req, func(err error) { cancelc <- err }) t.setReqCanceler(req, func(err error) { cancelc <- err })
if t.MaxConnsPerHost > 0 { // Queue for permission to dial.
select { t.queueForDial(w)
case <-t.incHostConnCount(cmKey):
// count below conn per host limit; proceed
case pc := <-t.getIdleConnCh(cm):
if trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
}
return pc, nil
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}
go func() { // Wait for completion or cancellation.
pc, err := t.dialConn(ctx, cm)
dialc <- dialRes{pc, err}
}()
idleConnCh := t.getIdleConnCh(cm)
select { select {
case v := <-dialc: case <-w.ready:
// Our dial finished. // Trace success but only for HTTP/1.
if v.pc != nil { // HTTP/2 calls trace.GotConn itself.
if trace != nil && trace.GotConn != nil && v.pc.alt == nil { if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn}) trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
} }
return v.pc, nil if w.err != nil {
} // If the request has been cancelled, that's probably
// Our dial failed. See why to return a nicer error // what caused w.err; if so, prefer to return the
// value. // cancellation error (see golang.org/issue/16049).
t.decHostConnCount(cmKey) select {
select { case <-req.Cancel:
case <-req.Cancel: return nil, errRequestCanceledConn
// It was an error due to cancellation, so prioritize that case <-req.Context().Done():
// error value. (Issue 16049) return nil, req.Context().Err()
return nil, errRequestCanceledConn case err := <-cancelc:
case <-req.Context().Done(): if err == errRequestCanceled {
return nil, req.Context().Err() err = errRequestCanceledConn
case err := <-cancelc: }
if err == errRequestCanceled { return nil, err
err = errRequestCanceledConn default:
// return below
} }
return nil, err
default:
// It wasn't an error due to cancellation, so
// return the original error message:
return nil, v.err
}
case pc := <-idleConnCh:
// Another request finished first and its net.Conn
// became available before our dial. Or somebody
// else's dial that they didn't use.
// But our dial is still going, so give it away
// when it finishes:
handlePendingDial()
if trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
} }
return pc, nil return w.pc, w.err
case <-req.Cancel: case <-req.Cancel:
handlePendingDial()
return nil, errRequestCanceledConn return nil, errRequestCanceledConn
case <-req.Context().Done(): case <-req.Context().Done():
handlePendingDial()
return nil, req.Context().Err() return nil, req.Context().Err()
case err := <-cancelc: case err := <-cancelc:
handlePendingDial()
if err == errRequestCanceled { if err == errRequestCanceled {
err = errRequestCanceledConn err = errRequestCanceledConn
} }
...@@ -1146,81 +1234,102 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistC ...@@ -1146,81 +1234,102 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistC
} }
} }
// incHostConnCount increments the count of connections for a // queueForDial queues w to wait for permission to begin dialing.
// given host. It returns an already-closed channel if the count // Once w receives permission to dial, it will do so in a separate goroutine.
// is not at its limit; otherwise it returns a channel which is func (t *Transport) queueForDial(w *wantConn) {
// notified when the count is below the limit. w.beforeDial()
func (t *Transport) incHostConnCount(cmKey connectMethodKey) <-chan struct{} {
if t.MaxConnsPerHost <= 0 { if t.MaxConnsPerHost <= 0 {
return connsPerHostClosedCh go t.dialConnFor(w)
return
} }
t.connCountMu.Lock()
defer t.connCountMu.Unlock() t.connsPerHostMu.Lock()
if t.connPerHostCount[cmKey] == t.MaxConnsPerHost { defer t.connsPerHostMu.Unlock()
if t.connPerHostAvailable == nil {
t.connPerHostAvailable = make(map[connectMethodKey]chan struct{}) if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
} if t.connsPerHost == nil {
ch, ok := t.connPerHostAvailable[cmKey] t.connsPerHost = make(map[connectMethodKey]int)
if !ok {
ch = make(chan struct{})
t.connPerHostAvailable[cmKey] = ch
} }
return ch t.connsPerHost[w.key] = n + 1
go t.dialConnFor(w)
return
}
if t.connsPerHostWait == nil {
t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.connsPerHostWait[w.key]
q.pushBack(w)
t.connsPerHostWait[w.key] = q
}
// dialConnFor dials on behalf of w and delivers the result to w.
// dialConnFor has received permission to dial w.cm and is counted in t.connCount[w.cm.key()].
// If the dial is cancelled or unsuccessful, dialConnFor decrements t.connCount[w.cm.key()].
func (t *Transport) dialConnFor(w *wantConn) {
defer w.afterDial()
pc, err := t.dialConn(w.ctx, w.cm)
delivered := w.tryDeliver(pc, err)
if err == nil && (!delivered || pc.alt != nil) {
// pconn was not passed to w,
// or it is HTTP/2 and can be shared.
// Add to the idle connection pool.
t.putOrCloseIdleConn(pc)
} }
if t.connPerHostCount == nil { if err != nil {
t.connPerHostCount = make(map[connectMethodKey]int) t.decConnsPerHost(w.key)
} }
t.connPerHostCount[cmKey]++
// return a closed channel to avoid race: if decHostConnCount is called
// after incHostConnCount and during the nil check, decHostConnCount
// will delete the channel since it's not being listened on yet.
return connsPerHostClosedCh
} }
// decHostConnCount decrements the count of connections // decConnsPerHost decrements the per-host connection count for key,
// for a given host. // which may in turn give a different waiting goroutine permission to dial.
// See Transport.MaxConnsPerHost. func (t *Transport) decConnsPerHost(key connectMethodKey) {
func (t *Transport) decHostConnCount(cmKey connectMethodKey) {
if t.MaxConnsPerHost <= 0 { if t.MaxConnsPerHost <= 0 {
return return
} }
t.connCountMu.Lock()
defer t.connCountMu.Unlock() t.connsPerHostMu.Lock()
t.connPerHostCount[cmKey]-- defer t.connsPerHostMu.Unlock()
select { n := t.connsPerHost[key]
case t.connPerHostAvailable[cmKey] <- struct{}{}: if n == 0 {
default: // Shouldn't happen, but if it does, the counting is buggy and could
// close channel before deleting avoids getConn waiting forever in // easily lead to a silent deadlock, so report the problem loudly.
// case getConn has reference to channel but hasn't started waiting. panic("net/http: internal error: connCount underflow")
// This could lead to more than MaxConnsPerHost in the unlikely case }
// that > 1 go routine has fetched the channel but none started waiting.
if t.connPerHostAvailable[cmKey] != nil { // Can we hand this count to a goroutine still waiting to dial?
close(t.connPerHostAvailable[cmKey]) // (Some goroutines on the wait list may have timed out or
// gotten a connection another way. If they're all gone,
// we don't want to kick off any spurious dial operations.)
if q := t.connsPerHostWait[key]; q.len() > 0 {
done := false
for q.len() > 0 {
w := q.popFront()
if w.waiting() {
go t.dialConnFor(w)
done = true
break
}
}
if q.len() == 0 {
delete(t.connsPerHostWait, key)
} else {
// q is a value (like a slice), so we have to store
// the updated q back into the map.
t.connsPerHostWait[key] = q
}
if done {
return
} }
delete(t.connPerHostAvailable, cmKey)
}
if t.connPerHostCount[cmKey] == 0 {
delete(t.connPerHostCount, cmKey)
} }
}
// connCloseListener wraps a connection, the transport that dialed it
// and the connected-to host key so the host connection count can be
// transparently decremented by whatever closes the embedded connection.
type connCloseListener struct {
net.Conn
t *Transport
cmKey connectMethodKey
didClose int32
}
func (c *connCloseListener) Close() error { // Otherwise, decrement the recorded count.
if atomic.AddInt32(&c.didClose, 1) != 1 { if n--; n == 0 {
return nil delete(t.connsPerHost, key)
} else {
t.connsPerHost[key] = n
} }
err := c.Conn.Close()
c.t.decHostConnCount(c.cmKey)
return err
} }
// The connect method and the transport can both specify a TLS // The connect method and the transport can both specify a TLS
...@@ -1283,8 +1392,8 @@ func (pconn *persistConn) addTLS(name string, trace *httptrace.ClientTrace) erro ...@@ -1283,8 +1392,8 @@ func (pconn *persistConn) addTLS(name string, trace *httptrace.ClientTrace) erro
return nil return nil
} }
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) { func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
pconn := &persistConn{ pconn = &persistConn{
t: t, t: t,
cacheKey: cm.key(), cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1), reqch: make(chan requestAndChan, 1),
...@@ -1423,9 +1532,6 @@ func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistCon ...@@ -1423,9 +1532,6 @@ func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistCon
} }
} }
if t.MaxConnsPerHost > 0 {
pconn.conn = &connCloseListener{Conn: pconn.conn, t: t, cmKey: pconn.cacheKey}
}
pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize()) pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize()) pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
...@@ -1631,7 +1737,7 @@ func (pc *persistConn) canceled() error { ...@@ -1631,7 +1737,7 @@ func (pc *persistConn) canceled() error {
return pc.canceledErr return pc.canceledErr
} }
// isReused reports whether this connection is in a known broken state. // isReused reports whether this connection has been used before.
func (pc *persistConn) isReused() bool { func (pc *persistConn) isReused() bool {
pc.mu.Lock() pc.mu.Lock()
r := pc.reused r := pc.reused
...@@ -2119,10 +2225,12 @@ func (pc *persistConn) wroteRequest() bool { ...@@ -2119,10 +2225,12 @@ func (pc *persistConn) wroteRequest() bool {
// but the server has already replied. In this case, we don't // but the server has already replied. In this case, we don't
// want to wait too long, and we want to return false so this // want to wait too long, and we want to return false so this
// connection isn't re-used. // connection isn't re-used.
t := time.NewTimer(maxWriteWaitBeforeConnReuse)
defer t.Stop()
select { select {
case err := <-pc.writeErrCh: case err := <-pc.writeErrCh:
return err == nil return err == nil
case <-time.After(maxWriteWaitBeforeConnReuse): case <-t.C:
return false return false
} }
} }
...@@ -2374,10 +2482,10 @@ func (pc *persistConn) closeLocked(err error) { ...@@ -2374,10 +2482,10 @@ func (pc *persistConn) closeLocked(err error) {
pc.broken = true pc.broken = true
if pc.closed == nil { if pc.closed == nil {
pc.closed = err pc.closed = err
if pc.alt != nil { pc.t.decConnsPerHost(pc.cacheKey)
// Clean up any host connection counting. // Close HTTP/1 (pc.alt == nil) connection.
pc.t.decHostConnCount(pc.cacheKey) // HTTP/2 closes its connection itself.
} else { if pc.alt == nil {
if err != errCallerOwnsConn { if err != errCallerOwnsConn {
pc.conn.Close() pc.conn.Close()
} }
......
...@@ -655,13 +655,17 @@ func TestTransportMaxConnsPerHost(t *testing.T) { ...@@ -655,13 +655,17 @@ func TestTransportMaxConnsPerHost(t *testing.T) {
expected := int32(tr.MaxConnsPerHost) expected := int32(tr.MaxConnsPerHost)
if dialCnt != expected { if dialCnt != expected {
t.Errorf("Too many dials (%s): %d", scheme, dialCnt) t.Errorf("round 1: too many dials (%s): %d != %d", scheme, dialCnt, expected)
} }
if gotConnCnt != expected { if gotConnCnt != expected {
t.Errorf("Too many get connections (%s): %d", scheme, gotConnCnt) t.Errorf("round 1: too many get connections (%s): %d != %d", scheme, gotConnCnt, expected)
} }
if ts.TLS != nil && tlsHandshakeCnt != expected { if ts.TLS != nil && tlsHandshakeCnt != expected {
t.Errorf("Too many tls handshakes (%s): %d", scheme, tlsHandshakeCnt) t.Errorf("round 1: too many tls handshakes (%s): %d != %d", scheme, tlsHandshakeCnt, expected)
}
if t.Failed() {
t.FailNow()
} }
(<-connCh).Close() (<-connCh).Close()
...@@ -670,13 +674,13 @@ func TestTransportMaxConnsPerHost(t *testing.T) { ...@@ -670,13 +674,13 @@ func TestTransportMaxConnsPerHost(t *testing.T) {
doReq() doReq()
expected++ expected++
if dialCnt != expected { if dialCnt != expected {
t.Errorf("Too many dials (%s): %d", scheme, dialCnt) t.Errorf("round 2: too many dials (%s): %d", scheme, dialCnt)
} }
if gotConnCnt != expected { if gotConnCnt != expected {
t.Errorf("Too many get connections (%s): %d", scheme, gotConnCnt) t.Errorf("round 2: too many get connections (%s): %d != %d", scheme, gotConnCnt, expected)
} }
if ts.TLS != nil && tlsHandshakeCnt != expected { if ts.TLS != nil && tlsHandshakeCnt != expected {
t.Errorf("Too many tls handshakes (%s): %d", scheme, tlsHandshakeCnt) t.Errorf("round 2: too many tls handshakes (%s): %d != %d", scheme, tlsHandshakeCnt, expected)
} }
} }
...@@ -2795,8 +2799,8 @@ func TestIdleConnChannelLeak(t *testing.T) { ...@@ -2795,8 +2799,8 @@ func TestIdleConnChannelLeak(t *testing.T) {
<-didRead <-didRead
} }
if got := tr.IdleConnChMapSizeForTesting(); got != 0 { if got := tr.IdleConnWaitMapSizeForTesting(); got != 0 {
t.Fatalf("ForDisableKeepAlives = %v, map size = %d; want 0", disableKeep, got) t.Fatalf("for DisableKeepAlives = %v, map size = %d; want 0", disableKeep, got)
} }
} }
} }
...@@ -3378,9 +3382,9 @@ func TestTransportCloseIdleConnsThenReturn(t *testing.T) { ...@@ -3378,9 +3382,9 @@ func TestTransportCloseIdleConnsThenReturn(t *testing.T) {
} }
wantIdle("after second put", 0) wantIdle("after second put", 0)
tr.RequestIdleConnChForTesting() // should toggle the transport out of idle mode tr.QueueForIdleConnForTesting() // should toggle the transport out of idle mode
if tr.IsIdleForTesting() { if tr.IsIdleForTesting() {
t.Error("shouldn't be idle after RequestIdleConnChForTesting") t.Error("shouldn't be idle after QueueForIdleConnForTesting")
} }
if !tr.PutIdleTestConn("http", "example.com") { if !tr.PutIdleTestConn("http", "example.com") {
t.Fatal("after re-activation") t.Fatal("after re-activation")
...@@ -3802,8 +3806,8 @@ func TestNoCrashReturningTransportAltConn(t *testing.T) { ...@@ -3802,8 +3806,8 @@ func TestNoCrashReturningTransportAltConn(t *testing.T) {
ln := newLocalListener(t) ln := newLocalListener(t)
defer ln.Close() defer ln.Close()
handledPendingDial := make(chan bool, 1) var wg sync.WaitGroup
SetPendingDialHooks(nil, func() { handledPendingDial <- true }) SetPendingDialHooks(func() { wg.Add(1) }, wg.Done)
defer SetPendingDialHooks(nil, nil) defer SetPendingDialHooks(nil, nil)
testDone := make(chan struct{}) testDone := make(chan struct{})
...@@ -3873,7 +3877,7 @@ func TestNoCrashReturningTransportAltConn(t *testing.T) { ...@@ -3873,7 +3877,7 @@ func TestNoCrashReturningTransportAltConn(t *testing.T) {
doReturned <- true doReturned <- true
<-madeRoundTripper <-madeRoundTripper
<-handledPendingDial wg.Wait()
} }
func TestTransportReuseConnection_Gzip_Chunked(t *testing.T) { func TestTransportReuseConnection_Gzip_Chunked(t *testing.T) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment