From 8d1ab57065c7ebcc650b5fb4ae098f8b0a35f112 Mon Sep 17 00:00:00 2001 From: Doug Lea Date: Sat, 22 Jul 2023 10:41:42 +0000 Subject: [PATCH] 8301341: LinkedTransferQueue does not respect timeout for poll() 8300663: java/util/concurrent/SynchronousQueue/Fairness.java failed with "Error: fair=true i=0 j=1" 8267502: JDK-8246677 caused 16x performance regression in SynchronousQueue Reviewed-by: alanb --- .../util/concurrent/LinkedTransferQueue.java | 1217 ++++++++--------- .../util/concurrent/SynchronousQueue.java | 870 ++---------- .../LinkedTransferQueue/WhiteBox.java | 76 +- 3 files changed, 782 insertions(+), 1381 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java b/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java index cb6b33ddb7d..5aef5cd12a1 100644 --- a/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java +++ b/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java @@ -47,6 +47,7 @@ import java.util.Queue; import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.ForkJoinWorkerThread; import java.util.function.Consumer; import java.util.function.Predicate; @@ -107,7 +108,11 @@ public class LinkedTransferQueue extends AbstractQueue * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf) * additionally arrange that threads enqueuing unmatched data also * block. Dual Transfer Queues support all of these modes, as - * dictated by callers. + * dictated by callers. All enqueue/dequeue operations can be + * handled by a single method (here, "xfer") with parameters + * indicating whether to act as some form of offer, put, poll, + * take, or transfer (each possibly with timeout), as described + * below. 
* * A FIFO dual queue may be implemented using a variation of the * Michael & Scott (M&S) lock-free queue algorithm @@ -126,44 +131,33 @@ public class LinkedTransferQueue extends AbstractQueue * * The M&S queue algorithm is known to be prone to scalability and * overhead limitations when maintaining (via CAS) these head and - * tail pointers. This has led to the development of - * contention-reducing variants such as elimination arrays (see - * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and - * optimistic back pointers (see Ladan-Mozes & Shavit - * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf). - * However, the nature of dual queues enables a simpler tactic for - * improving M&S-style implementations when dual-ness is needed. + * tail pointers. To address these, dual queues with slack differ + * from plain M&S dual queues by virtue of only sometimes updating + * head or tail pointers when matching, appending, or even + * traversing nodes. * * In a dual queue, each node must atomically maintain its match - * status. While there are other possible variants, we implement - * this here as: for a data-mode node, matching entails CASing an - * "item" field from a non-null data value to null upon match, and - * vice-versa for request nodes, CASing from null to a data - * value. (Note that the linearization properties of this style of - * queue are easy to verify -- elements are made available by - * linking, and unavailable by matching.) Compared to plain M&S - * queues, this property of dual queues requires one additional - * successful atomic operation per enq/deq pair. But it also - * enables lower cost variants of queue maintenance mechanics. (A - * variation of this idea applies even for non-dual queues that - * support deletion of interior elements, such as - * j.u.c.ConcurrentLinkedQueue.) + * status. 
Matching entails CASing an "item" field from a non-null + * data value to null upon match, and vice-versa for request + * nodes, CASing from null to a data value. (To reduce the need + * for re-reads, we use the compareAndExchange forms of CAS for + * pointer updates, that provide the current value to continue + * with on failure.) Note that the linearization properties of + * this style of queue are easy to verify -- elements are made + * available by linking, and unavailable by matching. Compared to + * plain M&S queues, this property of dual queues requires one + * additional successful atomic operation per enq/deq pair. But it + * also enables lower cost variants of queue maintenance + * mechanics. * - * Once a node is matched, its match status can never again - * change. We may thus arrange that the linked list of them - * contain a prefix of zero or more matched nodes, followed by a - * suffix of zero or more unmatched nodes. (Note that we allow - * both the prefix and suffix to be zero length, which in turn - * means that we do not use a dummy header.) If we were not - * concerned with either time or space efficiency, we could - * correctly perform enqueue and dequeue operations by traversing - * from a pointer to the initial node; CASing the item of the - * first unmatched node on match and CASing the next field of the - * trailing node on appends. While this would be a terrible idea - * in itself, it does have the benefit of not requiring ANY atomic - * updates on head/tail fields. + * Once a node is matched, it is no longer live -- its match + * status can never again change. We may thus arrange that the + * linked list of them contain a prefix of zero or more dead + * nodes, followed by a suffix of zero or more live nodes. Note + * that we allow both the prefix and suffix to be zero length, + * which in turn means that we do not require a dummy header. 
* - * We introduce here an approach that lies between the extremes of + * We use here an approach that lies between the extremes of * never versus always updating queue (head and tail) pointers. * This offers a tradeoff between sometimes requiring extra * traversal steps to locate the first and/or last unmatched @@ -178,143 +172,43 @@ public class LinkedTransferQueue extends AbstractQueue * * The best value for this "slack" (the targeted maximum distance * between the value of "head" and the first unmatched node, and - * similarly for "tail") is an empirical matter. We have found - * that using very small constants in the range of 1-3 work best - * over a range of platforms. Larger values introduce increasing - * costs of cache misses and risks of long traversal chains, while - * smaller values increase CAS contention and overhead. - * - * Dual queues with slack differ from plain M&S dual queues by - * virtue of only sometimes updating head or tail pointers when - * matching, appending, or even traversing nodes; in order to - * maintain a targeted slack. The idea of "sometimes" may be - * operationalized in several ways. The simplest is to use a - * per-operation counter incremented on each traversal step, and - * to try (via CAS) to update the associated queue pointer - * whenever the count exceeds a threshold. Another, that requires - * more overhead, is to use random number generators to update - * with a given probability per traversal step. - * - * In any strategy along these lines, because CASes updating - * fields may fail, the actual slack may exceed targeted slack. - * However, they may be retried at any time to maintain targets. - * Even when using very small slack values, this approach works - * well for dual queues because it allows all operations up to the - * point of matching or appending an item (hence potentially - * allowing progress by another thread) to be read-only, thus not - * introducing any further contention. 
As described below, we - * implement this by performing slack maintenance retries only - * after these points. - * - * As an accompaniment to such techniques, traversal overhead can - * be further reduced without increasing contention of head - * pointer updates: Threads may sometimes shortcut the "next" link - * path from the current "head" node to be closer to the currently - * known first unmatched node, and similarly for tail. Again, this - * may be triggered with using thresholds or randomization. + * similarly for "tail") is an empirical matter. Larger values + * introduce increasing costs of cache misses and risks of long + * traversal chains and out-of-order updates, while smaller values + * increase CAS contention and overhead. Using the smallest + * non-zero value of one is both simple and empirically a good + * choice in most applications. The slack value is hard-wired: a + * path greater than one is usually implemented by checking + * equality of traversal pointers. Because CASes updating fields + * attempting to do so may stall, the writes may appear out of + * order (an older CAS from the same head or tail may execute + * after a newer one), the actual slack may exceed targeted + * slack. To reduce impact, other threads may help update by + * unsplicing dead nodes while traversing. * * These ideas must be further extended to avoid unbounded amounts * of costly-to-reclaim garbage caused by the sequential "next" * links of nodes starting at old forgotten head nodes: As first * described in detail by Boehm - * (http://portal.acm.org/citation.cfm?doid=503272.503282), if a GC - * delays noticing that any arbitrarily old node has become + * (http://portal.acm.org/citation.cfm?doid=503272.503282), if a + * GC delays noticing that any arbitrarily old node has become * garbage, all newer dead nodes will also be unreclaimed. * (Similar issues arise in non-GC environments.) 
To cope with - * this in our implementation, upon CASing to advance the head - * pointer, we set the "next" link of the previous head to point - * only to itself; thus limiting the length of chains of dead nodes. - * (We also take similar care to wipe out possibly garbage - * retaining values held in other Node fields.) However, doing so - * adds some further complexity to traversal: If any "next" - * pointer links to itself, it indicates that the current thread - * has lagged behind a head-update, and so the traversal must - * continue from the "head". Traversals trying to find the - * current tail starting from "tail" may also encounter - * self-links, in which case they also continue at "head". + * this in our implementation, upon advancing the head pointer, we + * set the "next" link of the previous head to point only to + * itself; thus limiting the length of chains of dead nodes. (We + * also take similar care to wipe out possibly garbage retaining + * values held in other node fields.) This is easy to accommodate + * in the primary xfer method, but adds a lot of complexity to + * Collection operations including traversal; mainly because if + * any "next" pointer links to itself, the current thread has + * lagged behind a head-update, and so must restart. * - * It is tempting in slack-based scheme to not even use CAS for - * updates (similarly to Ladan-Mozes & Shavit). However, this - * cannot be done for head updates under the above link-forgetting - * mechanics because an update may leave head at a detached node. 
- * And while direct writes are possible for tail updates, they - * increase the risk of long retraversals, and hence long garbage - * chains, which can be much more costly than is worthwhile - * considering that the cost difference of performing a CAS vs - * write is smaller when they are not triggered on each operation - * (especially considering that writes and CASes equally require - * additional GC bookkeeping ("write barriers") that are sometimes - * more costly than the writes themselves because of contention). + * *** Blocking *** * - * *** Overview of implementation *** - * - * We use a threshold-based approach to updates, with a slack - * threshold of two -- that is, we update head/tail when the - * current pointer appears to be two or more steps away from the - * first/last node. The slack value is hard-wired: a path greater - * than one is naturally implemented by checking equality of - * traversal pointers except when the list has only one element, - * in which case we keep slack threshold at one. Avoiding tracking - * explicit counts across method calls slightly simplifies an - * already-messy implementation. Using randomization would - * probably work better if there were a low-quality dirt-cheap - * per-thread one available, but even ThreadLocalRandom is too - * heavy for these purposes. - * - * With such a small slack threshold value, it is not worthwhile - * to augment this with path short-circuiting (i.e., unsplicing - * interior nodes) except in the case of cancellation/removal (see - * below). - * - * All enqueue/dequeue operations are handled by the single method - * "xfer" with parameters indicating whether to act as some form - * of offer, put, poll, take, or transfer (each possibly with - * timeout). The relative complexity of using one monolithic - * method outweighs the code bulk and maintenance problems of - * using separate methods for each case. - * - * Operation consists of up to two phases. 
The first is implemented - * in method xfer, the second in method awaitMatch. - * - * 1. Traverse until matching or appending (method xfer) - * - * Conceptually, we simply traverse all nodes starting from head. - * If we encounter an unmatched node of opposite mode, we match - * it and return, also updating head (by at least 2 hops) to - * one past the matched node (or the node itself if it's the - * pinned trailing node). Traversals also check for the - * possibility of falling off-list, in which case they restart. - * - * If the trailing node of the list is reached, a match is not - * possible. If this call was untimed poll or tryTransfer - * (argument "how" is NOW), return empty-handed immediately. - * Else a new node is CAS-appended. On successful append, if - * this call was ASYNC (e.g. offer), an element was - * successfully added to the end of the queue and we return. - * - * Of course, this naive traversal is O(n) when no match is - * possible. We optimize the traversal by maintaining a tail - * pointer, which is expected to be "near" the end of the list. - * It is only safe to fast-forward to tail (in the presence of - * arbitrary concurrent changes) if it is pointing to a node of - * the same mode, even if it is dead (in this case no preceding - * node could still be matchable by this traversal). If we - * need to restart due to falling off-list, we can again - * fast-forward to tail, but only if it has changed since the - * last traversal (else we might loop forever). If tail cannot - * be used, traversal starts at head (but in this case we - * expect to be able to match near head). As with head, we - * CAS-advance the tail pointer by at least two hops. - * - * 2. Await match or cancellation (method awaitMatch) - * - * Wait for another thread to match node; instead cancelling if - * the current thread was interrupted or the wait timed out. 
To - * improve performance in common single-source / single-sink - * usages when there are more tasks that cores, an initial - * Thread.yield is tried when there is apparently only one - * waiter. In other cases, waiters may help with some - * bookkeeping, then park/unpark. + * The DualNode class is shared with class SynchronousQueue. It + * houses method await, which is used for all blocking control, as + * described below in DualNode internal documentation. * * ** Unlinking removed interior nodes ** * @@ -330,15 +224,13 @@ public class LinkedTransferQueue extends AbstractQueue * unreachable in this way: (1) If s is the trailing node of list * (i.e., with null next), then it is pinned as the target node * for appends, so can only be removed later after other nodes are - * appended. (2) We cannot necessarily unlink s given a - * predecessor node that is matched (including the case of being - * cancelled): the predecessor may already be unspliced, in which - * case some previous reachable node may still point to s. - * (For further explanation see Herlihy & Shavit "The Art of - * Multiprocessor Programming" chapter 9). Although, in both - * cases, we can rule out the need for further action if either s - * or its predecessor are (or can be made to be) at, or fall off - * from, the head of list. + * appended. (2) Unless we know it is already off-list, we cannot + * necessarily unlink s given a predecessor node that is matched + * (including the case of being cancelled): the predecessor may + * already be unspliced, in which case some previous reachable + * node may still point to s. (For further explanation see + * Herlihy & Shavit "The Art of Multiprocessor Programming" + * chapter 9). * * Without taking these into account, it would be possible for an * unbounded number of supposedly removed nodes to remain reachable. 
@@ -350,186 +242,439 @@ public class LinkedTransferQueue extends AbstractQueue * * When these cases arise, rather than always retraversing the * entire list to find an actual predecessor to unlink (which - * won't help for case (1) anyway), we record the need to sweep the - * next time any thread would otherwise block in awaitMatch. Also, - * because traversal operations on the linked list of nodes are a - * natural opportunity to sweep dead nodes, we generally do so, - * including all the operations that might remove elements as they - * traverse, such as removeIf and Iterator.remove. This largely - * eliminates long chains of dead interior nodes, except from - * cancelled or timed out blocking operations. + * won't help for case (1) anyway), we record a conservative + * estimate of possible unsplice failures (in "sweepVotes"). + * We trigger a full sweep when the estimate exceeds a threshold + * ("SWEEP_THRESHOLD") indicating the maximum number of estimated + * removal failures to tolerate before sweeping through, unlinking + * cancelled nodes that were not unlinked upon initial removal. + * We perform sweeps by the thread hitting threshold (rather than + * background threads or by spreading work to other threads) + * because in the main contexts in which removal occurs, the + * caller is timed-out or cancelled, which are not time-critical + * enough to warrant the overhead that alternatives would impose + * on other threads. + * + * Because the sweepVotes estimate is conservative, and because + * nodes become unlinked "naturally" as they fall off the head of + * the queue, and because we allow votes to accumulate even while + * sweeps are in progress, there are typically significantly fewer + * such nodes than estimated. * * Note that we cannot self-link unlinked interior nodes during * sweeps. However, the associated garbage chains terminate when * some successor ultimately falls off the head of the list and is * self-linked. 
+ * + * *** Revision notes *** + * + * This version differs from previous releases as follows: + * + * * Class DualNode replaces Qnode, with fields and methods + * that apply to any match-based dual data structure, and now + * usable in other j.u.c classes, in particular, SynchronousQueue. + * * Blocking control (in class DualNode) accommodates + * VirtualThreads and (perhaps virtualized) uniprocessors. + * * All fields of this class (LinkedTransferQueue) are + * default-initializable (to null), allowing further extension + * (in particular, SynchronousQueue.Transferer) + * * Head and tail fields are lazily initialized rather than set + * to a dummy node, while also reducing retries under heavy + * contention and misorderings, and relaxing some accesses, + * requiring accommodation in many places (as well as + * adjustments in WhiteBox tests). */ /** - * The number of nanoseconds for which it is faster to spin - * rather than to use timed park. A rough estimate suffices. - * Using a power of two minus one simplifies some comparisons. + * Node for linked dual data structures. Uses type Object, not E, + * for items to allow cancellation and forgetting after use. Only + * field "item" is declared volatile (with bypasses for + * pre-publication and post-match writes), although field "next" + * is also CAS-able. Other accesses are constrained by context + * (including dependent chains of next's headed by a volatile + * read). + * + * This class also arranges blocking while awaiting matches. + * Control of blocking (and thread scheduling in general) for + * possibly-synchronous queues (and channels etc constructed + * from them) must straddle two extremes: If there are too few + * underlying cores for a fulfilling party to continue, then + * the caller must park to cause a context switch. 
On the + * other hand, if the queue is busy with approximately the + * same number of independent producers and consumers, then + * that context switch may cause an order-of-magnitude + * slowdown. Many cases are somewhere in-between, in which + * case threads should try spinning and then give up and + * block. We deal with this as follows: + * + * 1. Callers to method await indicate eligibility for + * spinning when the node is either the only waiting node, or + * the next matchable node is still spinning. Otherwise, the + * caller may block (almost) immediately. + * + * 2. Even if eligible to spin, a caller blocks anyway in two + * cases where it is normally best: If the thread isVirtual, + * or the system is a uniprocessor. Uniprocessor status can + * vary over time (due to virtualization at other system + * levels), but checking Runtime availableProcessors can be + * slow and may itself acquire blocking locks, so we only + * occasionally (using ThreadLocalRandom) update when an + * otherwise-eligible spin elapses. + * + * 3. When enabled, spins should be long enough to cover + * bookeeping overhead of almost-immediate fulfillments, but + * much less than the expected time of a (non-virtual) + * park/unpark context switch. The optimal value is + * unknowable, in part because the relative costs of + * Thread.onSpinWait versus park/unpark vary across platforms. + * The current value is an empirical compromise across tested + * platforms. + * + * 4. When using timed waits, callers spin instead of invoking + * timed park if the remaining time is less than the likely cost + * of park/unpark. This also avoids re-parks when timed park + * returns just barely too soon. As is the case in most j.u.c + * blocking support, untimed waits use ManagedBlockers when + * callers are ForkJoin threads, but timed waits use plain + * parkNanos, under the rationale that known-to-be transient + * blocking doesn't require compensation. 
(This decision should be + * revisited here and elsewhere to deal with very long timeouts.) + * + * 5. Park/unpark signalling otherwise relies on a Dekker-like + * scheme in which the caller advertises the need to unpark by + * setting its waiter field, followed by a full fence and recheck + * before actually parking. An explicit fence is used here rather + * than unnecessarily requiring volatile accesses elsewhere. This + * fence also separates accesses to field isUniprocessor. + * + * 6. To make the above work, callers must precheck that + * timeouts are not already elapsed, and that interruptible + * operations were not already interrupted on call to the + * corresponding queue operation. Cancellation on timeout or + * interrupt otherwise proceeds by trying to fulfill with an + * impossible value (which is one reason that we use Object + * types here rather than typed fields). */ - static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L; - - /** - * The maximum number of estimated removal failures (sweepVotes) - * to tolerate before sweeping through the queue unlinking - * cancelled nodes that were not unlinked upon initial - * removal. See above for explanation. The value must be at least - * two to avoid useless sweeps when removing trailing nodes. - */ - static final int SWEEP_THRESHOLD = 32; - - /** - * Queue nodes. Uses Object, not E, for items to allow forgetting - * them after use. Writes that are intrinsically ordered wrt - * other accesses or CASes use simple relaxed forms. 
- */ - static final class Node implements ForkJoinPool.ManagedBlocker { - final boolean isData; // false if this is a request node + static final class DualNode implements ForkJoinPool.ManagedBlocker { volatile Object item; // initially non-null if isData; CASed to match - volatile Node next; - volatile Thread waiter; // null when not waiting for a match + DualNode next; // accessed only in chains of volatile ops + Thread waiter; // access order constrained by context + final boolean isData; // false if this is a request node - /** - * Constructs a data node holding item if item is non-null, - * else a request node. Uses relaxed write because item can - * only be seen after piggy-backing publication via CAS. - */ - Node(Object item) { - ITEM.set(this, item); - isData = (item != null); + DualNode(Object item, boolean isData) { + ITEM.set(this, item); // relaxed write before publication + this.isData = isData; } - /** Constructs a (matched data) dummy node. */ - Node() { - isData = true; + // Atomic updates + final Object cmpExItem(Object cmp, Object val) { // try to match + return ITEM.compareAndExchange(this, cmp, val); + } + final DualNode cmpExNext(DualNode cmp, DualNode val) { + return (DualNode)NEXT.compareAndExchange(this, cmp, val); } - final boolean casNext(Node cmp, Node val) { - // assert val != null; - return NEXT.compareAndSet(this, cmp, val); - } - - final boolean casItem(Object cmp, Object val) { - // assert isData == (cmp != null); - // assert isData == (val == null); - // assert !(cmp instanceof Node); - return ITEM.compareAndSet(this, cmp, val); + /** Returns true if this node has been matched or cancelled */ + final boolean matched() { + return isData != (item != null); } /** - * Links node to itself to avoid garbage retention. Called - * only after CASing head field, so uses relaxed write. + * Relaxed write to replace reference to user data with + * self-link. Can be used only if not already null after + * match. 
*/ - final void selfLink() { - // assert isMatched(); - NEXT.setRelease(this, this); + final void selfLinkItem() { + ITEM.set(this, this); } - final void appendRelaxed(Node next) { - // assert next != null; - // assert this.next == null; - NEXT.setOpaque(this, next); - } + /** The number of times to spin when eligible */ + private static final int SPINS = 1 << 7; /** - * Returns true if this node has been matched, including the - * case of artificial matches due to cancellation. + * The number of nanoseconds for which it is faster to spin + * rather than to use timed park. A rough estimate suffices. */ - final boolean isMatched() { - return isData == (item == null); - } + private static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1L << 10; - /** Tries to CAS-match this node; if successful, wakes waiter. */ - final boolean tryMatch(Object cmp, Object val) { - if (casItem(cmp, val)) { - LockSupport.unpark(waiter); - return true; + /** + * True if system is a uniprocessor, occasionally rechecked. + */ + private static boolean isUniprocessor = + (Runtime.getRuntime().availableProcessors() == 1); + + /** + * Refresh rate (probability) for updating isUniprocessor + * field, to reduce the likelihood that multiple calls to await + * will contend invoking Runtime.availableProcessors. Must be + * a power of two minus one. + */ + private static final int UNIPROCESSOR_REFRESH_RATE = (1 << 5) - 1; + + /** + * Possibly blocks until matched or caller gives up. + * + * @param e the comparison value for checking match + * @param ns timeout, or Long.MAX_VALUE if untimed + * @param blocker the LockSupport.setCurrentBlocker argument + * @param spin true if should spin when enabled + * @return matched item, or e if unmatched on interrupt or timeout + */ + final Object await(Object e, long ns, Object blocker, boolean spin) { + Object m; // the match or e if none + boolean timed = (ns != Long.MAX_VALUE); + long deadline = (timed) ? 
System.nanoTime() + ns : 0L; + boolean upc = isUniprocessor; // don't spin but later recheck + Thread w = Thread.currentThread(); + if (w.isVirtual()) // don't spin + spin = false; + int spins = (spin & !upc) ? SPINS : 0; // negative when may park + while ((m = item) == e) { + if (spins >= 0) { + if (--spins >= 0) + Thread.onSpinWait(); + else { // prepare to park + if (spin) // occasionally recheck + checkForUniprocessor(upc); + LockSupport.setCurrentBlocker(blocker); + waiter = w; // ensure ordering + VarHandle.fullFence(); + } + } else if (w.isInterrupted() || + (timed && // try to cancel with impossible match + ((ns = deadline - System.nanoTime()) <= 0L))) { + m = cmpExItem(e, (e == null) ? this : null); + break; + } else if (timed) { + if (ns < SPIN_FOR_TIMEOUT_THRESHOLD) + Thread.onSpinWait(); + else + LockSupport.parkNanos(ns); + } else if (w instanceof ForkJoinWorkerThread) { + try { + ForkJoinPool.managedBlock(this); + } catch (InterruptedException cannotHappen) { } + } else + LockSupport.park(); } - return false; + if (spins < 0) { + LockSupport.setCurrentBlocker(null); + waiter = null; + } + return m; } - /** - * Returns true if a node with the given mode cannot be - * appended to this node because this node is unmatched and - * has opposite data mode. 
- */ - final boolean cannotPrecede(boolean haveData) { - boolean d = isData; - return d != haveData && d != (item == null); + /** Occasionally updates isUniprocessor field */ + private void checkForUniprocessor(boolean prev) { + int r = ThreadLocalRandom.nextSecondarySeed(); + if ((r & UNIPROCESSOR_REFRESH_RATE) == 0) { + boolean u = (Runtime.getRuntime().availableProcessors() == 1); + if (u != prev) + isUniprocessor = u; + } } + // ManagedBlocker support public final boolean isReleasable() { - return (isData == (item == null)) || - Thread.currentThread().isInterrupted(); + return (matched() || Thread.currentThread().isInterrupted()); } - public final boolean block() { while (!isReleasable()) LockSupport.park(); return true; } - private static final long serialVersionUID = -3375979862319811754L; + // VarHandle mechanics + static final VarHandle ITEM; + static final VarHandle NEXT; + static { + try { + Class tn = DualNode.class; + MethodHandles.Lookup l = MethodHandles.lookup(); + ITEM = l.findVarHandle(tn, "item", Object.class); + NEXT = l.findVarHandle(tn, "next", tn); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + // Reduce the risk of rare disastrous classloading in first call to + // LockSupport.park: https://bugs.openjdk.org/browse/JDK-8074773 + Class ensureLoaded = LockSupport.class; + } } /** - * A node from which the first live (non-matched) node (if any) - * can be reached in O(1) time. + * Unless empty (in which case possibly null), a node from which + * all live nodes are reachable. * Invariants: - * - all live nodes are reachable from head via .next - * - head != null - * - (tmp = head).next != tmp || tmp != head + * - head is never self-linked * Non-invariants: * - head may or may not be live - * - it is permitted for tail to lag behind head, that is, for tail - * to not be reachable from head! 
+ * + * This field is used by subclass SynchronousQueue.Transferer to + * record the top of a Lifo stack, with tail always null, but + * otherwise maintaining the same properties. */ - transient volatile Node head; + transient volatile DualNode head; /** - * A node from which the last node on list (that is, the unique - * node with node.next == null) can be reached in O(1) time. - * Invariants: - * - the last node is always reachable from tail via .next - * - tail != null + * Unless null, a node from which the last node on list (that is, + * the unique node with node.next == null), if one exists, can be + * reached. * Non-invariants: * - tail may or may not be live - * - it is permitted for tail to lag behind head, that is, for tail - * to not be reachable from head! - * - tail.next may or may not be self-linked. + * - tail may be the same as head + * - tail may or may not be self-linked. + * - tail may lag behind head, so need not be reachable from head */ - private transient volatile Node tail; + transient volatile DualNode tail; /** The number of apparent failures to unsplice cancelled nodes */ - private transient volatile boolean needSweep; + transient volatile int sweepVotes; - private boolean casTail(Node cmp, Node val) { - // assert cmp != null; - // assert val != null; - return TAIL.compareAndSet(this, cmp, val); + // Atomic updates + + final DualNode cmpExTail(DualNode cmp, DualNode val) { + return (DualNode)TAIL.compareAndExchange(this, cmp, val); + } + final DualNode cmpExHead(DualNode cmp, DualNode val) { + return (DualNode)HEAD.compareAndExchange(this, cmp, val); } - private boolean casHead(Node cmp, Node val) { - return HEAD.compareAndSet(this, cmp, val); + /** + * The maximum number of estimated removal failures (sweepVotes) + * to tolerate before sweeping through the queue unlinking + * dead nodes that were initially pinned. Must be a power of + * two minus one, at least 3. 
+ */ + static final int SWEEP_THRESHOLD = (1 << 4) - 1; + + /** + * Adds a sweepVote and returns true if triggered threshold. + */ + final boolean sweepNow() { + return (SWEEP_THRESHOLD == + ((int)SWEEPVOTES.getAndAdd(this, 1) & (SWEEP_THRESHOLD))); + } + + /** + * Implements all queuing methods. Loops, trying: + * + * * If not initialized, try to add new node (unless immediate) and exit + * * If tail has same mode, start traversing at tail for a likely + * append, else at head for a likely match + * * Traverse over dead or wrong-mode nodes until finding a spot + * to match/append, or falling off the list because of self-links. + * * On success, update head or tail if slacked, and possibly wait, + * depending on ns argument + * + * @param e the item or null for take + * @param ns timeout or negative if async, 0 if immediate, + * Long.MAX_VALUE if untimed + * @return an item if matched, else e + */ + final Object xfer(Object e, long ns) { + boolean haveData = (e != null); + Object m; // the match or e if none + DualNode s = null, p; // enqueued node and its predecessor + restart: for (DualNode prevp = null;;) { + DualNode h, t, q; + if ((h = head) == null && // initialize unless immediate + (ns == 0L || + (h = cmpExHead(null, s = new DualNode(e, haveData))) == null)) { + p = null; // no predecessor + break; // else lost init race + } + p = (t = tail) != null && t.isData == haveData && t != prevp ? t : h; + prevp = p; // avoid known self-linked tail path + do { + m = p.item; + q = p.next; + if (p.isData != haveData && haveData != (m != null) && + p.cmpExItem(m, e) == m) { + Thread w = p.waiter; // matched complementary node + if (p != h && h == cmpExHead(h, (q == null) ? 
p : q)) + h.next = h; // advance head; self-link old + LockSupport.unpark(w); + return m; + } else if (q == null) { + if (ns == 0L) // try to append unless immediate + break restart; + if (s == null) + s = new DualNode(e, haveData); + if ((q = p.cmpExNext(null, s)) == null) { + if (p != t) + cmpExTail(t, s); + break restart; + } + } + } while (p != (p = q)); // restart if self-linked + } + if (s == null || ns <= 0L) + m = e; // don't wait + else if ((m = s.await(e, ns, this, // spin if at or near head + p == null || p.waiter == null)) == e) + unsplice(p, s); // cancelled + else if (m != null) + s.selfLinkItem(); + + return m; + } + + /* -------------- Removals -------------- */ + + /** + * Unlinks (now or later) the given (non-live) node with given + * predecessor. See above for rationale. + * + * @param pred if nonnull, a node that was at one time known to be the + * predecessor of s (else s may have been head) + * @param s the node to be unspliced + */ + private void unsplice(DualNode pred, DualNode s) { + boolean seen = false; // try removing by collapsing head + for (DualNode h = head, p = h, f; p != null;) { + boolean matched; + if (p == s) + matched = seen = true; + else + matched = p.matched(); + if ((f = p.next) == p) + p = h = head; + else if (f != null && matched) + p = f; + else { + if (p != h && cmpExHead(h, p) == h) + h.next = h; // self-link + break; + } + } + DualNode sn; // try to unsplice if not pinned + if (!seen && + pred != null && pred.next == s && s != null && (sn = s.next) != s && + (sn == null || pred.cmpExNext(s, sn) != s || pred.matched()) && + sweepNow()) { // occasionally sweep if might not have been removed + for (DualNode p = head, f, n, u; + p != null && (f = p.next) != null && (n = f.next) != null;) { + p = (f == p ? head : // stale + !f.matched() ? f : // skip + f == (u = p.cmpExNext(f, n)) ? n : u); // unspliced + } + } } /** * Tries to CAS pred.next (or head, if pred is null) from c to p. 
 * Caller must ensure that we're not unlinking the trailing node. */ - private boolean tryCasSuccessor(Node pred, Node c, Node p) { - // assert p != null; - // assert c.isData != (c.item != null); - // assert c != p; + final boolean tryCasSuccessor(DualNode pred, DualNode c, DualNode p) { + // assert p != null && c.matched() && c != p; if (pred != null) - return pred.cmpExNext(c, p) == c; - return pred.casNext(c, p); - if (casHead(c, p)) { - c.selfLink(); - return true; - } - return false; + return pred.cmpExNext(c, p) == c; + else if (cmpExHead(c, p) != c) + return false; + if (c != null) + c.next = c; + + return true; } /** @@ -540,147 +685,29 @@ public class LinkedTransferQueue extends AbstractQueue * @param q p.next: the next live node, or null if at end * @return pred if pred still alive and CAS succeeded; else p */ - private Node skipDeadNodes(Node pred, Node c, Node p, Node q) { - // assert pred != c; - // assert p != q; - // assert c.isMatched(); - // assert p.isMatched(); - if (q == null) { - // Never unlink trailing node. - if (c == p) return pred; + final DualNode skipDeadNodes(DualNode pred, DualNode c, + DualNode p, DualNode q) { + // assert pred != c && p != q && c.matched() && p.matched(); + if (q == null) { // Never unlink trailing node. + if (c == p) + return pred; q = p; } - return (tryCasSuccessor(pred, c, q) - && (pred == null || !pred.isMatched())) + return (tryCasSuccessor(pred, c, q) && (pred == null || !pred.matched())) ? pred : p; } /** - * Collapses dead (matched) nodes from h (which was once head) to p. - * Caller ensures all nodes from h up to and including p are dead. + * Tries to match the given object only if p is a data + * node. Signals waiter on success. 
*/ - private void skipDeadNodesNearHead(Node h, Node p) { - // assert h != null; - // assert h != p; - // assert p.isMatched(); - for (;;) { - final Node q; - if ((q = p.next) == null) break; - else if (!q.isMatched()) { p = q; break; } - else if (p == (p = q)) return; + final boolean tryMatchData(DualNode p, Object x) { + if (p != null && p.isData && + x != null && p.cmpExItem(x, null) == x) { + LockSupport.unpark(p.waiter); + return true; } - if (casHead(h, p)) - h.selfLink(); - } - - /* Possible values for "how" argument in xfer method. */ - - private static final int NOW = 0; // for untimed poll, tryTransfer - private static final int ASYNC = 1; // for offer, put, add - private static final int SYNC = 2; // for transfer, take - private static final int TIMED = 3; // for timed poll, tryTransfer - - /** - * Implements all queuing methods. See above for explanation. - * - * @param e the item or null for take - * @param haveData true if this is a put, else a take - * @param how NOW, ASYNC, SYNC, or TIMED - * @param nanos timeout in nanosecs, used only if mode is TIMED - * @return an item if matched, else e - * @throws NullPointerException if haveData mode but e is null - */ - @SuppressWarnings("unchecked") - private E xfer(E e, boolean haveData, int how, long nanos) { - if (haveData && (e == null)) - throw new NullPointerException(); - - restart: for (Node s = null, t = null, h = null;;) { - for (Node p = (t != (t = tail) && t.isData == haveData) ? 
t - : (h = head);; ) { - final Node q; final Object item; - if (p.isData != haveData - && haveData == ((item = p.item) == null)) { - if (h == null) h = head; - if (p.tryMatch(item, e)) { - if (h != p) skipDeadNodesNearHead(h, p); - return (E) item; - } - } - if ((q = p.next) == null) { - if (how == NOW) return e; - if (s == null) s = new Node(e); - if (!p.casNext(null, s)) continue; - if (p != t) casTail(t, s); - if (how == ASYNC) return e; - return awaitMatch(s, p, e, (how == TIMED), nanos); - } - if (p == (p = q)) continue restart; - } - } - } - - /** - * Possibly blocks until node s is matched or caller gives up. - * - * @param s the waiting node - * @param pred the predecessor of s, or null if unknown (the null - * case does not occur in any current calls but may in possible - * future extensions) - * @param e the comparison value for checking match - * @param timed if true, wait only until timeout elapses - * @param nanos timeout in nanosecs, used only if timed is true - * @return matched item, or e if unmatched on interrupt or timeout - */ - @SuppressWarnings("unchecked") - private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { - final boolean isData = s.isData; - final long deadline = timed ? System.nanoTime() + nanos : 0L; - final Thread w = Thread.currentThread(); - int stat = -1; // -1: may yield, +1: park, else 0 - Object item; - while ((item = s.item) == e) { - if (needSweep) // help clean - sweep(); - else if ((timed && nanos <= 0L) || w.isInterrupted()) { - if (s.casItem(e, (e == null) ? 
s : null)) { - unsplice(pred, s); // cancelled - return e; - } - } - else if (stat <= 0) { - if (pred != null && pred.next == s) { - if (stat < 0 && - (pred.isData != isData || pred.isMatched())) { - stat = 0; // yield once if first - Thread.yield(); - } - else { - stat = 1; - s.waiter = w; // enable unpark - } - } // else signal in progress - } - else if ((item = s.item) != e) - break; // recheck - else if (!timed) { - LockSupport.setCurrentBlocker(this); - try { - ForkJoinPool.managedBlock(s); - } catch (InterruptedException cannotHappen) { } - LockSupport.setCurrentBlocker(null); - } - else { - nanos = deadline - System.nanoTime(); - if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanos); - } - } - if (stat == 1) - WAITER.set(s, null); - if (!isData) - ITEM.set(s, s); // self-link to avoid garbage - return (E) item; + return false; } /* -------------- Traversal methods -------------- */ @@ -690,40 +717,39 @@ public class LinkedTransferQueue extends AbstractQueue * Callers must recheck if the returned node is unmatched * before using. 
*/ - final Node firstDataNode() { - Node first = null; - restartFromHead: for (;;) { - Node h = head, p = h; - while (p != null) { - if (p.item != null) { - if (p.isData) { - first = p; - break; - } - } - else if (!p.isData) - break; - final Node q; - if ((q = p.next) == null) - break; - if (p == (p = q)) - continue restartFromHead; + final DualNode firstDataNode() { + for (DualNode h = head, p = h, q, u; p != null;) { + boolean isData = p.isData; + Object item = p.item; + if (isData && item != null) // is live data + return p; + else if (!isData && item == null) // is live request + break; + else if ((q = p.next) == null) // end of list + break; + else if (p == q) // self-link; restart + p = h = head; + else if (p == h) // traverse past header + p = q; + else if ((u = cmpExHead(h, q)) != h) + p = h = u; // lost update race + else { + h.next = h; // collapse; self-link + p = h = q; } - if (p != h && casHead(h, p)) - h.selfLink(); - return first; } + return null; } /** * Traverses and counts unmatched nodes of the given mode. * Used by methods size and getWaitingConsumerCount. 
*/ - private int countOfMode(boolean data) { + final int countOfMode(boolean data) { restartFromHead: for (;;) { int count = 0; - for (Node p = head; p != null;) { - if (!p.isMatched()) { + for (DualNode p = head; p != null;) { + if (!p.matched()) { if (p.isData != data) return 0; if (++count == Integer.MAX_VALUE) @@ -741,7 +767,7 @@ public class LinkedTransferQueue extends AbstractQueue restartFromHead: for (;;) { int charLength = 0; int size = 0; - for (Node p = head; p != null;) { + for (DualNode p = head; p != null;) { Object item = p.item; if (p.isData) { if (item != null) { @@ -770,7 +796,7 @@ public class LinkedTransferQueue extends AbstractQueue Object[] x = a; restartFromHead: for (;;) { int size = 0; - for (Node p = head; p != null;) { + for (DualNode p = head; p != null;) { Object item = p.item; if (p.isData) { if (item != null) { @@ -863,27 +889,28 @@ public class LinkedTransferQueue extends AbstractQueue * but O(n) in the worst case, when lastRet is concurrently deleted. */ final class Itr implements Iterator { - private Node nextNode; // next node to return item for - private E nextItem; // the corresponding item - private Node lastRet; // last returned node, to support remove - private Node ancestor; // Helps unlink lastRet on remove() + private DualNode nextNode; // next node to return item for + private E nextItem; // the corresponding item + private DualNode lastRet; // last returned node, to support remove + private DualNode ancestor; // Helps unlink lastRet on remove() /** * Moves to next node after pred, or first node if pred null. */ @SuppressWarnings("unchecked") - private void advance(Node pred) { - for (Node p = (pred == null) ? head : pred.next, c = p; + private void advance(DualNode pred) { + for (DualNode p = (pred == null) ? 
head : pred.next, c = p; p != null; ) { - final Object item; - if ((item = p.item) != null && p.isData) { + boolean isData = p.isData; + Object item = p.item; + if (isData && item != null) { nextNode = p; nextItem = (E) item; if (c != p) tryCasSuccessor(pred, c, p); return; } - else if (!p.isData && item == null) + else if (!isData && item == null) break; if (c != p && !tryCasSuccessor(pred, c, c = p)) { pred = p; @@ -907,7 +934,7 @@ public class LinkedTransferQueue extends AbstractQueue } public final E next() { - final Node p; + DualNode p; if ((p = nextNode) == null) throw new NoSuchElementException(); E e = nextItem; advance(lastRet = p); @@ -916,28 +943,26 @@ public class LinkedTransferQueue extends AbstractQueue public void forEachRemaining(Consumer action) { Objects.requireNonNull(action); - Node q = null; - for (Node p; (p = nextNode) != null; advance(q = p)) + DualNode q = null; + for (DualNode p; (p = nextNode) != null; advance(q = p)) action.accept(nextItem); if (q != null) lastRet = q; } public final void remove() { - final Node lastRet = this.lastRet; + final DualNode lastRet = this.lastRet; if (lastRet == null) throw new IllegalStateException(); this.lastRet = null; if (lastRet.item == null) // already deleted? return; // Advance ancestor, collapsing intervening dead nodes - Node pred = ancestor; - for (Node p = (pred == null) ? head : pred.next, c = p, q; + DualNode pred = ancestor; + for (DualNode p = (pred == null) ? head : pred.next, c = p, q; p != null; ) { if (p == lastRet) { - final Object item; - if ((item = p.item) != null) - p.tryMatch(item, null); + tryMatchData(p, p.item); if ((q = p.next) == null) q = p; if (c != q) tryCasSuccessor(pred, c, q); ancestor = pred; @@ -962,20 +987,20 @@ public class LinkedTransferQueue extends AbstractQueue // leave ancestor at original location to avoid overshoot; // better luck next time! 
- // assert lastRet.isMatched(); + // assert lastRet.matched(); } } /** A customized variant of Spliterators.IteratorSpliterator */ final class LTQSpliterator implements Spliterator { static final int MAX_BATCH = 1 << 25; // max batch array size; - Node current; // current node; null until initialized + DualNode current; // current node; null until initialized int batch; // batch size for splits boolean exhausted; // true when no more nodes LTQSpliterator() {} public Spliterator trySplit() { - Node p, q; + DualNode p, q; if ((p = current()) == null || (q = p.next) == null) return null; int i = 0, n = batch = Math.min(batch + 1, MAX_BATCH); @@ -1004,7 +1029,7 @@ public class LinkedTransferQueue extends AbstractQueue public void forEachRemaining(Consumer action) { Objects.requireNonNull(action); - final Node p; + final DualNode p; if ((p = current()) != null) { current = null; exhausted = true; @@ -1015,12 +1040,12 @@ public class LinkedTransferQueue extends AbstractQueue @SuppressWarnings("unchecked") public boolean tryAdvance(Consumer action) { Objects.requireNonNull(action); - Node p; + DualNode p; if ((p = current()) != null) { E e = null; do { - final Object item = p.item; - final boolean isData = p.isData; + boolean isData = p.isData; + Object item = p.item; if (p == (p = p.next)) p = head; if (isData) { @@ -1041,13 +1066,13 @@ public class LinkedTransferQueue extends AbstractQueue return false; } - private void setCurrent(Node p) { + private void setCurrent(DualNode p) { if ((current = p) == null) exhausted = true; } - private Node current() { - Node p; + private DualNode current() { + DualNode p; if ((p = current) == null && !exhausted) setCurrent(p = firstDataNode()); return p; @@ -1082,76 +1107,10 @@ public class LinkedTransferQueue extends AbstractQueue return new LTQSpliterator(); } - /* -------------- Removal methods -------------- */ - - /** - * Unsplices (now or later) the given deleted/cancelled node with - * the given predecessor. 
- * - * @param pred a node that was at one time known to be the - * predecessor of s - * @param s the node to be unspliced - */ - final void unsplice(Node pred, Node s) { - // assert pred != null; - // assert pred != s; - // assert s != null; - // assert s.isMatched(); - // assert (SWEEP_THRESHOLD & (SWEEP_THRESHOLD - 1)) == 0; - s.waiter = null; // disable signals - /* - * See above for rationale. Briefly: if pred still points to - * s, try to unlink s. If s cannot be unlinked, because it is - * trailing node or pred might be unlinked, and neither pred - * nor s are head or offlist, set needSweep; - */ - if (pred != null && pred.next == s) { - Node n = s.next; - if (n == null || - (n != s && pred.casNext(s, n) && pred.isMatched())) { - for (;;) { // check if at, or could be, head - Node h = head; - if (h == pred || h == s) - return; // at head or list empty - if (!h.isMatched()) - break; - Node hn = h.next; - if (hn == null) - return; // now empty - if (hn != h && casHead(h, hn)) - h.selfLink(); // advance head - } - if (pred.next != pred && s.next != s) - needSweep = true; - } - } - } - - /** - * Unlinks matched (typically cancelled) nodes encountered in a - * traversal from head. - */ - private void sweep() { - needSweep = false; - for (Node p = head, s, n; p != null && (s = p.next) != null; ) { - if (!s.isMatched()) - // Unmatched nodes are never self-linked - p = s; - else if ((n = s.next) == null) // trailing node is pinned - break; - else if (s == n) // stale - // No need to also check for p == s, since that implies s == n - p = head; - else - p.casNext(s, n); - } - } - /** * Creates an initially empty {@code LinkedTransferQueue}. 
*/ public LinkedTransferQueue() { - head = tail = new Node(); } /** @@ -1164,16 +1123,15 @@ public class LinkedTransferQueue extends AbstractQueue * of its elements are null */ public LinkedTransferQueue(Collection c) { - Node h = null, t = null; + DualNode h = null, t = null; for (E e : c) { - Node newNode = new Node(Objects.requireNonNull(e)); - if (h == null) - h = t = newNode; + DualNode newNode = new DualNode(Objects.requireNonNull(e), true); + if (t == null) + h = newNode; else - t.appendRelaxed(t = newNode); + t.next = newNode; + t = newNode; } - if (h == null) - h = t = new Node(); head = h; tail = t; } @@ -1185,7 +1143,7 @@ public class LinkedTransferQueue extends AbstractQueue * @throws NullPointerException if the specified element is null */ public void put(E e) { - xfer(e, true, ASYNC, 0L); + offer(e); } /** @@ -1198,8 +1156,7 @@ public class LinkedTransferQueue extends AbstractQueue * @throws NullPointerException if the specified element is null */ public boolean offer(E e, long timeout, TimeUnit unit) { - xfer(e, true, ASYNC, 0L); - return true; + return offer(e); } /** @@ -1210,7 +1167,8 @@ public class LinkedTransferQueue extends AbstractQueue * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { - xfer(e, true, ASYNC, 0L); + Objects.requireNonNull(e); + xfer(e, -1L); return true; } @@ -1223,8 +1181,7 @@ public class LinkedTransferQueue extends AbstractQueue * @throws NullPointerException if the specified element is null */ public boolean add(E e) { - xfer(e, true, ASYNC, 0L); - return true; + return offer(e); } /** @@ -1238,7 +1195,8 @@ public class LinkedTransferQueue extends AbstractQueue * @throws NullPointerException if the specified element is null */ public boolean tryTransfer(E e) { - return xfer(e, true, NOW, 0L) == null; + Objects.requireNonNull(e); + return xfer(e, 0L) == null; } /** @@ -1253,10 +1211,13 @@ public class LinkedTransferQueue extends AbstractQueue * @throws NullPointerException if the 
specified element is null */ public void transfer(E e) throws InterruptedException { - if (xfer(e, true, SYNC, 0L) != null) { + Objects.requireNonNull(e); + if (!Thread.interrupted()) { + if (xfer(e, Long.MAX_VALUE) == null) + return; Thread.interrupted(); // failure possible only due to interrupt - throw new InterruptedException(); } + throw new InterruptedException(); } /** @@ -1275,30 +1236,38 @@ public class LinkedTransferQueue extends AbstractQueue */ public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { - if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) + Objects.requireNonNull(e); + long nanos = Math.max(unit.toNanos(timeout), 0L); + if (xfer(e, nanos) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); } + @SuppressWarnings("unchecked") public E take() throws InterruptedException { - E e = xfer(null, false, SYNC, 0L); - if (e != null) - return e; - Thread.interrupted(); + Object e; + if (!Thread.interrupted()) { + if ((e = xfer(null, Long.MAX_VALUE)) != null) + return (E) e; + Thread.interrupted(); + } throw new InterruptedException(); } + @SuppressWarnings("unchecked") public E poll(long timeout, TimeUnit unit) throws InterruptedException { - E e = xfer(null, false, TIMED, unit.toNanos(timeout)); - if (e != null || !Thread.interrupted()) - return e; + Object e; + long nanos = Math.max(unit.toNanos(timeout), 0L); + if ((e = xfer(null, nanos)) != null || !Thread.interrupted()) + return (E) e; throw new InterruptedException(); } + @SuppressWarnings("unchecked") public E poll() { - return xfer(null, false, NOW, 0L); + return (E) xfer(null, 0L); } /** @@ -1344,7 +1313,7 @@ public class LinkedTransferQueue extends AbstractQueue public E peek() { restartFromHead: for (;;) { - for (Node p = head; p != null;) { + for (DualNode p = head; p != null;) { Object item = p.item; if (p.isData) { if (item != null) { @@ -1372,7 +1341,7 @@ public class LinkedTransferQueue extends 
AbstractQueue public boolean hasWaitingConsumer() { restartFromHead: for (;;) { - for (Node p = head; p != null;) { + for (DualNode p = head; p != null;) { Object item = p.item; if (p.isData) { if (item != null) @@ -1421,22 +1390,23 @@ public class LinkedTransferQueue extends AbstractQueue public boolean remove(Object o) { if (o == null) return false; restartFromHead: for (;;) { - for (Node p = head, pred = null; p != null; ) { - Node q = p.next; - final Object item; - if ((item = p.item) != null) { - if (p.isData) { - if (o.equals(item) && p.tryMatch(item, null)) { + for (DualNode p = head, pred = null; p != null; ) { + boolean isData = p.isData; + Object item = p.item; + DualNode q = p.next; + if (item != null) { + if (isData) { + if (o.equals(item) && tryMatchData(p, item)) { skipDeadNodes(pred, p, p, q); return true; } pred = p; p = q; continue; } } - else if (!p.isData) + else if (!isData) break; - for (Node c = p;; q = p.next) { - if (q == null || !q.isMatched()) { + for (DualNode c = p;; q = p.next) { + if (q == null || !q.matched()) { pred = skipDeadNodes(pred, c, p, q); p = q; break; } if (p == (p = q)) continue restartFromHead; @@ -1457,20 +1427,21 @@ public class LinkedTransferQueue extends AbstractQueue public boolean contains(Object o) { if (o == null) return false; restartFromHead: for (;;) { - for (Node p = head, pred = null; p != null; ) { - Node q = p.next; - final Object item; - if ((item = p.item) != null) { - if (p.isData) { + for (DualNode p = head, pred = null; p != null; ) { + boolean isData = p.isData; + Object item = p.item; + DualNode q = p.next; + if (item != null) { + if (isData) { if (o.equals(item)) return true; pred = p; p = q; continue; } } - else if (!p.isData) + else if (!isData) break; - for (Node c = p;; q = p.next) { - if (q == null || !q.isMatched()) { + for (DualNode c = p;; q = p.next) { + if (q == null || !q.matched()) { pred = skipDeadNodes(pred, c, p, q); p = q; break; } if (p == (p = q)) continue restartFromHead; @@ 
-1519,16 +1490,15 @@ public class LinkedTransferQueue extends AbstractQueue throws java.io.IOException, ClassNotFoundException { // Read in elements until trailing null sentinel found - Node h = null, t = null; + DualNode h = null, t = null; for (Object item; (item = s.readObject()) != null; ) { - Node newNode = new Node(item); - if (h == null) - h = t = newNode; + DualNode newNode = new DualNode(item, true); + if (t == null) + h = newNode; else - t.appendRelaxed(t = newNode); + t.next = newNode; + t = newNode; } - if (h == null) - h = t = new Node(); head = h; tail = t; } @@ -1567,6 +1537,7 @@ public class LinkedTransferQueue extends AbstractQueue */ private static final int MAX_HOPS = 8; + /** Implementation of bulk remove methods. */ @SuppressWarnings("unchecked") private boolean bulkRemove(Predicate filter) { @@ -1575,24 +1546,24 @@ public class LinkedTransferQueue extends AbstractQueue int hops = MAX_HOPS; // c will be CASed to collapse intervening dead nodes between // pred (or head if null) and p. - for (Node p = head, c = p, pred = null, q; p != null; p = q) { + for (DualNode p = head, c = p, pred = null, q; p != null; p = q) { + boolean isData = p.isData, pAlive; + Object item = p.item; q = p.next; - final Object item; boolean pAlive; - if (pAlive = ((item = p.item) != null && p.isData)) { + if (pAlive = (item != null && isData)) { if (filter.test((E) item)) { - if (p.tryMatch(item, null)) + if (tryMatchData(p, item)) removed = true; pAlive = false; } } - else if (!p.isData && item == null) + else if (!isData && item == null) break; if (pAlive || q == null || --hops == 0) { // p might already be self-linked here, but if so: // - CASing head will surely fail // - CASing pred's next will be useless but harmless. 
- if ((c != p && !tryCasSuccessor(pred, c, c = p)) - || pAlive) { + if ((c != p && !tryCasSuccessor(pred, c, c = p)) || pAlive) { // if CAS failed or alive, abandon old pred hops = MAX_HOPS; pred = p; @@ -1610,20 +1581,21 @@ public class LinkedTransferQueue extends AbstractQueue * If p is null, the action is not run. */ @SuppressWarnings("unchecked") - void forEachFrom(Consumer action, Node p) { - for (Node pred = null; p != null; ) { - Node q = p.next; - final Object item; - if ((item = p.item) != null) { - if (p.isData) { + void forEachFrom(Consumer action, DualNode p) { + for (DualNode pred = null; p != null; ) { + boolean isData = p.isData; + Object item = p.item; + DualNode q = p.next; + if (item != null) { + if (isData) { action.accept((E) item); pred = p; p = q; continue; } } - else if (!p.isData) + else if (!isData) break; - for (Node c = p;; q = p.next) { - if (q == null || !q.isMatched()) { + for (DualNode c = p;; q = p.next) { + if (q == null || !q.matched()) { pred = skipDeadNodes(pred, c, p, q); p = q; break; } if (p == (p = q)) { pred = null; p = head; break; } @@ -1640,27 +1612,18 @@ public class LinkedTransferQueue extends AbstractQueue } // VarHandle mechanics - private static final VarHandle HEAD; - private static final VarHandle TAIL; - static final VarHandle ITEM; - static final VarHandle NEXT; - static final VarHandle WAITER; + static final VarHandle HEAD; + static final VarHandle TAIL; + static final VarHandle SWEEPVOTES; static { try { + Class ltq = LinkedTransferQueue.class, tn = DualNode.class; MethodHandles.Lookup l = MethodHandles.lookup(); - HEAD = l.findVarHandle(LinkedTransferQueue.class, "head", - Node.class); - TAIL = l.findVarHandle(LinkedTransferQueue.class, "tail", - Node.class); - ITEM = l.findVarHandle(Node.class, "item", Object.class); - NEXT = l.findVarHandle(Node.class, "next", Node.class); - WAITER = l.findVarHandle(Node.class, "waiter", Thread.class); + HEAD = l.findVarHandle(ltq, "head", tn); + TAIL = l.findVarHandle(ltq, 
"tail", tn); + SWEEPVOTES = l.findVarHandle(ltq, "sweepVotes", int.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } - - // Reduce the risk of rare disastrous classloading in first call to - // LockSupport.park: https://bugs.openjdk.org/browse/JDK-8074773 - Class ensureLoaded = LockSupport.class; } } diff --git a/src/java.base/share/classes/java/util/concurrent/SynchronousQueue.java b/src/java.base/share/classes/java/util/concurrent/SynchronousQueue.java index c22d9b1d8ae..32f7840ef04 100644 --- a/src/java.base/share/classes/java/util/concurrent/SynchronousQueue.java +++ b/src/java.base/share/classes/java/util/concurrent/SynchronousQueue.java @@ -47,6 +47,9 @@ import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.TransferQueue; /** * A {@linkplain BlockingQueue blocking queue} in which each insert @@ -98,717 +101,137 @@ public class SynchronousQueue extends AbstractQueue * M. L. Scott. 18th Annual Conf. on Distributed Computing, * Oct. 2004 (see also * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html). - * The (Lifo) stack is used for non-fair mode, and the (Fifo) - * queue for fair mode. The performance of the two is generally - * similar. Fifo usually supports higher throughput under - * contention but Lifo maintains higher thread locality in common - * applications. + * The queue is treated as a Lifo stack in non-fair mode, and a + * Fifo queue in fair mode. In most contexts, transfer performance + * is roughly comparable across them. Lifo is usually faster under + * low contention, but slower under high contention. Performance + * of applications using them also varies. 
Lifo is generally + * preferable in resource management settings (for example cached + * thread pools) because of better temporal locality, but + * inappropriate for message-passing applications. * - * A dual queue (and similarly stack) is one that at any given - * time either holds "data" -- items provided by put operations, - * or "requests" -- slots representing take operations, or is - * empty. A call to "fulfill" (i.e., a call requesting an item - * from a queue holding data or vice versa) dequeues a - * complementary node. The most interesting feature of these - * queues is that any operation can figure out which mode the - * queue is in, and act accordingly without needing locks. - * - * Both the queue and stack extend abstract class Transferer - * defining the single method transfer that does a put or a - * take. These are unified into a single method because in dual - * data structures, the put and take operations are symmetrical, - * so nearly all code can be combined. The resulting transfer - * methods are on the long side, but are easier to follow than - * they would be if broken up into nearly-duplicated parts. - * - * The queue and stack data structures share many conceptual - * similarities but very few concrete details. For simplicity, - * they are kept distinct so that they can later evolve - * separately. + * A dual queue is one that at any given time either holds "data" + * -- items provided by put operations, or "requests" -- slots + * representing take operations, or is empty. A fulfilling + * operation (i.e., a call requesting an item from a queue holding + * data or vice versa) "matches" the item of and then dequeues a + * complementary node. Any operation can figure out which mode + * the queue is in, and act accordingly without needing locks. So + * put and take operations are symmetrical, and all transfer + * methods invoke a single "xfer" method that does a put or a take + * in either fifo or lifo mode. 
* * The algorithms here differ from the versions in the above paper - * in extending them for use in synchronous queues, as well as - * dealing with cancellation. The main differences include: + * in ways including: * - * 1. The original algorithms used bit-marked pointers, but - * the ones here use mode bits in nodes, leading to a number - * of further adaptations. - * 2. SynchronousQueues must block threads waiting to become - * fulfilled. - * 3. Support for cancellation via timeout and interrupts, - * including cleaning out cancelled nodes/threads - * from lists to avoid garbage retention and memory depletion. - * - * Blocking is mainly accomplished using LockSupport park/unpark, - * except that nodes that appear to be the next ones to become - * fulfilled first spin a bit (on multiprocessors only). On very - * busy synchronous queues, spinning can dramatically improve - * throughput. And on less busy ones, the amount of spinning is - * small enough not to be noticeable. - * - * Cleaning is done in different ways in queues vs stacks. For - * queues, we can almost always remove a node immediately in O(1) - * time (modulo retries for consistency checks) when it is - * cancelled. But if it may be pinned as the current tail, it must - * wait until some subsequent cancellation. For stacks, we need a - * potentially O(n) traversal to be sure that we can remove the - * node, but this can run concurrently with other threads - * accessing the stack. - * - * While garbage collection takes care of most node reclamation - * issues that otherwise complicate nonblocking algorithms, care - * is taken to "forget" references to data, other nodes, and - * threads that might be held on to long-term by blocked - * threads. In cases where setting to null would otherwise - * conflict with main algorithms, this is done by changing a - * node's link to now point to the node itself. 
This doesn't arise - * much for Stack nodes (because blocked threads do not hang on to - * old head pointers), but references in Queue nodes must be - * aggressively forgotten to avoid reachability of everything any - * node has ever referred to since arrival. - * - * The above steps improve throughput when many threads produce - * and/or consume data. But they don't help much with - * single-source / single-sink usages in which one side or the - * other is always transiently blocked, and so throughput is - * mainly a function of thread scheduling. This is not usually - * noticeably improved with bounded short spin-waits. Instead both - * forms of transfer try Thread.yield if apparently the sole - * waiter. This works well when there are more tasks that cores, - * which is expected to be the main usage context of this mode. In - * other cases, waiters may help with some bookkeeping, then - * park/unpark. + * * The original algorithms used bit-marked pointers, but the + * ones here use a bit (isData) in nodes, and usually avoid + * creating nodes when fulfilling. They also use the + * compareAndExchange form of CAS for pointer updates to + * reduce memory traffic. + * * Fifo mode is based on LinkedTransferQueue operations, but + * Lifo mode support is added in subclass Transferer. + * * The Fifo version accommodates lazy updates and slack as + * described in LinkedTransferQueue internal documentation. + * * Threads may block when waiting to become fulfilled, + * sometimes preceded by brief spins. + * * Support for cancellation via timeout and interrupts, + * including cleaning out cancelled nodes/threads from lists + * to avoid garbage retention and memory depletion. */ /** - * Shared internal API for dual stacks and queues. + * Extension of LinkedTransferQueue to support Lifo (stack) mode. + * Methods use the "head" field as head (top) of stack (versus + * queue). Note that popped nodes are not self-linked because they + * are not prone to unbounded garbage chains. 
Also note that + * "async" mode is never used and not supported for synchronous + * transfers. */ - abstract static class Transferer { + @SuppressWarnings("serial") // never serialized + static final class Transferer extends LinkedTransferQueue { + /** - * Performs a put or take. + * Puts or takes an item with lifo ordering. Loops trying: + * * If top (var p) exists and is already matched, pop and continue + * * If top has complementary type, try to fulfill by CASing item, + * On success pop (which will succeed unless already helped), + * otherwise restart. + * * If no possible match, unless immediate mode, push a + * node and wait, later unsplicing if cancelled. * - * @param e if non-null, the item to be handed to a consumer; - * if null, requests that transfer return an item - * offered by producer. - * @param timed if this operation should timeout - * @param nanos the timeout, in nanoseconds - * @return if non-null, the item provided or received; if null, - * the operation failed due to timeout or interrupt -- - * the caller can distinguish which of these occurred - * by checking Thread.interrupted. + * @param e the item or null for take + * @param ns timeout or 0 if immediate, Long.MAX_VALUE if untimed + * @return an item if matched, else e */ - abstract E transfer(E e, boolean timed, long nanos); - } - - /** - * The number of nanoseconds for which it is faster to spin - * rather than to use timed park. A rough estimate suffices. - */ - static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L; - - /** Dual stack */ - static final class TransferStack extends Transferer { - /* - * This extends Scherer-Scott dual stack algorithm, differing, - * among other ways, by using "covering" nodes rather than - * bit-marked pointers: Fulfilling operations push on marker - * nodes (with FULFILLING bit set in mode) to reserve a spot - * to match a waiting node. 
- */ - - /* Modes for SNodes, ORed together in node fields */ - /** Node represents an unfulfilled consumer */ - static final int REQUEST = 0; - /** Node represents an unfulfilled producer */ - static final int DATA = 1; - /** Node is fulfilling another unfulfilled DATA or REQUEST */ - static final int FULFILLING = 2; - - /** Returns true if m has fulfilling bit set. */ - static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } - - /** Node class for TransferStacks. */ - static final class SNode implements ForkJoinPool.ManagedBlocker { - volatile SNode next; // next node in stack - volatile SNode match; // the node matched to this - volatile Thread waiter; // to control park/unpark - Object item; // data; or null for REQUESTs - int mode; - // Note: item and mode fields don't need to be volatile - // since they are always written before, and read after, - // other volatile/atomic operations. - - SNode(Object item) { - this.item = item; - } - - boolean casNext(SNode cmp, SNode val) { - return cmp == next && - SNEXT.compareAndSet(this, cmp, val); - } - - /** - * Tries to match node s to this node, if so, waking up thread. - * Fulfillers call tryMatch to identify their waiters. - * Waiters block until they have been matched. - * - * @param s the node to match - * @return true if successfully matched to s - */ - boolean tryMatch(SNode s) { - SNode m; Thread w; - if ((m = match) == null) { - if (SMATCH.compareAndSet(this, null, s)) { - if ((w = waiter) != null) - LockSupport.unpark(w); - return true; - } - else - m = match; - } - return m == s; - } - - /** - * Tries to cancel a wait by matching node to itself. 
- */ - boolean tryCancel() { - return SMATCH.compareAndSet(this, null, this); - } - - boolean isCancelled() { - return match == this; - } - - public final boolean isReleasable() { - return match != null || Thread.currentThread().isInterrupted(); - } - - public final boolean block() { - while (!isReleasable()) LockSupport.park(); - return true; - } - - void forgetWaiter() { - SWAITER.setOpaque(this, null); - } - - // VarHandle mechanics - private static final VarHandle SMATCH; - private static final VarHandle SNEXT; - private static final VarHandle SWAITER; - static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - SMATCH = l.findVarHandle(SNode.class, "match", SNode.class); - SNEXT = l.findVarHandle(SNode.class, "next", SNode.class); - SWAITER = l.findVarHandle(SNode.class, "waiter", Thread.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } - } - } - - /** The head (top) of the stack */ - volatile SNode head; - - boolean casHead(SNode h, SNode nh) { - return h == head && - SHEAD.compareAndSet(this, h, nh); - } - - /** - * Creates or resets fields of a node. Called only from transfer - * where the node to push on stack is lazily created and - * reused when possible to help reduce intervals between reads - * and CASes of head and to avoid surges of garbage when CASes - * to push nodes fail due to contention. - */ - static SNode snode(SNode s, Object e, SNode next, int mode) { - if (s == null) s = new SNode(e); - s.mode = mode; - s.next = next; - return s; - } - - /** - * Puts or takes an item. - */ - @SuppressWarnings("unchecked") - E transfer(E e, boolean timed, long nanos) { - /* - * Basic algorithm is to loop trying one of three actions: - * - * 1. If apparently empty or already containing nodes of same - * mode, try to push node on stack and wait for a match, - * returning it, or null if cancelled. - * - * 2. 
If apparently containing node of complementary mode, - * try to push a fulfilling node on to stack, match - * with corresponding waiting node, pop both from - * stack, and return matched item. The matching or - * unlinking might not actually be necessary because of - * other threads performing action 3: - * - * 3. If top of stack already holds another fulfilling node, - * help it out by doing its match and/or pop - * operations, and then continue. The code for helping - * is essentially the same as for fulfilling, except - * that it doesn't return the item. - */ - - SNode s = null; // constructed/reused as needed - int mode = (e == null) ? REQUEST : DATA; - - for (;;) { - SNode h = head; - if (h == null || h.mode == mode) { // empty or same-mode - if (timed && nanos <= 0L) { // can't wait - if (h != null && h.isCancelled()) - casHead(h, h.next); // pop cancelled node - else - return null; - } else if (casHead(h, s = snode(s, e, h, mode))) { - long deadline = timed ? System.nanoTime() + nanos : 0L; - Thread w = Thread.currentThread(); - int stat = -1; // -1: may yield, +1: park, else 0 - SNode m; // await fulfill or cancel - while ((m = s.match) == null) { - if ((timed && - (nanos = deadline - System.nanoTime()) <= 0) || - w.isInterrupted()) { - if (s.tryCancel()) { - clean(s); // wait cancelled - return null; - } - } else if ((m = s.match) != null) { - break; // recheck - } else if (stat <= 0) { - if (stat < 0 && h == null && head == s) { - stat = 0; // yield once if was empty - Thread.yield(); - } else { - stat = 1; - s.waiter = w; // enable signal - } - } else if (!timed) { - LockSupport.setCurrentBlocker(this); - try { - ForkJoinPool.managedBlock(s); - } catch (InterruptedException cannotHappen) { } - LockSupport.setCurrentBlocker(null); - } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanos); - } - if (stat == 1) - s.forgetWaiter(); - Object result = (mode == REQUEST) ? 
m.item : s.item; - if (h != null && h.next == s) - casHead(h, s.next); // help fulfiller - return (E) result; - } - } else if (!isFulfilling(h.mode)) { // try to fulfill - if (h.isCancelled()) // already cancelled - casHead(h, h.next); // pop and retry - else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { - for (;;) { // loop until matched or waiters disappear - SNode m = s.next; // m is s's match - if (m == null) { // all waiters are gone - casHead(s, null); // pop fulfill node - s = null; // use new node next time - break; // restart main loop - } - SNode mn = m.next; - if (m.tryMatch(s)) { - casHead(s, mn); // pop both s and m - return (E) ((mode == REQUEST) ? m.item : s.item); - } else // lost match - s.casNext(m, mn); // help unlink - } - } - } else { // help a fulfiller - SNode m = h.next; // m is h's match - if (m == null) // waiter is gone - casHead(h, null); // pop fulfilling node - else { - SNode mn = m.next; - if (m.tryMatch(h)) // help match - casHead(h, mn); // pop both h and m - else // lost match - h.casNext(m, mn); // help unlink + final Object xferLifo(Object e, long ns) { + boolean haveData = (e != null); + Object m; // the match or e if none + outer: for (DualNode s = null, p = head;;) { + while (p != null) { + boolean isData; DualNode n, u; // help collapse + if ((isData = p.isData) != ((m = p.item) != null)) + p = (p == (u = cmpExHead(p, (n = p.next)))) ? n : u; + else if (isData == haveData) // same mode; push below + break; + else if (p.cmpExItem(m, e) != m) + p = head; // missed; restart + else { // matched complementary node + Thread w = p.waiter; + cmpExHead(p, p.next); + LockSupport.unpark(w); + break outer; } } - } - } - - /** - * Unlinks s from the stack. - */ - void clean(SNode s) { - s.item = null; // forget item - s.forgetWaiter(); - - /* - * At worst we may need to traverse entire stack to unlink - * s. If there are multiple concurrent calls to clean, we - * might not see s if another thread has already removed - * it. 
But we can stop when we see any node known to - * follow s. We use s.next unless it too is cancelled, in - * which case we try the node one past. We don't check any - * further because we don't want to doubly traverse just to - * find sentinel. - */ - - SNode past = s.next; - if (past != null && past.isCancelled()) - past = past.next; - - // Absorb cancelled nodes at head - SNode p; - while ((p = head) != null && p != past && p.isCancelled()) - casHead(p, p.next); - - // Unsplice embedded nodes - while (p != null && p != past) { - SNode n = p.next; - if (n != null && n.isCancelled()) - p.casNext(n, n.next); - else - p = n; - } - } - - // VarHandle mechanics - private static final VarHandle SHEAD; - static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - SHEAD = l.findVarHandle(TransferStack.class, "head", SNode.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } - } - } - - /** Dual Queue */ - static final class TransferQueue extends Transferer { - /* - * This extends Scherer-Scott dual queue algorithm, differing, - * among other ways, by using modes within nodes rather than - * marked pointers. The algorithm is a little simpler than - * that for stacks because fulfillers do not need explicit - * nodes, and matching is done by CAS'ing QNode.item field - * from non-null to null (for put) or vice versa (for take). - */ - - /** Node class for TransferQueue. 
*/ - static final class QNode implements ForkJoinPool.ManagedBlocker { - volatile QNode next; // next node in queue - volatile Object item; // CAS'ed to or from null - volatile Thread waiter; // to control park/unpark - final boolean isData; - - QNode(Object item, boolean isData) { - this.item = item; - this.isData = isData; - } - - boolean casNext(QNode cmp, QNode val) { - return next == cmp && - QNEXT.compareAndSet(this, cmp, val); - } - - boolean casItem(Object cmp, Object val) { - return item == cmp && - QITEM.compareAndSet(this, cmp, val); - } - - /** - * Tries to cancel by CAS'ing ref to this as item. - */ - boolean tryCancel(Object cmp) { - return QITEM.compareAndSet(this, cmp, this); - } - - boolean isCancelled() { - return item == this; - } - - /** - * Returns true if this node is known to be off the queue - * because its next pointer has been forgotten due to - * an advanceHead operation. - */ - boolean isOffList() { - return next == this; - } - - void forgetWaiter() { - QWAITER.setOpaque(this, null); - } - - boolean isFulfilled() { - Object x; - return isData == ((x = item) == null) || x == this; - } - - public final boolean isReleasable() { - Object x; - return isData == ((x = item) == null) || x == this || - Thread.currentThread().isInterrupted(); - } - - public final boolean block() { - while (!isReleasable()) LockSupport.park(); - return true; - } - - // VarHandle mechanics - private static final VarHandle QITEM; - private static final VarHandle QNEXT; - private static final VarHandle QWAITER; - static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - QITEM = l.findVarHandle(QNode.class, "item", Object.class); - QNEXT = l.findVarHandle(QNode.class, "next", QNode.class); - QWAITER = l.findVarHandle(QNode.class, "waiter", Thread.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); + if (ns == 0L) { // no match, no wait + m = e; + break; + } + if (s == null) // try to push node and wait + s = new 
DualNode(e, haveData); + s.next = p; + if (p == (p = cmpExHead(p, s))) { + if ((m = s.await(e, ns, this, // spin if (nearly) empty + p == null || p.waiter == null)) == e) + unspliceLifo(s); // cancelled + break; } } - } - - /** Head of queue */ - transient volatile QNode head; - /** Tail of queue */ - transient volatile QNode tail; - /** - * Reference to a cancelled node that might not yet have been - * unlinked from queue because it was the last inserted node - * when it was cancelled. - */ - transient volatile QNode cleanMe; - - TransferQueue() { - QNode h = new QNode(null, false); // initialize to dummy node. - head = h; - tail = h; + return m; } /** - * Tries to cas nh as new head; if successful, unlink - * old head's next node to avoid garbage retention. + * Unlinks node s. Same idea as Fifo version. */ - void advanceHead(QNode h, QNode nh) { - if (h == head && - QHEAD.compareAndSet(this, h, nh)) - h.next = h; // forget old next - } - - /** - * Tries to cas nt as new tail. - */ - void advanceTail(QNode t, QNode nt) { - if (tail == t) - QTAIL.compareAndSet(this, t, nt); - } - - /** - * Tries to CAS cleanMe slot. - */ - boolean casCleanMe(QNode cmp, QNode val) { - return cleanMe == cmp && - QCLEANME.compareAndSet(this, cmp, val); - } - - /** - * Puts or takes an item. - */ - @SuppressWarnings("unchecked") - E transfer(E e, boolean timed, long nanos) { - /* Basic algorithm is to loop trying to take either of - * two actions: - * - * 1. If queue apparently empty or holding same-mode nodes, - * try to add node to queue of waiters, wait to be - * fulfilled (or cancelled) and return matching item. - * - * 2. If queue apparently contains waiting items, and this - * call is of complementary mode, try to fulfill by CAS'ing - * item field of waiting node and dequeuing it, and then - * returning matching item. - * - * In each case, along the way, check for and try to help - * advance head and tail on behalf of other stalled/slow - * threads. 
- * - * The loop starts off with a null check guarding against - * seeing uninitialized head or tail values. This never - * happens in current SynchronousQueue, but could if - * callers held non-volatile/final ref to the - * transferer. The check is here anyway because it places - * null checks at top of loop, which is usually faster - * than having them implicitly interspersed. - */ - - QNode s = null; // constructed/reused as needed - boolean isData = (e != null); - for (;;) { - QNode t = tail, h = head, m, tn; // m is node to fulfill - if (t == null || h == null) - ; // inconsistent - else if (h == t || t.isData == isData) { // empty or same-mode - if (t != tail) // inconsistent - ; - else if ((tn = t.next) != null) // lagging tail - advanceTail(t, tn); - else if (timed && nanos <= 0L) // can't wait - return null; - else if (t.casNext(null, (s != null) ? s : - (s = new QNode(e, isData)))) { - advanceTail(t, s); - long deadline = timed ? System.nanoTime() + nanos : 0L; - Thread w = Thread.currentThread(); - int stat = -1; // same idea as TransferStack - Object item; - while ((item = s.item) == e) { - if ((timed && - (nanos = deadline - System.nanoTime()) <= 0) || - w.isInterrupted()) { - if (s.tryCancel(e)) { - clean(t, s); - return null; - } - } else if ((item = s.item) != e) { - break; // recheck - } else if (stat <= 0) { - if (t.next == s) { - if (stat < 0 && t.isFulfilled()) { - stat = 0; // yield once if first - Thread.yield(); - } - else { - stat = 1; - s.waiter = w; - } - } - } else if (!timed) { - LockSupport.setCurrentBlocker(this); - try { - ForkJoinPool.managedBlock(s); - } catch (InterruptedException cannotHappen) { } - LockSupport.setCurrentBlocker(null); - } - else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) - LockSupport.parkNanos(this, nanos); - } - if (stat == 1) - s.forgetWaiter(); - if (!s.isOffList()) { // not already unlinked - advanceHead(t, s); // unlink if head - if (item != null) // and forget fields - s.item = s; - } - return (item != null) 
? (E)item : e; - } - - } else if ((m = h.next) != null && t == tail && h == head) { - Thread waiter; - Object x = m.item; - boolean fulfilled = ((isData == (x == null)) && - x != m && m.casItem(x, e)); - advanceHead(h, m); // (help) dequeue - if (fulfilled) { - if ((waiter = m.waiter) != null) - LockSupport.unpark(waiter); - return (x != null) ? (E)x : e; - } - } + private void unspliceLifo(DualNode s) { + boolean seen = false; // try removing by collapsing head + DualNode p = head; + for (DualNode f, u; p != null && p.matched();) { + if (p == s) + seen = true; + p = (p == (u = cmpExHead(p, (f = p.next)))) ? f : u; } - } - - /** - * Gets rid of cancelled node s with original predecessor pred. - */ - void clean(QNode pred, QNode s) { - s.forgetWaiter(); - /* - * At any given time, exactly one node on list cannot be - * deleted -- the last inserted node. To accommodate this, - * if we cannot delete s, we save its predecessor as - * "cleanMe", deleting the previously saved version - * first. At least one of node s or the node previously - * saved can always be deleted, so this always terminates. - */ - while (pred.next == s) { // Return early if already unlinked - QNode h = head; - QNode hn = h.next; // Absorb cancelled first node as head - if (hn != null && hn.isCancelled()) { - advanceHead(h, hn); - continue; + if (p != null && !seen && sweepNow()) { // occasionally sweep + for (DualNode f, n, u; p != null && (f = p.next) != null; ) { + p = (!f.matched() ? f : + f == (u = p.cmpExNext(f, n = f.next)) ? 
n : u); } - QNode t = tail; // Ensure consistent read for tail - if (t == h) - return; - QNode tn = t.next; - if (t != tail) - continue; - if (tn != null) { - advanceTail(t, tn); - continue; - } - if (s != t) { // If not tail, try to unsplice - QNode sn = s.next; - if (sn == s || pred.casNext(s, sn)) - return; - } - QNode dp = cleanMe; - if (dp != null) { // Try unlinking previous cancelled node - QNode d = dp.next; - QNode dn; - if (d == null || // d is gone or - d == dp || // d is off list or - !d.isCancelled() || // d not cancelled or - (d != t && // d not tail and - (dn = d.next) != null && // has successor - dn != d && // that is on list - dp.casNext(d, dn))) // d unspliced - casCleanMe(dp, null); - if (dp == pred) - return; // s is already saved node - } else if (casCleanMe(null, pred)) - return; // Postpone cleaning s - } - } - - // VarHandle mechanics - private static final VarHandle QHEAD; - private static final VarHandle QTAIL; - private static final VarHandle QCLEANME; - static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - QHEAD = l.findVarHandle(TransferQueue.class, "head", - QNode.class); - QTAIL = l.findVarHandle(TransferQueue.class, "tail", - QNode.class); - QCLEANME = l.findVarHandle(TransferQueue.class, "cleanMe", - QNode.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); } } } /** - * The transferer. Set only in constructor, but cannot be declared - * as final without further complicating serialization. Since - * this is accessed only at most once per public method, there - * isn't a noticeable performance penalty for using volatile - * instead of final here. + * The transferer. (See below about serialization.) */ - private transient volatile Transferer transferer; + private final transient Transferer transferer; + + private final transient boolean fair; + + /** Invokes fair or lifo transfer */ + private Object xfer(Object e, long nanos) { + Transferer x = transferer; + return (fair) ? 
x.xfer(e, nanos) : x.xferLifo(e, nanos); + } /** * Creates a {@code SynchronousQueue} with nonfair access policy. @@ -824,7 +247,8 @@ public class SynchronousQueue extends AbstractQueue * access; otherwise the order is unspecified. */ public SynchronousQueue(boolean fair) { - transferer = fair ? new TransferQueue() : new TransferStack(); + this.fair = fair; + transferer = new Transferer(); } /** @@ -835,11 +259,13 @@ public class SynchronousQueue extends AbstractQueue * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (transferer.transfer(e, false, 0) == null) { - Thread.interrupted(); - throw new InterruptedException(); + Objects.requireNonNull(e); + if (!Thread.interrupted()) { + if (xfer(e, Long.MAX_VALUE) == null) + return; + Thread.interrupted(); // failure possible only due to interrupt } + throw new InterruptedException(); } /** @@ -853,8 +279,9 @@ public class SynchronousQueue extends AbstractQueue */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { - if (e == null) throw new NullPointerException(); - if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) + Objects.requireNonNull(e); + long nanos = Math.max(unit.toNanos(timeout), 0L); + if (xfer(e, nanos) == null) return true; if (!Thread.interrupted()) return false; @@ -871,8 +298,8 @@ public class SynchronousQueue extends AbstractQueue * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { - if (e == null) throw new NullPointerException(); - return transferer.transfer(e, true, 0) != null; + Objects.requireNonNull(e); + return xfer(e, 0L) == null; } /** @@ -882,11 +309,14 @@ public class SynchronousQueue extends AbstractQueue * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ + @SuppressWarnings("unchecked") public E take() throws InterruptedException { - E e = transferer.transfer(null, 
false, 0); - if (e != null) - return e; - Thread.interrupted(); + Object e; + if (!Thread.interrupted()) { + if ((e = xfer(null, Long.MAX_VALUE)) != null) + return (E) e; + Thread.interrupted(); + } throw new InterruptedException(); } @@ -899,10 +329,12 @@ public class SynchronousQueue extends AbstractQueue * specified waiting time elapses before an element is present * @throws InterruptedException {@inheritDoc} */ + @SuppressWarnings("unchecked") public E poll(long timeout, TimeUnit unit) throws InterruptedException { - E e = transferer.transfer(null, true, unit.toNanos(timeout)); - if (e != null || !Thread.interrupted()) - return e; + Object e; + long nanos = Math.max(unit.toNanos(timeout), 0L); + if ((e = xfer(null, nanos)) != null || !Thread.interrupted()) + return (E) e; throw new InterruptedException(); } @@ -913,8 +345,9 @@ public class SynchronousQueue extends AbstractQueue * @return the head of this queue, or {@code null} if no * element is available */ + @SuppressWarnings("unchecked") public E poll() { - return transferer.transfer(null, true, 0); + return (E) xfer(null, 0L); } /** @@ -1104,11 +537,13 @@ public class SynchronousQueue extends AbstractQueue } /* - * To cope with serialization strategy in the 1.5 version of - * SynchronousQueue, we declare some unused classes and fields - * that exist solely to enable serializability across versions. - * These fields are never used, so are initialized only if this - * object is ever serialized or deserialized. + * To cope with serialization across multiple implementation + * overhauls, we declare some unused classes and fields that exist + * solely to enable serializability across versions. These fields + * are never used, so are initialized only if this object is ever + * serialized. We use readResolve to replace a deserialized queue + * with a fresh one. Note that no queue elements are serialized, + * since any existing ones are only transient. 
*/ @SuppressWarnings("serial") @@ -1130,7 +565,6 @@ public class SynchronousQueue extends AbstractQueue */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { - boolean fair = transferer instanceof TransferQueue; if (fair) { qlock = new ReentrantLock(true); waitingProducers = new FifoWaitQueue(); @@ -1145,24 +579,10 @@ public class SynchronousQueue extends AbstractQueue } /** - * Reconstitutes this queue from a stream (that is, deserializes it). - * @param s the stream - * @throws ClassNotFoundException if the class of a serialized object - * could not be found - * @throws java.io.IOException if an I/O error occurs + * Replaces a deserialized SynchronousQueue with a fresh one with + * the associated fairness */ - private void readObject(java.io.ObjectInputStream s) - throws java.io.IOException, ClassNotFoundException { - s.defaultReadObject(); - if (waitingProducers instanceof FifoWaitQueue) - transferer = new TransferQueue(); - else - transferer = new TransferStack(); - } - - static { - // Reduce the risk of rare disastrous classloading in first call to - // LockSupport.park: https://bugs.openjdk.org/browse/JDK-8074773 - Class ensureLoaded = LockSupport.class; + private Object readResolve() { + return new SynchronousQueue(waitingProducers instanceof FifoWaitQueue); } } diff --git a/test/jdk/java/util/concurrent/LinkedTransferQueue/WhiteBox.java b/test/jdk/java/util/concurrent/LinkedTransferQueue/WhiteBox.java index df32846f58d..eaf012cbf1d 100644 --- a/test/jdk/java/util/concurrent/LinkedTransferQueue/WhiteBox.java +++ b/test/jdk/java/util/concurrent/LinkedTransferQueue/WhiteBox.java @@ -59,18 +59,24 @@ import java.util.function.Consumer; @Test public class WhiteBox { + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); final VarHandle HEAD, TAIL, ITEM, NEXT; - public WhiteBox() throws ReflectiveOperationException { - Class qClass = LinkedTransferQueue.class; - Class nodeClass = Class.forName(qClass.getName() + "$Node"); - 
MethodHandles.Lookup lookup - = MethodHandles.privateLookupIn(qClass, MethodHandles.lookup()); - HEAD = lookup.findVarHandle(qClass, "head", nodeClass); - TAIL = lookup.findVarHandle(qClass, "tail", nodeClass); - NEXT = lookup.findVarHandle(nodeClass, "next", nodeClass); - ITEM = lookup.findVarHandle(nodeClass, "item", Object.class); + public WhiteBox() throws Throwable { // throws ReflectiveOperationException { + try { + Class qClass = LinkedTransferQueue.class; + Class nodeClass = Class.forName(qClass.getName() + "$DualNode"); + MethodHandles.Lookup lookup + = MethodHandles.privateLookupIn(qClass, MethodHandles.lookup()); + HEAD = lookup.findVarHandle(qClass, "head", nodeClass); + TAIL = lookup.findVarHandle(qClass, "tail", nodeClass); + NEXT = lookup.findVarHandle(nodeClass, "next", nodeClass); + ITEM = lookup.findVarHandle(nodeClass, "item", Object.class); + } catch (Throwable ex) { + ex.printStackTrace(); + throw ex; + } } Object head(LinkedTransferQueue q) { return HEAD.getVolatile(q); } @@ -78,6 +84,16 @@ public class WhiteBox { Object item(Object node) { return ITEM.getVolatile(node); } Object next(Object node) { return NEXT.getVolatile(node); } + /* + * Modified for jdk22: Accommodate lazy initialization, so counts + * may vary by 1, and some nodes become headers vs unlinked, + * compared to previous versions. 
+ */ + + static void checkCount(int val, int expect) { + assertTrue(val == expect || val == expect - 1); + } + int nodeCount(LinkedTransferQueue q) { int i = 0; for (Object p = head(q); p != null; ) { @@ -124,13 +140,15 @@ public class WhiteBox { public void addRemove() { LinkedTransferQueue q = new LinkedTransferQueue(); assertInvariants(q); - assertNull(next(head(q))); - assertNull(item(head(q))); + if (head(q) != null) { + assertNull(next(head(q))); + assertNull(item(head(q))); + } q.add(1); - assertEquals(nodeCount(q), 2); + checkCount(nodeCount(q), 2); assertInvariants(q); q.remove(1); - assertEquals(nodeCount(q), 1); + checkCount(nodeCount(q), 1); assertInvariants(q); } @@ -158,12 +176,12 @@ public class WhiteBox { Object oldHead; int n = 1 + rnd.nextInt(5); for (int i = 0; i < n; i++) q.add(i); - assertEquals(nodeCount(q), n + 1); + checkCount(nodeCount(q), n + 1); oldHead = head(q); traversalAction.accept(q); assertInvariants(q); - assertEquals(nodeCount(q), n); - assertIsSelfLinked(oldHead); + checkCount(nodeCount(q), n); + // assertIsSelfLinked(oldHead); } @Test(dataProvider = "traversalActions") @@ -204,7 +222,7 @@ public class WhiteBox { int c = nodeCount(q); traversalAction.accept(q); - assertEquals(nodeCount(q), c - 1); + checkCount(nodeCount(q), c - 1); assertSame(next(p0), p4); assertSame(next(p1), p4); @@ -217,7 +235,7 @@ public class WhiteBox { traversalAction.accept(q); assertSame(next(p4), p5); assertNull(next(p5)); - assertEquals(nodeCount(q), c - 1); + checkCount(nodeCount(q), c - 1); } /** @@ -228,7 +246,7 @@ public class WhiteBox { public void traversalOperationsCollapseRandomNodes( Consumer traversalAction) { LinkedTransferQueue q = new LinkedTransferQueue(); - int n = rnd.nextInt(6); + int n = 1 + rnd.nextInt(6); for (int i = 0; i < n; i++) q.add(i); ArrayList nulledOut = new ArrayList(); for (Object p = head(q); p != null; p = next(p)) @@ -238,7 +256,7 @@ public class WhiteBox { } traversalAction.accept(q); int c = nodeCount(q); - 
assertEquals(q.size(), c - (q.contains(n - 1) ? 0 : 1)); + checkCount(c - (q.contains(n - 1) ? 0 : 1), q.size() + 1); for (int i = 0; i < n; i++) assertTrue(nulledOut.contains(i) ^ q.contains(i)); } @@ -263,7 +281,7 @@ public class WhiteBox { int n = 1 + rnd.nextInt(5); for (int i = 0; i < n; i++) q.add(i); bulkRemovalAction.accept(q); - assertEquals(nodeCount(q), 1); + checkCount(nodeCount(q), 1); assertInvariants(q); } @@ -289,13 +307,13 @@ public class WhiteBox { LinkedTransferQueue q = new LinkedTransferQueue(); int n = 1 + rnd.nextInt(5); for (int i = 0; i < n; i++) q.add(i); - assertEquals(nodeCount(q), n + 1); + checkCount(nodeCount(q), n + 1); for (int i = 0; i < n; i++) { int c = nodeCount(q); boolean slack = item(head(q)) == null; if (slack) assertNotNull(item(next(head(q)))); pollAction.accept(q); - assertEquals(nodeCount(q), q.isEmpty() ? 1 : c - (slack ? 2 : 0)); + checkCount(nodeCount(q), q.isEmpty() ? 1 : c - (slack ? 2 : 0)); } assertInvariants(q); } @@ -318,11 +336,12 @@ public class WhiteBox { LinkedTransferQueue q = new LinkedTransferQueue(); int n = 1 + rnd.nextInt(9); for (int i = 0; i < n; i++) { - boolean slack = next(tail(q)) != null; + boolean empty = (tail(q) == null); + boolean slack = !empty && (next(tail(q)) != null); addAction.accept(q); if (slack) assertNull(next(tail(q))); - else { + else if (!empty) { assertNotNull(next(tail(q))); assertNull(next(next(tail(q)))); } @@ -365,10 +384,9 @@ public class WhiteBox { /** Checks conditions which should always be true. */ void assertInvariants(LinkedTransferQueue q) { - assertNotNull(head(q)); - assertNotNull(tail(q)); // head is never self-linked (but tail may!) - for (Object h; next(h = head(q)) == h; ) - assertNotSame(h, head(q)); // must be update race + Object h; + if ((h = head(q)) != null) + assertNotSame(h, next(h)); } }