Commit 66b47431 authored by Brad Fitzpatrick's avatar Brad Fitzpatrick Committed by Andrew Gerrand

net/http: update bundled http2

Updates x/net/http2 to git rev 6a513af for:

  http2: return flow control for closed streams
  https://golang.org/cl/25231

  http2: make Transport prefer HTTP response header recv before body write error
  https://golang.org/cl/24984

  http2: make Transport treat "Connection: close" the same as Request.Close
  https://golang.org/cl/24982

Fixes golang/go#16481

Change-Id: Iaddb166387ca2df1cfbbf09a166f8605578bec49
Reviewed-on: https://go-review.googlesource.com/25282
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarAndrew Gerrand <adg@golang.org>
parent b11fff38
......@@ -85,7 +85,16 @@ const (
http2noDialOnMiss = false
)
func (p *http2clientConnPool) getClientConn(_ *Request, addr string, dialOnMiss bool) (*http2ClientConn, error) {
func (p *http2clientConnPool) getClientConn(req *Request, addr string, dialOnMiss bool) (*http2ClientConn, error) {
if http2isConnectionCloseRequest(req) && dialOnMiss {
// It gets its own connection.
const singleUse = true
cc, err := p.t.dialClientConn(addr, singleUse)
if err != nil {
return nil, err
}
return cc, nil
}
p.mu.Lock()
for _, cc := range p.conns[addr] {
if cc.CanTakeNewRequest() {
......@@ -128,7 +137,8 @@ func (p *http2clientConnPool) getStartDialLocked(addr string) *http2dialCall {
// run in its own goroutine.
func (c *http2dialCall) dial(addr string) {
c.res, c.err = c.p.t.dialClientConn(addr)
const singleUse = false // shared conn
c.res, c.err = c.p.t.dialClientConn(addr, singleUse)
close(c.done)
c.p.mu.Lock()
......@@ -3803,6 +3813,9 @@ func (sc *http2serverConn) closeStream(st *http2stream, err error) {
}
delete(sc.streams, st.id)
if p := st.body; p != nil {
sc.sendWindowUpdate(nil, p.Len())
p.CloseWithError(err)
}
st.cw.Close()
......@@ -3879,17 +3892,24 @@ func (sc *http2serverConn) processSettingInitialWindowSize(val uint32) error {
func (sc *http2serverConn) processData(f *http2DataFrame) error {
sc.serveG.check()
data := f.Data()
id := f.Header().StreamID
st, ok := sc.streams[id]
if !ok || st.state != http2stateOpen || st.gotTrailerHeader {
if int(sc.inflow.available()) < len(data) {
return http2StreamError{id, http2ErrCodeFlowControl}
}
sc.inflow.take(int32(len(data)))
sc.sendWindowUpdate(nil, len(data))
return http2StreamError{id, http2ErrCodeStreamClosed}
}
if st.body == nil {
panic("internal error: should have a body in this state")
}
data := f.Data()
if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
......@@ -4919,9 +4939,10 @@ func (t *http2Transport) initConnPool() {
// ClientConn is the state of a single HTTP/2 client connection to an
// HTTP/2 server.
type http2ClientConn struct {
t *http2Transport
tconn net.Conn // usually *tls.Conn, except specialized impls
tlsState *tls.ConnectionState // nil only for specialized impls
t *http2Transport
tconn net.Conn // usually *tls.Conn, except specialized impls
tlsState *tls.ConnectionState // nil only for specialized impls
singleUse bool // whether being used for a single http.Request
// readLoop goroutine fields:
readerDone chan struct{} // closed on error
......@@ -5117,7 +5138,7 @@ func http2shouldRetryRequest(req *Request, err error) bool {
return err == http2errClientConnUnusable
}
func (t *http2Transport) dialClientConn(addr string) (*http2ClientConn, error) {
func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) {
host, _, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
......@@ -5126,7 +5147,7 @@ func (t *http2Transport) dialClientConn(addr string) (*http2ClientConn, error) {
if err != nil {
return nil, err
}
return t.NewClientConn(tconn)
return t.newClientConn(tconn, singleUse)
}
func (t *http2Transport) newTLSConfig(host string) *tls.Config {
......@@ -5187,6 +5208,10 @@ func (t *http2Transport) expectContinueTimeout() time.Duration {
}
func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) {
return t.newClientConn(c, false)
}
func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2ClientConn, error) {
if http2VerboseLogs {
t.vlogf("http2: Transport creating client conn to %v", c.RemoteAddr())
}
......@@ -5204,6 +5229,7 @@ func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) {
initialWindowSize: 65535,
maxConcurrentStreams: 1000,
streams: make(map[uint32]*http2clientStream),
singleUse: singleUse,
}
cc.cond = sync.NewCond(&cc.mu)
cc.flow.add(int32(http2initialWindowSize))
......@@ -5288,6 +5314,9 @@ func (cc *http2ClientConn) CanTakeNewRequest() bool {
}
func (cc *http2ClientConn) canTakeNewRequestLocked() bool {
if cc.singleUse && cc.nextStreamID > 1 {
return false
}
return cc.goAway == nil && !cc.closed &&
int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
cc.nextStreamID < 2147483647
......@@ -5494,22 +5523,26 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
bodyWritten := false
ctx := http2reqContext(req)
handleReadLoopResponse := func(re http2resAndError) (*Response, error) {
res := re.res
if re.err != nil || res.StatusCode > 299 {
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
}
if re.err != nil {
cc.forgetStreamID(cs.ID)
return nil, re.err
}
res.Request = req
res.TLS = cc.tlsState
return res, nil
}
for {
select {
case re := <-readLoopResCh:
res := re.res
if re.err != nil || res.StatusCode > 299 {
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
}
if re.err != nil {
cc.forgetStreamID(cs.ID)
return nil, re.err
}
res.Request = req
res.TLS = cc.tlsState
return res, nil
return handleReadLoopResponse(re)
case <-respHeaderTimer:
cc.forgetStreamID(cs.ID)
if !hasBody || bodyWritten {
......@@ -5541,6 +5574,12 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
return nil, cs.resetErr
case err := <-bodyWriter.resc:
select {
case re := <-readLoopResCh:
return handleReadLoopResponse(re)
default:
}
if err != nil {
return nil, err
}
......@@ -5932,7 +5971,7 @@ func (rl *http2clientConnReadLoop) cleanup() {
func (rl *http2clientConnReadLoop) run() error {
cc := rl.cc
rl.closeWhenIdle = cc.t.disableKeepAlives()
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
gotReply := false
for {
f, err := cc.fr.ReadFrame()
......@@ -6216,10 +6255,27 @@ var http2errClosedResponseBody = errors.New("http2: response body closed")
func (b http2transportResponseBody) Close() error {
cs := b.cs
if cs.bufPipe.Err() != io.EOF {
cc := cs.cc
cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
serverSentStreamEnd := cs.bufPipe.Err() == io.EOF
unread := cs.bufPipe.Len()
if unread > 0 || !serverSentStreamEnd {
cc.mu.Lock()
cc.wmu.Lock()
if !serverSentStreamEnd {
cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel)
}
if unread > 0 {
cc.inflow.add(int32(unread))
cc.fr.WriteWindowUpdate(0, uint32(unread))
}
cc.bw.Flush()
cc.wmu.Unlock()
cc.mu.Unlock()
}
cs.bufPipe.BreakWithError(http2errClosedResponseBody)
return nil
}
......@@ -6227,6 +6283,7 @@ func (b http2transportResponseBody) Close() error {
func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
cc := rl.cc
cs := cc.streamByID(f.StreamID, f.StreamEnded())
data := f.Data()
if cs == nil {
cc.mu.Lock()
neverSent := cc.nextStreamID
......@@ -6237,9 +6294,15 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
return http2ConnectionError(http2ErrCodeProtocol)
}
if len(data) > 0 {
cc.wmu.Lock()
cc.fr.WriteWindowUpdate(0, uint32(len(data)))
cc.bw.Flush()
cc.wmu.Unlock()
}
return nil
}
if data := f.Data(); len(data) > 0 {
if len(data) > 0 {
if cs.bufPipe.b == nil {
cc.logf("http2: Transport received DATA frame for closed stream; closing connection")
......@@ -6282,7 +6345,7 @@ func (rl *http2clientConnReadLoop) endStreamError(cs *http2clientStream, err err
}
cs.bufPipe.closeWithErrorAndCode(err, code)
delete(rl.activeRes, cs.ID)
if cs.req.Close || cs.req.Header.Get("Connection") == "close" {
if http2isConnectionCloseRequest(cs.req) {
rl.closeWhenIdle = true
}
}
......@@ -6538,6 +6601,12 @@ func (s http2bodyWriterState) scheduleBodyWrite() {
}
}
// isConnectionCloseRequest reports whether req should use its own
// connection for a single request and then close the connection.
func http2isConnectionCloseRequest(req *Request) bool {
return req.Close || httplex.HeaderValuesContainsToken(req.Header["Connection"], "close")
}
// writeFramer is implemented by any type that is used to write frames.
type http2writeFramer interface {
writeFrame(http2writeContext) error
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment