CountDownLatch
Introduction
A synchronization aid that allows one or more threads to wait until a
set of operations being performed in other threads completes.
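Threads blocked in await may continue only after countDown has been called N times, typically once by each of N worker threads as it finishes. Until then the waiting threads stay suspended.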
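Constructor
The count parameter is the initial count. It is passed to Sync, the AQS (AbstractQueuedSynchronizer) subclass, which stores it as the AQS state.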
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
Sync
Sync extends AQS and provides the core synchronization logic.
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);   // the AQS state holds the remaining count
    }

    int getCount() {
        return getState();
    }

    // Acquisition succeeds (>= 0) only once the count has reached zero.
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // Decrements the count with a CAS loop; returns true on the transition to zero.
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
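Core methods
countDown: decrements the count by 1; when the count reaches 0, all waiting threads are released
await: suspends the calling thread until the count reaches 0; the wait can be interrupted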
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public void countDown() {
    sync.releaseShared(1);
}

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
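The behavior of the timed await and of surplus countDown calls can be checked with a small single-threaded sketch. The class name LatchAwaitDemo and the 50 ms timeout are illustrative choices, not part of the JDK source discussed here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchAwaitDemo {
    /** Returns {1 if the first await succeeded, 1 if the second await succeeded, final count}. */
    static long[] run() {
        CountDownLatch latch = new CountDownLatch(2);
        try {
            // The count is still 2, so the timed await gives up and returns false.
            boolean before = latch.await(50, TimeUnit.MILLISECONDS);
            latch.countDown();
            latch.countDown();
            latch.countDown(); // extra call: the count is already 0, so nothing changes
            // The count is 0, so await returns true without blocking.
            boolean after = latch.await(50, TimeUnit.MILLISECONDS);
            return new long[] { before ? 1 : 0, after ? 1 : 0, latch.getCount() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long[] r = run();
        System.out.println("before=" + r[0] + " after=" + r[1] + " count=" + r[2]);
    }
}
```

The third countDown matches the `if (c == 0) return false;` branch of tryReleaseShared: the count never drops below zero.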
Usage example
class Driver {
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i)   // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        doSomethingElse();            // don't let the workers run yet
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {}
    }

    void doWork() { ... }
}
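The Driver/Worker pattern above leaves N, doWork and doSomethingElse undefined. A runnable variant might look like the following sketch, where LatchDriverDemo, runWorkers and the counter standing in for doWork are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDriverDemo {
    /** Starts n workers, releases them together, and returns how many completed their work. */
    static int runWorkers(int n) {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(n);
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < n; i++) {
            new Thread(() -> {
                try {
                    startSignal.await();         // block until the driver gives the go signal
                    finished.incrementAndGet();  // stand-in for doWork()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    doneSignal.countDown();      // report completion even if interrupted
                }
            }).start();
        }

        startSignal.countDown();                 // release all workers at once
        try {
            doneSignal.await();                  // wait until every worker has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished workers: " + runWorkers(4));
    }
}
```

Unlike the example above, this sketch calls countDown in a finally block, so the driver cannot hang on doneSignal if a worker is interrupted. That is a design choice of the sketch, not something the original example does.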
CyclicBarrier
Introduction
A synchronization aid that allows a set of threads to all wait for
each other to reach a common barrier point. CyclicBarriers are useful in
programs involving a fixed sized party of threads that must occasionally
wait for each other. The barrier is called cyclic because it
can be re-used after the waiting threads are released.
Each thread in the group blocks when it reaches the barrier. Only when the last thread arrives are the blocked threads released to continue.
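Constructor
The parameters are:
parties: the number of threads that must call await before the barrier trips
barrierAction: the task run once all threads have reached the barrier; it is executed by the last thread to arrive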
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
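To see how parties and barrierAction interact, the sketch below sends a fixed group of threads through the same barrier several times and counts how often the action runs. BarrierActionDemo and its run method are hypothetical names introduced for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierActionDemo {
    /** Sends `parties` threads through one barrier `rounds` times; returns how often the action ran. */
    static int run(int parties, int rounds) {
        AtomicInteger actionRuns = new AtomicInteger();
        // The barrier action only counts its own invocations.
        CyclicBarrier barrier = new CyclicBarrier(parties, actionRuns::incrementAndGet);

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await();   // the same barrier object is reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return actionRuns.get();
    }

    public static void main(String[] args) {
        System.out.println("action ran " + run(3, 4) + " times");
    }
}
```

The action runs once per trip of the barrier, not once per thread, so three threads and four rounds give four invocations.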
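Fields
lock: the reentrant lock that guards the barrier state inside dowait
parties: the number of participating threads
trip: the Condition that threads actually wait on until the barrier trips
barrierCommand: the task run when the last thread arrives
count: the number of parties still expected to arrive in the current generation; it counts down from parties to 0
generation: the current generation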
private static class Generation {
    boolean broken = false;
}

private final ReentrantLock lock = new ReentrantLock();

private final Condition trip = lock.newCondition();

private final int parties;

private final Runnable barrierCommand;

private Generation generation = new Generation();

private int count;
Core methods
await
Responds to interruption; implemented by calling dowait(false, 0L).
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
dowait
The actual implementation of await.
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {  // last arrival: the barrier trips
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)   // the action threw: break the barrier
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && !g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // The wait is ending anyway, so keep the interrupt for later code.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)   // a new generation started: normal release
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
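The value returned by dowait is the arrival index computed by `int index = --count;`: the first thread to arrive gets parties - 1 and the last gets 0. The sketch below collects those indexes. BarrierIndexDemo and arrivalIndexes are names assumed for illustration.

```java
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CyclicBarrier;

class BarrierIndexDemo {
    /** Returns the set of arrival indexes handed out by await() for one trip of the barrier. */
    static Set<Integer> arrivalIndexes(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        Set<Integer> indexes = new ConcurrentSkipListSet<>();

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    indexes.add(barrier.await());   // each thread records its own arrival index
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return indexes;
    }

    public static void main(String[] args) {
        System.out.println(arrivalIndexes(3));   // prints [0, 1, 2]
    }
}
```

Which thread receives which index depends on scheduling, but every index from 0 to parties - 1 is handed out exactly once per generation.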
nextGeneration
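Called by the last thread to arrive, once the barrier trips: it wakes all waiting threads and starts a new generation.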
private void nextGeneration() {
    // wake every thread waiting in the finished generation
    trip.signalAll();
    // reset the state for the next generation
    count = parties;
    generation = new Generation();
}
breakBarrier
Marks the current barrier generation as broken and wakes every thread waiting at the barrier.
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}
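A broken barrier can be observed from the public API without any extra threads: a single party that times out breaks the barrier for everyone. In this sketch, BrokenBarrierDemo, the trace string and the 50 ms timeout are illustrative assumptions.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BrokenBarrierDemo {
    /** Returns a short trace of what happened at each step. */
    static String run() {
        CyclicBarrier barrier = new CyclicBarrier(2);   // two parties, but only one ever arrives
        StringBuilder trace = new StringBuilder();
        try {
            barrier.await(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            trace.append("timeout;");                   // the timed-out waiter breaks the barrier
        } catch (InterruptedException | BrokenBarrierException e) {
            trace.append("unexpected;");
        }
        trace.append("broken=").append(barrier.isBroken()).append(';');
        try {
            barrier.await();                            // fails immediately: the generation is broken
        } catch (BrokenBarrierException e) {
            trace.append("rejected;");
        } catch (InterruptedException e) {
            trace.append("unexpected;");
        }
        barrier.reset();                                // starts a fresh, unbroken generation
        trace.append("afterReset=").append(barrier.isBroken());
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints timeout;broken=true;rejected;afterReset=false
    }
}
```

The second await never blocks because dowait checks g.broken before touching count. Calling reset is the way to make the barrier usable again.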
Usage example
class Solver {
    final int N;
    final float[][] data;
    final CyclicBarrier barrier;

    class Worker implements Runnable {
        int myRow;

        Worker(int row) {
            myRow = row;
        }

        public void run() {
            while (!done()) {
                processRow(myRow);

                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }
    }

    public Solver(float[][] matrix) {
        data = matrix;
        N = matrix.length;
        barrier = new CyclicBarrier(N, new Runnable() {
            public void run() {
                mergeRows(...);
            }
        });
        for (int i = 0; i < N; ++i)
            new Thread(new Worker(i)).start();

        waitUntilDone();
    }
}
Semaphore
Introduction
A counting semaphore. Conceptually, a semaphore maintains a set of
permits. Each acquire() blocks if necessary until a permit is available,
and then takes it. Each release() adds a permit, potentially releasing a
blocking acquirer. However, no actual permit objects are used; the
Semaphore just keeps a count of the number available and acts accordingly.
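When a thread calls acquire(), the semaphore checks whether a permit is available: if not, the thread blocks; if so, the permit count is decremented. When a thread calls release(), one permit is added, which may release a blocked thread.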
Semaphores are often used to restrict the number of threads that can
access some (physical or logical) resource. For example, here is a class
that uses a semaphore to control access to a pool of items:
class Pool {
    private static final int MAX_AVAILABLE = 100;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }

    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }

    protected Object[] items = ... whatever kinds of items being managed
    protected boolean[] used = new boolean[MAX_AVAILABLE];

    protected synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null;
    }

    protected synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else
                    return false;
            }
        }
        return false;
    }
}
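The limiting effect described above can be measured directly. The following sketch runs more threads than there are permits and records the largest number that were inside the guarded section at once. SemaphoreLimitDemo, peakConcurrency and the 10 ms sleep are assumptions made for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreLimitDemo {
    /** Runs `threads` tasks guarded by `permits` permits; returns the peak number inside at once. */
    static int peakConcurrency(int permits, int threads) {
        Semaphore available = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    available.acquire();              // blocks while all permits are taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = inside.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(10);                 // simulated use of the resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();         // leave before handing the permit back
                    available.release();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(2, 6));
    }
}
```

Because the counter is incremented only after acquire and decremented before release, the peak can never exceed the number of permits, whatever the scheduling.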
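Constructors
There are two constructors: the single-argument one creates a semaphore with the nonfair policy by default, and the other lets the caller choose the fair policy.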
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
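Which policy a given semaphore uses can be confirmed through the public isFair() method. This is a minimal sketch; the class name SemaphoreFairnessDemo is an assumption.

```java
import java.util.concurrent.Semaphore;

class SemaphoreFairnessDemo {
    /** Returns {isFair of the one-argument form, isFair of the two-argument form with fair = true}. */
    static boolean[] run() {
        Semaphore nonfair = new Semaphore(3);        // backed by NonfairSync
        Semaphore fair = new Semaphore(3, true);     // backed by FairSync
        return new boolean[] { nonfair.isFair(), fair.isFair() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("default fair? " + r[0] + ", explicit fair? " + r[1]);
    }
}
```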
Fields
Semaphore implements its core functionality mainly through sync, a subclass of AQS.
private final Sync sync;
Sync
The code of Sync is as follows:
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
        setState(permits);   // the AQS state holds the number of available permits
    }

    final int getPermits() {
        return getState();
    }

    // Returns the remaining permits; a negative value means the acquire failed.
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }

    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}
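The effect of these CAS loops is visible through the public Semaphore methods tryAcquire(int), availablePermits() and drainPermits(). The single-threaded sketch below traces the permit count; SemaphorePermitsDemo is a hypothetical class name.

```java
import java.util.concurrent.Semaphore;

class SemaphorePermitsDemo {
    /** Returns {first tryAcquire ok, permits after it, second tryAcquire ok, drained, permits left}. */
    static int[] run() {
        Semaphore s = new Semaphore(5);
        boolean got = s.tryAcquire(2);            // 5 -> 3
        int afterAcquire = s.availablePermits();
        boolean tooMany = s.tryAcquire(4);        // only 3 left: fails without blocking
        int drained = s.drainPermits();           // takes all 3 remaining permits at once
        int afterDrain = s.availablePermits();
        return new int[] { got ? 1 : 0, afterAcquire, tooMany ? 1 : 0, drained, afterDrain };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("got=" + r[0] + " left=" + r[1] + " tooMany=" + r[2]
                + " drained=" + r[3] + " afterDrain=" + r[4]);
    }
}
```

The failed tryAcquire(4) corresponds to the `remaining < 0` branch of nonfairTryAcquireShared: the state is left untouched and the negative result is reported as false.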
NonfairSync
Under the nonfair policy, tryAcquireShared simply delegates to nonfairTryAcquireShared to acquire the permits.
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}
FairSync
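Under the fair policy, acquiring the shared state first checks whether the sync queue contains a predecessor, that is, a thread that has been waiting longer. If it does, the attempt fails so that the caller queues up behind it.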
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
Core methods
acquire
Acquires one permit, blocking until a permit becomes available. The wait responds to interruption.
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
acquireUninterruptibly
A variant of acquire() that does not respond to interruption.
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}
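The difference between the two acquire variants shows up even when a permit is free, provided the calling thread's interrupt flag is already set. The sketch below sets the flag by hand; SemaphoreInterruptDemo and the trace format are assumptions made for illustration.

```java
import java.util.concurrent.Semaphore;

class SemaphoreInterruptDemo {
    /** Returns a trace of how each acquire variant reacts to a pending interrupt. */
    static String run() {
        Semaphore s = new Semaphore(1);
        StringBuilder trace = new StringBuilder();

        Thread.currentThread().interrupt();          // set the interrupt flag up front
        try {
            s.acquire();                             // checks the flag before taking a permit
            trace.append("acquired;");
        } catch (InterruptedException e) {
            trace.append("interrupted;");            // the flag is cleared when this is thrown
        }
        trace.append("permits=").append(s.availablePermits()).append(';');

        Thread.currentThread().interrupt();          // set the flag again
        s.acquireUninterruptibly();                  // takes the permit despite the pending interrupt
        trace.append("permits=").append(s.availablePermits()).append(';');
        // Thread.interrupted() reports the flag and clears it so the caller is left clean.
        trace.append("flag=").append(Thread.interrupted());
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints interrupted;permits=1;permits=0;flag=true
    }
}
```

acquireUninterruptibly does not swallow the interrupt: the flag is still set afterwards, so the caller can react to it at a convenient point.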
release
Releases the given number of permits by calling AQS.releaseShared().
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
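Because the semaphore only keeps a count, release does not require a matching acquire and can push the count above its initial value. The following sketch shows this along with the argument check; SemaphoreReleaseDemo is a hypothetical class name.

```java
import java.util.concurrent.Semaphore;

class SemaphoreReleaseDemo {
    /** Returns {permits after the releases, 1 if a negative argument was rejected}. */
    static int[] run() {
        Semaphore s = new Semaphore(1);
        s.release();                 // no prior acquire needed: 1 -> 2
        s.release(3);                // 2 -> 5
        int total = s.availablePermits();

        boolean rejected = false;
        try {
            s.release(-1);           // negative counts are refused before reaching AQS
        } catch (IllegalArgumentException e) {
            rejected = true;
        }
        return new int[] { total, rejected ? 1 : 0 };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("permits=" + r[0] + " rejected=" + r[1]);   // prints permits=5 rejected=1
    }
}
```

Code that must keep the permit count bounded, such as the Pool example, therefore has to guard release itself, which is what markAsUnused does there.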
References
Java SE 8 Docs API