sequence

This article focuses on the EXECUtor of the JDK HttpClient

HttpClientImpl

java.net.http/jdk/internal/net/http/HttpClientImpl.java

    private HttpClientImpl(HttpClientBuilderImpl builder,
                           SingleFacadeFactory facadeFactory) {
        id = CLIENT_IDS.incrementAndGet();
        dbgTag = "HttpClientImpl(" + id +")";
        if(builder.sslContext == null) { try { sslContext = SSLContext.getDefault(); } catch (NoSuchAlgorithmException ex) { throw new InternalError(ex); }}else {
            sslContext = builder.sslContext;
        }
        Executor ex = builder.executor;
        if (ex == null) {
            ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
            isDefaultExecutor = true;
        } else {
            isDefaultExecutor = false;
        }
        delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
        facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
        client2 = new Http2ClientImpl(this);
        cookieHandler = builder.cookieHandler;
        connectTimeout = builder.connectTimeout;
        followRedirects = builder.followRedirects == null ?
                Redirect.NEVER : builder.followRedirects;
        this.userProxySelector = Optional.ofNullable(builder.proxy);
        this.proxySelector = userProxySelector
                .orElseGet(HttpClientImpl::getDefaultProxySelector);
        if (debug.on())
            debug.log("proxySelector is %s (user-supplied=%s)",
                      this.proxySelector, userProxySelector.isPresent());
        authenticator = builder.authenticator;
        if (builder.version == null) {
            version = HttpClient.Version.HTTP_2;
        } else {
            version = builder.version;
        }
        if (builder.sslParams == null) {
            sslParams = getDefaultParams(sslContext);
        } else {
            sslParams = builder.sslParams;
        }
        connections = new ConnectionPool(id);
        connections.start();
        timeouts = new TreeSet<>();
        try {
            selmgr = new SelectorManager(this);
        } catch (IOException e) {
            // unlikely
            throw new InternalError(e);
        }
        selmgr.setDaemon(true); filters = new FilterFactory(); initFilters(); assert facadeRef.get() ! = null; }Copy the code
  • Here if HttpClientBuilderImpl executor is null, will create the Executors. NewCachedThreadPool (new DefaultThreadFactory (id))

HttpClientImpl.sendAsync

java.net.http/jdk/internal/net/http/HttpClientImpl.java

    @Override
    public <T> CompletableFuture<HttpResponse<T>>
    sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler)
    {
        return sendAsync(userRequest, responseHandler, null);
    }

    @Override
    public <T> CompletableFuture<HttpResponse<T>>
    sendAsync(HttpRequest userRequest,
              BodyHandler<T> responseHandler,
              PushPromiseHandler<T> pushPromiseHandler) {
        return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);
    }

    private <T> CompletableFuture<HttpResponse<T>>
    sendAsync(HttpRequest userRequest,
              BodyHandler<T> responseHandler,
              PushPromiseHandler<T> pushPromiseHandler,
              Executor exchangeExecutor)    {

        Objects.requireNonNull(userRequest);
        Objects.requireNonNull(responseHandler);

        AccessControlContext acc = null;
        if(System.getSecurityManager() ! = null) acc = AccessController.getContext(); // Clone the, possibly untrusted, HttpRequest HttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector);if (requestImpl.method().equals("CONNECT"))
            throw new IllegalArgumentException("Unsupported method CONNECT");

        long start = DEBUGELAPSED ? System.nanoTime() : 0;
        reference();
        try {
            if (debugelapsed.on())
                debugelapsed.log("ClientImpl (async) send %s", userRequest);

            // When using sendAsync(...) we explicitly pass the
            // executor's delegate as exchange executor to force // asynchronous scheduling of the exchange. // When using send(...) we don't specify any executor
            // and default to using the client's delegating executor // which only spawns asynchronous tasks if it detects // that the current thread is the selector manager // thread. This will cause everything to execute inline // until we need to schedule some event with the selector. Executor executor = exchangeExecutor == null ? this.delegatingExecutor : exchangeExecutor; MultiExchange
      
        mex = new MultiExchange<>(userRequest, requestImpl, this, responseHandler, pushPromiseHandler, acc); CompletableFuture
       
        > res = mex.responseAsync(executor).whenComplete((b,t) -> unreference()); if (DEBUGELAPSED) { res = res.whenComplete( (b,t) -> debugCompleted("ClientImpl (async)", start, userRequest)); } // makes sure that any dependent actions happen in the CF default // executor. This is only needed for sendAsync(...) , when // exchangeExecutor is non-null. if (exchangeExecutor ! = null) { res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL); } return res; } catch(Throwable t) { unreference(); debugCompleted("ClientImpl (async)", start, userRequest); throw t; }}
       
      Copy the code
  • If here is sendAsync, executor of parameter passing is delegatingExecutor delegate; If the send method is synchronous, the executor will send null
  • WhenComplete ((b,t) -> unreference())) where executor is used

MultiExchange.responseAsync

java.net.http/jdk/internal/net/http/MultiExchange.java

    public CompletableFuture<HttpResponse<T>> responseAsync(Executor executor) {
        CompletableFuture<Void> start = new MinimalFuture<>();
        CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
        start.completeAsync( () -> null, executor); // trigger execution
        return cf;
    }

    private CompletableFuture<HttpResponse<T>>
    responseAsync0(CompletableFuture<Void> start) {
        return start.thenCompose( v -> responseAsyncImpl())
                    .thenCompose((Response r) -> {
                        Exchange<T> exch = getExchange();
                        return exch.readBodyAsync(responseHandler)
                            .thenApply((T body) -> {
                                this.response =
                                    new HttpResponseImpl<>(r.request(), r, this.response, body, exch);
                                return this.response;
                            });
                    });
    }
Copy the code
  • You can see that the completeAsync method for CompletableFuture is used here (Note that this method is unique to java9Executor is also used here
  • Due to the default is to use the Executors. NewCachedThreadPool create executor, attention should be paid to control concurrency and task execution time, to prevent the number of threads unlimited growth excessive consumption of system resources
    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     *
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
Copy the code

RejectedExecutionException

  • The sample code
    @Test
    public void testAsyncPool(){
        ThreadPoolExecutor executor = ThreadPoolBuilder.fixedPool()
                .setPoolSize(2)
                .setQueueSize(5)
                .setThreadNamePrefix("test-") .build(); List<CompletableFuture<String>> futureList = intstream.rangeclosed (1,100).mapToobj (I -> new) CompletableFuture<String>()) .collect(Collectors.toList()); futureList.stream() .forEach(future -> { future.completeAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e1) { e1.printStackTrace(); }return "message"; },executor); }); CompletableFuture.allOf(futureList .toArray(new CompletableFuture<? >[futureList.size()])) .join(); }Copy the code

Create a fixedPool and specify queueSize to 5

  • Log output
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@76b10754 rejected from java.util.concurrent.ThreadPoolExecutor@2bea5ab4[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]

	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
	at java.base/java.util.concurrent.CompletableFuture.completeAsync(CompletableFuture.java:2591)
Copy the code

You can see that thread pool queue size has a limiting effect

summary

Asynchronously JDK httpclient executor in use, when the default is created using the Executors. NewCachedThreadPool create executor, the thread pool size is an Integer. MAX_VALUE, So be careful when you use it, it’s best to change to bounded queues, and then add thread pool monitoring.

doc

  • java.net.http javadoc