Preface

Hi, everyone. Today I'm going to share the Dubbo thread pool model with you. In the previous chapter we discussed Dubbo SPI and learned that it is essentially a strengthened version of the JDK's standard Service Provider Interface (SPI) extension-point discovery mechanism: it addresses several shortcomings of Java's SPI and extends our ability to plug in custom implementations. The Dubbo threading model we discuss in this chapter is also built on that SPI mechanism. What is the threading model, and how does it play into our projects? Let's discuss that in this chapter. Let's get started!

1. Introduction to the threading model

For those familiar with servlets, asynchronous non-blocking mode has been supported since Servlet 3.x. I covered what asynchronous non-blocking means in the previous chapters, so you can review it on your own. Let's go through a simple flowchart of a request to a Web application:

In the flowchart above, the first request makes a synchronous call into the Web layer, which then synchronously calls a third-party service; the whole chain is one synchronous call. The second request also arrives synchronously, but switches threads when making the third-party call (with Servlet 3.x we no longer need to create those threads by hand). The benefit is that business processing and third-party invocations can run on dedicated thread pools. So when should we switch threads instead of staying on the calling thread? If the event-handling logic finishes quickly and issues no new I/O requests, such as just marking an id in memory, it is faster to process it directly on the I/O thread, because we save the cost of thread pool scheduling. However, if the handling logic is slow or issues new I/O requests, such as database queries or calls to other services, it must be dispatched to the thread pool; otherwise the I/O thread blocks and cannot accept other requests.
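To make that rule of thumb concrete, here is a minimal sketch in plain JDK code (not Dubbo-specific; the class and method names are made up for illustration): cheap in-memory work is handled right on the event thread, while anything that blocks or issues new I/O is handed off to a dedicated pool.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EventDispatchSketch {

    private final Set<Long> seenIds = ConcurrentHashMap.newKeySet();
    private final ExecutorService businessPool = Executors.newFixedThreadPool(100);

    // Called on the I/O (event) thread.
    void onEvent(long id, boolean needsRemoteCall) {
        if (!needsRemoteCall) {
            // Fast, purely in-memory work: doing it here is cheaper than paying for a pool hand-off.
            seenIds.add(id);
        } else {
            // Slow work or new I/O (database query, downstream service call):
            // hand it to the business pool so the I/O thread can keep accepting requests.
            businessPool.execute(() -> callDownstreamService(id));
        }
    }

    private void callDownstreamService(long id) {
        // placeholder for a blocking call
    }
}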

2. Usage

To cope with different scenarios, Dubbo therefore lets us combine different dispatch strategies with different thread pool configurations. The configuration looks like this:

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

The following is a brief description of the dispatcher and threadpool parameters (a sketch of rough JDK analogues for the thread pool strategies follows the list):

  1. Dispatcher
  • all: All messages are dispatched to the thread pool, including requests, responses, connection events, disconnection events, heartbeats, and so on. (the default)
  • direct: No message is dispatched to the thread pool; everything is executed directly on the I/O thread.
  • message: Only request and response messages are dispatched to the thread pool; other messages such as connection events, disconnection events, and heartbeats are executed directly on the I/O thread.
  • execution: Only request messages are dispatched to the thread pool; responses and other messages such as connection and disconnection events and heartbeats are executed directly on the I/O thread.
  • connection: On the I/O thread, connection and disconnection events are queued and executed one by one; other messages are dispatched to the thread pool.
  2. ThreadPool
  • fixed: A fixed-size thread pool; threads are created at startup, never closed, and held the whole time. (the default)
  • cached: A cached thread pool; threads are removed automatically after one minute of idleness and recreated when needed.
  • limited: A scalable thread pool whose thread count only grows and never shrinks. Growing without shrinking avoids the performance problems a sudden traffic spike would cause right after the pool has shrunk.
  • eager: A thread pool that prefers to create Worker threads. When the number of tasks is greater than corePoolSize but less than maximumPoolSize, a new Worker is created first to handle the task. When the number of tasks exceeds maximumPoolSize, the task is put into a blocking queue, and a RejectedExecutionException is thrown once the blocking queue is full. (Compared with cached: when the number of tasks exceeds maximumPoolSize, cached throws an exception directly instead of putting the task into a blocking queue.)
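As promised above, here is a rough sketch of JDK analogues for the four thread pool strategies. This is only an approximation for illustration: the real implementations live under org.apache.dubbo.common.threadpool.support and add named thread factories, rejection reporting, and, for eager, a custom executor and task queue.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolAnalogues {

    // fixed: a constant number of threads, created once and kept alive forever.
    static ExecutorService fixed(int threads) {
        return Executors.newFixedThreadPool(threads);
    }

    // cached: threads are created on demand and reclaimed after one minute of idleness.
    static ExecutorService cached() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

    // limited: grows up to a maximum, but the keep-alive time is effectively infinite,
    // so the pool never shrinks once it has grown.
    static ExecutorService limited(int maxThreads) {
        return new ThreadPoolExecutor(0, maxThreads,
                Long.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
    }

    // eager: prefers creating new workers before queueing. The JDK pool queues first and
    // only then grows beyond corePoolSize, so Dubbo inverts that order with a custom
    // executor/queue pair; there is no one-line JDK equivalent.
}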

3. Application scenarios

From the previous introduction we should understand why we need to switch threads. A simple principle applies: if a task issues new I/O or takes a long time to process, hand it over to the task thread pool. Let's briefly summarize the scenarios we often encounter at work:

  1. Computing service: In a previous job I had a requirement where our vehicle-mounted devices reported data to the server in real time, and the server recorded the data and calculated and corrected navigation data in real time. We therefore needed a computing microservice whose main job was to calculate and adjust real-time data. For such a typical computing service we try to reduce thread switching during calculation and keep the computation on a single thread as much as possible, which cuts the overhead of thread switching and improves computing speed.

  2. Gateway services: First we need to understand what a gateway is. Put simply, it is the entry point of all services: every service call has to pass through the gateway before reaching the corresponding service (similar to Nginx). The gateway's main work is service forwarding (plus authentication, rate limiting, and so on), which essentially means initiating new requests. Since making a request is starting new I/O, we can switch to the thread pool to handle it.

4. Example demonstration

Let’s demonstrate this by taking a list of books as an example. The following is the structure diagram of the project:

Since we are focusing on the configuration of the service provider, we will mainly look at the configuration in dubbo-provider-xml.xml:


      
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <!-- Thread pool: fixed, fixed size: 100 -->
    <dubbo:protocol port="20880" name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

    <dubbo:application name="demo-provider" metadata-type="remote"/>

    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

    <bean id="bookFacade" class="com.muke.dubbocourse.test.provider.BookFacadeImpl"/>

    <!-- Expose the Dubbo service -->
    <dubbo:service interface="com.muke.dubbocourse.common.api.BookFacade" ref="bookFacade" />

</beans>

In the XML configuration above, dispatcher="all" specifies the event dispatch policy, and threadpool="fixed" threads="100" specifies a fixed-size thread pool with 100 threads.
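If you prefer a properties-based configuration over XML, the same settings can be expressed with dubbo.protocol.* keys (to the best of my knowledge of Dubbo's configuration mapping rules; adjust to your own setup), for example in dubbo.properties:

# equivalent thread model settings in dubbo.properties
dubbo.protocol.name=dubbo
dubbo.protocol.port=20880
dubbo.protocol.dispatcher=all
dubbo.protocol.threadpool=fixed
dubbo.protocol.threads=100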

5. Principle analysis

In Dubbo, both the dispatch policy and the thread pool are loaded through SPI (see the Dubbo SPI chapter above). Let's dive into the subject and start by looking at the five event dispatch strategies Dubbo provides for us:
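Since the dispatch policy is an SPI extension point, its entry is the Dispatcher interface. Quoted from memory, so treat it as a sketch rather than the exact source, it looks roughly like this, with all declared as the default extension:

@SPI(AllDispatcher.NAME) // "all" is the default dispatch strategy
public interface Dispatcher {

    /**
     * Wrap the given channel handler according to the configured dispatch strategy.
     */
    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) // the misspelled key is kept for backward compatibility
    ChannelHandler dispatch(ChannelHandler handler, URL url);
}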

Here we only analyze the all dispatch strategy; the other strategies are similar, and interested readers can consult their source code themselves. Let's look at the core source code of org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler:

/**
 * @className AllChannelHandler
 *
 * @description All messages are dispatched to the thread pool for processing
 *
 * @author <a href="http://youngitman.tech">Young IT male</a>
 *
 * @date 2020-03-05
 *
 * @version v1.0.0
 */
public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    /**
     * Remote connection event callback
     */
    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            // Dispatch the connection event to the thread pool for execution
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    /**
     * Remote disconnection event callback
     */
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            // Dispatch the disconnection event to the thread pool for execution
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    /**
     * Received data callback
     */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            // Dispatch the received message to the thread pool for processing
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            // If the pool rejects a request message, send a failure response back to the consumer
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    /**
     * Exception callback
     */
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            // Dispatch the exception event to the thread pool for execution
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

As you can see from the code comments above, the all processing strategy is to dispatch all messages to the thread pool, including requests, responses, connection events, disconnect events, heartbeats, and so on.
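For contrast, the direct strategy does not wrap the handler at all. From memory, the corresponding dispatcher (org.apache.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher) looks roughly like the following: it returns the original handler unchanged, so every event stays on the I/O thread.

public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // No wrapping: connection, disconnection, request, response and heartbeat
        // events are all executed directly on the calling (I/O) thread.
        return handler;
    }
}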

There are four main thread pool processing strategies:

We analyze the fixed strategy here. Let's look at the core source code of org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool:

/**
 * Creates a fixed-size thread pool.
 *
 * @see java.util.concurrent.Executors#newFixedThreadPool(int)
 */
public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // Thread pool name
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        // Thread pool size
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        // Queue size
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                // If the queue size is 0, use a synchronous hand-off queue
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        // Otherwise use an unbounded queue (queues < 0) or a bounded queue of the given size
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}

The source code above builds a thread pool whose core size equals its maximum size (the threads parameter). If the queue size is 0, a SynchronousQueue is used; if it is negative, an unbounded LinkedBlockingQueue; otherwise, a bounded LinkedBlockingQueue of the specified size.
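To see what this means for our demo configuration (threads="100" with no queues attribute, whose default is 0 as far as I recall), the returned executor is roughly equivalent to the plain-JDK construction below; Dubbo additionally plugs in NamedInternalThreadFactory and AbortPolicyWithReport, which are omitted here for brevity.

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolEquivalent {

    public static void main(String[] args) {
        // corePoolSize == maximumPoolSize == 100, workers are never reclaimed,
        // and a SynchronousQueue means a task is handed directly to an idle worker
        // or rejected if all 100 workers are busy.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                100, 100,
                0, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
        executor.execute(() -> System.out.println("handled by " + Thread.currentThread().getName()));
        executor.shutdown();
    }
}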

6. Summary

In this section, we mainly studied the thread pool model in Dubbo. Dubbo gives us two dimensions for adjusting the policy: Dispatcher and ThreadPool. Dispatcher provides five policies: all, direct, message, execution, and connection. ThreadPool provides four policies: fixed, cached, limited, and eager. We also examined the underlying implementation logic in the source code.

The highlights of this lesson are as follows:

  1. Understand the threading model in Dubbo

  2. What the Dispatcher strategies are

  3. What the ThreadPool strategies are

  4. Understand the thread model implementation principle

In closing

This section is the last one of "Learn Dubbo from Scratch", the basic course in the Dubbo beginner-to-master series ("Learn Dubbo from Scratch", "Advanced Application of Dubbo", "Dubbo Source Code Analysis"). Thank you for your long-term support. Because my time and energy are limited, I may be slow to update the follow-up courses; please bear with me, and thank you again for your attention. If you want to get the latest topics as they are shared, please follow my WeChat official account.

The author

I work in the financial industry. I have been part of first-class technical teams in Chongqing, including Yiji Pay, Sijian Technology, and an online ride-hailing platform, and I am now responsible for building a unified payment system at a bank. I have a strong interest in the financial industry, and I also practice big data, data storage, automated integration and deployment, distributed microservices, reactive programming, and artificial intelligence. I am keen on technology sharing as well, running an official account and a blog to share what I learn. Follow the official account "young IT male" to get the latest technical articles!

Blog: Youngitman.tech

Wechat Official Account: