From 1ba9fc6efdd23ae78a8a8bb0fba5ea21c4f07319 Mon Sep 17 00:00:00 2001 From: Doug Lea Date: Wed, 11 Mar 2026 08:54:49 -0400 Subject: [PATCH] Reduce reliance on noUserHelp --- .../java/util/concurrent/ForkJoinPool.java | 894 ++++++++++-------- .../java/util/concurrent/ForkJoinTask.java | 11 +- 2 files changed, 482 insertions(+), 423 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index 1cac9c4c2f3..aa173b9f364 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -51,7 +51,6 @@ import jdk.internal.access.JavaUtilConcurrentFJPAccess; import jdk.internal.access.SharedSecrets; import jdk.internal.misc.Unsafe; import jdk.internal.vm.SharedThreadContainer; -import jdk.internal.vm.annotation.Stable; import static java.util.concurrent.DelayScheduler.ScheduledForkJoinTask; /** @@ -561,70 +560,89 @@ public class ForkJoinPool extends AbstractExecutorService * access (which is usually needed anyway). * * Signalling. Signals (in signalWork) cause new or reactivated - * workers to scan for tasks. SignalWork is invoked in two cases: - * (1) When a task is pushed onto an empty queue, and (2) When a - * worker takes a top-level task from a queue that has additional - * tasks. Together, these suffice in O(log(#threads)) steps to - * fully activate with at least enough workers, and ideally no - * more than required. This ideal is unobtainable: Callers do not - * know whether another worker will finish its current task and - * poll for others without need of a signal (which is otherwise an - * advantage of work-stealing vs other schemes), and also must - * conservatively estimate the triggering conditions of emptiness - * or non-emptiness; all of which usually cause more activations - * than necessary (see below). 
(Method signalWork is also used as - * failsafe in case of Thread failures in deregisterWorker, to - * activate or create a new worker to replace them). + * workers to scan for tasks. Method signalWork and its callers + * try to approximate the unattainable goal of having the right + * number of workers activated for the tasks at hand, but must err + * on the side of too many workers vs too few to avoid stalls: * - * Top-Level scheduling - * ==================== + * * If computations are purely tree structured, it suffices for + * every worker to activate another when it pushes a task into + * an empty queue, resulting in O(log(#threads)) steps to full + * activation. Emptiness must be conservatively approximated, + * which may result in unnecessary signals. Also, to reduce + * resource usages in some cases, at the expense of slower + * startup in others, activation of an idle thread is preferred + * over creating a new one, here and elsewhere. + * + * * At the other extreme, if "flat" tasks (those that do not in + * turn generate others) come in serially from only a single + * producer, each worker taking a task from a queue should + * propagate a signal if there are more tasks in that + * queue. This is equivalent to, but generally faster than, + * arranging the stealer take multiple tasks, re-pushing one or + * more on its own queue, and signalling (because its queue is + * empty), also resulting in logarithmic full activation + * time. If tasks do not engage in unbounded loops based on + * the actions of other workers with unknown dependencies, + * this form of propagation can be limited to one signal per + * activation (phase change). We distinguish the cases by + * further signalling only if the task is an InterruptibleTask + * (see below), which are the only supported forms of task that + * may do so. 
+ * + * * Because we don't know about usage patterns (or most commonly, + * mixtures), we use both approaches, which present even more + * opportunities to over-signal. (Failure to distinguish these + * cases in terms of submission methods was arguably an early + * design mistake.) Note that in either of these contexts, + * signals may be (and often are) unnecessary because active + * workers continue scanning after running tasks without the + * need to be signalled (which is one reason work stealing is + * often faster than alternatives), so additional workers + * aren't needed. + * + * * For rapidly branching tasks that require full pool resources, + * oversignalling is OK, because signalWork will soon have no + * more workers to create or reactivate. But for others (mainly + * externally submitted tasks), overprovisioning may cause very + * noticeable slowdowns due to contention and resource + * wastage. We reduce impact by deactivating workers when + * queues don't have accessible tasks, but reactivating and + * rescanning if other tasks remain. + * + * * Despite these, signal contention and overhead effects still + * occur during ramp-up and ramp-down of small computations. * * Scanning. Method runWorker performs top-level scanning for (and * execution of) tasks by polling a pseudo-random permutation of * the array (by starting at a given index, and using a constant * cyclically exhaustive stride.) It uses the same basic polling * method as WorkQueue.poll(), but restarts with a different - * permutation on each rescan. The pseudorandom generator need - * not have high-quality statistical properties in the long + * permutation on each invocation. The pseudorandom generator + * need not have high-quality statistical properties in the long * term. We use Marsaglia XorShifts, seeded with the Weyl sequence - * from ThreadLocalRandom probes, which are cheap and suffice. + * from ThreadLocalRandom probes, which are cheap and + * suffice. 
Each queue's polling attempts to avoid becoming stuck + * when other scanners/pollers stall. Scans do not otherwise + * explicitly take into account core affinities, loads, cache + * localities, etc. However, they do exploit temporal locality + * (which usually approximates these) by preferring to re-poll + * from the same queue after a successful poll before trying + * others, which also reduces bookkeeping, cache traffic, and + * scanning overhead. But it also reduces fairness, which is + * partially counteracted by giving up on detected interference + * (which also reduces contention when too many workers try to + * take small tasks from the same queue). * * Deactivation. When no tasks are found by a worker in runWorker, - * it invokes deactivate, that first deactivates (to an IDLE - * phase). Avoiding missed signals during deactivation requires a - * (conservative) rescan, reactivating if there may be tasks to - * poll. Because idle workers are often not yet blocked (parked), - * we use a WorkQueue field to advertise that a waiter actually - * needs unparking upon signal. - * - * When tasks are constructed as (recursive) DAGs, top-level - * scanning is usually infrequent, and doesn't encounter most - * of the following problems addressed by runWorker and awaitWork: - * - * Locality. Polls are organized into "runs", continuing until - * empty or contended, while also minimizing interference by - * postponing bookeeping to ends of runs. This may reduce - * fairness. - * - * Contention. When many workers try to poll few queues, they - * often collide, generating CAS failures and disrupting locality - * of workers already running their tasks. This also leads to - * stalls when tasks cannot be taken because other workers have - * not finished poll operations, which is detected by reading - * ahead in queue arrays. In both cases, workers restart scans in a - * way that approximates randomized backoff. - * - * Oversignalling. 
When many short top-level tasks are present in - * a small number of queues, the above signalling strategy may - * activate many more workers than needed, worsening locality and - * contention problems, while also generating more global - * contention (field ctl is CASed on every activation and - * deactivation). We filter out (both in runWorker and - * signalWork) attempted signals that are surely not needed - * because the signalled tasks are already taken. - * - * Shutdown and Quiescence - * ======================= + * it tries to deactivate(), giving up (and rescanning) on "ctl" + * contention. To avoid missed signals during deactivation, the + * method rescans and reactivates if there may have been a missed + * signal during deactivation. To reduce false-alarm reactivations + * while doing so, we scan multiple times (analogously to method + * quiescent()) before trying to reactivate. Because idle workers + * are often not yet blocked (parked), we use a WorkQueue field to + * advertise that a waiter actually needs unparking upon signal. * * Quiescence. Workers scan looking for work, giving up when they * don't find any, without being sure that none are available. @@ -874,7 +892,9 @@ public class ForkJoinPool extends AbstractExecutorService * shutdown, runners are interrupted so they can cancel. Since * external joining callers never run these tasks, they must await * cancellation by others, which can occur along several different - * paths. + * paths. The inability to rely on caller-runs may also require + * extra signalling (resulting in scanning and contention) so is + * done only conditionally in methods push and runWorker. * * Across these APIs, rules for reporting exceptions for tasks * with results accessed via join() differ from those via get(), @@ -941,13 +961,9 @@ public class ForkJoinPool extends AbstractExecutorService * less-contended applications. 
To help arrange this, some * non-reference fields are declared as "long" even when ints or * shorts would suffice. For class WorkQueue, an - * embedded @Contended isolates the very busy top index, along - * with status and bookkeeping fields written (mostly) by owners, - * that otherwise interfere with reading array and base - * fields. There are other variables commonly contributing to - * false-sharing-related performance issues (including fields of - * class Thread), but we can't do much about this except try to - * minimize access. + * embedded @Contended region segregates fields most heavily + * updated by owners from those most commonly read by stealers or + * other management. * * Initial sizing and resizing of WorkQueue arrays is an even more * delicate tradeoff because the best strategy systematically @@ -956,11 +972,13 @@ public class ForkJoinPool extends AbstractExecutorService * direct false-sharing and indirect cases due to GC bookkeeping * (cardmarks etc), and reduce the number of resizes, which are * not especially fast because they require atomic transfers. - * Currently, arrays are initialized to be just large enough to - * avoid resizing in most tree-structured tasks, but grow rapidly - * until large. (Maintenance note: any changes in fields, queues, - * or their uses, or JVM layout policies, must be accompanied by - * re-evaluation of these placement and sizing decisions.) + * Currently, arrays for workers are initialized to be just large + * enough to avoid resizing in most tree-structured tasks, but + * larger for external queues where both false-sharing problems + * and the need for resizing are more common. (Maintenance note: + * any changes in fields, queues, or their uses, or JVM layout + * policies, must be accompanied by re-evaluation of these + * placement and sizing decisions.) 
* * Style notes * =========== @@ -1043,11 +1061,17 @@ public class ForkJoinPool extends AbstractExecutorService static final int DEFAULT_COMMON_MAX_SPARES = 256; /** - * Initial capacity of work-stealing queue array. + * Initial capacity of work-stealing queue array for workers. * Must be a power of two, at least 2. See above. */ static final int INITIAL_QUEUE_CAPACITY = 1 << 6; + /** + * Initial capacity of work-stealing queue array for external queues. + * Must be a power of two, at least 2. See above. + */ + static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9; + // conversions among short, int, long static final int SMASK = 0xffff; // (unsigned) short bits static final long LMASK = 0xffffffffL; // lower 32 bits of long @@ -1079,7 +1103,6 @@ public class ForkJoinPool extends AbstractExecutorService static final int DROPPED = 1 << 16; // removed from ctl counts static final int UNCOMPENSATE = 1 << 16; // tryCompensate return static final int IDLE = 1 << 16; // phase seqlock/version count - static final int NEXTIDLE = IDLE << 1; // next IDLE phase bit static final int MIN_QUEUES_SIZE = 1 << 4; // ensure external slots /* @@ -1201,15 +1224,22 @@ public class ForkJoinPool extends AbstractExecutorService private static final long TOP; private static final long ARRAY; - /** - * SeqLock acquire for external queues. - * @return 1 if cannot acquire lock, else current phase - */ - final int tryLockPhase() { + final void updateBase(int v) { + U.putIntVolatile(this, BASE, v); + } + final void updateTop(int v) { + U.putIntOpaque(this, TOP, v); + } + final void updateArray(ForkJoinTask[] a) { + U.getAndSetReference(this, ARRAY, a); + } + final void unlockPhase() { + U.getAndAddInt(this, PHASE, IDLE); + } + final boolean tryLockPhase() { // seqlock acquire int p; return (((p = phase) & IDLE) != 0 && - (p == U.compareAndExchangeInt(this, PHASE, p, p + IDLE))) ? 
- p : 1; + U.compareAndSetInt(this, PHASE, p, p + IDLE)); } /** @@ -1218,11 +1248,11 @@ public class ForkJoinPool extends AbstractExecutorService */ WorkQueue(ForkJoinWorkerThread owner, int id, int cfg, boolean clearThreadLocals) { + array = new ForkJoinTask[owner == null ? + INITIAL_EXTERNAL_QUEUE_CAPACITY : + INITIAL_QUEUE_CAPACITY]; + this.owner = owner; this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg; - if ((this.owner = owner) == null) { - array = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; - phase = id | IDLE; - } } /** @@ -1243,67 +1273,61 @@ public class ForkJoinPool extends AbstractExecutorService /** * Pushes a task. Called only by owner or if already locked * - * @param task the task; caller must ensure nonnull + * @param task the task; no-op if null * @param pool the pool to signal if was previously empty, else null - * @param unlock if not 1, phase unlock value + * @param internal if caller owns this queue * @throws RejectedExecutionException if array could not be resized */ - final void push(ForkJoinTask task, ForkJoinPool pool, int unlock) { - ForkJoinTask[] a = array; - int b = base, s = top, p = top = s + 1, cap, m; long pos; - if (a == null || (cap = a.length) <= p - b || (m = cap - 1) < 0) - growAndPush(task, pool, unlock); - else { - U.putReferenceVolatile(a, pos = slotOffset(m & s), task); - boolean empty = (s == b) || - (U.getReferenceVolatile(a, slotOffset(m & (s - 1))) == null); - if (unlock != 1) { // release external lock - U.putInt(this, PHASE, unlock); - U.storeFence(); + final void push(ForkJoinTask task, ForkJoinPool pool, boolean internal) { + int s = top, b = base, m, cap, room; ForkJoinTask[] a; + if ((a = array) != null && (cap = a.length) > 0) { // else disabled + if ((room = (m = cap - 1) - (s - b)) >= 0) { + top = s + 1; + long pos = slotOffset(m & s); + if (!internal) + U.putReference(a, pos, task); // inside lock + else + U.putReferenceVolatile(a, pos, task); + if (room == 0) // resize + growArray(a, cap, s); } - if 
(empty && pool != null) - pool.signalWork(a, pos); // may have appeared empty + if (!internal) + unlockPhase(); + if (room < 0) + throw new RejectedExecutionException("Queue capacity exceeded"); + if ((room == 0 || room == m || + U.getReferenceVolatile(a, slotOffset(m & (s - 1))) == null) && + pool != null) + pool.signalWork(); // may have appeared empty } } /** - * Resizes the queue array and pushes unless out of memory. - * @param task the task; caller must ensure nonnull - * @param pool the pool to signal upon resize - * @param unlock if not 1, phase unlock value + * Resizes the queue array unless out of memory. + * @param a old array + * @param cap old array capacity + * @param s current top */ - private void growAndPush(ForkJoinTask task, ForkJoinPool pool, int unlock) { - ForkJoinTask[] a; int cap, newCap; - if ((a = array) != null && (cap = a.length) > 0 && - (newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2) > 0) { + private void growArray(ForkJoinTask[] a, int cap, int s) { + int newCap = cap << 1; + if (a != null && a.length == cap && cap > 0 && newCap > 0) { ForkJoinTask[] newArray = null; try { newArray = new ForkJoinTask[newCap]; } catch (OutOfMemoryError ex) { } - if (newArray != null) { - int s = top - 1, mask = cap - 1, newMask = newCap - 1; - newArray[s & newMask] = task; - for (int k = s - 1, j = cap; j > 0; --j, --k) { + if (newArray != null) { // else throw on next push + int mask = cap - 1, newMask = newCap - 1; + for (int k = s, j = cap; j > 0; --j, --k) { ForkJoinTask u; // poll old, push to new if ((u = (ForkJoinTask)U.getAndSetReference( a, slotOffset(k & mask), null)) == null) break; // lost to pollers newArray[k & newMask] = u; } - long pos = slotOffset(s & newMask); - U.putReferenceVolatile(this, ARRAY, newArray); - if (unlock != 1) - phase = unlock; - if (pool != null) - pool.signalWork(newArray, pos); - return; + updateArray(newArray); // fully fenced } } - U.putIntOpaque(this, TOP, top - 1); // backout - if (unlock != 1) - phase = 
unlock; - throw new RejectedExecutionException("Queue capacity exceeded"); } /** @@ -1314,25 +1338,24 @@ public class ForkJoinPool extends AbstractExecutorService private ForkJoinTask nextLocalTask(int fifo) { ForkJoinTask t = null; ForkJoinTask[] a = array; - int b = base, s = top - 1, cap; - if (a != null && s - b >= 0 && (cap = a.length) > 0) { - if (fifo == 0) { - if ((t = (ForkJoinTask)U.getAndSetReference( - a, slotOffset((cap - 1) & s), null)) != null) - U.putIntOpaque(this, TOP, s); - } else { - do { + int b = base, p = top, cap; + if (p - b > 0 && a != null && (cap = a.length) > 0) { + for (int m = cap - 1, s, nb;;) { + if (fifo == 0 || (nb = b + 1) == p) { if ((t = (ForkJoinTask)U.getAndSetReference( - a, slotOffset((cap - 1) & b), null)) != null) { - base = b + 1; - U.storeFence(); - break; - } - if (b == s) - break; - while (b == (b = U.getIntAcquire(this, BASE))) - Thread.onSpinWait(); - } while (s - b >= 0); + a, slotOffset(m & (s = p - 1)), null)) != null) + updateTop(s); // else lost race for only task + break; + } + if ((t = (ForkJoinTask)U.getAndSetReference( + a, slotOffset(m & b), null)) != null) { + updateBase(nb); + break; + } + while (b == (b = U.getIntAcquire(this, BASE))) + Thread.onSpinWait(); // spin to reduce memory traffic + if (p - b <= 0) + break; } } return t; @@ -1354,16 +1377,16 @@ public class ForkJoinPool extends AbstractExecutorService final boolean tryUnpush(ForkJoinTask task, boolean internal) { boolean taken = false; ForkJoinTask[] a = array; - int p = top, s = p - 1, lock = 0, cap; long k; + int p = top, s = p - 1, cap; long k; if (a != null && (cap = a.length) > 0 && U.getReference(a, k = slotOffset((cap - 1) & s)) == task && - (internal || (lock = tryLockPhase()) != 1)) { + (internal || tryLockPhase())) { if (top == p && U.compareAndSetReference(a, k, task, null)) { taken = true; - U.putIntOpaque(this, TOP, s); + updateTop(s); } if (!internal) - phase = lock + NEXTIDLE; + unlockPhase(); } return taken; } @@ -1409,8 +1432,7 
@@ public class ForkJoinPool extends AbstractExecutorService Thread.onSpinWait(); // stalled } else if (U.compareAndSetReference(a, k, t, null)) { - base = nb; - U.storeFence(); + updateBase(nb); return t; } } @@ -1444,27 +1466,25 @@ public class ForkJoinPool extends AbstractExecutorService if (t == null) break; if (t == task) { - int lock = 0; - if (!internal && (lock = tryLockPhase()) == 1) + if (!internal && !tryLockPhase()) break; // fail if locked if (taken = (top == p && U.compareAndSetReference(a, k, task, null))) { if (i == s) // act as pop - top = s; + updateTop(s); else if (i == base) // act as poll - base = i + 1; + updateBase(i + 1); else { // swap with top U.putReferenceVolatile( a, k, (ForkJoinTask) U.getAndSetReference( a, slotOffset(s & m), null)); - top = s; + updateTop(s); } - U.storeFence(); } if (!internal) - phase = lock + NEXTIDLE; + unlockPhase(); if (taken) task.doExec(); break; @@ -1503,15 +1523,14 @@ public class ForkJoinPool extends AbstractExecutorService if ((f = f.completer) == null || --steps == 0) break outer; } - int lock = 0; - if (!internal && (lock = tryLockPhase()) == 1) + if (!internal && !tryLockPhase()) break; if (taken = (top == p && U.compareAndSetReference(a, k, t, null))) - U.putIntOpaque(this, TOP, s); + updateTop(s); if (!internal) - phase = lock + NEXTIDLE; + unlockPhase(); if (!taken) break; t.doExec(); @@ -1546,8 +1565,7 @@ public class ForkJoinPool extends AbstractExecutorService break; if (base == b && t != null && U.compareAndSetReference(a, k, t, null)) { - base = b + 1; - U.storeFence(); + updateBase(b + 1); t.doExec(); } } @@ -1594,7 +1612,7 @@ public class ForkJoinPool extends AbstractExecutorService * Creates a new ForkJoinWorkerThread. This factory is used unless * overridden in ForkJoinPool constructors. 
*/ - @Stable public static final ForkJoinWorkerThreadFactory + public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; /** @@ -1603,7 +1621,7 @@ public class ForkJoinPool extends AbstractExecutorService * to paranoically avoid potential initialization circularities * as well as to simplify generated code. */ - @Stable static final ForkJoinPool common; + static final ForkJoinPool common; /** * Sequence number for creating worker names @@ -1613,7 +1631,7 @@ public class ForkJoinPool extends AbstractExecutorService /** * For VirtualThread intrinsics */ - @Stable private static final JavaLangAccess JLA; + private static final JavaLangAccess JLA; // fields declared in order of their likely layout on most VMs volatile CountDownLatch termination; // lazily constructed @@ -1621,8 +1639,8 @@ public class ForkJoinPool extends AbstractExecutorService final ForkJoinWorkerThreadFactory factory; final UncaughtExceptionHandler ueh; // per-worker UEH final SharedThreadContainer container; - @Stable final String workerNamePrefix; // null for common pool - @Stable final String poolName; + final String workerNamePrefix; // null for common pool + final String poolName; volatile DelayScheduler delayScheduler; // lazily constructed WorkQueue[] queues; // main registry volatile long runState; // versioned, lockable @@ -1646,15 +1664,46 @@ public class ForkJoinPool extends AbstractExecutorService private static final Object POOLIDS_BASE; private static final long POOLIDS; - // runState locking operations + private boolean compareAndSetCtl(long c, long v) { + return U.compareAndSetLong(this, CTL, c, v); + } + private long compareAndExchangeCtl(long c, long v) { + return U.compareAndExchangeLong(this, CTL, c, v); + } + private long getAndAddCtl(long v) { + return U.getAndAddLong(this, CTL, v); + } + private long incrementThreadIds() { + return U.getAndAddLong(this, THREADIDS, 1L); + } + private static int getAndAddPoolIds(int x) { + return 
U.getAndAddInt(POOLIDS_BASE, POOLIDS, x); + } + private int getAndSetParallelism(int v) { + return U.getAndSetInt(this, PARALLELISM, v); + } + private int getParallelismOpaque() { + return U.getIntOpaque(this, PARALLELISM); + } + private CountDownLatch cmpExTerminationSignal(CountDownLatch x) { + return (CountDownLatch) + U.compareAndExchangeReference(this, TERMINATION, null, x); + } + // runState operations + + private long getAndBitwiseOrRunState(long v) { // for status bits + return U.getAndBitwiseOrLong(this, RUNSTATE, v); + } + private boolean casRunState(long c, long v) { + return U.compareAndSetLong(this, RUNSTATE, c, v); + } private void unlockRunState() { // increment lock bit U.getAndAddLong(this, RUNSTATE, RS_LOCK); } private long lockRunState() { // lock and return current state long s, u; // locked when RS_LOCK set - if (((s = runState) & RS_LOCK) == 0L && - U.compareAndSetLong(this, RUNSTATE, s, u = s + RS_LOCK)) + if (((s = runState) & RS_LOCK) == 0L && casRunState(s, u = s + RS_LOCK)) return u; else return spinLockRunState(); @@ -1663,7 +1712,7 @@ public class ForkJoinPool extends AbstractExecutorService for (int waits = 0;;) { long s, u; if (((s = runState) & RS_LOCK) == 0L) { - if (U.compareAndSetLong(this, RUNSTATE, s, u = s + RS_LOCK)) + if (casRunState(s, u = s + RS_LOCK)) return u; waits = 0; } else if (waits < SPIN_WAITS) { @@ -1718,7 +1767,7 @@ public class ForkJoinPool extends AbstractExecutorService */ final String nextWorkerThreadName() { String prefix = workerNamePrefix; - long tid = U.getAndAddLong(this, THREADIDS, 1L) + 1L; + long tid = incrementThreadIds() + 1L; if (prefix == null) // commonPool has no prefix prefix = "ForkJoinPool.commonPool-worker-"; return prefix.concat(Long.toString(tid)); @@ -1731,7 +1780,6 @@ public class ForkJoinPool extends AbstractExecutorService */ final void registerWorker(WorkQueue w) { if (w != null && (runState & STOP) == 0L) { - w.array = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; 
ThreadLocalRandom.localInit(); int seed = w.stackPred = ThreadLocalRandom.getProbe(); int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag @@ -1789,7 +1837,7 @@ public class ForkJoinPool extends AbstractExecutorService releaseWaiters(); // ensure released if (w == null || w.source != DROPPED) { long c = ctl; // decrement counts - do {} while (c != (c = U.compareAndExchangeLong(this, CTL, + do {} while (c != (c = compareAndExchangeCtl( c, ((RC_MASK & (c - RC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (LMASK & c))))); @@ -1809,39 +1857,39 @@ public class ForkJoinPool extends AbstractExecutorService } if ((tryTerminate(false, false) & STOP) == 0L && phase != 0 && w != null && w.source != DROPPED) { + signalWork(); // possibly replace w.cancelTasks(); // clean queue - signalWork(null, 0L); // possibly replace } if (ex != null) ForkJoinTask.rethrow(ex); } /** - * Releases an idle worker, or creates one if not enough exist, - * giving up on contention if q is nonull and signalled slot - * already taken. - * - * @param src, if nonnull, the WorkQueue array containing signalled task - * @param offset the offset of task being signalled + * Releases an idle worker, or creates one if not enough exist. 
*/ - final void signalWork(ForkJoinTask[] src, long offset) { - int pc = parallelism, i, sp; + final void signalWork() { + int pc = parallelism; for (long c = ctl;;) { WorkQueue[] qs = queues; - if ((short)(c >>> RC_SHIFT) >= pc || qs == null || - qs.length <= (i = (sp = (int)c) & SMASK)) + long ac = (c + RC_UNIT) & RC_MASK, nc; + int sp = (int)c, i = sp & SMASK; + if ((short)(c >>> RC_SHIFT) >= pc) break; - WorkQueue w = qs[i], v = null; long nc, cc; - if (i == 0) { + if (qs == null) + break; + if (qs.length <= i) + break; + WorkQueue w = qs[i], v = null; + if (sp == 0) { if ((short)(c >>> TC_SHIFT) >= pc) break; - nc = ((c + TC_UNIT) & TC_MASK) | ((c + RC_UNIT) & RC_MASK); + nc = ((c + TC_UNIT) & TC_MASK); } else if ((v = w) == null) break; else - nc = ((v = w).stackPred & LMASK) | ((c + RC_UNIT) & UMASK); - if (c == (cc = U.compareAndExchangeLong(this, CTL, c, nc))) { + nc = (v.stackPred & LMASK) | (c & TC_MASK); + if (c == (c = compareAndExchangeCtl(c, nc | ac))) { if (v == null) createWorker(); else { @@ -1851,10 +1899,6 @@ public class ForkJoinPool extends AbstractExecutorService } break; } - if ((cc & RC_MASK) > (c & RC_MASK) && src != null && - U.getReference(src, offset) == null) - break; - c = cc; } } @@ -1867,7 +1911,7 @@ public class ForkJoinPool extends AbstractExecutorService if ((sp = (int)c) == 0 || (qs = queues) == null || qs.length <= (i = sp & SMASK) || (v = qs[i]) == null) break; - if (c == (c = U.compareAndExchangeLong(this, CTL, + if (c == (c = compareAndExchangeCtl( c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) | (v.stackPred & LMASK))))) { v.phase = sp; @@ -1913,11 +1957,8 @@ public class ForkJoinPool extends AbstractExecutorService return 0; else if ((ds = delayScheduler) != null && !ds.canShutDown()) return 0; - else if (U.compareAndSetLong(this, CTL, c, c) && - U.compareAndSetLong(this, RUNSTATE, e, e | STOP)) { - releaseWaiters(); + else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) return 1; // enable termination - } else break; 
// restart } @@ -1932,209 +1973,205 @@ public class ForkJoinPool extends AbstractExecutorService */ final void runWorker(WorkQueue w) { if (w != null) { - int phase = w.phase, r = w.stackPred; // seed from registerWorker + int phase = w.phase, r = w.stackPred; // seed from registerWorker int fifo = w.config & FIFO, nsteals = 0, src = -1; - for (boolean taken = false;;) { - WorkQueue[] qs; int n; + for (;;) { + WorkQueue[] qs; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift - int i = r, step = (r >>> 16) | 1; - long e = runState; - if ((qs = queues) == null || (e & STOP) != 0L || (n = qs.length) <= 0) - return; - scan: for (int j = n; ; i += step) { - WorkQueue q; int qid; - if ((q = qs[qid = i & (n - 1)]) != null) { - boolean running = false; - for (int pb = -1;;) { - ForkJoinTask t; int cap, m; long bp; - ForkJoinTask[] a = q.array; - int b = q.base; - if (a == null || (cap = a.length) <= 0) - break; // currently impossible + if ((runState & STOP) != 0L || (qs = queues) == null) + break; + int n = qs.length, i = r, step = (r >>> 16) | 1; + boolean rescan = false; + scan: for (int l = n; l > 0; --l, i += step) { // scan queues + int j, cap; WorkQueue q; ForkJoinTask[] a; + if ((q = qs[j = i & (n - 1)]) != null && + (a = q.array) != null && (cap = a.length) > 0) { + for (int m = cap - 1, pb = -1, b = q.base;;) { + ForkJoinTask t; long k; t = (ForkJoinTask)U.getReferenceAcquire( - a, bp = slotOffset((m = cap - 1) & b)); - if (q.array != a || q.base != b || - (t != null && // inconsistent or busy - !U.compareAndSetReference(a, bp, t, null))) { - if (qid != src) - break scan; // reduce interference - } - else if (t == null) { - if (U.getReference(a, bp) == null) { - if (running) // end of run + a, k = slotOffset(m & b)); + if (b != (b = q.base) || t == null || + !U.compareAndSetReference(a, k, t, null)) { + if (a[b & m] == null) { + if (rescan) // end of run break scan; if (a[(b + 1) & m] == null && - a[(b + 2) & m] == null) + a[(b + 2) & m] == null) { break; // 
probably empty - if (pb == (pb = b)) - break scan; // stalled + } + if (pb == (pb = b)) { // track progress + rescan = true; // stalled; reorder scan + break scan; + } } } else { - int nb = q.base = b + 1; + boolean propagate; + int prevSrc = src, nb; + long np = slotOffset((nb = b + 1) & m); + boolean more = (U.getReferenceVolatile(a, np) != null); + q.base = nb; w.nsteals = ++nsteals; - w.source = src = qid; - boolean propagate = !taken || - ((qid & 1) == 0 && - (fifo != 0 || t.noUserHelp() != 0)); - running = taken = true; - long np = slotOffset(nb & m); - if (propagate && - U.getReferenceAcquire(a, np) != null) - signalWork(a, np); + w.source = src = j; // volatile + rescan = true; + if (propagate = + (more && + (prevSrc != src || + ((j & 1) == 0) && + (fifo != 0 || t.noUserHelp() != 0)) && + U.getReference(a, np) != null)) + signalWork(); w.topLevelExec(t, fifo); + if ((b = q.base) != nb && !propagate) + break scan; // reduce interference } } } - if (--j == 0) { // empty scan - if (((phase = deactivate(w, phase)) & IDLE) != 0) - return; // trimmed or terminated - taken = false; + } + if (!rescan) { + if (((phase = deactivate(w, phase)) & IDLE) != 0) break; - } + src = -1; // re-enable propagation } } } } /** - * Possibly deactivates worker + * Deactivates and if necessary awaits signal or termination. 
* * @param w the worker * @param phase current phase - * @return current phase + * @return current phase, with IDLE set if worker should exit */ private int deactivate(WorkQueue w, int phase) { if (w == null) // currently impossible return IDLE; w.phase = phase | IDLE; int activePhase = phase + (IDLE << 1); - long pc = ctl, qac; + long pc = ctl, qc, qac; for (;;) { // enqueue w.stackPred = (int)pc; - long qc = (pc - RC_UNIT) & UMASK | (activePhase & LMASK); - qac = qc & RC_MASK; + qac = (qc = (pc - RC_UNIT) & UMASK | (activePhase & LMASK)) & RC_MASK; if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, qc))) break; else if (qac < (pc & RC_MASK)) return w.phase = phase; // back out if lost to signal } - long e; WorkQueue[] qs; int n; + int ac = (short)(qac >>> RC_SHIFT), n; long e; WorkQueue[] qs; if (((e = runState) & STOP) != 0L || - ((e & SHUTDOWN) != 0L && qac == 0L && quiescent() > 0) || + ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) || (qs = queues) == null || (n = qs.length) <= 0) return IDLE; // terminating - boolean prescan = (qac != 0L); // missed signal check - for (int k = n << 1, i = phase; k-- > 0; ++i) { + for (int prechecks = Math.min(ac, 2), // reactivation threshold + k = Math.max(n + (n << 1), SPIN_WAITS << 1), + i = 0; k-- > 0 ; ++i) { WorkQueue q; int cap; ForkJoinTask[] a; - if ((q = qs[i & (n - 1)]) != null) { - if (w.phase == activePhase) - return activePhase; - if ((a = q.array) != null && (cap = a.length) > 0 && + if (w.phase == activePhase) + return activePhase; + if ((q = qs[i & (n - 1)]) == null) + Thread.onSpinWait(); + else if ((a = q.array) != null && (cap = a.length) > 0 && a[q.base & (cap - 1)] != null) { - WorkQueue v; int sp, j; long c; - if (prescan) - prescan = false; - else if ((sp = (int)(c = ctl)) == 0 || (j = sp & SMASK) >= n || - (v = qs[j]) == null) + WorkQueue v; int sp, j; long c; + if (prechecks > 0) + --prechecks; + else if (((c = ctl) & RC_MASK) <= qac && (sp = (int)c) != 0 && + (j = sp & SMASK) < n && (v = 
qs[j]) != null) { + long nc = (v.stackPred & LMASK) | ((c + RC_UNIT) & UMASK); + if (sp != activePhase && w.phase == activePhase) + return activePhase; + if ((sp == activePhase || k < n) && + U.compareAndSetLong(this, CTL, c, nc)) { + v.phase = sp; + if (sp == activePhase) + return activePhase; + if (v.parking != 0) + U.unpark(v.owner); break; - else { - long nc = (v.stackPred & LMASK) | ((c + RC_UNIT) & UMASK); - if (sp != activePhase && w.phase == activePhase) - return activePhase; - if ((sp == activePhase || (k < n && (c & RC_MASK) <= qac)) && - U.compareAndSetLong(this, CTL, c, nc)) { - v.phase = sp; - if (sp == activePhase) - return activePhase; - if (v.parking != 0) - U.unpark(v.owner); - break; - } - } - if (k < n) { - if ((runState & STOP) != 0L) - return IDLE; - k = n; // ensure re-encounter } } + if (k < n) + k = n; // ensure re-encounter } } - return ((phase = w.phase) == activePhase) ? activePhase : awaitWork(w, phase); + return (((phase = w.phase) & IDLE) == 0) ? phase : awaitWork(w, phase); } /** * Awaits signal or termination. 
* * @param w the work queue - * @param phase current phase (known to be idle) + * @param p current phase (known to be idle) * @return current phase, with IDLE set if worker should exit */ - private int awaitWork(WorkQueue w, int phase) { + private int awaitWork(WorkQueue w, int p) { if (w != null) { - int activePhase = phase + IDLE; - boolean trim = (w.source == INVALID_ID); - if ((w.config & CLEAR_TLS) != 0 && - (Thread.currentThread() instanceof ForkJoinWorkerThread f)) - f.resetThreadLocals(); // (instanceof check always true) - for (long deadline = 0L;;) { - Thread.interrupted(); // clear status - if ((runState & STOP) != 0L) - break; - boolean trimmable = false; // true if at ctl head and quiescent - long d = 0L, c; - if ((int)(c = ctl) == activePhase && (c & RC_MASK) == 0L) { - long now = System.currentTimeMillis(); - if (deadline == 0L) - d = deadline = now + keepAlive; - else if ((d = deadline) - now <= TIMEOUT_SLOP) - trim = true; - if (trim && tryTrim(w, c, activePhase)) - break; - trim = false; - trimmable = true; - } - if ((phase = w.phase) == activePhase) - break; + ForkJoinWorkerThread t; long deadline; + if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null) + t.resetThreadLocals(); // clear before reactivate + if ((ctl & RC_MASK) > 0L) + deadline = 0L; + else if ((deadline = + (((w.source != INVALID_ID) ? 
keepAlive : TIMEOUT_SLOP)) + + System.currentTimeMillis()) == 0L) + deadline = 1L; // avoid zero + int activePhase = p + IDLE; + if ((p = w.phase) != activePhase && (runState & STOP) == 0L) { LockSupport.setCurrentBlocker(this); - w.parking = 1; // enable unpark and recheck - if (w.phase != activePhase) - U.park(trimmable, d); - w.parking = 0; // close unpark window + w.parking = 1; // enable unpark + while ((p = w.phase) != activePhase) { + boolean trimmable = false; int trim; + Thread.interrupted(); // clear status + if ((runState & STOP) != 0L) + break; + if (deadline != 0L) { + if ((trim = tryTrim(w, p, deadline)) > 0) + break; + else if (trim < 0) + deadline = 0L; + else + trimmable = true; + } + U.park(trimmable, deadline); + } + w.parking = 0; LockSupport.setCurrentBlocker(null); - if ((phase = w.phase) == activePhase) - break; } } - return phase; + return p; } /** * Tries to remove and deregister worker after timeout, and release - * another to do the same unless new tasks are found. - * @return true if trimmed + * another to do the same. 
+ * @return > 0: trimmed, < 0 : not trimmable, else 0 */ - private boolean tryTrim(WorkQueue w, long c, int activePhase) { - if (w != null) { - int vp, i; WorkQueue[] vs; WorkQueue v; - long nc = ((w.stackPred & LMASK) | - ((RC_MASK & c) | (TC_MASK & (c - TC_UNIT)))); - if (U.compareAndSetLong(this, CTL, c, nc)) { - w.source = DROPPED; - w.phase = activePhase; - if ((vp = (int)nc) != 0 && (vs = queues) != null && - vs.length > (i = vp & SMASK) && (v = vs[i]) != null && - U.compareAndSetLong(this, CTL, // try to wake up next waiter - nc, ((v.stackPred & LMASK) | - ((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) { - v.source = INVALID_ID; - v.phase = vp; - U.unpark(v.owner); - } - return true; + private int tryTrim(WorkQueue w, int phase, long deadline) { + long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v; + if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null) + stat = -1; // no longer ctl top + else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP) + stat = 0; // spurious wakeup + else if (!compareAndSetCtl( + c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) | + (TC_MASK & (c - TC_UNIT))))) + stat = -1; // lost race to signaller + else { + stat = 1; + w.source = DROPPED; + w.phase = activePhase; + if ((vp = (int)nc) != 0 && (vs = queues) != null && + vs.length > (i = vp & SMASK) && (v = vs[i]) != null && + compareAndSetCtl( // try to wake up next waiter + nc, ((UMASK & (nc + RC_UNIT)) | + (nc & TC_MASK) | (v.stackPred & LMASK)))) { + v.source = INVALID_ID; // enable cascaded timeouts + v.phase = vp; + U.unpark(v.owner); } } - return false; + return stat; } /** @@ -2187,8 +2224,7 @@ public class ForkJoinPool extends AbstractExecutorService WorkQueue[] qs; WorkQueue v; int i; if ((qs = queues) != null && qs.length > (i = sp & SMASK) && (v = qs[i]) != null && - U.compareAndSetLong(this, CTL, c, - (c & UMASK) | (v.stackPred & LMASK))) { + compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) { v.phase = sp; if (v.parking != 0) 
U.unpark(v.owner); @@ -2196,18 +2232,17 @@ public class ForkJoinPool extends AbstractExecutorService } } else if (active > minActive && total >= pc) { // reduce active workers - if (U.compareAndSetLong(this, CTL, c, - ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK))) + if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK))) stat = UNCOMPENSATE; } else if (total < maxTotal && total < MAX_CAP) { // try to expand pool long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); if ((runState & STOP) != 0L) // terminating stat = 0; - else if (U.compareAndSetLong(this, CTL, c, nc)) + else if (compareAndSetCtl(c, nc)) stat = createWorker() ? UNCOMPENSATE : 0; } - else if (!U.compareAndSetLong(this, CTL, c, c)) // validate + else if (!compareAndSetCtl(c, c)) // validate ; else if ((sat = saturate) != null && sat.test(this)) stat = 0; @@ -2221,7 +2256,7 @@ public class ForkJoinPool extends AbstractExecutorService * Readjusts RC count; called from ForkJoinTask after blocking. */ final void uncompensate() { - U.getAndAddLong(this, CTL, RC_UNIT); + getAndAddCtl(RC_UNIT); } /** @@ -2240,8 +2275,7 @@ public class ForkJoinPool extends AbstractExecutorService w.tryRemoveAndExec(task, internal); int s = 0; if (task != null && (s = task.status) >= 0 && internal && w != null) { - int wsrc = w.source; - int wid = U.getInt(w, WorkQueue.PHASE) & SMASK, r = wid + 2; + int wid = w.phase & SMASK, r = wid + 2, wsrc = w.source; long sctl = 0L; // track stability outer: for (boolean rescan = true;;) { if ((s = task.status) < 0) @@ -2293,7 +2327,7 @@ public class ForkJoinPool extends AbstractExecutorService } if (U.compareAndSetReference(a, k, t, null)) { q.base = b + 1; - w.source = j; + w.source = j; // volatile write t.doExec(); w.source = wsrc; rescan = true; // restart at index r @@ -2319,7 +2353,7 @@ public class ForkJoinPool extends AbstractExecutorService final int helpComplete(ForkJoinTask task, WorkQueue w, boolean internal) { int s = 0; if (task != null && (s = task.status) >= 0 
&& w != null) { - int r = U.getInt(w, WorkQueue.PHASE) + 1; // for indexing + int r = w.phase + 1; // for indexing long sctl = 0L; // track stability outer: for (boolean rescan = true, locals = true;;) { if (locals && (s = w.helpComplete(task, internal, 0)) < 0) @@ -2341,7 +2375,7 @@ public class ForkJoinPool extends AbstractExecutorService if ((q = qs[j = r & SMASK & (n - 1)]) != null) { for (;;) { ForkJoinTask t; ForkJoinTask[] a; - int b, cap; long k; + int b, cap, nb; long k; boolean eligible = false; if ((a = q.array) == null || (cap = a.length) <= 0) break; @@ -2364,8 +2398,7 @@ public class ForkJoinPool extends AbstractExecutorService if (eligible) { if (U.compareAndSetReference( a, k, t, null)) { - q.base = b + 1; - U.storeFence(); + q.updateBase(b + 1); t.doExec(); locals = rescan = true; break scan; @@ -2414,8 +2447,7 @@ public class ForkJoinPool extends AbstractExecutorService } if (locals) { // run local tasks before (re)polling locals = false; - int fifo = ((int)config) & FIFO; - for (ForkJoinTask u; (u = w.nextLocalTask(fifo)) != null;) + for (ForkJoinTask u; (u = w.nextLocalTask()) != null;) u.doExec(); } WorkQueue[] qs = queues; @@ -2449,7 +2481,7 @@ public class ForkJoinPool extends AbstractExecutorService } if (U.compareAndSetReference(a, k, t, null)) { q.base = nb; - w.source = j; + w.source = j; // volatile write t.doExec(); w.source = wsrc; rescan = locals = true; @@ -2556,49 +2588,81 @@ public class ForkJoinPool extends AbstractExecutorService /** * Finds and locks a WorkQueue for an external submitter, or - * throws RejectedExecutionException if shutdown + * throws RejectedExecutionException if shutdown or terminating. 
+ * @param r current ThreadLocalRandom.getProbe() value * @param rejectOnShutdown true if RejectedExecutionException - * should be thrown when shutdown + * should be thrown when shutdown (else only if terminating) */ - final void externalPush(ForkJoinTask task, boolean signalIfEmpty, - boolean rejectOnShutdown) { - int r; - if ((r = ThreadLocalRandom.getProbe()) == 0) { - ThreadLocalRandom.localInit(); // initialize caller's probe + private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) { + int reuse; // nonzero if prefer create + if ((reuse = r) == 0) { + ThreadLocalRandom.localInit(); // initialize caller's probe r = ThreadLocalRandom.getProbe(); } - for (;; r = ThreadLocalRandom.advanceProbe(r)) { - WorkQueue q; WorkQueue[] qs; int n, id, i, lock; - if ((qs = queues) == null || (n = qs.length) <= 0) + for (int probes = 0; ; ++probes) { + int n, i, id; WorkQueue[] qs; WorkQueue q; + if ((qs = queues) == null) + break; + if ((n = qs.length) <= 0) break; if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) { - WorkQueue newq = new WorkQueue(null, id, 0, false); - lockRunState(); - if (qs[i] == null && queues == qs) - q = qs[i] = newq; // else lost race to install + WorkQueue w = new WorkQueue(null, id, 0, false); + w.phase = id; + boolean reject = ((lockRunState() & SHUTDOWN) != 0 && + rejectOnShutdown); + if (!reject && queues == qs && qs[i] == null) + q = qs[i] = w; // else lost race to install unlockRunState(); + if (q != null) + return q; + if (reject) + break; + reuse = 0; } - if (q != null && (lock = q.tryLockPhase()) != 1) { - int unlock = lock + NEXTIDLE; - if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) { - q.phase = unlock; - break; // check while q lock held + if (reuse == 0 || !q.tryLockPhase()) { // move index + if (reuse == 0) { + if (probes >= n >> 1) + reuse = r; // stop preferring free slot } - q.push(task, signalIfEmpty ? 
this : null, unlock); - return; + else if (q != null) + reuse = 0; // probe on collision + r = ThreadLocalRandom.advanceProbe(r); } + else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) { + q.unlockPhase(); // check while q lock held + break; + } + else + return q; } throw new RejectedExecutionException(); } - private void poolSubmit(ForkJoinTask task, boolean signalIfEmpty) { - Thread t; ForkJoinWorkerThread wt; WorkQueue q; + private ForkJoinTask poolSubmit(boolean signalIfEmpty, ForkJoinTask task) { + Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal; if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) && - (q = (wt = (ForkJoinWorkerThread)t).workQueue) != null && - wt.pool == this) - q.push(task, signalIfEmpty ? this : null, 1); - else - externalPush(task, signalIfEmpty, true); + (wt = (ForkJoinWorkerThread)t).pool == this) { + internal = true; + q = wt.workQueue; + } + else { // find and lock queue + internal = false; + q = submissionQueue(ThreadLocalRandom.getProbe(), true); + } + q.push(task, signalIfEmpty ? this : null, internal); + return task; + } + + /** + * Returns queue for an external submission, bypassing call to + * submissionQueue if already established and unlocked. + */ + final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) { + WorkQueue[] qs; WorkQueue q; int n; + int r = ThreadLocalRandom.getProbe(); + return (((qs = queues) != null && (n = qs.length) > 0 && + (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 && + q.tryLockPhase()) ? 
q : submissionQueue(r, rejectOnShutdown)); } /** @@ -2713,8 +2777,7 @@ public class ForkJoinPool extends AbstractExecutorService else if ((e & STOP) != 0L) now = true; else if (now) { - if (((ps = U.getAndBitwiseOrLong(this, RUNSTATE, SHUTDOWN | STOP)) & - STOP) == 0L) { + if (((ps = getAndBitwiseOrRunState(SHUTDOWN|STOP) & STOP)) == 0L) { if ((ps & RS_LOCK) != 0L) { spinLockRunState(); // ensure queues array stable after stop unlockRunState(); @@ -2725,7 +2788,7 @@ public class ForkJoinPool extends AbstractExecutorService else if ((isShutdown = (e & SHUTDOWN)) != 0L || enable) { long quiet; DelayScheduler ds; if (isShutdown == 0L) - U.getAndBitwiseOrLong(this, RUNSTATE, SHUTDOWN); + getAndBitwiseOrRunState(SHUTDOWN); if ((quiet = quiescent()) > 0) now = true; else if (quiet == 0 && (ds = delayScheduler) != null) @@ -2741,7 +2804,7 @@ public class ForkJoinPool extends AbstractExecutorService if (((e = runState) & CLEANED) == 0L) { boolean clean = cleanQueues(); if (((e = runState) & CLEANED) == 0L && clean) - e = U.getAndBitwiseOrLong(this, RUNSTATE, CLEANED) | CLEANED; + e = getAndBitwiseOrRunState(CLEANED) | CLEANED; } if ((e & TERMINATED) != 0L) break; @@ -2751,8 +2814,7 @@ public class ForkJoinPool extends AbstractExecutorService break; if ((e & CLEANED) != 0L) { e |= TERMINATED; - if ((U.getAndBitwiseOrLong(this, RUNSTATE, TERMINATED) & - TERMINATED) == 0L) { + if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) { CountDownLatch done; SharedThreadContainer ctr; if ((done = termination) != null) done.countDown(); @@ -2790,8 +2852,7 @@ public class ForkJoinPool extends AbstractExecutorService a, k = slotOffset((cap - 1) & (b = q.base))); if (q.base == b && t != null && U.compareAndSetReference(a, k, t, null)) { - q.base = b + 1; - U.storeFence(); + q.updateBase(b + 1); try { t.cancel(false); } catch (Throwable ignore) { @@ -2832,8 +2893,7 @@ public class ForkJoinPool extends AbstractExecutorService private CountDownLatch terminationSignal() { 
CountDownLatch signal, s, u; if ((signal = termination) == null) - signal = ((u = (CountDownLatch)U.compareAndExchangeReference( - this, TERMINATION, null, + signal = ((u = cmpExTerminationSignal( s = new CountDownLatch(1))) == null) ? s : u; return signal; } @@ -3001,7 +3061,7 @@ public class ForkJoinPool extends AbstractExecutorService (((long)maxSpares) << TC_SHIFT) | (((long)minAvail) << RC_SHIFT)); this.queues = new WorkQueue[size]; - String pid = Integer.toString(U.getAndAddInt(POOLIDS_BASE, POOLIDS, 1) + 1); + String pid = Integer.toString(getAndAddPoolIds(1) + 1); String name = "ForkJoinPool-" + pid; this.poolName = name; this.workerNamePrefix = name + "-worker-"; @@ -3085,8 +3145,8 @@ public class ForkJoinPool extends AbstractExecutorService * Package-private access to commonPool overriding zero parallelism */ static ForkJoinPool asyncCommonPool() { - ForkJoinPool cp; - if ((cp = common).parallelism == 0) + ForkJoinPool cp; int p; + if ((p = (cp = common).parallelism) == 0) U.compareAndSetInt(cp, PARALLELISM, 0, 2); return cp; } @@ -3111,7 +3171,7 @@ public class ForkJoinPool extends AbstractExecutorService * scheduled for execution */ public T invoke(ForkJoinTask task) { - poolSubmit(Objects.requireNonNull(task), true); + poolSubmit(true, Objects.requireNonNull(task)); try { return task.join(); } catch (RuntimeException | Error unchecked) { @@ -3130,7 +3190,7 @@ public class ForkJoinPool extends AbstractExecutorService * scheduled for execution */ public void execute(ForkJoinTask task) { - poolSubmit(Objects.requireNonNull(task), true); + poolSubmit(true, Objects.requireNonNull(task)); } // AbstractExecutorService methods @@ -3143,9 +3203,9 @@ public class ForkJoinPool extends AbstractExecutorService @Override @SuppressWarnings("unchecked") public void execute(Runnable task) { - poolSubmit((Objects.requireNonNull(task) instanceof ForkJoinTask) + poolSubmit(true, (Objects.requireNonNull(task) instanceof ForkJoinTask) ? 
(ForkJoinTask) task // avoid re-wrap - : new ForkJoinTask.RunnableExecuteAction(task), true); + : new ForkJoinTask.RunnableExecuteAction(task)); } /** @@ -3163,8 +3223,7 @@ public class ForkJoinPool extends AbstractExecutorService * scheduled for execution */ public ForkJoinTask submit(ForkJoinTask task) { - poolSubmit(Objects.requireNonNull(task), true); - return task; + return poolSubmit(true, Objects.requireNonNull(task)); } /** @@ -3175,12 +3234,11 @@ public class ForkJoinPool extends AbstractExecutorService @Override public ForkJoinTask submit(Callable task) { Objects.requireNonNull(task); - ForkJoinTask t = + return poolSubmit( + true, (Thread.currentThread() instanceof ForkJoinWorkerThread) ? new ForkJoinTask.AdaptedCallable(task) : - new ForkJoinTask.AdaptedInterruptibleCallable(task); - poolSubmit(t, true); - return t; + new ForkJoinTask.AdaptedInterruptibleCallable(task)); } /** @@ -3191,12 +3249,11 @@ public class ForkJoinPool extends AbstractExecutorService @Override public ForkJoinTask submit(Runnable task, T result) { Objects.requireNonNull(task); - ForkJoinTask t = + return poolSubmit( + true, (Thread.currentThread() instanceof ForkJoinWorkerThread) ? new ForkJoinTask.AdaptedRunnable(task, result) : - new ForkJoinTask.AdaptedInterruptibleRunnable(task, result); - poolSubmit(t, true); - return t; + new ForkJoinTask.AdaptedInterruptibleRunnable(task, result)); } /** @@ -3208,14 +3265,13 @@ public class ForkJoinPool extends AbstractExecutorService @SuppressWarnings("unchecked") public ForkJoinTask submit(Runnable task) { Objects.requireNonNull(task); - ForkJoinTask t = + return poolSubmit( + true, (task instanceof ForkJoinTask) ? (ForkJoinTask) task : // avoid re-wrap ((Thread.currentThread() instanceof ForkJoinWorkerThread) ? 
new ForkJoinTask.AdaptedRunnable(task, null) : - new ForkJoinTask.AdaptedInterruptibleRunnable(task, null)); - poolSubmit(t, true); - return t; + new ForkJoinTask.AdaptedInterruptibleRunnable(task, null))); } /** @@ -3237,7 +3293,7 @@ public class ForkJoinPool extends AbstractExecutorService */ public ForkJoinTask externalSubmit(ForkJoinTask task) { Objects.requireNonNull(task); - externalPush(task, true, true); + externalSubmissionQueue(true).push(task, this, false); return task; } @@ -3258,8 +3314,7 @@ public class ForkJoinPool extends AbstractExecutorService * @since 19 */ public ForkJoinTask lazySubmit(ForkJoinTask task) { - poolSubmit(Objects.requireNonNull(task), false); - return task; + return poolSubmit(false, Objects.requireNonNull(task)); } /** @@ -3287,8 +3342,8 @@ public class ForkJoinPool extends AbstractExecutorService throw new IllegalArgumentException(); if ((config & PRESET_SIZE) != 0) throw new UnsupportedOperationException("Cannot override System property"); - if ((prevSize = U.getAndSetInt(this, PARALLELISM, size)) < size) - signalWork(null, 0L); // trigger worker activation + if ((prevSize = getAndSetParallelism(size)) < size) + signalWork(); // trigger worker activation return prevSize; } @@ -3322,7 +3377,7 @@ public class ForkJoinPool extends AbstractExecutorService for (Callable t : tasks) { ForkJoinTask f = ForkJoinTask.adapt(t); futures.add(f); - poolSubmit(f, true); + poolSubmit(true, f); } for (int i = futures.size() - 1; i >= 0; --i) ((ForkJoinTask)futures.get(i)).quietlyJoin(); @@ -3345,7 +3400,7 @@ public class ForkJoinPool extends AbstractExecutorService for (Callable t : tasks) { ForkJoinTask f = ForkJoinTask.adaptInterruptible(t); futures.add(f); - poolSubmit(f, true); + poolSubmit(true, f); } for (int i = futures.size() - 1; i >= 0; --i) ((ForkJoinTask)futures.get(i)) @@ -3461,7 +3516,7 @@ public class ForkJoinPool extends AbstractExecutorService * elapsed */ final void executeEnabledScheduledTask(ScheduledForkJoinTask task) { - 
externalPush(task, true, false); + externalSubmissionQueue(false).push(task, this, false); } /** @@ -3702,8 +3757,7 @@ public class ForkJoinPool extends AbstractExecutorService onTimeout.task = task = new ForkJoinTask.CallableWithTimeout(callable, timeoutTask); scheduleDelayedTask(timeoutTask); - poolSubmit(task, true); - return task; + return poolSubmit(true, task); } /** @@ -3750,7 +3804,7 @@ public class ForkJoinPool extends AbstractExecutorService * @return the targeted parallelism level of this pool */ public int getParallelism() { - return Math.max(U.getIntOpaque(this, PARALLELISM), 1); + return Math.max(getParallelismOpaque(), 1); } /** @@ -3862,7 +3916,7 @@ public class ForkJoinPool extends AbstractExecutorService * the pool. This method may be useful for tuning task * granularities.The returned count does not include scheduled * tasks that are not yet ready to execute, which are reported - * separately by method {@link #getDelayedTaskCount}. + * separately by method {@link #getDelayedTaskCount}. 
* * @return the number of queued tasks * @see ForkJoinWorkerThread#getQueuedTaskCount() @@ -4302,7 +4356,7 @@ public class ForkJoinPool extends AbstractExecutorService done = blocker.block(); } finally { if (comp > 0) - U.getAndAddLong(this, CTL, RC_UNIT); + getAndAddCtl(RC_UNIT); } if (done) break; @@ -4329,7 +4383,7 @@ public class ForkJoinPool extends AbstractExecutorService */ void endCompensatedBlock(long post) { if (post > 0L) { - U.getAndAddLong(this, CTL, post); + getAndAddCtl(post); } } diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java index b4b9ecc0c9b..137cac45ed0 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java @@ -641,10 +641,15 @@ public abstract class ForkJoinTask implements Future, Serializable { */ public final ForkJoinTask fork() { Thread t; ForkJoinWorkerThread wt; - if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) - ((wt = (ForkJoinWorkerThread)t).workQueue).push(this, wt.pool, 1); + ForkJoinPool p; ForkJoinPool.WorkQueue q; boolean internal; + if (internal = + (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { + q = (wt = (ForkJoinWorkerThread)t).workQueue; + p = wt.pool; + } else - ForkJoinPool.common.externalPush(this, true, false); + q = (p = ForkJoinPool.common).externalSubmissionQueue(false); + q.push(this, p, internal); return this; }