用CompletableFuture如何编排复杂异步任务?

wen java案例 64

本文目录导读:

用CompletableFuture如何编排复杂异步任务?

  1. 核心思想:流水线 + 组合
  2. 基础概念回顾
  3. 核心编排方法(流水线)
  4. 组合编排方法(并行 + 合并)
  5. 异常处理编排
  6. 复杂任务编排实战示例
  7. 最佳实践与注意事项

CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,它通过函数式编程风格,让你能够以声明式的方式编排复杂的异步任务,避免了传统的回调地狱(Callback Hell)和手动管理线程的复杂性。

下面我将系统地介绍如何使用 CompletableFuture 来编排复杂异步任务。


核心思想:流水线 + 组合

复杂任务可以分解为多个阶段(Stage)。CompletableFuture 的核心是将这些阶段像流水线一样串联或组合起来,每个阶段可以是一个异步操作,也可以是对上一个结果的同步转换。


基础概念回顾

  1. 创建异步任务:

    • CompletableFuture.supplyAsync(Supplier):执行有返回值的异步任务。
    • CompletableFuture.runAsync(Runnable):执行无返回值的异步任务。
    • 这两个方法默认使用 ForkJoinPool.commonPool(),也可以传入自定义的 Executor
  2. 获取结果:

    • .get():阻塞等待结果。
    • .join():类似 get(),但不抛出受检异常。
    • .getNow(valueIfAbsent):立即返回结果,如果未完成则返回默认值。
  3. 异常处理:

    • exceptionally(Function):当任务出现异常时,提供回退值。
    • handle(BiFunction):无论成功或失败,都会执行,通过参数判断结果或异常。
    • whenComplete(BiConsumer):无论成功或失败,都会执行,无法修改返回结果。

核心编排方法(流水线)

这些方法将多个 CompletableFuture 串联成一个新的 CompletableFuture

转换 (thenApply)

  • 作用:对上一个异步任务的结果进行同步转换,类似于 Stream 的 map

  • 场景:从数据库中拿到用户ID,然后去查询用户详情。

    CompletableFuture<Integer> userIdFuture = CompletableFuture.supplyAsync(() -> getUserById("admin"));
    // 等待 userIdFuture 完成,然后执行转换
    CompletableFuture<String> userNameFuture = userIdFuture.thenApply(userId -> {
        // 这个 Function 是在 userIdFuture 完成的线程中同步执行的
        return getUserName(userId); 
    });
    String userName = userNameFuture.join(); // 最终结果是 "Admin"

消费 (thenAccept)

  • 作用:消费上一个异步任务的结果,没有返回值。

  • 场景:打印日志、发送通知。

    CompletableFuture.supplyAsync(() -> "Hello")
                     .thenAccept(result -> System.out.println("Result: " + result));

运行 (thenRun)

  • 作用:在上一个异步任务完成后,执行一个 Runnable,既不关心输入也不关心输出。

  • 场景:任务完成后做清理工作。

    CompletableFuture.supplyAsync(() -> loadData())
                     .thenRun(() -> System.out.println("Data loaded!"));

异步执行 (thenApplyAsync, thenAcceptAsync, thenRunAsync)

  • 关键区别:带 Async 后缀的方法会强制在另一个线程池中异步执行后续操作,而不是在调用线程中执行,通常建议使用异步版本以避免阻塞调用线程。

    CompletableFuture.supplyAsync(() -> "Hello")
                     .thenApplyAsync(result -> {
                         // 这个操作会提交到 ForkJoinPool 中的另一个线程执行
                         return result + " World";
                     });

组合编排方法(并行 + 合并)

这些方法用于处理多个独立的 CompletableFuture 之间的依赖关系。

组合两个任务 (thenCompose)

  • 作用:将一个 CompletableFuture 的结果作为另一个异步方法的输入,它用于扁平化 Future<Future> 的情况,避免嵌套。

  • 场景:先登录获取 token,再用 token 去获取用户信息。

    // 错误写法,会产生嵌套 Future
    // CompletableFuture<CompletableFuture<User>> bad = login().thenApply(token -> getUser(token));
    // 正确写法:thenCompose
    CompletableFuture<User> userFuture = login()
            .thenCompose(token -> getUser(token));
    User user = userFuture.join();

合并两个任务的结果 (thenCombine)

  • 作用:等待两个独立的 CompletableFuture 都完成后,将两者的结果合并成一个新结果。

  • 场景:同时查询订单信息和用户信息,然后合并展示。

    CompletableFuture<Order> orderFuture = CompletableFuture.supplyAsync(() -> fetchOrder());
    CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> fetchUser());
    CompletableFuture<String> combinedFuture = orderFuture.thenCombine(userFuture, (order, user) -> {
        return "Order: " + order.getId() + ", User: " + user.getName();
    });
    String result = combinedFuture.join();

