Commit 81b2ea4d authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: add Transport.MaxIdleConns limit

The HTTP client had a limit for the maximum number of idle connections
per-host, but not a global limit.

This CLs adds a global idle connection limit too,
Transport.MaxIdleConns.

All idle conns are now also stored in a doubly-linked list. When there
are too many, the oldest one is closed.

Fixes #15461

Change-Id: I72abbc28d140c73cf50f278fa70088b45ae0deef
Reviewed-on: https://go-review.googlesource.com/22655Reviewed-by: default avatarAndrew Gerrand <adg@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
parent 38cfaa5f
...@@ -360,7 +360,7 @@ var pkgDeps = map[string][]string{ ...@@ -360,7 +360,7 @@ var pkgDeps = map[string][]string{
// HTTP, kingpin of dependencies. // HTTP, kingpin of dependencies.
"net/http": { "net/http": {
"L4", "NET", "OS", "L4", "NET", "OS",
"context", "compress/gzip", "crypto/tls", "context", "compress/gzip", "container/list", "crypto/tls",
"mime/multipart", "runtime/debug", "mime/multipart", "runtime/debug",
"net/http/internal", "net/http/internal",
"golang.org/x/net/http2/hpack", "golang.org/x/net/http2/hpack",
......
...@@ -9,6 +9,7 @@ package http ...@@ -9,6 +9,7 @@ package http
import ( import (
"net" "net"
"sort"
"sync" "sync"
"time" "time"
) )
...@@ -86,6 +87,7 @@ func (t *Transport) IdleConnKeysForTesting() (keys []string) { ...@@ -86,6 +87,7 @@ func (t *Transport) IdleConnKeysForTesting() (keys []string) {
for key := range t.idleConn { for key := range t.idleConn {
keys = append(keys, key.String()) keys = append(keys, key.String())
} }
sort.Strings(keys)
return return
} }
......
...@@ -12,6 +12,7 @@ package http ...@@ -12,6 +12,7 @@ package http
import ( import (
"bufio" "bufio"
"compress/gzip" "compress/gzip"
"container/list"
"context" "context"
"crypto/tls" "crypto/tls"
"errors" "errors"
...@@ -38,6 +39,7 @@ var DefaultTransport RoundTripper = &Transport{ ...@@ -38,6 +39,7 @@ var DefaultTransport RoundTripper = &Transport{
Timeout: 30 * time.Second, Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second, KeepAlive: 30 * time.Second,
}, },
MaxIdleConns: 100,
TLSHandshakeTimeout: 10 * time.Second, TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second, ExpectContinueTimeout: 1 * time.Second,
} }
...@@ -69,6 +71,7 @@ type Transport struct { ...@@ -69,6 +71,7 @@ type Transport struct {
idleCount int idleCount int
idleConn map[connectMethodKey][]*persistConn idleConn map[connectMethodKey][]*persistConn
idleConnCh map[connectMethodKey]chan *persistConn idleConnCh map[connectMethodKey]chan *persistConn
idleLRU connLRU
reqMu sync.Mutex reqMu sync.Mutex
reqCanceler map[*Request]func() reqCanceler map[*Request]func()
...@@ -127,6 +130,10 @@ type Transport struct { ...@@ -127,6 +130,10 @@ type Transport struct {
// uncompressed. // uncompressed.
DisableCompression bool DisableCompression bool
// MaxIdleConns controls the maximum number of idle (keep-alive)
// connections across all hosts. Zero means no limit.
MaxIdleConns int
// MaxIdleConnsPerHost, if non-zero, controls the maximum idle // MaxIdleConnsPerHost, if non-zero, controls the maximum idle
// (keep-alive) connections to keep per-host. If zero, // (keep-alive) connections to keep per-host. If zero,
// DefaultMaxIdleConnsPerHost is used. // DefaultMaxIdleConnsPerHost is used.
...@@ -455,6 +462,8 @@ func (t *Transport) CloseIdleConnections() { ...@@ -455,6 +462,8 @@ func (t *Transport) CloseIdleConnections() {
t.idleConn = nil t.idleConn = nil
t.idleConnCh = nil t.idleConnCh = nil
t.wantIdle = true t.wantIdle = true
t.idleCount = 0
t.idleLRU = connLRU{}
t.idleMu.Unlock() t.idleMu.Unlock()
for _, conns := range m { for _, conns := range m {
for _, pconn := range conns { for _, pconn := range conns {
...@@ -555,6 +564,7 @@ var ( ...@@ -555,6 +564,7 @@ var (
errConnBroken = errors.New("http: putIdleConn: connection is in bad state") errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
errWantIdle = errors.New("http: putIdleConn: CloseIdleConnections was called") errWantIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
errTooManyIdle = errors.New("http: putIdleConn: too many idle connections") errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
errCloseIdleConns = errors.New("http: CloseIdleConnections called") errCloseIdleConns = errors.New("http: CloseIdleConnections called")
errReadLoopExiting = errors.New("http: persistConn.readLoop exiting") errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
errServerClosedIdle = errors.New("http: server closed idle conn") errServerClosedIdle = errors.New("http: server closed idle conn")
...@@ -566,6 +576,13 @@ func (t *Transport) putOrCloseIdleConn(pconn *persistConn) { ...@@ -566,6 +576,13 @@ func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
} }
} }
func (t *Transport) maxIdleConnsPerHost() int {
if v := t.MaxIdleConnsPerHost; v != 0 {
return v
}
return DefaultMaxIdleConnsPerHost
}
// tryPutIdleConn adds pconn to the list of idle persistent connections awaiting // tryPutIdleConn adds pconn to the list of idle persistent connections awaiting
// a new request. // a new request.
// If pconn is no longer needed or not in a good state, tryPutIdleConn returns // If pconn is no longer needed or not in a good state, tryPutIdleConn returns
...@@ -578,12 +595,8 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error { ...@@ -578,12 +595,8 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
if pconn.isBroken() { if pconn.isBroken() {
return errConnBroken return errConnBroken
} }
key := pconn.cacheKey
max := t.MaxIdleConnsPerHost
if max == 0 {
max = DefaultMaxIdleConnsPerHost
}
pconn.markReused() pconn.markReused()
key := pconn.cacheKey
t.idleMu.Lock() t.idleMu.Lock()
defer t.idleMu.Unlock() defer t.idleMu.Unlock()
...@@ -611,17 +624,22 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error { ...@@ -611,17 +624,22 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
t.idleConn = make(map[connectMethodKey][]*persistConn) t.idleConn = make(map[connectMethodKey][]*persistConn)
} }
idles := t.idleConn[key] idles := t.idleConn[key]
if len(idles) >= max { if len(idles) >= t.maxIdleConnsPerHost() {
return errTooManyIdle return errTooManyIdleHost
} }
for _, exist := range idles { for _, exist := range idles {
if exist == pconn { if exist == pconn {
log.Fatalf("dup idle pconn %p in freelist", pconn) log.Fatalf("dup idle pconn %p in freelist", pconn)
} }
} }
t.idleConn[key] = append(idles, pconn) t.idleConn[key] = append(idles, pconn)
t.idleCount++ t.idleCount++
t.idleLRU.add(pconn)
if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
oldest := t.idleLRU.removeOldest()
oldest.close(errTooManyIdle)
t.removeIdleConnLocked(oldest)
}
pconn.idleAt = time.Now() pconn.idleAt = time.Now()
return nil return nil
} }
...@@ -661,12 +679,13 @@ func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn, idleSince ...@@ -661,12 +679,13 @@ func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn, idleSince
pconn = pconns[0] pconn = pconns[0]
delete(t.idleConn, key) delete(t.idleConn, key)
} else { } else {
// 2 or more cached connections; pop last // 2 or more cached connections; use the most
// TODO: queue? // recently used one.
pconn = pconns[len(pconns)-1] pconn = pconns[len(pconns)-1]
t.idleConn[key] = pconns[:len(pconns)-1] t.idleConn[key] = pconns[:len(pconns)-1]
} }
t.idleCount-- t.idleCount--
t.idleLRU.remove(pconn)
if pconn.isBroken() { if pconn.isBroken() {
// There is a tiny window where this is // There is a tiny window where this is
// possible, between the connecting dying and // possible, between the connecting dying and
...@@ -681,10 +700,15 @@ func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn, idleSince ...@@ -681,10 +700,15 @@ func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn, idleSince
// removeIdleConn marks pconn as dead. // removeIdleConn marks pconn as dead.
func (t *Transport) removeIdleConn(pconn *persistConn) { func (t *Transport) removeIdleConn(pconn *persistConn) {
key := pconn.cacheKey
t.idleMu.Lock() t.idleMu.Lock()
defer t.idleMu.Unlock() defer t.idleMu.Unlock()
t.removeIdleConnLocked(pconn)
}
// t.idleMu must be held.
func (t *Transport) removeIdleConnLocked(pconn *persistConn) {
t.idleLRU.remove(pconn)
key := pconn.cacheKey
pconns, _ := t.idleConn[key] pconns, _ := t.idleConn[key]
switch len(pconns) { switch len(pconns) {
case 0: case 0:
...@@ -695,7 +719,6 @@ func (t *Transport) removeIdleConn(pconn *persistConn) { ...@@ -695,7 +719,6 @@ func (t *Transport) removeIdleConn(pconn *persistConn) {
delete(t.idleConn, key) delete(t.idleConn, key)
} }
default: default:
// TODO(bradfitz): map into LRU element?
for i, v := range pconns { for i, v := range pconns {
if v != pconn { if v != pconn {
continue continue
...@@ -1944,3 +1967,42 @@ func cloneTLSClientConfig(cfg *tls.Config) *tls.Config { ...@@ -1944,3 +1967,42 @@ func cloneTLSClientConfig(cfg *tls.Config) *tls.Config {
Renegotiation: cfg.Renegotiation, Renegotiation: cfg.Renegotiation,
} }
} }
type connLRU struct {
ll *list.List // list.Element.Value type is of *persistConn
m map[*persistConn]*list.Element
}
// addO adds pc to the head of the linked list.
func (cl *connLRU) add(pc *persistConn) {
if cl.ll == nil {
cl.ll = list.New()
cl.m = make(map[*persistConn]*list.Element)
}
ele := cl.ll.PushFront(pc)
if _, ok := cl.m[pc]; ok {
panic("persistConn was already in LRU")
}
cl.m[pc] = ele
}
func (cl *connLRU) removeOldest() *persistConn {
ele := cl.ll.Back()
pc := ele.Value.(*persistConn)
cl.ll.Remove(ele)
delete(cl.m, pc)
return pc
}
// remove removes pc from cl.
func (cl *connLRU) remove(pc *persistConn) {
if ele, ok := cl.m[pc]; ok {
cl.ll.Remove(ele)
delete(cl.m, pc)
}
}
// len returns the number of items in the cache.
func (cl *connLRU) len() int {
return len(cl.m)
}
...@@ -383,8 +383,8 @@ func TestTransportMaxPerHostIdleConns(t *testing.T) { ...@@ -383,8 +383,8 @@ func TestTransportMaxPerHostIdleConns(t *testing.T) {
} }
})) }))
defer ts.Close() defer ts.Close()
maxIdleConns := 2 maxIdleConnsPerHost := 2
tr := &Transport{DisableKeepAlives: false, MaxIdleConnsPerHost: maxIdleConns} tr := &Transport{DisableKeepAlives: false, MaxIdleConnsPerHost: maxIdleConnsPerHost}
c := &Client{Transport: tr} c := &Client{Transport: tr}
// Start 3 outstanding requests and wait for the server to get them. // Start 3 outstanding requests and wait for the server to get them.
...@@ -429,14 +429,14 @@ func TestTransportMaxPerHostIdleConns(t *testing.T) { ...@@ -429,14 +429,14 @@ func TestTransportMaxPerHostIdleConns(t *testing.T) {
resch <- "res2" resch <- "res2"
<-donech <-donech
if e, g := 2, tr.IdleConnCountForTesting(cacheKey); e != g { if g, w := tr.IdleConnCountForTesting(cacheKey), 2; g != w {
t.Errorf("after second response, expected %d idle conns; got %d", e, g) t.Errorf("after second response, idle conns = %d; want %d", g, w)
} }
resch <- "res3" resch <- "res3"
<-donech <-donech
if e, g := maxIdleConns, tr.IdleConnCountForTesting(cacheKey); e != g { if g, w := tr.IdleConnCountForTesting(cacheKey), maxIdleConnsPerHost; g != w {
t.Errorf("after third response, still expected %d idle conns; got %d", e, g) t.Errorf("after third response, idle conns = %d; want %d", g, w)
} }
} }
...@@ -3229,10 +3229,6 @@ func TestTransportEventTrace(t *testing.T) { ...@@ -3229,10 +3229,6 @@ func TestTransportEventTrace(t *testing.T) {
t.Errorf("unexpected DNS host lookup for %q", host) t.Errorf("unexpected DNS host lookup for %q", host)
return nil, nil return nil, nil
} }
if err != nil {
t.Error(err)
return nil, err
}
return []net.IPAddr{net.IPAddr{IP: net.ParseIP(ip)}}, nil return []net.IPAddr{net.IPAddr{IP: net.ParseIP(ip)}}, nil
}) })
...@@ -3291,6 +3287,61 @@ func TestTransportEventTrace(t *testing.T) { ...@@ -3291,6 +3287,61 @@ func TestTransportEventTrace(t *testing.T) {
} }
} }
func TestTransportMaxIdleConns(t *testing.T) {
defer afterTest(t)
ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
// No body for convenience.
}))
defer ts.Close()
tr := &Transport{
MaxIdleConns: 4,
}
defer tr.CloseIdleConnections()
ip, port, err := net.SplitHostPort(ts.Listener.Addr().String())
if err != nil {
t.Fatal(err)
}
c := &Client{Transport: tr}
ctx := context.WithValue(context.Background(), nettrace.LookupIPAltResolverKey{}, func(ctx context.Context, host string) ([]net.IPAddr, error) {
return []net.IPAddr{net.IPAddr{IP: net.ParseIP(ip)}}, nil
})
hitHost := func(n int) {
req, _ := NewRequest("GET", fmt.Sprintf("http://host-%d.dns-is-faked.golang:"+port, n), nil)
req = req.WithContext(ctx)
res, err := c.Do(req)
if err != nil {
t.Fatal(err)
}
res.Body.Close()
}
for i := 0; i < 4; i++ {
hitHost(i)
}
want := []string{
"|http|host-0.dns-is-faked.golang:" + port,
"|http|host-1.dns-is-faked.golang:" + port,
"|http|host-2.dns-is-faked.golang:" + port,
"|http|host-3.dns-is-faked.golang:" + port,
}
if got := tr.IdleConnKeysForTesting(); !reflect.DeepEqual(got, want) {
t.Fatalf("idle conn keys mismatch.\n got: %q\nwant: %q\n", got, want)
}
// Now hitting the 5th host should kick out the first host:
hitHost(4)
want = []string{
"|http|host-1.dns-is-faked.golang:" + port,
"|http|host-2.dns-is-faked.golang:" + port,
"|http|host-3.dns-is-faked.golang:" + port,
"|http|host-4.dns-is-faked.golang:" + port,
}
if got := tr.IdleConnKeysForTesting(); !reflect.DeepEqual(got, want) {
t.Fatalf("idle conn keys mismatch after 5th host.\n got: %q\nwant: %q\n", got, want)
}
}
var errFakeRoundTrip = errors.New("fake roundtrip") var errFakeRoundTrip = errors.New("fake roundtrip")
type funcRoundTripper func() type funcRoundTripper func()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment