Preface: this article is about using @Async for asynchronous tasks, and gives a preliminary overview and summary of thread pools, including some pitfalls encountered along the way.

Some thread pools used at work

The following code has been sanitized.

1.newCachedThreadPool

    private void startTask(List<String> usersList){
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.submit(()->{
            // do something
        });
    }


2.newScheduledThreadPool


@Configuration
public class ScheduleConfig implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        // Set the scheduler to a thread pool with corePoolSize = 10
        taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
    }
}

If you have the Alibaba coding-guidelines plugin installed in IDEA, you will find that both of the above ways of creating a thread pool are flagged in red. The reason given is:

Do not use Executors to create thread pools; use ThreadPoolExecutor instead, so that the thread pool's runtime behavior is explicit and the risk of resource exhaustion is avoided. Thread pools returned by Executors have the following disadvantages:

  1. FixedThreadPool and SingleThreadPool:

    The allowed queue length is Integer.MAX_VALUE, so a large number of requests may accumulate and result in OOM.

  2. CachedThreadPool:

    The number of threads allowed to be created is Integer.MAX_VALUE, so a large number of threads may be created, resulting in OOM.

newScheduledThreadPool has the same problem as newCachedThreadPool: the maximum number of threads is set to Integer.MAX_VALUE.


    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newCachedThreadPool uses a SynchronousQueue, a blocking queue with no capacity, so every submitted task needs a thread immediately; since the maximum number of threads allowed is Integer.MAX_VALUE, a large number of threads may be created, resulting in OOM.

Similarly, ScheduledThreadPoolExecutor uses a DelayedWorkQueue with an initial capacity of 16 that grows without bound, so a large number of pending tasks can accumulate and likewise lead to OOM.

JVM parameters: -Xms64m -Xmx192m -Xss1024K -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=128m


    @PostMapping("/newCachedThreadPoolExample")
    @ResponseBody
    public void newCachedThreadPoolExample() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        while (true){
            executorService.submit(()->{
                log.info("submit:"+LocalDateTime.now());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

Initial state after startup:

Once the endpoint was requested, the thread count exploded,

and then the application started to hang.

More awkward still, no OOM error was ever thrown; the application simply hung.

Conclusion

Although the thread pools above can avoid OOM when external limits are in place, it is recommended to customize the thread pool according to your own business situation, as sketched below.
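A minimal sketch of what the Alibaba guideline recommends: an explicitly constructed ThreadPoolExecutor with a bounded queue, a named thread factory, and an explicit rejection policy. All sizes and names here are illustrative assumptions, not values from the original article.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomPoolExample {

    private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(1);

    // Explicit parameters: bounded queue, named threads, explicit rejection policy.
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            8,                                  // corePoolSize (illustrative)
            16,                                 // maximumPoolSize (illustrative)
            60L, TimeUnit.SECONDS,              // keep-alive for idle non-core threads
            new ArrayBlockingQueue<>(200),      // bounded work queue
            r -> new Thread(r, "biz-task-" + THREAD_COUNTER.getAndIncrement()),
            new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) {
        EXECUTOR.submit(() -> System.out.println("running on " + Thread.currentThread().getName()));
        EXECUTOR.shutdown();
    }
}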

Use @Async to quickly create an asynchronous task

1. application.yml

Here is the thread-pool configuration. I won't go into detail here; you can also configure it in code instead (see the sketch after the YAML below).

Selection of the thread pool buffer queue

Most of the problems above are related to the thread pool's buffer queue, so choosing a buffer queue that matches your business characteristics is also important.

spring:
  task:
    execution:
      pool:
        # Maximum number of threads
        max-size: 16
        # Number of core threads
        core-size: 16
        # Survival time
        keep-alive: 10s
        # queue size
        queue-capacity: 100
        # Whether to allow core threads to time out
        allow-core-thread-timeout: true
      # thread name prefix
      thread-name-prefix: async-task-

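For reference, a rough code-based equivalent of the YAML above, using Spring's ThreadPoolTaskExecutor; the bean name taskExecutor matches what Spring looks up by default (the class name and the exact setup are my own, not from the original article):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@Configuration
public class AsyncPoolConfig {

    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(16);             // core-size
        executor.setMaxPoolSize(16);              // max-size
        executor.setQueueCapacity(100);           // queue-capacity
        executor.setKeepAliveSeconds(10);         // keep-alive
        executor.setAllowCoreThreadTimeOut(true); // allow-core-thread-timeout
        executor.setThreadNamePrefix("async-task-");
        executor.initialize();
        return executor;
    }
}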

2.ThreadpoolApplication

To enable asynchronous tasks, add the @EnableAsync annotation to the Application class. If you choose to write the configuration in code, put @EnableAsync on the config class instead.

@EnableAsync
@SpringBootApplication
public class ThreadpoolApplication {

    public static void main(String[] args) {
        SpringApplication.run(ThreadpoolApplication.class, args);
    }
}

3.AsyncTask

Write an asynchronous task-handling class and add @Async above the methods that need to run asynchronously.

@Component
@Slf4j
public class AsyncTask {
    @Async
    public void asyncRun() throws InterruptedException {
        Thread.sleep(10);
        log.info(Thread.currentThread().getName() + ": Processing completed");
    }
}

4.AsyncService

Write a service that calls asynchronous methods

@Service
@Slf4j
public class AsyncService {
    @Autowired
    private AsyncTask asyncTask;

    public void asyncSimpleExample() {
        try {
            log.info("service start");
            asyncTask.asyncRun();
            log.info("service end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

5.AsyncController

Write a Controller to call AsyncService


/**
 * @author kurtl
 */
@Controller
@RequestMapping("/")
public class AsyncController {
    @Autowired
    private AsyncService asyncService;
    @PostMapping("/asyncSimpleExample")
    @ResponseBody
    public void asyncSimpleExample() {
        asyncService.asyncSimpleExample();
    }
}

Finally, request this endpoint.

When asyncSimpleExample is requested, the call does not wait for the asynchronous task to finish; it returns directly, which shows that the task really is asynchronous. At this point we have successfully created an asynchronous task with @Async.

About @Async and @EnableAsync

Personally, I think the comments are a very important part of the source code; reading them can help you quickly understand what the code does. So I will roughly translate the important comments.

1. @Async source code




@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {

	/**
	 * A qualifier value for the specified asynchronous operation(s).
	 * <p>May be used to determine the target executor to be used when executing
	 * the asynchronous operation(s), matching the qualifier value (or the bean
	 * name) of a specific {@link java.util.concurrent.Executor Executor} or
	 * {@link org.springframework.core.task.TaskExecutor TaskExecutor}
	 * bean definition.
	 * <p>When specified on a class-level {@code @Async} annotation, indicates that the
	 * given executor should be used for all methods within the class. Method-level use
	 * of {@code Async#value} always overrides any value set at the class level.
	 * @since 3.1.2
	 */

	/**
	 * Three important points from these comments:
	 * 1. An @Async method can only return void or a Future.
	 * 2. @Async tasks are executed by an org.springframework.core.task.TaskExecutor
	 *    or a java.util.concurrent.Executor thread pool.
	 * 3. When @Async is written at the class level, it applies to all methods of the
	 *    class, and a method-level @Async always overrides the class-level one.
	 */

	String value() default "";

}

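As the comments note, an @Async method may only return void or a Future. A minimal sketch (class and method names are mine, not from the article) of an async method that hands a result back through a CompletableFuture:

import java.util.concurrent.CompletableFuture;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class AsyncResultTask {

    // Runs on the async executor; the caller gets a CompletableFuture immediately
    // and can call get() (blocking) or compose further stages on it later.
    @Async
    public CompletableFuture<String> asyncWithResult() throws InterruptedException {
        Thread.sleep(10);
        log.info("{}: computing result", Thread.currentThread().getName());
        return CompletableFuture.completedFuture("done");
    }
}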

2. @EnableAsync source code




/**
 * Enables Spring's asynchronous method execution capability, similar to functionality
 * found in Spring's {@code <task:*>} XML namespace.
 *
 * <p>To be used together with @{@link Configuration Configuration} classes, enabling
 * annotation-driven async processing for an entire Spring application context.
 * (Author's note: since it goes together with @Configuration, @EnableAsync should be
 * added to the thread-pool config class or to the SpringBootApplication class.)
 *
 * <p>The async annotation may be Spring's own {@code @Async}, the EJB 3.1
 * {@code @javax.ejb.Asynchronous} annotation, or any custom annotation specified via the
 * {@link #annotation} attribute; the aspect is added transparently for any registered bean.
 *
 * <p>By default, Spring will be searching for an associated thread pool definition:
 * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
 * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If
 * neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
 * will be used to process async method invocations.
 * (Author's note: in other words, if neither a TaskExecutor bean nor an Executor bean
 * named "taskExecutor" exists, SimpleAsyncTaskExecutor is used.)
 *
 * @author Chris Beams
 * @author Juergen Hoeller
 * @author Stephane Nicoll
 * @author Sam Brannen
 * @since 3.1
 * @see Async
 * @see AsyncConfigurer
 * @see AsyncConfigurationSelector
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

	/**
	 * Indicate the 'async' annotation type to be detected at either class
	 * or method level.
	 * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
	 * {@code @javax.ejb.Asynchronous} annotation will be detected.
	 * <p>This attribute exists so that developers can provide their own
	 * custom annotation type to indicate that a method (or all methods of
	 * a given class) should be invoked asynchronously.
	 */
	Class<? extends Annotation> annotation() default Annotation.class;

	/**
	 * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
	 * to standard Java interface-based proxies.
	 * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
	 * <p>The default is {@code false}.
	 * <p>Note that setting this attribute to {@code true} will affect <em>all</em>
	 * Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
	 * For example, other beans marked with Spring's {@code @Transactional} annotation
	 * will be upgraded to subclass proxying at the same time. This approach has no
	 * negative impact in practice unless one is explicitly expecting one type of proxy
	 * vs. another, for example in tests.
	 *
	 * (Author's note: this field indicates whether to create a CGLIB-based proxy. On older
	 * Spring versions (around 3.x) this controlled the choice between JDK dynamic proxies
	 * and CGLIB; setting it to true also upgrades other Spring-managed beans to CGLIB proxies.)
	 */
	boolean proxyTargetClass() default false;

	/**
	 * Indicate how async advice should be applied.
	 * <p><b>The default is {@link AdviceMode#PROXY}.</b>
	 * Please note that proxy mode allows for interception of calls through the proxy
	 * only. Local calls within the same class cannot get intercepted that way; an
	 * {@link Async} annotation on such a method within a local call will be ignored
	 * since Spring's interceptor does not even kick in for such a runtime scenario.
	 * For a more advanced mode of interception, consider switching this to
	 * {@link AdviceMode#ASPECTJ}.
	 *
	 * (Author's note: this field identifies the mode of the async advice. The default is
	 * PROXY. In PROXY mode, when a non-async method calls an async method in the same
	 * class, the async behavior does not take effect. To make such same-class calls
	 * asynchronous, switch this to ASPECTJ.)
	 */
	AdviceMode mode() default AdviceMode.PROXY;

	/**
	 * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
	 * should be applied.
	 * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
	 * after all other post-processors, so that it can add an advisor to
	 * existing proxies rather than double-proxy them.
	 *
	 * (Author's note: by default this is the lowest priority, Integer.MAX_VALUE;
	 * the smaller the value, the higher the priority.)
	 */
	int order() default Ordered.LOWEST_PRECEDENCE;

}



In the source code above, the core is really just @Import(AsyncConfigurationSelector.class), which imports the related configuration.




/**
 * Selects which implementation of {@link AbstractAsyncConfiguration} should
 * be used based on the value of {@link EnableAsync#mode} on the importing
 * {@code @Configuration} class.
 *
 * @author Chris Beams
 * @author Juergen Hoeller
 * @since 3.1
 * @see EnableAsync
 * @see ProxyAsyncConfiguration
 */
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";


	/**
	 * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
	 * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
	 * respectively.
	 */
	/**
	 * This method is essentially a selector, much like the selectImports() method of the
	 * ImportSelector interface: it loads a different configuration class depending on the
	 * proxy mode.
	 */
	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) {

		switch (adviceMode) {
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}
}

Let's look at the default one, ProxyAsyncConfiguration.




/**
 * {@code @Configuration} class that registers the Spring infrastructure beans necessary
 * to enable proxy-based asynchronous method execution.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @author Juergen Hoeller
 * @since 3.1
 * @see EnableAsync
 * @see AsyncConfigurationSelector
 */
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
// Extends the AbstractAsyncConfiguration class
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		// Initialize a bean of type AsyncAnnotationBeanPostProcessor
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		// Set the executor and the exception handler
		bpp.configure(this.executor, this.exceptionHandler);
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		// Set the custom async annotation type, if any
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		// Set the annotation properties
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}
}

This class extends AbstractAsyncConfiguration and essentially does one thing: it initializes an AsyncAnnotationBeanPostProcessor. The @Async annotation works through this post-processor, which generates a proxy object to implement the asynchrony. Let's look at the inherited config class first.




/**
 * Abstract base {@code Configuration} class providing common structure for enabling
 * Spring's asynchronous method execution capability.
 *
 * @author Chris Beams
 * @author Juergen Hoeller
 * @author Stephane Nicoll
 * @since 3.1
 * @see EnableAsync
 */
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

	@Nullable
	protected AnnotationAttributes enableAsync; // @EnableAsync annotation attributes

	@Nullable
	protected Supplier<Executor> executor; // Thread executor

	@Nullable
	protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler; // Exception handler


	@Override
	// Set the annotation properties
	public void setImportMetadata(AnnotationMetadata importMetadata) {
		this.enableAsync = AnnotationAttributes.fromMap(
				importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
		if (this.enableAsync == null) {
			throw new IllegalArgumentException(
					"@EnableAsync is not present on importing class "+ importMetadata.getClassName()); }}/**
	 * Collect any {@link AsyncConfigurer} beans through autowiring.
	 */
	@Autowired(required = false)
	// Set the executor and the exception handler
	void setConfigurers(Collection<AsyncConfigurer> configurers) {
		if (CollectionUtils.isEmpty(configurers)) {
			return;
		}
		if (configurers.size() > 1) {
			throw new IllegalStateException("Only one AsyncConfigurer may exist");
		}
		AsyncConfigurer configurer = configurers.iterator().next();
		this.executor = configurer::getAsyncExecutor;
		this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
	}
}

The overall code structure is very clear. Now let's return to the previous class and look at the bean it registers, AsyncAnnotationBeanPostProcessor. This bean is quite complex, so a class diagram is useful for figuring out the bean's life cycle. AsyncAnnotationBeanPostProcessor is a post-processor, so we look at its parent class AbstractAdvisingBeanPostProcessor.




/**
 * Base class for {@link BeanPostProcessor} implementations that apply a
 * Spring AOP {@link Advisor} to specific beans.
 *
 * @author Juergen Hoeller
 * @since 3.2
 */
@SuppressWarnings("serial")
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {

	@Nullable
	protected Advisor advisor;

	protected boolean beforeExistingAdvisors = false;

	private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);



	public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {
		this.beforeExistingAdvisors = beforeExistingAdvisors;
	}


	@Override
	public Object postProcessBeforeInitialization(Object bean, String beanName) {
		return bean;
	}

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) {
		 // If there is no advisor, or the bean is an AopInfrastructureBean, do not proxy
		if (this.advisor == null || bean instanceof AopInfrastructureBean) {
			// Ignore AOP infrastructure such as scoped proxies.
			return bean;
		}
		// Add the advisor to a bean that is already advised
		if (bean instanceof Advised) {
			Advised advised = (Advised) bean;
			if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
				// Add our local Advisor to the existing proxy's Advisor chain...
				// beforeExistingAdvisors decides whether this advice is added before or after the existing ones
				// The default is false; it is set to true for @Async
				if (this.beforeExistingAdvisors) {
					advised.addAdvisor(0, this.advisor);
				}
				else {
					advised.addAdvisor(this.advisor);
				}
				return bean;
			}
		}

		// Otherwise construct a ProxyFactory
		if (isEligible(bean, beanName)) {
			ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
			// Add an interface to the proxy
			if (!proxyFactory.isProxyTargetClass()) {
				evaluateProxyInterfaces(bean.getClass(), proxyFactory);
			}
			// Add the advisor (the aspect)
			proxyFactory.addAdvisor(this.advisor);
			customizeProxyFactory(proxyFactory);
			// Return the proxy class
			return proxyFactory.getProxy(getProxyClassLoader());
		}

		// No proxy needed.
		return bean;
	}

	// isEligible determines whether this class, or a method in it, carries the async annotation
	protected boolean isEligible(Object bean, String beanName) {
		return isEligible(bean.getClass());
	}
}

Look at proxyFactory.addAdvisor(this.advisor): the advisor held here is an object of the AsyncAnnotationAdvisor class. Its buildAdvice() method generates the advice, and buildPointcut() generates the pointcut. Navigate to the class's buildPointcut method to see its pointcut matching rules.




@SuppressWarnings("serial")
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

	private Advice advice;

	private Pointcut pointcut;


	/**
	 * Create a new {@code AsyncAnnotationAdvisor} for bean-style configuration.
	 */
	public AsyncAnnotationAdvisor() {
		this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);
	}


	@SuppressWarnings("unchecked")
	public AsyncAnnotationAdvisor(
			@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {

		this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));
	}


	@SuppressWarnings("unchecked")
	public AsyncAnnotationAdvisor(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
		asyncAnnotationTypes.add(Async.class);
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
		}
		catch (ClassNotFoundException ex) {
			// If EJB 3.1 API not present, simply ignore.
		}
		this.advice = buildAdvice(executor, exceptionHandler);
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}



	public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
		Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
		Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
		asyncAnnotationTypes.add(asyncAnnotationType);
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}

	/**
	 * Set the {@code BeanFactory} to be used when looking up executors by qualifier.
	 */
	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		if (this.advice instanceof BeanFactoryAware) {
			((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
		}
	}

	@Override
	public Advice getAdvice() {
		return this.advice;
	}

	@Override
	public Pointcut getPointcut() {
		return this.pointcut;
	}

	// Build the advice: a simple interceptor
	protected Advice buildAdvice(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
		interceptor.configure(executor, exceptionHandler);
		return interceptor;
	}


	protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
		ComposablePointcut result = null;
		for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
			// Build two pointcuts: cpc (class-level) and mpc (method-level)
			// cpc checks whether the class has the @Async annotation
			Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
			// mpc checks whether a method has the @Async annotation
			Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
			if (result == null) {
				result = new ComposablePointcut(cpc);
			}
			else {
				result.union(cpc);
			}
			result = result.union(mpc);
		}
		return (result != null ? result : Pointcut.TRUE);
	}
}

Next look at buildAdvice, the advice logic: it is an interceptor, an AnnotationAsyncExecutionInterceptor object. For this interceptor, focus on its core invoke method. Its parent class AsyncExecutionInterceptor implements the MethodInterceptor interface's invoke method, shown below.





public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {


	public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
		super(defaultExecutor);
	}

	public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
		super(defaultExecutor, exceptionHandler);
	}



	@Override
	@Nullable
	public Object invoke(final MethodInvocation invocation) throws Throwable {
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
		// Get a thread pool
		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
		if (executor == null) {
			throw new IllegalStateException(
					"No executor specified and no default executor set on AsyncExecutionInterceptor either");
		}
		 // This method is then wrapped as a Callable object and passed into the thread pool for execution
		Callable<Object> task = () -> {
			try {
				Object result = invocation.proceed();
				if (result instanceof Future) {
					// Block and wait for the result
					return ((Future<?>) result).get();
				}
			}
			catch (ExecutionException ex) {
				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
			}
			catch (Throwable ex) {
				handleError(ex, userDeclaredMethod, invocation.getArguments());
			}
			return null;
		};

		return doSubmit(task, executor, invocation.getMethod().getReturnType());
	}


	@Override
	@Nullable
	protected String getExecutorQualifier(Method method) {
		return null;
	}


	@Override
	@Nullable
	protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
		Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
		return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
	}

	@Override
	public int getOrder() {
		return Ordered.HIGHEST_PRECEDENCE;
	}
}

As you can see, invoke first wraps the call in a Callable object and then passes it into doSubmit, so the core of the logic is in the doSubmit method.


	@Nullable
	protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
		// If the declared return type is a CompletableFuture, run the task via CompletableFuture.supplyAsync
		if (CompletableFuture.class.isAssignableFrom(returnType)) {
			return CompletableFuture.supplyAsync(() -> {
				try {
					return task.call();
				}
				catch (Throwable ex) {
					throw new CompletionException(ex);
				}
			}, executor);
		}
		else if (ListenableFuture.class.isAssignableFrom(returnType)) {
			return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
		}
		else if (Future.class.isAssignableFrom(returnType)) {
			return executor.submit(task);
		}
		else {
			executor.submit(task);
			return null;
		}
	}

This mainly branches on the different return types, and eventually calls submit; the behavior then depends on the particular thread pool. Below is the implementation in SimpleAsyncTaskExecutor.


	/**
	 * Template method for the actual execution of a task.
	 * <p>The default implementation creates a new Thread and starts it.
	 * @param task the Runnable to execute
	 * @see #setThreadFactory
	 * @see #createThread
	 * @see java.lang.Thread#start()
	 */
	protected void doExecute(Runnable task) {
		Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
		thread.start();
	}



@Async's default thread pool

1. Always define a thread pool when using @Async

The source code above makes it clear that, by default, Spring searches for a TaskExecutor-type bean, or an Executor bean named taskExecutor; if neither exists, it falls back to SimpleAsyncTaskExecutor to execute @Async methods. But SimpleAsyncTaskExecutor is not a real thread pool: it does not reuse threads and creates a new thread for every call, which can easily result in OOM.





@SuppressWarnings("serial")
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
		implements AsyncListenableTaskExecutor, Serializable {

	/**
	 * Permit any number of concurrent invocations: that is, don't throttle concurrency.
	 * @see ConcurrencyThrottleSupport#UNBOUNDED_CONCURRENCY
	 */
	public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY;

	/**
	 * Switch concurrency 'off': that is, don't allow any concurrent invocations.
	 * @see ConcurrencyThrottleSupport#NO_CONCURRENCY
	 */
	public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY;


	/** Internal concurrency throttle used by this executor. */
	private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();

	@Nullable
	private ThreadFactory threadFactory;

	@Nullable
	private TaskDecorator taskDecorator;


	/** * Create a new SimpleAsyncTaskExecutor with default thread name prefix. */
	public SimpleAsyncTaskExecutor() {
		super();
	}

	/**
	 * Create a new SimpleAsyncTaskExecutor with the given thread name prefix.
	 * @param threadNamePrefix the prefix to use for the names of newly created threads
	 */
	public SimpleAsyncTaskExecutor(String threadNamePrefix) {
		super(threadNamePrefix);
	}

	/**
	 * Create a new SimpleAsyncTaskExecutor with the given external thread factory.
	 * @param threadFactory the factory to use for creating new Threads
	 */
	public SimpleAsyncTaskExecutor(ThreadFactory threadFactory) {
		this.threadFactory = threadFactory;
	}


	/**
	 * Specify an external factory to use for creating new Threads,
	 * instead of relying on the local properties of this executor.
	 * <p>You may specify an inner ThreadFactory bean or also a ThreadFactory reference
	 * obtained from JNDI (on a Java EE 6 server) or some other lookup mechanism.
	 * @see #setThreadNamePrefix
	 * @see #setThreadPriority
	 */
	public void setThreadFactory(@Nullable ThreadFactory threadFactory) {
		this.threadFactory = threadFactory;
	}

	/** * Return the external factory to use for creating new Threads, if any. */
	@Nullable
	public final ThreadFactory getThreadFactory() {
		return this.threadFactory;
	}


	public final void setTaskDecorator(TaskDecorator taskDecorator) {
		this.taskDecorator = taskDecorator;
	}


	// You can set the maximum number of threads, by limiting the number of threads
	public void setConcurrencyLimit(int concurrencyLimit) {
		this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
	}

	/** * Return the maximum number of parallel accesses allowed. */
	public final int getConcurrencyLimit() {
		return this.concurrencyThrottle.getConcurrencyLimit();
	}

	/**
	 * Return whether this throttle is currently active.
	 * @return {@code true} if the concurrency limit for this instance is active
	 * @see #getConcurrencyLimit()
	 * @see #setConcurrencyLimit
	 */
	public final boolean isThrottleActive() {
		return this.concurrencyThrottle.isThrottleActive();
	}


	/**
	 * Executes the given task, within a concurrency throttle
	 * if configured (through the superclass's settings).
	 * @see #doExecute(Runnable)
	 */
	@Override
	public void execute(Runnable task) {
		execute(task, TIMEOUT_INDEFINITE);
	}

	/**
	 * Executes the given task, within a concurrency throttle
	 * if configured (through the superclass's settings).
	 * <p>Executes urgent tasks (with 'immediate' timeout) directly,
	 * bypassing the concurrency throttle (if active). All other
	 * tasks are subject to throttling.
	 * @see #TIMEOUT_IMMEDIATE
	 * @see #doExecute(Runnable)
	 */
	//
	@Override
	public void execute(Runnable task, long startTimeout) {
		Assert.notNull(task, "Runnable must not be null");
		Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
		if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
			this.concurrencyThrottle.beforeAccess();
			doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
		}
		else {
			doExecute(taskToUse);
		}
	}

	@Override
	public Future<?> submit(Runnable task) {
		FutureTask<Object> future = new FutureTask<>(task, null);
		execute(future, TIMEOUT_INDEFINITE);
		return future;
	}

	@Override
	public <T> Future<T> submit(Callable<T> task) {
		FutureTask<T> future = new FutureTask<>(task);
		execute(future, TIMEOUT_INDEFINITE);
		return future;
	}

	@Override
	public ListenableFuture<?> submitListenable(Runnable task) {
		ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null);
		execute(future, TIMEOUT_INDEFINITE);
		return future;
	}

	@Override
	public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
		ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
		execute(future, TIMEOUT_INDEFINITE);
		return future;
	}

	/**
	 * Template method for the actual execution of a task.
	 * <p>The default implementation creates a new Thread and starts it.
	 * @param task the Runnable to execute
	 * @see #setThreadFactory
	 * @see #createThread
	 * @see java.lang.Thread#start()
	 */
	// Check if there is a factory, if not, call the parent class to create the thread
	protected void doExecute(Runnable task) {
		Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
		thread.start();
	}


	/**
	 * Subclass of the general ConcurrencyThrottleSupport class,
	 * making {@code beforeAccess()} and {@code afterAccess()}
	 * visible to the surrounding class.
	 */
	private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {

		@Override
		protected void beforeAccess(a) {
			super.beforeAccess();
		}

		@Override
		protected void afterAccess(a) {
			super.afterAccess();
		}
	}

	/**
	 * This Runnable calls {@code afterAccess()} after the
	 * target Runnable has finished its execution.
	 */
	private class ConcurrencyThrottlingRunnable implements Runnable {

		private final Runnable target;

		public ConcurrencyThrottlingRunnable(Runnable target) {
			this.target = target;
		}

		@Override
		public void run() {
			try {
				this.target.run();
			}
			finally {
				concurrencyThrottle.afterAccess();
			}
		}
	}
}

The main thing is this code


	/**
	 * Template method for the actual execution of a task.
	 * <p>The default implementation creates a new Thread and starts it.
	 * @param task the Runnable to execute
	 * @see #setThreadFactory
	 * @see #createThread
	 * @see java.lang.Thread#start()
	 */
	// Check if there is a factory, if not, call the parent class to create the thread
	protected void doExecute(Runnable task) {
		Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
		thread.start();
	}


Instead of using a thread pool, it creates a new thread directly for each task, so a flood of tasks means a flood of threads and eventually OOM. (SimpleAsyncTaskExecutor does offer setConcurrencyLimit to cap concurrency, but it is unbounded by default.) So the conclusion is: you must configure a thread pool when using @Async.
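One way to do that, as the @EnableAsync comments above suggest, is to implement AsyncConfigurer so @Async never falls back to SimpleAsyncTaskExecutor. A minimal sketch with assumed pool sizes and names:

import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncExecutorConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        // Explicit executor for all @Async methods
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("async-");
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // Log uncaught exceptions thrown from void @Async methods
        return (ex, method, params) ->
                System.err.println("async method " + method.getName() + " failed: " + ex.getMessage());
    }
}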

@Async failing to be asynchronous

The following code has been sanitized.

While reading the company codebase, I came across this code:

    public UserVO saveUser(HttpServletRequest request, String source) {
        String token = RequestUtils.getToken(request);
        String uid = checkUserLoginReturnUid(token);
        log.info("Login, token: {}, uid: {}", token, uid);
        // Get user information
        User User = getLoginUser(uid);
        if(User == null){
            User = new User();
            // Get user information
            Map<String,String> userMap = redisTemplateMain.getUserMapByToken(token);
            // Save the user
            saveUser(User, userMap, source);
            sendUserSystem(Integer.valueOf(userMap.get("id")));
        }
        // Put user information into the cache
        setAuth2Redis(User);
        return setUser2Redis(User);
    }


    // Notify the user system that we have successfully registered a user
    @Async
    public void sendUserSystem(Integer userId){
        Map<String,Object> map = new HashMap<>();
        map.put("mainUid", userId);
        map.put("source"."");
        String json = HttpUtil.post(property.userRegisterSendSystem, map);
        log.info("sendUserSystem userId : {}, json : {}", userId, json);
    }


As we learned earlier from the source code, @Async's AdviceMode is PROXY by default, so when the caller and the callee are in the same class the call does not go through the Spring proxy, no aspect is applied, and @Async has no effect. This method had therefore been running synchronously all along.

We can write a method to test that.


    public void asyncInvalid() {
        try {
            log.info("service start");
            asyncInvalidExample();
            log.info("service end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Async
    public void asyncInvalidExample() throws InterruptedException {
        Thread.sleep(10);
        log.info(Thread.currentThread().getName()+": Processing completed");
    }



The result of the call clearly shows that the operation is not asynchronous but synchronous.
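One common workaround (a sketch, with hypothetical class names) is to make the call go through the proxy again, either by moving the @Async method into another bean, as in the AsyncTask/AsyncService example earlier, or by injecting the bean's own proxy; switching the mode to ASPECTJ, as mentioned in the @EnableAsync comments, is another option.

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class AsyncSelfInvokeService {

    // Inject this bean's own proxy (lazily, to avoid a circular-reference problem),
    // so the internal call still passes through the async advice.
    @Autowired
    @Lazy
    private AsyncSelfInvokeService self;

    public void caller() throws InterruptedException {
        log.info("service start");
        self.asyncInvalidExample();   // proxied call, now actually asynchronous
        log.info("service end");
    }

    @Async
    public void asyncInvalidExample() throws InterruptedException {
        Thread.sleep(10);
        log.info(Thread.currentThread().getName() + ": Processing completed");
    }
}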

Thread pool rejection causes task loss

Since a thread pool uses a buffer queue to hold not-yet-consumed tasks, there are bound to be cases where the queue fills up and tasks get rejected and lost. Let's write some code to simulate that.

The configuration file

spring:
  task:
    execution:
      pool:
        # Maximum number of threads
        max-size: 16
        # Number of core threads
        core-size: 16
        # Survival time
        keep-alive: 10s
        # queue size
        queue-capacity: 100
        # Whether to allow core threads to time out
        allow-core-thread-timeout: true
      # thread name prefix
      thread-name-prefix: async-task-


Asynchronous methods


    @Async
    public void asyncRefuseRun() throws InterruptedException {
        Thread.sleep(5000000);
    }

The calling method:



    public void asyncRefuseRun() {
        for (int t = 0; t < 2000; t++) {
            log.info("" + t);
            try {
                asyncTask.asyncRefuseRun();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

Here I submit 2000 tasks in a loop. In theory, tasks start being rejected once the count reaches maxPoolSize + queueCapacity, which is 16 + 100 = 116.

Sure enough, at task 116 a java.util.concurrent.RejectedExecutionException was thrown, which proves that the thread pool executed its rejection policy.

To understand a thread pool’s rejection policy, look at its interface.


public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}


When a thread pool rejects a task, the submitted task and the thread pool instance itself are handed to the rejection handler. You are advised to implement the rejection policy according to your own business scenario.
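As an illustration only (not from the original article), a custom handler could log the rejection and then degrade to running the task on the calling thread:

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class LogAndCallerRunsPolicy implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Record the pool state so the rejection is visible in the logs
        System.err.println("task rejected, active=" + executor.getActiveCount()
                + ", queued=" + executor.getQueue().size());
        // Degrade: run the task in the submitting thread instead of dropping it
        if (!executor.isShutdown()) {
            r.run();
        }
    }
}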

Of course, if the JDK's built-in implementations meet your current business needs, you can use them directly.

AbortPolicy (abort policy)

This abort policy is the one we just triggered: when it fires, it aborts the task and throws an exception. It is the default in ThreadPoolExecutor.

   /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from "+ e.toString()); }}Copy the code

DiscardPolicy (discard policy)

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

Obviously this does nothing; it is an empty implementation, so the rejected task is silently dropped.

DiscardOldestPolicy (discard-oldest policy)

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

If the thread pool has not been shut down, the element at the head of the queue is popped and the new task is submitted again. A task is still discarded either way; the difference is that the oldest queued task is the one that gets dropped.

CallerRunsPolicy (caller-runs policy)

    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

When this rejection policy is triggered, it checks whether the thread pool has been shut down; if not, the task is run by the thread that submitted it. With a large volume of submissions, however, the calling thread ends up blocked running these tasks, which hurts performance.

The thread pool rejection policy in Hutool

Let's look at the implementation in Hutool, a common utility library that also ships a thread pool builder.


	/**
	 * Build a ThreadPoolExecutor.
	 *
	 * @param builder {@link ExecutorBuilder}
	 * @return {@link ThreadPoolExecutor}
	 */
	private static ThreadPoolExecutor build(ExecutorBuilder builder) {
		final int corePoolSize = builder.corePoolSize;
		final int maxPoolSize = builder.maxPoolSize;
		final long keepAliveTime = builder.keepAliveTime;
		final BlockingQueue<Runnable> workQueue;
		if (null != builder.workQueue) {
			workQueue = builder.workQueue;
		} else {
			// SynchronousQueue is used when corePoolSize is 0 to avoid infinite blocking
			workQueue = (corePoolSize <= 0) ? new SynchronousQueue<>() : new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
		}
		final ThreadFactory threadFactory = (null != builder.threadFactory) ? builder.threadFactory : Executors.defaultThreadFactory();
		RejectedExecutionHandler handler = ObjectUtil.defaultIfNull(builder.handler, new ThreadPoolExecutor.AbortPolicy());

		final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(//
				corePoolSize, //
				maxPoolSize, //
				keepAliveTime, TimeUnit.NANOSECONDS, //
				workQueue, //
				threadFactory, //
				handler//
		);
		if (null != builder.allowCoreThreadTimeOut) {
			threadPoolExecutor.allowCoreThreadTimeOut(builder.allowCoreThreadTimeOut);
		}
		return threadPoolExecutor;
	}


As you can clearly see, the rejection policy is passed in by the caller; if none is supplied, the default AbortPolicy is used.

The rejection strategy in Dubbo

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

    private final String threadName;

    private final URL url;

    private static volatile long lastPrintTime = 0;

    private static Semaphore guard = new Semaphore(1);

    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                        " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                        " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }

    private void dumpJStack() {
        // Implementation omitted
    }
}

Dubbo's implementation is all about letting developers know when and why a task was rejected. It first logs the thread pool's detailed configuration, its current state, and information about the rejected task, then dumps the current thread stacks via dumpJStack, and finally throws a RejectedExecutionException.

Thread pool rejection policy in Netty

    private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
        NewThreadRunsPolicy() {
            super();
        }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                final Thread t = new Thread(r, "Temporary task executor");
                t.start();
            } catch (Throwable e) {
                throw new RejectedExecutionException(
                        "Failed to start a new thread", e); }}}Copy the code

Netty's rejection policy is similar to CallerRunsPolicy in that the task is not discarded but still gets processed. The difference is that CallerRunsPolicy runs the task on the calling thread, whereas Netty starts a new thread to process it.

The thread pool rejection policy in ActiveMQ


 new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
                    }

                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); }});Copy the code

The ActiveMQ policy is of the best-effort kind: when rejection is triggered, it tries for up to one minute to re-insert the task into the task queue, and only throws an exception if that attempt times out.

Monitoring thread pools

In development, the running state of our thread pools and their threads matters a lot, so we should monitor them. We can hook extra behavior in before and after task execution by overriding the beforeExecute, afterExecute, and terminated methods, and use those hooks to log the thread pool's state.

Method                                   Meaning
shutdown()                               When the pool is shut down gracefully (waiting for running tasks to finish), log the number of completed, running, and pending tasks
shutdownNow()                            When the pool is shut down immediately, log the number of completed, running, and pending tasks
beforeExecute(Thread t, Runnable r)      Before a task runs, record its start time in the startTimes map, keyed by the task's hashCode
afterExecute(Runnable r, Throwable t)    After a task finishes, compute its duration and log the task time, pool size, core pool size, active count, completed count, total task count, queue size, largest pool size, maximum pool size, keep-alive time, and whether the pool is shut down or terminated

package com.example.threadpool;

import lombok.extern.slf4j.Slf4j;

import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author kurtl
 */
@Slf4j
public class ThreadPoolMonitor extends ThreadPoolExecutor {


    /**
     * Saves the start time of each task. When the task completes, the end time minus
     * the start time gives the task's execution time.
     */
    private final ConcurrentHashMap<String, Date> startTimes;

    /**
     * Thread pool name, usually named after the business it serves so pools can be told apart.
     */
    private final String poolName;

    /**
     * Calls the parent constructor and initializes the HashMap and the thread pool name.
     *
     * @param corePoolSize    number of core threads in the pool
     * @param maximumPoolSize maximum number of threads in the pool
     * @param keepAliveTime   maximum idle time of a thread
     * @param unit            unit of the idle time
     * @param workQueue       queue holding submitted tasks
     * @param poolName        thread pool name
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), poolName);
    }


    /**
     * Calls the parent constructor and initializes the HashMap and the thread pool name.
     *
     * @param corePoolSize    number of core threads in the pool
     * @param maximumPoolSize maximum number of threads in the pool
     * @param keepAliveTime   maximum idle time of a thread
     * @param unit            unit of the idle time
     * @param workQueue       queue holding submitted tasks
     * @param threadFactory   the thread factory
     * @param poolName        thread pool name
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.startTimes = new ConcurrentHashMap<>();
        this.poolName = poolName;
    }

    /**
     * Logs thread pool status when the pool is shut down gracefully
     * (waiting for all tasks in the pool to finish).
     */
    @Override
    public void shutdown() {
        // Count the number of executed tasks, executing tasks, and unexecuted tasks
        log.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}".this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        super.shutdown();
    }

    /** * Count thread pool status when the thread pool is immediately closed */
    @Override
    public List<Runnable> shutdownNow() {
        // Count the number of executed tasks, executing tasks, and unexecuted tasks
        log.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}".this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        return super.shutdownNow();
    }

    /** * Before executing the task, record the start time of the task */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTimes.put(String.valueOf(r.hashCode()), new Date());
    }

    /** * After the task is executed, the end time of the task is calculated */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
        Date finishDate = new Date();
        long diff = finishDate.getTime() - startDate.getTime();
        // Count the task time, number of initial threads, number of core threads, number of tasks being executed,
        // Number of completed tasks, total tasks, number of cached tasks in the queue, maximum number of threads in the pool,
        // Maximum number of threads allowed, idle time, whether the thread pool is closed, whether the thread pool is terminated
        log.info("{}-pool-monitor: " +
                        "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, Active: {}, " +
                        "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
                        "MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}".this.poolName,
                diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
                this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
    }

    /**
     * Creates a fixed thread pool; the code comes from Executors.newFixedThreadPool,
     * with poolName added.
     *
     * @param nThreads number of threads
     * @param poolName thread pool name
     * @return the ExecutorService object
     */
    public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
        return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
    }

    /**
     * Creates a cached thread pool; the code comes from Executors.newCachedThreadPool,
     * with poolName added.
     *
     * @param poolName thread pool name
     * @return the ExecutorService object
     */
    public static ExecutorService newCachedThreadPool(String poolName) {
        return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
    }

    /**
     * Thread factory used by the pool; it simply overrides the default thread factory,
     * putting the pool name into the thread names to make problems easier to trace.
     */
    static class EventThreadFactory implements ThreadFactory {
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        /**
         * Initializes the thread factory.
         *
         * @param poolName thread pool name
         */
        EventThreadFactory(String poolName) {
            SecurityManager s = System.getSecurityManager();
            group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = poolName + "-pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
}

The core question for thread pool load is whether enough resources have been allocated given the current thread pool parameters. We can approach this both before and during the event. Beforehand, the thread pool defines a notion of "activity" so users can perceive a load problem before Reject exceptions actually occur; the formula is: thread pool activity = activeCount / maximumPoolSize. The closer the number of active threads gets to maximumPoolSize, the heavier the load. When a Reject exception occurs, or when tasks are waiting in the queue (a custom threshold is supported), an alarm is triggered and pushed to the person in charge of the service. -- Meituan technical blog
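A tiny sketch of that activity check (the threshold and the names are my own assumptions, not from the Meituan article):

import java.util.concurrent.ThreadPoolExecutor;

public class PoolActivityChecker {

    private static final double ALARM_THRESHOLD = 0.8; // assumed alarm threshold

    // activity = activeCount / maximumPoolSize; also warn if tasks are already queueing
    public static void check(ThreadPoolExecutor pool) {
        double activity = (double) pool.getActiveCount() / pool.getMaximumPoolSize();
        if (activity >= ALARM_THRESHOLD || !pool.getQueue().isEmpty()) {
            System.err.println(String.format(
                    "thread pool under pressure: activity=%.2f, queued=%d",
                    activity, pool.getQueue().size()));
        }
    }
}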

How to set the number of core threads and the maximum number of threads

On how to configure thread pool parameters properly, the most common rule of thumb is:

IO-intensive = 2 * Ncpu (check the actual size through testing; 2 * Ncpu is generally fine). Typical for threads doing database access, file upload/download, network transfer, and so on.

Computation-intensive = Ncpu (typical for threads running complex algorithms).
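Expressed in code, the rule of thumb just reads the CPU count from the runtime (a trivial sketch):

public class PoolSizeRuleOfThumb {
    public static void main(String[] args) {
        int nCpu = Runtime.getRuntime().availableProcessors();
        System.out.println("IO-intensive pool size  = " + (2 * nCpu)); // 2 * Ncpu
        System.out.println("CPU-intensive pool size = " + nCpu);       // Ncpu
    }
}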

However, this scheme does not consider the case of multiple thread pools, and it can deviate from what actually works in practice.

(Image from the Meituan technology blog)

Therefore, the parameter settings should be tuned to the actual application scenario.

Using multiple thread pools

In real business code we generally define different thread pools to handle different kinds of work. With the ThreadPoolMonitor we completed earlier, we can quickly define different pools.

ThreadPoolConfig


@EnableAsync
@Configuration
public class ThreadPoolConfig {

    @Bean
    public ThreadPoolExecutor test01() {
        return new ThreadPoolMonitor(16, 32, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), "test01");
    }

    @Bean
    public ThreadPoolExecutor test02() {
        return new ThreadPoolMonitor(8, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), "test02");
    }
}
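
With those beans in place, the @Async qualifier mentioned in the annotation's source can route different methods to different pools (a sketch; the bean names test01 and test02 come from the config above, the rest is assumed):

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MultiPoolTask {

    // Runs on the executor bean named "test01"
    @Async("test01")
    public void runOnTest01() {
        log.info("{} handled by test01", Thread.currentThread().getName());
    }

    // Runs on the executor bean named "test02"
    @Async("test02")
    public void runOnTest02() {
        log.info("{} handled by test02", Thread.currentThread().getName());
    }
}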

TODO

1. Dynamic thread pools; 2. Task-level thread pool monitoring.

The author's level is limited; please point out any errors or omissions.

References

1. Implementation principles of the Java thread pool and its practice in Meituan's business

2. Java concurrency (VI): thread pool monitoring

3. Lessons from a production incident caused by misuse of a Java thread pool (newFixedThreadPool)