Commit f251708a authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: update bundled http2

Updates bundled x/net/http2 to git rev 8dab9293 for:

    http2: make Transport retry on server's GOAWAY graceful shutdown
    https://golang.org/cl/33971

Fixes #18083

Change-Id: I676f5eb4b490a4d86356778bb17296c451f16d90
Reviewed-on: https://go-review.googlesource.com/34011
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarTom Bergan <tombergan@google.com>
parent 348a7c53
...@@ -2188,6 +2188,14 @@ func http2shouldLogPanic(panicValue interface{}) bool { ...@@ -2188,6 +2188,14 @@ func http2shouldLogPanic(panicValue interface{}) bool {
return panicValue != nil && panicValue != ErrAbortHandler return panicValue != nil && panicValue != ErrAbortHandler
} }
func http2reqGetBody(req *Request) func() (io.ReadCloser, error) {
return req.GetBody
}
func http2reqBodyIsNoBody(body io.ReadCloser) bool {
return body == NoBody
}
var http2DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1" var http2DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1"
type http2goroutineLock uint64 type http2goroutineLock uint64
...@@ -3247,6 +3255,11 @@ func (sc *http2serverConn) maxHeaderListSize() uint32 { ...@@ -3247,6 +3255,11 @@ func (sc *http2serverConn) maxHeaderListSize() uint32 {
return uint32(n + typicalHeaders*perFieldOverhead) return uint32(n + typicalHeaders*perFieldOverhead)
} }
func (sc *http2serverConn) curOpenStreams() uint32 {
sc.serveG.check()
return sc.curClientStreams + sc.curPushedStreams
}
// stream represents a stream. This is the minimal metadata needed by // stream represents a stream. This is the minimal metadata needed by
// the serve goroutine. Most of the actual stream state is owned by // the serve goroutine. Most of the actual stream state is owned by
// the http.Handler's goroutine in the responseWriter. Because the // the http.Handler's goroutine in the responseWriter. Because the
...@@ -3560,7 +3573,7 @@ func (sc *http2serverConn) serve() { ...@@ -3560,7 +3573,7 @@ func (sc *http2serverConn) serve() {
fn(loopNum) fn(loopNum)
} }
if sc.inGoAway && sc.curClientStreams == 0 && !sc.needToSendGoAway && !sc.writingFrame { if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame {
return return
} }
} }
...@@ -4373,7 +4386,7 @@ func (sc *http2serverConn) newStream(id, pusherID uint32, state http2streamState ...@@ -4373,7 +4386,7 @@ func (sc *http2serverConn) newStream(id, pusherID uint32, state http2streamState
} else { } else {
sc.curClientStreams++ sc.curClientStreams++
} }
if sc.curClientStreams+sc.curPushedStreams == 1 { if sc.curOpenStreams() == 1 {
sc.setConnState(StateActive) sc.setConnState(StateActive)
} }
...@@ -5114,7 +5127,7 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error { ...@@ -5114,7 +5127,7 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error {
} }
for k := range opts.Header { for k := range opts.Header {
if strings.HasPrefix(k, ":") { if strings.HasPrefix(k, ":") {
return fmt.Errorf("promised request headers cannot include psuedo header %q", k) return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
} }
switch strings.ToLower(k) { switch strings.ToLower(k) {
...@@ -5510,6 +5523,7 @@ type http2clientStream struct { ...@@ -5510,6 +5523,7 @@ type http2clientStream struct {
ID uint32 ID uint32
resc chan http2resAndError resc chan http2resAndError
bufPipe http2pipe // buffered pipe with the flow-controlled response payload bufPipe http2pipe // buffered pipe with the flow-controlled response payload
startedWrite bool // started request body write; guarded by cc.mu
requestedGzip bool requestedGzip bool
on100 func() // optional code to run if get a 100 continue response on100 func() // optional code to run if get a 100 continue response
...@@ -5651,9 +5665,11 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res ...@@ -5651,9 +5665,11 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res
} }
http2traceGotConn(req, cc) http2traceGotConn(req, cc)
res, err := cc.RoundTrip(req) res, err := cc.RoundTrip(req)
if http2shouldRetryRequest(req, err) { if err != nil {
if req, err = http2shouldRetryRequest(req, err); err == nil {
continue continue
} }
}
if err != nil { if err != nil {
t.vlogf("RoundTrip failure: %v", err) t.vlogf("RoundTrip failure: %v", err)
return nil, err return nil, err
...@@ -5674,11 +5690,39 @@ func (t *http2Transport) CloseIdleConnections() { ...@@ -5674,11 +5690,39 @@ func (t *http2Transport) CloseIdleConnections() {
var ( var (
http2errClientConnClosed = errors.New("http2: client conn is closed") http2errClientConnClosed = errors.New("http2: client conn is closed")
http2errClientConnUnusable = errors.New("http2: client conn not usable") http2errClientConnUnusable = errors.New("http2: client conn not usable")
http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
http2errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written")
) )
func http2shouldRetryRequest(req *Request, err error) bool { // shouldRetryRequest is called by RoundTrip when a request fails to get
// response headers. It is always called with a non-nil error.
// It returns either a request to retry (either the same request, or a
// modified clone), or an error if the request can't be replayed.
func http2shouldRetryRequest(req *Request, err error) (*Request, error) {
switch err {
default:
return nil, err
case http2errClientConnUnusable, http2errClientConnGotGoAway:
return req, nil
case http2errClientConnGotGoAwayAfterSomeReqBody:
if req.Body == nil || http2reqBodyIsNoBody(req.Body) {
return req, nil
}
return err == http2errClientConnUnusable getBody := http2reqGetBody(req)
if getBody == nil {
return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error")
}
body, err := getBody()
if err != nil {
return nil, err
}
newReq := *req
newReq.Body = body
return &newReq, nil
}
} }
func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) { func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) {
...@@ -5826,6 +5870,15 @@ func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) { ...@@ -5826,6 +5870,15 @@ func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) {
if old != nil && old.ErrCode != http2ErrCodeNo { if old != nil && old.ErrCode != http2ErrCodeNo {
cc.goAway.ErrCode = old.ErrCode cc.goAway.ErrCode = old.ErrCode
} }
last := f.LastStreamID
for streamID, cs := range cc.streams {
if streamID > last {
select {
case cs.resc <- http2resAndError{err: http2errClientConnGotGoAway}:
default:
}
}
}
} }
func (cc *http2ClientConn) CanTakeNewRequest() bool { func (cc *http2ClientConn) CanTakeNewRequest() bool {
...@@ -6059,6 +6112,13 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { ...@@ -6059,6 +6112,13 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
cs.abortRequestBodyWrite(http2errStopReqBodyWrite) cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
} }
if re.err != nil { if re.err != nil {
if re.err == http2errClientConnGotGoAway {
cc.mu.Lock()
if cs.startedWrite {
re.err = http2errClientConnGotGoAwayAfterSomeReqBody
}
cc.mu.Unlock()
}
cc.forgetStreamID(cs.ID) cc.forgetStreamID(cs.ID)
return nil, re.err return nil, re.err
} }
...@@ -7225,6 +7285,9 @@ func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reade ...@@ -7225,6 +7285,9 @@ func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reade
resc := make(chan error, 1) resc := make(chan error, 1)
s.resc = resc s.resc = resc
s.fn = func() { s.fn = func() {
cs.cc.mu.Lock()
cs.startedWrite = true
cs.cc.mu.Unlock()
resc <- cs.writeRequestBody(body, cs.req.Body) resc <- cs.writeRequestBody(body, cs.req.Body)
} }
s.delay = t.expectContinueTimeout() s.delay = t.expectContinueTimeout()
...@@ -7644,7 +7707,9 @@ type http2WriteScheduler interface { ...@@ -7644,7 +7707,9 @@ type http2WriteScheduler interface {
// https://tools.ietf.org/html/rfc7540#section-5.1 // https://tools.ietf.org/html/rfc7540#section-5.1
AdjustStream(streamID uint32, priority http2PriorityParam) AdjustStream(streamID uint32, priority http2PriorityParam)
// Push queues a frame in the scheduler. // Push queues a frame in the scheduler. In most cases, this will not be
// called with wr.StreamID()!=0 unless that stream is currently open. The one
// exception is RST_STREAM frames, which may be sent on idle or closed streams.
Push(wr http2FrameWriteRequest) Push(wr http2FrameWriteRequest)
// Pop dequeues the next frame to write. Returns false if no frames can // Pop dequeues the next frame to write. Returns false if no frames can
...@@ -8183,7 +8248,11 @@ func (ws *http2priorityWriteScheduler) Push(wr http2FrameWriteRequest) { ...@@ -8183,7 +8248,11 @@ func (ws *http2priorityWriteScheduler) Push(wr http2FrameWriteRequest) {
} else { } else {
n = ws.nodes[id] n = ws.nodes[id]
if n == nil { if n == nil {
panic("add on non-open stream")
if wr.DataSize() > 0 {
panic("add DATA on non-open stream")
}
n = &ws.root
} }
} }
n.q.push(wr) n.q.push(wr)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment