Preface
In Spring, asynchronous execution is usually enabled with the @EnableAsync and @Async annotations. We recently noticed that the thread names of our asynchronous tasks come from the ThreadPoolTaskExecutor thread pool customized in our project, rather than from the familiar SimpleAsyncTaskExecutor.
Let's walk through the execution process to see why.
Main text
- First, for asynchronous execution to take effect, @EnableAsync must be added to the startup class. Opening its source shows that it uses @Import to bring in the AsyncConfigurationSelector class. The parent class, AdviceModeImportSelector, passes in the configured AdviceMode; with the default PROXY mode, the selector returns the configuration class ProxyAsyncConfiguration at startup.

    public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

        private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
                "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

        public AsyncConfigurationSelector() {
        }

        @Nullable
        public String[] selectImports(AdviceMode adviceMode) {
            switch(adviceMode) {
                case PROXY:
                    return new String[]{ProxyAsyncConfiguration.class.getName()};
                case ASPECTJ:
                    return new String[]{"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"};
                default:
                    return null;
            }
        }
    }
- Opening ProxyAsyncConfiguration shows that it registers an AsyncAnnotationBeanPostProcessor. Because this class implements the BeanPostProcessor interface, it is a post-processor, one that applies a Spring AOP Advisor to each eligible bean. As a result, the methods on the bean that carry the asynchronous annotation are invoked asynchronously.

    public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

        public ProxyAsyncConfiguration() {
        }

        @Bean(
            name = {"org.springframework.context.annotation.internalAsyncAnnotationProcessor"}
        )
        @Role(2)
        public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
            Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
            AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
            bpp.configure(this.executor, this.exceptionHandler);
            Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
            if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
                bpp.setAsyncAnnotationType(customAsyncAnnotation);
            }
            bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
            bpp.setOrder((Integer)this.enableAsync.getNumber("order"));
            return bpp;
        }
    }
- A parent class of AsyncAnnotationBeanPostProcessor implements BeanFactoryAware. After the AsyncAnnotationBeanPostProcessor is instantiated, setBeanFactory() is called back, and it instantiates the advisor, AsyncAnnotationAdvisor.

    public void setBeanFactory(BeanFactory beanFactory) {
        super.setBeanFactory(beanFactory);
        // Create the advisor (the aspect)
        AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
        if (this.asyncAnnotationType != null) {
            advisor.setAsyncAnnotationType(this.asyncAnnotationType);
        }
        advisor.setBeanFactory(beanFactory);
        this.advisor = advisor;
    }
- The AsyncAnnotationAdvisor constructor builds the pointcut and the advice (the enhancement code).

    public AsyncAnnotationAdvisor(
            @Nullable Supplier<Executor> executor,
            @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

        Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
        asyncAnnotationTypes.add(Async.class);
        try {
            asyncAnnotationTypes.add((Class<? extends Annotation>)
                    ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
        }
        catch (ClassNotFoundException ex) {
            // If EJB 3.1 API not present, simply ignore.
        }
        // Advice
        this.advice = buildAdvice(executor, exceptionHandler);
        // Pointcut
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }
- The advice is where the work is finally carried out. buildAdvice builds it, mainly by creating an interceptor of type AnnotationAsyncExecutionInterceptor and configuring the executor and exception handler it uses. The code that actually performs the asynchronous execution lives in its ancestor class AsyncExecutionAspectSupport.

    protected Advice buildAdvice(
            @Nullable Supplier<Executor> executor,
            @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

        AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
        // Configure the interceptor
        interceptor.configure(executor, exceptionHandler);
        return interceptor;
    }
- Configuring the interceptor: the executor and exception handler passed in as parameters are used if present; otherwise the default executor and exception handler are used.

    public void configure(@Nullable Supplier<Executor> defaultExecutor,
            @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

        // Default executor: use the supplied one, otherwise fall back to getDefaultExecutor()
        this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
        this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
    }
- The getDefaultExecutor() method finds the default executor. The parent class AsyncExecutionAspectSupport first looks for a unique bean of type TaskExecutor and returns it. If there are several such beans, or none at all, it looks for an Executor bean named taskExecutor; if that cannot be found either, it returns null. The subclass AsyncExecutionInterceptor overrides getDefaultExecutor(): it calls the parent logic first and, if that returns null, creates a new SimpleAsyncTaskExecutor.

    /**
     * Parent class (AsyncExecutionAspectSupport).
     * Retrieves or builds the default executor for this advice instance.
     * The executor returned here is cached for subsequent use.
     * The default implementation searches the context for a unique TaskExecutor bean,
     * or otherwise for an Executor bean named "taskExecutor".
     * If neither can be resolved, this implementation returns null.
     */
    @Nullable
    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
        if (beanFactory != null) {
            try {
                // Search for a unique bean of type TaskExecutor and return it
                return beanFactory.getBean(TaskExecutor.class);
            }
            catch (NoUniqueBeanDefinitionException ex) {
                // Several TaskExecutor beans exist: look for an Executor bean named "taskExecutor"
                logger.debug("Could not find unique TaskExecutor bean", ex);
                try {
                    return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                                "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                                "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex) {
                // No TaskExecutor bean exists: look for an Executor bean named "taskExecutor"
                logger.debug("Could not find default TaskExecutor bean", ex);
                try {
                    return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    logger.info("No task executor bean found for async processing: " +
                            "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
                }
                // Giving up -> either using local default executor or none at all...
            }
        }
        return null;
    }

    /**
     * Subclass (AsyncExecutionInterceptor).
     * If the parent class returns null, create a new SimpleAsyncTaskExecutor.
     */
    @Override
    @Nullable
    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
        Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
        return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
    }
Therefore, at this stage, we can see why the asynchronous thread name is SimpleAsyncTaskExecutor-N by default: no TaskExecutor bean is found, so the fallback executor is created. We can also see why asynchronous tasks run on our own thread pool once we define one: it is picked up as the unique TaskExecutor bean.
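The lookup order can be modeled in plain Java without Spring. The following is a minimal sketch under stated assumptions, not Spring's implementation: the `Map` stands in for the BeanFactory, `PoolExecutor` is a hypothetical substitute for the TaskExecutor interface, and `FALLBACK` plays the role of the SimpleAsyncTaskExecutor created by the subclass. It also ignores primary-bean marking, which the real by-type lookup takes into account.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

final class DefaultExecutorLookup {

    /** Hypothetical stand-in for Spring's TaskExecutor interface. */
    interface PoolExecutor extends Executor { }

    /** Stand-in for the subclass fallback, where Spring creates a SimpleAsyncTaskExecutor. */
    static final Executor FALLBACK = task -> new Thread(task).start();

    /** The map (bean name to instance) plays the role of the BeanFactory. */
    static Executor resolve(Map<String, Executor> beans) {
        // Step 1: exactly one bean of the TaskExecutor-like type wins outright.
        List<Executor> byType = beans.values().stream()
                .filter(PoolExecutor.class::isInstance)
                .collect(Collectors.toList());
        if (byType.size() == 1) {
            return byType.get(0);
        }
        // Step 2: zero or several candidates, so try the well-known bean name.
        Executor byName = beans.get("taskExecutor");
        if (byName != null) {
            return byName;
        }
        // Step 3: nothing resolvable, so use the thread-per-task fallback.
        return FALLBACK;
    }

    public static void main(String[] args) {
        PoolExecutor custom = Runnable::run;
        Map<String, Executor> beans = new LinkedHashMap<>();
        beans.put("myPool", custom);
        // A single pool bean is chosen even though it is not named "taskExecutor".
        System.out.println(resolve(beans) == custom);
    }
}
```

This mirrors the situation from the preface: a project that defines exactly one custom pool gets that pool for every unqualified asynchronous method, whatever the bean is called.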
With this advisor in place, AsyncExecutionInterceptor#invoke() runs every time an asynchronous method is called through the proxy. Its determineAsyncExecutor method decides which executor to use.

    @Nullable
    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        // Look up the executor already cached for this method
        AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
        if (executor == null) {
            // Read the value of the @Async annotation (the name of the requested executor)
            String qualifier = this.getExecutorQualifier(method);
            Executor targetExecutor;
            if (StringUtils.hasLength(qualifier)) {
                // Fetch the executor bean with that name
                targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
            } else {
                // Otherwise use the default executor
                targetExecutor = (Executor)this.defaultExecutor.get();
            }

            if (targetExecutor == null) {
                return null;
            }

            executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
            // Cache the executor for this method
            this.executors.put(method, executor);
        }

        return (AsyncTaskExecutor)executor;
    }
Once an executor has been determined, the doSubmit method is called to submit the task to it.
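The intercept, select, and submit flow can be imitated with a JDK dynamic proxy. This is only a sketch under assumptions: `RunAsync`, `ReportService`, and `AsyncHandler` are hypothetical names invented for the illustration, the annotation is read from the interface method only, and Spring's real interceptor handles more return types and error cases than shown here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class AsyncProxySketch {

    /** Hypothetical stand-in for @Async; value() names the executor to use. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface RunAsync { String value() default ""; }

    /** Hypothetical service, used only for this demo. */
    interface ReportService {
        @RunAsync("reports") Future<String> build(String name);
        String ping(); // not annotated, so it stays synchronous
    }

    static final class ReportServiceImpl implements ReportService {
        @Override public Future<String> build(String name) {
            return CompletableFuture.completedFuture(name + " built on " + Thread.currentThread().getName());
        }
        @Override public String ping() {
            return "pong from " + Thread.currentThread().getName();
        }
    }

    /** Plays the role of the interceptor: choose an executor, then submit the call. */
    static final class AsyncHandler implements InvocationHandler {
        private final Object target;
        private final Map<String, ExecutorService> named;
        private final ExecutorService defaultExecutor;
        private final Map<Method, ExecutorService> cache = new ConcurrentHashMap<>();

        AsyncHandler(Object target, Map<String, ExecutorService> named, ExecutorService defaultExecutor) {
            this.target = target;
            this.named = named;
            this.defaultExecutor = defaultExecutor;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            RunAsync marker = method.getAnnotation(RunAsync.class);
            if (marker == null) {
                return method.invoke(target, args); // the "pointcut" did not match
            }
            // Resolve the executor once per method, then reuse the cached choice.
            ExecutorService executor = cache.computeIfAbsent(method, m -> choose(marker.value()));
            Future<Object> future = executor.submit(() -> {
                Object result = method.invoke(target, args);
                // Unwrap the target's own Future so the caller sees the plain value.
                return result instanceof Future ? ((Future<?>) result).get() : null;
            });
            return Future.class.isAssignableFrom(method.getReturnType()) ? future : null;
        }

        private ExecutorService choose(String qualifier) {
            if (qualifier.isEmpty()) {
                return defaultExecutor;
            }
            ExecutorService found = named.get(qualifier);
            if (found == null) {
                throw new IllegalStateException("No executor named '" + qualifier + "'");
            }
            return found;
        }
    }

    /** Returns the async result followed by the synchronous result. */
    static List<String> demo() {
        ExecutorService reports = Executors.newSingleThreadExecutor(r -> new Thread(r, "reports-worker"));
        ExecutorService fallback = Executors.newSingleThreadExecutor(r -> new Thread(r, "default-worker"));
        try {
            ReportService service = (ReportService) Proxy.newProxyInstance(
                    ReportService.class.getClassLoader(),
                    new Class<?>[] {ReportService.class},
                    new AsyncHandler(new ReportServiceImpl(), Map.of("reports", reports), fallback));
            return List.of(service.build("q1").get(), service.ping());
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            reports.shutdown();
            fallback.shutdown();
        }
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The annotated method runs on the thread of the executor named in the annotation, while the unannotated method runs on the caller's thread.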
For asynchronous tasks, SimpleAsyncTaskExecutor is used as the executor by default, that is, whenever no suitable executor bean is found. It has the following characteristic:
It does not reuse threads; it creates a new thread for every task. The number of concurrent threads can be capped through the concurrencyLimit property, but by default there is no limit (concurrencyLimit is -1).
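The difference between a thread-per-task executor and a pooled one is easy to observe with the JDK alone. The sketch below does not use SimpleAsyncTaskExecutor itself; `threadPerTask()` is a hypothetical stand-in that imitates its default behavior of starting a new thread for each task.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class ThreadPerTaskDemo {

    /** Hypothetical stand-in for an unbounded thread-per-task executor. */
    static Executor threadPerTask() {
        AtomicInteger counter = new AtomicInteger();
        return task -> new Thread(task, "per-task-" + counter.incrementAndGet()).start();
    }

    /** Runs the given number of trivial tasks and counts the distinct threads that ran them. */
    static int distinctThreads(Executor executor, int tasks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                seen.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // One thread per task: the count grows with the number of tasks.
        System.out.println("thread-per-task: " + distinctThreads(threadPerTask(), 20));

        // A fixed pool never uses more threads than its size.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println("fixed pool: " + distinctThreads(pool, 20));
        } finally {
            pool.shutdown();
        }
    }
}
```

With 20 tasks, the thread-per-task executor uses 20 threads, while the fixed pool uses at most 4. Under sustained load the first number keeps growing with the number of in-flight tasks.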
Therefore, when using asynchronous tasks, do not rely on this default executor: under load, an unbounded number of threads can exhaust memory and cause an OutOfMemoryError. The best approach is to specify the executor explicitly.
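One way to specify the executor is to declare a bounded ThreadPoolTaskExecutor bean and name it in the annotation. The configuration below is a sketch, not a recommendation: the bean name `reportExecutor`, the class names, and the pool sizes are illustrative assumptions that should be tuned to the actual workload.

```java
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

@Configuration
@EnableAsync
class AsyncExecutorConfig {

    // Hypothetical bean name; the sizes are placeholders, not tuned values.
    @Bean("reportExecutor")
    public ThreadPoolTaskExecutor reportExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(200);
        // A distinctive prefix makes the pool easy to spot in logs and thread dumps.
        executor.setThreadNamePrefix("report-async-");
        // When the pool and queue are full, run the task on the calling thread instead of dropping it.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

@Service
class ReportService {

    // The annotation value is the qualifier that determineAsyncExecutor resolves to a bean.
    @Async("reportExecutor")
    public void buildReport(String name) {
        System.out.println(name + " built on " + Thread.currentThread().getName());
    }
}
```

If `reportExecutor` is the only TaskExecutor bean in the context, unqualified @Async methods resolve to it as well, through the default lookup in getDefaultExecutor().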
Summary
This article walks through the source code to show how the @Async annotation selects and uses threads in a project. Try to give asynchronous tasks a dedicated thread pool, so that they do not share a pool with other business logic and the two cannot affect each other.