消费两个任务的结果 (thenAcceptBoth)

  • 作用:等待两个独立任务都完成后,消费它们的结果,无返回值,类似于 thenCombine 但无返回值。

    orderFuture.thenAcceptBoth(userFuture, (order, user) -> {
        System.out.println("Order " + order + " belongs to user " + user);
    });

等待所有任务完成 (allOf)

  • 作用:等待所有给定的 CompletableFuture 都完成,返回 CompletableFuture<Void>,这意味着它不合并结果,你需要手动收集结果。

  • 场景:批量并发查询。

    List<CompletableFuture<String>> futures = IntStream.range(1, 10)
            .mapToObj(i -> CompletableFuture.supplyAsync(() -> fetchData(i)))
            .collect(Collectors.toList());
    CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
    );
    // 当所有任务完成后,手动收集结果
    CompletableFuture<List<String>> finalResult = allFutures.thenApply(v -> 
            futures.stream()
                   .map(CompletableFuture::join) // join 不会阻塞
                   .collect(Collectors.toList())
    );
    List<String> results = finalResult.join();

等待任意一个任务完成 (anyOf)

  • 作用:等待任意一个给定的 CompletableFuture 完成,返回 CompletableFuture<Object>,结果是第一个完成的任务的结果。

  • 场景:从多个数据源(如缓存、数据库)中查询,只要有一个返回即可。

    CompletableFuture<String> cacheFuture = CompletableFuture.supplyAsync(() -> getFromCache("key"));
    CompletableFuture<String> dbFuture = CompletableFuture.supplyAsync(() -> getFromDB("key"));
    CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(cacheFuture, dbFuture);
    String result = (String) anyOfFuture.join(); // 可能是缓存或数据库的结果

异常处理编排

在复杂的流水线中,任何一个环节都可能失败。CompletableFuture 提供了强大的异常恢复机制。

exceptionally:优雅降级

  • 作用:当上游任务抛出异常时,提供一个回退值,类似于 catch 语句。

  • 场景:调用外部接口失败,返回默认值。

    CompletableFuture.supplyAsync(() -> {
        if (Math.random() > 0.5) throw new RuntimeException("API call failed");
        return "API Result";
    }).exceptionally(ex -> {
        System.out.println("Fallback due to: " + ex.getMessage());
        return "Default Value"; // 降级
    }).thenAccept(System.out::println);

handle:无论成功失败都处理

  • 作用:无论上一个任务成功还是失败,都会调用这个 BiFunction,你需要检查 Throwable 参数。

  • 场景:需要根据结果或异常进行不同的处理。

    CompletableFuture.supplyAsync(() -> {
        if (Math.random() > 0.5) throw new RuntimeException("Error!");
        return "Success";
    }).handle((result, ex) -> {
        if (ex != null) {
            System.out.println("Error: " + ex.getMessage());
            return "Recovered"; // 从异常恢复
        } else {
            return "Processed: " + result;
        }
    }).thenAccept(System.out::println);

whenComplete:只观察,不修改

  • 作用:观察任务完成状态(成功或异常),但不能改变最终结果,常用于记录日志。

  • 场景:记录异步操作的最终状态。

    CompletableFuture.supplyAsync(() -> "Data")
                     .whenComplete((result, ex) -> {
                         if (ex != null) {
                             log.error("Failed", ex);
                         } else {
                             log.info("Success: " + result);
                         }
                     });
    // 最终结果依然是 "Data" 或异常

复杂任务编排实战示例

