经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
JUC并发—7.AQS源码分析三
来源:cnblogs  作者:东阳马生架构  时间:2025/2/20 10:39:25  对本文有异议

大纲

1.等待多线程完成的CountDownLatch介绍

2.CountDownLatch.await()方法源码

3.CountDownLatch.coutDown()方法源码

4.CountDownLatch总结

5.控制并发线程数的Semaphore介绍

6.Semaphore的令牌获取过程

7.Semaphore的令牌释放过程

8.同步屏障CyclicBarrier介绍

9.CyclicBarrier的await()方法源码

10.使用CountDownLatch等待注册的完成

11.使用CyclicBarrier将工作任务多线程分而治之

12.使用CyclicBarrier聚合服务接口的返回结果

13.使用Semaphore等待指定数量线程完成任务

 

volatile、synchronized、CAS、AQS、读写锁、锁优化和锁故障、并发集合、线程池、同步组件

 

1.等待多线程完成的CountDownLatch

(1)CountDownLatch的简介

(2)CountDownLatch的应用

(3)CountDownLatch的例子

 

(1)CountDownLatch的简介

CountDownLatch允许一个或多个线程等待其他线程完成操作。CountDownLatch提供了两个核心方法,分别是await()方法和countDown()方法。CountDownLatch.await()方法让调用线程进行阻塞进入等待状态,CountDownLatch.countDown()方法用于对计数器进行递减。

 

CountDownLatch在构造时需要传入一个正整数作为计数器初始值。线程每调用一次countDown()方法,都会对该计数器减一。当计数器为0时,会唤醒所有执行await()方法时被阻塞的线程。

 

(2)CountDownLatch的应用

应用一:

使用多线程去解析一个Excel里多个sheet的数据,每个线程解析一个sheet里的数据,等所有sheet解析完再提示处理完成。此时便可以使用CountDownLatch来实现,当然可以使用Thread.join()方法。

 

注意:Thread.join()方法是基于wait()和notify()来实现的。在main线程里开启一个线程A,main线程如果执行了线程A的join()方法,那么就会导致main线程被阻塞,main线程会等待线程A执行完毕才会继续往下执行。

 

应用二:

微服务注册中心的register-client,为了在注册线程执行成功后,才发送心跳。可以使用CountDownLatch,当然也可以使用Thread.join()方法。

 

应用三:

可以通过CountDownLatch实现类似并发的效果。把CountDownLatch的计数器设置为1,然后让1000个线程调用await()方法。当1000个线程初始化完成后,在main线程调用countDown()让计数器归零。这样这1000个线程就会在一个for()循环中,依次被唤醒。

 

(3)CountDownLatch的例子

  1. public class CountDownLatchDemo {
  2. public static void main(String[] args) throws Exception {
  3. final CountDownLatch latch = new CountDownLatch(2);
  4. new Thread() {
  5. public void run() {
  6. try {
  7. Thread.sleep(1000);
  8. System.out.println("线程1开始执行,休眠2秒...");
  9. Thread.sleep(1000);
  10. System.out.println("线程1准备执行countDown操作...");
  11. latch.countDown();
  12. System.out.println("线程1完成执行countDown操作...");
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. }.start();
  18. new Thread() {
  19. public void run() {
  20. try {
  21. Thread.sleep(1000);
  22. System.out.println("线程2开始执行,休眠2秒...");
  23. Thread.sleep(1000);
  24. System.out.println("线程2准备执行countDown操作...");
  25. latch.countDown();
  26. System.out.println("线程2完成执行countDown操作...");
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }.start();
  32. System.out.println("main线程准备执行countDownLatch的await操作,将会同步阻塞等待...");
  33. latch.await();
  34. System.out.println("所有线程都完成countDown操作,结束同步阻塞等待...");
  35. }
  36. }

 

2.CountDownLatch.await()方法源码

(1)CountDownLatch.await()方法的阻塞流程

(2)CountDownLatch.await()方法的唤醒流程

(3)CountDownLatch.await()方法的阻塞总结

 

(1)CountDownLatch.await()方法的阻塞流程

CountDownLatch是基于AQS中的共享锁来实现的。从CountDownLatch的构造方法可知,CountDownLatch的count就是AQS的state。

 

调用CountDownLatch的await()方法时,会先调用AQS的acquireSharedInterruptibly()模版方法,然后会调用CountDownLatch的内部类Sync实现的tryAcquireShared()方法。tryAcquireShared()方法会判断state的值是否为0,如果为0,才返回1,否则返回-1。

 

当调用CountDownLatch内部类Sync的tryAcquireShared()方法获得的返回值是-1时,才会调用AQS的doAcquireSharedInterruptibly()方法,将当前线程封装成Node结点加入等待队列,然后挂起当前线程进行阻塞。

  1. //A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
  2. public class CountDownLatch {
  3. private final Sync sync;
  4. public CountDownLatch(int count) {
  5. if (count < 0) {
  6. throw new IllegalArgumentException("count < 0");
  7. }
  8. this.sync = new Sync(count);
  9. }
  10. //Synchronization control For CountDownLatch.
  11. //Uses AQS state to represent count.
  12. private static final class Sync extends AbstractQueuedSynchronizer {
  13. Sync(int count) {
  14. setState(count);
  15. }
  16. int getCount() {
  17. return getState();
  18. }
  19. protected int tryAcquireShared(int acquires) {
  20. return (getState() == 0) ? 1 : -1;
  21. }
  22. protected boolean tryReleaseShared(int releases) {
  23. //Decrement count; signal when transition to zero
  24. for (;;) {
  25. int c = getState();
  26. if (c == 0) {
  27. return false;
  28. }
  29. int nextc = c-1;
  30. if (compareAndSetState(c, nextc)) {
  31. return nextc == 0;
  32. }
  33. }
  34. }
  35. }
  36. //Causes the current thread to wait until the latch has counted down to zero,
  37. //unless the thread is Thread#interrupt interrupted.
  38. public void await() throws InterruptedException {
  39. //执行AQS的acquireSharedInterruptibly()方法
  40. sync.acquireSharedInterruptibly(1);
  41. }
  42. ...
  43. }
  44. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  45. ...
  46. //Acquires in shared mode, aborting if interrupted.
  47. //Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.
  48. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking,
  49. //invoking #tryAcquireShared until success or the thread is interrupted.
  50. public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  51. if (Thread.interrupted()) {
  52. throw new InterruptedException();
  53. }
  54. //执行CountDownLatch的内部类Sync实现的tryAcquireShared()方法,抢占共享锁
  55. if (tryAcquireShared(arg) < 0) {
  56. //执行AQS的doAcquireSharedInterruptibly()方法
  57. doAcquireSharedInterruptibly(arg);
  58. }
  59. }
  60. //Acquires in shared interruptible mode.
  61. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  62. final Node node = addWaiter(Node.SHARED);//封装当前线程为Shared类型的Node结点
  63. boolean failed = true;
  64. try {
  65. //第一次循环r = -1,所以会执行AQS的shouldParkAfterFailedAcquire()方法
  66. //将node结点的有效前驱结点的状态设置为SIGNAL
  67. for (;;) {
  68. final Node p = node.predecessor();//node结点的前驱结点
  69. if (p == head) {
  70. int r = tryAcquireShared(arg);
  71. if (r >= 0) {
  72. setHeadAndPropagate(node, r);
  73. p.next = null; // help GC
  74. failed = false;
  75. return;
  76. }
  77. }
  78. //执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL
  79. //执行parkAndCheckInterrupt()方法挂起当前线程
  80. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
  81. throw new InterruptedException();
  82. }
  83. }
  84. } finally {
  85. if (failed) {
  86. cancelAcquire(node);
  87. }
  88. }
  89. }
  90. //Checks and updates status for a node that failed to acquire.
  91. //Returns true if thread should block. This is the main signal control in all acquire loops.
  92. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  93. int ws = pred.waitStatus;
  94. if (ws == Node.SIGNAL) {
  95. //This node has already set status asking a release to signal it, so it can safely park.
  96. return true;
  97. }
  98. if (ws > 0) {
  99. //Predecessor was cancelled. Skip over predecessors and indicate retry.
  100. do {
  101. node.prev = pred = pred.prev;
  102. } while (pred.waitStatus > 0);
  103. pred.next = node;
  104. } else {
  105. //waitStatus must be 0 or PROPAGATE.
  106. //Indicate that we need a signal, but don't park yet.
  107. //Caller will need to retry to make sure it cannot acquire before parking.
  108. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  109. }
  110. return false;
  111. }
  112. //设置头结点和唤醒后续线程
  113. //Sets head of queue, and checks if successor may be waiting in shared mode,
  114. //if so propagating if either propagate > 0 or PROPAGATE status was set.
  115. private void setHeadAndPropagate(Node node, int propagate) {
  116. Node h = head;
  117. setHead(node);//将node结点设置为头结点
  118. if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
  119. Node s = node.next;
  120. if (s == null || s.isShared()) {
  121. doReleaseShared();
  122. }
  123. }
  124. }
  125. private void setHead(Node node) {
  126. head = node;
  127. node.thread = null;
  128. node.prev = null;
  129. }
  130. ...
  131. }

(2)CountDownLatch.await()方法的唤醒流程

调用await()方法时,首先会将当前线程封装成Node结点并添加到等待队列中,然后在执行第一次for循环时会设置该Node结点的前驱结点状态为SIGNAL,接着在执行第二次for循环时才会将当前线程进行挂起阻塞。

 

当该线程后续被唤醒时,该线程又会进入下一次for循环。如果该线程对应的node结点的前驱结点是等待队列的头结点且state值已为0,那么就执行AQS的setHeadAndPropagate()方法设置头结点 + 唤醒后续线程。

 

其中setHeadAndPropagate()方法有两个工作(设置头结点 + 唤醒传递):

工作一:设置当前被唤醒线程对应的结点为头结点

工作二:当满足如下这两个条件的时候需要调用doReleaseShared()方法唤醒后续的线程

条件一:propagate > 0,表示当前是共享锁,需要进行唤醒传递

条件二:s.isShared()判断当前结点为共享模式

 

CountDownLatch的实现中会在以下两个场景调用doReleaseShared()方法:

场景一:state为1时调用的countDown()方法会调用doReleaseShared()方法

场景二:当阻塞的线程被唤醒时,会调用setHeadAndPropagate()方法,进而调用doReleaseShared()方法,这样可以提升唤醒共享结点的速度

 

(3)CountDownLatch.await()方法的阻塞总结

只要state != 0,就会进行如下处理:

一.将当前线程封装成一个Node结点,然后添加到AQS的等待队列中

二.调用LockSupport.park()方法,挂起当前线程

 

3.CountDownLatch.coutDown()方法源码

(1)CountDownLatch.coutDown()的唤醒流程

(2)CountDownLatch.tryReleaseShared()

(3)AQS的doReleaseShared()方法

 

(1)CountDownLatch.coutDown()的唤醒流程

调用CountDownLatch的countDown()方法时,会先调用AQS的releaseShared()模版方法,然后会执行CountDownLatch的内部类Sync实现的tryReleaseShared()方法。

 

如果tryReleaseShared()方法返回true,则执行AQS的doReleaseShared()方法,通过AQS的doReleaseShared()方法唤醒共享锁模式下的等待队列中的线程。

  1. //A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
  2. public class CountDownLatch {
  3. private final Sync sync;
  4. public CountDownLatch(int count) {
  5. if (count < 0) {
  6. throw new IllegalArgumentException("count < 0");
  7. }
  8. this.sync = new Sync(count);
  9. }
  10. //Synchronization control For CountDownLatch.
  11. //Uses AQS state to represent count.
  12. private static final class Sync extends AbstractQueuedSynchronizer {
  13. Sync(int count) {
  14. setState(count);
  15. }
  16. int getCount() {
  17. return getState();
  18. }
  19. protected int tryAcquireShared(int acquires) {
  20. return (getState() == 0) ? 1 : -1;
  21. }
  22. protected boolean tryReleaseShared(int releases) {
  23. //Decrement count; signal when transition to zero
  24. for (;;) {
  25. int c = getState();
  26. if (c == 0) {
  27. return false;
  28. }
  29. int nextc = c-1;
  30. if (compareAndSetState(c, nextc)) {
  31. return nextc == 0;
  32. }
  33. }
  34. }
  35. }
  36. //Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
  37. public void countDown() {
  38. //执行AQS的releaseShared()方法
  39. sync.releaseShared(1);
  40. }
  41. ...
  42. }
  43. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  44. ...
  45. //Releases in shared mode.
  46. //Implemented by unblocking one or more threads if #tryReleaseShared returns true.
  47. public final boolean releaseShared(int arg) {
  48. //执行CountDownLatch的内部类Sync实现的tryReleaseShared()方法,释放共享锁
  49. if (tryReleaseShared(arg)) {
  50. //执行AQS的doReleaseShared()方法
  51. doReleaseShared();
  52. return true;
  53. }
  54. return false;
  55. }
  56. //Release action for shared mode -- signals successor and ensures propagation.
  57. //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.
  58. private void doReleaseShared() {
  59. for (;;) {
  60. //每次循环时头结点都会发生变化
  61. //因为调用unparkSuccessor()方法会唤醒doAcquireSharedInterruptibly()方法中阻塞的线程
  62. //然后阻塞的线程会在执行setHeadAndPropagate()方法时通过setHead()修改头结点
  63. Node h = head;//获取最新的头结点
  64. if (h != null && h != tail) {//等待队列中存在挂起线程的结点
  65. int ws = h.waitStatus;
  66. if (ws == Node.SIGNAL) {//头结点的状态正常,表示对应的线程可以被唤醒
  67. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
  68. continue;//loop to recheck cases
  69. }
  70. //唤醒头结点的后继结点
  71. //唤醒的线程会在doAcquireSharedInterruptibly()方法中执行setHeadAndPropagate()方法修改头结点
  72. unparkSuccessor(h);
  73. } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
  74. //如果ws = 0表示初始状态,则修改结点为PROPAGATE状态
  75. continue;//loop on failed CAS
  76. }
  77. }
  78. if (h == head) {//判断头结点是否有变化
  79. break;//loop if head changed
  80. }
  81. }
  82. }
  83. //Wakes up node's successor, if one exists.
  84. private void unparkSuccessor(Node node) {
  85. int ws = node.waitStatus;
  86. if (ws < 0) {
  87. compareAndSetWaitStatus(node, ws, 0);
  88. }
  89. Node s = node.next;
  90. if (s == null || s.waitStatus > 0) {
  91. s = null;
  92. for (Node t = tail; t != null && t != node; t = t.prev) {
  93. if (t.waitStatus <= 0) {
  94. s = t;
  95. }
  96. }
  97. }
  98. if (s != null) {
  99. LockSupport.unpark(s.thread);
  100. }
  101. }
  102. ...
  103. }

(2)CountDownLatch.tryReleaseShared()

从tryReleaseShared()方法可知:每次countDown()其实就是把AQS的state值减1,然后通过CAS更新state值。如果CAS设置成功,那么就判断当前state值是否为0。如果是0那么就返回true,如果不是0那么就返回false。返回true的时候会调用AQS的doReleaseShared()方法,唤醒等待队列中的线程。

 

(3)AQS的doReleaseShared()方法

该方法要从AQS的等待队列中唤醒头结点的后继结点,需要满足:

条件一:等待队列中要存在挂起线程的结点(h != null && h != tail)

条件二:等待队列的头结点的状态正常(h.waitStatus = Node.SIGNAL)

 

在共享锁模式下,state为0时需要通过唤醒传递把所有挂起的线程都唤醒。首先doReleaseShared()方法会通过for(;;)进行自旋操作,每次循环都会通过Node h = head来获取等待队列中最新的头结点,然后通过if (h == head)来判断等待队列中的头结点是否发生变化。如果没有变化,则退出自旋。

 

注意:在共享锁模式下,被unparkSuccessor()唤醒的等待队列中的线程,会继续在在doAcquireSharedInterruptibly()方法中,执行setHeadAndPropagate()方法修改头结点,从而实现唤醒传递。

 

4.CountDownLatch总结

假设有两个线程A和B,分别调用了CountDownLatch的await()方法,此时state所表示的计数器不为0。所以线程A和B会被封装成SHARED类型的结点,并添加到AQS的等待队列中。

 

当线程C调用CountDownLatch的coutDown()方法后,如果state被递减到0,那么就会调用doReleaseShared()方法唤醒等待队列中的线程。然后被唤醒的线程会继续调用setHeadAndPropagate()方法实现唤醒传递,从而继续在doReleaseShared()方法中唤醒所有在等待队列中的被阻塞的线程。

 

5.控制并发线程数的Semaphore介绍

(1)Semaphore的作用

(2)Semaphore的方法

(3)Semaphore原理分析

 

(1)Semaphore的作用

Semaphore信号量用来控制同时访问特定资源的线程数量,有两核心方法。

方法一:acquire()方法,获取一个令牌

方法二:release()方法,释放一个令牌

 

多个线程访问某限制访问流量的资源时,可先调用acquire()获取访问令牌。如果能够正常获得,则表示允许访问。如果令牌不够,则会阻塞当前线程。当某个获得令牌的线程通过release()方法释放一个令牌后,被阻塞在acquire()方法的线程就有机会获得这个释放的令牌。

  1. public class SemaphoreDemo {
  2. public static void main(String[] args) throws InterruptedException {
  3. Semaphore semaphore = new Semaphore(10, true);//初始化10个资源,使用公平锁
  4. semaphore.acquire();//每次获取一个资源,如果获取不到,线程就会阻塞
  5. semaphore.release();//释放一个资源
  6. }
  7. }

(2)Semaphore的方法

Semaphore实际上并没有一个真实的令牌发给线程,Semaphore只是对一个可分配数量进行计数维护,或者说进行许可证管理。Semaphore可以在公共资源有限的场景下实现流量控制,如数据库连接。

  1. 一.Semaphore(permits, fair):permits表示令牌数,fair表示公平性
  2. 二.acquire(permits):获取指定数量的令牌,如果数量不足则阻塞当前线程
  3. 三.tryAcquire(permits):尝试获取指定数量的令牌,此过程是非阻塞的,成功返回true,失败返回false
  4. 四.release(permits):释放指定数量的令牌
  5. 五.drainPermits():当前线程获得剩下的所有令牌
  6. 六.hasQueuedThread():判断当前Semaphore实例上是否存在等待令牌的线程

(3)Semaphore原理分析

Semaphore也是基于AQS中的共享锁来实现的。在创建Semaphore实例时传递的参数permits,其实就是AQS中的state属性。每次调用Semaphore的acquire()方法,都会对state值进行递减。

 

所以从根本上说,Semaphore是通过重写AQS的两个方法来实现的:

方法一:tryAcquireShared(),抢占共享锁

方法二:tryReleaseShared(),释放共享锁

  1. public class Semaphore implements java.io.Serializable {
  2. private final Sync sync;
  3. //Creates a Semaphore with the given number of permits and nonfair fairness setting.
  4. public Semaphore(int permits) {
  5. sync = new NonfairSync(permits);
  6. }
  7. static final class NonfairSync extends Sync {
  8. NonfairSync(int permits) {
  9. super(permits);
  10. }
  11. protected int tryAcquireShared(int acquires) {
  12. return nonfairTryAcquireShared(acquires);
  13. }
  14. }
  15. //Acquires a permit from this semaphore, blocking until one is available,
  16. //or the thread is Thread#interrupt interrupted.
  17. public void acquire() throws InterruptedException {
  18. //执行AQS的模版方法acquireSharedInterruptibly()
  19. sync.acquireSharedInterruptibly(1);
  20. }
  21. //Releases a permit, returning it to the semaphore.
  22. public void release() {
  23. //执行AQS的模版方法releaseShared()
  24. sync.releaseShared(1);
  25. }
  26. //Synchronization implementation for semaphore.
  27. //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.
  28. abstract static class Sync extends AbstractQueuedSynchronizer {
  29. Sync(int permits) {
  30. //设置state的值为传入的令牌数
  31. setState(permits);
  32. }
  33. final int getPermits() {
  34. return getState();
  35. }
  36. final int nonfairTryAcquireShared(int acquires) {
  37. for (;;) {
  38. int available = getState();
  39. int remaining = available - acquires;
  40. if (remaining < 0 || compareAndSetState(available, remaining)) {
  41. return remaining;
  42. }
  43. }
  44. }
  45. protected final boolean tryReleaseShared(int releases) {
  46. for (;;) {
  47. int current = getState();
  48. int next = current + releases;
  49. if (next < current) {
  50. throw new Error("Maximum permit count exceeded");
  51. }
  52. if (compareAndSetState(current, next)) {
  53. return true;
  54. }
  55. }
  56. }
  57. ...
  58. }
  59. ...
  60. }
  61. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  62. ...
  63. //Acquires in shared mode, aborting if interrupted.
  64. //Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.
  65. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking,
  66. //invoking #tryAcquireShared until success or the thread is interrupted.
  67. public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  68. if (Thread.interrupted()) {
  69. throw new InterruptedException();
  70. }
  71. //执行Semaphore的内部类Sync的子类实现的tryAcquireShared()方法,抢占共享锁
  72. if (tryAcquireShared(arg) < 0) {
  73. //执行AQS的doAcquireSharedInterruptibly()方法
  74. doAcquireSharedInterruptibly(arg);
  75. }
  76. }
  77. //Releases in shared mode.
  78. //Implemented by unblocking one or more threads if #tryReleaseShared returns true.
  79. public final boolean releaseShared(int arg) {
  80. //执行Semaphore的内部类Sync实现的tryReleaseShared()方法,释放共享锁
  81. if (tryReleaseShared(arg)) {
  82. //执行AQS的doReleaseShared()方法
  83. doReleaseShared();
  84. return true;
  85. }
  86. return false;
  87. }
  88. ...
  89. }

 

6.Semaphore的令牌获取过程

(1)Semaphore的令牌获取过程

(2)Semaphore的公平策略

(3)Semaphore的非公平策略

(4)tryAcquireShared()后的处理

 

(1)Semaphore的令牌获取过程

在调用Semaphore的acquire()方法获取令牌时:首先会执行AQS的模版方法acquireSharedInterruptibly(),然后执行Sync子类实现的tryAcquireShared()方法来抢占锁。如果抢占锁失败,则执行AQS的doAcquireSharedInterruptibly()方法。该方法会将当前线程封装成Node结点并加入等待队列,然后挂起线程。

 

(2)Semaphore的公平策略

在执行Sync子类FairSync的tryAcquireShared()方法尝试获取令牌时,先通过AQS的hasQueuedPredecessors()判断是否已有线程在等待队列中。如果已经有线程在等待队列中,那么当前线程获取令牌就必然失败。否则,就递减state的值 + 判断state是否小于0 + CAS设置state的值。

 

(3)Semaphore的非公平策略

在执行Sync子类NonfairSync的tryAcquireShared()方法尝试获取令牌时,则会直接执行Sync的nonfairTryAcquireShared()方法来获取令牌,也就是递减state的值 + 判断state是否小于0 + CAS设置state的值。

 

(4)tryAcquireShared()后的处理

不管公平策略还是非公平策略,对应的tryAcquireShared()方法都是通过自旋来抢占令牌(CAS设置state),直到令牌数不够时才会让tryAcquireShared()方法返回小于0的数值。然后触发执行AQS的doAcquireSharedInterruptibly()方法,该方法会将当前线程封装成Node结点并加入等待队列,然后挂起线程。

  1. public class Semaphore implements java.io.Serializable {
  2. private final Sync sync;
  3. //Creates a Semaphore with the given number of permits and nonfair fairness setting.
  4. public Semaphore(int permits) {
  5. sync = new NonfairSync(permits);
  6. }
  7. static final class NonfairSync extends Sync {
  8. NonfairSync(int permits) {
  9. super(permits);
  10. }
  11. //以非公平锁的方式获取令牌
  12. protected int tryAcquireShared(int acquires) {
  13. //执行Sync的nonfairTryAcquireShared()方法
  14. return nonfairTryAcquireShared(acquires);
  15. }
  16. }
  17. static final class FairSync extends Sync {
  18. FairSync(int permits) {
  19. super(permits);
  20. }
  21. //以公平锁的方式获取令牌
  22. protected int tryAcquireShared(int acquires) {
  23. for (;;) {
  24. //如果已经有线程在等待队列中,那么就说明获取令牌必然失败
  25. if (hasQueuedPredecessors()) {
  26. return -1;
  27. }
  28. int available = getState();
  29. int remaining = available - acquires;
  30. if (remaining < 0 || compareAndSetState(available, remaining)) {
  31. return remaining;
  32. }
  33. }
  34. }
  35. }
  36. //Acquires a permit from this semaphore, blocking until one is available,
  37. //or the thread is Thread#interrupt interrupted.
  38. public void acquire() throws InterruptedException {
  39. //执行AQS的模版方法acquireSharedInterruptibly()
  40. sync.acquireSharedInterruptibly(1);
  41. }
  42. //Synchronization implementation for semaphore.
  43. //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.
  44. abstract static class Sync extends AbstractQueuedSynchronizer {
  45. Sync(int permits) {
  46. //设置state的值为传入的令牌数
  47. setState(permits);
  48. }
  49. final int getPermits() {
  50. return getState();
  51. }
  52. final int nonfairTryAcquireShared(int acquires) {
  53. for (;;) {
  54. int available = getState();
  55. int remaining = available - acquires;
  56. if (remaining < 0 || compareAndSetState(available, remaining)) {
  57. return remaining;
  58. }
  59. }
  60. }
  61. ...
  62. }
  63. ...
  64. }
  65. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  66. ...
  67. //Acquires in shared mode, aborting if interrupted.
  68. //Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.
  69. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking,
  70. //invoking #tryAcquireShared until success or the thread is interrupted.
  71. public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  72. if (Thread.interrupted()) {
  73. throw new InterruptedException();
  74. }
  75. //执行Semaphore的内部类Sync的子类实现的tryAcquireShared()方法,抢占共享锁
  76. if (tryAcquireShared(arg) < 0) {
  77. //执行AQS的doAcquireSharedInterruptibly()方法
  78. doAcquireSharedInterruptibly(arg);
  79. }
  80. }
  81. //Queries whether any threads have been waiting to acquire longer than the current thread.
  82. public final boolean hasQueuedPredecessors() {
  83. Node t = tail; // Read fields in reverse initialization order
  84. Node h = head;
  85. Node s;
  86. return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
  87. }
  88. //Acquires in shared interruptible mode.
  89. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  90. final Node node = addWaiter(Node.SHARED);//封装当前线程为Shared类型的Node结点
  91. boolean failed = true;
  92. try {
  93. //第一次循环r = -1,所以会执行AQS的shouldParkAfterFailedAcquire()方法
  94. //将node结点的有效前驱结点的状态设置为SIGNAL
  95. for (;;) {
  96. final Node p = node.predecessor();//node结点的前驱结点
  97. if (p == head) {
  98. int r = tryAcquireShared(arg);
  99. if (r >= 0) {
  100. setHeadAndPropagate(node, r);
  101. p.next = null; // help GC
  102. failed = false;
  103. return;
  104. }
  105. }
  106. //执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL
  107. //执行parkAndCheckInterrupt()方法挂起当前线程
  108. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
  109. throw new InterruptedException();
  110. }
  111. }
  112. } finally {
  113. if (failed) {
  114. cancelAcquire(node);
  115. }
  116. }
  117. }
  118. ...
  119. }

 

7.Semaphore的令牌释放过程

(1)Semaphore的令牌释放过程

(2)Semaphore的令牌释放本质

 

(1)Semaphore的令牌释放过程

在调用Semaphore的release()方法去释放令牌时:首先会执行AQS的模版方法releaseShared(),然后执行Sync实现的tryReleaseShared()方法来释放锁(累加state值)。如果释放锁成功,则执行AQS的doReleaseShared()方法去唤醒线程。

 

(2)Semaphore的令牌释放本质

Semaphore的release()方法释放令牌的本质就是对state字段进行累加,然后唤醒等待队列头结点的后继结点 + 唤醒传递来唤醒等待的线程。

 

