Commit 504b8d15 authored by Brad Fitzpatrick's avatar Brad Fitzpatrick

net/http: regenerate http2 bundle with bundle fixes to include comments

The golang.org/x/tools/cmd/bundle tool previously had a bug where it
dropped some comments.

This regenerates it with the fixed version (https://golang.org/cl/45117).

(Upstream is still git rev 3470a06c1, from https://golang.org/cl/44331)

Updates #20548

Change-Id: Ic5d9208a0c8f7facdb7b315c6acab66ace34c0a9
Reviewed-on: https://go-review.googlesource.com/45158Reviewed-by: default avatarHiroshi Ioka <hirochachacha@gmail.com>
Reviewed-by: default avatarBrad Fitzpatrick <bradfitz@golang.org>
parent 25a3dd3f
......@@ -759,7 +759,7 @@ type http2dialCall struct {
// requires p.mu is held.
func (p *http2clientConnPool) getStartDialLocked(addr string) *http2dialCall {
if call, ok := p.dialing[addr]; ok {
// A dial is already in-flight. Don't start another.
return call
}
call := &http2dialCall{p: p, done: make(chan struct{})}
......@@ -887,7 +887,12 @@ func (p *http2clientConnPool) MarkDead(cc *http2ClientConn) {
func (p *http2clientConnPool) closeIdleConnections() {
p.mu.Lock()
defer p.mu.Unlock()
// TODO: don't close a cc if it was just added to the pool
// milliseconds ago and has never been used. There's currently
// a small race window with the HTTP/1 Transport's integration
// where it can add an idle conn just before using it, and
// somebody else can concurrently call CloseIdleConns and
// break some caller's RoundTrip.
for _, vv := range p.conns {
for _, cc := range vv {
cc.closeIfIdle()
......@@ -902,7 +907,8 @@ func http2filterOutClientConn(in []*http2ClientConn, exclude *http2ClientConn) [
out = append(out, v)
}
}
// If we filtered it out, zero out the last item to prevent
// the GC from seeing it.
if len(in) != len(out) {
in[len(in)-1] = nil
}
......@@ -943,7 +949,10 @@ func http2configureTransport(t1 *Transport) (*http2Transport, error) {
go c.Close()
return http2erringRoundTripper{err}
} else if !used {
// Turns out we don't need this c.
// For example, two goroutines made requests to the same host
// at the same time, both kicking off TCP dials. (since protocol
// was unknown)
go c.Close()
}
return t2
......@@ -1058,7 +1067,7 @@ func (b *http2dataBuffer) Read(p []byte) (int, error) {
ntotal += n
b.r += n
b.size -= n
// If the first chunk has been consumed, advance to the next chunk.
if b.r == len(b.chunks[0]) {
http2putDataBufferChunk(b.chunks[0])
end := len(b.chunks) - 1
......@@ -1087,7 +1096,9 @@ func (b *http2dataBuffer) Len() int {
func (b *http2dataBuffer) Write(p []byte) (int, error) {
ntotal := len(p)
for len(p) > 0 {
// If the last chunk is empty, allocate a new chunk. Try to allocate
// enough to fully copy p plus any additional bytes we expect to
// receive. However, this may allocate less than len(p).
want := int64(len(p))
if b.expected > want {
want = b.expected
......@@ -1571,6 +1582,12 @@ type http2Framer struct {
// If the limit is hit, MetaHeadersFrame.Truncated is set true.
MaxHeaderListSize uint32
// TODO: track which type of frame & with which flags was sent
// last. Then return an error (unless AllowIllegalWrites) if
// we're in the middle of a header block and a
// non-Continuation or Continuation on a different stream is
// attempted to be written.
logReads, logWrites bool
debugFramer *http2Framer // only use for logging written writes
......@@ -1583,15 +1600,15 @@ type http2Framer struct {
func (fr *http2Framer) maxHeaderListSize() uint32 {
if fr.MaxHeaderListSize == 0 {
return 16 << 20
return 16 << 20 // sane default, per docs
}
return fr.MaxHeaderListSize
}
func (f *http2Framer) startWrite(ftype http2FrameType, flags http2Flags, streamID uint32) {
// Write the FrameHeader.
f.wbuf = append(f.wbuf[:0],
0,
0, // 3 bytes of length, filled in in endWrite
0,
0,
byte(ftype),
......@@ -1603,7 +1620,8 @@ func (f *http2Framer) startWrite(ftype http2FrameType, flags http2Flags, streamI
}
func (f *http2Framer) endWrite() error {
// Now that we know the final size, fill in the FrameHeader in
// the space previously reserved for it. Abuse append.
length := len(f.wbuf) - http2frameHeaderLen
if length >= (1 << 24) {
return http2ErrFrameTooLarge
......@@ -1627,8 +1645,9 @@ func (f *http2Framer) logWrite() {
if f.debugFramer == nil {
f.debugFramerBuf = new(bytes.Buffer)
f.debugFramer = http2NewFramer(nil, f.debugFramerBuf)
f.debugFramer.logReads = false
f.debugFramer.logReads = false // we log it ourselves, saying "wrote" below
// Let us read anything, even if we accidentally wrote it
// in the wrong order:
f.debugFramer.AllowIllegalReads = true
}
f.debugFramerBuf.Write(f.wbuf)
......@@ -1845,7 +1864,11 @@ func (f *http2DataFrame) Data() []byte {
func http2parseDataFrame(fc *http2frameCache, fh http2FrameHeader, payload []byte) (http2Frame, error) {
if fh.StreamID == 0 {
// DATA frames MUST be associated with a stream. If a
// DATA frame is received whose stream identifier
// field is 0x0, the recipient MUST respond with a
// connection error (Section 5.4.1) of type
// PROTOCOL_ERROR.
return nil, http2connError{http2ErrCodeProtocol, "DATA frame with stream ID 0"}
}
f := fc.getDataFrame()
......@@ -1860,7 +1883,10 @@ func http2parseDataFrame(fc *http2frameCache, fh http2FrameHeader, payload []byt
}
}
if int(padSize) > len(payload) {
// If the length of the padding is greater than the
// length of the frame payload, the recipient MUST
// treat this as a connection error.
// Filed: https://github.com/http2/http2-spec/issues/610
return nil, http2connError{http2ErrCodeProtocol, "pad size larger than data payload"}
}
f.data = payload[:len(payload)-int(padSize)]
......@@ -1911,7 +1937,7 @@ func (f *http2Framer) WriteDataPadded(streamID uint32, endStream bool, data, pad
if !f.AllowIllegalWrites {
for _, b := range pad {
if b != 0 {
// "Padding octets MUST be set to zero when sending."
return http2errPadBytes
}
}
......@@ -1945,20 +1971,33 @@ type http2SettingsFrame struct {
func http2parseSettingsFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (http2Frame, error) {
if fh.Flags.Has(http2FlagSettingsAck) && fh.Length > 0 {
// When this (ACK 0x1) bit is set, the payload of the
// SETTINGS frame MUST be empty. Receipt of a
// SETTINGS frame with the ACK flag set and a length
// field value other than 0 MUST be treated as a
// connection error (Section 5.4.1) of type
// FRAME_SIZE_ERROR.
return nil, http2ConnectionError(http2ErrCodeFrameSize)
}
if fh.StreamID != 0 {
// SETTINGS frames always apply to a connection,
// never a single stream. The stream identifier for a
// SETTINGS frame MUST be zero (0x0). If an endpoint
// receives a SETTINGS frame whose stream identifier
// field is anything other than 0x0, the endpoint MUST
// respond with a connection error (Section 5.4.1) of
// type PROTOCOL_ERROR.
return nil, http2ConnectionError(http2ErrCodeProtocol)
}
if len(p)%6 != 0 {
// Expecting even number of 6 byte settings.
return nil, http2ConnectionError(http2ErrCodeFrameSize)
}
f := &http2SettingsFrame{http2FrameHeader: fh, p: p}
if v, ok := f.Value(http2SettingInitialWindowSize); ok && v > (1<<31)-1 {
// Values above the maximum flow control window size of 2^31 - 1 MUST
// be treated as a connection error (Section 5.4.1) of type
// FLOW_CONTROL_ERROR.
return nil, http2ConnectionError(http2ErrCodeFlowControl)
}
return f, nil
......@@ -2127,9 +2166,14 @@ func http2parseWindowUpdateFrame(_ *http2frameCache, fh http2FrameHeader, p []by
if len(p) != 4 {
return nil, http2ConnectionError(http2ErrCodeFrameSize)
}
inc := binary.BigEndian.Uint32(p[:4]) & 0x7fffffff
inc := binary.BigEndian.Uint32(p[:4]) & 0x7fffffff // mask off high reserved bit
if inc == 0 {
// A receiver MUST treat the receipt of a
// WINDOW_UPDATE frame with an flow control window
// increment of 0 as a stream error (Section 5.4.2) of
// type PROTOCOL_ERROR; errors on the connection flow
// control window MUST be treated as a connection
// error (Section 5.4.1).
if fh.StreamID == 0 {
return nil, http2ConnectionError(http2ErrCodeProtocol)
}
......@@ -2146,7 +2190,7 @@ func http2parseWindowUpdateFrame(_ *http2frameCache, fh http2FrameHeader, p []by
// If the Stream ID is zero, the window update applies to the
// connection as a whole.
func (f *http2Framer) WriteWindowUpdate(streamID, incr uint32) error {
// "The legal range for the increment to the flow control window is 1 to 2^31-1 (2,147,483,647) octets."
if (incr < 1 || incr > 2147483647) && !f.AllowIllegalWrites {
return errors.New("illegal window increment value")
}
......@@ -2188,7 +2232,10 @@ func http2parseHeadersFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (
http2FrameHeader: fh,
}
if fh.StreamID == 0 {
// HEADERS frames MUST be associated with a stream. If a HEADERS frame
// is received whose stream identifier field is 0x0, the recipient MUST
// respond with a connection error (Section 5.4.1) of type
// PROTOCOL_ERROR.
return nil, http2connError{http2ErrCodeProtocol, "HEADERS frame with stream ID 0"}
}
var padLength uint8
......@@ -2204,7 +2251,7 @@ func http2parseHeadersFrame(_ *http2frameCache, fh http2FrameHeader, p []byte) (
return nil, err
}
hf.Priority.StreamDep = v & 0x7fffffff
hf.Priority.Exclusive = (v != hf.Priority.StreamDep)
hf.Priority.Exclusive = (v != hf.Priority.StreamDep) // high bit was set
p, hf.Priority.Weight, err = http2readByte(p)
if err != nil {
return nil, err
......@@ -2325,13 +2372,13 @@ func http2parsePriorityFrame(_ *http2frameCache, fh http2FrameHeader, payload []
return nil, http2connError{http2ErrCodeFrameSize, fmt.Sprintf("PRIORITY frame payload size was %d; want 5", len(payload))}
}
v := binary.BigEndian.Uint32(payload[:4])
streamID := v & 0x7fffffff
streamID := v & 0x7fffffff // mask off high bit
return &http2PriorityFrame{
http2FrameHeader: fh,
http2PriorityParam: http2PriorityParam{
Weight: payload[4],
StreamDep: streamID,
Exclusive: streamID != v,
Exclusive: streamID != v, // was high bit set?
},
}, nil
}
......@@ -2449,7 +2496,12 @@ func http2parsePushPromise(_ *http2frameCache, fh http2FrameHeader, p []byte) (_
http2FrameHeader: fh,
}
if pp.StreamID == 0 {
// PUSH_PROMISE frames MUST be associated with an existing,
// peer-initiated stream. The stream identifier of a
// PUSH_PROMISE frame indicates the stream it is associated
// with. If the stream identifier field specifies the value
// 0x0, a recipient MUST respond with a connection error
// (Section 5.4.1) of type PROTOCOL_ERROR.
return nil, http2ConnectionError(http2ErrCodeProtocol)
}
// The PUSH_PROMISE frame includes optional padding.
......@@ -2468,7 +2520,7 @@ func http2parsePushPromise(_ *http2frameCache, fh http2FrameHeader, p []byte) (_
pp.PromiseID = pp.PromiseID & (1<<31 - 1)
if int(padLength) > len(p) {
// like the DATA frame, error out if padding is longer than the body.
return nil, http2ConnectionError(http2ErrCodeProtocol)
}
pp.headerFragBuf = p[:len(p)-int(padLength)]
......@@ -2638,7 +2690,9 @@ func (mh *http2MetaHeadersFrame) checkPseudos() error {
default:
return http2pseudoHeaderError(hf.Name)
}
// Check for duplicates.
// This would be a bad algorithm, but N is 4.
// And this doesn't allocate.
for _, hf2 := range pf[:i] {
if hf.Name == hf2.Name {
return http2duplicatePseudoHeaderError(hf.Name)
......@@ -2656,7 +2710,8 @@ func (fr *http2Framer) maxHeaderStringLen() int {
if uint32(int(v)) == v {
return int(v)
}
// They had a crazy big number for MaxHeaderBytes anyway,
// so give them unlimited header lengths:
return 0
}
......@@ -2711,7 +2766,7 @@ func (fr *http2Framer) readMetaFrame(hf *http2HeadersFrame) (*http2MetaHeadersFr
mh.Fields = append(mh.Fields, hf)
})
// Lose reference to MetaHeadersFrame:
defer hdec.SetEmitFunc(func(hf hpack.HeaderField) {})
var hc http2headersOrContinuation = hf
......@@ -2727,7 +2782,7 @@ func (fr *http2Framer) readMetaFrame(hf *http2HeadersFrame) (*http2MetaHeadersFr
if f, err := fr.ReadFrame(); err != nil {
return nil, err
} else {
hc = f.(*http2ContinuationFrame)
hc = f.(*http2ContinuationFrame) // guaranteed by checkFrameOrder
}
}
......@@ -2769,7 +2824,7 @@ func http2summarizeFrame(f http2Frame) string {
return nil
})
if n > 0 {
buf.Truncate(buf.Len() - 1)
buf.Truncate(buf.Len() - 1) // remove trailing comma
}
case *http2DataFrame:
data := f.Data()
......@@ -2894,7 +2949,7 @@ func (cc *http2ClientConn) Ping(ctx context.Context) error {
func http2cloneTLSConfig(c *tls.Config) *tls.Config {
c2 := c.Clone()
c2.GetClientCertificate = c.GetClientCertificate
c2.GetClientCertificate = c.GetClientCertificate // golang.org/issue/19264
return c2
}
......@@ -2974,7 +3029,7 @@ func http2curGoroutineID() uint64 {
defer http2littleBuf.Put(bp)
b := *bp
b = b[:runtime.Stack(b, false)]
// Parse the 4707 out of "goroutine 4707 ["
b = bytes.TrimPrefix(b, http2goroutineSpace)
i := bytes.IndexByte(b, ' ')
if i < 0 {
......@@ -3010,9 +3065,10 @@ func http2parseUintBytes(s []byte, base int, bitSize int) (n uint64, err error)
goto Error
case 2 <= base && base <= 36:
// valid base; nothing to do
case base == 0:
// Look for octal, hex prefix.
switch {
case s[0] == '0' && len(s) > 1 && (s[1] == 'x' || s[1] == 'X'):
base = 16
......@@ -3058,7 +3114,7 @@ func http2parseUintBytes(s []byte, base int, bitSize int) (n uint64, err error)
}
if n >= cutoff {
// n*base overflows
n = 1<<64 - 1
err = strconv.ErrRange
goto Error
......@@ -3067,7 +3123,7 @@ func http2parseUintBytes(s []byte, base int, bitSize int) (n uint64, err error)
n1 := n + uint64(v)
if n1 < n || n1 > maxVal {
// n+v overflows
n = 1<<64 - 1
err = strconv.ErrRange
goto Error
......@@ -3251,7 +3307,7 @@ func (s http2Setting) String() string {
// Valid reports whether the setting is valid.
func (s http2Setting) Valid() error {
// Limits and error codes from 6.5.2 Defined SETTINGS Parameters
switch s.ID {
case http2SettingEnablePush:
if s.Val != 1 && s.Val != 0 {
......@@ -3495,7 +3551,8 @@ func (s *http2sorter) Keys(h Header) []string {
}
func (s *http2sorter) SortStrings(ss []string) {
// Our sorter works on s.v, which sorter owns, so
// stash it away while we sort the user's buffer.
save := s.v
s.v = ss
sort.Sort(s)
......@@ -3560,8 +3617,8 @@ func (p *http2pipe) Read(d []byte) (n int, err error) {
}
if p.err != nil {
if p.readFn != nil {
p.readFn()
p.readFn = nil
p.readFn() // e.g. copy trailers
p.readFn = nil // not sticky like p.err
}
p.b = nil
return 0, p.err
......@@ -3585,7 +3642,7 @@ func (p *http2pipe) Write(d []byte) (n int, err error) {
return 0, http2errClosedPipeWrite
}
if p.breakErr != nil {
return len(d), nil
return len(d), nil // discard when there is no reader
}
return p.b.Write(d)
}
......@@ -3617,7 +3674,7 @@ func (p *http2pipe) closeWithError(dst *error, err error, fn func()) {
}
defer p.c.Signal()
if *dst != nil {
// Already been done.
return
}
p.readFn = fn
......@@ -3633,7 +3690,8 @@ func (p *http2pipe) closeDoneLocked() {
if p.donec == nil {
return
}
// Close if unclosed. This isn't racy since we always
// hold p.mu while closing.
select {
case <-p.donec:
default:
......@@ -3659,7 +3717,7 @@ func (p *http2pipe) Done() <-chan struct{} {
if p.donec == nil {
p.donec = make(chan struct{})
if p.err != nil || p.breakErr != nil {
// Already hit an error.
p.closeDoneLocked()
}
}
......@@ -3785,7 +3843,7 @@ type http2serverInternalState struct {
func (s *http2serverInternalState) registerConn(sc *http2serverConn) {
if s == nil {
return
return // if the Server was used without calling ConfigureServer
}
s.mu.Lock()
s.activeConns[sc] = struct{}{}
......@@ -3794,7 +3852,7 @@ func (s *http2serverInternalState) registerConn(sc *http2serverConn) {
func (s *http2serverInternalState) unregisterConn(sc *http2serverConn) {
if s == nil {
return
return // if the Server was used without calling ConfigureServer
}
s.mu.Lock()
delete(s.activeConns, sc)
......@@ -3803,7 +3861,7 @@ func (s *http2serverInternalState) unregisterConn(sc *http2serverConn) {
func (s *http2serverInternalState) startGracefulShutdown() {
if s == nil {
return
return // if the Server was used without calling ConfigureServer
}
s.mu.Lock()
for sc := range s.activeConns {
......@@ -3856,6 +3914,13 @@ func http2ConfigureServer(s *Server, conf *http2Server) error {
}
}
// Note: not setting MinVersion to tls.VersionTLS12,
// as we don't want to interfere with HTTP/1.1 traffic
// on the user's server. We enforce TLS 1.2 later once
// we accept a connection. Ideally this should be done
// during next-proto selection, but using TLS <1.2 with
// HTTP/2 is still the client's bug.
s.TLSConfig.PreferServerCipherSuites = true
haveNPN := false
......@@ -3946,10 +4011,10 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) {
readFrameCh: make(chan http2readFrameResult),
wantWriteFrameCh: make(chan http2FrameWriteRequest, 8),
serveMsgCh: make(chan interface{}, 8),
wroteFrameCh: make(chan http2frameWriteResult, 1),
bodyReadCh: make(chan http2bodyReadMsg),
wroteFrameCh: make(chan http2frameWriteResult, 1), // buffered; one send in writeFrameAsync
bodyReadCh: make(chan http2bodyReadMsg), // buffering doesn't matter either way
doneServing: make(chan struct{}),
clientMaxStreams: math.MaxUint32,
clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
advMaxStreams: s.maxConcurrentStreams(),
initialStreamSendWindowSize: http2initialWindowSize,
maxFrameSize: http2initialMaxFrameSize,
......@@ -3961,6 +4026,11 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) {
s.state.registerConn(sc)
defer s.state.unregisterConn(sc)
// The net/http package sets the write deadline from the
// http.Server.WriteTimeout during the TLS handshake, but then
// passes the connection off to us with the deadline already set.
// Write deadlines are set per stream in serverConn.newStream.
// Disarm the net.Conn write deadline here.
if sc.hs.WriteTimeout != 0 {
sc.conn.SetWriteDeadline(time.Time{})
}
......@@ -3971,6 +4041,9 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) {
sc.writeSched = http2NewRandomWriteScheduler()
}
// These start at the RFC-specified defaults. If there is a higher
// configured value for inflow, that will be updated when we send a
// WINDOW_UPDATE shortly after sending SETTINGS.
sc.flow.add(http2initialWindowSize)
sc.inflow.add(http2initialWindowSize)
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
......@@ -3984,18 +4057,44 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) {
if tc, ok := c.(http2connectionStater); ok {
sc.tlsState = new(tls.ConnectionState)
*sc.tlsState = tc.ConnectionState()
// 9.2 Use of TLS Features
// An implementation of HTTP/2 over TLS MUST use TLS
// 1.2 or higher with the restrictions on feature set
// and cipher suite described in this section. Due to
// implementation limitations, it might not be
// possible to fail TLS negotiation. An endpoint MUST
// immediately terminate an HTTP/2 connection that
// does not meet the TLS requirements described in
// this section with a connection error (Section
// 5.4.1) of type INADEQUATE_SECURITY.
if sc.tlsState.Version < tls.VersionTLS12 {
sc.rejectConn(http2ErrCodeInadequateSecurity, "TLS version too low")
return
}
if sc.tlsState.ServerName == "" {
// Client must use SNI, but we don't enforce that anymore,
// since it was causing problems when connecting to bare IP
// addresses during development.
//
// TODO: optionally enforce? Or enforce at the time we receive
// a new request, and verify the the ServerName matches the :authority?
// But that precludes proxy situations, perhaps.
//
// So for now, do nothing here again.
}
if !s.PermitProhibitedCipherSuites && http2isBadCipher(sc.tlsState.CipherSuite) {
// "Endpoints MAY choose to generate a connection error
// (Section 5.4.1) of type INADEQUATE_SECURITY if one of
// the prohibited cipher suites are negotiated."
//
// We choose that. In my opinion, the spec is weak
// here. It also says both parties must support at least
// TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no
// excuses here. If we really must, we could allow an
// "AllowInsecureWeakCiphers" option on the server later.
// Let's see how it plays out first.
sc.rejectConn(http2ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite))
return
}
......@@ -4009,7 +4108,7 @@ func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) {
func (sc *http2serverConn) rejectConn(err http2ErrCode, debug string) {
sc.vlogf("http2: server rejecting conn: %v, %s", err, debug)
// ignoring errors. hanging up anyway.
sc.framer.WriteGoAway(0, err, []byte(debug))
sc.bw.Flush()
sc.conn.Close()
......@@ -4135,11 +4234,16 @@ func (sc *http2serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) {
func (sc *http2serverConn) state(streamID uint32) (http2streamState, *http2stream) {
sc.serveG.check()
// http://tools.ietf.org/html/rfc7540#section-5.1
if st, ok := sc.streams[streamID]; ok {
return st.state, st
}
// "The first use of a new stream identifier implicitly closes all
// streams in the "idle" state that might have been initiated by
// that peer with a lower-valued stream identifier. For example, if
// a client sends a HEADERS frame on stream 7 without ever sending a
// frame on stream 5, then stream 5 transitions to the "closed"
// state when the first frame for stream 7 is sent or received."
if streamID%2 == 1 {
if streamID <= sc.maxClientStreamID {
return http2stateClosed, nil
......@@ -4193,11 +4297,18 @@ func http2isClosedConnError(err error) bool {
return false
}
// TODO: remove this string search and be more like the Windows
// case below. That might involve modifying the standard library
// to return better error types.
str := err.Error()
if strings.Contains(str, "use of closed network connection") {
return true
}
// TODO(bradfitz): x/tools/cmd/bundle doesn't really support
// build tags, so I can't make an http2_windows.go file with
// Windows-specific stuff. Fix that and move this, once we
// have a way to bundle this into std's net/http somehow.
if runtime.GOOS == "windows" {
if oe, ok := err.(*net.OpError); ok && oe.Op == "read" {
if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" {
......@@ -4217,7 +4328,7 @@ func (sc *http2serverConn) condlogf(err error, format string, args ...interface{
return
}
if err == io.EOF || err == io.ErrUnexpectedEOF || http2isClosedConnError(err) {
// Boring, expected errors.
sc.vlogf(format, args...)
} else {
sc.logf(format, args...)
......@@ -4307,7 +4418,7 @@ func (sc *http2serverConn) stopShutdownTimer() {
}
func (sc *http2serverConn) notePanic() {
// Note: this is for serverConn.serve panicking, not http.Handler code.
if http2testHookOnPanicMu != nil {
http2testHookOnPanicMu.Lock()
defer http2testHookOnPanicMu.Unlock()
......@@ -4327,7 +4438,7 @@ func (sc *http2serverConn) serve() {
defer sc.conn.Close()
defer sc.closeAllStreamsOnConnClose()
defer sc.stopShutdownTimer()
defer close(sc.doneServing)
defer close(sc.doneServing) // unblocks handlers trying to send
if http2VerboseLogs {
sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
......@@ -4343,6 +4454,8 @@ func (sc *http2serverConn) serve() {
})
sc.unackedSettings++
// Each connection starts with intialWindowSize inflow tokens.
// If a higher value is configured, we add more tokens.
if diff := sc.srv.initialConnRecvWindowSize() - http2initialWindowSize; diff > 0 {
sc.sendWindowUpdate(nil, int(diff))
}
......@@ -4351,7 +4464,10 @@ func (sc *http2serverConn) serve() {
sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
return
}
// Now that we've got the preface, get us out of the
// "StateNew" state. We can't go directly to idle, though.
// Active means we read some data and anticipate a request. We'll
// do another Active when we get a HEADERS frame.
sc.setConnState(StateActive)
sc.setConnState(StateIdle)
......@@ -4360,7 +4476,7 @@ func (sc *http2serverConn) serve() {
defer sc.idleTimer.Stop()
}
go sc.readFrames()
go sc.readFrames() // closed by defer sc.conn.Close above
settingsTimer := time.AfterFunc(http2firstSettingsTimeout, sc.onSettingsTimer)
defer settingsTimer.Stop()
......@@ -4391,7 +4507,7 @@ func (sc *http2serverConn) serve() {
case msg := <-sc.serveMsgCh:
switch v := msg.(type) {
case func(int):
v(loopNum)
v(loopNum) // for testing
case *http2serverMessage:
switch v {
case http2settingsTimerMsg:
......@@ -4446,7 +4562,7 @@ func (sc *http2serverConn) onIdleTimer() { sc.sendServeMsg(http2idleTimerMsg) }
func (sc *http2serverConn) onShutdownTimer() { sc.sendServeMsg(http2shutdownTimerMsg) }
func (sc *http2serverConn) sendServeMsg(msg interface{}) {
sc.serveG.checkNotOn()
sc.serveG.checkNotOn() // NOT
select {
case sc.serveMsgCh <- msg:
case <-sc.doneServing:
......@@ -4458,7 +4574,7 @@ func (sc *http2serverConn) sendServeMsg(msg interface{}) {
func (sc *http2serverConn) readPreface() error {
errc := make(chan error, 1)
go func() {
// Read the client preface
buf := make([]byte, len(http2ClientPreface))
if _, err := io.ReadFull(sc.conn, buf); err != nil {
errc <- err
......@@ -4468,7 +4584,7 @@ func (sc *http2serverConn) readPreface() error {
errc <- nil
}
}()
timer := time.NewTimer(http2prefaceTimeout)
timer := time.NewTimer(http2prefaceTimeout) // TODO: configurable on *Server?
defer timer.Stop()
select {
case <-timer.C:
......@@ -4512,7 +4628,13 @@ func (sc *http2serverConn) writeDataFromHandler(stream *http2stream, data []byte
case <-sc.doneServing:
return http2errClientDisconnected
case <-stream.cw:
// If both ch and stream.cw were ready (as might
// happen on the final Write after an http.Handler
// ends), prefer the write result. Otherwise this
// might just be us successfully closing the stream.
// The writeFrameAsync and serve goroutines guarantee
// that the ch send will happen before the stream.cw
// close.
select {
case err = <-ch:
frameWriteDone = true
......@@ -4535,12 +4657,13 @@ func (sc *http2serverConn) writeDataFromHandler(stream *http2stream, data []byte
// buffered and is read by serve itself). If you're on the serve
// goroutine, call writeFrame instead.
func (sc *http2serverConn) writeFrameFromHandler(wr http2FrameWriteRequest) error {
sc.serveG.checkNotOn()
sc.serveG.checkNotOn() // NOT
select {
case sc.wantWriteFrameCh <- wr:
return nil
case <-sc.doneServing:
// Serve loop is gone.
// Client has closed their connection to the server.
return http2errClientDisconnected
}
}
......@@ -4559,6 +4682,24 @@ func (sc *http2serverConn) writeFrame(wr http2FrameWriteRequest) {
// If true, wr will not be written and wr.done will not be signaled.
var ignoreWrite bool
// We are not allowed to write frames on closed streams. RFC 7540 Section
// 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
// a closed stream." Our server never sends PRIORITY, so that exception
// does not apply.
//
// The serverConn might close an open stream while the stream's handler
// is still running. For example, the server might close a stream when it
// receives bad data from the client. If this happens, the handler might
// attempt to write a frame after the stream has been closed (since the
// handler hasn't yet been notified of the close). In this case, we simply
// ignore the frame. The handler will notice that the stream is closed when
// it waits for the frame to be written.
//
// As an exception to this rule, we allow sending RST_STREAM after close.
// This allows us to immediately reject new streams without tracking any
// state for those streams (except for the queued RST_STREAM frame). This
// may result in duplicate RST_STREAMs in some cases, but the client should
// ignore those.
if wr.StreamID() != 0 {
_, isReset := wr.write.(http2StreamError)
if state, _ := sc.state(wr.StreamID()); state == http2stateClosed && !isReset {
......@@ -4566,12 +4707,15 @@ func (sc *http2serverConn) writeFrame(wr http2FrameWriteRequest) {
}
}
// Don't send a 100-continue response if we've already sent headers.
// See golang.org/issue/14030.
switch wr.write.(type) {
case *http2writeResHeaders:
wr.stream.wroteHeaders = true
case http2write100ContinueHeadersFrame:
if wr.stream.wroteHeaders {
// We do not need to notify wr.done because this frame is
// never written with wr.done != nil.
if wr.done != nil {
panic("wr.done != nil for write100ContinueHeadersFrame")
}
......@@ -4600,7 +4744,8 @@ func (sc *http2serverConn) startFrameWrite(wr http2FrameWriteRequest) {
case http2stateHalfClosedLocal:
switch wr.write.(type) {
case http2StreamError, http2handlerPanicRST, http2writeWindowUpdate:
// RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE
// in this state. (We never send PRIORITY from the server, so that is not checked.)
default:
panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr))
}
......@@ -4654,9 +4799,21 @@ func (sc *http2serverConn) wroteFrame(res http2frameWriteResult) {
}
switch st.state {
case http2stateOpen:
// Here we would go to stateHalfClosedLocal in
// theory, but since our handler is done and
// the net/http package provides no mechanism
// for closing a ResponseWriter while still
// reading data (see possible TODO at top of
// this file), we go into closed state here
// anyway, after telling the peer we're
// hanging up on them. We'll transition to
// stateClosed after the RST_STREAM frame is
// written.
st.state = http2stateHalfClosedLocal
// Section 8.1: a server MAY request that the client abort
// transmission of a request without error by sending a
// RST_STREAM with an error code of NO_ERROR after sending
// a complete response.
sc.resetStream(http2streamError(st.id, http2ErrCodeNo))
case http2stateHalfClosedRemote:
sc.closeStream(st, http2errHandlerComplete)
......@@ -4664,7 +4821,7 @@ func (sc *http2serverConn) wroteFrame(res http2frameWriteResult) {
} else {
switch v := wr.write.(type) {
case http2StreamError:
// st may be unknown if the RST_STREAM was generated to reject bad input.
if st, ok := sc.streams[v.StreamID]; ok {
sc.closeStream(st, v)
}
......@@ -4673,6 +4830,7 @@ func (sc *http2serverConn) wroteFrame(res http2frameWriteResult) {
}
}
// Reply (if requested) to unblock the ServeHTTP goroutine.
wr.replyToWriter(res.err)
sc.scheduleFrameWrite()
......@@ -4720,7 +4878,7 @@ func (sc *http2serverConn) scheduleFrameWrite() {
}
if sc.needsFrameFlush {
sc.startFrameWrite(http2FrameWriteRequest{write: http2flushFrameWriter{}})
sc.needsFrameFlush = false
sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
continue
}
break
......@@ -4736,7 +4894,7 @@ func (sc *http2serverConn) scheduleFrameWrite() {
// startGracefulShutdown returns immediately; it does not wait until
// the connection has shut down.
func (sc *http2serverConn) startGracefulShutdown() {
sc.serveG.checkNotOn()
sc.serveG.checkNotOn() // NOT
sc.shutdownOnce.Do(func() { sc.sendServeMsg(http2gracefulShutdownMsg) })
}
......@@ -4750,7 +4908,7 @@ func (sc *http2serverConn) goAway(code http2ErrCode) {
if code != http2ErrCodeNo {
forceCloseIn = 250 * time.Millisecond
} else {
// TODO: configurable
forceCloseIn = 1 * time.Second
}
sc.goAwayIn(code, forceCloseIn)
......@@ -4792,11 +4950,18 @@ func (sc *http2serverConn) processFrameFromReader(res http2readFrameResult) bool
if err != nil {
if err == http2ErrFrameTooLarge {
sc.goAway(http2ErrCodeFrameSize)
return true
return true // goAway will close the loop
}
clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || http2isClosedConnError(err)
if clientGone {
// TODO: could we also get into this state if
// the peer does a half close
// (e.g. CloseWrite) because they're done
// sending frames but they're still wanting
// our open replies? Investigate.
// TODO: add CloseWrite to crypto/tls.Conn first
// so we have a way to test this? I suppose
// just for testing we could have a non-TLS mode.
return false
}
} else {
......@@ -4820,7 +4985,7 @@ func (sc *http2serverConn) processFrameFromReader(res http2readFrameResult) bool
case http2ConnectionError:
sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev)
sc.goAway(http2ErrCode(ev))
return true
return true // goAway will handle shutdown
default:
if res.err != nil {
sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
......@@ -4834,6 +4999,7 @@ func (sc *http2serverConn) processFrameFromReader(res http2readFrameResult) bool
func (sc *http2serverConn) processFrame(f http2Frame) error {
sc.serveG.check()
// First frame received must be SETTINGS.
if !sc.sawFirstSettings {
if _, ok := f.(*http2SettingsFrame); !ok {
return http2ConnectionError(http2ErrCodeProtocol)
......@@ -4859,7 +5025,8 @@ func (sc *http2serverConn) processFrame(f http2Frame) error {
case *http2GoAwayFrame:
return sc.processGoAway(f)
case *http2PushPromiseFrame:
// A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
// frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
return http2ConnectionError(http2ErrCodeProtocol)
default:
sc.vlogf("http2: server ignoring frame: %v", f.Header())
......@@ -4870,11 +5037,16 @@ func (sc *http2serverConn) processFrame(f http2Frame) error {
func (sc *http2serverConn) processPing(f *http2PingFrame) error {
sc.serveG.check()
if f.IsAck() {
// 6.7 PING: " An endpoint MUST NOT respond to PING frames
// containing this flag."
return nil
}
if f.StreamID != 0 {
// "PING frames are not associated with any individual
// stream. If a PING frame is received with a stream
// identifier field value other than 0x0, the recipient MUST
// respond with a connection error (Section 5.4.1) of type
// PROTOCOL_ERROR."
return http2ConnectionError(http2ErrCodeProtocol)
}
if sc.inGoAway && sc.goAwayCode != http2ErrCodeNo {
......@@ -4887,20 +5059,27 @@ func (sc *http2serverConn) processPing(f *http2PingFrame) error {
func (sc *http2serverConn) processWindowUpdate(f *http2WindowUpdateFrame) error {
sc.serveG.check()
switch {
case f.StreamID != 0:
case f.StreamID != 0: // stream-level flow control
state, st := sc.state(f.StreamID)
if state == http2stateIdle {
// Section 5.1: "Receiving any frame other than HEADERS
// or PRIORITY on a stream in this state MUST be
// treated as a connection error (Section 5.4.1) of
// type PROTOCOL_ERROR."
return http2ConnectionError(http2ErrCodeProtocol)
}
if st == nil {
// "WINDOW_UPDATE can be sent by a peer that has sent a
// frame bearing the END_STREAM flag. This means that a
// receiver could receive a WINDOW_UPDATE frame on a "half
// closed (remote)" or "closed" stream. A receiver MUST
// NOT treat this as an error, see Section 5.1."
return nil
}
if !st.flow.add(int32(f.Increment)) {
return http2streamError(f.StreamID, http2ErrCodeFlowControl)
}
default:
default: // connection-level flow control
if !sc.flow.add(int32(f.Increment)) {
return http2goAwayFlowError{}
}
......@@ -4914,7 +5093,11 @@ func (sc *http2serverConn) processResetStream(f *http2RSTStreamFrame) error {
state, st := sc.state(f.StreamID)
if state == http2stateIdle {
// 6.4 "RST_STREAM frames MUST NOT be sent for a
// stream in the "idle" state. If a RST_STREAM frame
// identifying an idle stream is received, the
// recipient MUST treat this as a connection error
// (Section 5.4.1) of type PROTOCOL_ERROR.
return http2ConnectionError(http2ErrCodeProtocol)
}
if st != nil {
......@@ -4949,12 +5132,13 @@ func (sc *http2serverConn) closeStream(st *http2stream, err error) {
}
}
if p := st.body; p != nil {
// Return any buffered unread bytes worth of conn-level flow control.
// See golang.org/issue/16481
sc.sendWindowUpdate(nil, p.Len())
p.CloseWithError(err)
}
st.cw.Close()
st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
sc.writeSched.CloseStream(st.id)
}
......@@ -4963,7 +5147,9 @@ func (sc *http2serverConn) processSettings(f *http2SettingsFrame) error {
if f.IsAck() {
sc.unackedSettings--
if sc.unackedSettings < 0 {
// Why is the peer ACKing settings we never sent?
// The spec doesn't mention this case, but
// hang up on them anyway.
return http2ConnectionError(http2ErrCodeProtocol)
}
return nil
......@@ -4995,11 +5181,13 @@ func (sc *http2serverConn) processSetting(s http2Setting) error {
case http2SettingInitialWindowSize:
return sc.processSettingInitialWindowSize(s.Val)
case http2SettingMaxFrameSize:
sc.maxFrameSize = int32(s.Val)
sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31
case http2SettingMaxHeaderListSize:
sc.peerMaxHeaderListSize = s.Val
default:
// Unknown setting: "An endpoint that receives a SETTINGS
// frame with any unknown or unsupported identifier MUST
// ignore that setting."
if http2VerboseLogs {
sc.vlogf("http2: server ignoring unknown setting %v", s)
}
......@@ -5009,13 +5197,26 @@ func (sc *http2serverConn) processSetting(s http2Setting) error {
func (sc *http2serverConn) processSettingInitialWindowSize(val uint32) error {
sc.serveG.check()
// Note: val already validated to be within range by
// processSetting's Valid call.
// "A SETTINGS frame can alter the initial flow control window
// size for all current streams. When the value of
// SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST
// adjust the size of all stream flow control windows that it
// maintains by the difference between the new value and the
// old value."
old := sc.initialStreamSendWindowSize
sc.initialStreamSendWindowSize = int32(val)
growth := int32(val) - old
growth := int32(val) - old // may be negative
for _, st := range sc.streams {
if !st.flow.add(growth) {
// 6.9.2 Initial Flow Control Window Size
// "An endpoint MUST treat a change to
// SETTINGS_INITIAL_WINDOW_SIZE that causes any flow
// control window to exceed the maximum size as a
// connection error (Section 5.4.1) of type
// FLOW_CONTROL_ERROR."
return http2ConnectionError(http2ErrCodeFlowControl)
}
}
......@@ -5029,23 +5230,40 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error {
}
data := f.Data()
// "If a DATA frame is received whose stream is not in "open"
// or "half closed (local)" state, the recipient MUST respond
// with a stream error (Section 5.4.2) of type STREAM_CLOSED."
id := f.Header().StreamID
state, st := sc.state(id)
if id == 0 || state == http2stateIdle {
// Section 5.1: "Receiving any frame other than HEADERS
// or PRIORITY on a stream in this state MUST be
// treated as a connection error (Section 5.4.1) of
// type PROTOCOL_ERROR."
return http2ConnectionError(http2ErrCodeProtocol)
}
if st == nil || state != http2stateOpen || st.gotTrailerHeader || st.resetQueued {
// This includes sending a RST_STREAM if the stream is
// in stateHalfClosedLocal (which currently means that
// the http.Handler returned, so it's done reading &
// done writing). Try to stop the client from sending
// more DATA.
// But still enforce their connection-level flow control,
// and return any flow control bytes since we're not going
// to consume them.
if sc.inflow.available() < int32(f.Length) {
return http2streamError(id, http2ErrCodeFlowControl)
}
// Deduct the flow control from inflow, since we're
// going to immediately add it back in
// sendWindowUpdate, which also schedules sending the
// frames.
sc.inflow.take(int32(f.Length))
sc.sendWindowUpdate(nil, int(f.Length))
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
if st != nil && st.resetQueued {
// Already have a stream error in flight. Don't send another.
return nil
}
return http2streamError(id, http2ErrCodeStreamClosed)
......@@ -5054,12 +5272,13 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error {
panic("internal error: should have a body in this state")
}
// Sender sending more than they'd declared?
if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
return http2streamError(id, http2ErrCodeStreamClosed)
}
if f.Length > 0 {
// Check whether the client has flow control quota.
if st.inflow.available() < int32(f.Length) {
return http2streamError(id, http2ErrCodeFlowControl)
}
......@@ -5076,6 +5295,8 @@ func (sc *http2serverConn) processData(f *http2DataFrame) error {
st.bodyBytes += int64(len(data))
}
// Return any padded flow control now, since we won't
// refund it later on body reads.
if pad := int32(f.Length) - int32(len(data)); pad > 0 {
sc.sendWindowUpdate32(nil, pad)
sc.sendWindowUpdate32(st, pad)
......@@ -5095,7 +5316,8 @@ func (sc *http2serverConn) processGoAway(f *http2GoAwayFrame) error {
sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f)
}
sc.startGracefulShutdownInternal()
// http://tools.ietf.org/html/rfc7540#section-6.8
// We should not create any new streams, which means we should disable push.
sc.pushEnabled = false
return nil
}
......@@ -5126,7 +5348,7 @@ func (st *http2stream) endStream() {
func (st *http2stream) copyTrailersToHandlerRequest() {
for k, vv := range st.trailer {
if _, ok := st.reqTrailer[k]; ok {
// Only copy it over it was pre-declared.
st.reqTrailer[k] = vv
}
}
......@@ -5142,22 +5364,35 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error {
sc.serveG.check()
id := f.StreamID
if sc.inGoAway {
// Ignore.
return nil
}
// http://tools.ietf.org/html/rfc7540#section-5.1.1
// Streams initiated by a client MUST use odd-numbered stream
// identifiers. [...] An endpoint that receives an unexpected
// stream identifier MUST respond with a connection error
// (Section 5.4.1) of type PROTOCOL_ERROR.
if id%2 != 1 {
return http2ConnectionError(http2ErrCodeProtocol)
}
// A HEADERS frame can be used to create a new stream or
// send a trailer for an open one. If we already have a stream
// open, let it process its own HEADERS frame (trailers at this
// point, if it's valid).
if st := sc.streams[f.StreamID]; st != nil {
if st.resetQueued {
// We're sending RST_STREAM to close the stream, so don't bother
// processing this frame.
return nil
}
return st.processTrailerHeaders(f)
}
// [...] The identifier of a newly established stream MUST be
// numerically greater than all streams that the initiating
// endpoint has opened or reserved. [...] An endpoint that
// receives an unexpected stream identifier MUST respond with
// a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
if id <= sc.maxClientStreamID {
return http2ConnectionError(http2ErrCodeProtocol)
}
......@@ -5167,12 +5402,22 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error {
sc.idleTimer.Stop()
}
// http://tools.ietf.org/html/rfc7540#section-5.1.2
// [...] Endpoints MUST NOT exceed the limit set by their peer. An
// endpoint that receives a HEADERS frame that causes their
// advertised concurrent stream limit to be exceeded MUST treat
// this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
// or REFUSED_STREAM.
if sc.curClientStreams+1 > sc.advMaxStreams {
if sc.unackedSettings == 0 {
// They should know better.
return http2streamError(id, http2ErrCodeProtocol)
}
// Assume it's a network race, where they just haven't
// received our last SETTINGS update. But actually
// this can't happen yet, because we don't yet provide
// a way for users to adjust server parameters at
// runtime.
return http2streamError(id, http2ErrCodeRefusedStream)
}
......@@ -5197,17 +5442,24 @@ func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error {
if st.reqTrailer != nil {
st.trailer = make(Header)
}
st.body = req.Body.(*http2requestBody).pipe
st.body = req.Body.(*http2requestBody).pipe // may be nil
st.declBodyBytes = req.ContentLength
handler := sc.handler.ServeHTTP
if f.Truncated {
// Their header list was too long. Send a 431 error.
handler = http2handleHeaderListTooLong
} else if err := http2checkValidHTTP2RequestHeaders(req.Header); err != nil {
handler = http2new400Handler(err)
}
// The net/http package sets the read deadline from the
// http.Server.ReadTimeout during the TLS handshake, but then
// passes the connection off to us with the deadline already
// set. Disarm it here after the request headers are read,
// similar to how the http1 server works. Here it's
// technically more like the http1 Server's ReadHeaderTimeout
// (in Go 1.8), though. That's a more sane option anyway.
if sc.hs.ReadTimeout != 0 {
sc.conn.SetReadDeadline(time.Time{})
}
......@@ -5234,7 +5486,9 @@ func (st *http2stream) processTrailerHeaders(f *http2MetaHeadersFrame) error {
for _, hf := range f.RegularFields() {
key := sc.canonicalHeader(hf.Name)
if !http2ValidTrailerHeader(key) {
// TODO: send more details to the peer somehow. But http2 has
// no way to send debug data at a stream level. Discuss with
// HTTP folk.
return http2streamError(st.id, http2ErrCodeProtocol)
}
st.trailer[key] = append(st.trailer[key], hf.Value)
......@@ -5246,7 +5500,10 @@ func (st *http2stream) processTrailerHeaders(f *http2MetaHeadersFrame) error {
func http2checkPriority(streamID uint32, p http2PriorityParam) error {
if streamID == p.StreamDep {
// Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
// this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
// Section 5.3.3 says that a stream can depend on one of its dependencies,
// so it's only self-dependencies that are forbidden.
return http2streamError(streamID, http2ErrCodeProtocol)
}
return nil
......@@ -5278,9 +5535,9 @@ func (sc *http2serverConn) newStream(id, pusherID uint32, state http2streamState
cancelCtx: cancelCtx,
}
st.cw.Init()
st.flow.conn = &sc.flow
st.flow.conn = &sc.flow // link to conn-level counter
st.flow.add(sc.initialStreamSendWindowSize)
st.inflow.conn = &sc.inflow
st.inflow.conn = &sc.inflow // link to conn-level counter
st.inflow.add(sc.srv.initialStreamRecvWindowSize())
if sc.hs.WriteTimeout != 0 {
st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
......@@ -5316,13 +5573,22 @@ func (sc *http2serverConn) newWriterAndRequest(st *http2stream, f *http2MetaHead
return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol)
}
} else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
// See 8.1.2.6 Malformed Requests and Responses:
//
// Malformed requests or responses that are detected
// MUST be treated as a stream error (Section 5.4.2)
// of type PROTOCOL_ERROR."
//
// 8.1.2.3 Request Pseudo-Header Fields
// "All HTTP/2 requests MUST include exactly one valid
// value for the :method, :scheme, and :path
// pseudo-header fields"
return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol)
}
bodyOpen := !f.StreamEnded()
if rp.method == "HEAD" && bodyOpen {
// HEAD requests can't have bodies
return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol)
}
......@@ -5369,7 +5635,7 @@ func (sc *http2serverConn) newWriterAndRequestNoBody(st *http2stream, rp http2re
if needsContinue {
rp.header.Del("Expect")
}
// Merge Cookie headers into one "; "-delimited value.
if cookies := rp.header["Cookie"]; len(cookies) > 1 {
rp.header.Set("Cookie", strings.Join(cookies, "; "))
}
......@@ -5381,7 +5647,8 @@ func (sc *http2serverConn) newWriterAndRequestNoBody(st *http2stream, rp http2re
key = CanonicalHeaderKey(strings.TrimSpace(key))
switch key {
case "Transfer-Encoding", "Trailer", "Content-Length":
// Bogus. (copy of http1 rules)
// Ignore.
default:
if trailer == nil {
trailer = make(Header)
......@@ -5396,7 +5663,7 @@ func (sc *http2serverConn) newWriterAndRequestNoBody(st *http2stream, rp http2re
var requestURI string
if rp.method == "CONNECT" {
url_ = &url.URL{Host: rp.authority}
requestURI = rp.authority
requestURI = rp.authority // mimic HTTP/1 server behavior
} else {
var err error
url_, err = url.ParseRequestURI(rp.path)
......@@ -5429,7 +5696,7 @@ func (sc *http2serverConn) newWriterAndRequestNoBody(st *http2stream, rp http2re
rws := http2responseWriterStatePool.Get().(*http2responseWriterState)
bwSave := rws.bw
*rws = http2responseWriterState{}
*rws = http2responseWriterState{} // zero all the fields
rws.conn = sc
rws.bw = bwSave
rws.bw.Reset(http2chunkWriter{rws})
......@@ -5452,7 +5719,7 @@ func (sc *http2serverConn) runHandler(rw *http2responseWriter, req *Request, han
write: http2handlerPanicRST{rw.rws.stream.id},
stream: rw.rws.stream,
})
// Same as net/http:
if http2shouldLogPanic(e) {
const size = 64 << 10
buf := make([]byte, size)
......@@ -5480,10 +5747,13 @@ func http2handleHeaderListTooLong(w ResponseWriter, r *Request) {
// called from handler goroutines.
// h may be nil.
func (sc *http2serverConn) writeHeaders(st *http2stream, headerData *http2writeResHeaders) error {
sc.serveG.checkNotOn()
sc.serveG.checkNotOn() // NOT on
var errc chan error
if headerData.h != nil {
// If there's a header map (which we don't own), so we have to block on
// waiting for this frame to be written, so an http.Flush mid-handler
// writes out the correct value of keys, before a handler later potentially
// mutates it.
errc = http2errChanPool.Get().(chan error)
}
if err := sc.writeFrameFromHandler(http2FrameWriteRequest{
......@@ -5526,7 +5796,7 @@ type http2bodyReadMsg struct {
// Notes that the handler for the given stream ID read n bytes of its body
// and schedules flow control tokens to be sent.
func (sc *http2serverConn) noteBodyReadFromHandler(st *http2stream, n int, err error) {
sc.serveG.checkNotOn()
sc.serveG.checkNotOn() // NOT on
if n > 0 {
select {
case sc.bodyReadCh <- http2bodyReadMsg{st, n}:
......@@ -5537,9 +5807,10 @@ func (sc *http2serverConn) noteBodyReadFromHandler(st *http2stream, n int, err e
func (sc *http2serverConn) noteBodyRead(st *http2stream, n int) {
sc.serveG.check()
sc.sendWindowUpdate(nil, n)
sc.sendWindowUpdate(nil, n) // conn-level
if st.state != http2stateHalfClosedRemote && st.state != http2stateClosed {
// Don't send this WINDOW_UPDATE if the stream is closed
// remotely.
sc.sendWindowUpdate(st, n)
}
}
......@@ -5681,7 +5952,7 @@ func (rws *http2responseWriterState) hasTrailers() bool { return len(rws.trailer
func (rws *http2responseWriterState) declareTrailer(k string) {
k = CanonicalHeaderKey(k)
if !http2ValidTrailerHeader(k) {
// Forbidden by RFC 2616 14.40.
rws.conn.logf("ignoring invalid trailer %q", k)
return
}
......@@ -5723,7 +5994,7 @@ func (rws *http2responseWriterState) writeChunk(p []byte) (n int, err error) {
}
var date string
if _, ok := rws.snapHeader["Date"]; !ok {
// TODO(bradfitz): be faster here, like net/http? measure.
date = time.Now().UTC().Format(TimeFormat)
}
......@@ -5761,7 +6032,7 @@ func (rws *http2responseWriterState) writeChunk(p []byte) (n int, err error) {
endStream := rws.handlerDone && !rws.hasTrailers()
if len(p) > 0 || endStream {
// only send a 0 byte DATA frame if we're ending the stream.
if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil {
return 0, err
}
......@@ -5838,11 +6109,14 @@ func (w *http2responseWriter) Flush() {
}
if rws.bw.Buffered() > 0 {
if err := rws.bw.Flush(); err != nil {
// Ignore the error. The frame writer already knows.
return
}
} else {
// The bufio.Writer won't call chunkWriter.Write
// (writeChunk with zero bytes, so we have to do it
// ourselves to force the HTTP response header and/or
// final DATA frame (with END_STREAM) to be sent.
rws.writeChunk(nil)
}
}
......@@ -5859,7 +6133,7 @@ func (w *http2responseWriter) CloseNotify() <-chan bool {
rws.closeNotifierCh = ch
cw := rws.stream.cw
go func() {
cw.Wait()
cw.Wait() // wait for close
ch <- true
}()
}
......@@ -5934,9 +6208,9 @@ func (w *http2responseWriter) write(lenData int, dataB []byte, dataS string) (n
if !http2bodyAllowedForStatus(rws.status) {
return 0, ErrBodyNotAllowed
}
rws.wroteBytes += int64(len(dataB)) + int64(len(dataS))
rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) // only one can be set
if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen {
// TODO: send a RST_STREAM
return 0, errors.New("http2: handler wrote more than declared Content-Length")
}
......@@ -5973,10 +6247,13 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error {
sc := st.sc
sc.serveG.checkNotOn()
// No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
// http://tools.ietf.org/html/rfc7540#section-6.6
if st.isPushed() {
return http2ErrRecursivePush
}
// Default options.
if opts.Method == "" {
opts.Method = "GET"
}
......@@ -5988,6 +6265,7 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error {
wantScheme = "https"
}
// Validate the request.
u, err := url.Parse(target)
if err != nil {
return err
......@@ -6010,7 +6288,10 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error {
if strings.HasPrefix(k, ":") {
return fmt.Errorf("promised request headers cannot include pseudo header %q", k)
}
// These headers are meaningful only if the request has a body,
// but PUSH_PROMISE requests cannot have a body.
// http://tools.ietf.org/html/rfc7540#section-8.2
// Also disallow Host, since the promised URL must be absolute.
switch strings.ToLower(k) {
case "content-length", "content-encoding", "trailer", "te", "expect", "host":
return fmt.Errorf("promised request headers cannot include %q", k)
......@@ -6020,6 +6301,9 @@ func (w *http2responseWriter) push(target string, opts http2pushOptions) error {
return err
}
// The RFC effectively limits promised requests to GET and HEAD:
// "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
// http://tools.ietf.org/html/rfc7540#section-8.2
if opts.Method != "GET" && opts.Method != "HEAD" {
return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
}
......@@ -6062,28 +6346,41 @@ type http2startPushRequest struct {
func (sc *http2serverConn) startPush(msg *http2startPushRequest) {
sc.serveG.check()
// http://tools.ietf.org/html/rfc7540#section-6.6.
// PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
// is in either the "open" or "half-closed (remote)" state.
if msg.parent.state != http2stateOpen && msg.parent.state != http2stateHalfClosedRemote {
// responseWriter.Push checks that the stream is peer-initiaed.
msg.done <- http2errStreamClosed
return
}
// http://tools.ietf.org/html/rfc7540#section-6.6.
if !sc.pushEnabled {
msg.done <- ErrNotSupported
return
}
// PUSH_PROMISE frames must be sent in increasing order by stream ID, so
// we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
// is written. Once the ID is allocated, we start the request handler.
allocatePromisedID := func() (uint32, error) {
sc.serveG.check()
// Check this again, just in case. Technically, we might have received
// an updated SETTINGS by the time we got around to writing this frame.
if !sc.pushEnabled {
return 0, ErrNotSupported
}
// http://tools.ietf.org/html/rfc7540#section-6.5.2.
if sc.curPushedStreams+1 > sc.clientMaxStreams {
return 0, http2ErrPushLimitReached
}
// http://tools.ietf.org/html/rfc7540#section-5.1.1.
// Streams initiated by the server MUST use even-numbered identifiers.
// A server that is unable to establish a new stream identifier can send a GOAWAY
// frame so that the client is forced to open a new connection for new streams.
if sc.maxPushPromiseID+2 >= 1<<31 {
sc.startGracefulShutdownInternal()
return 0, http2ErrPushLimitReached
......@@ -6091,16 +6388,21 @@ func (sc *http2serverConn) startPush(msg *http2startPushRequest) {
sc.maxPushPromiseID += 2
promisedID := sc.maxPushPromiseID
// http://tools.ietf.org/html/rfc7540#section-8.2.
// Strictly speaking, the new stream should start in "reserved (local)", then
// transition to "half closed (remote)" after sending the initial HEADERS, but
// we start in "half closed (remote)" for simplicity.
// See further comments at the definition of stateHalfClosedRemote.
promised := sc.newStream(promisedID, msg.parent.id, http2stateHalfClosedRemote)
rw, req, err := sc.newWriterAndRequestNoBody(promised, http2requestParam{
method: msg.method,
scheme: msg.url.Scheme,
authority: msg.url.Host,
path: msg.url.RequestURI(),
header: http2cloneHeader(msg.header),
header: http2cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE
})
if err != nil {
// Should not happen, since we've already validated msg.url.
panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
}
......@@ -6310,7 +6612,7 @@ var http2errTransportVersion = errors.New("http2: ConfigureTransport is only sup
// It requires Go 1.6 or later and returns an error if the net/http package is too old
// or if t1 has already been HTTP/2-enabled.
func http2ConfigureTransport(t1 *Transport) error {
_, err := http2configureTransport(t1)
_, err := http2configureTransport(t1) // in configure_transport.go (go1.6) or not_go16.go
return err
}
......@@ -6493,7 +6795,7 @@ func (t *http2Transport) RoundTrip(req *Request) (*Response, error) {
// and returns a host:port. The port 443 is added if needed.
func http2authorityAddr(scheme string, authority string) (addr string) {
host, port, err := net.SplitHostPort(authority)
if err != nil {
if err != nil { // authority didn't have a port
port = "443"
if scheme == "http" {
port = "80"
......@@ -6503,7 +6805,7 @@ func http2authorityAddr(scheme string, authority string) (addr string) {
if a, err := idna.ToASCII(host); err == nil {
host = a
}
// IPv6 address literal, without a port:
if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
return host + ":" + port
}
......@@ -6566,12 +6868,14 @@ func http2shouldRetryRequest(req *Request, err error) (*Request, error) {
case http2errClientConnUnusable, http2errClientConnGotGoAway:
return req, nil
case http2errClientConnGotGoAwayAfterSomeReqBody:
// If the Body is nil (or http.NoBody), it's safe to reuse
// this request and its Body.
if req.Body == nil || http2reqBodyIsNoBody(req.Body) {
return req, nil
}
getBody := http2reqGetBody(req)
// Otherwise we depend on the Request having its GetBody
// func defined.
getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody
if getBody == nil {
return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error")
}
......@@ -6664,9 +6968,9 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client
tconn: c,
readerDone: make(chan struct{}),
nextStreamID: 1,
maxFrameSize: 16 << 10,
initialWindowSize: 65535,
maxConcurrentStreams: 1000,
maxFrameSize: 16 << 10, // spec default
initialWindowSize: 65535, // spec default
maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
streams: make(map[uint32]*http2clientStream),
singleUse: singleUse,
wantSettingsAck: true,
......@@ -6683,12 +6987,16 @@ func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2Client
cc.cond = sync.NewCond(&cc.mu)
cc.flow.add(int32(http2initialWindowSize))
// TODO: adjust this writer size to account for frame size +
// MTU + crypto/tls record padding.
cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr})
cc.br = bufio.NewReader(c)
cc.fr = http2NewFramer(cc.bw, cc.br)
cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil)
cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
// TODO: SetMaxDynamicTableSize, SetMaxDynamicTableSizeLimit on
// henc in response to SETTINGS frames?
cc.henc = hpack.NewEncoder(&cc.hbuf)
if cs, ok := c.(http2connectionStater); ok {
......@@ -6724,6 +7032,7 @@ func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) {
old := cc.goAway
cc.goAway = f
// Merge the previous and current GoAway error frames.
if cc.goAwayDebug == "" {
cc.goAwayDebug = string(f.DebugData())
}
......@@ -6774,7 +7083,7 @@ func (cc *http2ClientConn) closeIfIdle() {
}
cc.closed = true
nextID := cc.nextStreamID
// TODO: do clients send GOAWAY too? maybe? Just Close:
cc.mu.Unlock()
if http2VerboseLogs {
......@@ -6820,7 +7129,7 @@ func (cc *http2ClientConn) putFrameScratchBuffer(buf []byte) {
return
}
}
// forget about it.
}
// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
......@@ -6848,7 +7157,10 @@ func (cc *http2ClientConn) responseHeaderTimeout() time.Duration {
if cc.t.t1 != nil {
return cc.t.t1.ResponseHeaderTimeout
}
// No way to do this (yet?) with just an http2.Transport. Probably
// no need. Request.Cancel this is the new way. We only need to support
// this for compatibility with the old http.Transport fields when
// we're doing transparent http2.
return 0
}
......@@ -6912,10 +7224,24 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
req.Method != "HEAD" {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: http://www.gzip.org/zlib/zlib_faq.html#faq38
//
// Note that we don't request this for HEAD requests,
// due to a bug in nginx:
// http://trac.nginx.org/nginx/ticket/358
// https://golang.org/issue/5522
//
// We don't request gzip if the request is for a range, since
// auto-decoding a portion of a gzipped document will just fail
// anyway. See https://golang.org/issue/8923
requestedGzip = true
}
// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
// sent by writeRequestBody below, along with any Trailers,
// again in form HEADERS{1}, CONTINUATION{0,})
hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen)
if err != nil {
cc.mu.Unlock()
......@@ -6938,11 +7264,12 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
if werr != nil {
if hasBody {
req.Body.Close()
req.Body.Close() // per RoundTripper contract
bodyWriter.cancel()
}
cc.forgetStreamID(cs.ID)
// Don't bother sending a RST_STREAM (our write already failed;
// no need to keep writing)
http2traceWroteRequest(cs.trace, werr)
return nil, werr
}
......@@ -6966,7 +7293,15 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
handleReadLoopResponse := func(re http2resAndError) (*Response, error) {
res := re.res
if re.err != nil || res.StatusCode > 299 {
// On error or status code 3xx, 4xx, 5xx, etc abort any
// ongoing write, assuming that the server doesn't care
// about our request body. If the server replied with 1xx or
// 2xx, however, then assume the server DOES potentially
// want our body (e.g. full-duplex streaming:
// golang.org/issue/13444). If it turns out the server
// doesn't, they'll RST_STREAM us soon enough. This is a
// heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
}
......@@ -7018,10 +7353,12 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
}
return nil, http2errRequestCanceled
case <-cs.peerReset:
// processResetStream already removed the
// stream from the streams map; no need for
// forgetStreamID.
return nil, cs.resetErr
case err := <-bodyWriter.resc:
// Prefer the read loop's response, if available. Issue 16102.
select {
case re := <-readLoopResCh:
return handleReadLoopResponse(re)
......@@ -7042,7 +7379,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
// requires cc.wmu be held
func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error {
first := true
first := true // first frame written (HEADERS is first, then CONTINUATION)
frameSize := int(cc.maxFrameSize)
for len(hdrs) > 0 && cc.werr == nil {
chunk := hdrs
......@@ -7063,7 +7400,10 @@ func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []
cc.fr.WriteContinuation(streamID, endHeaders, chunk)
}
}
// TODO(bradfitz): this Flush could potentially block (as
// could the WriteHeaders call(s) above), which means they
// wouldn't respond to Request.Cancel being readable. That's
// rare, but this should probably be in a goroutine.
cc.bw.Flush()
return cc.werr
}
......@@ -7079,13 +7419,16 @@ var (
func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
cc := cs.cc
sentEnd := false
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
buf := cc.frameScratchBuffer()
defer cc.putFrameScratchBuffer(buf)
defer func() {
http2traceWroteRequest(cs.trace, err)
// TODO: write h12Compare test showing whether
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
cerr := bodyCloser.Close()
if err == nil {
err = cerr
......@@ -7124,7 +7467,12 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos
sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
err = cc.fr.WriteData(cs.ID, sentEnd, data)
if err == nil {
// TODO(bradfitz): this flush is for latency, not bandwidth.
// Most requests won't need this. Make this opt-in or
// opt-out? Use some heuristic on the body type? Nagel-like
// timers? Based on 'n'? Only last chunk of this for loop,
// unless flow control tokens are low? For now, always.
// If we change this, see comment below.
err = cc.bw.Flush()
}
cc.wmu.Unlock()
......@@ -7135,7 +7483,9 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos
}
if sentEnd {
// Already sent END_STREAM (which implies we have no
// trailers) and flushed, because currently all
// WriteData frames above get a flush. So we're done.
return nil
}
......@@ -7149,6 +7499,8 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos
cc.wmu.Lock()
defer cc.wmu.Unlock()
// Two ways to send END_STREAM: either with trailers, or
// with an empty DATA frame.
if len(trls) > 0 {
err = cc.writeHeaders(cs.ID, true, trls)
} else {
......@@ -7182,7 +7534,7 @@ func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err er
take := a
if int(take) > maxBytes {
take = int32(maxBytes)
take = int32(maxBytes) // can't truncate int; take is int32
}
if take > int32(cc.maxFrameSize) {
take = int32(cc.maxFrameSize)
......@@ -7230,6 +7582,9 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail
}
}
// Check for any invalid headers and return an error before we
// potentially pollute our hpack state. (We want to be able to
// continue to reuse the hpack encoder for future requests)
for k, vv := range req.Header {
if !httplex.ValidHeaderFieldName(k) {
return nil, fmt.Errorf("invalid HTTP header name %q", k)
......@@ -7241,6 +7596,11 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail
}
}
// 8.1.2.3 Request Pseudo-Header Fields
// The :path pseudo-header field includes the path and query parts of the
// target URI (the path-absolute production and optionally a '?' character
// followed by the query production (see Sections 3.3 and 3.4 of
// [RFC3986]).
cc.writeHeader(":authority", host)
cc.writeHeader(":method", req.Method)
if req.Method != "CONNECT" {
......@@ -7256,13 +7616,20 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail
lowKey := strings.ToLower(k)
switch lowKey {
case "host", "content-length":
// Host is :authority, already sent.
// Content-Length is automatic, set below.
continue
case "connection", "proxy-connection", "transfer-encoding", "upgrade", "keep-alive":
// Per 8.1.2.2 Connection-Specific Header
// Fields, don't send connection-specific
// fields. We have already checked if any
// are error-worthy so just ignore the rest.
continue
case "user-agent":
// Match Go's http1 behavior: at most one
// User-Agent. If set to nil or empty string,
// then omit it. Otherwise if not mentioned,
// include the default (below).
didUA = true
if len(vv) < 1 {
continue
......@@ -7300,7 +7667,8 @@ func http2shouldSendReqContentLength(method string, contentLength int64) bool {
if contentLength < 0 {
return false
}
// For zero bodies, whether we send a content-length depends on the method.
// It also kinda doesn't matter for http2 either way, with END_STREAM.
switch method {
case "POST", "PUT", "PATCH":
return true
......@@ -7313,7 +7681,8 @@ func http2shouldSendReqContentLength(method string, contentLength int64) bool {
func (cc *http2ClientConn) encodeTrailers(req *Request) []byte {
cc.hbuf.Reset()
for k, vv := range req.Trailer {
// Transfer-Encoding, etc.. have already been filter at the
// start of RoundTrip
lowKey := strings.ToLower(k)
for _, v := range vv {
cc.writeHeader(lowKey, v)
......@@ -7367,7 +7736,7 @@ func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStr
cc.idleTimer.Reset(cc.idleTimeout)
}
close(cs.done)
cc.cond.Broadcast()
cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
}
return cs
}
......@@ -7426,6 +7795,9 @@ func (rl *http2clientConnReadLoop) cleanup() {
cc.idleTimer.Stop()
}
// Close any response bodies if the server closes prematurely.
// TODO: also do this if we've written the headers but not
// gotten a response yet.
err := cc.readerErr
cc.mu.Lock()
if cc.goAway != nil && http2isEOFOrNetReadError(err) {
......@@ -7455,7 +7827,7 @@ func (rl *http2clientConnReadLoop) cleanup() {
func (rl *http2clientConnReadLoop) run() error {
cc := rl.cc
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
gotReply := false
gotReply := false // ever saw a HEADERS reply
gotSettings := false
for {
f, err := cc.fr.ReadFrame()
......@@ -7463,7 +7835,7 @@ func (rl *http2clientConnReadLoop) run() error {
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
if se, ok := err.(http2StreamError); ok {
if cs := cc.streamByID(se.StreamID, true); cs != nil {
if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil {
cs.cc.writeStreamReset(cs.ID, se.Code, err)
if se.Cause == nil {
se.Cause = cc.fr.errDetail
......@@ -7484,7 +7856,7 @@ func (rl *http2clientConnReadLoop) run() error {
}
gotSettings = true
}
maybeIdle := false
maybeIdle := false // whether frame might transition us to idle
switch f := f.(type) {
case *http2MetaHeadersFrame:
......@@ -7527,12 +7899,17 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro
cc := rl.cc
cs := cc.streamByID(f.StreamID, f.StreamEnded())
if cs == nil {
// We'd get here if we canceled a request while the
// server had its response still in flight. So if this
// was just something we canceled, ignore it.
return nil
}
if !cs.firstByte {
if cs.trace != nil {
// TODO(bradfitz): move first response byte earlier,
// when we first read the 9 byte header, not waiting
// until all the HEADERS+CONTINUATION frames have been
// merged. This works for now.
http2traceFirstResponseByte(cs.trace)
}
cs.firstByte = true
......@@ -7548,13 +7925,13 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro
if _, ok := err.(http2ConnectionError); ok {
return err
}
// Any other error type is a stream error.
cs.cc.writeStreamReset(f.StreamID, http2ErrCodeProtocol, err)
cs.resc <- http2resAndError{err: err}
return nil
return nil // return nil from process* funcs to keep conn alive
}
if res == nil {
// (nil, nil) special case. See handleResponse docs.
return nil
}
if res.Body != http2noBody {
......@@ -7589,9 +7966,9 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http
if statusCode == 100 {
http2traceGot100Continue(cs.trace)
if cs.on100 != nil {
cs.on100()
cs.on100() // forces any write delay timer to fire
}
cs.pastHeaders = false
cs.pastHeaders = false // do it all again
return nil, nil
}
......@@ -7627,10 +8004,12 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http
if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil {
res.ContentLength = clen64
} else {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
} else if len(clens) > 1 {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
}
......@@ -7656,16 +8035,18 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http
func (rl *http2clientConnReadLoop) processTrailers(cs *http2clientStream, f *http2MetaHeadersFrame) error {
if cs.pastTrailers {
// Too many HEADERS frames for this stream.
return http2ConnectionError(http2ErrCodeProtocol)
}
cs.pastTrailers = true
if !f.StreamEnded() {
// We expect that any headers for trailers also
// has END_STREAM.
return http2ConnectionError(http2ErrCodeProtocol)
}
if len(f.PseudoFields()) > 0 {
// No pseudo header fields are defined for trailers.
// TODO: ConnectionError might be overly harsh? Check.
return http2ConnectionError(http2ErrCodeProtocol)
}
......@@ -7713,7 +8094,7 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) {
}
}
if n == 0 {
// No flow control tokens to send back.
return
}
......@@ -7721,13 +8102,15 @@ func (b http2transportResponseBody) Read(p []byte) (n int, err error) {
defer cc.mu.Unlock()
var connAdd, streamAdd int32
// Check the conn-level first, before the stream-level.
if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 {
connAdd = http2transportDefaultConnFlow - v
cc.inflow.add(connAdd)
}
if err == nil {
if err == nil { // No need to refresh if the stream is over or failed.
// Consider any buffered body data (read from the conn but not
// consumed by the client) when computing flow control for this
// stream.
v := int(cs.inflow.available()) + cs.bufPipe.Len()
if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh {
streamAdd = int32(http2transportDefaultStreamFlow - v)
......@@ -7764,7 +8147,7 @@ func (b http2transportResponseBody) Close() error {
cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel)
cs.didReset = true
}
// Return connection-level flow control.
if unread > 0 {
cc.inflow.add(int32(unread))
cc.fr.WriteWindowUpdate(0, uint32(unread))
......@@ -7787,11 +8170,16 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
neverSent := cc.nextStreamID
cc.mu.Unlock()
if f.StreamID >= neverSent {
// We never asked for this.
cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
return http2ConnectionError(http2ErrCodeProtocol)
}
// We probably did ask for this, but canceled. Just ignore it.
// TODO: be stricter here? only silently ignore things which
// we canceled, but not things which were closed normally
// by the peer? Tough without accumulating too much state.
// But at least return their flow control:
if f.Length > 0 {
cc.mu.Lock()
cc.inflow.add(int32(f.Length))
......@@ -7805,7 +8193,7 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
return nil
}
if f.Length > 0 {
// Check connection-level flow control.
cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) {
cs.inflow.take(int32(f.Length))
......@@ -7813,7 +8201,8 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
cc.mu.Unlock()
return http2ConnectionError(http2ErrCodeFlowControl)
}
// Return any padded flow control now, since we won't
// refund it later on body reads.
if pad := int32(f.Length) - int32(len(data)); pad > 0 {
cs.inflow.add(pad)
cc.inflow.add(pad)
......@@ -7843,7 +8232,8 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
var http2errInvalidTrailers = errors.New("http2: invalid trailers")
func (rl *http2clientConnReadLoop) endStream(cs *http2clientStream) {
// TODO: check that any declared content-length matches, like
// server.go's (*stream).endStream method.
rl.endStreamError(cs, nil)
}
......@@ -7879,7 +8269,7 @@ func (rl *http2clientConnReadLoop) processGoAway(f *http2GoAwayFrame) error {
cc := rl.cc
cc.t.connPool().MarkDead(cc)
if f.ErrCode != 0 {
// TODO: deal with GOAWAY more. particularly the error code
cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
}
cc.setGoAway(f)
......@@ -7906,11 +8296,17 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error
case http2SettingMaxConcurrentStreams:
cc.maxConcurrentStreams = s.Val
case http2SettingInitialWindowSize:
// Values above the maximum flow-control
// window size of 2^31-1 MUST be treated as a
// connection error (Section 5.4.1) of type
// FLOW_CONTROL_ERROR.
if s.Val > math.MaxInt32 {
return http2ConnectionError(http2ErrCodeFlowControl)
}
// Adjust flow control of currently-open
// frames by the difference of the old initial
// window size and this one.
delta := int32(s.Val) - int32(cc.initialWindowSize)
for _, cs := range cc.streams {
cs.flow.add(delta)
......@@ -7919,7 +8315,7 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error
cc.initialWindowSize = s.Val
default:
// TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
cc.vlogf("Unhandled Setting: %v", s)
}
return nil
......@@ -7960,18 +8356,21 @@ func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame
func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) error {
cs := rl.cc.streamByID(f.StreamID, true)
if cs == nil {
// TODO: return error if server tries to RST_STEAM an idle stream
return nil
}
select {
case <-cs.peerReset:
// Already reset.
// This is the only goroutine
// which closes this, so there
// isn't a race.
default:
err := http2streamError(cs.ID, f.ErrCode)
cs.resetErr = err
close(cs.peerReset)
cs.bufPipe.CloseWithError(err)
cs.cc.cond.Broadcast()
cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
}
delete(rl.activeRes, cs.ID)
return nil
......@@ -7988,7 +8387,7 @@ func (cc *http2ClientConn) ping(ctx http2contextContext) error {
return err
}
cc.mu.Lock()
// check for dup before insert
if _, found := cc.pings[p]; !found {
cc.pings[p] = c
cc.mu.Unlock()
......@@ -8012,7 +8411,7 @@ func (cc *http2ClientConn) ping(ctx http2contextContext) error {
case <-ctx.Done():
return ctx.Err()
case <-cc.readerDone:
// connection closed
return cc.readerErr
}
}
......@@ -8022,7 +8421,7 @@ func (rl *http2clientConnReadLoop) processPing(f *http2PingFrame) error {
cc := rl.cc
cc.mu.Lock()
defer cc.mu.Unlock()
// If ack, notify listener if any
if c, ok := cc.pings[f.Data]; ok {
close(c)
delete(cc.pings, f.Data)
......@@ -8039,12 +8438,21 @@ func (rl *http2clientConnReadLoop) processPing(f *http2PingFrame) error {
}
func (rl *http2clientConnReadLoop) processPushPromise(f *http2PushPromiseFrame) error {
// We told the peer we don't want them.
// Spec says:
// "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
// setting of the peer endpoint is set to 0. An endpoint that
// has set this setting and has received acknowledgement MUST
// treat the receipt of a PUSH_PROMISE frame as a connection
// error (Section 5.4.1) of type PROTOCOL_ERROR."
return http2ConnectionError(http2ErrCodeProtocol)
}
func (cc *http2ClientConn) writeStreamReset(streamID uint32, code http2ErrCode, err error) {
// TODO: map err to more interesting error codes, once the
// HTTP community comes up with some. But currently for
// RST_STREAM there's no equivalent to GOAWAY frame's debug
// data, and the error codes are all pretty vague ("cancel").
cc.wmu.Lock()
cc.fr.WriteRSTStream(streamID, code)
cc.bw.Flush()
......@@ -8173,7 +8581,8 @@ func (s http2bodyWriterState) cancel() {
func (s http2bodyWriterState) on100() {
if s.timer == nil {
// If we didn't do a delayed write, ignore the server's
// bogus 100 continue response.
return
}
s.timer.Stop()
......@@ -8185,7 +8594,9 @@ func (s http2bodyWriterState) on100() {
// called until after the headers have been written.
func (s http2bodyWriterState) scheduleBodyWrite() {
if s.timer == nil {
// We're not doing a delayed write (see
// getBodyWriterState), so just start the writing
// goroutine immediately.
go s.fn()
return
}
......@@ -8240,7 +8651,9 @@ func http2writeEndsStream(w http2writeFramer) bool {
case *http2writeResHeaders:
return v.endStream
case nil:
// This can only happen if the caller reuses w after it's
// been intentionally nil'ed out to prevent use. Keep this
// here to catch future refactoring breaking it.
panic("writeEndsStream called on nil writeFramer")
}
return false
......@@ -8274,7 +8687,7 @@ type http2writeGoAway struct {
func (p *http2writeGoAway) writeFrame(ctx http2writeContext) error {
err := ctx.Framer().WriteGoAway(p.maxStreamID, p.code, nil)
if p.code != 0 {
ctx.Flush()
ctx.Flush() // ignore error: we're hanging up on them anyway
time.Sleep(50 * time.Millisecond)
ctx.CloseConn()
}
......@@ -8386,7 +8799,13 @@ func http2encKV(enc *hpack.Encoder, k, v string) {
}
func (w *http2writeResHeaders) staysWithinBuffer(max int) bool {
// TODO: this is a common one. It'd be nice to return true
// here and get into the fast path if we could be clever and
// calculate the size fast enough, or at least a conservative
// uppper bound that usually fires. (Maybe if w.h and
// w.trailers are nil, so we don't need to enumerate it.)
// Otherwise I'm afraid that just calculating the length to
// answer this question would be slower than the ~2µs benefit.
return false
}
......@@ -8445,7 +8864,7 @@ type http2writePushPromise struct {
}
func (w *http2writePushPromise) staysWithinBuffer(max int) bool {
// TODO: see writeResHeaders.staysWithinBuffer
return false
}
......@@ -8497,7 +8916,7 @@ func (w http2write100ContinueHeadersFrame) writeFrame(ctx http2writeContext) err
}
func (w http2write100ContinueHeadersFrame) staysWithinBuffer(max int) bool {
// Sloppy but conservative:
return 9+2*(len(":status")+len("100")) <= max
}
......@@ -8517,7 +8936,9 @@ func (wu http2writeWindowUpdate) writeFrame(ctx http2writeContext) error {
func http2encodeHeaders(enc *hpack.Encoder, h Header, keys []string) {
if keys == nil {
sorter := http2sorterPool.Get().(*http2sorter)
// Using defer here, since the returned keys from the
// sorter.Keys method is only valid until the sorter
// is returned:
defer http2sorterPool.Put(sorter)
keys = sorter.Keys(h)
}
......@@ -8525,16 +8946,19 @@ func http2encodeHeaders(enc *hpack.Encoder, h Header, keys []string) {
vv := h[k]
k = http2lowerHeader(k)
if !http2validWireHeaderFieldName(k) {
// Skip it as backup paranoia. Per
// golang.org/issue/14048, these should
// already be rejected at a higher level.
continue
}
isTE := k == "transfer-encoding"
for _, v := range vv {
if !httplex.ValidHeaderFieldValue(v) {
// TODO: return an error? golang.org/issue/14048
// For now just omit it.
continue
}
// TODO: more of "8.1.2.2 Connection-Specific Header Fields"
if isTE && v != "trailers" {
continue
}
......@@ -8602,7 +9026,10 @@ type http2FrameWriteRequest struct {
func (wr http2FrameWriteRequest) StreamID() uint32 {
if wr.stream == nil {
if se, ok := wr.write.(http2StreamError); ok {
// (*serverConn).resetStream doesn't set
// stream because it doesn't necessarily have
// one. So special case this type of write
// message.
return se.StreamID
}
return 0
......@@ -8632,11 +9059,13 @@ func (wr http2FrameWriteRequest) DataSize() int {
func (wr http2FrameWriteRequest) Consume(n int32) (http2FrameWriteRequest, http2FrameWriteRequest, int) {
var empty http2FrameWriteRequest
// Non-DATA frames are always consumed whole.
wd, ok := wr.write.(*http2writeData)
if !ok || len(wd.p) == 0 {
return wr, empty, 1
}
// Might need to split after applying limits.
allowed := wr.stream.flow.available()
if n < allowed {
allowed = n
......@@ -8654,10 +9083,13 @@ func (wr http2FrameWriteRequest) Consume(n int32) (http2FrameWriteRequest, http2
write: &http2writeData{
streamID: wd.streamID,
p: wd.p[:allowed],
// Even if the original had endStream set, there
// are bytes remaining because len(wd.p) > allowed,
// so we know endStream is false.
endStream: false,
},
// Our caller is blocking on the final DATA frame, not
// this intermediate frame, so no need to wait.
done: nil,
}
rest := http2FrameWriteRequest{
......@@ -8672,6 +9104,8 @@ func (wr http2FrameWriteRequest) Consume(n int32) (http2FrameWriteRequest, http2
return consumed, rest, 2
}
// The frame is consumed whole.
// NB: This cast cannot overflow because allowed is <= math.MaxInt32.
wr.stream.flow.take(int32(len(wd.p)))
return wr, empty, 1
}
......@@ -8698,7 +9132,7 @@ func (wr *http2FrameWriteRequest) replyToWriter(err error) {
default:
panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
}
wr.write = nil
wr.write = nil // prevent use (assume it's tainted after wr.done send)
}
// writeQueue is used by implementations of WriteScheduler.
......@@ -8717,7 +9151,7 @@ func (q *http2writeQueue) shift() http2FrameWriteRequest {
panic("invalid use of queue")
}
wr := q.s[0]
// TODO: less copy-happy queue.
copy(q.s, q.s[1:])
q.s[len(q.s)-1] = http2FrameWriteRequest{}
q.s = q.s[:len(q.s)-1]
......@@ -8746,6 +9180,8 @@ func (q *http2writeQueue) consume(n int32) (http2FrameWriteRequest, bool) {
type http2writeQueuePool []*http2writeQueue
// put inserts an unused writeQueue into the pool.
// put inserts an unused writeQueue into the pool.
func (p *http2writeQueuePool) put(q *http2writeQueue) {
for i := range q.s {
......@@ -8769,7 +9205,7 @@ func (p *http2writeQueuePool) get() *http2writeQueue {
}
// RFC 7540, Section 5.3.5: the default weight is 16.
const http2priorityDefaultWeight = 15 // 16 = 15 + 1
const http2priorityDefaultWeight = 15
// PriorityWriteSchedulerConfig configures a priorityWriteScheduler.
type http2PriorityWriteSchedulerConfig struct {
......@@ -8815,7 +9251,8 @@ type http2PriorityWriteSchedulerConfig struct {
// If cfg is nil, default options are used.
func http2NewPriorityWriteScheduler(cfg *http2PriorityWriteSchedulerConfig) http2WriteScheduler {
if cfg == nil {
// For justification of these defaults, see:
// https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY
cfg = &http2PriorityWriteSchedulerConfig{
MaxClosedNodesInTree: 10,
MaxIdleNodesInTree: 10,
......@@ -8870,7 +9307,7 @@ func (n *http2priorityNode) setParent(parent *http2priorityNode) {
if n.parent == parent {
return
}
// Unlink from current parent.
if parent := n.parent; parent != nil {
if n.prev == nil {
parent.kids = n.next
......@@ -8881,7 +9318,9 @@ func (n *http2priorityNode) setParent(parent *http2priorityNode) {
n.next.prev = n.prev
}
}
// Link to new parent.
// If parent=nil, remove n from the tree.
// Always insert at the head of parent.kids (this is assumed by walkReadyInOrder).
n.parent = parent
if parent == nil {
n.next = nil
......@@ -8917,10 +9356,15 @@ func (n *http2priorityNode) walkReadyInOrder(openParent bool, tmp *[]*http2prior
return false
}
// Don't consider the root "open" when updating openParent since
// we can't send data frames on the root stream (only control frames).
if n.id != 0 {
openParent = openParent || (n.state == http2priorityNodeOpen)
}
// Common case: only one kid or all kids have the same weight.
// Some clients don't use weights; other clients (like web browsers)
// use mostly-linear priority trees.
w := n.kids.weight
needSort := false
for k := n.kids.next; k != nil; k = k.next {
......@@ -8938,6 +9382,8 @@ func (n *http2priorityNode) walkReadyInOrder(openParent bool, tmp *[]*http2prior
return false
}
// Uncommon case: sort the child nodes. We remove the kids from the parent,
// then re-insert after sorting so we can reuse tmp for future sort calls.
*tmp = (*tmp)[:0]
for n.kids != nil {
*tmp = append(*tmp, n.kids)
......@@ -8945,7 +9391,7 @@ func (n *http2priorityNode) walkReadyInOrder(openParent bool, tmp *[]*http2prior
}
sort.Sort(http2sortPriorityNodeSiblings(*tmp))
for i := len(*tmp) - 1; i >= 0; i-- {
(*tmp)[i].setParent(n)
(*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
}
for k := n.kids; k != nil; k = k.next {
if k.walkReadyInOrder(openParent, tmp, f) {
......@@ -8962,7 +9408,8 @@ func (z http2sortPriorityNodeSiblings) Len() int { return len(z) }
func (z http2sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
func (z http2sortPriorityNodeSiblings) Less(i, k int) bool {
// Prefer the subtree that has sent fewer bytes relative to its weight.
// See sections 5.3.2 and 5.3.4.
wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes)
wk, bk := float64(z[k].weight+1), float64(z[k].subtreeBytes)
if bi == 0 && bk == 0 {
......@@ -9004,7 +9451,7 @@ type http2priorityWriteScheduler struct {
}
func (ws *http2priorityWriteScheduler) OpenStream(streamID uint32, options http2OpenStreamOptions) {
// The stream may be currently idle but cannot be opened or closed.
if curr := ws.nodes[streamID]; curr != nil {
if curr.state != http2priorityNodeIdle {
panic(fmt.Sprintf("stream %d already opened", streamID))
......@@ -9013,6 +9460,10 @@ func (ws *http2priorityWriteScheduler) OpenStream(streamID uint32, options http2
return
}
// RFC 7540, Section 5.3.5:
// "All streams are initially assigned a non-exclusive dependency on stream 0x0.
// Pushed streams initially depend on their associated stream. In both cases,
// streams are assigned a default weight of 16."
parent := ws.nodes[options.PusherID]
if parent == nil {
parent = &ws.root
......@@ -9060,6 +9511,9 @@ func (ws *http2priorityWriteScheduler) AdjustStream(streamID uint32, priority ht
panic("adjustPriority on root")
}
// If streamID does not exist, there are two cases:
// - A closed stream that has been removed (this will have ID <= maxID)
// - An idle stream that is being used for "grouping" (this will have ID > maxID)
n := ws.nodes[streamID]
if n == nil {
if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
......@@ -9077,6 +9531,8 @@ func (ws *http2priorityWriteScheduler) AdjustStream(streamID uint32, priority ht
ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
}
// Section 5.3.1: A dependency on a stream that is not currently in the tree
// results in that stream being given a default priority (Section 5.3.5).
parent := ws.nodes[priority.StreamDep]
if parent == nil {
n.setParent(&ws.root)
......@@ -9084,10 +9540,18 @@ func (ws *http2priorityWriteScheduler) AdjustStream(streamID uint32, priority ht
return
}
// Ignore if the client tries to make a node its own parent.
if n == parent {
return
}
// Section 5.3.3:
// "If a stream is made dependent on one of its own dependencies, the
// formerly dependent stream is first moved to be dependent on the
// reprioritized stream's previous parent. The moved dependency retains
// its weight."
//
// That is: if parent depends on n, move parent to depend on n.parent.
for x := parent.parent; x != nil; x = x.parent {
if x == n {
parent.setParent(n.parent)
......@@ -9095,6 +9559,9 @@ func (ws *http2priorityWriteScheduler) AdjustStream(streamID uint32, priority ht
}
}
// Section 5.3.3: The exclusive flag causes the stream to become the sole
// dependency of its parent stream, causing other dependencies to become
// dependent on the exclusive stream.
if priority.Exclusive {
k := parent.kids
for k != nil {
......@@ -9117,7 +9584,11 @@ func (ws *http2priorityWriteScheduler) Push(wr http2FrameWriteRequest) {
} else {
n = ws.nodes[id]
if n == nil {
// id is an idle or closed stream. wr should not be a HEADERS or
// DATA frame. However, wr can be a RST_STREAM. In this case, we
// push wr onto the root, rather than creating a new priorityNode,
// since RST_STREAM is tiny and the stream's priority is unknown
// anyway. See issue #17919.
if wr.DataSize() > 0 {
panic("add DATA on non-open stream")
}
......@@ -9138,7 +9609,9 @@ func (ws *http2priorityWriteScheduler) Pop() (wr http2FrameWriteRequest, ok bool
return false
}
n.addBytes(int64(wr.DataSize()))
// If B depends on A and B continuously has data available but A
// does not, gradually increase the throttling limit to allow B to
// steal more and more bandwidth from A.
if openParent {
ws.writeThrottleLimit += 1024
if ws.writeThrottleLimit < 0 {
......@@ -9157,7 +9630,7 @@ func (ws *http2priorityWriteScheduler) addClosedOrIdleNode(list *[]*http2priorit
return
}
if len(*list) == maxSize {
// Remove the oldest node, then shift left.
ws.removeNode((*list)[0])
x := (*list)[1:]
copy(*list, x)
......@@ -9195,7 +9668,7 @@ type http2randomWriteScheduler struct {
}
func (ws *http2randomWriteScheduler) OpenStream(streamID uint32, options http2OpenStreamOptions) {
// no-op: idle streams are not tracked
}
func (ws *http2randomWriteScheduler) CloseStream(streamID uint32) {
......@@ -9208,7 +9681,7 @@ func (ws *http2randomWriteScheduler) CloseStream(streamID uint32) {
}
func (ws *http2randomWriteScheduler) AdjustStream(streamID uint32, priority http2PriorityParam) {
// no-op: priorities are ignored
}
func (ws *http2randomWriteScheduler) Push(wr http2FrameWriteRequest) {
......@@ -9226,11 +9699,11 @@ func (ws *http2randomWriteScheduler) Push(wr http2FrameWriteRequest) {
}
func (ws *http2randomWriteScheduler) Pop() (http2FrameWriteRequest, bool) {
// Control frames first.
if !ws.zero.empty() {
return ws.zero.shift(), true
}
// Iterate over all non-idle streams until finding one that can be consumed.
for _, q := range ws.sq {
if wr, ok := q.consume(math.MaxInt32); ok {
return wr, true
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment