This article covers the notification and alarm module of DynamicTp, the open-source dynamic thread pool project. The project currently provides the following notification and alarm functions. For each notification item, you can independently configure whether it is enabled, the alarm threshold, the alarm interval, and the platforms. For details, see the notify package of the core module.

1. Core parameter change notification

2. Thread pool activity alarm

3. Queue capacity alarm

4. Rejection policy alarm

5. Task execution timeout alarm

6. Task queuing timeout alarm


DynamicTp project address

The project currently has 700 stars. Thank you for starring it. PRs are welcome, and you are invited to contribute to open source in your spare time.

Gitee address: gitee.com/yanhom/dyna…

Github address: github.com/lyh200/dyna…


Series

Meituan dynamic thread pool practice ideas, now open source

Dynamic thread pool framework (DynamicTp), monitoring and source code parsing

Dynamic Thread Pool (DynamicTp) : dynamically adjusts Tomcat, Jetty, and Undertow thread pool parameters


Thread pool interpretation

This section reviews the inheritance hierarchy of the ThreadPoolExecutor class in JUC (java.util.concurrent).

The Executor interface decouples task submission from task execution. It defines only one method, execute(Runnable command), for submitting tasks, and leaves how tasks are executed up to implementers.

The ExecutorService interface extends Executor and adds lifecycle management methods, methods that return a Future, and methods for submitting tasks in batches:

void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
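
As a small illustration of the three groups of ExecutorService methods listed above (returning a Future, batch submission, lifecycle management), here is a minimal sketch. The class name ExecutorServiceDemo and the method sumResults() are made up for this example; only standard JDK calls are used.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of DynamicTp or the JDK
final class ExecutorServiceDemo {

    /** Submits one task, then a batch, and returns the sum of all results. */
    static int sumResults() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // submit() returns a Future for a single task
            Future<Integer> single = pool.submit(() -> 1);

            // invokeAll() submits a batch and waits until every task has finished
            List<Callable<Integer>> batch = List.of(() -> 2, () -> 3);
            int sum = single.get();
            for (Future<Integer> f : pool.invokeAll(batch)) {
                sum += f.get();
            }

            // Lifecycle management: stop accepting tasks, then wait for termination
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.SECONDS);
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // safety net in case anything above failed
        }
    }

    public static void main(String[] args) {
        System.out.println(sumResults());
    }
}
```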

The abstract class AbstractExecutorService implements the ExecutorService interface and provides default implementations of its methods. It wraps a Runnable task in FutureTask, an implementation of RunnableFuture, and hands it to the execute() method. The caller can then block on the FutureTask to fetch the execution result. The class also orchestrates the submission of batch tasks.

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
    
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
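
The wrap-then-execute pattern in submit() can be reproduced outside the JDK class. The sketch below is only an illustration under assumptions: FutureTaskDemo, its submit() helper, and the caller-runs Executor are invented for the example. It shows that the Future handed back is the same FutureTask object that was passed to execute().

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

// Hypothetical demo class mimicking what AbstractExecutorService.submit() does
final class FutureTaskDemo {

    /** Wraps the task in a FutureTask, hands it to execute(), and returns it. */
    static <T> FutureTask<T> submit(Executor executor, Runnable task, T result) {
        FutureTask<T> future = new FutureTask<>(task, result);
        executor.execute(future);
        return future;
    }

    static String demo() {
        StringBuilder log = new StringBuilder();
        // A trivial Executor that runs the task in the calling thread
        Executor callerRuns = Runnable::run;
        FutureTask<String> future = submit(callerRuns, () -> log.append("ran"), "done");
        try {
            // get() blocks until the wrapped Runnable has finished
            return log + ":" + future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```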

ThreadPoolExecutor extends AbstractExecutorService and uses pooling to manage a set of threads that execute submitted tasks. It defines a set of thread pool lifecycle states and uses a single ctl variable to hold both the current pool state (high 3 bits) and the current thread count (low 29 bits). Many methods in the ThreadPoolExecutor source need to read or update the pool state and the thread count at the same time. Keeping both in one atomic variable keeps the data consistent and the code simple.

  // This variable stores the current pool state (high 3 bits) and the current thread count (low 29 bits)
  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
  private static final int COUNT_BITS = Integer.SIZE - 3;
  private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

  // runState is stored in the high-order bits
  // RUNNING: accepts new task submissions and also processes tasks in the task queue
  // Result: 111 00000000000000000000000000000
  private static final int RUNNING    = -1 << COUNT_BITS;
  
  // SHUTDOWN: does not accept new task submissions, but processes tasks in the task queue
  // Result: 000 00000000000000000000000000000
  private static final int SHUTDOWN   =  0 << COUNT_BITS;
  
  // STOP: does not accept new tasks, does not execute tasks in the queue, and interrupts ongoing tasks
  // Result: 001 00000000000000000000000000000
  private static final int STOP       =  1 << COUNT_BITS;
  
  // TIDYING: the task queue is empty and workerCount = 0; the thread that transitions the pool to TIDYING runs the terminated() hook
  // Result: 010 00000000000000000000000000000
  private static final int TIDYING    =  2 << COUNT_BITS;
  
  // TERMINATED: the terminated() hook method has completed
  // Result: 011 00000000000000000000000000000
  private static final int TERMINATED =  3 << COUNT_BITS;

  // Packing and unpacking ctl
  // The lower 29 bits go to 0 to get the state of the thread pool
  private static int runStateOf(int c)     { return c & ~CAPACITY; }
  // The top three bits become 0 to get the number of threads in the thread pool
  private static int workerCountOf(int c)  { return c & CAPACITY; }
  private static int ctlOf(int rs, int wc) { return rs | wc; }
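
To make the packing concrete, the constants and helper methods above can be copied into a standalone class and exercised directly. CtlDemo is a hypothetical name; the arithmetic is the same as in the JDK source shown above.

```java
// Hypothetical standalone copy of the ctl packing logic, for experimentation only
final class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits all set

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    // Clear the low 29 bits to get the run state
    static int runStateOf(int c)     { return c & ~CAPACITY; }
    // Clear the high 3 bits to get the worker count
    static int workerCountOf(int c)  { return c & CAPACITY; }
    // Combine a run state and a worker count into one int
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(Integer.toBinaryString(c));
        System.out.println(runStateOf(c) == RUNNING);
        System.out.println(workerCountOf(c));
    }
}
```

Because RUNNING is negative and the other states grow with their numeric prefix, the states are ordered RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED, which is why the source can compare them with < and >=.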

The core entry execute() method executes the following logic:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

The main execution flow can be summarized as follows. The code above also contains some exceptional-branch checks, which fit into the flow below:

1. Check the thread pool state. If it is not RUNNING, execute the rejection policy.

2. If the current number of threads is less than corePoolSize, create a new thread to process the submitted task.

3. If the current number of threads is greater than or equal to corePoolSize and the task queue is not full, put the task into the task queue to wait for execution.

4. If the current number of threads is at least corePoolSize but less than maximumPoolSize and the task queue is full, create a new thread to execute the submitted task.

5. If the current number of threads has reached maximumPoolSize and the queue is full, reject the task.
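
The flow above can be observed on a deliberately tiny pool. The following sketch assumes a pool with corePoolSize 1, maximumPoolSize 2, and a queue of capacity 1 (values chosen only for the demonstration) and submits four tasks that block until released. ExecuteFlowDemo and trace() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: traces how execute() places four blocking tasks
final class ExecuteFlowDemo {

    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        // Every task blocks until released, so no thread becomes free during the trace
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> steps = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    steps.add("threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    steps.add("rejected");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return steps;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The first task starts a core thread, the second is queued, the third finds the queue full and starts a non-core thread, and the fourth is rejected by AbortPolicy.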

addWorker() method logic

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // Get the current pool status
            int rs = runStateOf(c);

            // 1. If the thread pool state is > SHUTDOWN, return false; otherwise go to 2
            // 2. If the thread pool state is SHUTDOWN and firstTask is not null, return false, because a pool in the SHUTDOWN state cannot accept new tasks; otherwise go to 3
            // 3. If the thread pool state is SHUTDOWN and firstTask == null, return false if the task queue is empty
            if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                  firstTask == null &&
                  !workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 1. If the current number of pool threads >= CAPACITY (the theoretical maximum, 2^29 - 1, about 536 million), return false
                // 2. When creating a core thread, if the current number of pool threads >= corePoolSize, return false
                // 3. When creating a non-core thread, if the current number of pool threads >= maximumPoolSize, return false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // cas increases the number of current pool threads, and exits the loop if it succeeds
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // The CAS failed to increase the thread count (concurrent update), so ctl was re-read to recompute the pool state. If it differs from rs computed above, the pool state has changed, and we jump to the outer loop to re-check the state
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // Create a Worker thread to execute the task
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // The mainLock is required to access the worker set
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    // 1. The pool state is < SHUTDOWN, i.e. RUNNING
                    // 2. The pool state is SHUTDOWN and firstTask == null: a worker is needed only to process tasks left in the queue
                    // In both cases, throw an exception if the thread has already been started
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // Add to workers collection
                        workers.add(w);
                        int s = workers.size();
                        // Determine the maximum number of threads in the update history
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // Start a new thread
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)
                // workerCount--, and remove the worker from workers
                addWorkerFailed(w);
        }
        return workerStarted;
    }
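
The inner loop of addWorker() is a classic bounded CAS retry loop: read the count, check the limit, attempt the increment, and retry if another thread got there first. The sketch below isolates just that idea with a plain AtomicInteger. WorkerCountDemo and its methods are hypothetical, and the pool-state check of the real method is left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a bounded counter incremented with a CAS retry loop
final class WorkerCountDemo {
    private final AtomicInteger workerCount = new AtomicInteger();
    private final int limit;

    WorkerCountDemo(int limit) {
        this.limit = limit;
    }

    /** Returns true if the count was incremented, false if the limit is reached. */
    boolean tryAddWorker() {
        for (;;) {
            int c = workerCount.get();
            if (c >= limit)
                return false;
            if (workerCount.compareAndSet(c, c + 1))
                return true;
            // CAS lost a race with another thread: re-read and retry
        }
    }

    /** Lets several threads race for the slots and counts the winners. */
    static int successes(int limit, int threads) {
        WorkerCountDemo demo = new WorkerCountDemo(limit);
        AtomicInteger ok = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                if (demo.tryAddWorker()) ok.incrementAndGet();
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return ok.get();
    }

    public static void main(String[] args) {
        System.out.println(successes(3, 8));
    }
}
```

However the threads interleave, exactly `limit` calls succeed, which is the guarantee addWorker() relies on to keep the pool within corePoolSize or maximumPoolSize.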

Threads in the pool are not used as bare Thread objects. ThreadPoolExecutor defines an inner class, Worker, which extends AbstractQueuedSynchronizer (AQS) and implements Runnable. Each Worker holds a reference to its Thread and a firstTask (the first task to run after creation). When a worker thread starts, it runs the Worker's run() method, which calls the outer class's runWorker(Worker w) method:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 1. If the task is not empty, it is executed directly as the first task of the thread
        // 2. If the task is empty, get the task execution from the task queue using the getTask() method
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // Thread pool status >= STOP, then interrupt the thread
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // The hook method called before the actual execution of the task
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Perform the actual task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // The hook method called after the task executes
                    afterExecute(task, thrown);
                }
            } finally {
                // Set the task to null and get a new task
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // If there is no task to execute, run the worker exit logic
        processWorkerExit(w, completedAbruptly);
    }
}
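
The order in which runWorker() calls beforeExecute(), task.run(), and afterExecute() can be checked with a small subclass that records each step. HookDemo is a hypothetical class written for this illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: records the order of the hooks around one task
final class HookDemo extends ThreadPoolExecutor {
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    HookDemo() {
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        events.add("before");
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        events.add("after");
    }

    static List<String> demo() {
        HookDemo pool = new HookDemo();
        pool.execute(() -> pool.events.add("run"));
        pool.shutdown();
        try {
            // Termination only completes after the worker has left runWorker()
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(pool.events);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```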

getTask() method logic

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // Decrement the number of worker threads in two cases
        // 1. rs >= STOP
        // 2. rs == SHUTDOWN && workQueue.isEmpty()
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // Allow the core thread to timeout or the number of current threads > the number of core threads
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

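        // wc > maximumPoolSize can occur when setMaximumPoolSize() lowers the limit at runtime; in that case, or when the last poll timed out, try to retire this worker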
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Block the queue to get the task
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // An interrupt occurred and retry
            timedOut = false;
        }
    }
}
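
The timed branch of getTask() is what lets idle workers exit. A quick way to see it is a pool with allowCoreThreadTimeOut(true) and a short keepAliveTime: once the only task has finished, the worker's poll() times out, getTask() returns null, and the pool shrinks to zero threads. IdleTimeoutDemo and the 50 ms and 5 s values are assumptions chosen for the demonstration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows an idle worker being retired by getTask()
final class IdleTimeoutDemo {

    /** Runs one task, then waits (up to 5 s) for the idle worker to exit. */
    static int poolSizeAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Makes "timed" true in getTask() even for core threads
        pool.allowCoreThreadTimeOut(true);
        pool.execute(() -> { });
        try {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfterIdle());
    }
}
```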

The sections above covered the ThreadPoolExecutor inheritance hierarchy and its core source code in detail. With that background, the following sections describe the alarm notification capabilities DynamicTp provides.


Notification of change of core parameters

The listener for the corresponding configuration center listens for configuration changes, wraps them in DtpProperties, and passes them to the refresh() method of the DtpRegistry class to update the configuration. The changed fields are highlighted in the notification.
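
The idea of applying new parameters and reporting which fields changed can be sketched with the plain JDK setters. This is not DtpRegistry's actual implementation: RefreshDemo, refresh(), and the "old => new" message format are invented for illustration, and the sketch assumes newCore <= newMax.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: applies new sizes and lists the changed fields
final class RefreshDemo {

    /** Assumes newCore <= newMax. Returns descriptions of the fields that changed. */
    static List<String> refresh(ThreadPoolExecutor pool, int newCore, int newMax) {
        int oldCore = pool.getCorePoolSize();
        int oldMax = pool.getMaximumPoolSize();

        // Order matters: the setters reject a state where core > max,
        // so grow the maximum first and shrink it last
        if (newMax >= oldMax) {
            pool.setMaximumPoolSize(newMax);
            pool.setCorePoolSize(newCore);
        } else {
            pool.setCorePoolSize(newCore);
            pool.setMaximumPoolSize(newMax);
        }

        List<String> changes = new ArrayList<>();
        if (oldCore != newCore) changes.add("corePoolSize: " + oldCore + " => " + newCore);
        if (oldMax != newMax) changes.add("maximumPoolSize: " + oldMax + " => " + newMax);
        return changes;
    }

    static List<String> demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            return refresh(pool, 4, 8);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```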


Thread pool activity alarm

Activity (liveness) = activeCount / maximumPoolSize

After the service starts, a scheduled monitoring task is started. At a configurable interval, it calculates the thread pool activity and triggers an alarm when the threshold is reached.
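
The activity formula translates directly into code. The sketch below assumes the threshold is expressed as a percentage (as the value 80 in the configuration example suggests); LivenessDemo and its method names are hypothetical and do not come from DynamicTp.

```java
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class: activity (liveness) as a percentage
final class LivenessDemo {

    /** activeCount / maximumPoolSize, expressed as a rounded percentage. */
    static int livenessPercent(int activeCount, int maximumPoolSize) {
        return (int) Math.round(100.0 * activeCount / maximumPoolSize);
    }

    /** True if the pool's current liveness has reached the threshold. */
    static boolean shouldAlarm(ThreadPoolExecutor pool, int thresholdPercent) {
        int liveness = livenessPercent(pool.getActiveCount(), pool.getMaximumPoolSize());
        return liveness >= thresholdPercent;
    }

    public static void main(String[] args) {
        System.out.println(livenessPercent(3, 4));
    }
}
```

A monitoring task would call shouldAlarm() on each tick. Note that getActiveCount() is documented as an approximate value, so the result is a snapshot rather than an exact measurement.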


Queue Capacity Alarm

Capacity usage = queueSize/queueCapacity

After the service starts, a scheduled monitoring task is started and calculates the task queue usage at a configurable interval. When the usage reaches the threshold, an alarm is triggered. If the alarm is triggered multiple times within the alarm interval, no further notification is sent.
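
Queue usage can be computed from any BlockingQueue. In this sketch the capacity is derived as size() + remainingCapacity(), which is an assumption of the example (DynamicTp may track queueCapacity differently); CapacityDemo is a hypothetical class.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: queue usage as a percentage
final class CapacityDemo {

    /** queueSize / queueCapacity as a percentage, with capacity = size + remaining. */
    static int usagePercent(BlockingQueue<?> queue) {
        int size = queue.size();
        int capacity = size + queue.remainingCapacity();
        // A zero-capacity queue such as SynchronousQueue never holds tasks
        return capacity == 0 ? 0 : (int) (100L * size / capacity);
    }

    static int demo() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        for (int i = 0; i < 8; i++) {
            queue.add(i);
        }
        return usagePercent(queue);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```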


Reject policy Alarm

/**
 * Do sth before reject.
 * @param executor ThreadPoolExecutor instance
 */
default void beforeReject(ThreadPoolExecutor executor) {
    if (executor instanceof DtpExecutor) {
        DtpExecutor dtpExecutor = (DtpExecutor) executor;
        dtpExecutor.incRejectCount(1);
        Runnable runnable = () -> AlarmManager.doAlarm(dtpExecutor, REJECT);
        AlarmManager.triggerAlarm(dtpExecutor.getThreadPoolName(), REJECT.getValue(), runnable);
    }
}

When the number of threads in the pool reaches the maximum and the task queue is full, the rejection policy is triggered. The RejectedExecutionHandler used by DtpExecutor is wrapped with a dynamic proxy, which calls RejectedAware's beforeReject() method before executing the actual rejection policy. This method accumulates the rejection count (both a total value and a period value). When the period value reaches the threshold, an alarm notification is triggered, the period value is reset to 0, and the last alarm time is set to the current time. If the alarm is triggered multiple times within the alarm interval, no further notification is sent.
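
Wrapping a RejectedExecutionHandler with a JDK dynamic proxy can be sketched as follows. This is an illustration under assumptions, not DynamicTp's code: RejectProxyDemo and countingProxy() are hypothetical, and a plain counter stands in for beforeReject().

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: counts rejections before delegating to the real policy
final class RejectProxyDemo {

    static RejectedExecutionHandler countingProxy(RejectedExecutionHandler target,
                                                  AtomicInteger rejectCount) {
        return (RejectedExecutionHandler) Proxy.newProxyInstance(
                RejectProxyDemo.class.getClassLoader(),
                new Class<?>[] {RejectedExecutionHandler.class},
                (proxy, method, args) -> {
                    if ("rejectedExecution".equals(method.getName())) {
                        rejectCount.incrementAndGet(); // stand-in for beforeReject()
                    }
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        throw e.getCause(); // surface the policy's own exception
                    }
                });
    }

    static int demo() {
        AtomicInteger rejectCount = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                countingProxy(new ThreadPoolExecutor.DiscardPolicy(), rejectCount));
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // One task runs, one is queued, the third is rejected (and discarded)
        for (int i = 0; i < 3; i++) {
            pool.execute(blocker);
        }
        release.countDown();
        pool.shutdown();
        return rejectCount.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The advantage of a proxy over subclassing each policy is that any RejectedExecutionHandler, including user-supplied ones, gets the counting behavior without modification.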


Task queue timeout alarm

DtpExecutor overrides ThreadPoolExecutor's execute() and beforeExecute() methods. If an execution timeout or a queue timeout is configured, execute() wraps the task in a DtpRunnable, which records the task's submission time as submitTime. beforeExecute() computes how long the task waited in the queue as the difference between the current time and submitTime. If the wait exceeds the configured queueTimeout, the count of queue-timeout tasks is accumulated (both a total value and a period value). When the period value reaches the threshold, an alarm notification is triggered, the period value is reset to 0, and the last alarm time is set to the current time. If the alarm is triggered multiple times within the alarm interval, no further notification is sent.

@Override
public void execute(Runnable command) {
    if (CollUtil.isNotEmpty(taskWrappers)) {
        for (TaskWrapper t : taskWrappers) {
            command = t.wrap(command);
        }
    }
    if (runTimeout > 0 || queueTimeout > 0) {
        command = new DtpRunnable(command);
    }
    super.execute(command);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
    if (!(r instanceof DtpRunnable)) {
        super.beforeExecute(t, r);
        return;
    }
    DtpRunnable runnable = (DtpRunnable) r;
    long currTime = System.currentTimeMillis();
    if (runTimeout > 0) {
        runnable.setStartTime(currTime);
    }
    if (queueTimeout > 0) {
        long waitTime = currTime - runnable.getSubmitTime();
        if (waitTime > queueTimeout) {
            queueTimeoutCount.incrementAndGet();
            Runnable alarmTask = () -> AlarmManager.doAlarm(this, QUEUE_TIMEOUT);
            AlarmManager.triggerAlarm(this.getThreadPoolName(), QUEUE_TIMEOUT.getValue(), alarmTask);
        }
    }
    super.beforeExecute(t, r);
}
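
A self-contained version of the queue-wait measurement may help when experimenting without the DynamicTp classes. In this sketch TimedRunnable is a hypothetical stand-in for DtpRunnable, System.nanoTime() replaces currentTimeMillis() for measuring elapsed time, and the 150 ms and 300 ms values are arbitrary.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: counts tasks that waited in the queue too long
final class QueueWaitDemo extends ThreadPoolExecutor {

    /** Stand-in for DtpRunnable: remembers when the task was submitted. */
    static final class TimedRunnable implements Runnable {
        final Runnable delegate;
        final long submitNanos = System.nanoTime();

        TimedRunnable(Runnable delegate) {
            this.delegate = delegate;
        }

        @Override
        public void run() {
            delegate.run();
        }
    }

    final AtomicInteger queueTimeoutCount = new AtomicInteger();
    private final long queueTimeoutMillis;

    QueueWaitDemo(long queueTimeoutMillis) {
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        this.queueTimeoutMillis = queueTimeoutMillis;
    }

    @Override
    public void execute(Runnable command) {
        super.execute(new TimedRunnable(command));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if (r instanceof TimedRunnable) {
            long waitedNanos = System.nanoTime() - ((TimedRunnable) r).submitNanos;
            if (TimeUnit.NANOSECONDS.toMillis(waitedNanos) > queueTimeoutMillis) {
                queueTimeoutCount.incrementAndGet();
            }
        }
        super.beforeExecute(t, r);
    }

    static int demo() {
        QueueWaitDemo pool = new QueueWaitDemo(150);
        // The first task occupies the only thread for about 300 ms
        pool.execute(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // The second task has to wait in the queue until the first one finishes
        pool.execute(() -> { });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.queueTimeoutCount.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```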


Task execution timeout alarm

afterExecute() of ThreadPoolExecutor is overridden to compute the task's actual execution time as the difference between the current time and the startTime set in beforeExecute(). If the execution time exceeds the configured runTimeout, the count of execution-timeout tasks is accumulated (both a total value and a period value). When the period value reaches the threshold, an alarm notification is triggered, the period value is reset to 0, and the last alarm time is set to the current time. If the alarm is triggered multiple times within the alarm interval, no further notification is sent.

@Override
protected void afterExecute(Runnable r, Throwable t) {

    if (runTimeout > 0) {
        DtpRunnable runnable = (DtpRunnable) r;
        long runTime = System.currentTimeMillis() - runnable.getStartTime();
        if (runTime > runTimeout) {
            runTimeoutCount.incrementAndGet();
            Runnable alarmTask = () -> AlarmManager.doAlarm(this, RUN_TIMEOUT);
            AlarmManager.triggerAlarm(this.getThreadPoolName(), RUN_TIMEOUT.getValue(), alarmTask);
        }
    }
    super.afterExecute(r, t);
}
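
The threshold-plus-interval behavior shared by the reject, queue timeout, and execution timeout alarms can be sketched as a small gate object. This is not AlarmManager's implementation: AlarmGate is hypothetical, time is passed in explicitly to keep the example deterministic, and the choice to keep accumulating the period count during the silent interval is an assumption of this sketch.

```java
// Hypothetical demo class: decides whether an event should send a notification
final class AlarmGate {
    private final int threshold;
    private final long intervalMillis;
    private int periodCount;
    private long lastAlarmMillis;
    private boolean hasAlarmed;

    AlarmGate(int threshold, long intervalMillis) {
        this.threshold = threshold;
        this.intervalMillis = intervalMillis;
    }

    /** Records one event at the given time; returns true if a notification is due. */
    synchronized boolean record(long nowMillis) {
        periodCount++;
        if (periodCount < threshold) {
            return false; // period value has not reached the threshold yet
        }
        if (hasAlarmed && nowMillis - lastAlarmMillis < intervalMillis) {
            return false; // still inside the alarm interval: stay silent
        }
        periodCount = 0;             // reset the period value
        lastAlarmMillis = nowMillis; // remember the last alarm time
        hasAlarmed = true;
        return true;
    }

    public static void main(String[] args) {
        AlarmGate gate = new AlarmGate(2, 1000);
        long[] times = {0, 10, 20, 30, 1500};
        for (long t : times) {
            System.out.println(t + " ms: " + gate.record(t));
        }
    }
}
```

With a threshold of 2 and an interval of 1000 ms, the second event fires the alarm, the next events inside the interval are suppressed, and the event at 1500 ms fires again.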


Configuration items related to alarm notification

To use the alarm notification feature, the platforms field must be configured in the configuration file. Multiple platforms can be configured, such as DingTalk and WeCom (enterprise WeChat). notifyItems configures the alarm items, including the threshold, platforms, and alarm interval.

spring:
  dynamic:
    tp:
      # omit other items
      platforms:                         # Notification platform
        - platform: wechat
          urlKey: 38a98-0c5c3b649c
          receivers: test
        - platform: ding
          urlKey: f80db3e801d593604f4a08dcd6a
          secret: SECb5444a6f375d5b9d21
          receivers: 17811511815
      executors:                                   # Dynamic thread pool configuration; every item has a default value and can be omitted to keep the configuration short
        - threadPoolName: dtpExecutor1
          executorType: common                          # Thread pool type: common or eager
          corePoolSize: 2
          maximumPoolSize: 4
          queueCapacity: 200
          queueType: VariableLinkedBlockingQueue       # See the QueueTypeEnum enum class
          rejectedHandlerType: CallerRunsPolicy        # See the RejectedTypeEnum enum class
          keepAliveTime: 50
          allowCoreThreadTimeOut: false
          threadNamePrefix: dtp1                         # Thread name prefix
          waitForTasksToCompleteOnShutdown: false        # Refer to Spring thread pool design
          awaitTerminationSeconds: 5                     # Unit: s
          preStartAllCoreThreads: false                  # Whether to prestart all core threads; the default is false
          runTimeout: 200                                # Task execution timeout threshold, currently used only for alarms, unit: ms
          queueTimeout: 100                              # Timeout threshold for a task waiting in the queue, unit: ms
          taskWrapperNames: ["ttl"]                      # Task wrapper names; wrappers implement the TaskWrapper interface
          notifyItems:                     # Alarm items; if not configured, defaults are applied automatically (change notification, capacity alarm, activity alarm, reject alarm, task timeout alarm)
            - type: capacity               # See the NotifyTypeEnum enum class
              threshold: 80                # Alarm threshold
              platforms: [ding, wechat]    # Optional; if not configured, all platforms from the upper-level platforms setting are used
              interval: 120                # Alarm interval, unit: s
            - type: change
            - type: liveness
              threshold: 80
              interval: 120
            - type: reject
              threshold: 1
              interval: 160
            - type: run_timeout
              threshold: 1
              interval: 120
            - type: queue_timeout
              threshold: 1
              interval: 140

Conclusion

This article first introduced the inheritance hierarchy of ThreadPoolExecutor and walked through the source code of its core flow. It then introduced the six alarm notification capabilities that DynamicTp provides. With monitoring and alarms, we hope you can detect the load on your service's thread pools in time and adjust promptly to prevent incidents.


Contact me

If you have any ideas or suggestions about the project, feel free to add me on WeChat to discuss them, or create issues so we can improve the project together.

WeChat official account: CodeFox

WeChat: yanhom1314