摘要: 原创出处 http://www.iocoder.cn/Spring-Boot/Async-Job/ 「芋道源码」欢迎转载,保留摘要,谢谢!

  • 1. 概述

  • 2. 快速入门

  • 3. 异步回调

  • 4. 异步异常处理器

  • 5. 自定义执行器

  • 666. 彩蛋


本文在提供完整代码示例,可见 https://github.com/YunaiV/SpringBoot-Labs 的 lab-29 目录。

原创不易,给点个 Star 嘿,一起冲鸭!

1. 概述

在日常开发中,我们的逻辑都是同步调用,顺序执行。在一些场景下,我们会希望异步调用,将和主线程关联度低的逻辑异步调用,以实现让主线程更快的执行完成,提升性能。例如说:记录用户访问日志到数据库,记录管理员操作日志到数据库中。

异步调用,对应的是同步调用。

  • 同步调用:指程序按照 定义顺序 依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行;

  • 异步调用:指程序在顺序执行时,不等待异步调用的语句返回结果,就执行后面的程序。

考虑到异步调用的可靠性,我们一般会考虑引入分布式消息队列,例如说 RabbitMQ、RocketMQ、Kafka 等等。但是在一些时候,我们并不需要这么高的可靠性,可以使用进程内的队列或者线程池。例如说示例代码如下:

public class Demo {

   public static void main(String[] args) {
       // 创建线程池。这里只是临时测试,不要扣艿艿遵守阿里 Java 开发规范,YEAH
       ExecutorService executor = Executors.newFixedThreadPool(10);

       // 提交任务到线程池中执行。
       executor.submit(new Runnable() {

           @Override
           public void run()
{
               System.out.println("听说我被异步调用了");
           }

       });
   }

}

友情提示:这里说进程内的队列或者线程池,相对不可靠的原因是,队列和线程池中的任务仅仅存储在内存中,如果 JVM 进程被异常关闭,将会导致丢失,未被执行。

而分布式消息队列,异步调用会以一个消息的形式,存储在消息队列的服务器上,所以即使 JVM 进程被异常关闭,消息依然在消息队列的服务器上。

所以,使用进程内的队列或者线程池来实现异步调用的话,一定要尽可能的保证 JVM 进程的优雅关闭,保证它们在关闭前被执行完成。

在 Spring Framework 的 Spring Task 模块,提供了 @Async 注解,可以添加在方法上,自动实现该方法的异步调用。

简单来说,我们可以像使用 @Transactional 声明式事务,使用 Spring Task 提供的 @Async 注解,声明式异步。而在实现原理上,也是基于 Spring AOP 拦截,实现异步提交该操作到线程池中,达到异步调用的目的。

如果胖友看过艿艿写的 《芋道 Spring Boot 定时任务入门》 文章,就会发现 Spring Task 模块,还提供了定时任务的功能。

下面,让我们一起遨游 Spring 异步任务的海洋。

2. 快速入门

示例代码对应仓库:lab-29-async-demo 。

本小节,我们会编写示例,对比同步调用和异步调用的性能差别,并演示 Spring @Async 注解的使用方式。

2.1 引入依赖

在 pom.xml 文件中,引入相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
       
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

   <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>2.2.1.RELEASE</version>
       <relativePath/> <!-- lookup parent from repository -->
   </parent>
   <modelVersion>4.0.0</modelVersion>

   <artifactId>lab-29-async-demo</artifactId>

   <dependencies>
       <!-- 引入 Spring Boot 依赖 -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter</artifactId>
       </dependency>

       <!-- 方便等会写单元测试 -->
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
       </dependency>
   </dependencies>

</project>

因为 Spring Task 是 Spring Framework 的模块,所以在我们引入 spring-boot-web 依赖后,无需特别引入它。

2.2 Application

创建 Application.java 类,配置 @SpringBootApplication 注解。代码如下:

@SpringBootApplication
@EnableAsync // 开启 @Async 的支持
public class Application {

   public static void main(String[] args) {
       SpringApplication.run(Application.class, args);
   }

}
  • 在类上添加 @EnableAsync 注解,启用异步功能。

2.3 DemoService

在 cn.iocoder.springboot.lab29.asynctask.service 包路径下,创建 DemoService 类。代码如下:

