经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
异步编程——CompletableFuture详解
来源:cnblogs  作者:异人程序员  时间:2025/3/7 9:09:30  对本文有异议

Future

JDK5 新增了Future接口,用于描述一个异步计算的结果。

虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,我们必须使用Future.get()的方式阻塞调用线程,或者使用轮询方式判断 Future.isDone 任务是否结束,再获取结果。

并且,Future 无法解决多个异步任务相互依赖的场景,简单点说就是,主线程需要等待子线程任务执行完毕之后在进行执行,这个时候你可能想到了 「CountDownLatch」,没错确实可以解决,代码如下。

这里定义两个 Future,第一个通过用户 id 获取用户信息,第二个通过商品 id 获取商品信息。

  1. public void testCountDownLatch() throws InterruptedException, ExecutionException {
  2. ExecutorService executorService = Executors.newFixedThreadPool(5);
  3. CountDownLatch downLatch = new CountDownLatch(2);
  4. long startTime = System.currentTimeMillis();
  5. Future<String> userFuture = executorService.submit(() -> {
  6. //模拟查询商品耗时500毫秒
  7. Thread.sleep(500);
  8. downLatch.countDown();
  9. return "用户A";
  10. });
  11. Future<String> goodsFuture = executorService.submit(() -> {
  12. //模拟查询商品耗时500毫秒
  13. Thread.sleep(400);
  14. downLatch.countDown();
  15. return "商品A";
  16. });
  17. downLatch.await();
  18. //模拟主程序耗时时间
  19. Thread.sleep(600);
  20. System.out.println("获取用户信息:" + userFuture.get());
  21. System.out.println("获取商品信息:" + goodsFuture.get());
  22. System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
  23. }

Java8 以后这不再是一种优雅的解决方式,接下来来了解下 CompletableFuture 的使用。

CompletableFuture

  1. @Test
  2. public void testCompletableInfo() throws InterruptedException, ExecutionException {
  3. long startTime = System.currentTimeMillis();
  4. //调用用户服务获取用户基本信息
  5. CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->
  6. //模拟查询商品耗时500毫秒
  7. {
  8. try {
  9. Thread.sleep(500);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. return "用户A";
  14. });
  15. //调用商品服务获取商品基本信息
  16. CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->
  17. //模拟查询商品耗时500毫秒
  18. {
  19. try {
  20. Thread.sleep(400);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. return "商品A";
  25. });
  26. System.out.println("获取用户信息:" + userFuture.get());
  27. System.out.println("获取商品信息:" + goodsFuture.get());
  28. //模拟主程序耗时时间
  29. Thread.sleep(600);
  30. System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
  31. }

CompletableFuture 创建方式

「supplyAsync」执行任务,支持返回值。
「runAsync」执行任务,没有返回值。
参数如果传了线程池就使用自定义的线程池,没传则使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务。(注意:默认内置线程池核心数为机器核心数减一,如果机器核心数比2小时,会创建一个新线程去跑任务,建议在高并发场景使用自定义线程池

  1. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
  2. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
  3. public static CompletableFuture<Void> runAsync(Runnable runnable){..}
  4. public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}

CompletableFuture 获取方式

  1. //方式一
  2. public T get()
  3. //方式二
  4. public T get(long timeout, TimeUnit unit)
  5. //方式三
  6. public T getNow(T valueIfAbsent)
  7. //方式四
  8. public T join()

说明:

「get()和 get(long timeout, TimeUnit unit)」 => 在 Future 中就已经提供了,后者提供超时处理,如果在指定时间内未获取结果将抛出超时异常
「getNow」 => 立即获取结果不阻塞,结果计算已完成将返回结果或计算过程中的异常,如果未计算完成将返回设定的 valueIfAbsent 值
「join」 => 方法里有异常不会抛出异常,但会抛出 CompletionException

异步回调方法

1、thenRun/thenRunAsync
通俗点讲就是,「做完第一个任务后,再做第二个任务,第二个任务也没有返回值」。

【Async】加了则第一个任务使用的是你自己传入的线程池,第二个任务使用的是 ForkJoin 线程池,没加则第二个线程池也用传入的线程池。

2、thenAccept/thenAcceptAsync

第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

3、thenApply/thenApplyAsync

表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

异常回调

