本文目录导读:

CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它通过函数式编程风格,让你能够以声明式的方式编排复杂的异步任务,避免了传统的回调地狱(Callback Hell)和手动管理线程的复杂性。
下面我将系统地介绍如何使用 CompletableFuture 来编排复杂异步任务。
核心思想:流水线 + 组合
复杂任务可以分解为多个阶段(Stage)。CompletableFuture 的核心是将这些阶段像流水线一样串联或组合起来,每个阶段可以是一个异步操作,也可以是对上一个结果的同步转换。
基础概念回顾
-
创建异步任务:
CompletableFuture.supplyAsync(Supplier):执行有返回值的异步任务。CompletableFuture.runAsync(Runnable):执行无返回值的异步任务。- 这两个方法默认使用
ForkJoinPool.commonPool(),也可以传入自定义的Executor。
-
获取结果:
.get():阻塞等待结果。.join():类似get(),但不抛出受检异常。.getNow(valueIfAbsent):立即返回结果,如果未完成则返回默认值。
-
异常处理:
exceptionally(Function):当任务出现异常时,提供回退值。handle(BiFunction):无论成功或失败,都会执行,通过参数判断结果或异常。whenComplete(BiConsumer):无论成功或失败,都会执行,无法修改返回结果。
核心编排方法(流水线)
这些方法将多个 CompletableFuture 串联成一个新的 CompletableFuture。
转换 (thenApply)
-
作用:对上一个异步任务的结果进行同步转换,类似于 Stream 的
map。 -
场景:从数据库中拿到用户ID,然后去查询用户详情。
CompletableFuture<Integer> userIdFuture = CompletableFuture.supplyAsync(() -> getUserById("admin")); // 等待 userIdFuture 完成,然后执行转换 CompletableFuture<String> userNameFuture = userIdFuture.thenApply(userId -> { // 这个 Function 是在 userIdFuture 完成的线程中同步执行的 return getUserName(userId); }); String userName = userNameFuture.join(); // 最终结果是 "Admin"
消费 (thenAccept)
-
作用:消费上一个异步任务的结果,没有返回值。
-
场景:打印日志、发送通知。
CompletableFuture.supplyAsync(() -> "Hello") .thenAccept(result -> System.out.println("Result: " + result));
运行 (thenRun)
-
作用:在上一个异步任务完成后,执行一个 Runnable,既不关心输入也不关心输出。
-
场景:任务完成后做清理工作。
CompletableFuture.supplyAsync(() -> loadData()) .thenRun(() -> System.out.println("Data loaded!"));
异步执行 (thenApplyAsync, thenAcceptAsync, thenRunAsync)
-
关键区别:带
Async后缀的方法会强制在另一个线程池中异步执行后续操作,而不是在调用线程中执行,通常建议使用异步版本以避免阻塞调用线程。CompletableFuture.supplyAsync(() -> "Hello") .thenApplyAsync(result -> { // 这个操作会提交到 ForkJoinPool 中的另一个线程执行 return result + " World"; });
组合编排方法(并行 + 合并)
这些方法用于处理多个独立的 CompletableFuture 之间的依赖关系。
组合两个任务 (thenCompose)
-
作用:将一个
CompletableFuture的结果作为另一个异步方法的输入,它用于扁平化Future<Future>的情况,避免嵌套。 -
场景:先登录获取 token,再用 token 去获取用户信息。
// 错误写法,会产生嵌套 Future // CompletableFuture<CompletableFuture<User>> bad = login().thenApply(token -> getUser(token)); // 正确写法:thenCompose CompletableFuture<User> userFuture = login() .thenCompose(token -> getUser(token)); User user = userFuture.join();
合并两个任务的结果 (thenCombine)
-
作用:等待两个独立的
CompletableFuture都完成后,将两者的结果合并成一个新结果。 -
场景:同时查询订单信息和用户信息,然后合并展示。
CompletableFuture<Order> orderFuture = CompletableFuture.supplyAsync(() -> fetchOrder()); CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> fetchUser()); CompletableFuture<String> combinedFuture = orderFuture.thenCombine(userFuture, (order, user) -> { return "Order: " + order.getId() + ", User: " + user.getName(); }); String result = combinedFuture.join();
消费两个任务的结果 (thenAcceptBoth)
-
作用:等待两个独立任务都完成后,消费它们的结果,无返回值,类似于
thenCombine但无返回值。orderFuture.thenAcceptBoth(userFuture, (order, user) -> { System.out.println("Order " + order + " belongs to user " + user); });
等待所有任务完成 (allOf)
-
作用:等待所有给定的
CompletableFuture都完成,返回CompletableFuture<Void>,这意味着它不合并结果,你需要手动收集结果。 -
场景:批量并发查询。
List<CompletableFuture<String>> futures = IntStream.range(1, 10) .mapToObj(i -> CompletableFuture.supplyAsync(() -> fetchData(i))) .collect(Collectors.toList()); CompletableFuture<Void> allFutures = CompletableFuture.allOf( futures.toArray(new CompletableFuture[0]) ); // 当所有任务完成后,手动收集结果 CompletableFuture<List<String>> finalResult = allFutures.thenApply(v -> futures.stream() .map(CompletableFuture::join) // join 不会阻塞 .collect(Collectors.toList()) ); List<String> results = finalResult.join();
等待任意一个任务完成 (anyOf)
-
作用:等待任意一个给定的
CompletableFuture完成,返回CompletableFuture<Object>,结果是第一个完成的任务的结果。 -
场景:从多个数据源(如缓存、数据库)中查询,只要有一个返回即可。
CompletableFuture<String> cacheFuture = CompletableFuture.supplyAsync(() -> getFromCache("key")); CompletableFuture<String> dbFuture = CompletableFuture.supplyAsync(() -> getFromDB("key")); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(cacheFuture, dbFuture); String result = (String) anyOfFuture.join(); // 可能是缓存或数据库的结果
异常处理编排
在复杂的流水线中,任何一个环节都可能失败。CompletableFuture 提供了强大的异常恢复机制。
exceptionally:优雅降级
-
作用:当上游任务抛出异常时,提供一个回退值,类似于
catch语句。 -
场景:调用外部接口失败,返回默认值。
CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) throw new RuntimeException("API call failed"); return "API Result"; }).exceptionally(ex -> { System.out.println("Fallback due to: " + ex.getMessage()); return "Default Value"; // 降级 }).thenAccept(System.out::println);
handle:无论成功失败都处理
-
作用:无论上一个任务成功还是失败,都会调用这个
BiFunction,你需要检查Throwable参数。 -
场景:需要根据结果或异常进行不同的处理。
CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) throw new RuntimeException("Error!"); return "Success"; }).handle((result, ex) -> { if (ex != null) { System.out.println("Error: " + ex.getMessage()); return "Recovered"; // 从异常恢复 } else { return "Processed: " + result; } }).thenAccept(System.out::println);
whenComplete:只观察,不修改
-
作用:观察任务完成状态(成功或异常),但不能改变最终结果,常用于记录日志。
-
场景:记录异步操作的最终状态。
CompletableFuture.supplyAsync(() -> "Data") .whenComplete((result, ex) -> { if (ex != null) { log.error("Failed", ex); } else { log.info("Success: " + result); } }); // 最终结果依然是 "Data" 或异常
复杂任务编排实战示例
需求:构建一个“用户推荐系统”,流程如下:
- 并行从多个数据源(用户行为、社交关系、历史购买)获取数据。
- 等待所有数据源返回后,进行聚合计算(例如加权评分)。
- 将计算结果异步存入缓存。
- 返回最终推荐结果。
- 如果任何一步失败,返回空列表作为降级。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class RecommendationService {
private final ExecutorService executor = Executors.newFixedThreadPool(10);
public CompletableFuture<List<String>> getRecommendations(String userId) {
// 1. 并行获取多个数据源
CompletableFuture<List<String>> behaviorFuture = CompletableFuture
.supplyAsync(() -> fetchUserBehavior(userId), executor);
CompletableFuture<List<String>> socialFuture = CompletableFuture
.supplyAsync(() -> fetchSocialConnections(userId), executor);
CompletableFuture<List<String>> purchaseFuture = CompletableFuture
.supplyAsync(() -> fetchPurchaseHistory(userId), executor);
// 2. 使用 allOf 等待所有数据源完成
return CompletableFuture
.allOf(behaviorFuture, socialFuture, purchaseFuture)
.thenApplyAsync(v -> {
// 3. 合并结果:从三个 Future 中取出数据
List<String> behavior = behaviorFuture.join();
List<String> social = socialFuture.join();
List<String> purchase = purchaseFuture.join();
// 4. 进行复杂的业务逻辑(聚合、去重、加权计算)
return aggregateRecommendations(behavior, social, purchase);
}, executor)
.thenApplyAsync(recommendations -> {
// 5. 异步写入缓存(消费结果,不改变结果)
cacheRecommendations(userId, recommendations);
return recommendations; // 传递结果
}, executor)
.exceptionally(ex -> {
// 6. 全局异常处理:返回空列表
log.error("Failed to get recommendations for user: " + userId, ex);
return List.of();
});
}
// --- 模拟方法 ---
private List<String> fetchUserBehavior(String userId) {
// 模拟耗时操作
sleep(100);
return List.of("item1", "item2");
}
private List<String> fetchSocialConnections(String userId) {
sleep(150);
return List.of("item3", "item4");
}
private List<String> fetchPurchaseHistory(String userId) {
sleep(200);
return List.of("item1", "item5");
}
private List<String> aggregateRecommendations(List<String> b, List<String> s, List<String> p) {
// 简化的合并逻辑
return List.of("rec1", "rec2");
}
private void cacheRecommendations(String userId, List<String> recs) {
// 模拟缓存写入
System.out.println("Caching " + recs.size() + " items for user " + userId);
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static final Logger log = LoggerFactory.getLogger(RecommendationService.class);
// 主流程
public static void main(String[] args) {
RecommendationService service = new RecommendationService();
CompletableFuture<List<String>> future = service.getRecommendations("user123");
System.out.println("Main thread is not blocked...");
List<String> recs = future.join(); // 主线程阻塞等待最终结果
System.out.println("Recommendations: " + recs);
service.executor.shutdown();
}
}
最佳实践与注意事项
-
合理使用线程池:
- 不要总是用默认的
ForkJoinPool.commonPool(),因为它被所有并行流和CompletableFuture共享,容易导致线程饥饿。 - 建议:为不同类型的任务创建独立的、大小合适的
ExecutorService(如newFixedThreadPool或newCachedThreadPool)。
- 不要总是用默认的
-
Async版本的选择:thenApply(无 Async):在上一个任务的线程中同步执行,如果后续操作很重,会阻塞该线程。thenApplyAsync(有 Async):将后续操作提交到线程池执行,更加灵活,推荐使用,尤其是在需要控制线程隔离性的场景。
-
避免链式阻塞:
- 不要在
CompletableFuture的回调函数内部调用.join()或.get()来等待另一个Future,这会导致线程阻塞,破坏异步设计,应该使用thenCompose或thenCombine来组合。
- 不要在
-
异常处理要到位:
- 链式调用的最后,一定要有
exceptionally或handle兜底,否则异常可能会被吞掉(除非你显式调用.get())。 - 使用
whenComplete记录日志,但不要在其中修改结果。
- 链式调用的最后,一定要有
-
allOf与join的配合:- 在
allOf(...).thenApply(v ->...)内部调用join()是安全的,因为allOf保证所有任务已经完成,join()会立即返回结果。
- 在
-
善用
CompletableFuture的completionStage方法:completedFuture(value):创建一个已经完成的 Future。completedStage(value):创建一个已完成的 CompletionStage。minimalCompletionStage():将CompletableFuture转换为不可变的CompletionStage,用于安全暴露给调用方。
-
监控和调试:
CompletableFuture的链式调用很难调试,可以在关键节点添加whenComplete来打印日志。- 使用
JStack等工具排查线程死锁或线程池耗尽问题。
| 场景 | 推荐方法 | 说明 |
|---|---|---|
| 转换结果 | thenApply[Async] |
对结果做同步映射 |
| 消费结果 | thenAccept[Async] |
消费结果,无返回值 |
| 纯副作用 | thenRun[Async] |
执行一个 Runnable |
| 组合两个Future | thenCompose |
避免 Future<Future> 嵌套 |
| 合并两个Future | thenCombine |
等待两者完成,合并结果 |
| 等待所有完成 | allOf + thenApply |
批量并发,收集结果 |
| 等待任意完成 | anyOf |
多路复用,最快返回 |
| 异常时降级 | exceptionally |
提供回退值 |
| 无论成败都处理 | handle |
根据情况返回或抛出 |
| 观察状态 | whenComplete |
记录日志,不修改结果 |
掌握这些方法和原则,你就能像搭积木一样构建出清晰、健壮、高性能的复杂异步任务流。