Which thread pool does @Async use?

Preface

In Spring, we often run operations asynchronously with the @EnableAsync and @Async annotations. However, I recently noticed that the threads running our asynchronous methods came from the ThreadPoolTaskExecutor customized in our project, not from the familiar SimpleAsyncTaskExecutor.

Let's take a look at how it is executed.

Main text

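  1. First of all, for asynchronous execution to take effect, we need to add @EnableAsync to the startup class. Opening the annotation's source shows that it uses @Import to bring in the AsyncConfigurationSelector class. Its parent class, AdviceModeImportSelector, reads the advice mode from the annotation, and in PROXY mode (the default for @EnableAsync) the selector returns the configuration class ProxyAsyncConfiguration at startup.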
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    public AsyncConfigurationSelector() {
    }

    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch(adviceMode) {
        case PROXY:
            return new String[]{ProxyAsyncConfiguration.class.getName()};
        case ASPECTJ:
            return new String[]{"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"};
        default:
            return null;
        }
    }
}
  2. Opening ProxyAsyncConfiguration shows that it registers an AsyncAnnotationBeanPostProcessor bean. This class implements the BeanPostProcessor interface, so it is a post-processor, and it applies a Spring AOP Advisor to each eligible bean. As a result, the methods on the bean that carry the asynchronous annotation are invoked asynchronously.
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
    public ProxyAsyncConfiguration() {
    }

    @Bean(
        name = {"org.springframework.context.annotation.internalAsyncAnnotationProcessor"}
    )
    @Role(2)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        bpp.configure(this.executor, this.exceptionHandler);
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }

        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder((Integer)this.enableAsync.getNumber("order"));
        return bpp;
    }
}
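
To make the idea of "a post-processor wraps the bean so that annotated methods run asynchronously" concrete, here is a minimal sketch in plain Java, without Spring. It uses a JDK dynamic proxy, and the names RunAsync, ThreadReporter, SimpleThreadReporter and AsyncProxySketch are hypothetical stand-ins invented for this illustration. Spring's real mechanism (Advisor, pointcut, interceptor) is richer than this.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for @Async; not a Spring type.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RunAsync {}

interface ThreadReporter {
    @RunAsync
    CompletableFuture<String> asyncThread();

    String callerThread();
}

class SimpleThreadReporter implements ThreadReporter {
    @Override
    public CompletableFuture<String> asyncThread() {
        return CompletableFuture.completedFuture(Thread.currentThread().getName());
    }

    @Override
    public String callerThread() {
        return Thread.currentThread().getName();
    }
}

final class AsyncProxySketch {

    // Plays the role of the post-processor: wraps the bean in a proxy.
    @SuppressWarnings("unchecked")
    static <T> T wrap(T target, Class<T> iface, Executor executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (!method.isAnnotationPresent(RunAsync.class)) {
                return method.invoke(target, args); // not annotated: call directly
            }
            // Annotated: run the target method on the executor and unwrap its future.
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return (CompletableFuture<Object>) method.invoke(target, args);
                } catch (ReflectiveOperationException ex) {
                    throw new IllegalStateException(ex);
                }
            }, executor).thenCompose(future -> future);
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    // Returns {thread used by the annotated method, thread used by the plain method}.
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-async-1"));
        try {
            ThreadReporter reporter = wrap(new SimpleThreadReporter(), ThreadReporter.class, pool);
            return new String[] {reporter.asyncThread().join(), reporter.callerThread()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = demo();
        System.out.println("annotated method ran on: " + names[0]);
        System.out.println("plain method ran on: " + names[1]);
    }
}
```

The annotated method reports the pool thread, while the plain method reports the caller's thread. This is also why calling an asynchronous method through `this` inside the same class bypasses the proxy and runs synchronously.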
  3. A parent class of AsyncAnnotationBeanPostProcessor implements BeanFactoryAware. After the AsyncAnnotationBeanPostProcessor is instantiated, setBeanFactory() is called back, and it instantiates the advisor AsyncAnnotationAdvisor.
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    // Create the advisor (pointcut plus advice)
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }

    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}
  4. The AsyncAnnotationAdvisor constructor builds the advice (the code enhancement) and the pointcut.
	public AsyncAnnotationAdvisor(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
		asyncAnnotationTypes.add(Async.class);
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
		}
		catch (ClassNotFoundException ex) {
			// If EJB 3.1 API not present, simply ignore.
		}
		// Advice
		this.advice = buildAdvice(executor, exceptionHandler);
		// Pointcut
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}
  5. The advice is what ultimately does the work. buildAdvice builds it, mainly by creating an interceptor of type AnnotationAsyncExecutionInterceptor and configuring the executor and exception handler it uses. The code that actually performs the asynchronous execution is in its ancestor class AsyncExecutionAspectSupport!
protected Advice buildAdvice(
		@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

	AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
	// Configure the interceptor with the executor and exception handler
	interceptor.configure(executor, exceptionHandler);
	return interceptor;
}
  6. Configure the interceptor. The executor and exception handler passed in as parameters are used if present; otherwise the default executor and exception handler are used.
public void configure(@Nullable Supplier<Executor> defaultExecutor,
		@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

	// Default executor: use the supplied one, otherwise fall back to getDefaultExecutor()
	this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
	this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
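
The "use what was passed in, otherwise fall back to a default, and remember the result" pattern in configure() can be sketched in plain Java as follows. CachingFallbackSupplier and SupplierSketch are hypothetical names written for this illustration. They are a simplified stand-in for the idea and not Spring's SingletonSupplier, and strings stand in for executors.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for the caching-with-fallback idea.
final class CachingFallbackSupplier<T> implements Supplier<T> {
    private final Supplier<? extends T> primary;   // may be null, like an unconfigured executor
    private final Supplier<? extends T> fallback;  // evaluated only when primary yields nothing
    private T cached;

    CachingFallbackSupplier(Supplier<? extends T> primary, Supplier<? extends T> fallback) {
        this.primary = primary;
        this.fallback = fallback;
    }

    @Override
    public synchronized T get() {
        if (cached == null) {
            T value = (primary != null ? primary.get() : null);
            cached = (value != null ? value : fallback.get());
        }
        return cached;
    }
}

final class SupplierSketch {

    // Resolves the "executor" (a label here) for an optional configured value.
    static String resolve(String configured) {
        Supplier<String> primary = null;
        if (configured != null) {
            primary = () -> configured;
        }
        return new CachingFallbackSupplier<String>(primary, () -> "default-executor").get();
    }

    // Counts how often the fallback is evaluated across two get() calls.
    static int fallbackCallsWithoutPrimary() {
        AtomicInteger calls = new AtomicInteger();
        Supplier<String> supplier = new CachingFallbackSupplier<String>(null, () -> {
            calls.incrementAndGet();
            return "default-executor";
        });
        supplier.get();
        supplier.get();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(resolve("custom-executor"));      // custom-executor
        System.out.println(resolve(null));                   // default-executor
        System.out.println(fallbackCallsWithoutPrimary());   // 1
    }
}
```

Because the result is cached, the fallback lookup runs at most once, which matters when the fallback creates a new executor.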
  7. The getDefaultExecutor() method finds the default executor. The parent class AsyncExecutionAspectSupport first looks for a unique bean of type TaskExecutor and returns it. If there are several such beans, or none at all, it looks for an Executor bean named taskExecutor. If that cannot be found either, it returns null. The subclass AsyncExecutionInterceptor overrides getDefaultExecutor(): it first calls the parent class logic and, if that returns null, creates a new SimpleAsyncTaskExecutor.
/**
 * Parent class
 * Retrieve or build a default executor for this advice instance.
 * The executor returned here is cached for further use.
 * The default implementation searches for a unique TaskExecutor bean
 * in the context, or otherwise for an Executor bean named "taskExecutor".
 * If neither of the two is resolvable, this implementation returns null.
 */
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
	if (beanFactory != null) {
		try {
			// Search for a unique TaskExecutor type bean and return
			return beanFactory.getBean(TaskExecutor.class);
		}
		catch (NoUniqueBeanDefinitionException ex) {
			// More than one TaskExecutor bean exists: fall back to the Executor bean named "taskExecutor"
			logger.debug("Could not find unique TaskExecutor bean", ex);
			try {
				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
			}
			catch (NoSuchBeanDefinitionException ex2) {
				if (logger.isInfoEnabled()) {
					logger.info("More than one TaskExecutor bean found within the context, and none is named " +
							"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
							"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
				}
			}
		}
		catch (NoSuchBeanDefinitionException ex) {
			// No TaskExecutor bean exists: fall back to the Executor bean named "taskExecutor"
			logger.debug("Could not find default TaskExecutor bean", ex);
			try {
				return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
			}
			catch (NoSuchBeanDefinitionException ex2) {
				logger.info("No task executor bean found for async processing: " +
						"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
			}
			// Giving up -> either using local default executor or none at all...
		}
	}
	return null;
}


/**
 * Subclass
 * If the parent class returns null, create a new SimpleAsyncTaskExecutor
 */
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
	Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
	return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

At this point we can understand why asynchronous threads are named SimpleAsyncTaskExecutor-xx by default, and why asynchronous methods run on our own thread pool once we have configured one as a bean.
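
The lookup order described above can be condensed into a small plain-Java model. This is only a sketch under simplifying assumptions: a Map stands in for the bean factory, every entry counts as a candidate executor, and primary beans and the TaskExecutor versus Executor type distinction are ignored. DirectExecutor and DefaultExecutorLookupSketch are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;

// Trivial executor used only so that each "bean" is a distinct object.
final class DirectExecutor implements Executor {
    @Override
    public void execute(Runnable task) {
        task.run();
    }
}

final class DefaultExecutorLookupSketch {
    static final String DEFAULT_BEAN_NAME = "taskExecutor";

    // Parent-class behaviour: unique bean, else the bean named "taskExecutor", else null.
    static Executor findConfigured(Map<String, Executor> beans) {
        if (beans.size() == 1) {
            return beans.values().iterator().next();
        }
        return beans.get(DEFAULT_BEAN_NAME); // null when no such bean exists
    }

    // Subclass behaviour: fall back to a thread-per-task executor.
    static Executor resolve(Map<String, Executor> beans) {
        Executor configured = findConfigured(beans);
        if (configured != null) {
            return configured;
        }
        return task -> new Thread(task).start();
    }

    // Builds a registry from the given bean names and reports which executor wins.
    static String describe(String... beanNames) {
        Map<String, Executor> beans = new LinkedHashMap<>();
        for (String name : beanNames) {
            beans.put(name, new DirectExecutor());
        }
        Executor chosen = resolve(beans);
        for (Map.Entry<String, Executor> entry : beans.entrySet()) {
            if (entry.getValue() == chosen) {
                return "bean '" + entry.getKey() + "'";
            }
        }
        return "thread-per-task fallback";
    }

    public static void main(String[] args) {
        System.out.println(describe("orderPool"));                 // bean 'orderPool'
        System.out.println(describe("orderPool", "taskExecutor")); // bean 'taskExecutor'
        System.out.println(describe("orderPool", "mailPool"));     // thread-per-task fallback
        System.out.println(describe());                            // thread-per-task fallback
    }
}
```

The first case is the one from the preface: a single custom pool in the context is picked up automatically, even though no @Async method names it.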

With this entry point in place, AsyncExecutionInterceptor#invoke() runs every time a request reaches an asynchronous method. It calls determineAsyncExecutor to decide which executor to use.

@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    // Look up the executor cached for this method
    AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
    if (executor == null) {
        // Read the value of the @Async annotation (the name of the requested executor)
        String qualifier = this.getExecutorQualifier(method);
        Executor targetExecutor;
        if (StringUtils.hasLength(qualifier)) {
            // Look up the executor bean with that name
            targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
        } else {
            // Fall back to the default executor
            targetExecutor = (Executor)this.defaultExecutor.get();
        }

        if (targetExecutor == null) {
            return null;
        }

        executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
        // Cache the executor for this method
        this.executors.put(method, executor);
    }

    return (AsyncTaskExecutor)executor;
}
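
The selection logic above (per-method cache, then the qualifier from the annotation, then the default executor) can be modelled in plain Java as follows. The annotation OnExecutor and the classes OrderTasks, LabeledExecutor and ExecutorSelectionSketch are hypothetical stand-ins written for this sketch, and a Map plays the role of the bean factory.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for @Async and its value attribute.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface OnExecutor {
    String value() default "";
}

class OrderTasks {
    @OnExecutor("mailPool")
    public void sendMail() {}

    @OnExecutor
    public void writeAudit() {}
}

final class LabeledExecutor implements Executor {
    final String label;

    LabeledExecutor(String label) {
        this.label = label;
    }

    @Override
    public void execute(Runnable task) {
        task.run();
    }
}

final class ExecutorSelectionSketch {
    private final Map<String, Executor> executorBeans;
    private final Executor defaultExecutor;
    private final Map<Method, Executor> cache = new ConcurrentHashMap<>();
    final AtomicInteger resolutions = new AtomicInteger();

    ExecutorSelectionSketch(Map<String, Executor> executorBeans, Executor defaultExecutor) {
        this.executorBeans = executorBeans;
        this.defaultExecutor = defaultExecutor;
    }

    Executor determine(Method method) {
        // Resolve once per method, then serve the cached executor.
        return cache.computeIfAbsent(method, m -> {
            resolutions.incrementAndGet();
            OnExecutor annotation = m.getAnnotation(OnExecutor.class);
            String qualifier = (annotation != null ? annotation.value() : "");
            if (qualifier.isEmpty()) {
                return defaultExecutor;
            }
            Executor named = executorBeans.get(qualifier);
            if (named == null) {
                throw new IllegalStateException("No executor named '" + qualifier + "'");
            }
            return named;
        });
    }

    // Resolves both methods twice; returns "<label>,<label>,<number of resolutions>".
    static String demo() {
        Map<String, Executor> beans = new ConcurrentHashMap<>();
        beans.put("mailPool", new LabeledExecutor("mailPool"));
        ExecutorSelectionSketch sketch = new ExecutorSelectionSketch(beans, new LabeledExecutor("default"));
        try {
            Method sendMail = OrderTasks.class.getMethod("sendMail");
            Method writeAudit = OrderTasks.class.getMethod("writeAudit");
            sketch.determine(sendMail);
            sketch.determine(writeAudit);
            String first = ((LabeledExecutor) sketch.determine(sendMail)).label;
            String second = ((LabeledExecutor) sketch.determine(writeAudit)).label;
            return first + "," + second + "," + sketch.resolutions.get();
        } catch (NoSuchMethodException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // mailPool,default,2
    }
}
```

Each method is resolved only once, so four lookups cause two resolutions. In this model a qualifier that matches no executor fails loudly rather than silently using the default.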

Once an executor has been determined, the doSubmit method is called to submit the task to it.
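
The article does not show doSubmit itself. As far as I understand the Spring 5.x implementation, it chooses how to submit based on the declared return type of the asynchronous method; treat that as an assumption of this sketch. The following plain-Java model uses the hypothetical class SubmitSketch and leaves out Spring's ListenableFuture branch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SubmitSketch {

    // Returns the handle the caller receives for the given declared return type.
    static Object submit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        // CompletableFuture must be checked first because it also implements Future.
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Exception ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        // void and any other return type: fire and forget, the caller gets null.
        executor.submit(task);
        return null;
    }

    // Reports which kind of handle comes back for a declared return type.
    static String kindOf(Class<?> returnType) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Object handle = submit(() -> "done", pool, returnType);
            if (handle instanceof CompletableFuture) {
                return "CompletableFuture";
            }
            if (handle instanceof Future) {
                return "Future";
            }
            return "null";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(kindOf(CompletableFuture.class)); // CompletableFuture
        System.out.println(kindOf(Future.class));            // Future
        System.out.println(kindOf(void.class));              // null
    }
}
```

If this assumption holds, an asynchronous method declared with a non-Future return type such as String always hands null to its caller, so such methods should return void or a Future type.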

When no executor bean can be found, asynchronous tasks use SimpleAsyncTaskExecutor as the executor by default! It has the following characteristics:

It does not reuse threads; it creates a new thread for each task. The number of concurrent threads can be capped through the concurrencyLimit property, but there is no limit by default (concurrencyLimit is -1).
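
The difference between a thread-per-task strategy and a bounded pool can be measured with the JDK alone. The sketch below does not use SimpleAsyncTaskExecutor itself. It models the thread-per-task behaviour described above with a one-line Executor and compares it with a fixed pool; ThreadReuseSketch is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class ThreadReuseSketch {

    // Runs the given number of short tasks and waits for all of them.
    private static void runTasks(Executor executor, int tasks) {
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(done::countDown);
        }
        try {
            done.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    // Thread-per-task: every submitted task gets a brand-new thread.
    static int threadsCreatedPerTask(int tasks) {
        AtomicInteger created = new AtomicInteger();
        Executor perTask = task -> {
            created.incrementAndGet();
            new Thread(task).start();
        };
        runTasks(perTask, tasks);
        return created.get();
    }

    // Bounded pool: threads are created up to poolSize and then reused.
    static int threadsCreatedByPool(int tasks, int poolSize) {
        AtomicInteger created = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize, r -> {
            created.incrementAndGet();
            return new Thread(r);
        });
        try {
            runTasks(pool, tasks);
        } finally {
            pool.shutdown();
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println("thread-per-task created: " + threadsCreatedPerTask(20));
        System.out.println("fixed pool created: " + threadsCreatedByPool(20, 2));
    }
}
```

With 20 tasks the thread-per-task executor creates 20 threads, while the pool never creates more than its configured size. Under a burst of requests the first number grows with the load.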

Therefore, when we use asynchronous tasks, we should not rely on the default executor, because an unbounded number of threads can lead to an OOM error! The best approach is to specify the executor explicitly.
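
One way to specify the executor is to declare a dedicated ThreadPoolTaskExecutor bean and name it in @Async. The following is a configuration sketch only: the bean name reportExecutor, the classes AsyncPoolConfig and ReportService, and all pool sizes are assumptions chosen for illustration and should be tuned for the actual workload.

```java
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

@Configuration
@EnableAsync
class AsyncPoolConfig {

    // Dedicated, bounded pool for report generation (all numbers are illustrative).
    @Bean("reportExecutor")
    public ThreadPoolTaskExecutor reportExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("report-async-");
        // When pool and queue are full, run the task on the calling thread instead of dropping it.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

@Service
class ReportService {

    // The qualifier selects the bean above through determineAsyncExecutor.
    @Async("reportExecutor")
    public void buildReport(long reportId) {
        // Long-running work goes here; its thread name starts with "report-async-".
    }
}
```

Naming the executor in the annotation keeps the choice explicit, so it does not change when another executor bean is later added to the context.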

Summary

This article reads the source code to understand how the asynchronous annotation @Async selects the threads it runs on in a project. Try to assign a dedicated thread pool to asynchronous tasks, so that they do not share a thread pool with other business logic and cannot affect it.

Tags: Java

Posted by mounika on Tue, 27 Sep 2022 01:54:56 +0930