From the public account: New World grocery store

Preface

After the previous article, "HTTP Requests in Go: HTTP1.1 Request Flow Analysis", and nearly a month of on-and-off work, I finally dared to start writing this one.

Reading advice

HTTP2.0 is basically the same as HTTP1.1 when it comes to establishing the TCP connection and the secure TLS transport channel. I therefore recommend reading "HTTP Requests in Go: HTTP1.1 Request Flow Analysis" first. This article builds on that one and only covers the logic specific to HTTP2.0.

(*Transport).roundTrip

The (*Transport).roundTrip method calls t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) to initialize TLSClientConfig and h2transport, both of which are closely related to HTTP2.0.

TLSClientConfig: holds the HTTP protocols supported by the client, which are advertised to the server during the TLS handshake.

h2transport: if the request is http2, h2transport takes over the connection, request, and response handling logic.

Here’s the source code:

func (t *Transport) onceSetNextProtoDefaults() {
	// ... code omitted ...
	t2, err := http2configureTransport(t)
	if err != nil {
		log.Printf("Error enabling Transport HTTP/2 support: %v", err)
		return
	}
	t.h2transport = t2

	// ... code omitted ...
}
func http2configureTransport(t1 *Transport) (*http2Transport, error) {
	connPool := new(http2clientConnPool)
	t2 := &http2Transport{
		ConnPool: http2noDialClientConnPool{connPool},
		t1:       t1,
	}
	connPool.t = t2
	if err := http2registerHTTPSProtocol(t1, http2noDialH2RoundTripper{t2}); err != nil {
		return nil, err
	}
	if t1.TLSClientConfig == nil {
		t1.TLSClientConfig = new(tls.Config)
	}
	if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {
		t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
	}
	if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
		t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
	}
	upgradeFn := func(authority string, c *tls.Conn) RoundTripper {
		addr := http2authorityAddr("https", authority)
		if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
			go c.Close()
			return http2erringRoundTripper{err}
		} else if !used {
			// Turns out we don't need this c.
			// For example, two goroutines made requests to the same host
			// at the same time, both kicking off TCP dials. (since protocol
			// was unknown)
			go c.Close()
		}
		return t2
	}
	if m := t1.TLSNextProto; len(m) == 0 {
		t1.TLSNextProto = map[string]func(string, *tls.Conn) RoundTripper{
			"h2": upgradeFn,
		}
	} else {
		m["h2"] = upgradeFn
	}
	return t2, nil
}

The source code above can be broken down into the following steps:

  1. Create a new http2clientConnPool and assign it to t2; later HTTP2 requests will preferentially take connections from this pool.
  2. Initialize TLSClientConfig and add the supported h2 and http/1.1 protocols to TLSClientConfig.NextProtos.
  3. Define an upgradeFn for h2 and store it in t1.TLSNextProto.
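
Step 2 can be illustrated in isolation. The following is a minimal sketch, not the standard library code: contains and withHTTP2Protos are hypothetical helper names chosen for this example, and only the ordering rule (h2 first, http/1.1 appended as a fallback) is taken from the source above.

```go
package main

import "fmt"

// contains reports whether s is present in list.
func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// withHTTP2Protos (hypothetical name) mirrors the NextProtos handling above:
// "h2" is put first so it is preferred, "http/1.1" is appended as a fallback.
func withHTTP2Protos(protos []string) []string {
	if !contains(protos, "h2") {
		protos = append([]string{"h2"}, protos...)
	}
	if !contains(protos, "http/1.1") {
		protos = append(protos, "http/1.1")
	}
	return protos
}

func main() {
	fmt.Println(withHTTP2Protos(nil))
	fmt.Println(withHTTP2Protos([]string{"http/1.1"}))
}
```

Placing h2 at the front matters because the list is offered to the server in preference order during the TLS handshake.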

Since the previous article covered the steps before a connection is created in detail, here we go straight to the source code that sets up the connection with the server, the (*Transport).dialConn method:

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
	// ... code omitted ...
	if cm.scheme() == "https" && t.hasCustomTLSDialer() {
		// ... code omitted ...
	} else {
		conn, err := t.dial(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		pconn.conn = conn
		if cm.scheme() == "https" {
			var firstTLSHost string
			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
				return nil, wrapErr(err)
			}
			if err = pconn.addTLS(firstTLSHost, trace); err != nil {
				return nil, wrapErr(err)
			}
		}
	}

	// Proxy setup.
	// ... code omitted ...

	if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
		if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
			return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil
		}
	}
	// ... code omitted ...
}

The source code above can be described as follows:

  1. Call t.dial(ctx, "tcp", cm.addr()) to create a TCP connection.
  2. If the request is HTTPS, establish a secure TLS transport channel for it.
  3. Check the TLS handshake state. If the NegotiatedProtocol agreed with the server is not empty and the client's t.TLSNextProto contains that protocol, return a persistent connection whose alt field is not nil (HTTP1.1 does not enter this if branch).

To expand on the third point: I verified in local debugging that when both the client and the server support http2, s.NegotiatedProtocol is h2 and s.NegotiatedProtocolIsMutual is true.
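
The negotiation result can also be observed without a debugger. The sketch below is an assumption-laden illustration rather than what was done for this article: it uses a local net/http/httptest TLS server with EnableHTTP2 set (available in recent Go versions) instead of a remote host, and negotiate is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// negotiate starts a local TLS test server with HTTP/2 enabled, sends one
// request through the server's preconfigured client, and returns the
// response protocol together with the ALPN-negotiated protocol.
func negotiate() (proto, alpn string, err error) {
	ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	}))
	ts.EnableHTTP2 = true
	ts.StartTLS()
	defer ts.Close()

	resp, err := ts.Client().Get(ts.URL)
	if err != nil {
		return "", "", err
	}
	defer resp.Body.Close()
	return resp.Proto, resp.TLS.NegotiatedProtocol, nil
}

func main() {
	proto, alpn, err := negotiate()
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println(proto, alpn)
}
```

When both sides support http2, the two values are expected to be HTTP/2.0 and h2, which matches the debugger observation above.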

When we analyzed the http2configureTransport function above, we saw that TLSNextProto registers a function under the key h2, so the call to next is actually a call to the upgradeFn function defined earlier.

upgradeFn calls connPool.addConnIfNeeded to add the TLS connection to the http2 connection pool and finally returns t2, the http2Transport created earlier.

func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c *tls.Conn) (used bool, err error) {
	p.mu.Lock()
	// ... code omitted ...
	// Decide whether a new connection needs to be added to the connection pool:
	// if the pool already has a usable connection to the host, return directly.
	call, dup := p.addConnCalls[key]
	if !dup {
		// ... code omitted ...
		call = &http2addConnCall{
			p:    p,
			done: make(chan struct{}),
		}
		p.addConnCalls[key] = call
		go call.run(t, key, c)
	}
	p.mu.Unlock()

	<-call.done
	if call.err != nil {
		return false, call.err
	}
	return !dup, nil
}
func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) {
	cc, err := t.NewClientConn(tc)

	p := c.p
	p.mu.Lock()
	if err != nil {
		c.err = err
	} else {
		p.addConnLocked(key, cc)
	}
	delete(p.addConnCalls, key)
	p.mu.Unlock()
	close(c.done)
}


Analyzing the source code above, we can draw two conclusions:

  1. After upgradeFn has run, (*Transport).dialConn returns a persistent connection whose alt field is not nil.
  2. The new connection created by t.NewClientConn(tc) is stored in the HTTP2 connection pool, i.e. http2clientConnPool. The next section analyzes NewClientConn in detail.
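
The de-duplication pattern in addConnIfNeeded (one in-flight call per key, with other callers waiting on a done channel) can be sketched on its own. This is a simplified model, not the library code: pool, addCall, and the string "connections" are hypothetical stand-ins, and the early return for an existing connection is an assumption about the omitted part of the source.

```go
package main

import (
	"fmt"
	"sync"
)

// addCall tracks one in-flight attempt to add a connection for a key.
type addCall struct {
	done chan struct{}
	err  error
}

// pool is a reduced, hypothetical stand-in for http2clientConnPool.
type pool struct {
	mu    sync.Mutex
	calls map[string]*addCall
	conns map[string][]string
}

func newPool() *pool {
	return &pool{calls: map[string]*addCall{}, conns: map[string][]string{}}
}

// addConnIfNeeded reports whether conn ended up being used by the pool.
func (p *pool) addConnIfNeeded(key, conn string) (used bool, err error) {
	p.mu.Lock()
	if len(p.conns[key]) > 0 { // assumption: an existing connection is reused
		p.mu.Unlock()
		return false, nil
	}
	call, dup := p.calls[key]
	if !dup {
		call = &addCall{done: make(chan struct{})}
		p.calls[key] = call
		go func() {
			p.mu.Lock()
			p.conns[key] = append(p.conns[key], conn)
			delete(p.calls, key)
			p.mu.Unlock()
			close(call.done) // wake every caller waiting on this key
		}()
	}
	p.mu.Unlock()

	<-call.done
	if call.err != nil {
		return false, call.err
	}
	return !dup, nil
}

func main() {
	p := newPool()
	fmt.Println(p.addConnIfNeeded("example.com:443", "conn-a"))
	fmt.Println(p.addConnIfNeeded("example.com:443", "conn-b"))
}
```

The caller whose connection was not used is the one that closes it, which is what upgradeFn does with go c.Close().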

Finally, we go back to the (*Transport).roundTrip method and analyze its key source code:

func (t *Transport) roundTrip(req *Request) (*Response, error) {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	// ... code omitted ...
	for {
		select {
		case <-ctx.Done():
			req.closeBody()
			return nil, ctx.Err()
		default:
		}
		// ... code omitted ...
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			t.setReqCanceler(req, nil)
			req.closeBody()
			return nil, err
		}

		var resp *Response
		if pconn.alt != nil {
			// HTTP/2 path.
			t.setReqCanceler(req, nil) // not cancelable with CancelRequest
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			resp, err = pconn.roundTrip(treq)
		}
		if err == nil {
			return resp, nil
		}

		// ... code omitted ...
	}
}

Combining this with the earlier analysis, pconn.alt is non-nil when both the server and the client support the http2 protocol. Therefore, http2 requests take the pconn.alt.RoundTrip(req) branch, which means the http2 request flow is taken over by http2Transport.

(*http2Transport).NewClientConn

(*http2Transport).NewClientConn internally calls t.newClientConn(c, t.disableKeepAlives()).

Because this section covers a lot of ground, I no longer post the source code all at once; instead I walk through the key steps and post the source code in pieces.

1. Initialize an http2ClientConn:

cc := &http2ClientConn{
	t:                     t,
	tconn:                 c,
	readerDone:            make(chan struct{}),
	nextStreamID:          1,
	maxFrameSize:          16 << 10,           // spec default
	initialWindowSize:     65535,              // spec default
	maxConcurrentStreams:  1000,               // "infinite", per spec. 1000 seems good enough.
	peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
	streams:               make(map[uint32]*http2clientStream),
	singleUse:             singleUse,
	wantSettingsAck:       true,
	pings:                 make(map[[8]byte]chan struct{}),
}

The above source code creates a default http2ClientConn.

initialWindowSize: the initial window size, 65535. This value is later used to initialize the send window of each stream.

maxConcurrentStreams: the maximum number of streams allowed to transmit data concurrently on each connection.

streams: the streams on the current connection.

singleUse: controls whether the http2 connection may be shared by multiple streams. Its value is determined by t.disableKeepAlives().

2. Create a condition variable and a new Writer and Reader.

cc.cond = sync.NewCond(&cc.mu)
cc.flow.add(int32(http2initialWindowSize))
cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr})
cc.br = bufio.NewReader(c)

cc.flow.add(int32(http2initialWindowSize)) sets the writable flow control window of the current connection to http2initialWindowSize, i.e. 65535.

3. Create a Framer that reads and writes frames.

cc.fr = http2NewFramer(cc.bw, cc.br)
cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil)
cc.fr.MaxHeaderListSize = t.maxHeaderListSize()

4. Send the client connection preface to the server, followed by some initial frames.

initialSettings := []http2Setting{
	{ID: http2SettingEnablePush, Val: 0},
	{ID: http2SettingInitialWindowSize, Val: http2transportDefaultStreamFlow},
}
if max := t.maxHeaderListSize(); max != 0 {
	initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxHeaderListSize, Val: max})
}

cc.bw.Write(http2clientPreface)
cc.fr.WriteSettings(initialSettings...)
cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow)
cc.inflow.add(http2transportDefaultConnFlow + http2initialWindowSize)
cc.bw.Flush()

The connection preface sent by the client to the server is as follows:

const (
	// The client first sends a string beginning with PRI to the server.
	http2ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
)
var (
	http2clientPreface = []byte(http2ClientPreface)
)

After sending the preface, the client sends a SETTINGS frame to the server.

http2SettingEnablePush: tells the server whether the client enables server push.

http2SettingInitialWindowSize: tells the server that the largest per-stream data window the client can accept is http2transportDefaultStreamFlow (4M).

After the SETTINGS frame, the client sends a WINDOW_UPDATE frame. Because the first argument is 0, the stream ID is 0, so the frame applies to the connection as a whole: it tells the server that the connection-level receive window grows by http2transportDefaultConnFlow (1G).

After the WINDOW_UPDATE frame is sent, the client's readable flow control window is set to http2transportDefaultConnFlow + http2initialWindowSize.
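
To make the two frames concrete, here is a small sketch that encodes them by hand following the HTTP/2 frame layout (9-byte header, 6 bytes per setting, 4-byte window increment). It does not use the standard library framer; frameHeader, settingsFrame, and windowUpdateFrame are hypothetical helper names, and the constants repeat the values quoted above.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const (
	frameSettings     = 0x4 // SETTINGS frame type
	frameWindowUpdate = 0x8 // WINDOW_UPDATE frame type

	settingEnablePush        = 0x2
	settingInitialWindowSize = 0x4

	initialWindowSize = 65535   // spec default
	defaultStreamFlow = 4 << 20 // http2transportDefaultStreamFlow (4M)
	defaultConnFlow   = 1 << 30 // http2transportDefaultConnFlow (1G)
)

// frameHeader builds the 9-byte frame header:
// 24-bit payload length, 8-bit type, 8-bit flags, 31-bit stream ID.
func frameHeader(length int, typ, flags byte, streamID uint32) []byte {
	return []byte{
		byte(length >> 16), byte(length >> 8), byte(length),
		typ, flags,
		byte(streamID>>24) & 0x7f, byte(streamID >> 16), byte(streamID >> 8), byte(streamID),
	}
}

// settingsFrame encodes (id, value) pairs; each setting takes 6 bytes.
func settingsFrame(settings [][2]uint32) []byte {
	buf := frameHeader(6*len(settings), frameSettings, 0, 0)
	for _, s := range settings {
		var b [6]byte
		binary.BigEndian.PutUint16(b[0:2], uint16(s[0]))
		binary.BigEndian.PutUint32(b[2:6], s[1])
		buf = append(buf, b[:]...)
	}
	return buf
}

// windowUpdateFrame encodes a WINDOW_UPDATE frame with a 31-bit increment.
func windowUpdateFrame(streamID, incr uint32) []byte {
	buf := frameHeader(4, frameWindowUpdate, 0, streamID)
	var b [4]byte
	binary.BigEndian.PutUint32(b[:], incr&0x7fffffff)
	return append(buf, b[:]...)
}

func main() {
	s := settingsFrame([][2]uint32{
		{settingEnablePush, 0},
		{settingInitialWindowSize, defaultStreamFlow},
	})
	w := windowUpdateFrame(0, defaultConnFlow)
	fmt.Printf("SETTINGS:      % x\n", s)
	fmt.Printf("WINDOW_UPDATE: % x\n", w)
	// Connection-level receive window after the update.
	fmt.Println("conn inflow:", defaultConnFlow+initialWindowSize)
}
```

The last line shows why cc.inflow is initialized with the sum: the connection starts with the 65535-byte default window and the WINDOW_UPDATE adds 1G on top of it.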

5. Start the read loop and return.

go cc.readLoop()

(*http2Transport).RoundTrip

(*http2Transport).RoundTrip is just an entry function that calls the (*http2Transport).RoundTripOpt method.

(*http2Transport).RoundTripOpt has two key steps:

t.connPool().GetClientConn(req, addr): obtains an available connection from the http2 connection pool. The pool's type is http2noDialClientConnPool; see the http2configureTransport function.

cc.roundTrip(req): sends the request over the obtained connection and returns the response.

(http2noDialClientConnPool).GetClientConn

Actual debugging shows that (http2noDialClientConnPool).GetClientConn eventually calls (*http2clientConnPool).getClientConn(req *Request, addr string, dialOnMiss bool).

When a connection is obtained through (http2noDialClientConnPool).GetClientConn, the third argument passed to (*http2clientConnPool).getClientConn is always false. When this argument is false, no new dial is started at this stage even if no usable connection can be obtained.

(*http2clientConnPool).getClientConn iterates over the connections to the same address and checks their state to find one that can handle the request.

for _, cc := range p.conns[addr] {
	if st := cc.idleState(); st.canTakeNewRequest {
		if p.shouldTraceGetConn(st) {
			http2traceGetConn(req, addr)
		}
		p.mu.Unlock()
		return cc, nil
	}
}

cc.idleState() determines whether a connection in the connection pool can handle a new request:

1. Whether the current connection can be shared by multiple requests. If the connection is single-use and already has a stream, it cannot handle new requests.

if cc.singleUse && cc.nextStreamID > 1 {
	return
}

2. The current connection can handle new requests only if all of the following are true:

  • The connection state is normal, that is, it is neither closed nor closing.
  • The number of streams the connection is processing is less than maxConcurrentStreams.
  • The next stream ID plus twice the number of pending requests on the connection is less than math.MaxInt32.
  • The connection has not been idle for too long (determined mainly by cc.tooIdleLocked()).

st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
	int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
	!cc.tooIdleLocked()
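
The conditions can be exercised with a reduced model. In this sketch, connState is a hypothetical stand-in for http2ClientConn that flattens goAway and tooIdleLocked() into booleans, and the maxConcurrentOkay computation is an assumption about the non-strict case; only the overall predicate follows the line quoted above.

```go
package main

import (
	"fmt"
	"math"
)

// connState is a hypothetical, reduced stand-in for http2ClientConn.
type connState struct {
	goAway, closed, closing, tooIdle bool
	singleUse                        bool
	nextStreamID                     uint32
	activeStreams, pendingRequests   int
	maxConcurrentStreams             uint32
}

// canTakeNewRequest combines the single-use check and the four conditions.
func (c connState) canTakeNewRequest() bool {
	if c.singleUse && c.nextStreamID > 1 {
		return false // a single-use connection that already carried a stream
	}
	// Assumption: one more stream must still fit under the limit.
	maxConcurrentOkay := int64(c.activeStreams+1) < int64(c.maxConcurrentStreams)
	return !c.goAway && !c.closed && !c.closing && maxConcurrentOkay &&
		int64(c.nextStreamID)+2*int64(c.pendingRequests) < math.MaxInt32 &&
		!c.tooIdle
}

func main() {
	fresh := connState{nextStreamID: 1, maxConcurrentStreams: 1000}
	fmt.Println(fresh.canTakeNewRequest())

	closing := fresh
	closing.closing = true
	fmt.Println(closing.canTakeNewRequest())

	used := connState{singleUse: true, nextStreamID: 3, maxConcurrentStreams: 1000}
	fmt.Println(used.canTakeNewRequest())
}
```

The stream ID condition exists because client stream IDs only grow (by 2 per request) and must stay within 31 bits, so a connection close to exhausting its ID space is treated as unusable.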

Once a connection has been obtained from the connection pool, the request can be exchanged with the server, which is the (*http2ClientConn).roundTrip flow.

(*http2ClientConn).roundTrip

1. Check the request headers and trailers, because http2 does not support some of the headers used in http1.1.

func http2checkConnHeaders(req *Request) error {
	if v := req.Header.Get("Upgrade"); v != "" {
		return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
	}
	if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {
		return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
	}
	if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) {
		return fmt.Errorf("http2: invalid Connection request header: %q", vv)
	}
	return nil
}
func http2commaSeparatedTrailers(req *Request) (string, error) {
	keys := make([]string, 0, len(req.Trailer))
	for k := range req.Trailer {
		k = CanonicalHeaderKey(k)
		switch k {
		case "Transfer-Encoding", "Trailer", "Content-Length":
			return "", &http2badStringError{"invalid Trailer key", k}
		}
		keys = append(keys, k)
	}
	if len(keys) > 0 {
		sort.Strings(keys)
		return strings.Join(keys, ","), nil
	}
	return "", nil
}

2. Call (*http2ClientConn).awaitOpenSlotForRequest and wait until the number of streams the connection is processing is below maxConcurrentStreams. If this function returns an error, the request fails.

2.1. Double-check that the current connection is available.

if cc.closed || !cc.canTakeNewRequestLocked() {
	if waitingForConn != nil {
		close(waitingForConn)
	}
	return http2errClientConnUnusable
}

2.2. If the number of streams being processed by the current connection is below maxConcurrentStreams, return nil. I believe most requests return here.

if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
	if waitingForConn != nil {
		close(waitingForConn)
	}
	return nil
}

2.3. If the current connection has indeed reached its stream limit, the waiting process begins.

if waitingForConn == nil {
	waitingForConn = make(chan struct{})
	go func() {
		if err := http2awaitRequestCancel(req, waitingForConn); err != nil {
			cc.mu.Lock()
			waitingForConnErr = err
			cc.cond.Broadcast()
			cc.mu.Unlock()
		}
	}()
}
cc.pendingRequests++
cc.cond.Wait()
cc.pendingRequests--

According to the logic above, when the number of streams on the current connection reaches the limit, the wait can end in two ways: the waiting request is cancelled, or another request finishes. If another stream ends and wakes up the waiting request, steps 2.1, 2.2, and 2.3 are repeated.
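
The wait-and-recheck loop is the usual sync.Cond pattern. The sketch below is a simplified model, assuming a plain stream counter instead of the real stream map and leaving out request cancellation; conn, openStream, and closeStream are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical, reduced connection with a stream limit.
type conn struct {
	mu         sync.Mutex
	cond       *sync.Cond
	streams    int
	maxStreams int
}

func newConn(max int) *conn {
	c := &conn{maxStreams: max}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// openStream blocks until a slot is free, then claims it.
func (c *conn) openStream() {
	c.mu.Lock()
	defer c.mu.Unlock()
	for c.streams+1 > c.maxStreams {
		c.cond.Wait() // releases mu while waiting, re-acquires it on wake-up
	}
	c.streams++
}

// closeStream frees a slot and wakes every waiter so each re-checks the limit.
func (c *conn) closeStream() {
	c.mu.Lock()
	c.streams--
	c.cond.Broadcast()
	c.mu.Unlock()
}

func main() {
	c := newConn(1)
	c.openStream()

	done := make(chan struct{})
	go func() {
		c.openStream() // waits until the first stream ends
		close(done)
	}()

	c.closeStream()
	<-done
	fmt.Println("second stream opened")
}
```

Broadcast is used rather than Signal because several requests may be waiting and each one has to re-run the availability checks for itself after waking up.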

3. Call cc.newStream() to create a stream on the connection. Creating a stream is thread-safe because the source code acquires the lock before calling awaitOpenSlotForRequest and does not release it until the request headers have been written.

func (cc *http2ClientConn) newStream() *http2clientStream {
	cs := &http2clientStream{
		cc:        cc,
		ID:        cc.nextStreamID,
		resc:      make(chan http2resAndError, 1),
		peerReset: make(chan struct{}),
		done:      make(chan struct{}),
	}
	cs.flow.add(int32(cc.initialWindowSize))
	cs.flow.setConnFlow(&cc.flow)
	cs.inflow.add(http2transportDefaultStreamFlow)
	cs.inflow.setConnFlow(&cc.inflow)
	cc.nextStreamID += 2
	cc.streams[cs.ID] = cs
	return cs
}

The code above can be summarized as follows:

  • Create a new http2clientStream whose stream ID is cc.nextStreamID; after the new stream is created, cc.nextStreamID += 2.
  • The stream receives the response to the request through the http2resAndError channel.
  • Initialize the writable flow control window of the stream to cc.initialWindowSize and save a pointer to the connection's writable flow control.
  • Initialize the readable flow control window of the stream to http2transportDefaultStreamFlow and save a pointer to the connection's readable flow control.
  • Finally, register the new stream with the current connection.
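
The stream ID allocation can be sketched as follows. This is a reduced model with hypothetical conn and stream types; the window fields are plain integers here instead of the linked flow control structures used by the real code.

```go
package main

import "fmt"

// stream is a hypothetical, reduced stand-in for http2clientStream.
type stream struct {
	id      uint32
	sendWin int32 // writable window, from the connection's initialWindowSize
	recvWin int32 // readable window, 4M as in http2transportDefaultStreamFlow
}

// conn is a hypothetical, reduced stand-in for http2ClientConn.
type conn struct {
	nextStreamID      uint32
	initialWindowSize uint32
	streams           map[uint32]*stream
}

// newStream allocates the next odd stream ID and registers the stream.
func (c *conn) newStream() *stream {
	s := &stream{
		id:      c.nextStreamID,
		sendWin: int32(c.initialWindowSize),
		recvWin: 4 << 20,
	}
	c.nextStreamID += 2 // client-initiated streams always use odd IDs
	c.streams[s.id] = s
	return s
}

func main() {
	c := &conn{nextStreamID: 1, initialWindowSize: 65535, streams: map[uint32]*stream{}}
	for i := 0; i < 3; i++ {
		fmt.Println(c.newStream().id)
	}
}
```

Starting at 1 and stepping by 2 keeps client stream IDs odd, leaving even IDs for streams initiated by the server.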

4. Calling cc.t.getBodyWriterState(cs, body) returns an http2bodyWriterState struct, through which you can tell whether the request body was sent successfully.

func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) {
	s.cs = cs
	if body == nil {
		return
	}
	resc := make(chan error, 1)
	s.resc = resc
	s.fn = func() {
		cs.cc.mu.Lock()
		cs.startedWrite = true
		cs.cc.mu.Unlock()
		resc <- cs.writeRequestBody(body, cs.req.Body)
	}
	s.delay = t.expectContinueTimeout()
	if s.delay == 0 || !httpguts.HeaderValuesContainsToken(cs.req.Header["Expect"], "100-continue") {
		return
	}
	// Omit the code here because most requests do not set the 100-continue header
	return
}

s.fn: marks that the current stream has started writing data and sends the result of writing the request body to the s.resc channel. (writeRequestBody will be analyzed in the next article.)

5. Since multiple requests share one connection, writing frames such as the request headers to the connection requires a lock.

cc.wmu.Lock()
endStream := !hasBody && !hasTrailers
werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
cc.wmu.Unlock()

6. If the request has a body, write the request body. If it has no body, set the response header timeout (when there is a body, the response header timeout is set only after the request body has been written).

if hasBody {
	bodyWriter.scheduleBodyWrite()
} else {
	http2traceWroteRequest(cs.trace, nil)
	if d := cc.responseHeaderTimeout(); d != 0 {
		timer := time.NewTimer(d)
		defer timer.Stop()
		respHeaderTimer = timer.C
	}
}

The contents of scheduleBodyWrite are as follows:

func (s http2bodyWriterState) scheduleBodyWrite() {
	if s.timer == nil {
		// We're not doing a delayed write (see
		// getBodyWriterState), so just start the writing
		// goroutine immediately.
		go s.fn()
		return
	}
	http2traceWait100Continue(s.cs.trace)
	if s.timer.Stop() {
		s.timer.Reset(s.delay)
	}
}

Because my request does not carry the 100-continue header, s.timer stays nil after the getBodyWriterState function above, which means that calling scheduleBodyWrite immediately starts sending the request body.

7. Poll the channels for the response result.

Before looking at the polling source code, let’s look at a simple function:

handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) {
	res := re.res
	if re.err != nil || res.StatusCode > 299 {
		bodyWriter.cancel()
		cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
	}
	if re.err != nil {
		cc.forgetStreamID(cs.ID)
		return nil, cs.getStartedWrite(), re.err
	}
	res.Request = req
	res.TLS = cc.tlsState
	return res, false, nil
}

This function determines whether the response is normal and constructs the return value of (*http2ClientConn).roundTrip based on the response.

Now that you know handleReadLoopResponse, let’s look at the polling logic:

for {
	select {
	case re := <-readLoopResCh:
		return handleReadLoopResponse(re)
	// Omit the code (including polling for request cancellation, request timeout, etc.)
	case err := <-bodyWriter.resc:
		// Prefer the read loop's response, if available. Issue 16102.
		select {
		case re := <-readLoopResCh:
			return handleReadLoopResponse(re)
		default:
		}
		if err != nil {
			cc.forgetStreamID(cs.ID)
			return nil, cs.getStartedWrite(), err
		}
		bodyWritten = true
		if d := cc.responseHeaderTimeout(); d != 0 {
			timer := time.NewTimer(d)
			defer timer.Stop()
			respHeaderTimer = timer.C
		}
	}
}

Only the second case above, in which the request body has been sent, is described here:

  • Check whether a response can already be read. If so, return it directly.
  • Check whether the request body was sent successfully. If sending failed, return the error directly.
  • If the request body was sent successfully, set the timeout for the response header.

Conclusion

This article mainly covers two aspects:

  1. Confirm that both the client and the server support the HTTP2 protocol, establish an HTTP2 connection, and start the read loop on that connection.
  2. Get an HTTP2 connection from the HTTP2 connection pool, send the request, and read the response.

Preview

Since HTTP2.0 involves a lot of material and an overly long article is hard to read, the remaining analysis will be split into two parts:

  1. Frames and flow control, and how the read loop reads the response and sends it to the readLoopResCh channel.
  2. The HTTP2.0 header compression logic.

Finally, I sincerely hope that this article can be of some help to all readers.

Note:

  1. At the time of writing, the Go version used is go1.14.2.
  2. The h2c case is not considered in this article.
  3. Because this article analyzes the request flow, I did not set up a server locally; instead I debugged step by step against an image URL that supports http2 connections, e.g. dss0.bdstatic.com/5aV1bjqh_Q2…

Reference

Developers.google.com/web/fundame…