/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ privatevoiddoSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
/** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ privatevoiddoSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Nodenext= first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
/** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ privatevoidunlinkCancelledWaiters() { Nodet= firstWaiter; Nodetrail=null; while (t != null) { Nodenext= t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
// public methods
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignal() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); Nodefirst= firstWaiter; if (first != null) doSignal(first); }
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignalAll() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); Nodefirst= firstWaiter; if (first != null) doSignalAll(first); }
/** * Implements uninterruptible condition wait. * <ol> * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * </ol> */ publicfinalvoidawaitUninterruptibly() { Nodenode= addConditionWaiter(); intsavedState= fullyRelease(node); booleaninterrupted=false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
/* * For interruptible waits, we need to track whether to throw * InterruptedException, if interrupted while blocked on * condition, versus reinterrupt current thread, if * interrupted while blocked waiting to re-acquire. */
/** Mode meaning to reinterrupt on exit from wait */ privatestaticfinalintREINTERRUPT=1; /** Mode meaning to throw InterruptedException on exit from wait */ privatestaticfinalintTHROW_IE= -1;
/** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ privateintcheckInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
/** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ privatevoidreportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) thrownewInterruptedException(); elseif (interruptMode == REINTERRUPT) selfInterrupt(); }
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ publicfinalvoidawait()throws InterruptedException { if (Thread.interrupted()) thrownewInterruptedException(); Nodenode= addConditionWaiter(); intsavedState= fullyRelease(node); intinterruptMode=0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ publicfinallongawaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) thrownewInterruptedException(); Nodenode= addConditionWaiter(); intsavedState= fullyRelease(node); finallongdeadline= System.nanoTime() + nanosTimeout; intinterruptMode=0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); }
/** * Implements absolute timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ publicfinalbooleanawaitUntil(Date deadline) throws InterruptedException { longabstime= deadline.getTime(); if (Thread.interrupted()) thrownewInterruptedException(); Nodenode= addConditionWaiter(); intsavedState= fullyRelease(node); booleantimedout=false; intinterruptMode=0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ publicfinalbooleanawait(long time, TimeUnit unit) throws InterruptedException { longnanosTimeout= unit.toNanos(time); if (Thread.interrupted()) thrownewInterruptedException(); Nodenode= addConditionWaiter(); intsavedState= fullyRelease(node); finallongdeadline= System.nanoTime() + nanosTimeout; booleantimedout=false; intinterruptMode=0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
// support for instrumentation
/** * Returns true if this condition was created by the given * synchronization object. * * @return {@code true} if owned */ finalbooleanisOwnedBy(AbstractQueuedSynchronizer sync) { returnsync== AbstractQueuedSynchronizer.this; }
/** * Queries whether any threads are waiting on this condition. * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. * * @return {@code true} if there are any waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protectedfinalbooleanhasWaiters() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); for (Nodew= firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) returntrue; } returnfalse; }
/** * Returns an estimate of the number of threads waiting on * this condition. * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. * * @return the estimated number of waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protectedfinalintgetWaitQueueLength() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); intn=0; for (Nodew= firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; }
/**
 * Collects the threads of all queued nodes that are still in
 * {@code Node.CONDITION} state and have a non-null thread; these are
 * the threads that may be waiting on this Condition.
 * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
 *
 * @return the collection of threads
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
protected final Collection<Thread> getWaitingThreads() {
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    ArrayList<Thread> waiting = new ArrayList<Thread>();
    Node cursor = firstWaiter;
    while (cursor != null) {
        if (cursor.waitStatus == Node.CONDITION) {
            Thread owner = cursor.thread;
            if (owner != null) {
                waiting.add(owner);
            }
        }
        cursor = cursor.nextWaiter;
    }
    return waiting;
}
} // closes the enclosing class, whose header is outside this chunk
privatevoidunparkSuccessor(Node node) { //判断节点的状态是否为非CANCELLED状态 intws= node.waitStatus; if (ws < 0) //如果是非CANCELLED状态,将状态设置为0 compareAndSetWaitStatus(node, ws, 0); //定义s为node的后继节点 Nodes= node.next; //判断s是否为空节点或者是否为CANCELLED状态 if (s == null || s.waitStatus > 0) { s = null; //从尾节点往前找到最前面那个为非CANCELLED状态的线程 for (Nodet= tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //如果该节点不为空,就unpark当前节点 if (s != null) LockSupport.unpark(s.thread); }