From a1e5ce94e00dbddea8447b063f569af9c86cb578 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Sun, 18 Jan 2026 16:03:31 -0500
Subject: [PATCH] Simplify scan mode control by moving and reworking
topLevelExec and throwing on trim
---
.../java/util/concurrent/ForkJoinPool.java | 267 +++++++++---------
1 file changed, 132 insertions(+), 135 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index f59b22eda36..efe063d136f 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -1168,6 +1168,12 @@ public class ForkJoinPool extends AbstractExecutorService
}
}
+ /**
+ * Exception thrown in tryTrim after idle timeout
+ */
+ @SuppressWarnings("serial")
+ static final class WorkerTrimmedException extends RuntimeException { }
+
/**
* Queues supporting work-stealing as well as external task
* submission. See above for descriptions and algorithms.
@@ -1309,23 +1315,20 @@ public class ForkJoinPool extends AbstractExecutorService
* so acts as either local-pop or local-poll. Called only by owner.
* @param fifo nonzero if FIFO mode
*/
- private ForkJoinTask> nextLocalTask(int fifo) {
+ final ForkJoinTask> nextLocalTask(int fifo) {
ForkJoinTask> t = null;
ForkJoinTask>[] a = array;
int b = base, s = top - 1, cap;
if (a != null && s - b >= 0 && (cap = a.length) > 0) {
if (fifo == 0) {
if ((t = (ForkJoinTask>)U.getAndSetReference(
- a, slotOffset((cap - 1) & s), null)) != null) {
- top = s;
- U.storeFence();
- }
+ a, slotOffset((cap - 1) & s), null)) != null)
+ U.putIntOpaque(this, TOP, s);
} else {
do {
if ((t = (ForkJoinTask>)U.getAndSetReference(
a, slotOffset((cap - 1) & b), null)) != null) {
- base = b + 1;
- U.storeFence();
+ U.putIntVolatile(this, BASE, b + 1);
break;
}
if (b == s)
@@ -1359,8 +1362,7 @@ public class ForkJoinPool extends AbstractExecutorService
(internal || (lock = tryLockPhase()) != 1)) {
if (top == p && U.compareAndSetReference(a, k, task, null)) {
taken = true;
- top = s;
- U.storeFence();
+ U.putIntOpaque(this, TOP, s);
}
if (!internal)
phase = lock + NEXTIDLE;
@@ -1409,8 +1411,7 @@ public class ForkJoinPool extends AbstractExecutorService
Thread.onSpinWait(); // stalled
}
else if (U.compareAndSetReference(a, k, t, null)) {
- base = nb;
- U.storeFence();
+ U.putIntVolatile(this, BASE, nb);
return t;
}
}
@@ -1419,16 +1420,6 @@ public class ForkJoinPool extends AbstractExecutorService
// specialized execution methods
- /**
- * Runs the given task, as well as remaining local tasks
- */
- final void topLevelExec(ForkJoinTask> task, int fifo) {
- while (task != null) {
- task.doExec();
- task = nextLocalTask(fifo);
- }
- }
-
/**
* Deep form of tryUnpush: Traverses from top and removes and
* runs task if present.
@@ -1508,10 +1499,8 @@ public class ForkJoinPool extends AbstractExecutorService
break;
if (taken =
(top == p &&
- U.compareAndSetReference(a, k, t, null))) {
- top = s;
- U.storeFence();
- }
+ U.compareAndSetReference(a, k, t, null)))
+ U.putIntOpaque(this, TOP, s);
if (!internal)
phase = lock + NEXTIDLE;
if (!taken)
@@ -1548,8 +1537,7 @@ public class ForkJoinPool extends AbstractExecutorService
break;
if (base == b && t != null &&
U.compareAndSetReference(a, k, t, null)) {
- base = b + 1;
- U.storeFence();
+ U.putIntVolatile(this, BASE, b + 1);
t.doExec();
}
}
@@ -1787,6 +1775,8 @@ public class ForkJoinPool extends AbstractExecutorService
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null; // null if not created
int phase = 0; // 0 if not registered
+ if (ex instanceof WorkerTrimmedException)
+ ex = null;
if (wt != null && (w = wt.workQueue) != null &&
(phase = w.phase) != 0 && (phase & IDLE) != 0)
releaseWaiters(); // ensure released
@@ -1828,8 +1818,8 @@ public class ForkJoinPool extends AbstractExecutorService
* @param base src's base index for the task
*/
final void signalWork(WorkQueue src, int base) {
- int pc = parallelism, i, sp; // rely on caller sync for initial reads
- long c = U.getLong(this, CTL);
+ int pc = parallelism, i, sp;
+ long c = U.getLong(this, CTL); // rely on caller sync for initial reads
WorkQueue[] qs = queues;
while ((short)(c >>> RC_SHIFT) < pc && qs != null &&
qs.length > (i = (sp = (int)c) & SMASK)) {
@@ -1935,14 +1925,14 @@ public class ForkJoinPool extends AbstractExecutorService
final void runWorker(WorkQueue w) {
if (w != null) {
int r = w.stackPred; // seed from registerWorker
- int fifo = (int)config & FIFO;
- int src = 0, idle = 0, rescans = 0, taken = 0;
- for (;;) {
+ int fifo = (int)config & FIFO, idle = 0;
+ for (boolean scanned = false;;) {
WorkQueue[] qs;
long e = runState;
int n = ((qs = queues) == null) ? 0 : qs.length;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
int i = r, step = (r >>> 16) | 1;
+ boolean found = false;
if ((e & STOP) != 0L || n <= 0)
break;
scan: for (int j = n; j != 0; --j, i += step) {
@@ -1950,91 +1940,127 @@ public class ForkJoinPool extends AbstractExecutorService
if ((q = qs[qid = i & (n - 1)]) != null) {
ForkJoinTask>[] a; int cap; // poll queue
while ((a = q.array) != null && (cap = a.length) > 0) {
- int b, nb; long bp; ForkJoinTask> t;
+ int b, nb; long bp, np; ForkJoinTask> t;
t = (ForkJoinTask>)U.getReferenceAcquire(
a, bp = slotOffset((cap - 1) & (b = q.base)));
- long np = slotOffset((nb = b + 1) & (cap - 1));
+ Object nt = U.getReference(
+ a, np = slotOffset((cap - 1) & (nb = b + 1)));
if (q.array == a && q.base == b &&
U.getReference(a, bp) == t) {
if (t == null) {
- if (rescans > 0) // ran or stalled
- break scan;
- if (U.getReference(a, np) == null &&
- (rescans == 0 || q.top == b))
- break; // retry at most twice
- ++rescans; // may be stalled
+ if (nt == null && (!scanned || q.top == b)) {
+ found = false;
+ break;
+ }
+ if (found)
+ break scan; // stall check
+ found = true;
}
else if (idle != 0) {
- if ((idle = tryReactivate(w)) != 0) {
- rescans = 1;
+ found = true;
+ if ((idle = tryReactivate(w)) != 0)
break scan; // can't take yet
- }
- rescans = 0;
}
else if (U.compareAndSetReference(a, bp, t, null)) {
- Object nt = U.getReference(a, np);
+ nt = U.getReference(a, np);
U.getAndSetInt(q, WorkQueue.BASE, nb);
if (nt != null) // propagate
signalWork(q, nb);
- rescans = 1;
- if (taken++ == 0 || src != qid)
- w.source = src = qid;
- w.topLevelExec(t, fifo);
+ topLevelExec(t, w, q, fifo, qid);
+ found = true;
+ break scan;
}
}
}
}
}
- if (rescans > 0)
- rescans = 0;
+ if (found)
+ scanned = false;
+ else if (!scanned && idle == 0)
+ scanned = true;
+ else if (!scanned || idle == 0) {
+ if ((idle = deactivate(w, idle)) == 0)
+ scanned = false;
+ }
else {
- if (rescans == 0)
- rescans = -1;
- else if (idle == 0) {
- if ((idle = deactivate(w)) == 0)
- rescans = 0;
- }
- else if ((idle = rescans = awaitWork(w)) != 0)
+ awaitWork(w);
+ idle = 0;
+ scanned = false;
+ }
+ }
+ }
+ }
+
+ private void topLevelExec(ForkJoinTask> t, WorkQueue w, WorkQueue q,
+ int fifo, int qid) {
+ if (t != null && q != null && w != null) { // always true; hoist checks
+ w.source = qid;
+ int taken = 1;
+ for (;;) {
+ t.doExec();
+ if ((t = w.nextLocalTask(fifo)) == null) {
+ ForkJoinTask>[] a; int cap, b, nb; long bp, np;
+ if ((a = q.array) == null || (cap = a.length) <= 0)
+ break; // similar to runWorker scan
+ t = (ForkJoinTask>)U.getReferenceAcquire(
+ a, bp = slotOffset((cap - 1) & (b = q.base)));
+ Object nt = U.getReference(
+ a, np = slotOffset((cap - 1) & (nb = b + 1)));
+ if (t == null || q.base != b ||
+ !U.compareAndSetReference(a, bp, t, null))
break;
- if (idle != 0) {
- int ns = taken;
- taken = 0;
- if ((idle = onEmptyScan(w, ns)) == 0)
- rescans = 0;
- }
+ nt = U.getReference(a, np);
+ U.getAndSetInt(q, WorkQueue.BASE, nb);
+ ++taken;
+ if (nt != null && // prevent stalls
+ (fifo != 0 || (t instanceof ForkJoinTask.InterruptibleTask)))
+ signalWork(q, nb);
}
}
w.nsteals += taken;
+ if ((w.config & CLEAR_TLS) != 0 &&
+ (Thread.currentThread() instanceof ForkJoinWorkerThread f))
+ f.resetThreadLocals(); // (instanceof check always true)
}
}
/**
- * Deactivates and enqueues worker, possibly backing out on signal
- * contention.
+ * Possibly deactivates and pauses worker
*
* @param w the work queue
- * @return active status
+ * @param idle active status
+ * @return 0 if now active
*/
- private int deactivate(WorkQueue w) {
- int idle = 0;
- if (w != null) { // always true; hoist checks
- int phase = U.getInt(w, WorkQueue.PHASE);
- long sp = (phase + NEXTIDLE) & LMASK, pc = ctl;
- U.putInt(w, WorkQueue.PHASE, phase | IDLE);
- for (;;) { // try to enqueue
- w.stackPred = (int)pc;
- long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK;
- if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) {
- if (ac == 0L) // check quiescent termination
- quiescent();
- idle = w.phase & IDLE;
- break;
- }
- else if (ac < (pc & RC_MASK)) {
- w.phase = phase; // back out if lost to signal
- break;
+ private int deactivate(WorkQueue w, int idle) {
+ if (w != null) { // always true; hoist checks
+ int phase = w.phase;
+ if (idle != 0) // already deactivated
+ idle = phase & IDLE;
+ else { // try to deactivate
+ long sp = (phase + NEXTIDLE) & LMASK, pc = ctl;
+ U.putInt(w, WorkQueue.PHASE, phase | IDLE);
+ for (;;) { // enqueue
+ w.stackPred = (int)pc;
+ long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK;
+ if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) {
+ if (ac == 0L) // check quiescent termination
+ quiescent();
+ idle = w.phase & IDLE;
+ break;
+ }
+ else if (ac < (pc & RC_MASK)) {
+ w.phase = phase; // back out if lost to signal
+ break;
+ }
}
}
+ if (idle != 0 && (runState & STOP) == 0L) {
+ int noise = (phase ^ (phase >>> 16)) & (SPIN_WAITS - 1);
+ int spins = (SPIN_WAITS << 1) | noise;
+ Thread.yield(); // helps when oversubscribed
+ while ((idle = w.phase & IDLE) != 0 && --spins > 0)
+ Thread.onSpinWait();
+ }
}
return idle;
}
@@ -2060,80 +2086,52 @@ public class ForkJoinPool extends AbstractExecutorService
return idle;
}
- /**
- * Spins and/or yields to reduce unproductive scanning
- *
- * @param w the work queue
- * @param taken number of stolen tasks since last empty scan
- * @return active status
- */
- private int onEmptyScan(WorkQueue w, int taken) {
- int idle = 0;
- if (w != null) { // always true; hoist checks
- if (taken != 0) {
- w.nsteals += taken;
- if ((w.config & CLEAR_TLS) != 0 &&
- (Thread.currentThread() instanceof ForkJoinWorkerThread f))
- f.resetThreadLocals(); // (instanceof check always true)
- Thread.interrupted(); // clear status
- }
- if ((idle = w.phase & IDLE) != 0 && (runState & STOP) == 0) {
- if (taken == 0)
- Thread.yield();
- for (int s = SPIN_WAITS; (idle = w.phase & IDLE) != 0 && --s != 0;)
- Thread.onSpinWait();
- }
- }
- return idle;
- }
-
/**
* Awaits signal or termination.
*
* @param w the work queue
- * @return 0 if now active
+ * @throws WorkerTrimmedException on idle timeout
*/
- private int awaitWork(WorkQueue w) {
- int idle = 0, phase;
- if (w != null && (idle = (phase = w.phase) & IDLE) != 0) {
- LockSupport.setCurrentBlocker(this);
+ private void awaitWork(WorkQueue w) {
+ int phase;
+ if (w != null && ((phase = w.phase) & IDLE) != 0) {
int activePhase = phase + IDLE;
long deadline = 0L, waitTime = (w.source == INVALID_ID) ? 0L : keepAlive;
- while ((runState & STOP) == 0L) {
- boolean trimmable = false; // use timed wait if trimmable
+ LockSupport.setCurrentBlocker(this);
+ for (;;) {
+ Thread.interrupted(); // clear status
+ if ((runState & STOP) != 0L)
+ break;
+ int idle;
long d = 0L, c; // trim at head
+ boolean trimmable = false; // use timed wait if trimmable
if ((int)(c = ctl) == activePhase && (c & RC_MASK) == 0L) {
+ trimmable = true;
long now = System.currentTimeMillis();
if (deadline == 0L)
deadline = waitTime + now;
- if (deadline - now <= TIMEOUT_SLOP) {
- if (tryTrim(w, c, activePhase))
- break;
- continue; // lost race to trim
- }
- d = deadline;
- trimmable = true;
+ if ((d = deadline) - now <= TIMEOUT_SLOP)
+ tryTrim(w, c, activePhase); // throws if trimmed
}
- if ((idle = w.phase & IDLE) == 0)
+ if ((w.phase & IDLE) == 0)
break;
w.parking = 1; // enable unpark and recheck
if ((idle = w.phase & IDLE) != 0)
U.park(trimmable, d);
w.parking = 0; // close unpark window
- if (idle == 0 || (idle = w.phase & IDLE) == 0)
+ if (idle == 0 || (w.phase & IDLE) == 0)
break;
- Thread.interrupted(); // clear status for next park
}
LockSupport.setCurrentBlocker(null);
}
- return idle;
}
/**
* Tries to remove and deregister worker after timeout, and release
* another to do the same unless new tasks are found.
+ * @throws WorkerTrimmedException on idle timeout
*/
- private boolean tryTrim(WorkQueue w, long c, int activePhase) {
+ private void tryTrim(WorkQueue w, long c, int activePhase) {
if (w != null) {
int vp, i; WorkQueue[] vs; WorkQueue v;
long nc = ((w.stackPred & LMASK) |
@@ -2150,10 +2148,10 @@ public class ForkJoinPool extends AbstractExecutorService
v.phase = vp;
U.unpark(v.owner);
}
- return true;
+ LockSupport.setCurrentBlocker(null);
+ throw new WorkerTrimmedException();
}
}
- return false;
}
/**
@@ -2382,8 +2380,7 @@ public class ForkJoinPool extends AbstractExecutorService
if (eligible) {
if (U.compareAndSetReference(
a, k, t, null)) {
- q.base = b + 1;
- U.storeFence();
+ U.putIntVolatile(q, WorkQueue.BASE, b + 1);
t.doExec();
locals = rescan = true;
break scan;