开启异步 开启异步任务使用方法 1).方法上加@Async注解
@EnableAsync用于开启Spring bean异步方法的能力。下面是注解EnableAsync的定义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { Class<? extends Annotation > annotation() default Annotation.class; boolean proxyTargetClass () default false ; AdviceMode mode () default AdviceMode.PROXY; int order () default Ordered.LOWEST_PRECEDENCE; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public abstract class AdviceModeImportSelector <A extends Annotation > implements ImportSelector { public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode" ; protected String getAdviceModeAttributeName () { return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME; } @Override public final String[] selectImports(AnnotationMetadata importingClassMetadata) { Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class); Assert.state(annType != null , "Unresolvable type argument for AdviceModeImportSelector" ); AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType); if (attributes == null ) { throw new IllegalArgumentException (String.format( "@%s is not present on importing class '%s' as expected" , annType.getSimpleName(), importingClassMetadata.getClassName())); } AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName()); String[] imports = selectImports(adviceMode); if (imports == null ) { throw new IllegalArgumentException ("Unknown AdviceMode: " + adviceMode); } return imports; } @Nullable protected abstract String[] selectImports(AdviceMode adviceMode); }
AsyncConfigurationSelector 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class AsyncConfigurationSelector extends AdviceModeImportSelector <EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration" ; @Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String []{ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String []{ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default : return null ; } } }
异步配置类 ProxyAsyncConfiguration
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor () { Assert.notNull(this .enableAsync, "@EnableAsync annotation metadata was not injected" ); AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor (); bpp.configure(this .executor, this .exceptionHandler); Class<? extends Annotation > customAsyncAnnotation = this .enableAsync.getClass("annotation" ); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation" )) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this .enableAsync.getBoolean("proxyTargetClass" )); bpp.setOrder(this .enableAsync.<Integer>getNumber("order" )); return bpp; } }
AbstractAsyncConfiguration 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Configuration public abstract class AbstractAsyncConfiguration implements ImportAware { @Nullable protected AnnotationAttributes enableAsync; @Nullable protected Supplier<Executor> executor; @Nullable protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler; @Override public void setImportMetadata (AnnotationMetadata importMetadata) { this .enableAsync = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false )); if (this .enableAsync == null ) { throw new IllegalArgumentException ( "@EnableAsync is not present on importing class " + importMetadata.getClassName()); } } @Autowired(required = false) void setConfigurers (Collection<AsyncConfigurer> configurers) { if (CollectionUtils.isEmpty(configurers)) { return ; } if (configurers.size() > 1 ) { throw new IllegalStateException ("Only one AsyncConfigurer may exist" ); } AsyncConfigurer configurer = configurers.iterator().next(); this .executor = configurer::getAsyncExecutor; this .exceptionHandler = configurer::getAsyncUncaughtExceptionHandler; } }
1)setImportMetadata 设置注解属性,即属性1
2)setConfigurers 设置异步任务执行器和异常处理器,即属性2,3
AsyncAnnotationBeanPostProcessor这个类的Bean 初始化时 : BeanFactoryAware接口setBeanFactory方法中,对AsyncAnnotationAdvisor异步注解切面进行了构造。
setBeanFactory 1 2 3 4 5 6 7 8 9 10 11 @Override public void setBeanFactory (BeanFactory beanFactory) { super .setBeanFactory(beanFactory); AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor (this .executor, this .exceptionHandler); if (this .asyncAnnotationType != null ) { advisor.setAsyncAnnotationType(this .asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this .advisor = advisor; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware { private Advice advice; private Pointcut pointcut; public AsyncAnnotationAdvisor () { this ((Supplier<Executor>) null , (Supplier<AsyncUncaughtExceptionHandler>) null ); } @SuppressWarnings("unchecked") public AsyncAnnotationAdvisor ( @Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) { this (SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler)); } @SuppressWarnings("unchecked") public AsyncAnnotationAdvisor ( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { Set<Class<? extends Annotation >> asyncAnnotationTypes = new LinkedHashSet <>(2 ); asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class<? extends Annotation >) ClassUtils.forName("javax.ejb.Asynchronous" , AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { } this .advice = buildAdvice(executor, exceptionHandler); this .pointcut = buildPointcut(asyncAnnotationTypes); } public void setAsyncAnnotationType (Class<? extends Annotation> asyncAnnotationType) { Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null" ); Set<Class<? extends Annotation >> asyncAnnotationTypes = new HashSet <>(); asyncAnnotationTypes.add(asyncAnnotationType); this .pointcut = buildPointcut(asyncAnnotationTypes); } @Override public void setBeanFactory (BeanFactory beanFactory) { if (this .advice instanceof BeanFactoryAware) { ((BeanFactoryAware) this .advice).setBeanFactory(beanFactory); } } @Override public Advice getAdvice () { return this .advice; } @Override public Pointcut getPointcut () { return this .pointcut; } protected Advice buildAdvice ( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor (null ); interceptor.configure(executor, exceptionHandler); return interceptor; } protected Pointcut buildPointcut (Set<Class<? extends Annotation>> asyncAnnotationTypes) { ComposablePointcut result = null ; for (Class<? extends Annotation > asyncAnnotationType : asyncAnnotationTypes) { Pointcut cpc = new AnnotationMatchingPointcut (asyncAnnotationType, true ); Pointcut mpc = new AnnotationMatchingPointcut (null , asyncAnnotationType, true ); if (result == null ) { result = new ComposablePointcut (cpc); } else { result.union(cpc); } result = result.union(mpc); } return (result != null ? result : Pointcut.TRUE); } }
AnnotationAsyncExecutionInterceptor 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor , Ordered { public AsyncExecutionInterceptor (@Nullable Executor defaultExecutor) { super (defaultExecutor); } public AsyncExecutionInterceptor (@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) { super (defaultExecutor, exceptionHandler); } @Override @Nullable public Object invoke (final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null ); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null ) { throw new IllegalStateException ( "No executor specified and no default executor set on AsyncExecutionInterceptor either" ); } Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null ; }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); } @Override @Nullable protected String getExecutorQualifier (Method method) { return null ; } @Override @Nullable protected Executor getDefaultExecutor (@Nullable BeanFactory beanFactory) { Executor defaultExecutor = super .getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor ()); } @Override public int getOrder () { return Ordered.HIGHEST_PRECEDENCE; } }
AsyncExecutionAspectSupport#determineAsyncExecutor 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 protected AsyncTaskExecutor determineAsyncExecutor (Method method) { AsyncTaskExecutor executor = this .executors.get(method); if (executor == null ) { Executor targetExecutor; String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this .beanFactory, qualifier); } else { targetExecutor = this .defaultExecutor.get(); } if (targetExecutor == null ) { return null ; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter (targetExecutor)); this .executors.put(method, executor); } return executor; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 public class AnnotationMatchingPointcut implements Pointcut { private final ClassFilter classFilter; private final MethodMatcher methodMatcher; public AnnotationMatchingPointcut (Class<? extends Annotation> classAnnotationType) { this (classAnnotationType, false ); } public AnnotationMatchingPointcut (Class<? extends Annotation> classAnnotationType, boolean checkInherited) { this .classFilter = new AnnotationClassFilter (classAnnotationType, checkInherited); this .methodMatcher = MethodMatcher.TRUE; } public AnnotationMatchingPointcut (@Nullable Class<? extends Annotation> classAnnotationType, @Nullable Class<? extends Annotation> methodAnnotationType) { this (classAnnotationType, methodAnnotationType, false ); } public AnnotationMatchingPointcut (@Nullable Class<? extends Annotation> classAnnotationType, @Nullable Class<? extends Annotation> methodAnnotationType, boolean checkInherited) { Assert.isTrue((classAnnotationType != null || methodAnnotationType != null ), "Either Class annotation type or Method annotation type needs to be specified (or both)" ); if (classAnnotationType != null ) { this .classFilter = new AnnotationClassFilter (classAnnotationType, checkInherited); } else { this .classFilter = new AnnotationCandidateClassFilter (methodAnnotationType); } if (methodAnnotationType != null ) { this .methodMatcher = new AnnotationMethodMatcher (methodAnnotationType, checkInherited); } else { this .methodMatcher = MethodMatcher.TRUE; } } @Override public ClassFilter getClassFilter () { return this .classFilter; } @Override public MethodMatcher getMethodMatcher () { return this .methodMatcher; } @Override public boolean equals (@Nullable Object other) { if (this == other) { return true ; } if (!(other instanceof AnnotationMatchingPointcut)) { return false ; } AnnotationMatchingPointcut otherPointcut = (AnnotationMatchingPointcut) other; return (this .classFilter.equals(otherPointcut.classFilter) && this .methodMatcher.equals(otherPointcut.methodMatcher)); } @Override public int hashCode () { return this .classFilter.hashCode() * 37 + this .methodMatcher.hashCode(); } @Override public String toString () { return "AnnotationMatchingPointcut: " + this .classFilter + ", " + this .methodMatcher; } public static AnnotationMatchingPointcut forClassAnnotation (Class<? extends Annotation> annotationType) { Assert.notNull(annotationType, "Annotation type must not be null" ); return new AnnotationMatchingPointcut (annotationType); } public static AnnotationMatchingPointcut forMethodAnnotation (Class<? extends Annotation> annotationType) { Assert.notNull(annotationType, "Annotation type must not be null" ); return new AnnotationMatchingPointcut (null , annotationType); } private static class AnnotationCandidateClassFilter implements ClassFilter { private final Class<? extends Annotation > annotationType; AnnotationCandidateClassFilter(Class<? extends Annotation > annotationType) { this .annotationType = annotationType; } @Override public boolean matches (Class<?> clazz) { return AnnotationUtils.isCandidateClass(clazz, this .annotationType); } @Override public boolean equals (Object obj) { if (this == obj) { return true ; } if (!(obj instanceof AnnotationCandidateClassFilter)) { return false ; } AnnotationCandidateClassFilter that = (AnnotationCandidateClassFilter) obj; return this .annotationType.equals(that.annotationType); } @Override public int hashCode () { return this .annotationType.hashCode(); } @Override public String toString () { return getClass().getName() + ": " + this .annotationType; } } }
(AsyncAnnotationBeanPostProcessor -》postProcessAfterInitialization())
AbstractAdvisingBeanPostProcessor#postProcessAfterInitialization 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Override public Object postProcessAfterInitialization (Object bean, String beanName) { if (this .advisor == null || bean instanceof AopInfrastructureBean) { return bean; } if (bean instanceof Advised) { Advised advised = (Advised) bean; if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) { if (this .beforeExistingAdvisors) { advised.addAdvisor(0 , this .advisor); } else { advised.addAdvisor(this .advisor); } return bean; } } if (isEligible(bean, beanName)) { ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); if (!proxyFactory.isProxyTargetClass()) { evaluateProxyInterfaces(bean.getClass(), proxyFactory); } proxyFactory.addAdvisor(this .advisor); customizeProxyFactory(proxyFactory); return proxyFactory.getProxy(getProxyClassLoader()); } return bean; }
isEligible用于判断这个类或者这个类中的某个方法是否含有注解,AsyncAnnotationAdvisor 实现了PointcutAdvisor接口。
AbstractBeanFactoryAwareAdvisingPostProcessor#isEligible 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 protected boolean isEligible (Object bean, String beanName) { return isEligible(bean.getClass()); } protected boolean isEligible (Class<?> targetClass) { Boolean eligible = this .eligibleBeans.get(targetClass); if (eligible != null ) { return eligible; } if (this .advisor == null ) { return false ; } eligible = AopUtils.canApply(this .advisor, targetClass); this .eligibleBeans.put(targetClass, eligible); return eligible; }
canApply 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static boolean canApply (Advisor advisor, Class<?> targetClass) { return canApply(advisor, targetClass, false ); } public static boolean canApply (Advisor advisor, Class<?> targetClass, boolean hasIntroductions) { if (advisor instanceof IntroductionAdvisor) { return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass); } else if (advisor instanceof PointcutAdvisor) { PointcutAdvisor pca = (PointcutAdvisor) advisor; return canApply(pca.getPointcut(), targetClass, hasIntroductions); } else { return true ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 @Override @Nullable public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { Object oldProxy = null ; boolean setProxyContext = false ; TargetSource targetSource = this .advised.targetSource; Object target = null ; try { if (!this .equalsDefined && AopUtils.isEqualsMethod(method)) { return equals(args[0 ]); } else if (!this .hashCodeDefined && AopUtils.isHashCodeMethod(method)) { return hashCode(); } else if (method.getDeclaringClass() == DecoratingProxy.class) { return AopProxyUtils.ultimateTargetClass(this .advised); } else if (!this .advised.opaque && method.getDeclaringClass().isInterface() && method.getDeclaringClass().isAssignableFrom(Advised.class)) { return AopUtils.invokeJoinpointUsingReflection(this .advised, method, args); } Object retVal; if (this .advised.exposeProxy) { oldProxy = AopContext.setCurrentProxy(proxy); setProxyContext = true ; } target = targetSource.getTarget(); Class<?> targetClass = (target != null ? target.getClass() : null ); List<Object> chain = this .advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass); if (chain.isEmpty()) { Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args); retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse); } else { MethodInvocation invocation = new ReflectiveMethodInvocation (proxy, target, method, args, targetClass, chain); retVal = invocation.proceed(); } Class<?> returnType = method.getReturnType(); if (retVal != null && retVal == target && returnType != Object.class && returnType.isInstance(proxy) && !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) { retVal = proxy; } else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) { throw new AopInvocationException ( "Null return value from advice does not match primitive return type for: " + method); } return retVal; } finally { if (target != null && !targetSource.isStatic()) { targetSource.releaseTarget(target); } if (setProxyContext) { AopContext.setCurrentProxy(oldProxy); } } }
AsyncExecutionInterceptor#invoke 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Override @Nullable public Object invoke (final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null ); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null ) { throw new IllegalStateException ( "No executor specified and no default executor set on AsyncExecutionInterceptor either" ); } Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null ; }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); }
doSubmit 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Nullable protected Object doSubmit (Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) { if (CompletableFuture.class.isAssignableFrom(returnType)) { return CompletableFuture.supplyAsync(() -> { try { return task.call(); } catch (Throwable ex) { throw new CompletionException (ex); } }, executor); } else if (ListenableFuture.class.isAssignableFrom(returnType)) { return ((AsyncListenableTaskExecutor) executor).submitListenable(task); } else if (Future.class.isAssignableFrom(returnType)) { return executor.submit(task); } else { executor.submit(task); return null ; } }
总结 整体流程大体可梳理为两条线: