经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Spring Boot » 查看文章
SpringBoot线程池和Java线程池的用法和实现原理
来源:cnblogs  作者:twilight0402  时间:2023/4/12 11:16:50  对本文有异议

使用默认的线程池

方式一:通过@Async注解调用

  1. public class AsyncTest {
  2. @Async
  3. public void async(String name) throws InterruptedException {
  4. System.out.println("async" + name + " " + Thread.currentThread().getName());
  5. Thread.sleep(1000);
  6. }
  7. }

启动类上需要添加@EnableAsync注解,否则不会生效。

  1. @SpringBootApplication
  2. //@EnableAsync
  3. public class Test1Application {
  4. public static void main(String[] args) throws InterruptedException {
  5. ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
  6. AsyncTest bean = run.getBean(AsyncTest.class);
  7. for(int index = 0; index <= 10; ++index){
  8. bean.async(String.valueOf(index));
  9. }
  10. }
  11. }

方式二:直接注入 ThreadPoolTaskExecutor

此时可不加 @EnableAsync注解

  1. @SpringBootTest
  2. class Test1ApplicationTests {
  3. @Resource
  4. ThreadPoolTaskExecutor threadPoolTaskExecutor;
  5. @Test
  6. void contextLoads() {
  7. Runnable runnable = () -> {
  8. System.out.println(Thread.currentThread().getName());
  9. };
  10. for(int index = 0; index <= 10; ++index){
  11. threadPoolTaskExecutor.submit(runnable);
  12. }
  13. }
  14. }

线程池默认配置信息

SpringBoot线程池的常见配置:

  1. spring:
  2. task:
  3. execution:
  4. pool:
  5. core-size: 8
  6. max-size: 16 # 默认是 Integer.MAX_VALUE
  7. keep-alive: 60s # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止
  8. allow-core-thread-timeout: true # 是否允许核心线程超时,默认true
  9. queue-capacity: 100 # 线程队列的大小,默认Integer.MAX_VALUE
  10. shutdown:
  11. await-termination: false # 线程关闭等待
  12. thread-name-prefix: task- # 线程名称的前缀

SpringBoot 线程池的实现原理

TaskExecutionAutoConfiguration 类中定义了 ThreadPoolTaskExecutor,该类的内部实现也是基于java原生的 ThreadPoolExecutor类。initializeExecutor()方法在其父类中被调用,但是在父类中 RejectedExecutionHandler 被定义为了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); ,并通过initialize()方法将AbortPolicy传入initializeExecutor()中。

注意在TaskExecutionAutoConfiguration 类中,ThreadPoolTaskExecutor类的bean的名称为: applicationTaskExecutortaskExecutor

  1. // TaskExecutionAutoConfiguration#applicationTaskExecutor()
  2. @Lazy
  3. @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
  4. AsyncAnnotationBeanPostProcessor.DEFAUL
  5. T_TASK_EXECUTOR_BEAN_NAME })
  6. @ConditionalOnMissingBean(Executor.class)
  7. public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
  8. return builder.build();
  9. }
  1. // ThreadPoolTaskExecutor#initializeExecutor()
  2. @Override
  3. protected ExecutorService initializeExecutor(
  4. ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
  5. BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
  6. ThreadPoolExecutor executor;
  7. if (this.taskDecorator != null) {
  8. executor = new ThreadPoolExecutor(
  9. this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
  10. queue, threadFactory, rejectedExecutionHandler) {
  11. @Override
  12. public void execute(Runnable command) {
  13. Runnable decorated = taskDecorator.decorate(command);
  14. if (decorated != command) {
  15. decoratedTaskMap.put(decorated, command);
  16. }
  17. super.execute(decorated);
  18. }
  19. };
  20. }
  21. else {
  22. executor = new ThreadPoolExecutor(
  23. this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
  24. queue, threadFactory, rejectedExecutionHandler);
  25. }
  26. if (this.allowCoreThreadTimeOut) {
  27. executor.allowCoreThreadTimeOut(true);
  28. }
  29. this.threadPoolExecutor = executor;
  30. return executor;
  31. }
  1. // ExecutorConfigurationSupport#initialize()
  2. public void initialize() {
  3. if (logger.isInfoEnabled()) {
  4. logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
  5. }
  6. if (!this.threadNamePrefixSet && this.beanName != null) {
  7. setThreadNamePrefix(this.beanName + "-");
  8. }
  9. this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
  10. }

覆盖默认的线程池

覆盖默认的 taskExecutor对象,bean的返回类型可以是ThreadPoolTaskExecutor也可以是Executor

  1. @Configuration
  2. public class ThreadPoolConfiguration {
  3. @Bean("taskExecutor")
  4. public ThreadPoolTaskExecutor taskExecutor() {
  5. ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
  6. //设置线程池参数信息
  7. taskExecutor.setCorePoolSize(10);
  8. taskExecutor.setMaxPoolSize(50);
  9. taskExecutor.setQueueCapacity(200);
  10. taskExecutor.setKeepAliveSeconds(60);
  11. taskExecutor.setThreadNamePrefix("myExecutor--");
  12. taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
  13. taskExecutor.setAwaitTerminationSeconds(60);
  14. //修改拒绝策略为使用当前线程执行
  15. taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  16. //初始化线程池
  17. taskExecutor.initialize();
  18. return taskExecutor;
  19. }
  20. }

管理多个线程池

如果出现了多个线程池,例如再定义一个线程池 taskExecutor2,则直接执行会报错。此时需要指定bean的名称即可。

  1. @Bean("taskExecutor2")
  2. public ThreadPoolTaskExecutor taskExecutor2() {
  3. ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
  4. //设置线程池参数信息
  5. taskExecutor.setCorePoolSize(10);
  6. taskExecutor.setMaxPoolSize(50);
  7. taskExecutor.setQueueCapacity(200);
  8. taskExecutor.setKeepAliveSeconds(60);
  9. taskExecutor.setThreadNamePrefix("myExecutor2--");
  10. taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
  11. taskExecutor.setAwaitTerminationSeconds(60);
  12. //修改拒绝策略为使用当前线程执行
  13. taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  14. //初始化线程池
  15. taskExecutor.initialize();
  16. return taskExecutor;
  17. }

引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。

  1. @Resource
  2. ThreadPoolTaskExecutor taskExecutor2;

对于使用@Async注解的多线程则在注解中指定bean的名字即可。

  1. @Async("taskExecutor2")
  2. public void async(String name) throws InterruptedException {
  3. System.out.println("async" + name + " " + Thread.currentThread().getName());
  4. Thread.sleep(1000);
  5. }

线程池的四种拒绝策略

JAVA常用的四种线程池

ThreadPoolExecutor 类的构造函数如下:

  1. public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
  2. BlockingQueue<Runnable> workQueue) {
  3. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  4. Executors.defaultThreadFactory(), defaultHandler);
  5. }

newCachedThreadPool

不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE),如果有空闲的线程超过需要,则回收,否则重用已有的线程。

  1. new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  2. 60L, TimeUnit.SECONDS,
  3. new SynchronousQueue<Runnable>());

newFixedThreadPool

定长线程池,超出线程数的任务会在队列中等待。

  1. return new ThreadPoolExecutor(nThreads, nThreads,
  2. 0L, TimeUnit.MILLISECONDS,
  3. new LinkedBlockingQueue<Runnable>());

newScheduledThreadPool

类似于newCachedThreadPool,线程数无上限,但是可以指定corePoolSize。可实现延迟执行、周期执行。

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. }

周期执行:

  1. ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
  2. scheduledThreadPool.scheduleAtFixedRate(()->{
  3. System.out.println("rate");
  4. }, 1, 1, TimeUnit.SECONDS);

延时执行:

  1. scheduledThreadPool.schedule(()->{
  2. System.out.println("delay 3 seconds");
  3. }, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

单线程线程池,可以实现线程的顺序执行。

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

Java 线程池中的四种拒绝策略

  • CallerRunsPolicy:线程池让调用者去执行。

  • AbortPolicy:如果线程池拒绝了任务,直接报错。

  • DiscardPolicy:如果线程池拒绝了任务,直接丢弃。

  • DiscardOldestPolicy:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。

CallerRunsPolicy

直接在主线程中执行了run方法。

  1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2. public CallerRunsPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. if (!e.isShutdown()) {
  5. r.run();
  6. }
  7. }
  8. }

效果类似于:

  1. Runnable thread = ()->{
  2. System.out.println(Thread.currentThread().getName());
  3. try {
  4. Thread.sleep(0);
  5. } catch (InterruptedException e) {
  6. throw new RuntimeException(e);
  7. }
  8. };
  9. thread.run();

AbortPolicy

