Commit 2e302182 authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: remove idle transport connections from Transport when server closes

Previously the Transport would cache idle connections from the
Transport for later reuse, but if a peer server disconnected
(e.g. idle timeout), we would not proactively remove the *persistConn
from the Transport's idle list, leading to a waste of memory
(potentially forever).

Instead, when the persistConn's readLoop terminates, remote it from
the idle list, if present.

This also adds the beginning of accounting for the total number of
idle connections, which will be needed for Transport.MaxIdleConns
later.

Updates #15461

Change-Id: Iab091f180f8dd1ee0d78f34b9705d68743b5557b
Reviewed-on: https://go-review.googlesource.com/22492Reviewed-by: default avatarAndrew Gerrand <adg@golang.org>
parent 87bca88c
...@@ -120,3 +120,17 @@ func afterTest(t testing.TB) { ...@@ -120,3 +120,17 @@ func afterTest(t testing.TB) {
} }
t.Errorf("Test appears to have leaked %s:\n%s", bad, stacks) t.Errorf("Test appears to have leaked %s:\n%s", bad, stacks)
} }
// waitCondition reports whether fn eventually returned true,
// checking immediately and then every checkEvery amount,
// until waitFor has elpased, at which point it returns false.
func waitCondition(waitFor, checkEvery time.Duration, fn func() bool) bool {
deadline := time.Now().Add(waitFor)
for time.Now().Before(deadline) {
if fn() {
return true
}
time.Sleep(checkEvery)
}
return false
}
...@@ -65,6 +65,7 @@ const DefaultMaxIdleConnsPerHost = 2 ...@@ -65,6 +65,7 @@ const DefaultMaxIdleConnsPerHost = 2
type Transport struct { type Transport struct {
idleMu sync.Mutex idleMu sync.Mutex
wantIdle bool // user has requested to close all idle conns wantIdle bool // user has requested to close all idle conns
idleCount int
idleConn map[connectMethodKey][]*persistConn idleConn map[connectMethodKey][]*persistConn
idleConnCh map[connectMethodKey]chan *persistConn idleConnCh map[connectMethodKey]chan *persistConn
...@@ -166,7 +167,7 @@ type Transport struct { ...@@ -166,7 +167,7 @@ type Transport struct {
nextProtoOnce sync.Once nextProtoOnce sync.Once
h2transport *http2Transport // non-nil if http2 wired up h2transport *http2Transport // non-nil if http2 wired up
// TODO: tunable on global max cached connections // TODO: MaxIdleConns tunable for global max cached connections (Issue 15461)
// TODO: tunable on timeout on cached connections // TODO: tunable on timeout on cached connections
// TODO: tunable on max per-host TCP dials in flight (Issue 13957) // TODO: tunable on max per-host TCP dials in flight (Issue 13957)
} }
...@@ -613,6 +614,7 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error { ...@@ -613,6 +614,7 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
} }
} }
t.idleConn[key] = append(t.idleConn[key], pconn) t.idleConn[key] = append(t.idleConn[key], pconn)
t.idleCount++
return nil return nil
} }
...@@ -638,13 +640,14 @@ func (t *Transport) getIdleConnCh(cm connectMethod) chan *persistConn { ...@@ -638,13 +640,14 @@ func (t *Transport) getIdleConnCh(cm connectMethod) chan *persistConn {
return ch return ch
} }
func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn) { func (t *Transport) getIdleConn(cm connectMethod) *persistConn {
key := cm.key() key := cm.key()
t.idleMu.Lock() t.idleMu.Lock()
defer t.idleMu.Unlock() defer t.idleMu.Unlock()
if t.idleConn == nil { if t.idleConn == nil {
return nil return nil
} }
var pconn *persistConn
for { for {
pconns, ok := t.idleConn[key] pconns, ok := t.idleConn[key]
if !ok { if !ok {
...@@ -659,8 +662,44 @@ func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn) { ...@@ -659,8 +662,44 @@ func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn) {
pconn = pconns[len(pconns)-1] pconn = pconns[len(pconns)-1]
t.idleConn[key] = pconns[:len(pconns)-1] t.idleConn[key] = pconns[:len(pconns)-1]
} }
if !pconn.isBroken() { t.idleCount--
return if pconn.isBroken() {
// There is a tiny window where this is
// possible, between the connecting dying and
// the persistConn readLoop calling
// Transport.removeIdleConn. Just skip it and
// carry on.
continue
}
return pconn
}
}
// removeIdleConn marks pconn as dead.
func (t *Transport) removeIdleConn(pconn *persistConn) {
key := pconn.cacheKey
t.idleMu.Lock()
defer t.idleMu.Unlock()
pconns, _ := t.idleConn[key]
switch len(pconns) {
case 0:
// Nothing
case 1:
if pconns[0] == pconn {
t.idleCount--
delete(t.idleConn, key)
}
default:
// TODO(bradfitz): map into LRU element?
for i, v := range pconns {
if v != pconn {
continue
}
pconns[i] = pconns[len(pconns)-1]
t.idleConn[key] = pconns[:len(pconns)-1]
t.idleCount--
break
} }
} }
} }
...@@ -1120,7 +1159,10 @@ func (pc *persistConn) cancelRequest() { ...@@ -1120,7 +1159,10 @@ func (pc *persistConn) cancelRequest() {
func (pc *persistConn) readLoop() { func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting // default value, if not changed below closeErr := errReadLoopExiting // default value, if not changed below
defer func() { pc.close(closeErr) }() defer func() {
pc.close(closeErr)
pc.t.removeIdleConn(pc)
}()
tryPutIdleConn := func() bool { tryPutIdleConn := func() bool {
if err := pc.t.tryPutIdleConn(pc); err != nil { if err := pc.t.tryPutIdleConn(pc); err != nil {
......
...@@ -438,6 +438,54 @@ func TestTransportMaxPerHostIdleConns(t *testing.T) { ...@@ -438,6 +438,54 @@ func TestTransportMaxPerHostIdleConns(t *testing.T) {
} }
} }
func TestTransportRemovesDeadIdleConnections(t *testing.T) {
defer afterTest(t)
ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
io.WriteString(w, r.RemoteAddr)
}))
defer ts.Close()
tr := &Transport{}
defer tr.CloseIdleConnections()
c := &Client{Transport: tr}
doReq := func(name string) string {
// Do a POST instead of a GET to prevent the Transport's
// idempotent request retry logic from kicking in...
res, err := c.Post(ts.URL, "", nil)
if err != nil {
t.Fatalf("%s: %v", name, err)
}
if res.StatusCode != 200 {
t.Fatalf("%s: %v", name, res.Status)
}
defer res.Body.Close()
slurp, err := ioutil.ReadAll(res.Body)
if err != nil {
t.Fatalf("%s: %v", name, err)
}
return string(slurp)
}
first := doReq("first")
keys1 := tr.IdleConnKeysForTesting()
ts.CloseClientConnections()
var keys2 []string
if !waitCondition(3*time.Second, 50*time.Millisecond, func() bool {
keys2 = tr.IdleConnKeysForTesting()
return len(keys2) == 0
}) {
t.Fatalf("Transport didn't notice idle connection's death.\nbefore: %q\n after: %q\n", keys1, keys2)
}
second := doReq("second")
if first == second {
t.Errorf("expected a different connection between requests. got %q both times", first)
}
}
func TestTransportServerClosingUnexpectedly(t *testing.T) { func TestTransportServerClosingUnexpectedly(t *testing.T) {
setParallel(t) setParallel(t)
defer afterTest(t) defer afterTest(t)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment