How @Async Works

"An Analysis of How the @Async Annotation Works"

Posted by tablesheep on October 18, 2021


@EnableAsync

Apart from a few configuration attributes, the most important thing @EnableAsync does is pull in AsyncConfigurationSelector through @Import.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

   /**
    * Specifies which annotation type should be detected.
    * Indicate the 'async' annotation type to be detected at either class
    * or method level.
    * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
    * {@code @javax.ejb.Asynchronous} annotation will be detected.
    * ......
    */
   Class<? extends Annotation> annotation() default Annotation.class;

   /**
    * Chooses between CGLIB and JDK dynamic proxies. This affects not only @Async but other AOP proxies as well. (It has no effect in Spring Boot.)
    * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
    * to standard Java interface-based proxies.
    * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
    * <p>The default is {@code false}.
    * <p>Note that setting this attribute to {@code true} will affect <em>all</em>
    * Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
    * For example, other beans marked with Spring's {@code @Transactional} annotation
    * will be upgraded to subclass proxying at the same time. This approach has no
    * negative impact in practice unless one is explicitly expecting one type of proxy
    * vs. another &mdash; for example, in tests.
    */
   boolean proxyTargetClass() default false;

   /**
    * Selects the advice mode; in practice this is almost always AdviceMode.PROXY.
    * Indicate how async advice should be applied.
    * <p><b>The default is {@link AdviceMode#PROXY}.</b>
    * Please note that proxy mode allows for interception of calls through the proxy
    * only. Local calls within the same class cannot get intercepted that way; an
    * {@link Async} annotation on such a method within a local call will be ignored
    * since Spring's interceptor does not even kick in for such a runtime scenario.
    * For a more advanced mode of interception, consider switching this to
    * {@link AdviceMode#ASPECTJ}.
    */
   AdviceMode mode() default AdviceMode.PROXY;

   /**
    * Sets the position of AsyncAnnotationBeanPostProcessor among the BeanPostProcessors.
    * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
    * should be applied.
    * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
    * after all other post-processors, so that it can add an advisor to
    * existing proxies rather than double-proxy.
    */
   int order() default Ordered.LOWEST_PRECEDENCE;

}
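
The Javadoc note on mode() about local calls can be reproduced without Spring. The following is a minimal sketch using a plain JDK dynamic proxy; Greeter, GreeterImpl and the recording handler are hypothetical names made up for this illustration and are not Spring classes. It only demonstrates that a call made on "this" inside the target never reaches the proxy's interceptor, which is why @Async on a self-invoked method is ignored in PROXY mode.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class SelfInvocationDemo {

    // Hypothetical service interface, used only for this illustration.
    public interface Greeter {
        void outer();
        void inner();
    }

    // Plain target object: outer() calls inner() on "this", not on the proxy.
    public static class GreeterImpl implements Greeter {
        @Override
        public void outer() {
            inner();
        }

        @Override
        public void inner() {
            // nothing to do
        }
    }

    // Wraps the target in a JDK proxy that records every method name it intercepts.
    public static Greeter proxy(Greeter target, List<String> intercepted) {
        InvocationHandler handler = (p, method, args) -> {
            intercepted.add(method.getName());
            return method.invoke(target, args);
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
    }

    // Calls outer() through the proxy and returns the method names the proxy saw.
    public static List<String> run() {
        List<String> intercepted = new ArrayList<>();
        proxy(new GreeterImpl(), intercepted).outer();
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [outer]
    }
}
```

Only outer is recorded: the nested inner() call goes straight to the target object, so an interceptor attached to the proxy never sees it.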

AsyncConfigurationSelector

Selects the configuration class based on @EnableAsync#mode; the default is ProxyAsyncConfiguration.

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

   private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
         "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";


   /**
    * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
    * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
    * respectively.
    */
   @Override
   @Nullable
   public String[] selectImports(AdviceMode adviceMode) {
      switch (adviceMode) {
         case PROXY:
            return new String[] {ProxyAsyncConfiguration.class.getName()};
         case ASPECTJ:
            return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
         default:
            return null;
      }
   }

}

ProxyAsyncConfiguration

Registers AsyncAnnotationBeanPostProcessor, a BeanPostProcessor, through an @Bean method.

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

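   // Note the bean name. Spring does not expose every AOP setting through @EnableAsync, so when necessary the proxying behavior of @Async can be adjusted by modifying this bean's BeanDefinition.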
   @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
   // Marks the bean as infrastructure
   @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
   public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
      Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
      AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
      bpp.configure(this.executor, this.exceptionHandler);
      Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
      if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
         bpp.setAsyncAnnotationType(customAsyncAnnotation);
      }
      bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
      bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
      return bpp;
   }
}

AsyncAnnotationBeanPostProcessor


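Apart from a few configuration setters, the most important thing AsyncAnnotationBeanPostProcessor does is assign an AsyncAnnotationAdvisor to the advisor field of AbstractAdvisingBeanPostProcessor in setBeanFactory. It does little else.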

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
......

   // Configures the executor (thread pool) used for async execution
   public void setExecutor(Executor executor) {
      this.executor = SingletonSupplier.of(executor);
   }

   // Configures the handler for uncaught exceptions
   public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
      this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
   }

   // Sets the annotation type to detect
   public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
      Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
      this.asyncAnnotationType = asyncAnnotationType;
   }


   @Override
   public void setBeanFactory(BeanFactory beanFactory) {
      super.setBeanFactory(beanFactory);

      AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
      if (this.asyncAnnotationType != null) {
         advisor.setAsyncAnnotationType(this.asyncAnnotationType);
      }
      advisor.setBeanFactory(beanFactory);
      // Sets AbstractAdvisingBeanPostProcessor#advisor
      this.advisor = advisor;
   }

}

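Since AsyncAnnotationBeanPostProcessor is a BeanPostProcessor, it does its work while beans are being created. Following the trail leads to the entry point, AbstractAdvisingBeanPostProcessor#postProcessAfterInitialization. Only postProcessAfterInitialization carries real logic, so @Async proxying happens after the bean has been initialized.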

AbstractAdvisingBeanPostProcessor

public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {

   @Nullable
   protected Advisor advisor;
......

   @Override
   public Object postProcessAfterInitialization(Object bean, String beanName) {
      if (this.advisor == null || bean instanceof AopInfrastructureBean) {
         // Ignore AOP infrastructure such as scoped proxies.
         return bean;
      }

      // If the bean is already a proxy (Spring AOP proxies implement Advised), there is no need to proxy it again; just add the Advisor (here, AsyncAnnotationAdvisor) to it
      if (bean instanceof Advised) {
         Advised advised = (Advised) bean;
         if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
            // Add our local Advisor to the existing proxy's Advisor chain...
            if (this.beforeExistingAdvisors) {
               advised.addAdvisor(0, this.advisor);
            }
            else {
               advised.addAdvisor(this.advisor);
            }
            return bean;
         }
      }


      if (isEligible(bean, beanName)) {
         // The bean is eligible for proxying, so create the proxy with a ProxyFactory
         ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
         if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
         }
         // Add the AsyncAnnotationAdvisor to the proxyFactory
         proxyFactory.addAdvisor(this.advisor);
         customizeProxyFactory(proxyFactory);

         // Use original ClassLoader if bean class not locally loaded in overriding class loader
         ClassLoader classLoader = getProxyClassLoader();
         if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {
            classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();
         }
         return proxyFactory.getProxy(classLoader);
      }

      // No proxy needed.
      return bean;
   }
......


	/**
	 * Prepare a {@link ProxyFactory} for the given bean.
	 * <p>Subclasses may customize the handling of the target instance and in
	 * particular the exposure of the target class. The default introspection
	 * of interfaces for non-target-class proxies and the configured advisor
	 * will be applied afterwards; {@link #customizeProxyFactory} allows for
	 * late customizations of those parts right before proxy creation.
	 * @param bean the bean instance to create a proxy for
	 * @param beanName the corresponding bean name
	 * @return the ProxyFactory, initialized with this processor's
	 * {@link ProxyConfig} settings and the specified bean
	 * @since 4.2.3
	 * @see #customizeProxyFactory
	 */
	protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
		ProxyFactory proxyFactory = new ProxyFactory();
		proxyFactory.copyFrom(this);
		proxyFactory.setTarget(bean);
		return proxyFactory;
	}
......
}
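
The two branches of postProcessAfterInitialization can be modeled in a few lines. This is a deliberately simplified sketch, not Spring's implementation: FakeProxy stands in for a proxy that implements Advised, advisors are plain strings, and the eligibility rule (only String beans qualify) is an arbitrary assumption chosen so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;

public class AdvisingPostProcessorSketch {

    // Stand-in for an existing AOP proxy: a target plus its advisor chain.
    public static final class FakeProxy {
        public final Object target;
        public final List<String> advisors = new ArrayList<>();

        public FakeProxy(Object target) {
            this.target = target;
        }
    }

    private final String advisor;
    private final boolean beforeExistingAdvisors;

    public AdvisingPostProcessorSketch(String advisor, boolean beforeExistingAdvisors) {
        this.advisor = advisor;
        this.beforeExistingAdvisors = beforeExistingAdvisors;
    }

    // Hypothetical eligibility rule for this sketch only.
    private boolean isEligible(Object target) {
        return target instanceof String;
    }

    public Object postProcessAfterInitialization(Object bean) {
        // Branch 1: the bean is already a proxy, so extend its advisor chain.
        if (bean instanceof FakeProxy) {
            FakeProxy proxy = (FakeProxy) bean;
            if (isEligible(proxy.target)) {
                if (beforeExistingAdvisors) {
                    proxy.advisors.add(0, advisor);
                } else {
                    proxy.advisors.add(advisor);
                }
                return bean;
            }
        }
        // Branch 2: an eligible plain bean gets wrapped in a new proxy.
        if (isEligible(bean)) {
            FakeProxy proxy = new FakeProxy(bean);
            proxy.advisors.add(advisor);
            return proxy;
        }
        // Branch 3: no proxy needed.
        return bean;
    }

    public static void main(String[] args) {
        AdvisingPostProcessorSketch pp = new AdvisingPostProcessorSketch("async", true);
        FakeProxy existing = new FakeProxy("already proxied");
        existing.advisors.add("tx");
        pp.postProcessAfterInitialization(existing);
        System.out.println(existing.advisors); // prints [async, tx]
    }
}
```

The point of the first branch is that a bean which is already proxied (for example by a transaction advisor) is not wrapped a second time; the async advisor simply joins the existing chain.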

AbstractBeanFactoryAwareAdvisingPostProcessor

It mainly overrides prepareProxyFactory to further configure the ProxyFactory created by AbstractAdvisingBeanPostProcessor, and adds an extra condition to isEligible.

public abstract class AbstractBeanFactoryAwareAdvisingPostProcessor extends AbstractAdvisingBeanPostProcessor
      implements BeanFactoryAware {

   @Nullable
   private ConfigurableListableBeanFactory beanFactory;


   @Override
   public void setBeanFactory(BeanFactory beanFactory) {
      this.beanFactory = (beanFactory instanceof ConfigurableListableBeanFactory ?
            (ConfigurableListableBeanFactory) beanFactory : null);
   }

   @Override
   protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
      if (this.beanFactory != null) {
         AutoProxyUtils.exposeTargetClass(this.beanFactory, beanName, bean.getClass());
      }

      ProxyFactory proxyFactory = super.prepareProxyFactory(bean, beanName);
      if (!proxyFactory.isProxyTargetClass() && this.beanFactory != null &&
            AutoProxyUtils.shouldProxyTargetClass(this.beanFactory, beanName)) {
         // Decides whether the resulting proxy is a JDK dynamic proxy or a CGLIB proxy
         proxyFactory.setProxyTargetClass(true);
      }
      return proxyFactory;
   }

   @Override
   protected boolean isEligible(Object bean, String beanName) {
      return (!AutoProxyUtils.isOriginalInstance(beanName, bean.getClass()) &&
            super.isEligible(bean, beanName));
   }

}
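
Why the JDK versus CGLIB choice matters can be seen with the JDK alone. The sketch below is an illustration under stated assumptions: Task and TaskImpl are hypothetical types, and only the interface-based (JDK) side is shown, since CGLIB is not part of the standard library. An interface-based proxy is an instance of the interface but not of the concrete class, so code that expects the concrete type cannot receive it; a subclass-based proxy (proxyTargetClass = true) does not have that limitation.

```java
import java.lang.reflect.Proxy;

public class JdkProxyTypeDemo {

    // Hypothetical interface and implementation, used only for this illustration.
    public interface Task {
        String run();
    }

    public static class TaskImpl implements Task {
        @Override
        public String run() {
            return "done";
        }
    }

    // Creates an interface-based proxy that simply delegates to the target.
    public static Object jdkProxy(TaskImpl target) {
        return Proxy.newProxyInstance(
                Task.class.getClassLoader(),
                new Class<?>[] {Task.class},
                (proxy, method, args) -> method.invoke(target, args));
    }

    public static void main(String[] args) {
        Object proxy = jdkProxy(new TaskImpl());
        System.out.println(proxy instanceof Task);     // prints true
        System.out.println(proxy instanceof TaskImpl); // prints false
    }
}
```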

At this point the proxy object has been created. Next, let's look at how the asynchronous execution itself is handled.

AsyncAnnotationAdvisor

public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

   private Advice advice;

   private Pointcut pointcut;
......

   // Registers the default annotations (@Async and javax.ejb.Asynchronous), builds the Pointcut from them and, most importantly, creates the Advice
   public AsyncAnnotationAdvisor(
         @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
      Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
      asyncAnnotationTypes.add(Async.class);
      try {
         asyncAnnotationTypes.add((Class<? extends Annotation>)
               ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
      }
      catch (ClassNotFoundException ex) {
         // If EJB 3.1 API not present, simply ignore.
      }
      this.advice = buildAdvice(executor, exceptionHandler);
      this.pointcut = buildPointcut(asyncAnnotationTypes);
   }

......
//setAsyncAnnotationType
//setBeanFactory
//getter
......

   protected Advice buildAdvice(
         @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
      // Builds the concrete AOP Advice; AnnotationAsyncExecutionInterceptor is what runs the proxied method asynchronously
      AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
      interceptor.configure(executor, exceptionHandler);
      return interceptor;
   }

......

}

AnnotationAsyncExecutionInterceptor

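It overrides the parent class's getExecutorQualifier method, reading the executor name from the @Async annotation so that the parent class can look up the designated executor for the asynchronous task.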

public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {


   /**
    * Return the qualifier or bean name of the executor to be used when executing the
    * given method, specified via {@link Async#value} at the method or declaring
    * class level. If {@code @Async} is specified at both the method and class level, the
    * method's {@code #value} takes precedence (even if empty string, indicating that
    * the default executor should be used preferentially).
    * @param method the method to inspect for executor qualifier metadata
    * @return the qualifier if specified, otherwise empty string indicating that the
    * {@linkplain #setExecutor(Executor) default executor} should be used
    * @see #determineAsyncExecutor(Method)
    */
   @Override
   @Nullable
   protected String getExecutorQualifier(Method method) {
      // Maintainer's note: changes made here should also be made in
      // AnnotationAsyncExecutionAspect#getExecutorQualifier
      Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
      if (async == null) {
         async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
      }
      return (async != null ? async.value() : null);
   }

}
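
The precedence rule in getExecutorQualifier (method level first, then the declaring class) can be tried out with plain reflection. This is a sketch under assumptions: MyAsync is a hypothetical stand-in for Spring's @Async, and the lookup uses Method#getAnnotation rather than Spring's AnnotatedElementUtils.findMergedAnnotation, so meta-annotations and attribute merging are not covered.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class ExecutorQualifierSketch {

    // Hypothetical stand-in for @Async with the same "value" attribute.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.TYPE})
    public @interface MyAsync {
        String value() default "";
    }

    @MyAsync("classPool")
    public static class ReportService {
        @MyAsync("methodPool")
        public void export() {
        }

        public void cleanup() {
        }
    }

    public static class PlainService {
        public void work() {
        }
    }

    // The method-level annotation wins; otherwise fall back to the declaring class.
    // Returns null when neither carries the annotation.
    public static String qualifierFor(Method method) {
        MyAsync async = method.getAnnotation(MyAsync.class);
        if (async == null) {
            async = method.getDeclaringClass().getAnnotation(MyAsync.class);
        }
        return (async != null ? async.value() : null);
    }

    // Convenience overload that converts the checked reflection exception.
    public static String qualifierFor(Class<?> type, String methodName) {
        try {
            return qualifierFor(type.getMethod(methodName));
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(qualifierFor(ReportService.class, "export"));  // prints methodPool
        System.out.println(qualifierFor(ReportService.class, "cleanup")); // prints classPool
        System.out.println(qualifierFor(PlainService.class, "work"));     // prints null
    }
}
```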

The part to focus on is the parent class's invoke method.

AsyncExecutionInterceptor

An implementation of MethodInterceptor; this is where the asynchronous execution actually happens.

public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
    
......

   /**
    * Intercept the given method invocation, submit the actual calling of the method to
    * the correct task executor and return immediately to the caller.
    * @param invocation the method to intercept and make asynchronous
    * @return {@link Future} if the original method returns {@code Future}; {@code null}
    * otherwise.
    */
   @Override
   @Nullable
   public Object invoke(final MethodInvocation invocation) throws Throwable {
      Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
      Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
      final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

      // Resolve the executor for this method
      AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
      if (executor == null) {
         throw new IllegalStateException(
               "No executor specified and no default executor set on AsyncExecutionInterceptor either");
      }

      // Create the task
      Callable<Object> task = () -> {
         try {
            Object result = invocation.proceed();
            if (result instanceof Future) {
               return ((Future<?>) result).get();
            }
         }
         catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
         }
         catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
         }
         return null;
      };

      // Submit the task to the executor
      return doSubmit(task, executor, invocation.getMethod().getReturnType());
   }

......

}
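
The core idea of invoke (wrap the call in a Callable, submit it to an executor, hand back a Future or null) can be sketched with a JDK dynamic proxy. This is not Spring's code: Mailer, MailerImpl and the thread name "async-worker" are hypothetical, the executor is a plain ExecutorService rather than an AsyncTaskExecutor, and the error handling done by handleError as well as the special cases in doSubmit are left out.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncProxySketch {

    // Hypothetical service interface, used only for this illustration.
    public interface Mailer {
        Future<String> send(String to); // result is delivered through a Future
        void log(String message);       // fire-and-forget
    }

    public static class MailerImpl implements Mailer {
        @Override
        public Future<String> send(String to) {
            return CompletableFuture.completedFuture(
                    "sent to " + to + " on " + Thread.currentThread().getName());
        }

        @Override
        public void log(String message) {
            // nothing to do
        }
    }

    // Returns a proxy whose methods all run on the given executor.
    public static Mailer asyncProxy(Mailer target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            Callable<Object> task = () -> {
                Object result = method.invoke(target, args);
                // Unwrap the target's Future so the submitted Future carries the value.
                return (result instanceof Future) ? ((Future<?>) result).get() : null;
            };
            Future<Object> submitted = executor.submit(task);
            // Callers of Future-returning methods get the Future; everyone else gets null.
            return Future.class.isAssignableFrom(method.getReturnType()) ? submitted : null;
        };
        return (Mailer) Proxy.newProxyInstance(
                Mailer.class.getClassLoader(), new Class<?>[] {Mailer.class}, handler);
    }

    // Runs send() through the proxy and returns the message built on the worker thread.
    public static String demo() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "async-worker"));
        try {
            Mailer mailer = asyncProxy(new MailerImpl(), executor);
            mailer.log("fire and forget");
            return mailer.send("bob").get();
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints sent to bob on async-worker
    }
}
```

The thread name in the result shows that the target method ran on the executor's thread rather than on the caller's, which is the whole effect of the interceptor.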

Summary

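Spring's @Async support works by having a BeanPostProcessor create a proxy after bean initialization, with the asynchronous execution itself implemented through Spring AOP. Unlike caching and transaction support, it does not rely on the AbstractAutoProxyCreator family (InfrastructureAdvisorAutoProxyCreator) to create proxies; instead, AsyncAnnotationBeanPostProcessor creates them through ProxyFactory.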

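As shown above, AbstractAdvisingBeanPostProcessor, like AbstractAutoProxyCreator, can proxy beans automatically. The difference is that AbstractAutoProxyCreator looks up Advisors in the container, whereas AbstractAdvisingBeanPostProcessor only applies the single Advisor it was configured with.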

Open question: why doesn't @Async use AbstractAutoProxyCreator?
