* Returns an iterator over the elements in this queue in proper sequence.
* The elements will be returned in order from first (head) to last (tail).
*
- * The returned {@code Iterator} is a "weakly consistent" iterator that
+ *
The returned iterator is a "weakly consistent" iterator that
* will never throw {@link java.util.ConcurrentModificationException
- * ConcurrentModificationException},
- * and guarantees to traverse elements as they existed upon
- * construction of the iterator, and may (but is not guaranteed to)
- * reflect any modifications subsequent to construction.
+ * ConcurrentModificationException}, and guarantees to traverse
+ * elements as they existed upon construction of the iterator, and
+ * may (but is not guaranteed to) reflect any modifications
+ * subsequent to construction.
*
* @return an iterator over the elements in this queue in proper sequence
*/
@@ -724,88 +771,634 @@ public class ArrayBlockingQueue extends AbstractQueue
}
/**
- * Iterator for ArrayBlockingQueue. To maintain weak consistency
- * with respect to puts and takes, we (1) read ahead one slot, so
- * as to not report hasNext true but then not have an element to
- * return -- however we later recheck this slot to use the most
- * current value; (2) ensure that each array slot is traversed at
- * most once (by tracking "remaining" elements); (3) skip over
- * null slots, which can occur if takes race ahead of iterators.
- * However, for circular array-based queues, we cannot rely on any
- * well established definition of what it means to be weakly
- * consistent with respect to interior removes since these may
- * require slot overwrites in the process of sliding elements to
- * cover gaps. So we settle for resiliency, operating on
- * established apparent nexts, which may miss some elements that
- * have moved between calls to next.
+ * Shared data between iterators and their queue, allowing queue
+ * modifications to update iterators when elements are removed.
+ *
+ * This adds a lot of complexity for the sake of correctly
+ * handling some uncommon operations, but the combination of
+ * circular-arrays and supporting interior removes (i.e., those
+ * not at head) would cause iterators to sometimes lose their
+ * places and/or (re)report elements they shouldn't. To avoid
+ * this, when a queue has one or more iterators, it keeps iterator
+ * state consistent by:
+ *
+ * (1) keeping track of the number of "cycles", that is, the
+ * number of times takeIndex has wrapped around to 0.
+ * (2) notifying all iterators via the callback removedAt whenever
+ * an interior element is removed (and thus other elements may
+ * be shifted).
+ *
+ * These suffice to eliminate iterator inconsistencies, but
+ * unfortunately add the secondary responsibility of maintaining
+ * the list of iterators. We track all active iterators in a
+ * simple linked list (accessed only when the queue's lock is
+ * held) of weak references to Itr. The list is cleaned up using
+ * 3 different mechanisms:
+ *
+ * (1) Whenever a new iterator is created, do some O(1) checking for
+ * stale list elements.
+ *
+ * (2) Whenever takeIndex wraps around to 0, check for iterators
+ * that have been unused for more than one wrap-around cycle.
+ *
+ * (3) Whenever the queue becomes empty, all iterators are notified
+ * and this entire data structure is discarded.
+ *
+ * So in addition to the removedAt callback that is necessary for
+ * correctness, iterators have the shutdown and takeIndexWrapped
+ * callbacks that help remove stale iterators from the list.
+ *
+ * Whenever a list element is examined, it is expunged if either
+ * the GC has determined that the iterator is discarded, or if the
+ * iterator reports that it is "detached" (does not need any
+ * further state updates). Overhead is maximal when takeIndex
+ * never advances, iterators are discarded before they are
+ * exhausted, and all removals are interior removes, in which case
+ * all stale iterators are discovered by the GC. But even in this
+ * case we don't increase the amortized complexity.
+ *
+ * Care must be taken to keep list sweeping methods from
+ * reentrantly invoking another such method, causing subtle
+ * corruption bugs.
*/
- private class Itr implements Iterator {
- private int remaining; // Number of elements yet to be returned
- private int nextIndex; // Index of element to be returned by next
- private E nextItem; // Element to be returned by next call to next
- private E lastItem; // Element returned by last call to next
- private int lastRet; // Index of last element returned, or -1 if none
+ class Itrs {
- Itr() {
- final ReentrantLock lock = ArrayBlockingQueue.this.lock;
- lock.lock();
- try {
- lastRet = -1;
- if ((remaining = count) > 0)
- nextItem = itemAt(nextIndex = takeIndex);
- } finally {
- lock.unlock();
+ /**
+ * Node in a linked list of weak iterator references.
+ */
+ private class Node extends WeakReference {
+ Node next;
+
+ Node(Itr iterator, Node next) {
+ super(iterator);
+ this.next = next;
}
}
- public boolean hasNext() {
- return remaining > 0;
+ /** Incremented whenever takeIndex wraps around to 0 */
+ int cycles = 0;
+
+ /** Linked list of weak iterator references */
+ private Node head;
+
+ /** Used to expunge stale iterators */
+ private Node sweeper = null;
+
+ private static final int SHORT_SWEEP_PROBES = 4;
+ private static final int LONG_SWEEP_PROBES = 16;
+
+ Itrs(Itr initial) {
+ register(initial);
}
- public E next() {
- final ReentrantLock lock = ArrayBlockingQueue.this.lock;
- lock.lock();
- try {
- if (remaining <= 0)
- throw new NoSuchElementException();
- lastRet = nextIndex;
- E x = itemAt(nextIndex); // check for fresher value
- if (x == null) {
- x = nextItem; // we are forced to report old value
- lastItem = null; // but ensure remove fails
+ /**
+ * Sweeps itrs, looking for and expunging stale iterators.
+ * If at least one was found, tries harder to find more.
+ * Called only from iterating thread.
+ *
+ * @param tryHarder whether to start in try-harder mode, because
+ * there is known to be at least one iterator to collect
+ */
+ void doSomeSweeping(boolean tryHarder) {
+ // assert lock.getHoldCount() == 1;
+ // assert head != null;
+ int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
+ Node o, p;
+ final Node sweeper = this.sweeper;
+ boolean passedGo; // to limit search to one full sweep
+
+ if (sweeper == null) {
+ o = null;
+ p = head;
+ passedGo = true;
+ } else {
+ o = sweeper;
+ p = o.next;
+ passedGo = false;
+ }
+
+ for (; probes > 0; probes--) {
+ if (p == null) {
+ if (passedGo)
+ break;
+ o = null;
+ p = head;
+ passedGo = true;
}
- else
- lastItem = x;
- while (--remaining > 0 && // skip over nulls
- (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
- ;
- return x;
- } finally {
- lock.unlock();
+ final Itr it = p.get();
+ final Node next = p.next;
+ if (it == null || it.isDetached()) {
+ // found a discarded/exhausted iterator
+ probes = LONG_SWEEP_PROBES; // "try harder"
+ // unlink p
+ p.clear();
+ p.next = null;
+ if (o == null) {
+ head = next;
+ if (next == null) {
+ // We've run out of iterators to track; retire
+ itrs = null;
+ return;
+ }
+ }
+ else
+ o.next = next;
+ } else {
+ o = p;
+ }
+ p = next;
}
+
+ this.sweeper = (p == null) ? null : o;
}
- public void remove() {
- final ReentrantLock lock = ArrayBlockingQueue.this.lock;
- lock.lock();
- try {
- int i = lastRet;
- if (i == -1)
- throw new IllegalStateException();
- lastRet = -1;
- E x = lastItem;
- lastItem = null;
- // only remove if item still at index
- if (x != null && x == items[i]) {
- boolean removingHead = (i == takeIndex);
- removeAt(i);
- if (!removingHead)
- nextIndex = dec(nextIndex);
+ /**
+ * Adds a new iterator to the linked list of tracked iterators.
+ */
+ void register(Itr itr) {
+ // assert lock.getHoldCount() == 1;
+ head = new Node(itr, head);
+ }
+
+ /**
+ * Called whenever takeIndex wraps around to 0.
+ *
+ * Notifies all iterators, and expunges any that are now stale.
+ */
+ void takeIndexWrapped() {
+ // assert lock.getHoldCount() == 1;
+ cycles++;
+ for (Node o = null, p = head; p != null;) {
+ final Itr it = p.get();
+ final Node next = p.next;
+ if (it == null || it.takeIndexWrapped()) {
+ // unlink p
+ // assert it == null || it.isDetached();
+ p.clear();
+ p.next = null;
+ if (o == null)
+ head = next;
+ else
+ o.next = next;
+ } else {
+ o = p;
}
- } finally {
- lock.unlock();
+ p = next;
}
+ if (head == null) // no more iterators to track
+ itrs = null;
+ }
+
+ /**
+ * Called whenever an interior remove (not at takeIndex) occured.
+ *
+ * Notifies all iterators, and expunges any that are now stale.
+ */
+ void removedAt(int removedIndex) {
+ for (Node o = null, p = head; p != null;) {
+ final Itr it = p.get();
+ final Node next = p.next;
+ if (it == null || it.removedAt(removedIndex)) {
+ // unlink p
+ // assert it == null || it.isDetached();
+ p.clear();
+ p.next = null;
+ if (o == null)
+ head = next;
+ else
+ o.next = next;
+ } else {
+ o = p;
+ }
+ p = next;
+ }
+ if (head == null) // no more iterators to track
+ itrs = null;
+ }
+
+ /**
+ * Called whenever the queue becomes empty.
+ *
+ * Notifies all active iterators that the queue is empty,
+ * clears all weak refs, and unlinks the itrs datastructure.
+ */
+ void queueIsEmpty() {
+ // assert lock.getHoldCount() == 1;
+ for (Node p = head; p != null; p = p.next) {
+ Itr it = p.get();
+ if (it != null) {
+ p.clear();
+ it.shutdown();
+ }
+ }
+ head = null;
+ itrs = null;
+ }
+
+ /**
+ * Called whenever an element has been dequeued (at takeIndex).
+ */
+ void elementDequeued() {
+ // assert lock.getHoldCount() == 1;
+ if (count == 0)
+ queueIsEmpty();
+ else if (takeIndex == 0)
+ takeIndexWrapped();
}
}
+ /**
+ * Iterator for ArrayBlockingQueue.
+ *
+ * To maintain weak consistency with respect to puts and takes, we
+ * read ahead one slot, so as to not report hasNext true but then
+ * not have an element to return.
+ *
+ * We switch into "detached" mode (allowing prompt unlinking from
+ * itrs without help from the GC) when all indices are negative, or
+ * when hasNext returns false for the first time. This allows the
+ * iterator to track concurrent updates completely accurately,
+ * except for the corner case of the user calling Iterator.remove()
+ * after hasNext() returned false. Even in this case, we ensure
+ * that we don't remove the wrong element by keeping track of the
+ * expected element to remove, in lastItem. Yes, we may fail to
+ * remove lastItem from the queue if it moved due to an interleaved
+ * interior remove while in detached mode.
+ */
+ private class Itr implements Iterator {
+ /** Index to look for new nextItem; NONE at end */
+ private int cursor;
+
+ /** Element to be returned by next call to next(); null if none */
+ private E nextItem;
+
+ /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
+ private int nextIndex;
+
+ /** Last element returned; null if none or not detached. */
+ private E lastItem;
+
+ /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
+ private int lastRet;
+
+ /** Previous value of takeIndex, or DETACHED when detached */
+ private int prevTakeIndex;
+
+ /** Previous value of iters.cycles */
+ private int prevCycles;
+
+ /** Special index value indicating "not available" or "undefined" */
+ private static final int NONE = -1;
+
+ /**
+ * Special index value indicating "removed elsewhere", that is,
+ * removed by some operation other than a call to this.remove().
+ */
+ private static final int REMOVED = -2;
+
+ /** Special value for prevTakeIndex indicating "detached mode" */
+ private static final int DETACHED = -3;
+
+ Itr() {
+ // assert lock.getHoldCount() == 0;
+ lastRet = NONE;
+ final ReentrantLock lock = ArrayBlockingQueue.this.lock;
+ lock.lock();
+ try {
+ if (count == 0) {
+ // assert itrs == null;
+ cursor = NONE;
+ nextIndex = NONE;
+ prevTakeIndex = DETACHED;
+ } else {
+ final int takeIndex = ArrayBlockingQueue.this.takeIndex;
+ prevTakeIndex = takeIndex;
+ nextItem = itemAt(nextIndex = takeIndex);
+ cursor = incCursor(takeIndex);
+ if (itrs == null) {
+ itrs = new Itrs(this);
+ } else {
+ itrs.register(this); // in this order
+ itrs.doSomeSweeping(false);
+ }
+ prevCycles = itrs.cycles;
+ // assert takeIndex >= 0;
+ // assert prevTakeIndex == takeIndex;
+ // assert nextIndex >= 0;
+ // assert nextItem != null;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ boolean isDetached() {
+ // assert lock.getHoldCount() == 1;
+ return prevTakeIndex < 0;
+ }
+
+ private int incCursor(int index) {
+ // assert lock.getHoldCount() == 1;
+ if (++index == items.length)
+ index = 0;
+ if (index == putIndex)
+ index = NONE;
+ return index;
+ }
+
+ /**
+ * Returns true if index is invalidated by the given number of
+ * dequeues, starting from prevTakeIndex.
+ */
+ private boolean invalidated(int index, int prevTakeIndex,
+ long dequeues, int length) {
+ if (index < 0)
+ return false;
+ int distance = index - prevTakeIndex;
+ if (distance < 0)
+ distance += length;
+ return dequeues > distance;
+ }
+
+ /**
+ * Adjusts indices to incorporate all dequeues since the last
+ * operation on this iterator. Call only from iterating thread.
+ */
+ private void incorporateDequeues() {
+ // assert lock.getHoldCount() == 1;
+ // assert itrs != null;
+ // assert !isDetached();
+ // assert count > 0;
+
+ final int cycles = itrs.cycles;
+ final int takeIndex = ArrayBlockingQueue.this.takeIndex;
+ final int prevCycles = this.prevCycles;
+ final int prevTakeIndex = this.prevTakeIndex;
+
+ if (cycles != prevCycles || takeIndex != prevTakeIndex) {
+ final int len = items.length;
+ // how far takeIndex has advanced since the previous
+ // operation of this iterator
+ long dequeues = (cycles - prevCycles) * len
+ + (takeIndex - prevTakeIndex);
+
+ // Check indices for invalidation
+ if (invalidated(lastRet, prevTakeIndex, dequeues, len))
+ lastRet = REMOVED;
+ if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
+ nextIndex = REMOVED;
+ if (invalidated(cursor, prevTakeIndex, dequeues, len))
+ cursor = takeIndex;
+
+ if (cursor < 0 && nextIndex < 0 && lastRet < 0)
+ detach();
+ else {
+ this.prevCycles = cycles;
+ this.prevTakeIndex = takeIndex;
+ }
+ }
+ }
+
+ /**
+ * Called when itrs should stop tracking this iterator, either
+ * because there are no more indices to update (cursor < 0 &&
+ * nextIndex < 0 && lastRet < 0) or as a special exception, when
+ * lastRet >= 0, because hasNext() is about to return false for the
+ * first time. Call only from iterating thread.
+ */
+ private void detach() {
+ // Switch to detached mode
+ // assert lock.getHoldCount() == 1;
+ // assert cursor == NONE;
+ // assert nextIndex < 0;
+ // assert lastRet < 0 || nextItem == null;
+ // assert lastRet < 0 ^ lastItem != null;
+ if (prevTakeIndex >= 0) {
+ // assert itrs != null;
+ prevTakeIndex = DETACHED;
+ // try to unlink from itrs (but not too hard)
+ itrs.doSomeSweeping(true);
+ }
+ }
+
+ /**
+ * For performance reasons, we would like not to acquire a lock in
+ * hasNext in the common case. To allow for this, we only access
+ * fields (i.e. nextItem) that are not modified by update operations
+ * triggered by queue modifications.
+ */
+ public boolean hasNext() {
+ // assert lock.getHoldCount() == 0;
+ if (nextItem != null)
+ return true;
+ noNext();
+ return false;
+ }
+
+ private void noNext() {
+ final ReentrantLock lock = ArrayBlockingQueue.this.lock;
+ lock.lock();
+ try {
+ // assert cursor == NONE;
+ // assert nextIndex == NONE;
+ if (!isDetached()) {
+ // assert lastRet >= 0;
+ incorporateDequeues(); // might update lastRet
+ if (lastRet >= 0) {
+ lastItem = itemAt(lastRet);
+ // assert lastItem != null;
+ detach();
+ }
+ }
+ // assert isDetached();
+ // assert lastRet < 0 ^ lastItem != null;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public E next() {
+ // assert lock.getHoldCount() == 0;
+ final E x = nextItem;
+ if (x == null)
+ throw new NoSuchElementException();
+ final ReentrantLock lock = ArrayBlockingQueue.this.lock;
+ lock.lock();
+ try {
+ if (!isDetached())
+ incorporateDequeues();
+ // assert nextIndex != NONE;
+ // assert lastItem == null;
+ lastRet = nextIndex;
+ final int cursor = this.cursor;
+ if (cursor >= 0) {
+ nextItem = itemAt(nextIndex = cursor);
+ // assert nextItem != null;
+ this.cursor = incCursor(cursor);
+ } else {
+ nextIndex = NONE;
+ nextItem = null;
+ }
+ } finally {
+ lock.unlock();
+ }
+ return x;
+ }
+
+ public void remove() {
+ // assert lock.getHoldCount() == 0;
+ final ReentrantLock lock = ArrayBlockingQueue.this.lock;
+ lock.lock();
+ try {
+ if (!isDetached())
+ incorporateDequeues(); // might update lastRet or detach
+ final int lastRet = this.lastRet;
+ this.lastRet = NONE;
+ if (lastRet >= 0) {
+ if (!isDetached())
+ removeAt(lastRet);
+ else {
+ final E lastItem = this.lastItem;
+ // assert lastItem != null;
+ this.lastItem = null;
+ if (itemAt(lastRet) == lastItem)
+ removeAt(lastRet);
+ }
+ } else if (lastRet == NONE)
+ throw new IllegalStateException();
+ // else lastRet == REMOVED and the last returned element was
+ // previously asynchronously removed via an operation other
+ // than this.remove(), so nothing to do.
+
+ if (cursor < 0 && nextIndex < 0)
+ detach();
+ } finally {
+ lock.unlock();
+ // assert lastRet == NONE;
+ // assert lastItem == null;
+ }
+ }
+
+ /**
+ * Called to notify the iterator that the queue is empty, or that it
+ * has fallen hopelessly behind, so that it should abandon any
+ * further iteration, except possibly to return one more element
+ * from next(), as promised by returning true from hasNext().
+ */
+ void shutdown() {
+ // assert lock.getHoldCount() == 1;
+ cursor = NONE;
+ if (nextIndex >= 0)
+ nextIndex = REMOVED;
+ if (lastRet >= 0) {
+ lastRet = REMOVED;
+ lastItem = null;
+ }
+ prevTakeIndex = DETACHED;
+ // Don't set nextItem to null because we must continue to be
+ // able to return it on next().
+ //
+ // Caller will unlink from itrs when convenient.
+ }
+
+ private int distance(int index, int prevTakeIndex, int length) {
+ int distance = index - prevTakeIndex;
+ if (distance < 0)
+ distance += length;
+ return distance;
+ }
+
+ /**
+ * Called whenever an interior remove (not at takeIndex) occured.
+ *
+ * @return true if this iterator should be unlinked from itrs
+ */
+ boolean removedAt(int removedIndex) {
+ // assert lock.getHoldCount() == 1;
+ if (isDetached())
+ return true;
+
+ final int cycles = itrs.cycles;
+ final int takeIndex = ArrayBlockingQueue.this.takeIndex;
+ final int prevCycles = this.prevCycles;
+ final int prevTakeIndex = this.prevTakeIndex;
+ final int len = items.length;
+ int cycleDiff = cycles - prevCycles;
+ if (removedIndex < takeIndex)
+ cycleDiff++;
+ final int removedDistance =
+ (cycleDiff * len) + (removedIndex - prevTakeIndex);
+ // assert removedDistance >= 0;
+ int cursor = this.cursor;
+ if (cursor >= 0) {
+ int x = distance(cursor, prevTakeIndex, len);
+ if (x == removedDistance) {
+ if (cursor == putIndex)
+ this.cursor = cursor = NONE;
+ }
+ else if (x > removedDistance) {
+ // assert cursor != prevTakeIndex;
+ this.cursor = cursor = dec(cursor);
+ }
+ }
+ int lastRet = this.lastRet;
+ if (lastRet >= 0) {
+ int x = distance(lastRet, prevTakeIndex, len);
+ if (x == removedDistance)
+ this.lastRet = lastRet = REMOVED;
+ else if (x > removedDistance)
+ this.lastRet = lastRet = dec(lastRet);
+ }
+ int nextIndex = this.nextIndex;
+ if (nextIndex >= 0) {
+ int x = distance(nextIndex, prevTakeIndex, len);
+ if (x == removedDistance)
+ this.nextIndex = nextIndex = REMOVED;
+ else if (x > removedDistance)
+ this.nextIndex = nextIndex = dec(nextIndex);
+ }
+ else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
+ this.prevTakeIndex = DETACHED;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Called whenever takeIndex wraps around to zero.
+ *
+ * @return true if this iterator should be unlinked from itrs
+ */
+ boolean takeIndexWrapped() {
+ // assert lock.getHoldCount() == 1;
+ if (isDetached())
+ return true;
+ if (itrs.cycles - prevCycles > 1) {
+ // All the elements that existed at the time of the last
+ // operation are gone, so abandon further iteration.
+ shutdown();
+ return true;
+ }
+ return false;
+ }
+
+// /** Uncomment for debugging. */
+// public String toString() {
+// return ("cursor=" + cursor + " " +
+// "nextIndex=" + nextIndex + " " +
+// "lastRet=" + lastRet + " " +
+// "nextItem=" + nextItem + " " +
+// "lastItem=" + lastItem + " " +
+// "prevCycles=" + prevCycles + " " +
+// "prevTakeIndex=" + prevTakeIndex + " " +
+// "size()=" + size() + " " +
+// "remainingCapacity()=" + remainingCapacity());
+// }
+ }
+
+ public Spliterator spliterator() {
+ return Spliterators.spliterator
+ (this, Spliterator.ORDERED | Spliterator.NONNULL |
+ Spliterator.CONCURRENT);
+ }
}
diff --git a/jdk/src/share/classes/java/util/concurrent/BlockingDeque.java b/jdk/src/share/classes/java/util/concurrent/BlockingDeque.java
index 7f37f7e66ea..d98586a95b5 100644
--- a/jdk/src/share/classes/java/util/concurrent/BlockingDeque.java
+++ b/jdk/src/share/classes/java/util/concurrent/BlockingDeque.java
@@ -41,17 +41,18 @@ import java.util.*;
* for the deque to become non-empty when retrieving an element, and wait for
* space to become available in the deque when storing an element.
*
- * BlockingDeque methods come in four forms, with different ways
+ *
{@code BlockingDeque} methods come in four forms, with different ways
* of handling operations that cannot be satisfied immediately, but may be
* satisfied at some point in the future:
* one throws an exception, the second returns a special value (either
- * null or false, depending on the operation), the third
+ * {@code null} or {@code false}, depending on the operation), the third
* blocks the current thread indefinitely until the operation can succeed,
* and the fourth blocks for only a given maximum time limit before giving
* up. These methods are summarized in the following table:
*
*
*
+ * Summary of BlockingDeque methods
*
* | First Element (Head) |
*
@@ -116,20 +117,21 @@ import java.util.*;
*
*
*
- * Like any {@link BlockingQueue}, a BlockingDeque is thread safe,
+ *
Like any {@link BlockingQueue}, a {@code BlockingDeque} is thread safe,
* does not permit null elements, and may (or may not) be
* capacity-constrained.
*
- *
A BlockingDeque implementation may be used directly as a FIFO
- * BlockingQueue. The methods inherited from the
- * BlockingQueue interface are precisely equivalent to
- * BlockingDeque methods as indicated in the following table:
+ *
A {@code BlockingDeque} implementation may be used directly as a FIFO
+ * {@code BlockingQueue}. The methods inherited from the
+ * {@code BlockingQueue} interface are precisely equivalent to
+ * {@code BlockingDeque} methods as indicated in the following table:
*
*
*
+ * Comparison of BlockingQueue and BlockingDeque methods
*
- * | BlockingQueue Method |
- * Equivalent BlockingDeque Method |
+ * {@code BlockingQueue} Method |
+ * Equivalent {@code BlockingDeque} Method |
*
*
* | Insert |
@@ -208,7 +210,7 @@ public interface BlockingDeque extends BlockingQueue, Deque {
/**
* Inserts the specified element at the front of this deque if it is
* possible to do so immediately without violating capacity restrictions,
- * throwing an IllegalStateException if no space is currently
+ * throwing an {@code IllegalStateException} if no space is currently
* available. When using a capacity-restricted deque, it is generally
* preferable to use {@link #offerFirst(Object) offerFirst}.
*
@@ -223,7 +225,7 @@ public interface BlockingDeque extends BlockingQueue, Deque {
/**
* Inserts the specified element at the end of this deque if it is
* possible to do so immediately without violating capacity restrictions,
- * throwing an IllegalStateException if no space is currently
+ * throwing an {@code IllegalStateException} if no space is currently
* available. When using a capacity-restricted deque, it is generally
* preferable to use {@link #offerLast(Object) offerLast}.
*
@@ -238,7 +240,7 @@ public interface BlockingDeque extends BlockingQueue, Deque {
/**
* Inserts the specified element at the front of this deque if it is
* possible to do so immediately without violating capacity restrictions,
- * returning true upon success and false if no space is
+ * returning {@code true} upon success and {@code false} if no space is
* currently available.
* When using a capacity-restricted deque, this method is generally
* preferable to the {@link #addFirst(Object) addFirst} method, which can
@@ -254,7 +256,7 @@ public interface BlockingDeque extends BlockingQueue, Deque {
/**
* Inserts the specified element at the end of this deque if it is
* possible to do so immediately without violating capacity restrictions,
- * returning true upon success and false if no space is
+ * returning {@code true} upon success and {@code false} if no space is
* currently available.
* When using a capacity-restricted deque, this method is generally
* preferable to the {@link #addLast(Object) addLast} method, which can
@@ -302,10 +304,10 @@ public interface BlockingDeque extends BlockingQueue, Deque {
*
* @param e the element to add
* @param timeout how long to wait before giving up, in units of
- * unit
- * @param unit a TimeUnit determining how to interpret the
- * timeout parameter
- * @return true if successful, or false if
+ * {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the
+ * {@code timeout} parameter
+ * @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
@@ -324,10 +326,10 @@ public interface BlockingDeque extends BlockingQueue, Deque {
*
* @param e the element to add
* @param timeout how long to wait before giving up, in units of
- * unit
- * @param unit a TimeUnit determining how to interpret the
- * timeout parameter
- * @return true if successful, or false if
+ * {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the
+ * {@code timeout} parameter
+ * @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
@@ -363,10 +365,10 @@ public interface BlockingDeque extends BlockingQueue, Deque {
* become available.
*
* @param timeout how long to wait before giving up, in units of
- * unit
- * @param unit a TimeUnit determining how to interpret the
- * timeout parameter
- * @return the head of this deque, or null if the specified
+ * {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the
+ * {@code timeout} parameter
+ * @return the head of this deque, or {@code null} if the specified
* waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
@@ -379,10 +381,10 @@ public interface BlockingDeque extends BlockingQueue, Deque {
* become available.
*
* @param timeout how long to wait before giving up, in units of
- * unit
- * @param unit a TimeUnit determining how to interpret the
- * timeout parameter
- * @return the tail of this deque, or null if the specified
+ * {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the
+ * {@code timeout} parameter
+ * @return the tail of this deque, or {@code null} if the specified
* waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
@@ -392,13 +394,13 @@ public interface BlockingDeque extends BlockingQueue, Deque {
/**
* Removes the first occurrence of the specified element from this deque.
* If the deque does not contain the element, it is unchanged.
- * More formally, removes the first element e such that
- * o.equals(e) (if such an element exists).
- * Returns true if this deque contained the specified element
+ * More formally, removes the first element {@code e} such that
+ * {@code o.equals(e)} (if such an element exists).
+ * Returns {@code true} if this deque contained the specified element
* (or equivalently, if this deque changed as a result of the call).
*
* @param o element to be removed from this deque, if present
- * @return true if an element was removed as a result of this call
+ * @return {@code true} if an element was removed as a result of this call
* @throws ClassCastException if the class of the specified element
* is incompatible with this deque
* (optional)
@@ -410,13 +412,13 @@ public interface BlockingDeque extends BlockingQueue, Deque {
/**
* Removes the last occurrence of the specified element from this deque.
* If the deque does not contain the element, it is unchanged.
- * More formally, removes the last element e such that
- * o.equals(e) (if such an element exists).
- * Returns true if this deque contained the specified element
+ * More formally, removes the last element {@code e} such that
+ * {@code o.equals(e)} (if such an element exists).
+ * Returns {@code true} if this deque contained the specified element
* (or equivalently, if this deque changed as a result of the call).
*
* @param o element to be removed from this deque, if present
- * @return true if an element was removed as a result of this call
+ * @return {@code true} if an element was removed as a result of this call
* @throws ClassCastException if the class of the specified element
* is incompatible with this deque
* (optional)
@@ -431,8 +433,8 @@ public interface BlockingDeque extends BlockingQueue, Deque {
* Inserts the specified element into the queue represented by this deque
* (in other words, at the tail of this deque) if it is possible to do so
* immediately without violating capacity restrictions, returning
- * true upon success and throwing an
- * IllegalStateException if no space is currently available.
+ * {@code true} upon success and throwing an
+ * {@code IllegalStateException} if no space is currently available.
* When using a capacity-restricted deque, it is generally preferable to
* use {@link #offer(Object) offer}.
*
@@ -452,7 +454,7 @@ public interface BlockingDeque extends BlockingQueue, Deque {
* Inserts the specified element into the queue represented by this deque
* (in other words, at the tail of this deque) if it is possible to do so
* immediately without violating capacity restrictions, returning
- * true upon success and false if no space is currently
+ * {@code true} upon success and {@code false} if no space is currently
* available. When using a capacity-restricted deque, this method is
* generally preferable to the {@link #add} method, which can fail to
* insert an element only by throwing an exception.
@@ -494,8 +496,8 @@ public interface BlockingDeque extends BlockingQueue, Deque {
* {@link #offerLast(Object,long,TimeUnit) offerLast}.
*
* @param e the element to add
- * @return true if the element was added to this deque, else
- * false
+ * @return {@code true} if the element was added to this deque, else
+ * {@code false}
* @throws InterruptedException {@inheritDoc}
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this deque
@@ -522,11 +524,11 @@ public interface BlockingDeque extends BlockingQueue, Deque {
/**
* Retrieves and removes the head of the queue represented by this deque
* (in other words, the first element of this deque), or returns
- * null if this deque is empty.
+ * {@code null} if this deque is empty.
*
* This method is equivalent to {@link #pollFirst()}.
*
- * @return the head of this deque, or null if this deque is empty
+ * @return the head of this deque, or {@code null} if this deque is empty
*/
E poll();
@@ -550,7 +552,7 @@ public interface BlockingDeque extends BlockingQueue, Deque {
* This method is equivalent to
* {@link #pollFirst(long,TimeUnit) pollFirst}.
*
- * @return the head of this deque, or null if the
+ * @return the head of this deque, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
@@ -573,27 +575,27 @@ public interface BlockingDeque extends BlockingQueue, Deque {
/**
* Retrieves, but does not remove, the head of the queue represented by
* this deque (in other words, the first element of this deque), or
- * returns null if this deque is empty.
+ * returns {@code null} if this deque is empty.
*
* This method is equivalent to {@link #peekFirst() peekFirst}.
*
- * @return the head of this deque, or null if this deque is empty
+ * @return the head of this deque, or {@code null} if this deque is empty
*/
E peek();
/**
* Removes the first occurrence of the specified element from this deque.
* If the deque does not contain the element, it is unchanged.
- * More formally, removes the first element e such that
- * o.equals(e) (if such an element exists).
- * Returns true if this deque contained the specified element
+ * More formally, removes the first element {@code e} such that
+ * {@code o.equals(e)} (if such an element exists).
+ * Returns {@code true} if this deque contained the specified element
* (or equivalently, if this deque changed as a result of the call).
*
*
This method is equivalent to
* {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.
*
* @param o element to be removed from this deque, if present
- * @return true if this deque changed as a result of the call
+ * @return {@code true} if this deque changed as a result of the call
* @throws ClassCastException if the class of the specified element
* is incompatible with this deque
* (optional)
@@ -603,12 +605,12 @@ public interface BlockingDeque extends BlockingQueue, Deque {
boolean remove(Object o);
/**
- * Returns true if this deque contains the specified element.
- * More formally, returns true if and only if this deque contains
- * at least one element e such that o.equals(e).
+ * Returns {@code true} if this deque contains the specified element.
+ * More formally, returns {@code true} if and only if this deque contains
+ * at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this deque
- * @return true if this deque contains the specified element
+ * @return {@code true} if this deque contains the specified element
* @throws ClassCastException if the class of the specified element
* is incompatible with this deque
* (optional)
@@ -635,9 +637,10 @@ public interface BlockingDeque extends BlockingQueue, Deque {
// *** Stack methods ***
/**
- * Pushes an element onto the stack represented by this deque. In other
- * words, inserts the element at the front of this deque unless it would
- * violate capacity restrictions.
+ * Pushes an element onto the stack represented by this deque (in other
+ * words, at the head of this deque) if it is possible to do so
+ * immediately without violating capacity restrictions, throwing an
+ * {@code IllegalStateException} if no space is currently available.
*
* This method is equivalent to {@link #addFirst(Object) addFirst}.
*
diff --git a/jdk/src/share/classes/java/util/concurrent/BlockingQueue.java b/jdk/src/share/classes/java/util/concurrent/BlockingQueue.java
index bb5eb603324..8eb3ce2c984 100644
--- a/jdk/src/share/classes/java/util/concurrent/BlockingQueue.java
+++ b/jdk/src/share/classes/java/util/concurrent/BlockingQueue.java
@@ -44,17 +44,18 @@ import java.util.Queue;
* element, and wait for space to become available in the queue when
* storing an element.
*
- *
BlockingQueue methods come in four forms, with different ways
+ *
{@code BlockingQueue} methods come in four forms, with different ways
* of handling operations that cannot be satisfied immediately, but may be
* satisfied at some point in the future:
* one throws an exception, the second returns a special value (either
- * null or false, depending on the operation), the third
+ * {@code null} or {@code false}, depending on the operation), the third
* blocks the current thread indefinitely until the operation can succeed,
* and the fourth blocks for only a given maximum time limit before giving
* up. These methods are summarized in the following table:
*
*
*
+ * Summary of BlockingQueue methods
*
* |
* Throws exception |
@@ -85,37 +86,37 @@ import java.util.Queue;
*
*
*
- * A BlockingQueue does not accept null elements.
- * Implementations throw NullPointerException on attempts
- * to add, put or offer a null. A
- * null is used as a sentinel value to indicate failure of
- * poll operations.
+ *
A {@code BlockingQueue} does not accept {@code null} elements.
+ * Implementations throw {@code NullPointerException} on attempts
+ * to {@code add}, {@code put} or {@code offer} a {@code null}. A
+ * {@code null} is used as a sentinel value to indicate failure of
+ * {@code poll} operations.
*
- *
A BlockingQueue may be capacity bounded. At any given
- * time it may have a remainingCapacity beyond which no
- * additional elements can be put without blocking.
- * A BlockingQueue without any intrinsic capacity constraints always
- * reports a remaining capacity of Integer.MAX_VALUE.
+ *
A {@code BlockingQueue} may be capacity bounded. At any given
+ * time it may have a {@code remainingCapacity} beyond which no
+ * additional elements can be {@code put} without blocking.
+ * A {@code BlockingQueue} without any intrinsic capacity constraints always
+ * reports a remaining capacity of {@code Integer.MAX_VALUE}.
*
- *
BlockingQueue implementations are designed to be used
+ *
{@code BlockingQueue} implementations are designed to be used
* primarily for producer-consumer queues, but additionally support
* the {@link java.util.Collection} interface. So, for example, it is
* possible to remove an arbitrary element from a queue using
- * remove(x). However, such operations are in general
+ * {@code remove(x)}. However, such operations are in general
* not performed very efficiently, and are intended for only
* occasional use, such as when a queued message is cancelled.
*
- *
BlockingQueue implementations are thread-safe. All
+ *
{@code BlockingQueue} implementations are thread-safe. All
* queuing methods achieve their effects atomically using internal
* locks or other forms of concurrency control. However, the
- * bulk Collection operations addAll,
- * containsAll, retainAll and removeAll are
+ * bulk Collection operations {@code addAll},
+ * {@code containsAll}, {@code retainAll} and {@code removeAll} are
* not necessarily performed atomically unless specified
* otherwise in an implementation. So it is possible, for example, for
- * addAll(c) to fail (throwing an exception) after adding
- * only some of the elements in c.
+ * {@code addAll(c)} to fail (throwing an exception) after adding
+ * only some of the elements in {@code c}.
*
- *
A BlockingQueue does not intrinsically support
+ *
A {@code BlockingQueue} does not intrinsically support
* any kind of "close" or "shutdown" operation to
* indicate that no more items will be added. The needs and usage of
* such features tend to be implementation-dependent. For example, a
@@ -125,7 +126,7 @@ import java.util.Queue;
*
*
* Usage example, based on a typical producer-consumer scenario.
- * Note that a BlockingQueue can safely be used with multiple
+ * Note that a {@code BlockingQueue} can safely be used with multiple
* producers and multiple consumers.
*
{@code
* class Producer implements Runnable {
@@ -181,13 +182,13 @@ public interface BlockingQueue extends Queue {
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions, returning
- * true upon success and throwing an
- * IllegalStateException if no space is currently available.
+ * {@code true} upon success and throwing an
+ * {@code IllegalStateException} if no space is currently available.
* When using a capacity-restricted queue, it is generally preferable to
* use {@link #offer(Object) offer}.
*
* @param e the element to add
- * @return true (as specified by {@link Collection#add})
+ * @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if the element cannot be added at this
* time due to capacity restrictions
* @throws ClassCastException if the class of the specified element
@@ -201,14 +202,14 @@ public interface BlockingQueue extends Queue {
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions, returning
- * true upon success and false if no space is currently
+ * {@code true} upon success and {@code false} if no space is currently
* available. When using a capacity-restricted queue, this method is
* generally preferable to {@link #add}, which can fail to insert an
* element only by throwing an exception.
*
* @param e the element to add
- * @return true if the element was added to this queue, else
- * false
+ * @return {@code true} if the element was added to this queue, else
+ * {@code false}
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
@@ -237,10 +238,10 @@ public interface BlockingQueue extends Queue {
*
* @param e the element to add
* @param timeout how long to wait before giving up, in units of
- * unit
- * @param unit a TimeUnit determining how to interpret the
- * timeout parameter
- * @return true if successful, or false if
+ * {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the
+ * {@code timeout} parameter
+ * @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
@@ -266,10 +267,10 @@ public interface BlockingQueue extends Queue {
* specified wait time if necessary for an element to become available.
*
* @param timeout how long to wait before giving up, in units of
- * unit
- * @param unit a TimeUnit determining how to interpret the
- * timeout parameter
- * @return the head of this queue, or null if the
+ * {@code unit}
+ * @param unit a {@code TimeUnit} determining how to interpret the
+ * {@code timeout} parameter
+ * @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
@@ -279,11 +280,11 @@ public interface BlockingQueue extends Queue {
/**
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
- * blocking, or Integer.MAX_VALUE if there is no intrinsic
+ * blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
* limit.
*
* Note that you cannot always tell if an attempt to insert
- * an element will succeed by inspecting remainingCapacity
+ * an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*
@@ -293,14 +294,14 @@ public interface BlockingQueue extends Queue {
/**
* Removes a single instance of the specified element from this queue,
- * if it is present. More formally, removes an element e such
- * that o.equals(e), if this queue contains one or more such
+ * if it is present. More formally, removes an element {@code e} such
+ * that {@code o.equals(e)}, if this queue contains one or more such
* elements.
- * Returns true if this queue contained the specified element
+ * Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
- * @return true if this queue changed as a result of the call
+ * @return {@code true} if this queue changed as a result of the call
* @throws ClassCastException if the class of the specified element
* is incompatible with this queue
* (optional)
@@ -310,12 +311,12 @@ public interface BlockingQueue extends Queue {
boolean remove(Object o);
/**
- * Returns true if this queue contains the specified element.
- * More formally, returns true if and only if this queue contains
- * at least one element e such that o.equals(e).
+ * Returns {@code true} if this queue contains the specified element.
+ * More formally, returns {@code true} if and only if this queue contains
+ * at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
- * @return true if this queue contains the specified element
+ * @return {@code true} if this queue contains the specified element
* @throws ClassCastException if the class of the specified element
* is incompatible with this queue
* (optional)
@@ -329,10 +330,10 @@ public interface BlockingQueue extends Queue {
* to the given collection. This operation may be more
* efficient than repeatedly polling this queue. A failure
* encountered while attempting to add elements to
- * collection c may result in elements being in neither,
+ * collection {@code c} may result in elements being in neither,
* either or both collections when the associated exception is
* thrown. Attempts to drain a queue to itself result in
- * IllegalArgumentException. Further, the behavior of
+ * {@code IllegalArgumentException}. Further, the behavior of
* this operation is undefined if the specified collection is
* modified while the operation is in progress.
*
@@ -353,10 +354,10 @@ public interface BlockingQueue extends Queue {
* Removes at most the given number of available elements from
* this queue and adds them to the given collection. A failure
* encountered while attempting to add elements to
- * collection c may result in elements being in neither,
+ * collection {@code c} may result in elements being in neither,
* either or both collections when the associated exception is
* thrown. Attempts to drain a queue to itself result in
- * IllegalArgumentException. Further, the behavior of
+ * {@code IllegalArgumentException}. Further, the behavior of
* this operation is undefined if the specified collection is
* modified while the operation is in progress.
*
diff --git a/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java b/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java
index 146934af475..6bb62f0f98c 100644
--- a/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java
+++ b/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedDeque.java
@@ -42,6 +42,9 @@ import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.Consumer;
/**
* An unbounded concurrent {@linkplain Deque deque} based on linked nodes.
@@ -816,7 +819,7 @@ public class ConcurrentLinkedDeque
* Creates an array list and fills it with elements of this list.
* Used by toArray.
*
- * @return the arrayList
+ * @return the array list
*/
private ArrayList toArrayList() {
ArrayList list = new ArrayList();
@@ -1024,12 +1027,28 @@ public class ConcurrentLinkedDeque
}
public E poll() { return pollFirst(); }
- public E remove() { return removeFirst(); }
public E peek() { return peekFirst(); }
- public E element() { return getFirst(); }
- public void push(E e) { addFirst(e); }
+
+ /**
+ * @throws NoSuchElementException {@inheritDoc}
+ */
+ public E remove() { return removeFirst(); }
+
+ /**
+ * @throws NoSuchElementException {@inheritDoc}
+ */
public E pop() { return removeFirst(); }
+ /**
+ * @throws NoSuchElementException {@inheritDoc}
+ */
+ public E element() { return getFirst(); }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public void push(E e) { addFirst(e); }
+
/**
* Removes the first element {@code e} such that
* {@code o.equals(e)}, if such an element exists in this deque.
@@ -1385,6 +1404,99 @@ public class ConcurrentLinkedDeque
Node nextNode(Node p) { return pred(p); }
}
+ /** A customized variant of Spliterators.IteratorSpliterator */
+ static final class CLDSpliterator implements Spliterator {
+ static final int MAX_BATCH = 1 << 25; // max batch array size;
+ final ConcurrentLinkedDeque queue;
+ Node current; // current node; null until initialized
+ int batch; // batch size for splits
+ boolean exhausted; // true when no more nodes
+ CLDSpliterator(ConcurrentLinkedDeque queue) {
+ this.queue = queue;
+ }
+
+ public Spliterator trySplit() {
+ Node p;
+ final ConcurrentLinkedDeque q = this.queue;
+ int b = batch;
+ int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
+ if (!exhausted &&
+ ((p = current) != null || (p = q.first()) != null)) {
+ if (p.item == null && p == (p = p.next))
+ current = p = q.first();
+ if (p != null && p.next != null) {
+ Object[] a = new Object[n];
+ int i = 0;
+ do {
+ if ((a[i] = p.item) != null)
+ ++i;
+ if (p == (p = p.next))
+ p = q.first();
+ } while (p != null && i < n);
+ if ((current = p) == null)
+ exhausted = true;
+ if (i > 0) {
+ batch = i;
+ return Spliterators.spliterator
+ (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
+ Spliterator.CONCURRENT);
+ }
+ }
+ }
+ return null;
+ }
+
+ public void forEachRemaining(Consumer super E> action) {
+ Node p;
+ if (action == null) throw new NullPointerException();
+ final ConcurrentLinkedDeque q = this.queue;
+ if (!exhausted &&
+ ((p = current) != null || (p = q.first()) != null)) {
+ exhausted = true;
+ do {
+ E e = p.item;
+ if (p == (p = p.next))
+ p = q.first();
+ if (e != null)
+ action.accept(e);
+ } while (p != null);
+ }
+ }
+
+ public boolean tryAdvance(Consumer super E> action) {
+ Node p;
+ if (action == null) throw new NullPointerException();
+ final ConcurrentLinkedDeque q = this.queue;
+ if (!exhausted &&
+ ((p = current) != null || (p = q.first()) != null)) {
+ E e;
+ do {
+ e = p.item;
+ if (p == (p = p.next))
+ p = q.first();
+ } while (e == null && p != null);
+ if ((current = p) == null)
+ exhausted = true;
+ if (e != null) {
+ action.accept(e);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public long estimateSize() { return Long.MAX_VALUE; }
+
+ public int characteristics() {
+ return Spliterator.ORDERED | Spliterator.NONNULL |
+ Spliterator.CONCURRENT;
+ }
+ }
+
+ public Spliterator spliterator() {
+ return new CLDSpliterator(this);
+ }
+
/**
* Saves this deque to a stream (that is, serializes it).
*
diff --git a/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java b/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java
index abf12c59760..0b562bc217d 100644
--- a/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java
+++ b/jdk/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java
@@ -41,6 +41,9 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.Consumer;
/**
* An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
@@ -56,7 +59,7 @@ import java.util.Queue;
* Like most other concurrent collection implementations, this class
* does not permit the use of {@code null} elements.
*
- * This implementation employs an efficient "wait-free"
+ *
This implementation employs an efficient non-blocking
* algorithm based on one described in Simple,
* Fast, and Practical Non-Blocking and Blocking Concurrent Queue
@@ -295,7 +298,7 @@ public class ConcurrentLinkedQueue extends AbstractQueue
}
/**
- * Try to CAS head to p. If successful, repoint old head to itself
+ * Tries to CAS head to p. If successful, repoint old head to itself
* as sentinel for succ(), below.
*/
final void updateHead(Node h, Node p) {
@@ -792,6 +795,96 @@ public class ConcurrentLinkedQueue extends AbstractQueue
tail = t;
}
+ /** A customized variant of Spliterators.IteratorSpliterator */
+ static final class CLQSpliterator implements Spliterator {
+ static final int MAX_BATCH = 1 << 25; // max batch array size;
+ final ConcurrentLinkedQueue queue;
+ Node current; // current node; null until initialized
+ int batch; // batch size for splits
+ boolean exhausted; // true when no more nodes
+ CLQSpliterator(ConcurrentLinkedQueue queue) {
+ this.queue = queue;
+ }
+
+ public Spliterator trySplit() {
+ Node p;
+ final ConcurrentLinkedQueue q = this.queue;
+ int b = batch;
+ int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
+ if (!exhausted &&
+ ((p = current) != null || (p = q.first()) != null) &&
+ p.next != null) {
+ Object[] a = new Object[n];
+ int i = 0;
+ do {
+ if ((a[i] = p.item) != null)
+ ++i;
+ if (p == (p = p.next))
+ p = q.first();
+ } while (p != null && i < n);
+ if ((current = p) == null)
+ exhausted = true;
+ if (i > 0) {
+ batch = i;
+ return Spliterators.spliterator
+ (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
+ Spliterator.CONCURRENT);
+ }
+ }
+ return null;
+ }
+
+ public void forEachRemaining(Consumer super E> action) {
+ Node p;
+ if (action == null) throw new NullPointerException();
+ final ConcurrentLinkedQueue q = this.queue;
+ if (!exhausted &&
+ ((p = current) != null || (p = q.first()) != null)) {
+ exhausted = true;
+ do {
+ E e = p.item;
+ if (p == (p = p.next))
+ p = q.first();
+ if (e != null)
+ action.accept(e);
+ } while (p != null);
+ }
+ }
+
+ public boolean tryAdvance(Consumer super E> action) {
+ Node p;
+ if (action == null) throw new NullPointerException();
+ final ConcurrentLinkedQueue q = this.queue;
+ if (!exhausted &&
+ ((p = current) != null || (p = q.first()) != null)) {
+ E e;
+ do {
+ e = p.item;
+ if (p == (p = p.next))
+ p = q.first();
+ } while (e == null && p != null);
+ if ((current = p) == null)
+ exhausted = true;
+ if (e != null) {
+ action.accept(e);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public long estimateSize() { return Long.MAX_VALUE; }
+
+ public int characteristics() {
+ return Spliterator.ORDERED | Spliterator.NONNULL |
+ Spliterator.CONCURRENT;
+ }
+ }
+
+ public Spliterator spliterator() {
+ return new CLQSpliterator(this);
+ }
+
/**
* Throws NullPointerException if argument is null.
*
diff --git a/jdk/src/share/classes/java/util/concurrent/ConcurrentSkipListMap.java b/jdk/src/share/classes/java/util/concurrent/ConcurrentSkipListMap.java
index cd93e211890..99faf166834 100644
--- a/jdk/src/share/classes/java/util/concurrent/ConcurrentSkipListMap.java
+++ b/jdk/src/share/classes/java/util/concurrent/ConcurrentSkipListMap.java
@@ -34,7 +34,25 @@
*/
package java.util.concurrent;
-import java.util.*;
+import java.util.AbstractCollection;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.Spliterator;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
/**
* A scalable concurrent {@link ConcurrentNavigableMap} implementation.
@@ -45,38 +63,38 @@ import java.util.*;
* This class implements a concurrent variant of SkipLists
* providing expected average log(n) time cost for the
- * containsKey, get, put and
- * remove operations and their variants. Insertion, removal,
+ * {@code containsKey}, {@code get}, {@code put} and
+ * {@code remove} operations and their variants. Insertion, removal,
* update, and access operations safely execute concurrently by
* multiple threads. Iterators are weakly consistent, returning
* elements reflecting the state of the map at some point at or since
* the creation of the iterator. They do not throw {@link
- * ConcurrentModificationException}, and may proceed concurrently with
- * other operations. Ascending key ordered views and their iterators
- * are faster than descending ones.
+ * java.util.ConcurrentModificationException ConcurrentModificationException},
+ * and may proceed concurrently with other operations. Ascending key ordered
+ * views and their iterators are faster than descending ones.
*
- *
All Map.Entry pairs returned by methods in this class
+ *
All {@code Map.Entry} pairs returned by methods in this class
* and its views represent snapshots of mappings at the time they were
- * produced. They do not support the Entry.setValue
+ * produced. They do not support the {@code Entry.setValue}
* method. (Note however that it is possible to change mappings in the
- * associated map using put, putIfAbsent, or
- * replace, depending on exactly which effect you need.)
+ * associated map using {@code put}, {@code putIfAbsent}, or
+ * {@code replace}, depending on exactly which effect you need.)
*
- *
Beware that, unlike in most collections, the size
+ *
Beware that, unlike in most collections, the {@code size}
* method is not a constant-time operation. Because of the
* asynchronous nature of these maps, determining the current number
* of elements requires a traversal of the elements, and so may report
* inaccurate results if this collection is modified during traversal.
- * Additionally, the bulk operations putAll, equals,
- * toArray, containsValue, and clear are
+ * Additionally, the bulk operations {@code putAll}, {@code equals},
+ * {@code toArray}, {@code containsValue}, and {@code clear} are
* not guaranteed to be performed atomically. For example, an
- * iterator operating concurrently with a putAll operation
+ * iterator operating concurrently with a {@code putAll} operation
* might view only some of the added elements.
*
*
This class and its views and iterators implement all of the
* optional methods of the {@link Map} and {@link Iterator}
* interfaces. Like most other concurrent collections, this class does
- * not permit the use of null keys or values because some
+ * not permit the use of {@code null} keys or values because some
* null return values cannot be reliably distinguished from the absence of
* elements.
*
@@ -89,7 +107,6 @@ import java.util.*;
* @param the type of mapped values
* @since 1.6
*/
-@SuppressWarnings("unchecked")
public class ConcurrentSkipListMap extends AbstractMap
implements ConcurrentNavigableMap,
Cloneable,
@@ -257,7 +274,7 @@ public class ConcurrentSkipListMap extends AbstractMap
*
* Indexing uses skip list parameters that maintain good search
* performance while using sparser-than-usual indices: The
- * hardwired parameters k=1, p=0.5 (see method randomLevel) mean
+ * hardwired parameters k=1, p=0.5 (see method doPut) mean
* that about one-quarter of the nodes have indices. Of those that
* do, half have one level, a quarter have two, and so on (see
* Pugh's Skip List Cookbook, sec 3.4). The expected total space
@@ -295,6 +312,20 @@ public class ConcurrentSkipListMap extends AbstractMap
* there is a fair amount of near-duplication of code to handle
* variants.
*
+ * To produce random values without interference across threads,
+ * we use within-JDK thread local random support (via the
+ * "secondary seed", to avoid interference with user-level
+ * ThreadLocalRandom.)
+ *
+ * A previous version of this class wrapped non-comparable keys
+ * with their comparators to emulate Comparables when using
+ * comparators vs Comparables. However, JVMs now appear to better
+ * handle infusing comparator-vs-comparable choice into search
+ * loops. Static method cpr(comparator, x, y) is used for all
+ * comparisons, which works well as long as the comparator
+ * argument is set up outside of loops (thus sometimes passed as
+ * an argument to internal methods) to avoid field re-reads.
+ *
* For explanation of algorithms sharing at least a couple of
* features with this one, see Mikhail Fomitchev's thesis
* (http://www.cs.yorku.ca/~mikhail/), Keir Fraser's thesis
@@ -322,12 +353,6 @@ public class ConcurrentSkipListMap extends AbstractMap
private static final long serialVersionUID = -8627078645895051609L;
- /**
- * Generates the initial random seed for the cheaper per-instance
- * random number generators used in randomLevel.
- */
- private static final Random seedGenerator = new Random();
-
/**
* Special value used to identify base-level header
*/
@@ -339,17 +364,12 @@ public class ConcurrentSkipListMap extends AbstractMap
private transient volatile HeadIndex head;
/**
- * The comparator used to maintain order in this map, or null
- * if using natural ordering.
+ * The comparator used to maintain order in this map, or null if
+ * using natural ordering. (Non-private to simplify access in
+ * nested classes.)
* @serial
*/
- private final Comparator super K> comparator;
-
- /**
- * Seed for simple random number generator. Not volatile since it
- * doesn't matter too much if different threads don't see updates.
- */
- private transient int randomSeed;
+ final Comparator super K> comparator;
/** Lazily initialized key set */
private transient KeySet keySet;
@@ -365,12 +385,11 @@ public class ConcurrentSkipListMap extends AbstractMap
* clear, readObject. and ConcurrentSkipListSet.clone.
* (Note that comparator must be separately initialized.)
*/
- final void initialize() {
+ private void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
- randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
head = new HeadIndex(new Node(null, BASE_HEADER, null),
null, null, 1);
}
@@ -438,7 +457,7 @@ public class ConcurrentSkipListMap extends AbstractMap
* because callers will have already read value field and need
* to use that read (not another done here) and so directly
* test if value points to node.
- * @param n a possibly null reference to a node
+ *
* @return true if this node is a marker node
*/
boolean isMarker() {
@@ -477,7 +496,7 @@ public class ConcurrentSkipListMap extends AbstractMap
*/
if (f == next && this == b.next) {
if (f == null || f.value != f) // not already marked
- appendMarker(f);
+ casNext(f, new Node(f));
else
b.casNext(this, f.next);
}
@@ -487,13 +506,14 @@ public class ConcurrentSkipListMap extends AbstractMap
* Returns value if this node contains a valid key-value pair,
* else null.
* @return this node's value if it isn't a marker or header or
- * is deleted, else null.
+ * is deleted, else null
*/
V getValidValue() {
Object v = value;
if (v == this || v == BASE_HEADER)
return null;
- return (V)v;
+ @SuppressWarnings("unchecked") V vv = (V)v;
+ return vv;
}
/**
@@ -502,10 +522,11 @@ public class ConcurrentSkipListMap extends AbstractMap
* @return new entry or null
*/
AbstractMap.SimpleImmutableEntry createSnapshot() {
- V v = getValidValue();
- if (v == null)
+ Object v = value;
+ if (v == null || v == this || v == BASE_HEADER)
return null;
- return new AbstractMap.SimpleImmutableEntry(key, v);
+ @SuppressWarnings("unchecked") V vv = (V)v;
+ return new AbstractMap.SimpleImmutableEntry(key, vv);
}
// UNSAFE mechanics
@@ -588,7 +609,7 @@ public class ConcurrentSkipListMap extends AbstractMap
* @return true if successful
*/
final boolean unlink(Index succ) {
- return !indexesDeletedNode() && casRight(succ, succ.right);
+ return node.value != null && casRight(succ, succ.right);
}
// Unsafe mechanics
@@ -622,80 +643,12 @@ public class ConcurrentSkipListMap extends AbstractMap
/* ---------------- Comparison utilities -------------- */
/**
- * Represents a key with a comparator as a Comparable.
- *
- * Because most sorted collections seem to use natural ordering on
- * Comparables (Strings, Integers, etc), most internal methods are
- * geared to use them. This is generally faster than checking
- * per-comparison whether to use comparator or comparable because
- * it doesn't require a (Comparable) cast for each comparison.
- * (Optimizers can only sometimes remove such redundant checks
- * themselves.) When Comparators are used,
- * ComparableUsingComparators are created so that they act in the
- * same way as natural orderings. This penalizes use of
- * Comparators vs Comparables, which seems like the right
- * tradeoff.
+ * Compares using comparator or natural ordering if null.
+ * Called only by methods that have performed required type checks.
*/
- static final class ComparableUsingComparator implements Comparable {
- final K actualKey;
- final Comparator super K> cmp;
- ComparableUsingComparator(K key, Comparator super K> cmp) {
- this.actualKey = key;
- this.cmp = cmp;
- }
- public int compareTo(K k2) {
- return cmp.compare(actualKey, k2);
- }
- }
-
- /**
- * If using comparator, return a ComparableUsingComparator, else
- * cast key as Comparable, which may cause ClassCastException,
- * which is propagated back to caller.
- */
- private Comparable super K> comparable(Object key)
- throws ClassCastException {
- if (key == null)
- throw new NullPointerException();
- if (comparator != null)
- return new ComparableUsingComparator((K)key, comparator);
- else
- return (Comparable super K>)key;
- }
-
- /**
- * Compares using comparator or natural ordering. Used when the
- * ComparableUsingComparator approach doesn't apply.
- */
- int compare(K k1, K k2) throws ClassCastException {
- Comparator super K> cmp = comparator;
- if (cmp != null)
- return cmp.compare(k1, k2);
- else
- return ((Comparable super K>)k1).compareTo(k2);
- }
-
- /**
- * Returns true if given key greater than or equal to least and
- * strictly less than fence, bypassing either test if least or
- * fence are null. Needed mainly in submap operations.
- */
- boolean inHalfOpenRange(K key, K least, K fence) {
- if (key == null)
- throw new NullPointerException();
- return ((least == null || compare(key, least) >= 0) &&
- (fence == null || compare(key, fence) < 0));
- }
-
- /**
- * Returns true if given key greater than or equal to least and less
- * or equal to fence. Needed mainly in submap operations.
- */
- boolean inOpenRange(K key, K least, K fence) {
- if (key == null)
- throw new NullPointerException();
- return ((least == null || compare(key, least) >= 0) &&
- (fence == null || compare(key, fence) <= 0));
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ static final int cpr(Comparator c, Object x, Object y) {
+ return (c != null) ? c.compare(x, y) : ((Comparable)x).compareTo(y);
}
/* ---------------- Traversal -------------- */
@@ -708,13 +661,11 @@ public class ConcurrentSkipListMap extends AbstractMap
* @param key the key
* @return a predecessor of key
*/
- private Node findPredecessor(Comparable super K> key) {
+ private Node findPredecessor(Object key, Comparator super K> cmp) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
- Index q = head;
- Index r = q.right;
- for (;;) {
+ for (Index q = head, r = q.right, d;;) {
if (r != null) {
Node n = r.node;
K k = n.key;
@@ -724,18 +675,16 @@ public class ConcurrentSkipListMap extends AbstractMap
r = q.right; // reread r
continue;
}
- if (key.compareTo(k) > 0) {
+ if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
- Index d = q.down;
- if (d != null) {
- q = d;
- r = d.right;
- } else
+ if ((d = q.down) == null)
return q.node;
+ q = d;
+ r = d.right;
}
}
}
@@ -784,54 +733,71 @@ public class ConcurrentSkipListMap extends AbstractMap
* @param key the key
* @return node holding key, or null if no such
*/
- private Node findNode(Comparable super K> key) {
- for (;;) {
- Node b = findPredecessor(key);
- Node n = b.next;
- for (;;) {
+ private Node findNode(Object key) {
+ if (key == null)
+ throw new NullPointerException(); // don't postpone errors
+ Comparator super K> cmp = comparator;
+ outer: for (;;) {
+ for (Node b = findPredecessor(key, cmp), n = b.next;;) {
+ Object v; int c;
if (n == null)
- return null;
+ break outer;
Node f = n.next;
if (n != b.next) // inconsistent read
break;
- Object v = n.value;
- if (v == null) { // n is deleted
+ if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
- if (v == n || b.value == null) // b is deleted
+ if (b.value == null || v == n) // b is deleted
break;
- int c = key.compareTo(n.key);
- if (c == 0)
+ if ((c = cpr(cmp, key, n.key)) == 0)
return n;
if (c < 0)
- return null;
+ break outer;
b = n;
n = f;
}
}
+ return null;
}
/**
- * Gets value for key using findNode.
- * @param okey the key
+ * Gets value for key. Almost the same as findNode, but returns
+ * the found value (to avoid retries during re-reads)
+ *
+ * @param key the key
* @return the value, or null if absent
*/
- private V doGet(Object okey) {
- Comparable super K> key = comparable(okey);
- /*
- * Loop needed here and elsewhere in case value field goes
- * null just as it is about to be returned, in which case we
- * lost a race with a deletion, so must retry.
- */
- for (;;) {
- Node n = findNode(key);
- if (n == null)
- return null;
- Object v = n.value;
- if (v != null)
- return (V)v;
+ private V doGet(Object key) {
+ if (key == null)
+ throw new NullPointerException();
+ Comparator super K> cmp = comparator;
+ outer: for (;;) {
+ for (Node b = findPredecessor(key, cmp), n = b.next;;) {
+ Object v; int c;
+ if (n == null)
+ break outer;
+ Node f = n.next;
+ if (n != b.next) // inconsistent read
+ break;
+ if ((v = n.value) == null) { // n is deleted
+ n.helpDelete(b, f);
+ break;
+ }
+ if (b.value == null || v == n) // b is deleted
+ break;
+ if ((c = cpr(cmp, key, n.key)) == 0) {
+ @SuppressWarnings("unchecked") V vv = (V)v;
+ return vv;
+ }
+ if (c < 0)
+ break outer;
+ b = n;
+ n = f;
+ }
}
+ return null;
}
/* ---------------- Insertion -------------- */
@@ -839,187 +805,126 @@ public class ConcurrentSkipListMap extends AbstractMap
/**
* Main insertion method. Adds element if not present, or
* replaces value if present and onlyIfAbsent is false.
- * @param kkey the key
- * @param value the value that must be associated with key
+ * @param key the key
+ * @param value the value that must be associated with key
* @param onlyIfAbsent if should not insert if already present
* @return the old value, or null if newly inserted
*/
- private V doPut(K kkey, V value, boolean onlyIfAbsent) {
- Comparable super K> key = comparable(kkey);
- for (;;) {
- Node b = findPredecessor(key);
- Node n = b.next;
- for (;;) {
+ private V doPut(K key, V value, boolean onlyIfAbsent) {
+ Node z; // added node
+ if (key == null)
+ throw new NullPointerException();
+ Comparator super K> cmp = comparator;
+ outer: for (;;) {
+ for (Node b = findPredecessor(key, cmp), n = b.next;;) {
if (n != null) {
+ Object v; int c;
Node f = n.next;
if (n != b.next) // inconsistent read
break;
- Object v = n.value;
- if (v == null) { // n is deleted
+ if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
- if (v == n || b.value == null) // b is deleted
+ if (b.value == null || v == n) // b is deleted
break;
- int c = key.compareTo(n.key);
- if (c > 0) {
+ if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
if (c == 0) {
- if (onlyIfAbsent || n.casValue(v, value))
- return (V)v;
- else
- break; // restart if lost race to replace value
+ if (onlyIfAbsent || n.casValue(v, value)) {
+ @SuppressWarnings("unchecked") V vv = (V)v;
+ return vv;
+ }
+ break; // restart if lost race to replace value
}
// else c < 0; fall through
}
- Node z = new Node(kkey, value, n);
+ z = new Node(key, value, n);
if (!b.casNext(n, z))
break; // restart if lost race to append to b
- int level = randomLevel();
- if (level > 0)
- insertIndex(z, level);
- return null;
+ break outer;
}
}
- }
- /**
- * Returns a random level for inserting a new node.
- * Hardwired to k=1, p=0.5, max 31 (see above and
- * Pugh's "Skip List Cookbook", sec 3.4).
- *
- * This uses the simplest of the generators described in George
- * Marsaglia's "Xorshift RNGs" paper. This is not a high-quality
- * generator but is acceptable here.
- */
- private int randomLevel() {
- int x = randomSeed;
- x ^= x << 13;
- x ^= x >>> 17;
- randomSeed = x ^= x << 5;
- if ((x & 0x80000001) != 0) // test highest and lowest bits
- return 0;
- int level = 1;
- while (((x >>>= 1) & 1) != 0) ++level;
- return level;
- }
-
- /**
- * Creates and adds index nodes for the given node.
- * @param z the node
- * @param level the level of the index
- */
- private void insertIndex(Node z, int level) {
- HeadIndex h = head;
- int max = h.level;
-
- if (level <= max) {
+ int rnd = ThreadLocalRandom.nextSecondarySeed();
+ if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
+ int level = 1, max;
+ while (((rnd >>>= 1) & 1) != 0)
+ ++level;
Index idx = null;
- for (int i = 1; i <= level; ++i)
- idx = new Index(z, idx, null);
- addIndex(idx, h, level);
-
- } else { // Add a new level
- /*
- * To reduce interference by other threads checking for
- * empty levels in tryReduceLevel, new levels are added
- * with initialized right pointers. Which in turn requires
- * keeping levels in an array to access them while
- * creating new head index nodes from the opposite
- * direction.
- */
- level = max + 1;
- Index[] idxs = (Index[])new Index,?>[level+1];
- Index idx = null;
- for (int i = 1; i <= level; ++i)
- idxs[i] = idx = new Index(z, idx, null);
-
- HeadIndex oldh;
- int k;
- for (;;) {
- oldh = head;
- int oldLevel = oldh.level;
- if (level <= oldLevel) { // lost race to add level
- k = level;
- break;
- }
- HeadIndex newh = oldh;
- Node