From 872fd0b5f803aef3ea18cf520ef3e73487d65eca Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Tue, 17 Feb 2026 19:31:32 -0500
Subject: [PATCH] No interleaved reactivate
---
.../java/util/concurrent/ForkJoinPool.java | 144 ++++++++----------
1 file changed, 60 insertions(+), 84 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index 0f04f9a891a..fda2f87da08 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -1931,15 +1931,14 @@ public class ForkJoinPool extends AbstractExecutorService
*/
final void runWorker(WorkQueue w) {
if (w != null) {
- int r = w.stackPred; // seed from registerWorker
+ int phase = w.phase, r = w.stackPred; // seed from registerWorker
int fifo = (int)config & FIFO;
- for (int nsteals = 0, idle = 0, src = -1;;) {
+ for (int nsteals = 0, src = -1;;) {
WorkQueue[] qs; int n, j;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
long e = runState;
- if ((qs = queues) == null ||
- (j = ((n = qs.length) << 1)) <= 0 || // 2 sweeps per scan
- (e & STOP) != 0L)
+ if ((qs = queues) == null || (e & STOP) != 0L ||
+ (j = ((n = qs.length) << 1)) <= 0) // 2 sweeps per scan
return;
scan: for (int i = r, step = (r >>> 16) | 1; ; i += step) {
WorkQueue q; int qid;
@@ -1952,8 +1951,7 @@ public class ForkJoinPool extends AbstractExecutorService
if (q.array == a && q.base == b) { // else inconsistent
long np = slotOffset((cap - 1) & (nb = b + 1));
if (t == null ||
- (idle == 0 &&
- !U.compareAndSetReference(a, bp, t, null))) {
+ !U.compareAndSetReference(a, bp, t, null)) {
if (taken != 0) { // end of run
w.nsteals = nsteals += taken;
break scan;
@@ -1965,17 +1963,14 @@ public class ForkJoinPool extends AbstractExecutorService
break scan; // stalled; reorder scan
}
}
- else if (idle != 0) {
- if ((idle = tryReactivate(w)) != 0)
- break scan; // can't take yet
- }
else {
q.base = nb;
Object nt = U.getReferenceVolatile(a, np);
if (qid != (prevSrc = src))
w.source = src = qid;
++taken;
- if (nt != null && (prevSrc < 0 || fifo != 0) &&
+ if (nt != null &&
+ (qid != prevSrc || fifo != 0) &&
U.getReference(a, np) != null)
signalWork(q, nb);
w.topLevelExec(t, fifo);
@@ -1984,9 +1979,7 @@ public class ForkJoinPool extends AbstractExecutorService
}
}
if (--j == 0) { // empty scan
- if (idle == 0)
- idle = tryDeactivate(w, src);
- else if ((idle = awaitWork(w)) != 0)
+ if (((phase = deactivate(w, phase)) & IDLE) != 0)
return; // trimmed or terminated
src = -1;
break;
@@ -2000,83 +1993,67 @@ public class ForkJoinPool extends AbstractExecutorService
* Possibly deactivates worker.
*
* @param w the work queue
- * @return idle status
+ * @param p current phase
+ * @return current phase, with IDLE set if worker should exit
*/
- private int tryDeactivate(WorkQueue w, int src) {
- int idle = 0;
- if (w != null) { // always true; hoist checks
- if (src >= 0 && (w.config & CLEAR_TLS) != 0 &&
- (Thread.currentThread() instanceof ForkJoinWorkerThread f))
- f.resetThreadLocals(); // (instanceof check always true)
- int phase = U.getInt(w, WorkQueue.PHASE);
- long sp = (phase + NEXTIDLE) & LMASK, pc = ctl;
- U.putInt(w, WorkQueue.PHASE, phase | IDLE);
- for (;;) { // enqueue
- w.stackPred = (int)pc;
- long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK;
- if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) {
- if (ac == 0L) {
- if ((runState & (SHUTDOWN|STOP)) == SHUTDOWN)
- quiescent(); // check quiescent termination
- idle = w.phase & IDLE;
- }
- else {
- int tc = ((short)(c >>> TC_SHIFT) | 1) & SMASK;
- int spins = tc + Math.max(tc << 1, SPIN_WAITS);
- while ((idle = w.phase & IDLE) != 0 && --spins != 0)
- Thread.onSpinWait(); // spin for approx 1 scan cost
- }
- break;
- }
- else if (ac < (pc & RC_MASK)) {
- w.phase = phase; // back out if lost to signal
- break;
- }
- }
+ private int deactivate(WorkQueue w, int p) {
+ if (w == null) // currently impossible
+ return IDLE;
+ int activePhase = p + NEXTIDLE, sp;
+ long ap = activePhase & LMASK, pc = ctl, qc;
+ U.putInt(w, WorkQueue.PHASE, p | IDLE);
+ for (;;) { // enqueue
+ sp = w.stackPred = (int)pc;
+ if (pc == (pc = U.compareAndExchangeLong(
+ this, CTL, pc, qc = (pc - RC_UNIT) & UMASK | ap)))
+ break;
+ else if ((qc & RC_MASK) < (pc & RC_MASK))
+ return w.phase = p; // back out if lost to signal
}
- return idle;
- }
+ int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs;
+ if (((e = runState) & STOP) != 0L ||
+ ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
+ (qs = queues) == null || (n = qs.length) <= 0)
+ return IDLE; // terminating
-
- /**
- * Reactivates worker w if it is currently top of ctl stack
- *
- * @param w the work queue
- * @return 0 if now active
- */
- @jdk.internal.vm.annotation.DontInline // avoid prefetch of contending memory
- private int tryReactivate(WorkQueue w) {
- int idle = 0;
- if (w != null) { // always true; hoist checks
- int sp = w.stackPred, phase, activePhase; long c;
- if ((idle = (phase = w.phase) & IDLE) != 0 &&
- (activePhase = phase + IDLE) == (int)(c = ctl)) {
- if (U.compareAndSetLong(this, CTL, c,
- (sp & LMASK) | ((c + RC_UNIT) & UMASK))) {
- w.phase = activePhase;
- idle = 0;
- }
- else
- idle = w.phase & IDLE;
- }
+ for (int prechecks = Math.min(ac, 2), // reactivation threshold
+ k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) {
+ WorkQueue q; int cap; ForkJoinTask>[] a; long c;
+ if ((p = w.phase) == activePhase)
+ return activePhase;
+ if (--k < 0)
+ return awaitWork(w, p); // block, drop, or exit
+ if ((q = qs[k & (n - 1)]) == null)
+ Thread.onSpinWait();
+ else if ((a = q.array) != null && (cap = a.length) > 0 &&
+ a[q.base & (cap - 1)] != null && --prechecks < 0 &&
+ (int)(c = ctl) == activePhase &&
+ U.compareAndSetLong(this, CTL, c,
+ (sp & LMASK) | ((c + RC_UNIT) & UMASK)))
+ return w.phase = activePhase; // reactivate
}
- return idle;
}
/**
* Awaits signal or termination.
*
* @param w the work queue
- * @return 0 if now active else -1
+ * @param p current phase (known to be idle)
+ * @return current phase, with IDLE set if worker should exit
*/
- private int awaitWork(WorkQueue w) {
- int idle = IDLE, phase;
- if (w != null && (idle = (phase = w.phase) & IDLE) != 0) {
- int activePhase = phase + IDLE, trim;
+ private int awaitWork(WorkQueue w, int p) {
+ if (w != null) {
+ long deadline = 0L;
+ int activePhase = p + IDLE, trim;
if ((trim = w.trimStatus) != 0)
w.trimStatus = 0;
- long deadline = 0L;
- while ((runState & STOP) == 0L) {
+ if ((w.config & CLEAR_TLS) != 0 && // clear before reactivate
+ (Thread.currentThread() instanceof ForkJoinWorkerThread f))
+ f.resetThreadLocals(); // (instanceof check always true)
+ for (;;) {
+ Thread.interrupted(); // clear status
+ if ((runState & STOP) != 0L)
+ break;
boolean trimmable = false; // true if at ctl head and quiescent
long d = 0L, c;
if ((int)(c = ctl) == activePhase && (c & RC_MASK) == 0L) {
@@ -2090,20 +2067,19 @@ public class ForkJoinPool extends AbstractExecutorService
trim = 0;
trimmable = true;
}
- if ((idle = w.phase & IDLE) == 0)
+ if ((p = w.phase) == activePhase)
break;
LockSupport.setCurrentBlocker(this);
w.parking = 1; // enable unpark and recheck
- if ((idle = w.phase & IDLE) != 0)
+ if (w.phase != activePhase)
U.park(trimmable, d);
w.parking = 0; // close unpark window
LockSupport.setCurrentBlocker(null);
- if (idle == 0 || (idle = w.phase & IDLE) == 0)
+ if ((p = w.phase) == activePhase)
break;
- Thread.interrupted(); // clear status for next park
}
}
- return idle;
+ return p;
}
/**