CompletableFuture

1. 简介

本教程是CompletableFuture类的功能和用例指南,该类作为 Java 8 并发 API 改进引入。

延伸阅读:

Java中的Runnable-Callable
java.util.concurrent.Future 指南

2. Java 中的异步计算

异步计算很难推理。通常,我们希望将任何计算视为一系列步骤,但在异步计算的情况下,表示为回调的操作往往分散在代码中或彼此之间深深嵌套。当我们需要处理其中一个步骤中可能发生的错误时,情况会变得更糟。

Future接口是在 Java 5 中添加的,作为异步计算的结果,但它没有任何方法来组合这些计算或处理可能的错误。

Java 8引入了CompletableFuture类。除了Future接口,它还实现了CompletionStage接口。此接口定义了异步计算步骤的协定,我们可以将其与其他步骤结合使用。

CompletableFuture同时是一个构建块和一个框架,有大约50种不同的方法来组合,组合和执行异步计算步骤以及处理错误。

如此大的 API 可能会让人不知所措,但这些大多属于几个清晰而不同的用例。

3. 使用CompletableFuture作为简单的Future接口实例

首先,CompletableFuture类实现了 Future 接口,以便我们可以将其用作Future实现,但具有额外的完成逻辑。

例如,我们可以创建一个无参的构造函数类实例来表示某个Future结果,将其分发给消费者,并在将来的某个时间使用complete方法完成它。使用者可以使用get方法来阻止当前线程,直到提供此结果。

在下面的示例中,我们有一个方法可以创建一个CompletableFuture实例,然后在另一个线程中剥离一些计算并立即返回Future

计算完成后,该方法通过将结果提供给完整方法来完成Future

代码语言:javascript代码运行次数:0运行复制
public Future<String> calculateAsync() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFutureplete("Hello");
        return null;
    });

    return completableFuture;
}Copy

为了剥离计算,我们使用执行器_API。这种创建和完成_CompletableFuture的方法可以与任何并发机制或 API 一起使用,包括原始线程。

请注意,计算异步方法返回一个Future实例。

我们只需调用该方法,接收Future实例,并在准备好阻止结果时对其调用get方法。

另外,请注意get方法会引发一些已检查的异常,即ExecutionException(封装在计算过程中发生的异常)和InterruptedException(表示线程在活动之前或期间被中断的异常):

代码语言:javascript代码运行次数:0运行复制
Future<String> completableFuture = calculateAsync();

// ... 

String result = completableFuture.get();
assertEquals("Hello", result);Copy

如果我们已经知道计算的结果,我们可以将静态completeFuture方法与表示此计算结果的参数一起使用。因此,Futureget方法永远不会阻塞,而是立即返回以下结果:

代码语言:javascript代码运行次数:0运行复制
Future<String> completableFuture = 
  CompletableFuturepletedFuture("Hello");

// ...

String result = completableFuture.get();
assertEquals("Hello", result);Copy

作为另一种情况,我们可能希望取消Future指令的执行。

4.具有封装计算逻辑的CompletableFuture

上面的代码允许我们选择任何并发执行机制,但是如果我们想跳过这个样板并异步执行一些代码怎么办?

静态方法runAsync 和 supplyAsync允许我们相应地从RunnableSupplier函数类型中创建一个CompletableFuture实例。

RunnableSupplier是函数接口,由于新的 Java 8 功能,它们允许将其实例作为 lambda 表达式传递。

Runnable接口与线程中使用的旧接口相同,不允许返回值。

Supplier接口是一个泛型函数接口,具有没有参数并返回参数化类型值的单个方法。

这允许我们提供供应商的实例作为 lambda 表达式,该表达式执行计算并返回结果。它就像:

代码语言:javascript代码运行次数:0运行复制
CompletableFuture<String> future
  = CompletableFuture.supplyAsync(() -> "Hello");

// ...

assertEquals("Hello", future.get());Copy

5. 异步计算结果的处理

处理计算结果的最通用方法是将其馈送到函数。thenApply方法正是这样做的;它接受一个函数实例,使用它来处理结果,并返回一个Future来保存函数返回的值:

代码语言:javascript代码运行次数:0运行复制
CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApply(s -> s + " World");

