How the @Async Annotation Works
@EnableAsync
Apart from a few configuration attributes, the most important thing @EnableAsync does is use @Import to bring in AsyncConfigurationSelector.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
/**
* Configures which annotation types are detected as asynchronous.
* Indicate the 'async' annotation type to be detected at either class
* or method level.
* <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
* {@code @javax.ejb.Asynchronous} annotation will be detected.
* ......
*/
Class<? extends Annotation> annotation() default Annotation.class;
/**
* Chooses between CGLIB and JDK dynamic proxies. This affects not only @Async but also other AOP proxies. (Author's note: it has no effect in Spring Boot.)
* Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
* to standard Java interface-based proxies.
* <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
* <p>The default is {@code false}.
* <p>Note that setting this attribute to {@code true} will affect <em>all</em>
* Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
* For example, other beans marked with Spring's {@code @Transactional} annotation
* will be upgraded to subclass proxying at the same time. This approach has no
* negative impact in practice unless one is explicitly expecting one type of proxy
* vs. another — for example, in tests.
*/
boolean proxyTargetClass() default false;
/**
* Selects the advice mode; in practice this is almost always AdviceMode.PROXY.
* Indicate how async advice should be applied.
* <p><b>The default is {@link AdviceMode#PROXY}.</b>
* Please note that proxy mode allows for interception of calls through the proxy
* only. Local calls within the same class cannot get intercepted that way; an
* {@link Async} annotation on such a method within a local call will be ignored
* since Spring's interceptor does not even kick in for such a runtime scenario.
* For a more advanced mode of interception, consider switching this to
* {@link AdviceMode#ASPECTJ}.
*/
AdviceMode mode() default AdviceMode.PROXY;
/**
* Sets the position of AsyncAnnotationBeanPostProcessor among the BeanPostProcessors.
* Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
* should be applied.
* <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
* after all other post-processors, so that it can add an advisor to
* existing proxies rather than double-proxy.
*/
int order() default Ordered.LOWEST_PRECEDENCE;
}
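The Javadoc for mode() notes that proxy mode only intercepts calls that go through the proxy. The sketch below illustrates that limitation with a plain JDK dynamic proxy. It is a simplified model rather than Spring code: SelfInvocationSketch, MailService, and proxyFor are hypothetical names, and the handler simply treats send() as the asynchronous method instead of looking for an annotation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SelfInvocationSketch {

    /** Hypothetical service; the proxy below treats send() as the asynchronous method. */
    interface MailService {
        String send();
        String sendViaLocalCall();
    }

    static final class MailServiceImpl implements MailService {
        @Override
        public String send() {
            return Thread.currentThread().getName();
        }

        @Override
        public String sendViaLocalCall() {
            // A call on "this" goes straight to the target, so the proxy never sees it.
            return send();
        }
    }

    static MailService proxyFor(MailService target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("send")) {
                Callable<Object> task = () -> method.invoke(target, args);
                // Blocks only so that the demo can report which thread ran the method.
                return executor.submit(task).get();
            }
            return method.invoke(target, args);
        };
        return (MailService) Proxy.newProxyInstance(
                MailService.class.getClassLoader(), new Class<?>[] {MailService.class}, handler);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-1"));
        try {
            MailService service = proxyFor(new MailServiceImpl(), executor);
            System.out.println("through the proxy: " + service.send());
            System.out.println("local call:        " + service.sendViaLocalCall());
        } finally {
            executor.shutdown();
        }
    }
}
```

The call made through the proxy reports the executor thread, while the local call reports the caller's own thread, which is why an @Async method invoked from inside the same class runs synchronously in proxy mode.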
AsyncConfigurationSelector
It selects the configuration class according to @EnableAsync#mode; the default is ProxyAsyncConfiguration.
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
/**
* Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
* for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
* respectively.
*/
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
ProxyAsyncConfiguration
It registers AsyncAnnotationBeanPostProcessor, which is a BeanPostProcessor, through a @Bean method.
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
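// Note the bean name: Spring does not expose every AOP setting for @Async, so when necessary
// you can change this bean's BeanDefinition to control how @Async proxies are created
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
// Marked as an infrastructure bean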
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
AsyncAnnotationBeanPostProcessor
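Apart from some configuration setters, the most important thing AsyncAnnotationBeanPostProcessor does is assign an AsyncAnnotationAdvisor to the advisor field of AbstractAdvisingBeanPostProcessor inside setBeanFactory. There is little else of note in this class.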
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
......
// Configures the executor (thread pool) used for asynchronous calls
public void setExecutor(Executor executor) {
this.executor = SingletonSupplier.of(executor);
}
// Configures the handler for uncaught exceptions
public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
}
// Sets the annotation type to detect
public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
this.asyncAnnotationType = asyncAnnotationType;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
// Assigns AbstractAdvisingBeanPostProcessor#advisor
this.advisor = advisor;
}
}
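Since AsyncAnnotationBeanPostProcessor is a BeanPostProcessor, it does its work while beans are being created. Following the class hierarchy upward leads to the entry point, AbstractAdvisingBeanPostProcessor#postProcessAfterInitialization. Only postProcessAfterInitialization carries any logic there, so @Async proxying happens after a bean has been initialized.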
AbstractAdvisingBeanPostProcessor
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {
@Nullable
protected Advisor advisor;
......
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
// If the bean is already a proxy (Spring AOP proxies normally implement Advised), do not proxy it again;
// just add the Advisor (here, AsyncAnnotationAdvisor) to its chain
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
if (isEligible(bean, beanName)) {
// The bean is eligible for proxying, so create the proxy with a ProxyFactory
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
// Add the AsyncAnnotationAdvisor to the proxyFactory
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
// Use original ClassLoader if bean class not locally loaded in overriding class loader
ClassLoader classLoader = getProxyClassLoader();
if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {
classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();
}
return proxyFactory.getProxy(classLoader);
}
// No proxy needed.
return bean;
}
......
/**
* Prepare a {@link ProxyFactory} for the given bean.
* <p>Subclasses may customize the handling of the target instance and in
* particular the exposure of the target class. The default introspection
* of interfaces for non-target-class proxies and the configured advisor
* will be applied afterwards; {@link #customizeProxyFactory} allows for
* late customizations of those parts right before proxy creation.
* @param bean the bean instance to create a proxy for
* @param beanName the corresponding bean name
* @return the ProxyFactory, initialized with this processor's
* {@link ProxyConfig} settings and the specified bean
* @since 4.2.3
* @see #customizeProxyFactory
*/
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
ProxyFactory proxyFactory = new ProxyFactory();
proxyFactory.copyFrom(this);
proxyFactory.setTarget(bean);
return proxyFactory;
}
......
}
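The three branches of postProcessAfterInitialization (extend an existing proxy, create a new proxy, or return the bean untouched) can be modelled with JDK classes alone. This is a simplified sketch under stated assumptions, not Spring's implementation: AdvisedLike is a hypothetical stand-in for Advised, advisors are represented only by their names, and eligibility is reduced to "implements the hypothetical OrderService interface".

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class AdvisingPostProcessorSketch {

    /** Hypothetical counterpart of Advised: lets callers see and extend the advisor chain. */
    interface AdvisedLike {
        List<String> advisorNames();
    }

    /** Hypothetical business interface; implementing it makes a bean eligible here. */
    interface OrderService {
        String place(String item);
    }

    /** Mirrors the three branches: reuse an existing proxy, create a proxy, or leave the bean alone. */
    static Object postProcessAfterInitialization(Object bean, String advisorName) {
        if (bean instanceof AdvisedLike) {
            // Already proxied: append to the existing chain instead of proxying twice.
            ((AdvisedLike) bean).advisorNames().add(advisorName);
            return bean;
        }
        if (bean instanceof OrderService) {
            List<String> chain = new ArrayList<>();
            chain.add(advisorName);
            // Calls to AdvisedLike expose the chain; everything else goes to the target.
            InvocationHandler handler = (proxy, method, args) ->
                    method.getDeclaringClass() == AdvisedLike.class
                            ? chain
                            : method.invoke(bean, args);
            return Proxy.newProxyInstance(
                    OrderService.class.getClassLoader(),
                    new Class<?>[] {OrderService.class, AdvisedLike.class},
                    handler);
        }
        // Not eligible: no proxy needed.
        return bean;
    }

    public static void main(String[] args) {
        OrderService target = item -> "ordered " + item;
        Object first = postProcessAfterInitialization(target, "transactionAdvisor");
        Object second = postProcessAfterInitialization(first, "asyncAdvisor");
        System.out.println(first == second);
        System.out.println(((AdvisedLike) second).advisorNames());
        System.out.println(((OrderService) second).place("book"));
    }
}
```

In the sketch the second pass returns the same proxy instance with a longer chain. In Spring the first proxy would typically come from a different post-processor, such as an auto-proxy creator, before the async post-processor adds its advisor.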
AbstractBeanFactoryAwareAdvisingPostProcessor
It mainly overrides prepareProxyFactory to configure the ProxyFactory created by AbstractAdvisingBeanPostProcessor, and adds an extra condition to the isEligible method.
public abstract class AbstractBeanFactoryAwareAdvisingPostProcessor extends AbstractAdvisingBeanPostProcessor
implements BeanFactoryAware {
@Nullable
private ConfigurableListableBeanFactory beanFactory;
@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = (beanFactory instanceof ConfigurableListableBeanFactory ?
(ConfigurableListableBeanFactory) beanFactory : null);
}
@Override
protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
if (this.beanFactory != null) {
AutoProxyUtils.exposeTargetClass(this.beanFactory, beanName, bean.getClass());
}
ProxyFactory proxyFactory = super.prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass() && this.beanFactory != null &&
AutoProxyUtils.shouldProxyTargetClass(this.beanFactory, beanName)) {
// Mainly decides whether the proxy created later is a JDK dynamic proxy or a CGLIB proxy
proxyFactory.setProxyTargetClass(true);
}
return proxyFactory;
}
@Override
protected boolean isEligible(Object bean, String beanName) {
return (!AutoProxyUtils.isOriginalInstance(beanName, bean.getClass()) &&
super.isEligible(bean, beanName));
}
}
At this point the proxy object has been created. Next, let's look at how the asynchronous execution itself is handled.
AsyncAnnotationAdvisor
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
private Advice advice;
private Pointcut pointcut;
......
// Registers the default annotations (@Async and javax.ejb.Asynchronous), builds the Pointcut from them and, most importantly, creates the Advice
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
......
//setAsyncAnnotationType
//setBeanFactory
//getter
......
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
// Builds the concrete AOP Advice; AnnotationAsyncExecutionInterceptor is what makes the proxied methods run asynchronously
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
......
}
AnnotationAsyncExecutionInterceptor
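It overrides the parent class's getExecutorQualifier method to read the executor name from the @Async annotation, so that the parent class can look up the specified executor and run the asynchronous task on it.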
public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {
/**
* Return the qualifier or bean name of the executor to be used when executing the
* given method, specified via {@link Async#value} at the method or declaring
* class level. If {@code @Async} is specified at both the method and class level, the
* method's {@code #value} takes precedence (even if empty string, indicating that
* the default executor should be used preferentially).
* @param method the method to inspect for executor qualifier metadata
* @return the qualifier if specified, otherwise empty string indicating that the
* {@linkplain #setExecutor(Executor) default executor} should be used
* @see #determineAsyncExecutor(Method)
*/
@Override
@Nullable
protected String getExecutorQualifier(Method method) {
// Maintainer's note: changes made here should also be made in
// AnnotationAsyncExecutionAspect#getExecutorQualifier
Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
if (async == null) {
async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
}
return (async != null ? async.value() : null);
}
}
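The lookup order in getExecutorQualifier (method first, then the declaring class) can be reproduced with plain reflection. The sketch below is an illustration only: MyAsync is a hypothetical stand-in for @Async, ReportService and the executor names are made up, and Method#getAnnotation is used in place of Spring's AnnotatedElementUtils.findMergedAnnotation, so meta-annotations are not resolved here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class ExecutorQualifierSketch {

    /** Hypothetical stand-in for @Async; value names the executor to use. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface MyAsync {
        String value() default "";
    }

    @MyAsync("classLevelExecutor")
    static class ReportService {
        @MyAsync("methodLevelExecutor")
        public void export() { }

        public void archive() { }
    }

    /** The method-level annotation wins; otherwise fall back to the declaring class. */
    static String executorQualifier(Method method) {
        MyAsync async = method.getAnnotation(MyAsync.class);
        if (async == null) {
            async = method.getDeclaringClass().getAnnotation(MyAsync.class);
        }
        return (async != null ? async.value() : null);
    }

    public static void main(String[] args) throws NoSuchMethodException {
        System.out.println(executorQualifier(ReportService.class.getMethod("export")));
        System.out.println(executorQualifier(ReportService.class.getMethod("archive")));
    }
}
```

Here export() resolves to its own qualifier and archive() inherits the class-level one. A method on a class with no annotation at all would yield null, which in the real interceptor means the default executor is used.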
The part that matters is the parent class's invoke method.
AsyncExecutionInterceptor
AsyncExecutionInterceptor is the MethodInterceptor implementation that actually performs the asynchronous execution.
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
......
/**
* Intercept the given method invocation, submit the actual calling of the method to
* the correct task executor and return immediately to the caller.
* @param invocation the method to intercept and make asynchronous
* @return {@link Future} if the original method returns {@code Future}; {@code null}
* otherwise.
*/
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// Look up the executor for this method
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
// Create the task
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
// Submit the task to the executor
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
......
}
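The shape of invoke (wrap the call in a Callable, submit it, and hand a Future back only when the method declares one) can be sketched without Spring. The following is a simplified model under stated assumptions: a JDK InvocationHandler stands in for the MethodInterceptor, AsyncProxySketch, GreetingService, and asyncProxy are hypothetical names, every interface method is treated as asynchronous, and the error handling done by handleError is left out.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class AsyncProxySketch {

    /** Hypothetical service interface; every method is treated as asynchronous. */
    interface GreetingService {
        Future<String> greet(String name);
        void log(String message);
    }

    /** Plain target whose methods report the thread they run on. */
    static final class GreetingServiceImpl implements GreetingService {
        @Override
        public Future<String> greet(String name) {
            return CompletableFuture.completedFuture(
                    "hello " + name + " from " + Thread.currentThread().getName());
        }

        @Override
        public void log(String message) {
            System.out.println(Thread.currentThread().getName() + ": " + message);
        }
    }

    /** Wraps the target so that each call is submitted to the executor. */
    static GreetingService asyncProxy(GreetingService target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            Callable<Object> task = () -> {
                Object result = method.invoke(target, args);
                // Unwrap a Future result so that the submitted task yields the plain value.
                return (result instanceof Future) ? ((Future<?>) result).get() : null;
            };
            Future<Object> submitted = executor.submit(task);
            // Hand back the Future only if the method declares one; otherwise return null at once.
            return Future.class.isAssignableFrom(method.getReturnType()) ? submitted : null;
        };
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class}, handler);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-1"));
        try {
            GreetingService service = asyncProxy(new GreetingServiceImpl(), executor);
            service.log("fire and forget");
            System.out.println(service.greet("Spring").get());
        } finally {
            executor.shutdown();
        }
    }
}
```

Both calls return to the caller immediately and the method bodies run on the executor thread. The Future returned to the caller is the one produced by the executor, not the one created inside the target method, which matches the unwrapping step in the listing above.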
Summary
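Spring's @Async support works by having a BeanPostProcessor create a proxy after each bean is initialized, and the asynchronous execution itself is implemented with Spring AOP. Unlike Cache and Transactional, it does not rely on the AbstractAutoProxyCreator family (InfrastructureAdvisorAutoProxyCreator) to create proxies. Instead, AsyncAnnotationBeanPostProcessor creates the proxy through a ProxyFactory.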
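AbstractAdvisingBeanPostProcessor therefore offers the same kind of automatic proxying as AbstractAutoProxyCreator. The difference is that AbstractAutoProxyCreator looks up Advisor beans in the container, whereas AbstractAdvisingBeanPostProcessor only adds the single Advisor it was configured with.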
Open question: why doesn't @Async use AbstractAutoProxyCreator?