From 5f84c2a9803bcb64eb8b6d92f14f8a88e93eca06 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Mon, 16 Feb 2026 19:18:03 -0500
Subject: [PATCH] More policy exploration
---
.../java/util/concurrent/ForkJoinPool.java | 144 +++++++++---------
1 file changed, 70 insertions(+), 74 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index 80ac3a9a71f..0f04f9a891a 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -1933,66 +1933,62 @@ public class ForkJoinPool extends AbstractExecutorService
if (w != null) {
int r = w.stackPred; // seed from registerWorker
int fifo = (int)config & FIFO;
- for (int idle = 0, taken = 0, src = -1;;) {
+ for (int nsteals = 0, idle = 0, src = -1;;) {
WorkQueue[] qs; int n, j;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
long e = runState;
if ((qs = queues) == null ||
(j = ((n = qs.length) << 1)) <= 0 || // 2 sweeps per scan
- (e & STOP) != 0L) {
- w.nsteals += taken;
+ (e & STOP) != 0L)
return;
- }
scan: for (int i = r, step = (r >>> 16) | 1; ; i += step) {
WorkQueue q; int qid;
if ((q = qs[qid = i & (n - 1)]) != null) {
- for (int pb = -1, found = 0;;) {
- ForkJoinTask> t;
- ForkJoinTask>[] a = q.array;
- int b = q.base, cap, nb;
- if (a == null || (cap = a.length) <= 0)
- break;
- long np = slotOffset((cap - 1) & (nb = b + 1)), bp;
+ ForkJoinTask>[] a; int pb = -1, taken = 0, cap;
+ while ((a = q.array) != null && (cap = a.length) > 0) {
+ ForkJoinTask> t; int b, nb, prevSrc; long bp;
t = (ForkJoinTask>)U.getReferenceAcquire(
- a, bp = slotOffset((cap - 1) & b));
- if (q.array != a || q.base != b ||
- U.getReference(a, bp) != t)
- ; // inconsistent
- else if (t == null ||
- (idle == 0 &&
- !U.compareAndSetReference(a, bp, t, null))) {
- if (U.getReference(a, np) == null) {
- if (found != 0) // end of run
+ a, bp = slotOffset((cap - 1) & (b = q.base)));
+ if (q.array == a && q.base == b) { // else inconsistent
+ long np = slotOffset((cap - 1) & (nb = b + 1));
+ if (t == null ||
+ (idle == 0 &&
+ !U.compareAndSetReference(a, bp, t, null))) {
+ if (taken != 0) { // end of run
+ w.nsteals = nsteals += taken;
break scan;
- break; // probably empty
+ }
+ if (U.getReference(a, bp) == null) {
+ if (U.getReference(a, np) == null)
+ break; // probably empty
+ if (pb == (pb = b))
+ break scan; // stalled; reorder scan
+ }
+ }
+ else if (idle != 0) {
+ if ((idle = tryReactivate(w)) != 0)
+ break scan; // can't take yet
+ }
+ else {
+ q.base = nb;
+ Object nt = U.getReferenceVolatile(a, np);
+ if (qid != (prevSrc = src))
+ w.source = src = qid;
+ ++taken;
+ if (nt != null && (prevSrc < 0 || fifo != 0) &&
+ U.getReference(a, np) != null)
+ signalWork(q, nb);
+ w.topLevelExec(t, fifo);
}
- if (pb == (pb = b)) // track progress
- break scan; // stalled; reorder scan
- }
- else if (idle != 0) {
- found = 1;
- if ((idle = tryReactivate(w)) != 0)
- break scan; // can't take yet
- }
- else {
- boolean propagate;
- q.base = nb;
- Object nt = U.getReferenceVolatile(a, np);
- if (qid != src)
- w.source = src = qid;
- found = 1;
- if (propagate =
- ((++taken == 1 || fifo != 0) &&
- nt != null && U.getReference(a, np) != null))
- signalWork(q, nb);
- w.topLevelExec(t, fifo);
}
}
}
if (--j == 0) { // empty scan
- if ((idle = deactivate(w, idle, taken)) == -1)
+ if (idle == 0)
+ idle = tryDeactivate(w, src);
+ else if ((idle = awaitWork(w)) != 0)
return; // trimmed or terminated
- taken = 0;
+ src = -1;
break;
}
}
@@ -2004,50 +2000,51 @@ public class ForkJoinPool extends AbstractExecutorService
* Possibly deactivates worker.
*
* @param w the work queue
- * @param idle current idle status
- * @param taken number of tasks stolen since last call
- * @return idle status or -1 for exit
+ * @return idle status
*/
- private int deactivate(WorkQueue w, int idle, int taken) {
+ private int tryDeactivate(WorkQueue w, int src) {
+ int idle = 0;
if (w != null) { // always true; hoist checks
- int phase = w.phase;
- if (taken != 0) { // postpone
- w.nsteals += taken;
- if ((w.config & CLEAR_TLS) != 0 &&
- (Thread.currentThread() instanceof ForkJoinWorkerThread f))
- f.resetThreadLocals(); // (instanceof check always true)
- Thread.interrupted(); // clear status
- }
- else if (idle == 0) {
- long sp = (phase + NEXTIDLE) & LMASK, pc = ctl;
- U.putInt(w, WorkQueue.PHASE, phase | IDLE);
- for (;;) { // enqueue
- w.stackPred = (int)pc;
- long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK;
- if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) {
- if (ac <= 0L && (runState & (SHUTDOWN|STOP)) == SHUTDOWN)
+ if (src >= 0 && (w.config & CLEAR_TLS) != 0 &&
+ (Thread.currentThread() instanceof ForkJoinWorkerThread f))
+ f.resetThreadLocals(); // (instanceof check always true)
+ int phase = U.getInt(w, WorkQueue.PHASE);
+ long sp = (phase + NEXTIDLE) & LMASK, pc = ctl;
+ U.putInt(w, WorkQueue.PHASE, phase | IDLE);
+ for (;;) { // enqueue
+ w.stackPred = (int)pc;
+ long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK;
+ if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) {
+ if (ac == 0L) {
+ if ((runState & (SHUTDOWN|STOP)) == SHUTDOWN)
quiescent(); // check quiescent termination
idle = w.phase & IDLE;
- break;
}
- else if (ac < (pc & RC_MASK)) {
- w.phase = phase; // back out if lost to signal
- break;
+ else {
+ int tc = ((short)(c >>> TC_SHIFT) | 1) & SMASK;
+ int spins = tc + Math.max(tc << 1, SPIN_WAITS);
+ while ((idle = w.phase & IDLE) != 0 && --spins != 0)
+ Thread.onSpinWait(); // spin for approx 1 scan cost
}
+ break;
+ }
+ else if (ac < (pc & RC_MASK)) {
+ w.phase = phase; // back out if lost to signal
+ break;
}
}
- else if ((idle = phase & IDLE) != 0)
- idle = awaitWork(w, phase);
}
return idle;
}
+
/**
* Reactivates worker w if it is currently top of ctl stack
*
* @param w the work queue
* @return 0 if now active
*/
+ @jdk.internal.vm.annotation.DontInline // avoid prefetch of contending memory
private int tryReactivate(WorkQueue w) {
int idle = 0;
if (w != null) { // always true; hoist checks
@@ -2070,12 +2067,11 @@ public class ForkJoinPool extends AbstractExecutorService
* Awaits signal or termination.
*
* @param w the work queue
- * @param phase current (inactive) phase
* @return 0 if now active else -1
*/
- private int awaitWork(WorkQueue w, int phase) {
- int idle = IDLE;
- if (w != null) { // always true; hoist checks
+ private int awaitWork(WorkQueue w) {
+ int idle = IDLE, phase;
+ if (w != null && (idle = (phase = w.phase) & IDLE) != 0) {
int activePhase = phase + IDLE, trim;
if ((trim = w.trimStatus) != 0)
w.trimStatus = 0;
@@ -2107,7 +2103,7 @@ public class ForkJoinPool extends AbstractExecutorService
Thread.interrupted(); // clear status for next park
}
}
- return (idle != 0) ? -1 : 0;
+ return idle;
}
/**