assertEquals("Hello World", future.get());Copy

如果我们不需要在Future链中返回值,我们可以使用Consumer函数接口的实例。它的单一方法接受一个参数并返回void

CompletableFuture中有一个用于此用例的方法。thenAccept方法接收一个使用者,并将计算结果传递给它。然后,最终的future.get() 调用返回Void类型的实例:

代码语言:javascript代码运行次数:0运行复制
CompletableFuture<String> completableFuture
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenAccept(s -> System.out.println("Computation returned: " + s));

future.get();Copy

最后,如果我们既不需要计算的值,也不想在链的末尾返回一些值,那么我们可以将一个可运行的 lambda传递给thenRun方法。在下面的示例中,我们只需在调用future.get() 后在控制台中打印一行:

代码语言:javascript代码运行次数:0运行复制
CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Void> future = completableFuture
  .thenRun(() -> System.out.println("Computation finished."));

future.get();Copy

6. 组合Future

CompletableFuture_API 最好的部分是能够在计算步骤链中组合_CompletableFuture实例。

这种链接的结果本身就是一个允许进一步链接和组合的CompletableFuture。这种方法在函数式语言中无处不在,通常被称为一元设计模式。

在下面的例子中,我们使用thenCompose方法将两个Futures按顺序连接起来。

请注意,此方法采用一个返回CompletableFuture实例的函数。此函数的参数是上一个计算步骤的结果。这允许我们在下一个CompletableFuture 的 lambda 中使用此值:

代码语言:javascript代码运行次数:0运行复制
CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

assertEquals("Hello World", completableFuture.get());Copy

thenCompose方法与thenApply 一起实现了 monadic 模式的基本构建块。它们与StreamOptional类的mapflatMap方法密切相关,Java 8 中也可用。

这两种方法都接收一个函数并将其应用于计算结果,但thenComposeflatMap) 方法接收一个返回相同类型另一个对象的函数。此功能结构允许将这些类的实例组合为构建块。

如果我们想执行两个独立的 Future 并对它们的结果做一些事情,我们可以使用thenCombine方法,该方法接受一个Future和一个带有两个参数的函数来处理这两个结果:

代码语言:javascript代码运行次数:0运行复制
CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " World"), (s1, s2) -> s1 + s2));

assertEquals("Hello World", completableFuture.get());Copy

一个更简单的情况是,当我们想对两个Future的结果做一些事情,但不需要将任何结果值传递到Future链上。thenAcceptBoth方法可以提供帮助:

代码语言:javascript代码运行次数:0运行复制
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
    (s1, s2) -> System.out.println(s1 + s2));Copy

7.thenApply()thenCompose()之间的区别

在前面的部分中,我们已经展示了有关thenApply()和thenCompose()的示例。这两个 API 都有助于链接不同的CompletableFuture调用,但这两个函数的用法不同。

7.1.thenApply()

我们可以使用此方法来处理上一个调用的结果。但是,要记住的一个关键点是,返回类型将组合所有调用。

因此,当我们想要转换CompletableFuture调用的结果时,此方法很有用:

代码语言:javascript代码运行次数:0运行复制
CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);Copy

7.2.thenCompose()

thenCompose() 与thenApply() 类似,因为它们都返回一个新的 CompletionStage。但是,thenCompose()使用前一阶段作为参数。它将展平并直接返回带有结果的Future,而不是我们在thenApply()中观察到的嵌套Future:

代码语言:javascript代码运行次数:0运行复制
CompletableFuture<Integer> computeAnother(Integer i){
    return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);Copy

因此,如果想法是链接CompletableFuture方法,那么最好使用thenCompose()。

另外,请注意,这两种方法之间的差异类似于map() 和flatMap() 之间的差异

8. 并行运行多个Future

当我们需要并行执行多个Future时,我们通常希望等待所有Future执行,然后处理它们的组合结果。

CompletableFuture.allOf静态方法允许等待作为 var-arg 提供的所有Futures的完成:

代码语言:javascript代码运行次数:0运行复制
CompletableFuture<String> future1  
  = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2  
  = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3  
  = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture 
  = CompletableFuture.allOf(future1, future2, future3);

// ...