注意:并非一定要执行acquire()方法的线程才能调用release()方法,任意一个线程都可以调用release()方法,也可以通过reducePermits()方法来减少令牌数。

  1. public class Semaphore implements java.io.Serializable {
  2. private final Sync sync;
  3. //Creates a Semaphore with the given number of permits and nonfair fairness setting.
  4. public Semaphore(int permits) {
  5. sync = new NonfairSync(permits);
  6. }
  7. //Releases a permit, returning it to the semaphore.
  8. public void release() {
  9. //执行AQS的模版方法releaseShared()
  10. sync.releaseShared(1);
  11. }
  12. //Synchronization implementation for semaphore.
  13. //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.
  14. abstract static class Sync extends AbstractQueuedSynchronizer {
  15. Sync(int permits) {
  16. //设置state的值为传入的令牌数
  17. setState(permits);
  18. }
  19. //尝试释放锁,也就是对state值进行累加
  20. protected final boolean tryReleaseShared(int releases) {
  21. for (;;) {
  22. int current = getState();
  23. int next = current + releases;
  24. if (next < current) {
  25. throw new Error("Maximum permit count exceeded");
  26. }
  27. if (compareAndSetState(current, next)) {
  28. return true;
  29. }
  30. }
  31. }
  32. ...
  33. }
  34. ...
  35. }
  36. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  37. ...
  38. //Releases in shared mode.
  39. //Implemented by unblocking one or more threads if #tryReleaseShared returns true.
  40. public final boolean releaseShared(int arg) {
  41. //执行Semaphore的内部类Sync实现的tryReleaseShared()方法,释放共享锁
  42. if (tryReleaseShared(arg)) {
  43. //执行AQS的doReleaseShared()方法,唤醒等待队列中的线程
  44. doReleaseShared();
  45. return true;
  46. }
  47. return false;
  48. }
  49. //Release action for shared mode -- signals successor and ensures propagation.
  50. //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.
  51. private void doReleaseShared() {
  52. for (;;) {
  53. //每次循环时头结点都会发生变化
  54. //因为调用unparkSuccessor()方法会唤醒doAcquireSharedInterruptibly()方法中阻塞的线程
  55. //然后阻塞的线程会在执行setHeadAndPropagate()方法时通过setHead()修改头结点
  56. Node h = head;//获取最新的头结点
  57. if (h != null && h != tail) {//等待队列中存在挂起线程的结点
  58. int ws = h.waitStatus;
  59. if (ws == Node.SIGNAL) {//头结点的状态正常,表示对应的线程可以被唤醒
  60. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
  61. continue;//loop to recheck cases
  62. }
  63. //唤醒头结点的后继结点
  64. //唤醒的线程会在doAcquireSharedInterruptibly()方法中执行setHeadAndPropagate()方法修改头结点
  65. unparkSuccessor(h);
  66. } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
  67. //如果ws = 0表示初始状态,则修改结点为PROPAGATE状态
  68. continue;//loop on failed CAS
  69. }
  70. }
  71. if (h == head) {//判断头结点是否有变化
  72. break;//loop if head changed
  73. }
  74. }
  75. }
  76. //Wakes up node's successor, if one exists.
  77. private void unparkSuccessor(Node node) {
  78. int ws = node.waitStatus;
  79. if (ws < 0) {
  80. compareAndSetWaitStatus(node, ws, 0);
  81. }
  82. Node s = node.next;
  83. if (s == null || s.waitStatus > 0) {
  84. s = null;
  85. for (Node t = tail; t != null && t != node; t = t.prev) {
  86. if (t.waitStatus <= 0) {
  87. s = t;
  88. }
  89. }
  90. }
  91. if (s != null) {
  92. LockSupport.unpark(s.thread);
  93. }
  94. }
  95. ...
  96. }

 

8.同步屏障CyclicBarrier介绍

(1)CyclicBarrier的作用

(2)CyclicBarrier的基本原理

 

(1)CyclicBarrier的作用

CyclicBarrier的字面意思就是可循环使用的屏障。CyclicBarrier的主要作用就是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时屏障才会打开,接着才让所有被屏障拦截的线程一起继续往下执行。线程进入屏障是通过CyclicBarrier的await()方法来实现的。

 

(2)CyclicBarrier的基本原理

假设有3个线程在运行中都会调用CyclicBarrier的await()方法,而每个线程从开始执行到执行await()方法所用时间可能不一样,最终当执行时间最长的线程到达屏障时,会唤醒其他较早到达屏障的线程继续往下执行。

 

CyclicBarrier包含两个层面的意思:

一是Barrier屏障点,线程调用await()方法都会阻塞在屏障点,直到所有线程都到达屏障点后再放行。

二是Cyclic循环,当所有线程通过当前屏障点后,又可以进入下一轮的屏障点进行等待,可以不断循环。

 

9.CyclicBarrier的await()方法源码

(1)CyclicBarrier的成员变量

(2)CyclicBarrier的await()方法源码

(3)CountDownLatch和CyclicBarrier对比

 

(1)CyclicBarrier的成员变量

  1. //A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
  2. //CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other.
  3. //The barrier is called cyclic because it can be re-used after the waiting threads are released.
  4. public class CyclicBarrier {
  5. ...
  6. private static class Generation {
  7. boolean broken = false;
  8. }
  9. private final ReentrantLock lock = new ReentrantLock();
  10. private final Condition trip = lock.newCondition();//用于线程之间相互唤醒
  11. private final int parties;//参与的线程数量
  12. private int count;//初始值是parties,每调用一次await()就减1
  13. private final Runnable barrierCommand;//回调任务
  14. private Generation generation = new Generation();
  15. ...
  16. }

CyclicBarrier是基于ReentrantLock + Condition来实现的。

 

parties表示每次要求到达屏障点的线程数,只有到达屏障点的线程数满足指定的parties数量,所有线程才会被唤醒。

 

count是一个初始值为parties的计数器,每个线程调用await()方法会对count减1,当count为0时会唤醒所有线程,并且结束当前的屏障周期generation,然后所有线程进入下一个屏障周期,而且count会恢复成parties。

 

(2)CyclicBarrier的await()方法源码

线程调用CyclicBarrier的await()方法时,会触发调用CyclicBarrier的dowait()方法。

 

CyclicBarrier的dowait()方法会对count计数器进行递减。如果count递减到0,则会调用CyclicBarrier的nextGeneration()唤醒所有线程,同时如果异步回调任务barrierCommand不为空,则会执行该任务。如果count还没递减到0,则调用Condition的await()方法阻塞当前线程。

 

被阻塞的线程,除了会被CyclicBarrier的nextGeneration()方法唤醒外,还会被Thread的interrupt()方法唤醒、被中断异常唤醒,而这些唤醒会调用CyclicBarrier的breakBarrier()方法。

 

在CyclicBarrier的nextGeneration()方法和CyclicBarrier的breakBarrier()方法中,都会通过Condition的signalAll()方法唤醒所有被阻塞等待的线程。

  1. //A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
  2. //CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other.
  3. //The barrier is called cyclic because it can be re-used after the waiting threads are released.
  4. public class CyclicBarrier {
  5. ...
  6. private static class Generation {
  7. boolean broken = false;//用来标记屏障是否被中断
  8. }
  9. private final ReentrantLock lock = new ReentrantLock();
  10. private final Condition trip = lock.newCondition();//用于线程之间相互唤醒
  11. private final int parties;//参与的线程数量
  12. private int count;//初始值是parties,每调用一次await()就减1
  13. private final Runnable barrierCommand;//回调任务
  14. private Generation generation = new Generation();
  15. public CyclicBarrier(int parties, Runnable barrierAction) {
  16. if (parties <= 0) throw new IllegalArgumentException();
  17. this.parties = parties;
  18. this.count = parties;
  19. this.barrierCommand = barrierAction;
  20. }
  21. public CyclicBarrier(int parties) {
  22. this(parties, null);
  23. }
  24. //Waits until all #getParties have invoked await on this barrier.
  25. public int await() throws InterruptedException, BrokenBarrierException {
  26. try {
  27. //执行CyclicBarrier的dowait()方法
  28. return dowait(false, 0L);
  29. } catch (TimeoutException toe) {
  30. throw new Error(toe);
  31. }
  32. }
  33. //Main barrier code, covering the various policies.
  34. private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
  35. final ReentrantLock lock = this.lock;
  36. lock.lock();//使用Condition需要先获取锁
  37. try {
  38. //获取当前的generation
  39. final Generation g = generation;
  40. //确认当前generation的barrier是否有效,如果generation的broken为true,则抛出屏障中断异常
  41. if (g.broken) {
  42. throw new BrokenBarrierException();
  43. }
  44. if (Thread.interrupted()) {
  45. breakBarrier();
  46. throw new InterruptedException();
  47. }
  48. //统计已经到达当前generation的线程数量
  49. int index = --count;
  50. //如果index为0,则表示所有线程都到达了屏障点
  51. if (index == 0) {
  52. boolean ranAction = false;
  53. try {
  54. final Runnable command = barrierCommand;
  55. if (command != null) {
  56. //触发回调
  57. command.run();
  58. }
  59. ranAction = true;
  60. //执行nextGeneration()方法唤醒所有线程,同时进入下一个屏障周期
  61. nextGeneration();
  62. return 0;
  63. } finally {
  64. if (!ranAction) {
  65. breakBarrier();
  66. }
  67. }
  68. }
  69. //loop until tripped, broken, interrupted, or timed out
  70. //如果index > 0,则阻塞当前线程
  71. for (;;) {
  72. try {
  73. if (!timed) {
  74. //通过Condition的await()方法,在阻塞当前线程的同时释放锁
  75. //这样其他线程就能获取到锁执行上面的index = --count
  76. trip.await();
  77. } else if (nanos > 0L) {
  78. nanos = trip.awaitNanos(nanos);
  79. }
  80. } catch (InterruptedException ie) {
  81. if (g == generation && ! g.broken) {
  82. breakBarrier();
  83. throw ie;
  84. } else {
  85. Thread.currentThread().interrupt();
  86. }
  87. }
  88. if (g.broken) {
  89. throw new BrokenBarrierException();
  90. }
  91. if (g != generation) {
  92. return index;
  93. }
  94. if (timed && nanos <= 0L) {
  95. //中断屏障,设置generation.broken为true
  96. breakBarrier();
  97. throw new TimeoutException();
  98. }
  99. }
  100. } finally {
  101. lock.unlock();
  102. }
  103. }
  104. //Updates state on barrier trip and wakes up everyone.
  105. //Called only while holding lock.
  106. private void nextGeneration() {
  107. //通过Condition的signalAll()唤醒所有等待的线程
  108. trip.signalAll();
  109. //还原count
  110. count = parties;
  111. //进入新的generation
  112. generation = new Generation();
  113. }
  114. //Sets current barrier generation as broken and wakes up everyone.
  115. //Called only while holding lock.
  116. private void breakBarrier() {
  117. generation.broken = true;
  118. count = parties;
  119. //通过Condition的signalAll()唤醒所有等待的线程
  120. trip.signalAll();
  121. }
  122. ...
  123. }

(3)CountDownLatch和CyclicBarrier对比

一.CyclicBarrier可以被重用、可以响应中断

二.CountDownLatch的计数器只能使用一次,但可以通过reset()方法重置

 

10.使用CountDownLatch等待注册的完成