直接抛出RejectedExecutionException异常,并指示任务的信息,线程池的信息。、

  1. public static class AbortPolicy implements RejectedExecutionHandler {
  2. public AbortPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. throw new RejectedExecutionException("Task " + r.toString() +
  5. " rejected from " +
  6. e.toString());
  7. }
  8. }

DiscardPolicy

什么也不做。

  1. public static class DiscardPolicy implements RejectedExecutionHandler {
  2. public DiscardPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. }
  5. }

DiscardOldestPolicy

  • e.getQueue().poll() : 取出队列最旧的任务。

  • e.execute(r) : 当前任务入队。

  1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  2. public DiscardOldestPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. if (!e.isShutdown()) {
  5. e.getQueue().poll();
  6. e.execute(r);
  7. }
  8. }
  9. }

Java 线程复用的原理

java的线程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker 对象,该对象在 被维护在private final HashSet<Worker> workers = new HashSet<Worker>();workQueue是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue队列中。

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable
  4. {
  5. /**
  6. * This class will never be serialized, but we provide a
  7. * serialVersionUID to suppress a javac warning.
  8. */
  9. private static final long serialVersionUID = 6138294804551838833L;
  10. /** Thread this worker is running in. Null if factory fails. */
  11. final Thread thread;
  12. /** Initial task to run. Possibly null. */
  13. Runnable firstTask;
  14. /** Per-thread task counter */
  15. volatile long completedTasks;
  16. /**
  17. * Creates with given first task and thread from ThreadFactory.
  18. * @param firstTask the first task (null if none)
  19. */
  20. Worker(Runnable firstTask) {
  21. setState(-1); // inhibit interrupts until runWorker
  22. this.firstTask = firstTask;
  23. this.thread = getThreadFactory().newThread(this);
  24. }
  25. /** Delegates main run loop to outer runWorker */
  26. public void run() {
  27. runWorker(this);
  28. }
  29. // Lock methods
  30. //
  31. // The value 0 represents the unlocked state.
  32. // The value 1 represents the locked state.
  33. protected boolean isHeldExclusively() {
  34. return getState() != 0;
  35. }
  36. protected boolean tryAcquire(int unused) {
  37. if (compareAndSetState(0, 1)) {
  38. setExclusiveOwnerThread(Thread.currentThread());
  39. return true;
  40. }
  41. return false;
  42. }
  43. protected boolean tryRelease(int unused) {
  44. setExclusiveOwnerThread(null);
  45. setState(0);
  46. return true;
  47. }
  48. public void lock() { acquire(1); }
  49. public boolean tryLock() { return tryAcquire(1); }
  50. public void unlock() { release(1); }
  51. public boolean isLocked() { return isHeldExclusively(); }
  52. void interruptIfStarted() {
  53. Thread t;
  54. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  55. try {
  56. t.interrupt();
  57. } catch (SecurityException ignore) {
  58. }
  59. }
  60. }
  61. }

work对象的执行依赖于 runWorker(),与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。

  1. final void runWorker(Worker w) {
  2. Thread wt = Thread.currentThread();
  3. Runnable task = w.firstTask;
  4. w.firstTask = null;
  5. w.unlock(); // allow interrupts
  6. boolean completedAbruptly = true;
  7. try {
  8. while (task != null || (task = getTask()) != null) {
  9. w.lock();
  10. // If pool is stopping, ensure thread is interrupted;
  11. // if not, ensure thread is not interrupted. This
  12. // requires a recheck in second case to deal with
  13. // shutdownNow race while clearing interrupt
  14. if ((runStateAtLeast(ctl.get(), STOP) ||
  15. (Thread.interrupted() &&
  16. runStateAtLeast(ctl.get(), STOP))) &&
  17. !wt.isInterrupted())
  18. wt.interrupt();
  19. try {
  20. beforeExecute(wt, task);
  21. Throwable thrown = null;
  22. try {
  23. task.run();
  24. } catch (RuntimeException x) {
  25. thrown = x; throw x;
  26. } catch (Error x) {
  27. thrown = x; throw x;
  28. } catch (Throwable x) {
  29. thrown = x; throw new Error(x);
  30. } finally {
  31. afterExecute(task, thrown);
  32. }
  33. } finally {
  34. task = null;
  35. w.completedTasks++;
  36. w.unlock();
  37. }
  38. }
  39. completedAbruptly = false;
  40. } finally {
  41. processWorkerExit(w, completedAbruptly);
  42. }
  43. }

原文链接:https://www.cnblogs.com/twilight0402/p/17305328.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号