你知道 @Async 是怎么让方法异步执行的吗?

在阅读本文之前,你可以通过 Creating Asynchronous Methods 指导来体验下创建异步方法的使用方式。

为什么要写这篇文章,本质上对于这些 Spring 已经封装好的能力,并不需要去关注它底层到底是怎么玩的,比如 @Async,你肯定可以猜到对于打了这个注解的方法(或者类),在执行这个方法(或者类下所有方法)时,Spring 框架会将当前方法丢进到一个单独的线程池中去执行,以达到方法异步执行的目的。

本篇文章的原始诉求来自于需要对 @Async 描述的方法进行 trace 埋点,当前大多数基于线程上下文传递 traceContext 的方式显然对于跨线程问题是不能满足的,需要特殊的处理;那么就需要对这些技术点进行剖析,以寻求切入点。

前言

@Async 是通过注解标记来开启方法的异步执行的;对于注解的底层实现,除了 java 原生提供那种依赖编译期植入的之外,其他的基本都差不多,即运行时通过反射等方式拦截到打了注解的类或者方法,然后执行时进行横切拦截;另外这里还有一个点就是方法异步执行,所以对于 @Async 的剖析,就一定绕不开两个基本的知识点,就是代理和线程池。
在了解到这些之后,我们来拆解下 @Async 的基本原理。

如何开启生效?

The @EnableAsync annotation switches on Spring’s ability to run @Async methods in a background thread pool.

通过 @EnableAsync 来开启异步方法的能力。

1
2
3
4
5
6
7
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
// ...`
}

@EnableAsync 注解 Import 了 AsyncConfigurationSelector,这个在 SpringBoot 中是非常常见的一种写法,这里需要关注的是选择了哪个自动配置类;adviceMode 默认是 false,这里就以 ProxyAsyncConfiguration 为例:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}

AsyncAnnotationBeanPostProcessor

org.springframework.scheduling.annotation.ProxyAsyncConfiguration中最主要的就是创建 AsyncAnnotationBeanPostProcessor,从名字看,AsyncAnnotationBeanPostProcessor 就是来处理 @Async 注解的;目的很明确,就是创建对应 bean 的代理对象,以便于执行方法时能够进行 AOP 拦截(具体细节可以看 org.springframework.aop.framework.AbstractAdvisingBeanPostProcessor#postProcessAfterInitialization这个方法)。

1
2
3
4
5
6
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);

AnnotationAsyncExecutionInterceptor

这里涉及到 AOP 的一些基础知识,可以查阅之前写的 https://juejin.cn/post/6844903623101513735 这篇文章

AOP 中最外层的是代理类,然后是织入器(advisor),再接着是切面(advice he PointCut);前面已经将创建代理对象的逻辑进行了介绍,所以接下来是织入器(advisor)和切面的创建。实际上织入器(advisor)的创建逻辑也是在 AsyncAnnotationBeanPostProcessor 中完成的。

1
2
3
4
5
6
7
8
9
10
11
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
// 创建 advisor
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}

在 AsyncAnnotationAdvisor 的构造函数中,会构建 Advice 和 Pointcut

1
2
3
4
5
6
7
8
9
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// 省略其他代码
/// ...
// 创建 advice
this.advice = buildAdvice(executor, exceptionHandler);
// 创建 pointcut
this.pointcut = buildPointcut(asyncAnnotationTypes);
}

Advice 就是具体执行拦截的逻辑,这里的 advice 实际上 AnnotationAsyncExecutionInterceptor(why ? 因饰Advice 是 MethodInterceptor 的父类)。

1
2
3
4
5
6
7
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// 这里
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}

到这里,关于 @EnableAsync 是如何开启创建异步方法的逻辑基本就介绍完了;本质上还是 Spring AOP 的那套逻辑。

Tips

除了 adviceMode,一般情况下还会涉及到另外一个参数,即 proxyTargetClass;proxyTargetClass 在设置为 true 和 false 时,对应使用的代理机制大致如下:

  • true

    • 目标对象实现了接口 – 使用 CGLIB 代理机制
    • 目标对象没有接口(只有实现类) – 使用 CGLIB 代理机制
  • false

    • 目标对象实现了接口 – 使用 JDK 动态代理机制(代理所有实现了的接口)
    • 目标对象没有接口(只有实现类) – 使用 CGLIB 代理机制