combinedFuture.get();

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());Copy

请注意,CompletableFuture.allOf() 的返回类型是CompletableFuture<Void>。此方法的局限性在于它不返回所有Future的组合结果。相反,我们必须手动从Future中获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API使它变得简单:

代码语言:javascript代码运行次数:0运行复制
String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);Copy

CompletableFuture.join() 方法类似于get方法,但如果Future无法正常完成,它会抛出一个未经检查的异常。这使得可以将其用作Stream.map() 方法中的方法引用。

9. 处理错误

对于异步计算步骤链中的错误处理,我们必须以类似的方式调整抛出/捕获习惯用法。

CompletableFuture类允许我们使用特殊的句柄方法处理它,而不是在语法块中捕获异常。此方法接收两个参数:计算结果(如果成功完成)和引发的异常(如果某些计算步骤未正常完成)。

在下面的示例中,我们使用handle方法在问候语的异步计算完成时提供默认值,但由于未提供名称而出现错误:

代码语言:javascript代码运行次数:0运行复制
String name = null;

// ...

CompletableFuture<String> completableFuture  
  =  CompletableFuture.supplyAsync(() -> {
      if (name == null) {
          throw new RuntimeException("Computation error!");
      }
      return "Hello, " + name;
  }).handle((s, t) -> s != null ? s : "Hello, Stranger!");

assertEquals("Hello, Stranger!", completableFuture.get());Copy

作为替代方案,假设我们想使用一个值手动完成Future,如第一个示例所示,但也有能力在异常的情况下完成它。完整的异常方法就是为此而设计的。以下示例中的completableFuture.get() 方法抛出一个ExecutionException,其原因为RuntimeException

代码语言:javascript代码运行次数:0运行复制
CompletableFuture<String> completableFuture = new CompletableFuture<>();

// ...

completableFuturepleteExceptionally(
  new RuntimeException("Calculation failed!"));

// ...

completableFuture.get(); // ExecutionExceptionCopy

在上面的示例中,我们本可以使用handle方法异步处理异常,但使用get方法,我们可以使用更典型的同步异常处理方法。

10. 异步方法

CompletableFuture类中流利 API 的大多数方法都有两个带有异步后缀的附加变体。这些方法通常用于在另一个线程中运行相应的执行步骤。

没有异步后缀的方法使用调用线程运行下一个执行阶段。相比之下,没有Executor参数的Async方法使用执行程序的公共分支/联接池实现运行一个步骤,该实现使用ForkJoinPoolmonPool() 访问,只要并行度> 1。最后,带有 Executor 参数的Async方法使用传递的Executor运行一个步骤。

下面是一个修改后的示例,该示例使用函数实例处理计算结果。唯一可见的区别是thenApplyAsync方法,但在底层,函数的应用程序被包装到ForkJoinTask实例中(有关fork/join框架的更多信息,请参阅文章“Java中的Fork/join框架指南”)。这使我们能够更加并行化我们的计算,并更有效地使用系统资源:

代码语言:javascript代码运行次数:0运行复制
CompletableFuture<String> completableFuture  
  = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<String> future = completableFuture
  .thenApplyAsync(s -> s + " World");

assertEquals("Hello World", future.get());Copy

11. JDK 9_可压缩Future_API

Java 9 通过以下更改增强了_CompletableFuture_API:

  • 添加了新的工厂方法
  • 支持延迟和超时
  • 改进了对子类化的支持

和新的实例 API:

  • Executor defaultExecutor()
  • CompletableFuture<U> newIncompleteFuture()
  • CompletableFuture<T> copy()
  • CompletionStage<T> minimalCompletionStage()
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)
  • CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)

我们现在还有一些静态实用程序方法:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • <U> CompletionStage<U> completedStage(U value)
  • <U> CompletionStage<U> failedStage(Throwable ex)
  • <U> CompletableFuture<U> failedFuture(Throwable ex)

最后,为了解决超时问题,Java 9 又引入了两个新功能:

  • orTimeout()
  • completeOnTimeout()

以下是进一步阅读的详细文章:Java 9 CompletableFuture API 改进。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2023-02-16,如有侵权请联系 cloudcommunity@tencent 删除异步异常java函数接口