需求:构建一个“用户推荐系统”,流程如下:

  1. 并行从多个数据源(用户行为、社交关系、历史购买)获取数据。
  2. 等待所有数据源返回后,进行聚合计算(例如加权评分)。
  3. 将计算结果异步存入缓存。
  4. 返回最终推荐结果。
  5. 如果任何一步失败,返回空列表作为降级。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class RecommendationService {
    private final ExecutorService executor = Executors.newFixedThreadPool(10);
    public CompletableFuture<List<String>> getRecommendations(String userId) {
        // 1. 并行获取多个数据源
        CompletableFuture<List<String>> behaviorFuture = CompletableFuture
                .supplyAsync(() -> fetchUserBehavior(userId), executor);
        CompletableFuture<List<String>> socialFuture = CompletableFuture
                .supplyAsync(() -> fetchSocialConnections(userId), executor);
        CompletableFuture<List<String>> purchaseFuture = CompletableFuture
                .supplyAsync(() -> fetchPurchaseHistory(userId), executor);
        // 2. 使用 allOf 等待所有数据源完成
        return CompletableFuture
                .allOf(behaviorFuture, socialFuture, purchaseFuture)
                .thenApplyAsync(v -> {
                    // 3. 合并结果:从三个 Future 中取出数据
                    List<String> behavior = behaviorFuture.join();
                    List<String> social = socialFuture.join();
                    List<String> purchase = purchaseFuture.join();
                    // 4. 进行复杂的业务逻辑(聚合、去重、加权计算)
                    return aggregateRecommendations(behavior, social, purchase);
                }, executor)
                .thenApplyAsync(recommendations -> {
                    // 5. 异步写入缓存(消费结果,不改变结果)
                    cacheRecommendations(userId, recommendations);
                    return recommendations; // 传递结果
                }, executor)
                .exceptionally(ex -> {
                    // 6. 全局异常处理:返回空列表
                    log.error("Failed to get recommendations for user: " + userId, ex);
                    return List.of();
                });
    }
    // --- 模拟方法 ---
    private List<String> fetchUserBehavior(String userId) {
        // 模拟耗时操作
        sleep(100);
        return List.of("item1", "item2");
    }
    private List<String> fetchSocialConnections(String userId) {
        sleep(150);
        return List.of("item3", "item4");
    }
    private List<String> fetchPurchaseHistory(String userId) {
        sleep(200);
        return List.of("item1", "item5");
    }
    private List<String> aggregateRecommendations(List<String> b, List<String> s, List<String> p) {
        // 简化的合并逻辑
        return List.of("rec1", "rec2");
    }
    private void cacheRecommendations(String userId, List<String> recs) {
        // 模拟缓存写入
        System.out.println("Caching " + recs.size() + " items for user " + userId);
    }
    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    private static final Logger log = LoggerFactory.getLogger(RecommendationService.class);
    // 主流程
    public static void main(String[] args) {
        RecommendationService service = new RecommendationService();
        CompletableFuture<List<String>> future = service.getRecommendations("user123");
        System.out.println("Main thread is not blocked...");
        List<String> recs = future.join(); // 主线程阻塞等待最终结果
        System.out.println("Recommendations: " + recs);
        service.executor.shutdown();
    }
}

最佳实践与注意事项

  1. 合理使用线程池

    • 不要总是用默认的 ForkJoinPool.commonPool(),因为它被所有并行流和 CompletableFuture 共享,容易导致线程饥饿。
    • 建议:为不同类型的任务创建独立的、大小合适的 ExecutorService(如 newFixedThreadPoolnewCachedThreadPool)。
  2. Async 版本的选择

    • thenApply(无 Async):在上一个任务的线程中同步执行,如果后续操作很重,会阻塞该线程。
    • thenApplyAsync(有 Async):将后续操作提交到线程池执行,更加灵活,推荐使用,尤其是在需要控制线程隔离性的场景。
  3. 避免链式阻塞

    • 不要在 CompletableFuture 的回调函数内部调用 .join().get() 来等待另一个 Future,这会导致线程阻塞,破坏异步设计,应该使用 thenComposethenCombine 来组合。
  4. 异常处理要到位

    • 链式调用的最后,一定要有 exceptionallyhandle 兜底,否则异常可能会被吞掉(除非你显式调用 .get())。
    • 使用 whenComplete 记录日志,但不要在其中修改结果。
  5. allOfjoin 的配合

    • allOf(...).thenApply(v ->...) 内部调用 join() 是安全的,因为 allOf 保证所有任务已经完成,join() 会立即返回结果。
  6. 善用 CompletableFuturecompletionStage 方法

    • completedFuture(value):创建一个已经完成的 Future。
    • completedStage(value):创建一个已完成的 CompletionStage。
    • minimalCompletionStage():将 CompletableFuture 转换为不可变的 CompletionStage,用于安全暴露给调用方。
  7. 监控和调试

    • CompletableFuture 的链式调用很难调试,可以在关键节点添加 whenComplete 来打印日志。
    • 使用 JStack 等工具排查线程死锁或线程池耗尽问题。

场景 推荐方法 说明
转换结果 thenApply[Async] 对结果做同步映射
消费结果 thenAccept[Async] 消费结果,无返回值
纯副作用 thenRun[Async] 执行一个 Runnable
组合两个Future thenCompose 避免 Future<Future> 嵌套
合并两个Future thenCombine 等待两者完成,合并结果
等待所有完成 allOf + thenApply 批量并发,收集结果
等待任意完成 anyOf 多路复用,最快返回
异常时降级 exceptionally 提供回退值
无论成败都处理 handle 根据情况返回或抛出
观察状态 whenComplete 记录日志,不修改结果

掌握这些方法和原则,你就能像搭积木一样构建出清晰、健壮、高性能的复杂异步任务流。

抱歉,评论功能暂时关闭!