Recently, several business teams have been hitting thread pool exhaustion exceptions on the Provider side, so someone asked for Dubbo thread pool monitoring. The implementation is relatively simple: with an existing monitoring system, you only need to report the thread pool statistics as the corresponding metrics.

The way Dubbo stores its thread pools changed in newer versions (I am not sure exactly which version introduced the change, but I am currently running 2.7.8, so I will use that as the dividing line in this article). Before 2.7.8, the thread pools are stored in DataStore, which is an SPI interface whose default implementation class is SimpleDataStore. As of 2.7.8, thread pool information is no longer kept in DataStore. The corresponding interface is ExecutorRepository, which is also an SPI interface, and its default implementation class is DefaultExecutorRepository.

DataStore

public class SimpleDataStore implements DataStore {
    // consumer/provider -> port -> ExecutorService
    private ConcurrentMap<String, ConcurrentMap<String, Object>> data = new ConcurrentHashMap<>();
}

DataStore is relatively simple and needs little explanation. Because I am reading the latest code (2.7.8), I could not find where the ExecutorService is put into the DataStore, but I did find where it is read: ThreadPoolStatusChecker.

ThreadPoolStatusChecker is a status check class. Dubbo's commands include ls, invoke, and status, where the status command reports some simple health status of the Dubbo service. Check items include:

public class DubboStatusChecker implements InitializingBean {

    private StatusChecker memoryStatusChecker;

    private StatusChecker serverStatusChecker;

    private StatusChecker threadPoolStatusChecker;

    private StatusChecker registryStatusChecker;

    private StatusChecker springStatusChecker;

    private StatusChecker dataSourceStatusChecker;
}

The one we care about is ThreadPoolStatusChecker:

@Activate
public class ThreadPoolStatusChecker implements StatusChecker {

    @Override
    public Status check() {
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        Map<String, Object> executors = dataStore.get(CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY);

        StringBuilder msg = new StringBuilder();
        Status.Level level = Status.Level.OK;
        for (Map.Entry<String, Object> entry : executors.entrySet()) {
            String port = entry.getKey();
            ExecutorService executor = (ExecutorService) entry.getValue();

            if (executor instanceof ThreadPoolExecutor) {
                ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
                // OK only while the active count is below max - 1
                boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
                Status.Level lvl = Status.Level.OK;
                if (!ok) {
                    level = Status.Level.WARN;
                    lvl = Status.Level.WARN;
                }

                if (msg.length() > 0) {
                    msg.append(";");
                }
                msg.append("Pool status:").append(lvl).append(", max:").append(tp.getMaximumPoolSize()).append(", core:")
                        .append(tp.getCorePoolSize()).append(", largest:").append(tp.getLargestPoolSize()).append(", active:")
                        .append(tp.getActiveCount()).append(", task:").append(tp.getTaskCount()).append(", service port: ").append(port);
            }
        }
        return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
    }
}

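As you can see, the check logic here is also relatively simple: a pool is OK as long as its number of active threads is less than the maximum pool size minus one, otherwise it is reported as WARN. If no thread pool is found at all, the status is UNKNOWN.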
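To make the threshold concrete, here is a minimal sketch of the same rule as a standalone function. The class name `PoolStatusRule` is made up for illustration and is not part of Dubbo; only the comparison itself is taken from the checker.

```java
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical helper (not a Dubbo class) that isolates the checker's threshold. */
class PoolStatusRule {

    /** Same comparison as ThreadPoolStatusChecker: OK only while active < max - 1. */
    static boolean isOk(int activeCount, int maximumPoolSize) {
        return activeCount < maximumPoolSize - 1;
    }

    /** Convenience overload that reads both values from a live pool. */
    static boolean isOk(ThreadPoolExecutor tp) {
        return isOk(tp.getActiveCount(), tp.getMaximumPoolSize());
    }
}
```

For a pool with a maximum of 200 threads, `isOk(198, 200)` is true and `isOk(199, 200)` is false, so the warning fires one thread before the pool is full. A side effect of the `- 1` is that a pool whose maximum size is 1 can never be OK under this rule.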

Thread pool monitoring based on DataStore

  1. Obtain the DataStore via SPI, because DataStore caches all thread-pool-related information
  2. Start a scheduled task that periodically collects thread pool information and reports it to the monitoring system
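The two steps above can be sketched roughly as follows. This is only an illustration under assumptions: `DataStorePoolMonitor`, `snapshot`, and `start` are hypothetical names, the gauge names are made up, and the reporter stands in for whatever client your monitoring system provides. The code uses only the JDK so that the Dubbo lookup can be passed in as a supplier.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical collector; class, method, and gauge names are illustrative, not part of Dubbo. */
class DataStorePoolMonitor {

    /** Turns a port -> executor map into "port.metric" -> value gauges. */
    static Map<String, Integer> snapshot(Map<String, Object> executors) {
        Map<String, Integer> gauges = new LinkedHashMap<>();
        if (executors == null) {
            return gauges;
        }
        for (Map.Entry<String, Object> entry : executors.entrySet()) {
            // Only ThreadPoolExecutor exposes the counters we want; skip anything else.
            if (entry.getValue() instanceof ThreadPoolExecutor) {
                ThreadPoolExecutor tp = (ThreadPoolExecutor) entry.getValue();
                String port = entry.getKey();
                gauges.put(port + ".active", tp.getActiveCount());
                gauges.put(port + ".core", tp.getCorePoolSize());
                gauges.put(port + ".max", tp.getMaximumPoolSize());
                gauges.put(port + ".largest", tp.getLargestPoolSize());
                gauges.put(port + ".queued", tp.getQueue().size());
            }
        }
        return gauges;
    }

    /** Polls the source at a fixed period and hands every gauge to the reporter. */
    static ScheduledExecutorService start(Supplier<Map<String, Object>> source,
                                          BiConsumer<String, Integer> reporter,
                                          long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "dubbo-pool-monitor");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(() -> {
            try {
                snapshot(source.get()).forEach(reporter);
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs, so log and continue.
                System.err.println("thread pool collection failed: " + e);
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

With Dubbo on the classpath, the supplier would presumably be the same lookup that ThreadPoolStatusChecker performs, i.e. `() -> ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().get(CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY)`.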

ExecutorRepository

As mentioned earlier, as of 2.7.8 thread pools are no longer kept in DataStore; ExecutorRepository takes over that role. Take a look at its interface:

@SPI("default")
public interface ExecutorRepository {
    // Called from the AbstractServer constructor / AbstractClient#initExecutor
    ExecutorService createExecutorIfAbsent(URL url);

    // Get the thread pool from the URL
    ExecutorService getExecutor(URL url);

    void updateThreadpool(URL url, ExecutorService executor);

    ScheduledExecutorService nextScheduledExecutor();

    ScheduledExecutorService getServiceExporterExecutor();

    ExecutorService getSharedExecutor();
}
  1. The methods of main interest are createExecutorIfAbsent and getExecutor, i.e., when a thread pool is stored and when it is fetched
  2. Both storing and fetching a thread pool are driven by the URL parameter. The URL carries several core parameters: side, port, thread pool type, and the thread-count-related parameters

The default implementation looks like this:

// provider/consumer -> port -> ExecutorService
private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();


public synchronized ExecutorService createExecutorIfAbsent(URL url) {
    // EXECUTOR_SERVICE_COMPONENT_KEY is ExecutorService.class.getName(); here it stands for the provider side
    String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
    if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
        componentKey = CONSUMER_SIDE;
    }
    Map<Integer, ExecutorService> executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>());
    // Port. If only one protocol is used in the application, there is usually only one port
    Integer portKey = url.getPort();
    ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));
    return executor;
}


public ExecutorService getExecutor(URL url) {
    String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
    if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
        componentKey = CONSUMER_SIDE;
    }
    Map<Integer, ExecutorService> executors = data.get(componentKey);

    if (executors == null) {
        return null;
    }

    Integer portKey = url.getPort();
    ExecutorService executor = executors.get(portKey);
    return executor;
}

The code is relatively simple, with a few caveats:

  1. As of 2.7.8, the consumer-side thread pool model has changed. In my tests the consumer-side thread pool was still initialized, but it did not appear to be used anymore. For details, see “Consumer side thread pool Model Transformation” on the official website
  2. Both storing and fetching a thread pool require the URL parameter
  3. ExecutorRepository does not expose an interface for retrieving the cached thread pools
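One consequence of this keying scheme is worth spelling out: within a side, the port alone identifies the pool. Below is a simplified stand-in for the two-level cache, using only the JDK. `PoolCache` and its method names are hypothetical, the fixed thread pool replaces Dubbo's createExecutor(url), and the literal "consumer" is assumed to be the value of CONSUMER_SIDE.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Simplified model of the side -> port -> ExecutorService cache; not Dubbo code. */
class PoolCache {
    // Mirrors the keys used above: the provider side is keyed by ExecutorService.class.getName().
    static final String PROVIDER_KEY = ExecutorService.class.getName();
    static final String CONSUMER_KEY = "consumer"; // assumed value of CONSUMER_SIDE

    private final Map<String, Map<Integer, ExecutorService>> data = new ConcurrentHashMap<>();

    private static String componentKey(String side) {
        return CONSUMER_KEY.equalsIgnoreCase(side) ? CONSUMER_KEY : PROVIDER_KEY;
    }

    /** Creates the pool on first use; later calls for the same side and port reuse it. */
    ExecutorService createIfAbsent(String side, int port, int threads) {
        return data.computeIfAbsent(componentKey(side), k -> new ConcurrentHashMap<>())
                   .computeIfAbsent(port, k -> Executors.newFixedThreadPool(threads));
    }

    /** Returns the cached pool, or null if nothing was created for this side and port. */
    ExecutorService get(String side, int port) {
        Map<Integer, ExecutorService> byPort = data.get(componentKey(side));
        return byPort == null ? null : byPort.get(port);
    }
}
```

In this model, `createIfAbsent("provider", 20880, 2)` followed by `createIfAbsent("provider", 20880, 8)` returns the same pool both times, and the second thread count is ignored. For monitoring, this means any URL with the right side and port is enough to look up the pool.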

Thread pool monitoring based on ExecutorRepository

Since a URL is required to fetch the corresponding thread pool, how do we obtain that URL? Consider the two sides separately, consumer and provider:

1. Consumer side: Monitoring the consumer-side thread pool makes little sense, especially since the consumer-side thread pool model has changed in newer Dubbo versions, and it is not clear when the thread pool initialized on the consumer side is actually used. The getExecutor method is in fact called on every RPC call, along the path DubboInvoker#doInvoke -> AbstractInvoker#getCallbackExecutor. So could we hook in at the Filter layer? The URL involved mainly carries the same parameters: side, port, thread pool type, and thread-count parameters. But there is no real need to do that.

2. Provider side: DubboProtocol#export -> DubboProtocol#openServer. When a service is exported, the thread pool is initialized along this path. So can we work backwards and obtain the corresponding URL? It can be obtained through DubboProtocol:

Collection<Exporter<?>> exporters = DubboProtocol.getDubboProtocol().getExporters();
// then get the corresponding URL from each exporter

However, if the application exposes multiple protocols, this only yields the thread pool for the Dubbo protocol's port; the thread pools of the other protocols are not covered.

Another idea is to get all the protocols configured in the application, assemble the URLs from them, and then fetch the corresponding thread pools from ExecutorRepository.
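Whichever way the URLs are obtained, the remaining step is the same: look up the pool for each URL and keep one entry per port. A minimal sketch of that step follows, written against the JDK only so it does not depend on a specific Dubbo version. `ProviderPoolLocator` and `locate` are hypothetical names; `portOf` and `lookup` are placeholders for URL#getPort and ExecutorRepository#getExecutor.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import java.util.function.ToIntFunction;

/** Hypothetical helper; U stands in for Dubbo's URL type. */
class ProviderPoolLocator {

    /** Resolves at most one ThreadPoolExecutor per distinct port. */
    static <U> Map<Integer, ThreadPoolExecutor> locate(Collection<U> urls,
                                                       ToIntFunction<U> portOf,
                                                       Function<U, ExecutorService> lookup) {
        Map<Integer, ThreadPoolExecutor> pools = new LinkedHashMap<>();
        for (U url : urls) {
            int port = portOf.applyAsInt(url);
            if (pools.containsKey(port)) {
                continue; // many exported services share one port, and therefore one pool
            }
            ExecutorService executor = lookup.apply(url);
            // The lookup may return null or a non-ThreadPoolExecutor; both are skipped.
            if (executor instanceof ThreadPoolExecutor) {
                pools.put(port, (ThreadPoolExecutor) executor);
            }
        }
        return pools;
    }
}
```

With Dubbo on the classpath, the wiring would presumably use the URLs of the exporters shown earlier (`exporter.getInvoker().getUrl()`), `URL::getPort` as `portOf`, and `ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension()::getExecutor` as `lookup`. I have not verified that wiring against every 2.7.x release, so treat it as a starting point.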