From 4f2adfcbc6b97b9036b6245bceecf9cb694dc6bf Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Fri, 13 Mar 2026 13:08:54 -0400
Subject: [PATCH] Improve performance under oversubscription, step 1
---
.../java/util/concurrent/ForkJoinPool.java | 258 +++++++++---------
1 file changed, 136 insertions(+), 122 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index aa173b9f364..2dc1dc7556c 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -1215,7 +1215,7 @@ public class ForkJoinPool extends AbstractExecutorService
@jdk.internal.vm.annotation.Contended("w")
int nsteals; // number of steals from other queues
@jdk.internal.vm.annotation.Contended("w")
- volatile int parking; // nonzero if parked in awaitWork
+ volatile int parking; // next phase if parked in awaitWork
// Support for atomic operations
private static final Unsafe U;
@@ -1338,24 +1338,24 @@ public class ForkJoinPool extends AbstractExecutorService
private ForkJoinTask<?> nextLocalTask(int fifo) {
ForkJoinTask<?> t = null;
ForkJoinTask<?>[] a = array;
- int b = base, p = top, cap;
- if (p - b > 0 && a != null && (cap = a.length) > 0) {
- for (int m = cap - 1, s, nb;;) {
- if (fifo == 0 || (nb = b + 1) == p) {
- if ((t = (ForkJoinTask<?>)U.getAndSetReference(
- a, slotOffset(m & (s = p - 1)), null)) != null)
- updateTop(s); // else lost race for only task
- break;
- }
+ int b = base, s = top - 1, cap; // s: index of the topmost slot
+ if (a != null && s - b >= 0 && (cap = a.length) > 0) {
+ if (fifo == 0) { // LIFO: take from the top end
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
- a, slotOffset(m & b), null)) != null) {
- updateBase(nb);
- break;
- }
- while (b == (b = U.getIntAcquire(this, BASE)))
- Thread.onSpinWait(); // spin to reduce memory traffic
- if (p - b <= 0)
- break;
+ a, slotOffset((cap - 1) & s), null)) != null)
+ updateTop(s); // else top slot was already cleared
+ } else { // FIFO: take from the base end
+ do {
+ if ((t = (ForkJoinTask<?>)U.getAndSetReference(
+ a, slotOffset((cap - 1) & b), null)) != null) {
+ updateBase(b + 1);
+ break;
+ }
+ if (b == s)
+ break; // cleared slot was the last index in range
+ while (b == (b = U.getIntAcquire(this, BASE)))
+ Thread.onSpinWait(); // wait until base is seen to change
+ } while (s - b >= 0);
}
}
return t;
@@ -1894,7 +1894,7 @@ public class ForkJoinPool extends AbstractExecutorService
createWorker();
else {
v.phase = sp;
- if (v.parking != 0)
+ if (v.parking == sp)
U.unpark(v.owner);
}
break;
@@ -1915,7 +1915,7 @@ public class ForkJoinPool extends AbstractExecutorService
c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
(v.stackPred & LMASK))))) {
v.phase = sp;
- if (v.parking != 0)
+ if (v.parking == sp)
U.unpark(v.owner);
}
}
@@ -1983,46 +1983,51 @@ public class ForkJoinPool extends AbstractExecutorService
int n = qs.length, i = r, step = (r >>> 16) | 1;
boolean rescan = false;
scan: for (int l = n; l > 0; --l, i += step) { // scan queues
- int j, cap; WorkQueue q; ForkJoinTask>[] a;
- if ((q = qs[j = i & (n - 1)]) != null &&
+ int qid, cap; WorkQueue q; ForkJoinTask>[] a;
+ if ((q = qs[qid = i & (n - 1)]) != null &&
(a = q.array) != null && (cap = a.length) > 0) {
for (int m = cap - 1, pb = -1, b = q.base;;) {
ForkJoinTask> t; long k;
t = (ForkJoinTask>)U.getReferenceAcquire(
a, k = slotOffset(m & b));
- if (b != (b = q.base) || t == null ||
- !U.compareAndSetReference(a, k, t, null)) {
+ if (b != (b = q.base)) { // inconsistent / busy
+ if (src != qid) {
+ rescan = true; // reduce interference
+ break scan;
+ }
+ }
+ else if (t == null ||
+ !U.compareAndSetReference(a, k, t, null)) {
if (a[b & m] == null) {
- if (rescan) // end of run
- break scan;
if (a[(b + 1) & m] == null &&
a[(b + 2) & m] == null) {
- break; // probably empty
+ if (rescan) // end of run
+ break scan;
+ break; // probably empty
}
- if (pb == (pb = b)) { // track progress
- rescan = true; // stalled; reorder scan
+ if (src != qid || pb == (pb = b)) {
+ rescan = true; // reorder scan
break scan;
}
}
}
else {
boolean propagate;
- int prevSrc = src, nb;
- long np = slotOffset((nb = b + 1) & m);
+ long np = slotOffset((pb = b + 1) & m);
boolean more = (U.getReferenceVolatile(a, np) != null);
- q.base = nb;
+ q.base = pb;
w.nsteals = ++nsteals;
- w.source = src = j; // volatile
+ int prevSrc = src;
+ w.source = src = qid; // volatile
rescan = true;
if (propagate =
(more &&
- (prevSrc != src ||
- ((j & 1) == 0) &&
- (fifo != 0 || t.noUserHelp() != 0)) &&
- U.getReference(a, np) != null))
+ (prevSrc != qid ||
+ ((qid & 1) == 0 &&
+ (fifo != 0 || t.noUserHelp() != 0)))))
signalWork();
w.topLevelExec(t, fifo);
- if ((b = q.base) != nb && !propagate)
+ if ((b = q.base) != pb && !propagate)
break scan; // reduce interference
}
}
@@ -2058,120 +2063,129 @@ public class ForkJoinPool extends AbstractExecutorService
else if (qac < (pc & RC_MASK))
return w.phase = phase; // back out if lost to signal
}
- int ac = (short)(qac >>> RC_SHIFT), n; long e; WorkQueue[] qs;
+ long e; WorkQueue[] qs; int n;
if (((e = runState) & STOP) != 0L ||
- ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
+ ((e & SHUTDOWN) != 0L && qac == 0L && quiescent() > 0) ||
(qs = queues) == null || (n = qs.length) <= 0)
- return IDLE; // terminating
- for (int prechecks = Math.min(ac, 2), // reactivation threshold
- k = Math.max(n + (n << 1), SPIN_WAITS << 1),
- i = 0; k-- > 0 ; ++i) {
+ return IDLE; // terminating
+ int prechecks = Math.min((short)(qac >>> RC_SHIFT), 2);
+ for (int k = n + (n << 1), i = activePhase; k-- > 0 ; ++i) {
WorkQueue q; int cap; ForkJoinTask>[] a;
- if (w.phase == activePhase)
- return activePhase;
- if ((q = qs[i & (n - 1)]) == null)
- Thread.onSpinWait();
- else if ((a = q.array) != null && (cap = a.length) > 0 &&
- a[q.base & (cap - 1)] != null) {
- WorkQueue v; int sp, j; long c;
- if (prechecks > 0)
- --prechecks;
- else if (((c = ctl) & RC_MASK) <= qac && (sp = (int)c) != 0 &&
- (j = sp & SMASK) < n && (v = qs[j]) != null) {
- long nc = (v.stackPred & LMASK) | ((c + RC_UNIT) & UMASK);
- if (sp != activePhase && w.phase == activePhase)
- return activePhase;
- if ((sp == activePhase || k < n) &&
- U.compareAndSetLong(this, CTL, c, nc)) {
- v.phase = sp;
- if (sp == activePhase)
+ if ((q = qs[i & (n - 1)]) != null) {
+ if (w.phase == activePhase)
+ return activePhase;
+ if ((a = q.array) != null && (cap = a.length) > 0 &&
+ a[q.base & (cap - 1)] != null) {
+ WorkQueue v; int sp, j; long c;
+ if (prechecks > 0)
+ --prechecks;
+ else if (((c = ctl) & RC_MASK) <= qac && (sp = (int)c) != 0 &&
+ (j = sp & SMASK) < n && (v = qs[j]) != null) {
+ long nc = (v.stackPred & LMASK) | ((c + RC_UNIT) & UMASK);
+ if (sp != activePhase && w.phase == activePhase)
return activePhase;
- if (v.parking != 0)
- U.unpark(v.owner);
- break;
+ if ((sp == activePhase || k < n) &&
+ U.compareAndSetLong(this, CTL, c, nc)) {
+ v.phase = sp;
+ if (sp == activePhase)
+ return activePhase;
+ if (v.parking == sp)
+ U.unpark(v.owner);
+ break;
+ }
}
+ if (k < n)
+ k = n; // ensure re-encounter
}
- if (k < n)
- k = n; // ensure re-encounter
}
}
- return (((phase = w.phase) & IDLE) == 0) ? phase : awaitWork(w, phase);
+ return ((phase = w.phase) == activePhase) ? activePhase : awaitWork(w, phase);
}
/**
* Awaits signal or termination.
*
* @param w the work queue
- * @param p current phase (known to be idle)
+ * @param phase current phase (known to be idle)
* @return current phase, with IDLE set if worker should exit
*/
- private int awaitWork(WorkQueue w, int p) {
+ private int awaitWork(WorkQueue w, int phase) {
if (w != null) {
- ForkJoinWorkerThread t; long deadline;
- if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
- t.resetThreadLocals(); // clear before reactivate
- if ((ctl & RC_MASK) > 0L)
- deadline = 0L;
- else if ((deadline =
- (((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
- System.currentTimeMillis()) == 0L)
- deadline = 1L; // avoid zero
- int activePhase = p + IDLE;
- if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
- LockSupport.setCurrentBlocker(this);
- w.parking = 1; // enable unpark
- while ((p = w.phase) != activePhase) {
- boolean trimmable = false; int trim;
- Thread.interrupted(); // clear status
- if ((runState & STOP) != 0L)
- break;
- if (deadline != 0L) {
- if ((trim = tryTrim(w, p, deadline)) > 0)
+ int activePhase = phase + IDLE, parking = 0; // parking: nonzero once unpark is enabled
+ long deadline = 0L; // keepAlive deadline; 0 until first computed
+ boolean trim = (w.source == INVALID_ID); // if set, try trimming before any deadline wait
+ if ((w.config & CLEAR_TLS) != 0 &&
+ (Thread.currentThread() instanceof ForkJoinWorkerThread f))
+ f.resetThreadLocals(); // (instanceof check always true)
+ while ((phase = w.phase) != activePhase) {
+ boolean trimmable = false; // true if at ctl head and quiescent
+ long d = 0L, c; // d: time argument for park; stays 0 unless RC is zero at head
+ Thread.interrupted(); // clear status
+ if ((runState & STOP) != 0L)
+ break;
+ if ((int)(c = ctl) == activePhase) {
+ if ((c & RC_MASK) == 0L) {
+ long now = System.currentTimeMillis();
+ if (deadline == 0L)
+ d = deadline = now + keepAlive;
+ else if ((d = deadline) - now <= TIMEOUT_SLOP)
+ trim = true; // deadline reached, within slop
+ if (trim && tryTrim(w, c, activePhase))
+ break;
+ trim = false;
+ trimmable = true;
+ }
+ if (parking == 0) { // spin at head for approx 1 scan cost
+ int tc = ((short)(c >>> TC_SHIFT) | 1) & SMASK;
+ int spins = tc + (tc << 1);
+ while ((phase = w.phase) != activePhase && --spins != 0)
+ Thread.onSpinWait();
+ if (phase == activePhase)
break;
- else if (trim < 0)
- deadline = 0L;
- else
- trimmable = true;
}
- U.park(trimmable, deadline);
}
+ if (parking == 0) { // enable unpark
+ LockSupport.setCurrentBlocker(this);
+ w.parking = parking = activePhase; // also record locally so the cleanup below runs
+ if ((phase = w.phase) == activePhase)
+ break; // recheck: a signaller that read parking earlier will not unpark
+ }
+ U.park(trimmable, d);
+ }
+ if (parking != 0) { // undo "enable unpark" only if it was performed
w.parking = 0;
LockSupport.setCurrentBlocker(null);
}
}
- return p;
+ return phase;
}
/**
* Tries to remove and deregister worker after timeout, and release
- * another to do the same.
- * @return > 0: trimmed, < 0 : not trimmable, else 0
+ * another to do the same unless new tasks are found.
+ * @return true if trimmed (the ctl CAS removing w succeeded)
*/
- private int tryTrim(WorkQueue w, int phase, long deadline) {
- long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
- if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
- stat = -1; // no longer ctl top
- else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
- stat = 0; // spurious wakeup
- else if (!compareAndSetCtl(
- c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
- (TC_MASK & (c - TC_UNIT)))))
- stat = -1; // lost race to signaller
- else {
- stat = 1;
- w.source = DROPPED;
- w.phase = activePhase;
- if ((vp = (int)nc) != 0 && (vs = queues) != null &&
- vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
- compareAndSetCtl( // try to wake up next waiter
- nc, ((UMASK & (nc + RC_UNIT)) |
- (nc & TC_MASK) | (v.stackPred & LMASK)))) {
- v.source = INVALID_ID; // enable cascaded timeouts
- v.phase = vp;
- U.unpark(v.owner);
+ private boolean tryTrim(WorkQueue w, long c, int activePhase) {
+ if (w != null) {
+ int vp, i; WorkQueue[] vs; WorkQueue v;
+ long nc = ((w.stackPred & LMASK) | // next ctl: LMASK bits from w.stackPred,
+ ((RC_MASK & c) | (TC_MASK & (c - TC_UNIT)))); // RC bits kept, TC field minus one unit
+ if (U.compareAndSetLong(this, CTL, c, nc)) { // fails if ctl no longer equals caller's c
+ w.source = DROPPED;
+ w.phase = activePhase;
+ if ((vp = (int)nc) != 0 && (vs = queues) != null &&
+ vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
+ U.compareAndSetLong(this, CTL, // try to wake up next waiter
+ nc, ((v.stackPred & LMASK) |
+ ((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) {
+ v.source = INVALID_ID; // awaitWork starts with trim == true for this source
+ v.phase = vp;
+ U.unpark(v.owner); // unconditional, unlike the parking == sp checks elsewhere
+ }
+ return true; // w was removed, whether or not a waiter was released
}
}
- return stat;
+ return false; // w == null or lost the ctl CAS
}
/**
@@ -2226,7 +2240,7 @@ public class ForkJoinPool extends AbstractExecutorService
(v = qs[i]) != null &&
compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
v.phase = sp;
- if (v.parking != 0)
+ if (v.parking == sp)
U.unpark(v.owner);
stat = UNCOMPENSATE;
}