whenComplete + exceptionally 示例

  1. public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {
  2. CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
  3. if (Math.random() < 0.5) {
  4. throw new RuntimeException("出错了");
  5. }
  6. System.out.println("正常结束");
  7. return 0.11;
  8. }).whenComplete((aDouble, throwable) -> {
  9. if (aDouble == null) {
  10. System.out.println("whenComplete aDouble is null");
  11. } else {
  12. System.out.println("whenComplete aDouble is " + aDouble);
  13. }
  14. if (throwable == null) {
  15. System.out.println("whenComplete throwable is null");
  16. } else {
  17. System.out.println("whenComplete throwable is " + throwable.getMessage());
  18. }
  19. }).exceptionally((throwable) -> {
  20. System.out.println("exceptionally中异常:" + throwable.getMessage());
  21. return 0.0;
  22. });
  23. System.out.println("最终返回的结果 = " + future.get());
  24. }

当出现异常时,exceptionally 中会捕获该异常,给出默认返回值 0.0。

而 「whenComplete」 这个回调函数:

「正常完成」:whenComplete 返回结果和上级任务一致,异常为 null;
「出现异常」:whenComplete 返回结果为 null,异常为上级任务的异常;

结果:

  1. whenComplete aDouble is null
  2. whenComplete throwable is java.lang.RuntimeException: 出错了
  3. exceptionally中异常:java.lang.RuntimeException: 出错了
  4. 最终返回的结果 = 0.0

注意点

1、Future 需要获取返回值,才能获取异常信息

Future 需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。如果想要获取,考虑是否加 try...catch...或者使用 exceptionally 方法。

2、CompletableFuture 的 get()方法是阻塞的

CompletableFuture 的 get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。

3、不建议使用默认线程池

CompletableFuture 代码中使用了默认的 「ForkJoin 线程池」, 处理的线程个数是电脑 「CPU 核数-1」 。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

4、自定义线程池时,注意拒绝策略

如果线程池拒绝策略是 DiscardPolicy(丢弃当前任务) 或者 DiscardOldestPolicy(丢弃最旧的那个任务),当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture 线程池策略最好使用 AbortPolicy(抛出执行异常)或者CallerRunsPolicy(让主线程执行)。

结合业务代码使用示例

Util工具类

  1. public class CompletableFutureUtil {
  2. private CompletableFutureUtil(){}
  3. public static <R> CompletableFuture<R> executeWithFallbackAndContextPropagation(@Nonnull Supplier<R> normalFunction,
  4. @Nonnull Supplier<R> exceptionFunction,
  5. @Nonnull ThreadPoolTaskExecutor taskExecutor,
  6. @Nonnull String exceptionMsg){
  7. Thread mainThread = Thread.currentThread();
  8. return CompletableFuture
  9. .supplyAsync(normalFunction,taskExecutor)
  10. .exceptionally(e -> {
  11. log.error(exceptionMsg, e);
  12. return exceptionFunction.get();
  13. })
  14. .whenComplete((data,e)->{
  15. if(!mainThread.equals(Thread.currentThread())){
  16. MallContextHolderManager.clearContext();
  17. }
  18. });
  19. }
  20. }

使用Util创建任务代码

  1. private CompletableFuture<Boolean> asyncQueryCommentPic(ProductDetailInfoNewDto detailInfoDto, ProductInfoQueryDTO productInfoQuery) {
  2. ThreadPoolTaskExecutor taskExecutor = bizThreadPoolManager.getBizThreadPoolTaskExecutor(BIZ_THREAD_POOL_NAME);
  3. // 兜底获取不到线程池时降级
  4. if (taskExecutor == null) {
  5. detailInfoDto.setShowPrimaryPic(Boolean.FALSE);
  6. return null;
  7. }
  8. return CompletableFutureUtil.executeWithFallbackAndContextPropagation(
  9. () -> queryShowPrimaryPic(detailInfoDto, productInfoQuery),
  10. () -> Boolean.FALSE,
  11. taskExecutor,
  12. "异步任务执行异常");
  13. }

获取任务结果代码

  1. private void handShowPrimaryPic(ProductDetailInfoNewDto detailInfoDto, CompletableFuture<Boolean> commentPicFuture) {
  2. detailInfoDto.setShowPrimaryPic(Boolean.FALSE);
  3. if (commentPicFuture != null) {
  4. try {
  5. Boolean showPrimaryPic = commentPicFuture.get(asyncGetCommentPrimaryPicTimeout, TimeUnit.MILLISECONDS);
  6. detailInfoDto.setShowPrimaryPic(showPrimaryPic);
  7. } catch (Exception e) {
  8. log.error("任务等待结果异常:future={}", JSON.toJSONString(commentPicFuture), e);
  9. }
  10. }
  11. }

原文链接:https://www.cnblogs.com/ashuaiYiRen/p/18756609

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号