Hadoop HDFS(分布式存储系统)的NameNode分为主备两个节点,各个DataNode在启动时都会向两个NameNode进行注册,此时就可以使用CountDownLatch等待向主备节点注册的完成。

  1. //DataNode启动类
  2. public class DataNode {
  3. //是否还在运行
  4. private volatile Boolean shouldRun;
  5. //负责和一组NameNode(主NameNode + 备NameNode)通信的组件
  6. private NameNodeGroupOfferService offerService;
  7. //初始化DataNode
  8. private void initialize() {
  9. this.shouldRun = true;
  10. this.offerService = new NameNodeGroupOfferService();
  11. this.offerService.start();
  12. }
  13. //运行DataNode
  14. private void run() {
  15. try {
  16. while(shouldRun) {
  17. Thread.sleep(10000);
  18. }
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. public static void main(String[] args) {
  24. DataNode datanode = new DataNode();
  25. datanode.initialize();
  26. datanode.run();
  27. }
  28. }
  29. //负责某个NameNode进行通信的线程组件
  30. public class NameNodeServiceActor {
  31. //向某个NameNode进行注册
  32. public void register(CountDownLatch latch) {
  33. Thread registerThread = new RegisterThread(latch);
  34. registerThread.start();
  35. }
  36. //负责注册的线程,传入一个CountDownLatch
  37. class RegisterThread extends Thread {
  38. CountDownLatch latch;
  39. public RegisterThread(CountDownLatch latch) {
  40. this.latch = latch;
  41. }
  42. @Override
  43. public void run() {
  44. try {
  45. //发送rpc接口调用请求到NameNode去进行注册
  46. System.out.println("发送请求到NameNode进行注册...");
  47. Thread.sleep(1000);
  48. latch.countDown();
  49. } catch (Exception e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. }
  54. }
  55. //负责跟一组NameNode(主NameNode + 备NameNode)进行通信的线程组件
  56. public class NameNodeGroupOfferService {
  57. //负责跟NameNode主节点通信的ServiceActor组件
  58. private NameNodeServiceActor activeServiceActor;
  59. //负责跟NameNode备节点通信的ServiceActor组件
  60. private NameNodeServiceActor standbyServiceActor;
  61. //构造函数
  62. public NameNodeGroupOfferService() {
  63. this.activeServiceActor = new NameNodeServiceActor();
  64. this.standbyServiceActor = new NameNodeServiceActor();
  65. }
  66. //启动OfferService组件
  67. public void start() {
  68. //直接使用两个ServiceActor组件分别向主备两个NameNode节点进行注册
  69. register();
  70. }
  71. //向主备两个NameNode节点进行注册
  72. private void register() {
  73. try {
  74. CountDownLatch latch = new CountDownLatch(2);
  75. this.activeServiceActor.register(latch);
  76. this.standbyServiceActor.register(latch);
  77. latch.await();//阻塞等待主备都完成注册
  78. System.out.println("主备NameNode全部注册完毕...");
  79. } catch (Exception e) {
  80. e.printStackTrace();
  81. }
  82. }
  83. }

 

11.使用CyclicBarrier将工作任务多线程分而治之

  1. //输出结果:
  2. //线程1执行自己的一部分工作...
  3. //线程2执行自己的一部分工作...
  4. //线程3执行自己的一部分工作...
  5. //所有线程都完成自己的任务,可以合并结果了...
  6. //最终结果合并完成,线程3可以退出...
  7. //最终结果合并完成,线程1可以退出...
  8. //最终结果合并完成,线程2可以退出...
  9. public class CyclicBarrierDemo {
  10. public static void main(String[] args) {
  11. final CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
  12. public void run() {
  13. System.out.println("所有线程都完成自己的任务,可以合并结果了...");
  14. }
  15. });
  16. new Thread() {
  17. public void run() {
  18. try {
  19. System.out.println("线程1执行自己的一部分工作...");
  20. barrier.await();
  21. System.out.println("最终结果合并完成,线程1可以退出...");
  22. } catch (Exception e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. }.start();
  27. new Thread() {
  28. public void run() {
  29. try {
  30. System.out.println("线程2执行自己的一部分工作...");
  31. barrier.await();
  32. System.out.println("最终结果合并完成,线程2可以退出...");
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }.start();
  38. new Thread() {
  39. public void run() {
  40. try {
  41. System.out.println("线程3执行自己的一部分工作...");
  42. barrier.await();
  43. System.out.println("最终结果合并完成,线程3可以退出...");
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }.start();
  49. }
  50. }

 

12.使用CyclicBarrier聚合服务接口的返回结果

当然也可以使用CountDownLatch来实现聚合服务接口的返回结果;

  1. public class ApiServiceDemo {
  2. public Map<String, Object> queryOrders() throws Exception {
  3. final List<Object> results = new ArrayList<Object>();
  4. final Map<String, Object> map = new ConcurrentHashMap<String, Object>();
  5. CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
  6. @Override
  7. public void run() {
  8. map.put("price", results.get(0));
  9. map.put("order", results.get(1));
  10. map.put("stats", results.get(2));
  11. }
  12. });
  13. //请求价格接口
  14. new Thread() {
  15. public void run() {
  16. try {
  17. System.out.println("请求价格服务...");
  18. Thread.sleep(1000);
  19. results.add(new Object());
  20. barrier.await();
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. };
  25. }.start();
  26. //请求订单接口
  27. new Thread() {
  28. public void run() {
  29. try {
  30. System.out.println("请求订单服务...");
  31. Thread.sleep(1000);
  32. results.add(new Object());
  33. barrier.await();
  34. } catch (Exception e) {
  35. e.printStackTrace();
  36. }
  37. };
  38. }.start();
  39. //请求统计接口
  40. new Thread() {
  41. public void run() {
  42. try {
  43. System.out.println("请求订单统计服务...");
  44. Thread.sleep(1000);
  45. results.add(new Object());
  46. barrier.await();
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. }
  50. };
  51. }.start();
  52. while(map.size() < 3) {
  53. Thread.sleep(100);
  54. }
  55. return map;
  56. }
  57. }

 

13.使用Semaphore等待指定数量线程完成任务

可以通过Semaphore实现等待指定数量的线程完成任务才往下执行。

  1. //输出结果如下:
  2. //线程2执行一个计算任务
  3. //等待1个线程完成任务即可...
  4. //线程1执行一个计算任务
  5. public class SemaphoreDemo {
  6. public static void main(String[] args) throws Exception {
  7. final Semaphore semaphore = new Semaphore(0);
  8. new Thread() {
  9. public void run() {
  10. try {
  11. Thread.sleep(2000);
  12. System.out.println("线程1执行一个计算任务");
  13. semaphore.release();
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }.start();
  19. new Thread() {
  20. public void run() {
  21. try {
  22. Thread.sleep(1000);
  23. System.out.println("线程2执行一个计算任务");
  24. semaphore.release();
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }.start();
  30. semaphore.acquire(1);
  31. System.out.println("等待1个线程完成任务即可...");
  32. }
  33. }

 

原文链接:https://www.cnblogs.com/mjunz/p/18725070

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号