经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
简单设计一个JAVA并行处理工具类
来源:cnblogs  作者:狂盗一枝梅  时间:2024/8/7 8:51:11  对本文有异议

在工作中,我们肯定遇到过一个接口要处理N多事项导致接口响应速度很慢的情况,通常我们会综合使用两种方式来提升接口响应速度

  1. 优化查询SQL,提升查询效率
  2. 开启多线程并发处理业务数据

这里讨论第二种方案:使用多线程并发处理业务数据,最后处理完成以后,拼装起来返回给前端,每个人的实现方案都不一样,我在工作的这几年也经历了几种写法。

一、几种常见的并行处理写法

方法一:Future写法

其代码形式如下

  1. @Test
  2. public void test1() {
  3. //定义线程池
  4. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 30,
  5. TimeUnit.SECONDS,
  6. new ArrayBlockingQueue<>(10),
  7. Executors.defaultThreadFactory(),
  8. new ThreadPoolExecutor.DiscardPolicy());
  9. //异步执行
  10. Future<String> getUserName = threadPoolExecutor.submit(() -> {
  11. //do something...
  12. return "kdyzm";
  13. });
  14. //异步执行
  15. Future<Integer> getUserAge = threadPoolExecutor.submit(() -> {
  16. //do something...
  17. return 12;
  18. });
  19. //拼装回调结果
  20. try {
  21. UserInfo user = new UserInfo();
  22. user.setName(getUserName.get());
  23. user.setAge(getUserAge.get());
  24. log.info(JsonUtils.toPrettyString(user));
  25. } catch (InterruptedException | ExecutionException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. @Data
  30. static class UserInfo {
  31. private String name;
  32. private Integer age;
  33. }

多几个submit一起执行,最后集中get获取最终结果。

这种方式任务一旦多了,就会显得代码很乱,一堆的变量名会让代码可读性很差。

方法二:CompletableFuture.allOf写法

其代码形式如下

  1. @Test
  2. public void test2() {
  3. try {
  4. UserInfo userInfo = new UserInfo();
  5. CompletableFuture.allOf(
  6. //异步执行
  7. CompletableFuture.runAsync(() -> {
  8. userInfo.setName("kdyzm");
  9. }),
  10. //异步执行
  11. CompletableFuture.runAsync(() -> {
  12. userInfo.setAge(12);
  13. })
  14. //同步返回
  15. ).get();
  16. log.info(JsonUtils.toPrettyString(userInfo));
  17. } catch (InterruptedException | ExecutionException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. @Data
  22. static class UserInfo {
  23. private String name;
  24. private Integer age;
  25. }

这种方法使用了CompletableFuture的API,通过将多个异步任务收集起来统一调度最后通过一个get方法同步到主线程。比直接使用Future简化了些。

方法三:CompletableFuture::join写法

其代码形式如下

  1. @Test
  2. public void test3(){
  3. UserInfo userInfo = new UserInfo();
  4. Arrays.asList(
  5. //异步执行
  6. CompletableFuture.supplyAsync(()->{
  7. return "kdyzm";
  8. //回调执行
  9. }).thenAccept(name->{
  10. userInfo.setName(name);
  11. }),
  12. //异步执行
  13. CompletableFuture.supplyAsync(()->{
  14. return 12;
  15. //回调执行
  16. }).thenAccept(age->{
  17. userInfo.setAge(age);
  18. })
  19. //等待所有线程执行完毕
  20. ).forEach(CompletableFuture::join);
  21. log.info(JsonUtils.toPrettyString(userInfo));
  22. }
  23. @Data
  24. static class UserInfo {
  25. private String name;
  26. private Integer age;
  27. }

这种写法和上面的写法相比具有更高的可读性,但是它也有缺点:thenAccept只能接收一个返回值,如果想处理多个值,则没有办法,只能使用方法2。

总结

几种写法中第二、三种写法比较常见,使用起来也更加方便,两者各有优缺点:方法2能处理多个返回值,方法3可读性更高。但是无论是方法2还是方法3,它们的使用总是要记住相关的API,使用起来总不是很顺手,可读性虽然方法3更强一些,但是总还是差点意思。此时我就有了自己设计一个简单的并行处理工具类的想法,既要易用,还要可读性高。

二、并行处理工具类设计

1、设计模式选型

因为平时比较喜欢链式调用的API,所以一开始一开始设计,我就想用建造者模式来实现这个工具类。关于建造者模式,详情可以看我之前的文章:设计模式(六):建造者模式 。建造者模式在实际应用中的特点就是链式调用,无论是StringBuilder还是lombok的@Data注解,都使用了建造者模式。

2、第一版代码

仿照方法三,我开发了第一版代码

  1. import lombok.Data;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.concurrent.CompletableFuture;
  6. import java.util.function.Consumer;
  7. import java.util.function.Supplier;
  8. /**
  9. * @author kdyzm
  10. */
  11. @Slf4j
  12. public class ConcurrentWorker {
  13. private List<Task> workers = new ArrayList<>();
  14. public static ConcurrentWorker runner() {
  15. return new ConcurrentWorker();
  16. }
  17. public <R> ConcurrentWorker addTask(Consumer<? super R> action, Supplier<R> value) {
  18. Task<R> worker = new Task<>(action, value);
  19. this.workers.add(worker);
  20. return this;
  21. }
  22. public void run() {
  23. workers.forEach(item -> {
  24. CompletableFuture completableFuture = CompletableFuture.supplyAsync(item.getValue());
  25. item.setCompletableFuture(completableFuture);
  26. });
  27. workers
  28. .stream()
  29. .map(
  30. item -> {
  31. return item.completableFuture.thenAccept(item.getAction());
  32. }
  33. )
  34. .forEach(CompletableFuture::join);
  35. }
  36. @Data
  37. public static class Task<R> {
  38. private Consumer<? super R> action;
  39. private Supplier<R> value;
  40. private CompletableFuture<R> completableFuture;
  41. public Task(Consumer<? super R> action, Supplier<R> value) {
  42. this.action = action;
  43. this.value = value;
  44. }
  45. }
  46. }

这段代码一共不到60行,使用了Lambda表达式和函数式编程相关的API对方法三进行改造,最终使用效果如下

  1. @Test
  2. public void test() {
  3. UserInfo userInfo = new UserInfo();
  4. ConcurrentWorker.runner()
  5. //添加任务
  6. .addTask(userInfo::setName, () -> {
  7. //延迟1000毫秒打印线程执行情况
  8. try {
  9. Thread.sleep(1000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. log.info(Thread.currentThread().getName()+"-name");
  14. return "张三";
  15. })
  16. //添加任务
  17. .addTask(userInfo::setAge, () -> {
  18. //延迟1000毫秒打印线程执行情况
  19. try {
  20. Thread.sleep(1000);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. log.info(Thread.currentThread().getName()+"-age");
  25. return 13;
  26. })
  27. //执行任务
  28. .run();
  29. log.info(JsonUtils.toPrettyString(userInfo));
  30. }
  31. @Data
  32. static class UserInfo {
  33. private String name;
  34. private Integer age;
  35. private String sex;
  36. }

它的使用方式就是

  1. ConcurrentWorker.runner()
  2. .addTask(setter function, return_value function )
  3. .addTask(setter function, return_value function)
  4. .run()

可以看到易用性够了,可读性也很好,但是它的缺点和方法三一样,都只能接收一个参数,毕竟它是根据方法3封装的,接下来改造代码让它支持多参数处理。

3、第二版代码

已知,第一版代码已经支持了如下形式的功能

  1. ConcurrentWorker.runner()
  2. .addTask(setter function, return_value function )
  3. .addTask(setter function, return_value function)
  4. .run()

现在我想添加以下形式的重载方法

  1. .addTask(handle function)

没错,就一个参数,在这个方法中可以任意设置对象值。最终使用的效果如下

  1. @Test
  2. public void test() {
  3. UserInfo userInfo = new UserInfo();
  4. ConcurrentWorker.runner()
  5. .addTask(userInfo::setName, () -> {
  6. try {
  7. Thread.sleep(1000);
  8. log.info(Thread.currentThread().getName());
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. log.info(Thread.currentThread().getName()+"-name");
  13. return "张三";
  14. })
  15. .addTask(userInfo::setAge, () -> {
  16. try {
  17. Thread.sleep(1000);
  18. log.info(Thread.currentThread().getName());
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. log.info(Thread.currentThread().getName()+"-age");
  23. return 13;
  24. })
  25. //新方法:处理任意多属性值填充
  26. .addTask(()->{
  27. try {
  28. Thread.sleep(1000);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. log.info(Thread.currentThread().getName()+"-sex");
  33. userInfo.setSex("男");
  34. })
  35. .run();
  36. log.info(JsonUtils.toPrettyString(userInfo));
  37. }
  38. @Data
  39. static class UserInfo {
  40. private String name;
  41. private Integer age;
  42. private String sex;
  43. }

完整工具类方法如下

  1. import lombok.Data;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.concurrent.CompletableFuture;
  6. import java.util.function.Consumer;
  7. import java.util.function.Supplier;
  8. /**
  9. * @author kdyzm
  10. */
  11. @Slf4j
  12. public class ConcurrentWorker {
  13. private List<Task> workers = new ArrayList<>();
  14. public static ConcurrentWorker runner() {
  15. return new ConcurrentWorker();
  16. }
  17. public <R> ConcurrentWorker addTask(Consumer<? super R> action, Supplier<R> value) {
  18. Task<R> worker = new Task<>(action, value);
  19. this.workers.add(worker);
  20. return this;
  21. }
  22. public <R> ConcurrentWorker addTask(Runnable runnable) {
  23. Task<R> worker = new Task<>(runnable);
  24. this.workers.add(worker);
  25. return this;
  26. }
  27. public void run() {
  28. workers.forEach(item -> {
  29. int taskType = item.getTaskType();
  30. CompletableFuture completableFuture = null;
  31. switch (taskType) {
  32. case TaskType.RETURN_VALUE:
  33. completableFuture = CompletableFuture.supplyAsync(item.getValue());
  34. break;
  35. case TaskType.VOID_RETURN:
  36. completableFuture = CompletableFuture.runAsync(item.getRunnable());
  37. break;
  38. default:
  39. break;
  40. }
  41. item.setCompletableFuture(completableFuture);
  42. });
  43. workers
  44. .stream()
  45. .map(
  46. item -> {
  47. int taskType = item.getTaskType();
  48. switch (taskType) {
  49. case TaskType.RETURN_VALUE:
  50. return item.completableFuture.thenAccept(item.getAction());
  51. default:
  52. return item.completableFuture.thenAccept(temp->{
  53. //空
  54. });
  55. }
  56. }
  57. )
  58. .forEach(CompletableFuture::join);
  59. }
  60. @Data
  61. public static class Task<R> {
  62. private Consumer<? super R> action;
  63. private Supplier<R> value;
  64. private CompletableFuture<R> completableFuture;
  65. private Runnable runnable;
  66. private int taskType;
  67. public Task(Consumer<? super R> action, Supplier<R> value) {
  68. this.action = action;
  69. this.value = value;
  70. this.taskType = TaskType.RETURN_VALUE;
  71. }
  72. public Task(Runnable runnable) {
  73. this.runnable = runnable;
  74. this.taskType = TaskType.VOID_RETURN;
  75. }
  76. }
  77. public static class TaskType {
  78. /**
  79. * 有返回值的
  80. */
  81. public static final int RETURN_VALUE = 1;
  82. /**
  83. * 没有返回值的
  84. */
  85. public static final int VOID_RETURN = 2;
  86. }
  87. }

我将任务类型分为两种,并使用TaskType类封装成常量值:1表示任务执行回调有返回值;2表示任务执行没有返回值,属性填充将在任务执行过程中完成,该类型任务使用Runnable接口实现。

4、工具类jar包

相关代码我已经打包成jar包上传到maven中央仓库,可以通过引入以下maven依赖使用ConcurrentWorker工具类

  1. <dependency>
  2. <groupId>cn.kdyzm</groupId>
  3. <artifactId>kdyzm-util</artifactId>
  4. <version>0.0.2</version>
  5. </dependency>


最后,欢迎关注我的博客:https://blog.kdyzm.cn

END.

原文链接:https://www.cnblogs.com/kuangdaoyizhimei/p/18344600

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号