Spring事务-异步源码解析
开启异步 开启异步任务使用方法 1).方法上加@Async注解
2).启动类或者配置类上@EnableAsync
@EnableAsync用于开启Spring bean异步方法的能力。下面是注解EnableAsync的定义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { Class<? extends Annotation > annotation() default Annotation.class; boolean proxyTargetClass () default false ; AdviceMode mode () default AdviceMode.PROXY; int order () default Ordered.LOWEST_PRECEDENCE; }
核心注解就是@Import(AsyncConfigurationSelector.class),一看就是套路ImportSelector
接口的selectImports()方法,源码如下:
注册AsyncAnnotationBeanPostProcessor
默认情况下EnableAsync#mode()为AdviceMode.PROXY,AsyncConfigurationSelector的selectImports()方法返回的@Configuartion类ProxyAsyncConfiguration里面注册了一个AsyncAnnotationBeanPostProcessor,这个BeanPostProcessor为每个有@Async的类或方法的类生成一个有异步方法调用能力的代理对象。
AdviceModeImportSelector#selectImports
AsyncConfigurationSelector.selectImports()方法是在父类AdviceModeImportSelector的ImportSelector接口方法selectImports()调用时被调用的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public abstract class AdviceModeImportSelector <A extends Annotation > implements ImportSelector { public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode" ; protected String getAdviceModeAttributeName () { return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME; } @Override public final String[] selectImports(AnnotationMetadata importingClassMetadata) { Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class); Assert.state(annType != null , "Unresolvable type argument for AdviceModeImportSelector" ); AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType); if (attributes == null ) { throw new IllegalArgumentException (String.format( "@%s is not present on importing class '%s' as expected" , annType.getSimpleName(), importingClassMetadata.getClassName())); } AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName()); String[] imports = selectImports(adviceMode); if (imports == null ) { throw new IllegalArgumentException ("Unknown AdviceMode: " + adviceMode); } return imports; } @Nullable protected abstract String[] selectImports(AdviceMode adviceMode); }
AsyncConfigurationSelector 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class AsyncConfigurationSelector extends AdviceModeImportSelector <EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration" ; @Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String []{ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String []{ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default : return null ; } } }
异步配置类 ProxyAsyncConfiguration
ProxyAsyncConfiguration的@Bean方法内,注册了一个AsyncAnnotationBeanPostProcessor并配置了Supplier<Executor>和Supplier<AsyncUncaughtExceptionHandler>,这两个对象是定义在父类AbstractAsyncConfiguration里面的,通过一个@Autowired方法将容器的里面的AsyncConfigurer对象赋给executor和exceptionHandler。所以如果容器中没有这两种bean,这两个成员变量就是null。如果我们想实现自己的Executor和AsyncUncaughtExceptionHandler可以在容器中实现一个自定义的AsyncConfigurer对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor () { Assert.notNull(this .enableAsync, "@EnableAsync annotation metadata was not injected" ); AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor (); bpp.configure(this .executor, this .exceptionHandler); Class<? extends Annotation > customAsyncAnnotation = this .enableAsync.getClass("annotation" ); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation" )) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this .enableAsync.getBoolean("proxyTargetClass" )); bpp.setOrder(this .enableAsync.<Integer>getNumber("order" )); return bpp; } }
ProxyAsyncConfiguration就两点:
就是继承了AbstractAsyncConfiguration类
定义了一个bean:AsyncAnnotationBeanPostProcessor
AbstractAsyncConfiguration 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Configuration public abstract class AbstractAsyncConfiguration implements ImportAware { @Nullable protected AnnotationAttributes enableAsync; @Nullable protected Supplier<Executor> executor; @Nullable protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler; @Override public void setImportMetadata (AnnotationMetadata importMetadata) { this .enableAsync = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false )); if (this .enableAsync == null ) { throw new IllegalArgumentException ( "@EnableAsync is not present on importing class " + importMetadata.getClassName()); } } @Autowired(required = false) void setConfigurers (Collection<AsyncConfigurer> configurers) { if (CollectionUtils.isEmpty(configurers)) { return ; } if (configurers.size() > 1 ) { throw new IllegalStateException ("Only one AsyncConfigurer may exist" ); } AsyncConfigurer configurer = configurers.iterator().next(); this .executor = configurer::getAsyncExecutor; this .exceptionHandler = configurer::getAsyncUncaughtExceptionHandler; } }
属性:
1)注解属性
2)异步任务执行器
3)异常处理器
方法:
1)setImportMetadata 设置注解属性,即属性1
2)setConfigurers 设置异步任务执行器和异常处理器,即属性2,3
AsyncAnnotationBeanPostProcessor
目标对象在经过AsyncAnnotationBeanPostProcessor的postProcessAfterInitialization()方法后会返回一个代理对象替换元对象。先看一下AsyncAnnotationBeanPostProcessor的继承结构。
AbstractBeanFactoryAwareAdvisingPostProcessor实现了接口BeanFactoryAware的setBeanFactory()方法,而AsyncAnnotationBeanPostProcessor又重写了此方法,此方法的调用要早于接口BeanPostProcessor的postProcessAfterInitialization()方法,下面是AsyncAnnotationBeanPostProcessor.setBeanFactory()方法。
AOP-Advisor切面初始化
AsyncAnnotationBeanPostProcessor这个类的Bean 初始化时 : BeanFactoryAware接口setBeanFactory方法中,对AsyncAnnotationAdvisor异步注解切面进行了构造。
setBeanFactory 1 2 3 4 5 6 7 8 9 10 11 @Override public void setBeanFactory (BeanFactory beanFactory) { super .setBeanFactory(beanFactory); AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor (this .executor, this .exceptionHandler); if (this .asyncAnnotationType != null ) { advisor.setAsyncAnnotationType(this .asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this .advisor = advisor; }
这个方法内实例化了一个AsyncAnnotationAdvisor并保存在了父类AbstractAdvisingBeanPostProcessor的成员变量advisor。AsyncAnnotationAdvisor是继承了AbstractPointcutAdvisor实现了getAdvice()和getPointcut()方法。这两个方法的返回值就是上面方法实例化AsyncAnnotationAdvisor就确定的。
AsyncAnnotationAdvisor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware { private Advice advice; private Pointcut pointcut; public AsyncAnnotationAdvisor () { this ((Supplier<Executor>) null , (Supplier<AsyncUncaughtExceptionHandler>) null ); } @SuppressWarnings("unchecked") public AsyncAnnotationAdvisor ( @Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) { this (SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler)); } @SuppressWarnings("unchecked") public AsyncAnnotationAdvisor ( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { Set<Class<? extends Annotation >> asyncAnnotationTypes = new LinkedHashSet <>(2 ); asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class<? extends Annotation >) ClassUtils.forName("javax.ejb.Asynchronous" , AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { } this .advice = buildAdvice(executor, exceptionHandler); this .pointcut = buildPointcut(asyncAnnotationTypes); } public void setAsyncAnnotationType (Class<? extends Annotation> asyncAnnotationType) { Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null" ); Set<Class<? extends Annotation >> asyncAnnotationTypes = new HashSet <>(); asyncAnnotationTypes.add(asyncAnnotationType); this .pointcut = buildPointcut(asyncAnnotationTypes); } @Override public void setBeanFactory (BeanFactory beanFactory) { if (this .advice instanceof BeanFactoryAware) { ((BeanFactoryAware) this .advice).setBeanFactory(beanFactory); } } @Override public Advice getAdvice () { return this .advice; } @Override public Pointcut getPointcut () { return this .pointcut; } protected Advice buildAdvice ( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor (null ); interceptor.configure(executor, exceptionHandler); return interceptor; } protected Pointcut buildPointcut (Set<Class<? extends Annotation>> asyncAnnotationTypes) { ComposablePointcut result = null ; for (Class<? extends Annotation > asyncAnnotationType : asyncAnnotationTypes) { Pointcut cpc = new AnnotationMatchingPointcut (asyncAnnotationType, true ); Pointcut mpc = new AnnotationMatchingPointcut (null , asyncAnnotationType, true ); if (result == null ) { result = new ComposablePointcut (cpc); } else { result.union(cpc); } result = result.union(mpc); } return (result != null ? result : Pointcut.TRUE); } }
buildAdvice()方法返回的是一个AnnotationAsyncExecutionInterceptor(),并传入AsyncAnnotationBeanPostProcessor的executor和exceptionHandler。接下来就看一下这个通知具体行为。继承结构如下:
可以看到AnnotationAsyncExecutionInterceptor是MethodInterceptor接口的实现类,下面看一下invoke()方法的实现:
AnnotationAsyncExecutionInterceptor 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor , Ordered { public AsyncExecutionInterceptor (@Nullable Executor defaultExecutor) { super (defaultExecutor); } public AsyncExecutionInterceptor (@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) { super (defaultExecutor, exceptionHandler); } @Override @Nullable public Object invoke (final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null ); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null ) { throw new IllegalStateException ( "No executor specified and no default executor set on AsyncExecutionInterceptor either" ); } Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null ; }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); } @Override @Nullable protected String getExecutorQualifier (Method method) { return null ; } @Override @Nullable protected Executor getDefaultExecutor (@Nullable BeanFactory beanFactory) { Executor defaultExecutor = super .getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor ()); } @Override public int getOrder () { return Ordered.HIGHEST_PRECEDENCE; } }
AsyncExecutionAspectSupport#determineAsyncExecutor 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 protected AsyncTaskExecutor determineAsyncExecutor (Method method) { AsyncTaskExecutor executor = this .executors.get(method); if (executor == null ) { Executor targetExecutor; String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this .beanFactory, qualifier); } else { targetExecutor = this .defaultExecutor.get(); } if (targetExecutor == null ) { return null ; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter (targetExecutor)); this .executors.put(method, executor); } return executor; }
AnnotationMatchingPointcut
buildPointcut()方法会结合类和方法上的asyncAnnotationType,当然默认是@Async和@Asynchronous,类和方法上一处出现异步注解就会应用通知。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 public class AnnotationMatchingPointcut implements Pointcut { private final ClassFilter classFilter; private final MethodMatcher methodMatcher; public AnnotationMatchingPointcut (Class<? extends Annotation> classAnnotationType) { this (classAnnotationType, false ); } public AnnotationMatchingPointcut (Class<? extends Annotation> classAnnotationType, boolean checkInherited) { this .classFilter = new AnnotationClassFilter (classAnnotationType, checkInherited); this .methodMatcher = MethodMatcher.TRUE; } public AnnotationMatchingPointcut (@Nullable Class<? extends Annotation> classAnnotationType, @Nullable Class<? extends Annotation> methodAnnotationType) { this (classAnnotationType, methodAnnotationType, false ); } public AnnotationMatchingPointcut (@Nullable Class<? extends Annotation> classAnnotationType, @Nullable Class<? extends Annotation> methodAnnotationType, boolean checkInherited) { Assert.isTrue((classAnnotationType != null || methodAnnotationType != null ), "Either Class annotation type or Method annotation type needs to be specified (or both)" ); if (classAnnotationType != null ) { this .classFilter = new AnnotationClassFilter (classAnnotationType, checkInherited); } else { this .classFilter = new AnnotationCandidateClassFilter (methodAnnotationType); } if (methodAnnotationType != null ) { this .methodMatcher = new AnnotationMethodMatcher (methodAnnotationType, checkInherited); } else { this .methodMatcher = MethodMatcher.TRUE; } } @Override public ClassFilter getClassFilter () { return this .classFilter; } @Override public MethodMatcher getMethodMatcher () { return this .methodMatcher; } @Override public boolean equals (@Nullable Object other) { if (this == other) { return true ; } if (!(other instanceof AnnotationMatchingPointcut)) { return false ; } AnnotationMatchingPointcut otherPointcut = (AnnotationMatchingPointcut) other; return (this .classFilter.equals(otherPointcut.classFilter) && this .methodMatcher.equals(otherPointcut.methodMatcher)); } @Override public int hashCode () { return this .classFilter.hashCode() * 37 + this .methodMatcher.hashCode(); } @Override public String toString () { return "AnnotationMatchingPointcut: " + this .classFilter + ", " + this .methodMatcher; } public static AnnotationMatchingPointcut forClassAnnotation (Class<? extends Annotation> annotationType) { Assert.notNull(annotationType, "Annotation type must not be null" ); return new AnnotationMatchingPointcut (annotationType); } public static AnnotationMatchingPointcut forMethodAnnotation (Class<? extends Annotation> annotationType) { Assert.notNull(annotationType, "Annotation type must not be null" ); return new AnnotationMatchingPointcut (null , annotationType); } private static class AnnotationCandidateClassFilter implements ClassFilter { private final Class<? extends Annotation > annotationType; AnnotationCandidateClassFilter(Class<? extends Annotation > annotationType) { this .annotationType = annotationType; } @Override public boolean matches (Class<?> clazz) { return AnnotationUtils.isCandidateClass(clazz, this .annotationType); } @Override public boolean equals (Object obj) { if (this == obj) { return true ; } if (!(obj instanceof AnnotationCandidateClassFilter)) { return false ; } AnnotationCandidateClassFilter that = (AnnotationCandidateClassFilter) obj; return this .annotationType.equals(that.annotationType); } @Override public int hashCode () { return this .annotationType.hashCode(); } @Override public String toString () { return getClass().getName() + ": " + this .annotationType; } } }
AOP-生成代理类AopProxy
(AsyncAnnotationBeanPostProcessor -》postProcessAfterInitialization())
具体的后置处理:AsyncAnnotationBeanPostProcessor的后置bean处理是通过其父类AbstractAdvisingBeanPostProcessor来实现的,
该类实现了BeanPostProcessor接口,复写postProcessAfterInitialization方法如下图所示:
AbstractAdvisingBeanPostProcessor#postProcessAfterInitialization 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Override public Object postProcessAfterInitialization (Object bean, String beanName) { if (this .advisor == null || bean instanceof AopInfrastructureBean) { return bean; } if (bean instanceof Advised) { Advised advised = (Advised) bean; if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) { if (this .beforeExistingAdvisors) { advised.addAdvisor(0 , this .advisor); } else { advised.addAdvisor(this .advisor); } return bean; } } if (isEligible(bean, beanName)) { ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); if (!proxyFactory.isProxyTargetClass()) { evaluateProxyInterfaces(bean.getClass(), proxyFactory); } proxyFactory.addAdvisor(this .advisor); customizeProxyFactory(proxyFactory); return proxyFactory.getProxy(getProxyClassLoader()); } return bean; }
isEligible用于判断这个类或者这个类中的某个方法是否含有注解,AsyncAnnotationAdvisor 实现了PointcutAdvisor接口。
AbstractBeanFactoryAwareAdvisingPostProcessor#isEligible 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 protected boolean isEligible (Object bean, String beanName) { return isEligible(bean.getClass()); } protected boolean isEligible (Class<?> targetClass) { Boolean eligible = this .eligibleBeans.get(targetClass); if (eligible != null ) { return eligible; } if (this .advisor == null ) { return false ; } eligible = AopUtils.canApply(this .advisor, targetClass); this .eligibleBeans.put(targetClass, eligible); return eligible; }
isEligible校验通过后,构造ProxyFactory代理工厂,添加代理的接口,设置切面,最后返回代理类:AopProxy接口实现类
canApply 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static boolean canApply (Advisor advisor, Class<?> targetClass) { return canApply(advisor, targetClass, false ); } public static boolean canApply (Advisor advisor, Class<?> targetClass, boolean hasIntroductions) { if (advisor instanceof IntroductionAdvisor) { return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass); } else if (advisor instanceof PointcutAdvisor) { PointcutAdvisor pca = (PointcutAdvisor) advisor; return canApply(pca.getPointcut(), targetClass, hasIntroductions); } else { return true ; } }
isEligible校验通过后,构造ProxyFactory代理工厂,添加代理的接口,设置切面,最后返回代理类:AopProxy接口实现类
AOP-切点执行
上一步生成的代理AopProxy接口,我们这里最终实际生成的是JdkDynamicAopProxy,即JDK动态代理类,类图如下:
最终执行的是InvocationHandler接口的invoke方法,下面是截取出来的核心代码:
invoke
又进入了JDK动态代理的核心类JdkDynamicAopProxy#invok
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 @Override @Nullable public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { Object oldProxy = null ; boolean setProxyContext = false ; TargetSource targetSource = this .advised.targetSource; Object target = null ; try { if (!this .equalsDefined && AopUtils.isEqualsMethod(method)) { return equals(args[0 ]); } else if (!this .hashCodeDefined && AopUtils.isHashCodeMethod(method)) { return hashCode(); } else if (method.getDeclaringClass() == DecoratingProxy.class) { return AopProxyUtils.ultimateTargetClass(this .advised); } else if (!this .advised.opaque && method.getDeclaringClass().isInterface() && method.getDeclaringClass().isAssignableFrom(Advised.class)) { return AopUtils.invokeJoinpointUsingReflection(this .advised, method, args); } Object retVal; if (this .advised.exposeProxy) { oldProxy = AopContext.setCurrentProxy(proxy); setProxyContext = true ; } target = targetSource.getTarget(); Class<?> targetClass = (target != null ? target.getClass() : null ); List<Object> chain = this .advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass); if (chain.isEmpty()) { Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args); retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse); } else { MethodInvocation invocation = new ReflectiveMethodInvocation (proxy, target, method, args, targetClass, chain); retVal = invocation.proceed(); } Class<?> returnType = method.getReturnType(); if (retVal != null && retVal == target && returnType != Object.class && returnType.isInstance(proxy) && !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) { retVal = proxy; } else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) { throw new AopInvocationException ( "Null return value from advice does not match primitive return type for: " + method); } return retVal; } finally { if (target != null && !targetSource.isStatic()) { targetSource.releaseTarget(target); } if (setProxyContext) { AopContext.setCurrentProxy(oldProxy); } } }
@Async注解的拦截器是AsyncExecutionInterceptor,它继承了MethodInterceptor接口。而MethodInterceptor就是AOP规范中的Advice(切点的处理器)。
chain不为空,执行第二个分支,构造ReflectiveMethodInvocation,然后执行proceed方法。
后面的代码AOP已经讲过不在赘述。
核心方法是InterceptorAndDynamicMethodMatcher.interceptor.invoke(this),实际就是执行了AsyncExecutionInterceptor.invoke,继续追!
AsyncExecutionInterceptor#invoke 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Override @Nullable public Object invoke (final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null ); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null ) { throw new IllegalStateException ( "No executor specified and no default executor set on AsyncExecutionInterceptor either" ); } Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null ; }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); }
doSubmit 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Nullable protected Object doSubmit (Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) { if (CompletableFuture.class.isAssignableFrom(returnType)) { return CompletableFuture.supplyAsync(() -> { try { return task.call(); } catch (Throwable ex) { throw new CompletionException (ex); } }, executor); } else if (ListenableFuture.class.isAssignableFrom(returnType)) { return ((AsyncListenableTaskExecutor) executor).submitListenable(task); } else if (Future.class.isAssignableFrom(returnType)) { return executor.submit(task); } else { executor.submit(task); return null ; } }
总结 整体流程大体可梳理为两条线:
从注解开始:@EnableAsync–》ProxyAsyncConfiguration类构造一个bean(类型:AsyncAnnotationBeanPostProcessor)
从AsyncAnnotationBeanPostProcessor这个类的bean的生命周期走:AOP-Advisor切面初始化(setBeanFactory())–>AOP-生成代理类AopProxy(postProcessAfterInitialization())–>AOP-切点执行(InvocationHandler.invoke)