8375130: [BACKOUT] Scalability issue when submitting virtual threads with almost empty tasks

Reviewed-by: vklang
Alan Bateman 2026-01-14 09:12:59 +00:00
parent ee30afae74
commit 283da4ddcb
2 changed files with 326 additions and 323 deletions


@@ -560,70 +560,89 @@ public class ForkJoinPool extends AbstractExecutorService
* access (which is usually needed anyway).
*
* Signalling. Signals (in signalWork) cause new or reactivated
* workers to scan for tasks. Method signalWork is invoked in two cases:
* (1) When a task is pushed onto an empty queue, and (2) When a
* worker takes a top-level task from a queue that has additional
* tasks. Together, these suffice in O(log(#threads)) steps to
* fully activate with at least enough workers, and ideally no
* more than required. This ideal is unobtainable: Callers do not
* know whether another worker will finish its current task and
* poll for others without need of a signal (which is otherwise an
* advantage of work-stealing vs other schemes), and also must
* conservatively estimate the triggering conditions of emptiness
* or non-emptiness, all of which usually cause more activations
* than necessary (see below). (Method signalWork is also used as
* failsafe in case of Thread failures in deregisterWorker, to
* activate or create a new worker to replace them).
* workers to scan for tasks. Method signalWork and its callers
* try to approximate the unattainable goal of having the right
* number of workers activated for the tasks at hand, but must err
* on the side of too many workers vs too few to avoid stalls:
*
* Top-Level scheduling
* ====================
* * If computations are purely tree structured, it suffices for
* every worker to activate another when it pushes a task into
* an empty queue, resulting in O(log(#threads)) steps to full
* activation. Emptiness must be conservatively approximated,
* which may result in unnecessary signals. Also, to reduce
* resource usages in some cases, at the expense of slower
* startup in others, activation of an idle thread is preferred
* over creating a new one, here and elsewhere.
*
* * At the other extreme, if "flat" tasks (those that do not in
* turn generate others) come in serially from only a single
* producer, each worker taking a task from a queue should
* propagate a signal if there are more tasks in that
* queue. This is equivalent to, but generally faster than,
* arranging for the stealer to take multiple tasks, re-pushing one
* or more on its own queue, and signalling (because its queue is
* empty), also resulting in logarithmic full activation
* time. If tasks do not engage in unbounded loops based on
* the actions of other workers with unknown dependencies,
* this form of propagation can be limited to one signal per
* activation (phase change). We distinguish the cases by
* further signalling only if the task is an InterruptibleTask
* (see below), which are the only supported forms of task that
* may do so.
*
* * Because we don't know about usage patterns (or most commonly,
* mixtures), we use both approaches, which present even more
* opportunities to over-signal. (Failure to distinguish these
* cases in terms of submission methods was arguably an early
* design mistake.) Note that in either of these contexts,
* signals may be (and often are) unnecessary because active
* workers continue scanning after running tasks without the
* need to be signalled (which is one reason work stealing is
* often faster than alternatives), so additional workers
* aren't needed.
*
* * For rapidly branching tasks that require full pool resources,
* oversignalling is OK, because signalWork will soon have no
* more workers to create or reactivate. But for others (mainly
* externally submitted tasks), overprovisioning may cause very
* noticeable slowdowns due to contention and resource
* wastage. We reduce impact by deactivating workers when
* queues don't have accessible tasks, but reactivating and
* rescanning if other tasks remain.
*
* * Despite these, signal contention and overhead effects still
* occur during ramp-up and ramp-down of small computations.
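*
* Illustrative sketch (editor's addition, not part of this patch):
* why signal-on-push reaches full activation in O(log(#threads))
* rounds. At best each active worker wakes one more per round, so
* the active count can at most double per round; names here are
* hypothetical:
*
*   int active = 1, rounds = 0;        // one initial submitter
*   while (active < parallelism) {
*       active = Math.min(parallelism, 2 * active); // each signals a peer
*       ++rounds;                      // rounds == ceil(log2(parallelism))
*   }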
*
* Scanning. Method runWorker performs top-level scanning for (and
* execution of) tasks by polling a pseudo-random permutation of
* the array (by starting at a given index, and using a constant
* cyclically exhaustive stride.) It uses the same basic polling
* method as WorkQueue.poll(), but restarts with a different
* permutation on each rescan. The pseudorandom generator need
* not have high-quality statistical properties in the long
* permutation on each invocation. The pseudorandom generator
* need not have high-quality statistical properties in the long
* term. We use Marsaglia XorShifts, seeded with the Weyl sequence
* from ThreadLocalRandom probes, which are cheap and suffice.
* from ThreadLocalRandom probes, which are cheap and
* suffice. Each queue's polling attempts to avoid becoming stuck
* when other scanners/pollers stall. Scans do not otherwise
* explicitly take into account core affinities, loads, cache
* localities, etc. However, they do exploit temporal locality
* (which usually approximates these) by preferring to re-poll
* from the same queue after a successful poll before trying
* others, which also reduces bookkeeping, cache traffic, and
* scanning overhead. But it also reduces fairness, which is
* partially counteracted by giving up on detected interference
* (which also reduces contention when too many workers try to
* take small tasks from the same queue).
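*
* As a concrete sketch of that scan order (editor's addition,
* mirroring runWorker below; visit() is hypothetical): any odd
* stride is coprime to the power-of-two queues length n, so one
* pass touches every index exactly once:
*
*   r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // Marsaglia XorShift
*   int i = r, step = (r >>> 16) | 1;         // forced odd: exhaustive
*   for (int j = n; j > 0; --j, i += step)
*       visit(queues[i & (n - 1)]);           // pseudo-random permutation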
*
* Deactivation. When no tasks are found by a worker in runWorker,
* it invokes deactivate, which first deactivates it (to an IDLE
* phase). Avoiding missed signals during deactivation requires a
* (conservative) rescan, reactivating if there may be tasks to
* poll. Because idle workers are often not yet blocked (parked),
* we use a WorkQueue field to advertise that a waiter actually
* needs unparking upon signal.
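*
* A minimal sketch of that handshake (editor's addition; parking is
* the WorkQueue field declared below, U the Unsafe handle):
*
*   w.parking = 1;             // waiter: advertise before blocking
*   if ((w.phase & IDLE) != 0) // recheck so a racing signal isn't lost
*       U.park(false, 0L);     // block until unparked
*   w.parking = 0;             // close the unpark window
*   ...
*   if (v.parking != 0)        // signaller: elide the unpark syscall
*       U.unpark(v.owner);     //   while the waiter is still spinning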
*
* When tasks are constructed as (recursive) DAGs, top-level
* scanning is usually infrequent, and doesn't encounter most
* of the following problems addressed by runWorker and awaitWork:
*
* Locality. Polls are organized into "runs", continuing until
* empty or contended, while also minimizing interference by
* postponing bookkeeping to ends of runs. This may reduce
* fairness.
*
* Contention. When many workers try to poll few queues, they
* often collide, generating CAS failures and disrupting locality
* of workers already running their tasks. This also leads to
* stalls when tasks cannot be taken because other workers have
* not finished poll operations, which is detected by reading
* ahead in queue arrays. In both cases, workers restart scans in a
* way that approximates randomized backoff.
*
* Oversignalling. When many short top-level tasks are present in
* a small number of queues, the above signalling strategy may
* activate many more workers than needed, worsening locality and
* contention problems, while also generating more global
* contention (field ctl is CASed on every activation and
* deactivation). We filter out (both in runWorker and
* signalWork) attempted signals that are surely not needed
* because the signalled tasks are already taken.
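*
* The filter is just a re-read of the signalled slot before paying
* for an activation (editor's excerpt of the check in signalWork
* below):
*
*   if (a != null && k < a.length && k >= 0 && a[k] == null)
*       break;                 // task already taken; signal not needed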
*
* Shutdown and Quiescence
* =======================
* it tries to deactivate(), giving up (and rescanning) on "ctl"
* contention. To avoid missed signals during deactivation, the
* method rescans and reactivates if there may have been a missed
* signal during deactivation. To reduce false-alarm reactivations
* while doing so, we scan multiple times (analogously to method
* quiescent()) before trying to reactivate. Because idle workers
* are often not yet blocked (parked), we use a WorkQueue field to
* advertise that a waiter actually needs unparking upon signal.
*
* Quiescence. Workers scan looking for work, giving up when they
* don't find any, without being sure that none are available.
@@ -873,7 +892,9 @@ public class ForkJoinPool extends AbstractExecutorService
* shutdown, runners are interrupted so they can cancel. Since
* external joining callers never run these tasks, they must await
* cancellation by others, which can occur along several different
* paths.
* paths. The inability to rely on caller-runs may also require
* extra signalling (resulting in scanning and contention), so it is
* done only conditionally in methods push and runWorker.
*
* Across these APIs, rules for reporting exceptions for tasks
* with results accessed via join() differ from those via get(),
@@ -940,13 +961,9 @@ public class ForkJoinPool extends AbstractExecutorService
* less-contended applications. To help arrange this, some
* non-reference fields are declared as "long" even when ints or
* shorts would suffice. For class WorkQueue, an
* embedded @Contended isolates the very busy top index, along
* with status and bookkeeping fields written (mostly) by owners,
* that otherwise interfere with reading array and base
* fields. There are other variables commonly contributing to
* false-sharing-related performance issues (including fields of
* class Thread), but we can't do much about this except try to
* minimize access.
* embedded @Contended region segregates fields most heavily
* updated by owners from those most commonly read by stealers or
* other management.
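*
* Sketch of that layout (editor's addition, not the full field set):
* fields tagged with the same @Contended group are padded as a unit,
* isolating owner-written state from stealer-read state:
*
*   @jdk.internal.vm.annotation.Contended("w")
*   int nsteals;               // owner bookkeeping, grouped together
*   @jdk.internal.vm.annotation.Contended("w")
*   volatile int parking;
*   volatile int base;         // outside the group; hot for stealers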
*
* Initial sizing and resizing of WorkQueue arrays is an even more
* delicate tradeoff because the best strategy systematically
@@ -955,11 +972,13 @@ public class ForkJoinPool extends AbstractExecutorService
* direct false-sharing and indirect cases due to GC bookkeeping
* (cardmarks etc), and reduce the number of resizes, which are
* not especially fast because they require atomic transfers.
* Currently, arrays are initialized to be just large enough to
* avoid resizing in most tree-structured tasks, but grow rapidly
* until large. (Maintenance note: any changes in fields, queues,
* or their uses, or JVM layout policies, must be accompanied by
* re-evaluation of these placement and sizing decisions.)
* Currently, arrays for workers are initialized to be just large
* enough to avoid resizing in most tree-structured tasks, but
* larger for external queues where both false-sharing problems
* and the need for resizing are more common. (Maintenance note:
* any changes in fields, queues, or their uses, or JVM layout
* policies, must be accompanied by re-evaluation of these
* placement and sizing decisions.)
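*
* (Editor's illustration of the power-of-two constraint these sizes
* obey: "cap - 1" then serves as an index mask, so
*
*   int cap = 1 << 6, m = cap - 1;   // INITIAL_QUEUE_CAPACITY
*   int slot = top & m;              // wraps 64 -> 0 without division
*
* which the push and poll code below relies on throughout.)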
*
* Style notes
* ===========
@@ -1042,11 +1061,17 @@ public class ForkJoinPool extends AbstractExecutorService
static final int DEFAULT_COMMON_MAX_SPARES = 256;
/**
* Initial capacity of work-stealing queue array.
* Initial capacity of work-stealing queue array for workers.
* Must be a power of two, at least 2. See above.
*/
static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
/**
* Initial capacity of work-stealing queue array for external queues.
* Must be a power of two, at least 2. See above.
*/
static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9;
// conversions among short, int, long
static final int SMASK = 0xffff; // (unsigned) short bits
static final long LMASK = 0xffffffffL; // lower 32 bits of long
@@ -1186,11 +1211,11 @@ public class ForkJoinPool extends AbstractExecutorService
@jdk.internal.vm.annotation.Contended("w")
int stackPred; // pool stack (ctl) predecessor link
@jdk.internal.vm.annotation.Contended("w")
volatile int parking; // nonzero if parked in awaitWork
@jdk.internal.vm.annotation.Contended("w")
volatile int source; // source queue id (or DROPPED)
@jdk.internal.vm.annotation.Contended("w")
int nsteals; // number of steals from other queues
@jdk.internal.vm.annotation.Contended("w")
volatile int parking; // nonzero if parked in awaitWork
// Support for atomic operations
private static final Unsafe U;
@@ -1223,11 +1248,11 @@ public class ForkJoinPool extends AbstractExecutorService
*/
WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
boolean clearThreadLocals) {
array = new ForkJoinTask<?>[owner == null ?
INITIAL_EXTERNAL_QUEUE_CAPACITY :
INITIAL_QUEUE_CAPACITY];
this.owner = owner;
this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
if ((this.owner = owner) == null) {
array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
phase = id | IDLE;
}
}
/**
@@ -1254,27 +1279,27 @@ public class ForkJoinPool extends AbstractExecutorService
* @throws RejectedExecutionException if array could not be resized
*/
final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a, na;
if ((a = array) != null && (cap = a.length) > 0) { // else disabled
int k = (m = cap - 1) & s;
if ((room = m - (s - b)) >= 0) {
int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
if ((a = array) != null && (cap = a.length) > 0 && // else disabled
task != null) {
int pk = task.noUserHelp() + 1; // prev slot offset
if ((room = (m = cap - 1) - (s - b)) >= 0) {
top = s + 1;
long pos = slotOffset(k);
long pos = slotOffset(m & s);
if (!internal)
U.putReference(a, pos, task); // inside lock
else
U.getAndSetReference(a, pos, task); // fully fenced
if (room == 0 && (na = growArray(a, cap, s)) != null)
k = ((a = na).length - 1) & s; // resize
if (room == 0) // resize
growArray(a, cap, s);
}
if (!internal)
unlockPhase();
if (room < 0)
throw new RejectedExecutionException("Queue capacity exceeded");
if (pool != null &&
(room == 0 ||
U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null))
pool.signalWork(a, k); // may have appeared empty
if ((room == 0 || U.getReferenceAcquire(a, slotOffset(m & (s - pk))) == null) &&
pool != null)
pool.signalWork(); // may have appeared empty
}
}
@@ -1283,12 +1308,11 @@ public class ForkJoinPool extends AbstractExecutorService
* @param a old array
* @param cap old array capacity
* @param s current top
* @return new array, or null on failure
*/
private ForkJoinTask<?>[] growArray(ForkJoinTask<?>[] a, int cap, int s) {
int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
ForkJoinTask<?>[] newArray = null;
private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
int newCap = cap << 1;
if (a != null && a.length == cap && cap > 0 && newCap > 0) {
ForkJoinTask<?>[] newArray = null;
try {
newArray = new ForkJoinTask<?>[newCap];
} catch (OutOfMemoryError ex) {
@@ -1305,45 +1329,34 @@ public class ForkJoinPool extends AbstractExecutorService
updateArray(newArray); // fully fenced
}
}
return newArray;
}
/**
* Takes next task, if one exists, in lifo order.
* Takes next task, if one exists, in order specified by mode,
* so acts as either local-pop or local-poll. Called only by owner.
* @param fifo nonzero if FIFO mode
*/
private ForkJoinTask<?> localPop() {
private ForkJoinTask<?> nextLocalTask(int fifo) {
ForkJoinTask<?> t = null;
int s = top - 1, cap; long k; ForkJoinTask<?>[] a;
if ((a = array) != null && (cap = a.length) > 0 &&
U.getReference(a, k = slotOffset((cap - 1) & s)) != null &&
(t = (ForkJoinTask<?>)U.getAndSetReference(a, k, null)) != null)
updateTop(s);
return t;
}
/**
* Takes next task, if one exists, in fifo order.
*/
private ForkJoinTask<?> localPoll() {
ForkJoinTask<?> t = null;
int p = top, cap; ForkJoinTask<?>[] a;
if ((a = array) != null && (cap = a.length) > 0) {
for (int b = base; p - b > 0; ) {
int nb = b + 1;
long k = slotOffset((cap - 1) & b);
if (U.getReference(a, k) == null) {
if (nb == p)
break; // else base is lagging
while (b == (b = U.getIntAcquire(this, BASE)))
Thread.onSpinWait(); // spin to reduce memory traffic
ForkJoinTask<?>[] a = array;
int b = base, p = top, cap;
if (p - b > 0 && a != null && (cap = a.length) > 0) {
for (int m = cap - 1, s, nb;;) {
if (fifo == 0 || (nb = b + 1) == p) {
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
a, slotOffset(m & (s = p - 1)), null)) != null)
updateTop(s); // else lost race for only task
break;
}
else if ((t = (ForkJoinTask<?>)
U.getAndSetReference(a, k, null)) != null) {
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
a, slotOffset(m & b), null)) != null) {
updateBase(nb);
break;
}
else
b = base;
while (b == (b = U.getIntAcquire(this, BASE)))
Thread.onSpinWait(); // spin to reduce memory traffic
if (p - b <= 0)
break;
}
}
return t;
@@ -1351,9 +1364,10 @@ public class ForkJoinPool extends AbstractExecutorService
/**
* Takes next task, if one exists, using configured mode.
* (Always internal, never called for Common pool.)
*/
final ForkJoinTask<?> nextLocalTask() {
return (config & FIFO) == 0 ? localPop() : localPoll();
return nextLocalTask(config & FIFO);
}
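// Editor's sketch (not part of this patch) of the two orders that
// nextLocalTask(int) serves, using java.util.ArrayDeque as a stand-in:
//   ArrayDeque<String> q = new ArrayDeque<>();
//   q.push("a"); q.push("b"); q.push("c"); // three local pushes
//   q.pop();      // LIFO (fifo == 0): returns "c", newest first
//   q.pollLast(); // FIFO (fifo != 0): returns "a", oldest first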
/**
@@ -1429,12 +1443,12 @@ public class ForkJoinPool extends AbstractExecutorService
// specialized execution methods
/**
* Runs the given task, as well as remaining local tasks
* Runs the given task, as well as remaining local tasks.
*/
final void topLevelExec(ForkJoinTask<?> task, int fifo) {
while (task != null) {
task.doExec();
task = (fifo != 0) ? localPoll() : localPop();
task = nextLocalTask(fifo);
}
}
@@ -1564,7 +1578,7 @@ public class ForkJoinPool extends AbstractExecutorService
* Cancels all local tasks. Called only by owner.
*/
final void cancelTasks() {
for (ForkJoinTask<?> t; (t = localPop()) != null; ) {
for (ForkJoinTask<?> t; (t = nextLocalTask(0)) != null; ) {
try {
t.cancel(false);
} catch (Throwable ignore) {
@@ -1766,8 +1780,7 @@ public class ForkJoinPool extends AbstractExecutorService
* @param w caller's WorkQueue
*/
final void registerWorker(WorkQueue w) {
if (w != null) {
w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
if (w != null && (runState & STOP) == 0L) {
ThreadLocalRandom.localInit();
int seed = w.stackPred = ThreadLocalRandom.getProbe();
int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag
@@ -1845,18 +1858,17 @@ public class ForkJoinPool extends AbstractExecutorService
}
if ((tryTerminate(false, false) & STOP) == 0L &&
phase != 0 && w != null && w.source != DROPPED) {
signalWork(); // possibly replace
w.cancelTasks(); // clean queue
signalWork(null, 0); // possibly replace
}
if (ex != null)
ForkJoinTask.rethrow(ex);
}
/**
* Releases an idle worker, or creates one if not enough exist,
* giving up if array a is nonnull and task at a[k] already taken.
* Releases an idle worker, or creates one if not enough exist.
*/
final void signalWork(ForkJoinTask<?>[] a, int k) {
final void signalWork() {
int pc = parallelism;
for (long c = ctl;;) {
WorkQueue[] qs = queues;
@@ -1872,15 +1884,13 @@ public class ForkJoinPool extends AbstractExecutorService
if (sp == 0) {
if ((short)(c >>> TC_SHIFT) >= pc)
break;
nc = ((c + TC_UNIT) & TC_MASK) | ac;
nc = ((c + TC_UNIT) & TC_MASK);
}
else if ((v = w) == null)
break;
else
nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
if (a != null && k < a.length && k >= 0 && a[k] == null)
break;
if (c == (c = ctl) && c == (c = compareAndExchangeCtl(c, nc))) {
nc = (v.stackPred & LMASK) | (c & TC_MASK);
if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
if (v == null)
createWorker();
else {
@@ -1963,196 +1973,178 @@ public class ForkJoinPool extends AbstractExecutorService
* @param w caller's WorkQueue (may be null on failed initialization)
*/
final void runWorker(WorkQueue w) {
if (w != null && w.phase != 0) { // else unregistered
WorkQueue[] qs;
int r = w.stackPred; // seed from registerWorker
int fifo = (int)config & FIFO, rescans = 0, inactive = 0, taken = 0, n;
while ((runState & STOP) == 0L && (qs = queues) != null &&
(n = qs.length) > 0) {
int i = r, step = (r >>> 16) | 1;
if (w != null) {
int phase = w.phase, r = w.stackPred; // seed from registerWorker
int fifo = w.config & FIFO, nsteals = 0, src = -1;
for (;;) {
WorkQueue[] qs;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
scan: for (int j = n; j != 0; --j, i += step) {
WorkQueue q; int qid;
if ((q = qs[qid = i & (n - 1)]) != null) {
ForkJoinTask<?>[] a; int cap; // poll queue
while ((a = q.array) != null && (cap = a.length) > 0) {
int b, nb, nk; long bp; ForkJoinTask<?> t;
if ((runState & STOP) != 0L || (qs = queues) == null)
break;
int n = qs.length, i = r, step = (r >>> 16) | 1;
boolean rescan = false;
scan: for (int l = n; l > 0; --l, i += step) { // scan queues
int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
if ((q = qs[j = i & (n - 1)]) != null &&
(a = q.array) != null && (cap = a.length) > 0) {
for (int m = cap - 1, pb = -1, b = q.base;;) {
ForkJoinTask<?> t; long k;
t = (ForkJoinTask<?>)U.getReferenceAcquire(
a, bp = slotOffset((cap - 1) & (b = q.base)));
long np = slotOffset(nk = (nb = b + 1) & (cap - 1));
if (q.base == b) { // else inconsistent
if (t == null) {
if (q.array == a) { // else resized
if (rescans > 0) // ran or stalled
break scan;
if (U.getReference(a, np) == null &&
(rescans >= 0 ||
(U.getReferenceAcquire(a, bp) == null &&
q.top == q.base)))
break;
rescans = 1; // may be stalled
a, k = slotOffset(m & b));
if (b != (b = q.base) || t == null ||
!U.compareAndSetReference(a, k, t, null)) {
if (a[b & m] == null) {
if (rescan) // end of run
break scan;
if (a[(b + 1) & m] == null &&
a[(b + 2) & m] == null) {
break; // probably empty
}
}
else if (inactive != 0) {
if ((inactive = tryReactivate(w)) != 0) {
rescans = 1; // can't take yet
if (pb == (pb = b)) { // track progress
rescan = true; // stalled; reorder scan
break scan;
}
}
else if (U.compareAndSetReference(a, bp, t, null)) {
q.base = nb;
Object nt = U.getReferenceAcquire(a, np);
w.source = qid;
rescans = 1;
++taken;
if (nt != null && // confirm a[nk]
U.getReferenceAcquire(a, np) == nt)
signalWork(a, nk); // propagate
w.topLevelExec(t, fifo);
}
}
else {
boolean propagate;
int nb = q.base = b + 1, prevSrc = src;
w.nsteals = ++nsteals;
w.source = src = j; // volatile
rescan = true;
int nh = t.noUserHelp();
if (propagate =
(prevSrc != src || nh != 0) && a[nb & m] != null)
signalWork();
w.topLevelExec(t, fifo);
if ((b = q.base) != nb && !propagate)
break scan; // reduce interference
}
}
}
}
if (rescans >= 0)
--rescans;
else if (inactive == 0) {
if ((inactive = deactivate(w, taken)) != 0)
taken = 0;
if (!rescan) {
if (((phase = deactivate(w, phase)) & IDLE) != 0)
break;
src = -1; // re-enable propagation
}
else if (awaitWork(w) == 0)
inactive = rescans = 0;
else
break;
}
}
}
/**
* Tries to deactivate worker, keeping active on contention
* Deactivates and if necessary awaits signal or termination.
*
* @param w the work queue
* @param taken number of stolen tasks since last deactivation
* @return nonzero if inactive
* @param w the worker
* @param phase current phase
* @return current phase, with IDLE set if worker should exit
*/
private int deactivate(WorkQueue w, int taken) {
int inactive = 0, phase;
if (w != null && (inactive = (phase = w.phase) & IDLE) == 0) {
long sp = (phase + (IDLE << 1)) & LMASK, pc, c;
w.phase = phase | IDLE;
w.stackPred = (int)(pc = ctl); // set ctl stack link
if (!compareAndSetCtl( // try to enqueue
pc, c = ((pc - RC_UNIT) & UMASK) | sp))
w.phase = phase; // back out on contention
else {
if (taken != 0) {
w.nsteals += taken;
if ((w.config & CLEAR_TLS) != 0 &&
(Thread.currentThread() instanceof ForkJoinWorkerThread f))
f.resetThreadLocals(); // (instanceof check always true)
}
if (((c & RC_MASK) == 0L && quiescent() > 0) || taken == 0)
inactive = w.phase & IDLE; // check quiescent termination
else { // spin for approx 1 scan cost
int tc = (short)(c >>> TC_SHIFT);
int spins = Math.max((tc << 1) + tc, SPIN_WAITS);
while ((inactive = w.phase & IDLE) != 0 && --spins != 0)
Thread.onSpinWait();
}
}
}
return inactive;
}
private int deactivate(WorkQueue w, int phase) {
if (w == null) // currently impossible
return IDLE;
int p = phase | IDLE, activePhase = phase + (IDLE << 1);
long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK);
int sp = w.stackPred = (int)pc; // set ctl stack link
w.phase = p;
if (!compareAndSetCtl(pc, qc)) // try to enqueue
return w.phase = phase; // back out on possible signal
int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs;
if (((e = runState) & STOP) != 0L ||
((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
(qs = queues) == null || (n = qs.length) <= 0)
return IDLE; // terminating
/**
* Reactivates worker w if it is currently top of ctl stack
*
* @param w the work queue
* @return 0 if now active
*/
private int tryReactivate(WorkQueue w) {
int inactive = 0;
if (w != null) { // always true; hoist checks
int sp = w.stackPred, phase, activePhase; long c;
if ((inactive = (phase = w.phase) & IDLE) != 0 &&
(int)(c = ctl) == (activePhase = phase + IDLE) &&
compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK))) {
w.phase = activePhase;
inactive = 0;
}
for (int prechecks = Math.min(ac, 2), // reactivation threshold
k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) {
WorkQueue q; int cap; ForkJoinTask<?>[] a; long c;
if (w.phase == activePhase)
return activePhase;
if (--k < 0)
return awaitWork(w, p); // block, drop, or exit
if ((q = qs[k & (n - 1)]) == null)
Thread.onSpinWait();
else if ((a = q.array) != null && (cap = a.length) > 0 &&
a[q.base & (cap - 1)] != null && --prechecks < 0 &&
(int)(c = ctl) == activePhase &&
compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK)))
return w.phase = activePhase; // reactivate
}
return inactive;
}
/**
* Awaits signal or termination.
*
* @param w the work queue
* @return 0 if now active
* @param p current phase (known to be idle)
* @return current phase, with IDLE set if worker should exit
*/
private int awaitWork(WorkQueue w) {
int inactive = 0, phase;
if (w != null) { // always true; hoist checks
long waitTime = (w.source == INVALID_ID) ? 0L : keepAlive;
if ((inactive = (phase = w.phase) & IDLE) != 0) {
private int awaitWork(WorkQueue w, int p) {
if (w != null) {
ForkJoinWorkerThread t; long deadline;
if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
t.resetThreadLocals(); // clear before reactivate
if ((ctl & RC_MASK) > 0L)
deadline = 0L;
else if ((deadline =
(((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
System.currentTimeMillis()) == 0L)
deadline = 1L; // avoid zero
int activePhase = p + IDLE;
if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
LockSupport.setCurrentBlocker(this);
int activePhase = phase + IDLE;
for (long deadline = 0L;;) {
Thread.interrupted(); // clear status
w.parking = 1; // enable unpark
while ((p = w.phase) != activePhase) {
boolean trimmable = false; int trim;
Thread.interrupted(); // clear status
if ((runState & STOP) != 0L)
break;
boolean trimmable = false; // use timed wait if trimmable
long d = 0L, c;
if (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase) {
long now = System.currentTimeMillis();
if (deadline == 0L)
deadline = waitTime + now;
if (deadline - now <= TIMEOUT_SLOP) {
if (tryTrim(w, c, activePhase))
break;
continue; // lost race to trim
}
d = deadline;
trimmable = true;
if (deadline != 0L) {
if ((trim = tryTrim(w, p, deadline)) > 0)
break;
else if (trim < 0)
deadline = 0L;
else
trimmable = true;
}
w.parking = 1; // enable unpark and recheck
if ((inactive = w.phase & IDLE) != 0)
U.park(trimmable, d);
w.parking = 0; // close unpark window
if (inactive == 0 || (inactive = w.phase & IDLE) == 0)
break;
U.park(trimmable, deadline);
}
w.parking = 0;
LockSupport.setCurrentBlocker(null);
}
}
return inactive;
return p;
}
/**
* Tries to remove and deregister worker after timeout, and release
* another to do the same unless new tasks are found.
* another to do the same.
* @return > 0: trimmed, < 0 : not trimmable, else 0
*/
private boolean tryTrim(WorkQueue w, long c, int activePhase) {
if (w != null) {
int vp, i; WorkQueue[] vs; WorkQueue v;
long nc = ((w.stackPred & LMASK) |
((RC_MASK & c) | (TC_MASK & (c - TC_UNIT))));
if (compareAndSetCtl(c, nc)) {
w.source = DROPPED;
w.phase = activePhase;
if ((vp = (int)nc) != 0 && (vs = queues) != null &&
vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
compareAndSetCtl( // try to wake up next waiter
nc, ((v.stackPred & LMASK) |
((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) {
v.source = INVALID_ID; // enable cascaded timeouts
v.phase = vp;
U.unpark(v.owner);
}
return true;
private int tryTrim(WorkQueue w, int phase, long deadline) {
long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
stat = -1; // no longer ctl top
else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
stat = 0; // spurious wakeup
else if (!compareAndSetCtl(
c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
(TC_MASK & (c - TC_UNIT)))))
stat = -1; // lost race to signaller
else {
stat = 1;
w.source = DROPPED;
w.phase = activePhase;
if ((vp = (int)nc) != 0 && (vs = queues) != null &&
vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
compareAndSetCtl( // try to wake up next waiter
nc, ((UMASK & (nc + RC_UNIT)) |
(nc & TC_MASK) | (v.stackPred & LMASK)))) {
v.source = INVALID_ID; // enable cascaded timeouts
v.phase = vp;
U.unpark(v.owner);
}
}
return false;
return stat;
}
/**
@@ -2569,35 +2561,52 @@ public class ForkJoinPool extends AbstractExecutorService
/**
* Finds and locks a WorkQueue for an external submitter, or
* throws RejectedExecutionException if shutdown
* throws RejectedExecutionException if shutdown or terminating.
* @param r current ThreadLocalRandom.getProbe() value
* @param rejectOnShutdown true if RejectedExecutionException
* should be thrown when shutdown
* should be thrown when shutdown (else only if terminating)
*/
final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
int r;
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // initialize caller's probe
private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) {
int reuse; // nonzero if prefer create
if ((reuse = r) == 0) {
ThreadLocalRandom.localInit(); // initialize caller's probe
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue q; WorkQueue[] qs; int n, id, i;
if ((qs = queues) == null || (n = qs.length) <= 0)
for (int probes = 0; ; ++probes) {
int n, i, id; WorkQueue[] qs; WorkQueue q;
if ((qs = queues) == null)
break;
if ((n = qs.length) <= 0)
break;
if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
WorkQueue newq = new WorkQueue(null, id, 0, false);
lockRunState();
if (qs[i] == null && queues == qs)
q = qs[i] = newq; // else lost race to install
WorkQueue w = new WorkQueue(null, id, 0, false);
w.phase = id;
boolean reject = ((lockRunState() & SHUTDOWN) != 0 &&
rejectOnShutdown);
if (!reject && queues == qs && qs[i] == null)
q = qs[i] = w; // else lost race to install
unlockRunState();
}
if (q != null && q.tryLockPhase()) {
if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
q.unlockPhase(); // check while q lock held
if (q != null)
return q;
if (reject)
break;
}
return q;
reuse = 0;
}
r = ThreadLocalRandom.advanceProbe(r); // move
if (reuse == 0 || !q.tryLockPhase()) { // move index
if (reuse == 0) {
if (probes >= n >> 1)
reuse = r; // stop preferring free slot
}
else if (q != null)
reuse = 0; // probe on collision
r = ThreadLocalRandom.advanceProbe(r);
}
else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
q.unlockPhase(); // check while q lock held
break;
}
else
return q;
}
throw new RejectedExecutionException();
}
@@ -2611,12 +2620,24 @@ public class ForkJoinPool extends AbstractExecutorService
}
else { // find and lock queue
internal = false;
q = externalSubmissionQueue(true);
q = submissionQueue(ThreadLocalRandom.getProbe(), true);
}
q.push(task, signalIfEmpty ? this : null, internal);
return task;
}
/**
* Returns queue for an external submission, bypassing call to
* submissionQueue if already established and unlocked.
*/
final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
WorkQueue[] qs; WorkQueue q; int n;
int r = ThreadLocalRandom.getProbe();
return (((qs = queues) != null && (n = qs.length) > 0 &&
(q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown));
}
/**
* Returns queue for an external thread, if one exists that has
* possibly ever submitted to the given pool (nonzero probe), or
@@ -3295,7 +3316,7 @@ public class ForkJoinPool extends AbstractExecutorService
if ((config & PRESET_SIZE) != 0)
throw new UnsupportedOperationException("Cannot override System property");
if ((prevSize = getAndSetParallelism(size)) < size)
signalWork(null, 0); // trigger worker activation
signalWork(); // trigger worker activation
return prevSize;
}


@@ -28,7 +28,6 @@
*/
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicInteger;
public class Starvation {
@@ -43,7 +42,7 @@ public class Starvation {
while (count.get() == c) Thread.onSpinWait();
return null; }};
static void testSubmitExternalCallable() throws Exception {
public static void main(String[] args) throws Exception {
try (var pool = new ForkJoinPool(2)) {
for (int i = 0; i < 100_000; i++) {
var future1 = pool.submit(new AwaitCount(i));
@@ -54,21 +53,4 @@ public class Starvation {
}
}
}
static void testSubmitAdaptedCallable() throws Exception {
try (var pool = new ForkJoinPool(2)) {
for (int i = 0; i < 100_000; i++) {
var future1 = pool.submit(new AwaitCount(i));
var future2 = pool.submit(ForkJoinTask.adapt(noop));
future2.get();
count.set(i + 1);
future1.get();
}
}
}
public static void main(String[] args) throws Exception {
testSubmitExternalCallable();
testSubmitAdaptedCallable();
}
}