// DemoService.java

@Service
public class DemoService {

   private Logger logger = LoggerFactory.getLogger(getClass());

   public Integer execute01() {
       logger.info("[execute01]");
       sleep(10);
       return 1;
   }

   public Integer execute02() {
       logger.info("[execute02]");
       sleep(5);
       return 2;
   }

   private static void sleep(int seconds) {
       try {
           Thread.sleep(seconds * 1000);
       } catch (InterruptedException e) {
           throw new RuntimeException(e);
       }
   }

}
  • 定义了 #execute01() 和 #execute02() 方法,分别 sleep 10 秒和 5 秒,模拟耗时操作。

  • 同时在每个方法里,使用 logger 打印日志,方便我们看到每个方法的开始执行时间,和执行所在线程

2.4 同步调用测试

创建 DemoServiceTest 测试类,编写 #task01() 方法,同步调用 DemoService 的上述两个方法。代码如下:

// DemoServiceTest.java

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class DemoServiceTest {

   private Logger logger = LoggerFactory.getLogger(getClass());

   @Autowired
   private DemoService demoService;

   @Test
   public void task01() {
       long now = System.currentTimeMillis();
       logger.info("[task01][开始执行]");

       demoService.execute01();
       demoService.execute02();

       logger.info("[task01][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
   }

}

运行单元测试,执行日志如下:

2019-11-30 14:03:35.820  INFO 64639 --- [           main] c.i.s.l.a.service.DemoServiceTest        : [task01][开始执行]
2019-11-30 14:03:35.828  INFO 64639 --- [           main] c.i.s.l.asynctask.service.DemoService    : [execute01]
2019-11-30 14:03:45.833  INFO 64639 --- [           main] c.i.s.l.asynctask.service.DemoService    : [execute02]
2019-11-30 14:03:50.834  INFO 64639 --- [           main] c.i.s.l.a.service.DemoServiceTest        : [task01][结束执行,消耗时长 15014 毫秒]
  • DemoService 的两个方法,顺序执行,一共消耗 15 秒左右。

  • DemoService 的两个方法,都在主线程中执行。

2.5 异步调用测试

修改 DemoService 的代码,增加 #execute01() 和 #execute02() 的异步调用。代码如下:

// DemoService.java

@Async
public Integer execute01Async() {
   return this.execute01();
}

@Async
public Integer execute02Async() {
   return this.execute02();
}
  • 额外增加了 #execute01Async() 和 #execute02Async() 方法,主要是不想破坏上面的「2.4 同步调用测试」哈。实际上,可以在 #execute01() 和 #execute02() 方法上,添加 @Async 注解,实现异步调用。

修改 DemoServiceTest 测试类,编写 #task02() 方法,异步调用上述的两个方法。代码如下:

// DemoServiceTest.java

@Test
public void task02() {
   long now = System.currentTimeMillis();
   logger.info("[task02][开始执行]");

   demoService.execute01Async();
   demoService.execute02Async();

   logger.info("[task02][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
}

运行单元测试,执行日志如下:

2019-11-30 15:57:45.809  INFO 69165 --- [           main] c.i.s.l.a.service.DemoServiceTest        : [task02][开始执行]
2019-11-30 15:57:45.836  INFO 69165 --- [           main] c.i.s.l.a.service.DemoServiceTest        : [task02][结束执行,消耗时长 27 毫秒]

2019-11-30 15:57:45.844  INFO 69165 --- [         task-1] c.i.s.l.asynctask.service.DemoService    : [execute01]
2019-11-30 15:57:45.844  INFO 69165 --- [         task-2] c.i.s.l.asynctask.service.DemoService    : [execute02]
  • DemoService 的两个方法,异步执行,所以主线程只消耗 27 毫秒左右。注意,实际这两个方法,并没有执行完成。

  • DemoService 的两个方法,都在异步的线程池中,进行执行。

2.6 等待异步调用完成测试

在 「2.5 异步调用测试」 中,两个方法只是发布异步调用,并未执行完成。在一些业务场景中,我们希望达到异步调用的效果,同时主线程阻塞等待异步调用的结果。

修改 DemoService 的代码,增加 #execute01() 和 #execute02() 的异步调用,并返回 Future 对象。代码如下:

// DemoService.java

@Async
public Future<Integer> execute01AsyncWithFuture() {
   return AsyncResult.forValue(this.execute01());
}

@Async
public Future<Integer> execute02AsyncWithFuture() {
   return AsyncResult.forValue(this.execute02());
}
  • 相比 「2.5 异步调用测试」 的两个方法,我们额外增加调用 AsyncResult#forValue(V value) 方法,返回带有执行结果的 Future 对象。

修改 DemoServiceTest 测试类,编写 #task03() 方法,异步调用上述的两个方法,并阻塞等待执行完成。代码如下:

// DemoServiceTest.java

@Test
public void task03() throws ExecutionException, InterruptedException
{
   long now = System.currentTimeMillis();
   logger.info("[task03][开始执行]");

   // <1> 执行任务
   Future<Integer> execute01Result = demoService.execute01AsyncWithFuture();
   Future<Integer> execute02Result = demoService.execute02AsyncWithFuture();
   // <2> 阻塞等待结果
   execute01Result.get();
   execute02Result.get();

   logger.info("[task03][结束执行,消耗时长 {} 毫秒]", System.currentTimeMillis() - now);
}
  • <1> 处,异步调用两个方法,并返回对应的 Future 对象。这样,这两个异步调用的逻辑,可以并行执行。

  • <2> 处,分别调用两个 Future 对象的 #get() 方法,阻塞等待结果。

运行单元测试,执行日志如下:

2019-11-30 16:10:22.226  INFO 69641 --- [           main] c.i.s.l.a.service.DemoServiceTest        : [task03][开始执行]

2019-11-30 16:10:22.272  INFO 69641 --- [         task-1] c.i.s.l.asynctask.service.DemoService    : [execute01]
2019-11-30 16:10:22.272  INFO 69641 --- [         task-2] c.i.s.l.asynctask.service.DemoService    : [execute02]

2019-11-30 16:10:32.276  INFO 69641 --- [           main] c.i.s.l.a.service.DemoServiceTest        : [task03][结束执行,消耗时长 10050 毫秒]
  • DemoService 的两个方法,异步执行,因为主线程阻塞等待执行结果,所以消耗 10 秒左右。当同时有多个异步调用,并阻塞等待执行结果,消耗时长由最慢的异步调用的逻辑所决定。

  • DemoService 的两个方法,都在异步的线程池中,进行执行。

下面「2.7 应用配置文件」小节,是补充知识,建议看看。

2.7 应用配置文件

在 application.yml 中,添加 Spring Task 定时任务的配置,如下:

spring:
 task:
   # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
   execution:
     thread-name-prefix: task- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
     pool: # 线程池相关
       core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
       max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
       keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
       queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
       allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
     shutdown:
       await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
       await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
  • 在 spring.task.execution 配置项,Spring Task 调度任务的配置,对应 TaskExecutionProperties 配置类。

  • Spring Boot TaskExecutionAutoConfiguration 自动化配置类,实现 Spring Task 的自动配置,创建 ThreadPoolTaskExecutor 基于线程池的任务执行器。本质上,ThreadPoolTaskExecutor 是基于 ThreadPoolExecutor 的封装,主要增加提交任务,返回 ListenableFuture 对象的功能。

注意spring.task.execution.shutdown 配置项,是为了实现 Spring Task 异步任务的优雅关闭。我们想象一下,如果异步任务在执行的过程中,如果应用开始关闭,把异步任务需要使用到的 Spring Bean 进行销毁,例如说数据库连接池,那么此时异步任务还在执行中,一旦需要访问数据库,可能会导致报错。

  • 所以,通过配置 await-termination = true ,实现应用关闭时,等待异步任务执行完成。这样,应用在关闭的时,Spring 会优先等待 ThreadPoolTaskScheduler 执行完任务之后,再开始 Spring Bean 的销毁。

  • 同时,又考虑到我们不可能无限等待异步任务全部执行结束,因此可以配置 await-termination-period = 60 ,等待任务完成的最大时长,单位为秒。具体设置多少的等待时长,可以根据自己应用的需要。

3. 异步回调

示例代码对应仓库:lab-29-async-demo 。

异步 + 回调,快活似神仙。所以本小节我们来看看,如何在异步调用完成后,实现自定义回调

考虑到让胖友更加理解 Spring Task 异步回调是如何实现的,我们会在 「3.1 AsyncResult」 和 「3.2 ListenableFutureTask」小节进行部分源码解析,请保持淡定。如果不想看的胖友,可以直接看 「3.3 具体示例」 小节。

友情提示:该示例,基于 「2. 快速入门」 的 lab-29-async-demo 的基础上,继续改造。

3.1 AsyncResult

在 「2.6 等待异步调用完成测试」 中,我们看到了 AsyncResult 类,表示异步结果。返回结果分成两种情况:

  • 执行成功时,调用 AsyncResult#forValue(V value) 静态方法,返回成功的 ListenableFuture 对象。代码如下:

      // AsyncResult.java

     @Nullable
     private final V value;

     public static <V> ListenableFuture<V> forValue(V value) {
         return new AsyncResult<>(value, null);
     }
  • 执行异常时,调用 AsyncResult#forExecutionException(Throwable ex) 静态方法,返回异常的 ListenableFuture 对象。代码如下:

      // AsyncResult.java

     @Nullable
     private final Throwable executionException;

     public static <V> ListenableFuture<V> forExecutionException(Throwable ex) {
         return new AsyncResult<>(null, ex);
     }

同时,AsyncResult 实现了 ListenableFuture 接口,提供异步执行结果的回调处理。这里,我们先来看看 ListenableFuture 接口。代码如下:

// ListenableFuture.java

public interface ListenableFuture<T> extends Future<T> {

   // 添加回调方法,统一处理成功和异常的情况。
   void addCallback(ListenableFutureCallback<? super T> callback);

   // 添加成功和失败的回调方法,分别处理成功和异常的情况。
   void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);

   // 将 ListenableFuture 转换成 JDK8 提供的 CompletableFuture 。
   // 这样,后续我们可以使用 ListenableFuture 来设置回调
   // 不了解 CompletableFuture 的胖友,可以看看 https://colobu.com/2016/02/29/Java-CompletableFuture/ 文章。
   default CompletableFuture<T> completable() {
       CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
       addCallback(completable::complete, completable::completeExceptionally);
       return completable;
   }

}
  • 看下每个接口方法上的注释。

因为 ListenableFuture 继承了 Future 接口,所以 AsyncResult 也需要实现 Future 接口。这里,我们再来看看 Future 接口。代码如下:

// Future.java
public interface Future<V> {

   // 获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。
   V get() throws InterruptedException, ExecutionException;

   // 获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的 timeout 时间,该方法将抛出异常。
   V get(long timeout, TimeUnit unit)
       
throws InterruptedException, ExecutionException, TimeoutException
;

   // 如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回 true 。
   boolean isDone();

   // 如果任务完成前被取消,则返回 true 。
   boolean isCancelled();

   // 如果任务还没开始,执行 cancel(...) 方法将返回 false;
   // 如果任务已经启动,执行 cancel(true) 方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功,返回 true ;
   // 当任务已经启动,执行c ancel(false) 方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回 false ;
   // 当任务已经完成,执行 cancel(...) 方法将返回 false 。
   // mayInterruptRunning 参数表示是否中断执行中的线程。
   boolean cancel(boolean mayInterruptIfRunning);

}
  • 如上注释内容,参考自 《Java 多线程编程:Callable、Future 和 FutureTask 浅析》 文章。

AsyncResult 对 ListenableFuture 定义的 #addCallback(...) 接口方法,实现代码如下:

// AsyncResult.java

@Override
public void addCallback(ListenableFutureCallback<? super V> callback) {
   addCallback(callback, callback);
}

@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
   try {
       if (this.executionException != null) { // <1>
           failureCallback.onFailure(exposedException(this.executionException));
       } else { // <2>
           successCallback.onSuccess(this.value);
       }
   } catch (Throwable ex) { // <3>
       // Ignore
   }
}

// 从 ExecutionException 中,获得原始异常。
private static Throwable exposedException(Throwable original) {
   if (original instanceof ExecutionException) {
       Throwable cause = original.getCause();
       if (cause != null) {
           return cause;
       }
   }
   return original;
}