springboot~CompletableFuture并行计算

在Spring中,CompletableFuture通常用于异步编程,可以方便地处理异步任务的执行和结果处理,CompletableFuture 是 Java 8 引入的一个类,用于支持异步编程和并发操作。它基于 Future 和 CompletionStage 接口,提供了丰富的方法来处理异步任务的执行和结果处理。

下面是 CompletableFuture 实现的一些关键原理:

  1. 线程池支持CompletableFuture 内部使用线程池来执行异步任务,可以通过指定不同的线程池来控制任务的执行方式。默认情况下,CompletableFuture 使用 ForkJoinPool.commonPool() 作为默认的线程池。

  2. 回调函数CompletableFuture 支持链式调用,可以通过 thenApply(), thenAccept(), thenRun(), thenCompose() 等方法添加回调函数,在异步任务完成后处理任务的结果或执行下一步操作。

  3. 异常处理CompletableFuture 提供了 exceptionally(), handle(), whenComplete() 等方法来处理异步任务中可能抛出的异常,确保异常能够被捕获并处理。

  4. 组合操作CompletableFuture 支持多个 CompletableFuture 对象之间的组合操作,如 thenCombine(), thenCompose(), allOf(), anyOf() 等方法,实现并行执行、串行执行、等待所有任务完成等功能。

  5. CompletableFuture 工厂方法:除了 supplyAsync() 方法外,CompletableFuture 还提供了一系列工厂方法来创建 CompletableFuture 对象,如 runAsync(), completedFuture(), failedFuture() 等,方便快速创建并管理异步任务。

总的来说,CompletableFuture 的实现基于 Future 和 CompletionStage 接口,利用线程池、回调函数、异常处理、组合操作等机制,提供了强大而灵活的异步编程功能,使得开发人员能够更加方便地处理异步任务的执行和结果处理。

使用方法(一)链式

如果我们的业务方法已经写完了,这时可以直接通过supplyAsync方法来调用这些已知的方法,而不需要重新开发

CompletableFuture<String> a1 = CompletableFuture.supplyAsync(() -> {
      try {
          Thread.sleep(1000);
      } catch (InterruptedException e) {
          throw new RuntimeException(e);
      }
      return "Hello World";
  });
  CompletableFuture<String> a2 = CompletableFuture.supplyAsync(() -> {
      try {
          Thread.sleep(2000);
      } catch (InterruptedException e) {
          throw new RuntimeException(e);
      }
      return "Hello World";
  });
  CompletableFuture<String> a3 = CompletableFuture.supplyAsync(() -> {
      try {
          Thread.sleep(3000);
      } catch (InterruptedException e) {
          throw new RuntimeException(e);
      }
      return "Hello World";
  });

  // 这块最后是并行计算时间为3秒
  CompletableFuture.allOf(a1, a2, a3).join();

  String result = a1.get() + " | " + a2.get() + " | " + a3.get();

使用方法(二)独立方法

如果方法比较独立,并且之前没有开发过,那么你可以通过异步方法来将这些逻辑与调用代码解耦

@Service
@EnableAsync
public class ParallelTaskService {

    @Async
    public CompletableFuture<String> task1() {
        // 模拟一个耗时操作
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return CompletableFuture.completedFuture("Task 1 completed");
    }

    @Async
    public CompletableFuture<String> task2() {
        // 模拟另一个耗时操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return CompletableFuture.completedFuture("Task 2 completed");
    }
}

// 并行计算时,响应时间是2和3秒之中最大值,即3秒
@GetMapping("/hello-world2")
public CompletableFuture<String> helloWorld2() {
    CompletableFuture<String> task1Result = parallelTaskService.task1();
    CompletableFuture<String> task2Result = parallelTaskService.task2();

    // 等待所有任务都完成
    CompletableFuture<Void> allOf = CompletableFuture.allOf(task1Result, task2Result);

    // 处理所有任务完成后的逻辑
    return allOf.thenApply(voidResult -> {
        String result = task1Result.join() + " | " + task2Result.join();
        return result;
    });
}