From 02ddb13d7b4b05dff5900e47f5969f1de69866ab Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Wed, 21 Jan 2026 11:29:23 -0500
Subject: [PATCH] Try out different approach
---
.../java/util/concurrent/ForkJoinPool.java | 254 +++++++++---------
1 file changed, 129 insertions(+), 125 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index efe063d136f..7fcd6919b08 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -1065,7 +1065,7 @@ public class ForkJoinPool extends AbstractExecutorService
static final long RS_LOCK = 1L << 4; // lowest seqlock bit
// spin/sleep limits for runState locking and elsewhere
- static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait
+ static final int SPIN_WAITS = 1 << 8; // max calls to onSpinWait
static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos
static final int MAX_SLEEP = 1 << 20; // approx 1 sec as nanos
@@ -1256,19 +1256,19 @@ public class ForkJoinPool extends AbstractExecutorService
*/
final void push(ForkJoinTask> task, ForkJoinPool pool, int unlock) {
ForkJoinTask>[] a = array;
- int b = base, s = top, cap, m;
+ int b = base, s = top, cap, m; long pos;
if (a == null || (cap = a.length) <= s + 1 - b || (m = cap - 1) < 0)
growAndPush(task, pool, unlock);
else {
top = s + 1;
- U.getAndSetReference(a, slotOffset(m & s), task);
+ U.getAndSetReference(a, pos = slotOffset(m & s), task);
Object pred = U.getReferenceAcquire(a, slotOffset(m & (s - 1)));
if (unlock != 1) { // release external lock
U.putInt(this, PHASE, unlock);
U.storeFence();
}
if (pred == null && pool != null)
- pool.signalWork(this, s); // may have appeared empty
+ pool.signalWork(a, pos); // may have appeared empty
}
}
@@ -1297,11 +1297,12 @@ public class ForkJoinPool extends AbstractExecutorService
break; // lost to pollers
newArray[k & newMask] = u;
}
+ long pos = slotOffset(s & newMask);
U.putReferenceVolatile(this, ARRAY, newArray);
if (unlock != 1)
phase = unlock;
if (pool != null)
- pool.signalWork(this, s);
+ pool.signalWork(newArray, pos);
return;
}
}
@@ -1418,8 +1419,47 @@ public class ForkJoinPool extends AbstractExecutorService
return null;
}
+ /**
+ * Tries once to poll for a task; returns it, or null if none was taken.
+ * @param pool if non-null, pool to propagate signals
+ */
+ private ForkJoinTask> tryPoll(ForkJoinPool pool) {
+ ForkJoinTask>[] a; int cap, b, nb; long bp;
+ if ((a = array) != null && (cap = a.length) > 0) {
+ ForkJoinTask> t = (ForkJoinTask>)U.getReferenceAcquire(
+ a, bp = slotOffset((cap - 1) & (b = base)));
+ long np = slotOffset((cap - 1) & (nb = b + 1));
+ if (t != null && base == b &&
+ U.compareAndSetReference(a, bp, t, null)) {
+ Object nt = U.getReference(a, np);
+ U.getAndSetInt(this, BASE, nb);
+ if (pool != null && nt != null)
+ pool.signalWork(a, np);
+ return t;
+ }
+ }
+ return null;
+ }
+
// specialized execution methods
+ /**
+ * Runs the given task, then remaining local tasks and any polled from q; returns the count taken, including the given task.
+ */
+ final int topLevelExec(ForkJoinTask> task, WorkQueue q, ForkJoinPool pool,
+ int fifo) {
+ ForkJoinPool p = (fifo == 0) ? null : pool;
+ int taken = 1;
+ while (task != null) {
+ task.doExec();
+ if ((task = nextLocalTask(fifo)) == null &&
+ (q == null || (task = q.tryPoll(p)) == null))
+ break;
+ ++taken;
+ }
+ return taken;
+ }
+
/**
* Deep form of tryUnpush: Traverses from top and removes and
* runs task if present.
@@ -1803,7 +1843,7 @@ public class ForkJoinPool extends AbstractExecutorService
if ((tryTerminate(false, false) & STOP) == 0L &&
phase != 0 && w != null && w.source != DROPPED) {
w.cancelTasks(); // clean queue
- signalWork(null, 0); // possibly replace
+ signalWork(null, 0L); // possibly replace
}
if (ex != null)
ForkJoinTask.rethrow(ex);
@@ -1811,29 +1851,25 @@ public class ForkJoinPool extends AbstractExecutorService
/**
* Releases an idle worker, or creates one if not enough exist,
- * giving up on contention if q is nonull and signalled slot
- * already taken.
+ * giving up on contention if source slot already taken.
*
- * @param src, if nonnull, the WorkQueue containing signalled task
- * @param base src's base index for the task
+ * @param src if nonnull, the array containing signalled task
+ * @param offset slot offset for the task
*/
- final void signalWork(WorkQueue src, int base) {
- int pc = parallelism, i, sp;
- long c = U.getLong(this, CTL); // rely on caller sync for initial reads
+ final void signalWork(ForkJoinTask>[] src, long offset) {
+ int pc = parallelism, i, sp; // rely on caller sync for initial reads
+ long c = U.getLong(this, CTL);
WorkQueue[] qs = queues;
while ((short)(c >>> RC_SHIFT) < pc && qs != null &&
qs.length > (i = (sp = (int)c) & SMASK)) {
- WorkQueue v; long nc;
- if (i == 0) {
+ WorkQueue w = qs[i], v = null; long nc;
+ if (i == 0 || w == null) {
if ((short)(c >>> TC_SHIFT) >= pc)
break;
- v = null;
nc = ((c + TC_UNIT) & TC_MASK) | ((c + RC_UNIT) & RC_MASK);
}
- else if ((v = qs[i]) == null)
- break;
else
- nc = (v.stackPred & LMASK) | ((c + RC_UNIT) & UMASK);
+ nc = ((v = w).stackPred & LMASK) | ((c + RC_UNIT) & UMASK);
if (c == (c = U.compareAndExchangeLong(this, CTL, c, nc))) {
if (v == null)
createWorker();
@@ -1845,7 +1881,7 @@ public class ForkJoinPool extends AbstractExecutorService
break;
}
qs = queues;
- if (src != null && src.base - base > 0)
+ if (src != null && U.getReference(src, offset) == null)
break;
}
}
@@ -1925,126 +1961,95 @@ public class ForkJoinPool extends AbstractExecutorService
final void runWorker(WorkQueue w) {
if (w != null) {
int r = w.stackPred; // seed from registerWorker
- int fifo = (int)config & FIFO, idle = 0;
- for (boolean scanned = false;;) {
+ for (int fifo = (int)config & FIFO, idle = 0, taken = 0;;) {
WorkQueue[] qs;
long e = runState;
int n = ((qs = queues) == null) ? 0 : qs.length;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
int i = r, step = (r >>> 16) | 1;
- boolean found = false;
- if ((e & STOP) != 0L || n <= 0)
+ if ((e & STOP) != 0L || n <= 0) {
+ w.nsteals += taken;
break;
+ }
+ boolean rescan = false;
scan: for (int j = n; j != 0; --j, i += step) {
WorkQueue q; int qid;
if ((q = qs[qid = i & (n - 1)]) != null) {
- ForkJoinTask>[] a; int cap; // poll queue
+ ForkJoinTask>[] a; int pb = -1, cap; // poll queue
while ((a = q.array) != null && (cap = a.length) > 0) {
int b, nb; long bp, np; ForkJoinTask> t;
t = (ForkJoinTask>)U.getReferenceAcquire(
a, bp = slotOffset((cap - 1) & (b = q.base)));
Object nt = U.getReference(
a, np = slotOffset((cap - 1) & (nb = b + 1)));
- if (q.array == a && q.base == b &&
- U.getReference(a, bp) == t) {
- if (t == null) {
- if (nt == null && (!scanned || q.top == b)) {
- found = false;
+ int qb = q.base; // reread
+ if (idle != 0) {
+ if (t == null && nt == null && q.top - qb <= 0)
+ break;
+ if ((idle = tryReactivate(w)) != 0) {
+ rescan = true;
+ break scan; // can't take yet
+ }
+ }
+ else if (qb != b) // inconsistent/busy
+ ;
+ else if (t == null) {
+ if (q.array == a && U.getReference(a, bp) == null) {
+ if (nt == null)
break;
+ if (pb == (pb = b)) {
+ rescan = true; // stalled; reorder scan
+ break scan;
}
- if (found)
- break scan; // stall check
- found = true;
- }
- else if (idle != 0) {
- found = true;
- if ((idle = tryReactivate(w)) != 0)
- break scan; // can't take yet
- }
- else if (U.compareAndSetReference(a, bp, t, null)) {
- nt = U.getReference(a, np);
- U.getAndSetInt(q, WorkQueue.BASE, nb);
- if (nt != null) // propagate
- signalWork(q, nb);
- topLevelExec(t, w, q, fifo, qid);
- found = true;
- break scan;
}
}
+ else if (U.compareAndSetReference(a, bp, t, null)) {
+ nt = U.getReference(a, np);
+ U.getAndSetInt(q, WorkQueue.BASE, nb);
+ if (nt != null) // propagate
+ signalWork(a, np);
+ w.source = qid;
+ taken += w.topLevelExec(t, q, this, fifo);
+ rescan = true;
+ break scan;
+ }
}
}
}
- if (found)
- scanned = false;
- else if (!scanned && idle == 0)
- scanned = true;
- else if (!scanned || idle == 0) {
- if ((idle = deactivate(w, idle)) == 0)
- scanned = false;
- }
- else {
- awaitWork(w);
- idle = 0;
- scanned = false;
+ if (!rescan) {
+ idle = onEmptyScan(w, idle, taken);
+ taken = 0;
}
}
}
}
- private void topLevelExec(ForkJoinTask> t, WorkQueue w, WorkQueue q,
- int fifo, int qid) {
- if (t != null && q != null && w != null) { // always true; hoist checks
- w.source = qid;
- int taken = 1;
- for (;;) {
- t.doExec();
- if ((t = w.nextLocalTask(fifo)) == null) {
- ForkJoinTask>[] a; int cap, b, nb; long bp, np;
- if ((a = q.array) == null || (cap = a.length) <= 0)
- break; // similar to runWorker scan
- t = (ForkJoinTask>)U.getReferenceAcquire(
- a, bp = slotOffset((cap - 1) & (b = q.base)));
- Object nt = U.getReference(
- a, np = slotOffset((cap - 1) & (nb = b + 1)));
- if (t == null || q.base != b ||
- !U.compareAndSetReference(a, bp, t, null))
- break;
- nt = U.getReference(a, np);
- U.getAndSetInt(q, WorkQueue.BASE, nb);
- ++taken;
- if (nt != null && // prevent stalls
- (fifo != 0 || (t instanceof ForkJoinTask.InterruptibleTask)))
- signalWork(q, nb);
- }
- }
- w.nsteals += taken;
- if ((w.config & CLEAR_TLS) != 0 &&
- (Thread.currentThread() instanceof ForkJoinWorkerThread f))
- f.resetThreadLocals(); // (instanceof check always true)
- }
- }
-
/**
- * Possibly deactivates and pauses worker
+ * Possibly deactivates, reactivates or pauses worker after a scan takes no task; idle is its current status, taken the tasks run since the last call.
*
* @param w the work queue
- * @param idle active status
* @return 0 if now active
*/
- private int deactivate(WorkQueue w, int idle) {
+ private int onEmptyScan(WorkQueue w, int idle, int taken) {
if (w != null) { // always true; hoist checks
int phase = w.phase;
- if (idle != 0) // already deactivated
- idle = phase & IDLE;
- else { // try to deactivate
+ if (taken != 0) {
+ w.nsteals += taken;
+ if ((w.config & CLEAR_TLS) != 0 &&
+ (Thread.currentThread() instanceof ForkJoinWorkerThread f))
+ f.resetThreadLocals(); // (instanceof check always true)
+ if ((runState & (SHUTDOWN|STOP)) == 0L)
+ Thread.yield(); // pause before rescan
+ }
+ else if (idle == 0) { // deactivate
long sp = (phase + NEXTIDLE) & LMASK, pc = ctl;
U.putInt(w, WorkQueue.PHASE, phase | IDLE);
for (;;) { // enqueue
w.stackPred = (int)pc;
long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK;
if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) {
- if (ac == 0L) // check quiescent termination
- quiescent();
+ if (ac == 0L && (runState & (SHUTDOWN|STOP)) == SHUTDOWN)
+ quiescent(); // check quiescent termination
idle = w.phase & IDLE;
break;
}
@@ -2054,12 +2059,9 @@ public class ForkJoinPool extends AbstractExecutorService
}
}
}
- if (idle != 0 && (runState & STOP) == 0L) {
- int noise = (phase ^ (phase >>> 16)) & (SPIN_WAITS - 1);
- int spins = (SPIN_WAITS << 1) | noise;
- Thread.yield(); // helps when oversubscribed
- while ((idle = w.phase & IDLE) != 0 && --spins > 0)
- Thread.onSpinWait();
+ else if ((idle = phase & IDLE) != 0) {
+ awaitWork(w, phase);
+ idle = 0;
}
}
return idle;
@@ -2092,37 +2094,41 @@ public class ForkJoinPool extends AbstractExecutorService
* @param w the work queue
* @throws WorkerTrimmedException on idle timeout
*/
- private void awaitWork(WorkQueue w) {
- int phase;
- if (w != null && ((phase = w.phase) & IDLE) != 0) {
+ private void awaitWork(WorkQueue w, int phase) {
+ if (w != null) {
int activePhase = phase + IDLE;
long deadline = 0L, waitTime = (w.source == INVALID_ID) ? 0L : keepAlive;
- LockSupport.setCurrentBlocker(this);
for (;;) {
+ long d = 0L, c; int idle;
Thread.interrupted(); // clear status
if ((runState & STOP) != 0L)
break;
- int idle;
- long d = 0L, c; // trim at head
boolean trimmable = false; // use timed wait if trimmable
- if ((int)(c = ctl) == activePhase && (c & RC_MASK) == 0L) {
- trimmable = true;
- long now = System.currentTimeMillis();
- if (deadline == 0L)
- deadline = waitTime + now;
- if ((d = deadline) - now <= TIMEOUT_SLOP)
- tryTrim(w, c, activePhase); // throws if trimmed
+ int spins = ((short)((c = ctl) >>> TC_SHIFT) | 1) & SMASK;
+ if ((int)c == activePhase) { // at head
+ if ((c & RC_MASK) == 0L) {
+ long now = System.currentTimeMillis();
+ if (deadline == 0L)
+ deadline = waitTime + now;
+ if ((d = deadline) - now <= TIMEOUT_SLOP)
+ tryTrim(w, c, activePhase); // throws if trimmed
+ trimmable = true;
+ }
+ spins += SPIN_WAITS; // spin more
}
- if ((w.phase & IDLE) == 0)
+ while ((idle = w.phase & IDLE) != 0 && --spins != 0)
+ Thread.onSpinWait();
+ if (idle == 0)
break;
+ LockSupport.setCurrentBlocker(this);
w.parking = 1; // enable unpark and recheck
if ((idle = w.phase & IDLE) != 0)
U.park(trimmable, d);
w.parking = 0; // close unpark window
+ LockSupport.setCurrentBlocker(null);
if (idle == 0 || (w.phase & IDLE) == 0)
break;
}
- LockSupport.setCurrentBlocker(null);
}
}
@@ -2148,7 +2154,6 @@ public class ForkJoinPool extends AbstractExecutorService
v.phase = vp;
U.unpark(v.owner);
}
- LockSupport.setCurrentBlocker(null);
throw new WorkerTrimmedException();
}
}
@@ -2805,8 +2810,7 @@ public class ForkJoinPool extends AbstractExecutorService
a, k = slotOffset((cap - 1) & (b = q.base)));
if (q.base == b && t != null &&
U.compareAndSetReference(a, k, t, null)) {
- q.base = b + 1;
- U.storeFence();
+ U.putIntVolatile(q, WorkQueue.BASE, b + 1);
try {
t.cancel(false);
} catch (Throwable ignore) {
@@ -3303,7 +3307,7 @@ public class ForkJoinPool extends AbstractExecutorService
if ((config & PRESET_SIZE) != 0)
throw new UnsupportedOperationException("Cannot override System property");
if ((prevSize = U.getAndSetInt(this, PARALLELISM, size)) < size)
- signalWork(null, 0); // trigger worker activation
+ signalWork(null, 0L); // trigger worker activation
return prevSize;
}