线程池

上一小节中,对 @EnableAsync 生效机制和对应的 AOP 对象创建逻辑进行了介绍;实际上 AOP 拦截到具体的方法之后的主要目的就是将执行逻辑丢到线程池中去执行。那这里就会涉及到本节的主题,即线程池。本节需要搞清楚几个问题:

  • 什么时候创建的线程池?
  • 创建的线程池类型是啥?
  • 方法执行任务是如何被提交的?

创建 AnnotationAsyncExecutionInterceptor 时初始化线程池

线程池的创建是在创建 AnnotationAsyncExecutionInterceptor 对象时完成,代码如下:

1
2
3
public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
super(defaultExecutor);
}

在其父类 AsyncExecutionAspectSupport 中完成具体线程池创建

1
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));

在 getDefaultExecutor 方法中, 会先从 Spring 容器找 TaskExecutor 类型的线程池 Bean,如果找不到,会扩大范围找 Executor 类型的线程池 Bean,如果找不到,则返回 null。

这里是个延迟载入的操作,即只有当异步方法被调用时,才会触发 SingletonSupplier get 操作,从而触发 getBean 的逻辑,如果你在 debug 时出现没有正常走到断点的情况,可以关注下这个场景。

默认线程池 SimpleAsyncTaskExecutor

1
2
3
4
5
6
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

从这段逻辑看,如果从 Spring 容器中没有找到对应的线程池 Bean,那么就创建 SimpleAsyncTaskExecutor 作为默认的线程池。

This class also customizes the Executor by defining a new bean. Here, the method is named taskExecutor, since this is the specific method name for which Spring searches. In our case, we want to limit the number of concurrent threads to two and limit the size of the queue to 500. There are many more things you can tune. If you do not define an Executor bean, Spring creates a SimpleAsyncTaskExecutor and uses that.

方法执行任务的提交

基于前面的分析,方法执行任务的提交一定是发生在拦截到 @Async 注解时,也就是 AnnotationAsyncExecutionInterceptor 中;通过分析代码,在其父类 AsyncExecutionInterceptor
中,验证了分析。下面是部分核心逻辑:

1
2
3
4
5
6
7
8
9
10
11
public Object invoke(final MethodInvocation invocation) throws Throwable {
// 1、拿到 Method
// 2、根据 Method 获取 executor
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
// 3、创建方法执行任务 task
Callable<Object> task = () -> {
// ...
};
// 4、提交 task
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

determineAsyncExecutor 中说明了, executor 是和方法对象绑定的,即每个方法都有一个自己的 executor;异步方法在第一次执行的时候创建自己的 executor,然后缓存到内存中。在 doSubmit 中,会根据 returnType 的类型进行相应的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
// CompletableFuture
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
// ListenableFuture
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
// Future
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
// void
else {
executor.submit(task);
return null;
}
}

如何自定义线程池

SpringBoot 提供了 org.springframework.scheduling.annotation.AsyncConfigurer 接口让开发人员可以自定义线程池执行器;框架默认提供了一个空的实现类 AsyncConfigurerSupport,两个方法体内部都是空实现。这部分逻辑在 org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers体现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Collect any {@link AsyncConfigurer} beans through autowiring.
*/
@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
if (CollectionUtils.isEmpty(configurers)) {
return;
}
if (configurers.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
AsyncConfigurer configurer = configurers.iterator().next();
// for this
this.executor = configurer::getAsyncExecutor;
this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}

AsyncConfigurer 在项目中只能有一个实现 Bean,如果超过一个,将会抛出 IllegalStateException 异常。

总结

本文通过对 @Async 注解的分析,和你解释了 @Async 是怎么让方法异步执行的吗? 这个问题;从分析过程中可以知道,对于绝大多数面向工程师使用的注解或者工具,本质上是离不开那些最最基本知识点的。当然,通过分析代码,一方面是可以进一步识别作者的意图,更主要的是可以看到那些意料之外的“骚操作” coding。

你知道 @Async 是怎么让方法异步执行的吗?

http://www.glmapper.com/2022/09/12/springboot/spring-boot-async-anno/

作者

卫恒

发布于

2022-09-12

更新于

2022-11-12

许可协议

评论