From 4328645c978e5b19b6f2eb80f1d2a2990ce7c818 Mon Sep 17 00:00:00 2001 From: Doug Lea Date: Tue, 6 Jan 2026 17:46:52 -0500 Subject: [PATCH] Split external push --- .../java/util/concurrent/ForkJoinPool.java | 187 ++++++++++-------- .../java/util/concurrent/ForkJoinTask.java | 2 +- 2 files changed, 103 insertions(+), 86 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index 1835d9addd2..22d75175a8e 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -1198,6 +1198,7 @@ public class ForkJoinPool extends AbstractExecutorService private static final long PHASE; private static final long BASE; private static final long TOP; + private static final long SOURCE; private static final long ARRAY; /** @@ -1240,34 +1241,35 @@ public class ForkJoinPool extends AbstractExecutorService } /** - * Pushes a task. Called only by owner or if already locked + * Pushes a task on an internal queue. Called only by owner. + * (Use pool.externalPush for external queues). * * @param task the task; caller must ensure nonnull - * @param pool the pool to signal if was previously empty, else null - * @param unlock if not 1, phase unlock value + * @param pool the pool to signal if was previously empty or resized + * @throws RejectedExecutionException if array could not be resized */ - final void push(ForkJoinTask task, ForkJoinPool pool, int unlock) { + final void push(ForkJoinTask task, ForkJoinPool pool) { ForkJoinTask[] a = array; - int b = base, s = top, room, m; - if (a != null && - ((room = a.length - (s + 1 - b)) > 0 || (a = growArray()) != null) && - (m = a.length - 1) >= 0) { // else rejected or disabled + int b = base, s = top, cap, m; + if (a == null || ((cap = a.length) - (s + 1 - b)) <= 0 || cap <= 0) + growAndPush(task, pool, 1); + else { top = s + 1; - U.putReferenceVolatile(a, slotOffset(m & s), task); - if (unlock != 1) // release external lock - U.putInt(this, PHASE, unlock); - if ((U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null || - room <= 0) && pool != null) + U.putReferenceVolatile(a, slotOffset((m = cap - 1) & s), task); + if (U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null && + pool != null) pool.signalWork(this, s); // may have appeared empty } } /** - * Resizes the queue array unless out of memory. - * @return new array, or throw on OOME + * Resizes the queue array and pushes unless out of memory. + * @param task the task; caller must ensure nonnull + * @param pool the pool to signal upon resize + * @param unlock if not 1, phase unlock value */ - private ForkJoinTask[] growArray() { + final void growAndPush(ForkJoinTask task, ForkJoinPool pool, int unlock) { ForkJoinTask[] a; int cap, newCap; if ((a = array) != null && (cap = a.length) > 0 && (newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2) > 0) { @@ -1277,8 +1279,9 @@ public class ForkJoinPool extends AbstractExecutorService } catch (OutOfMemoryError ex) { } if (newArray != null) { - int mask = cap - 1, newMask = newCap - 1; - for (int k = top - 1, j = cap; j > 0; --j, --k) { + int s = top++, mask = cap - 1, newMask = newCap - 1; + newArray[s & newMask] = task; + for (int k = s - 1, j = cap; j > 0; --j, --k) { ForkJoinTask u; // poll old, push to new if ((u = (ForkJoinTask)U.getAndSetReference( a, slotOffset(k & mask), null)) == null) @@ -1286,12 +1289,15 @@ public class ForkJoinPool extends AbstractExecutorService newArray[k & newMask] = u; } U.putReferenceVolatile(this, ARRAY, newArray); - return newArray; + if (unlock != 1) + phase = unlock; + if (pool != null) + pool.signalWork(this, s); + return; } } - int f = phase; // unlock if externally locked - if ((f & (IDLE | 1)) == 0) - phase = f + IDLE; + if (unlock != 1) + phase = unlock; throw new RejectedExecutionException("Queue capacity exceeded"); } @@ -1308,12 +1314,12 @@ public class ForkJoinPool extends AbstractExecutorService if (fifo == 0) { if ((t = (ForkJoinTask)U.getAndSetReference( a, slotOffset((cap - 1) & s), null)) != null) - top = s; + U.putIntOpaque(this, TOP, s); } else { do { if ((t = (ForkJoinTask)U.getAndSetReference( a, slotOffset((cap - 1) & b), null)) != null) { - base = b + 1; + U.putIntVolatile(this, BASE, b + 1); break; } if (b == s) @@ -1330,10 +1336,7 @@ public class ForkJoinPool extends AbstractExecutorService * Takes next task, if one exists, using configured mode. */ final ForkJoinTask nextLocalTask() { - U.loadFence(); // ensure ordering for external callers - ForkJoinTask t = nextLocalTask(config & FIFO); - U.storeFence(); - return t; + return nextLocalTask(config & FIFO); } /** @@ -1452,6 +1455,7 @@ public class ForkJoinPool extends AbstractExecutorService a, slotOffset(s & m), null)); top = s; } + U.storeFence(); } if (!internal) phase = lock + NEXTIDLE; @@ -1499,7 +1503,7 @@ public class ForkJoinPool extends AbstractExecutorService if (taken = (top == p && U.compareAndSetReference(a, k, t, null))) - top = s; + U.putIntOpaque(this, TOP, s); if (!internal) phase = lock + NEXTIDLE; if (!taken) @@ -1536,7 +1540,7 @@ public class ForkJoinPool extends AbstractExecutorService break; if (base == b && t != null && U.compareAndSetReference(a, k, t, null)) { - base = b + 1; + U.putIntVolatile(this, BASE, b + 1); t.doExec(); } } @@ -1573,6 +1577,7 @@ public class ForkJoinPool extends AbstractExecutorService PHASE = U.objectFieldOffset(klass, "phase"); BASE = U.objectFieldOffset(klass, "base"); TOP = U.objectFieldOffset(klass, "top"); + SOURCE = U.objectFieldOffset(klass, "source"); ARRAY = U.objectFieldOffset(klass, "array"); } } @@ -1907,8 +1912,10 @@ public class ForkJoinPool extends AbstractExecutorService else if ((ds = delayScheduler) != null && !ds.canShutDown()) return 0; else if (U.compareAndSetLong(this, CTL, c, c) && - U.compareAndSetLong(this, RUNSTATE, e, e | STOP)) + U.compareAndSetLong(this, RUNSTATE, e, e | STOP)) { + releaseWaiters(); return 1; // enable termination + } else break; // restart } @@ -1926,7 +1933,7 @@ public class ForkJoinPool extends AbstractExecutorService WorkQueue[] qs; int n; int r = w.stackPred; // seed from registerWorker int fifo = (int)config & FIFO; - int src = -1, idle = 0, rescans = 0, taken = 0; + int src = 0, idle = 0, rescans = 0, taken = 0; while ((runState & STOP) == 0L && (qs = queues) != null && (n = qs.length) > 0) { int i = r, step = (r >>> 16) | 1; @@ -1965,8 +1972,7 @@ public class ForkJoinPool extends AbstractExecutorService if (nt != null) signalWork(q, nb); // propagate rescans = 1; - ++taken; - if (src != qid) + if (taken++ == 0 || src != qid) w.source = src = qid; w.topLevelExec(t, fifo); } @@ -1975,21 +1981,14 @@ public class ForkJoinPool extends AbstractExecutorService } if (rescans >= 0) --rescans; - else if ((runState & STOP) != 0L) - break; else if (idle == 0) { idle = deactivate(w, taken); taken = 0; } - else { - int phase; - if ((idle = (phase = w.phase) & IDLE) != 0) { - if ((idle = awaitWork(w, phase)) != 0) - break; - src = -1; - } + else if ((idle = awaitWork(w)) == 0) rescans = 0; - } + else + break; } if (taken != 0) w.nsteals += taken; @@ -1997,7 +1996,7 @@ public class ForkJoinPool extends AbstractExecutorService } /** - * Deactivates and enqueues worker + * Deactivates and enqueues worker, backing out on signal * * @param w the work queue * @param taken number of stolen tasks since last reactivation @@ -2009,24 +2008,28 @@ public class ForkJoinPool extends AbstractExecutorService int phase = U.getInt(w, WorkQueue.PHASE); long sp = (phase + NEXTIDLE) & LMASK, pc = ctl, c, e; U.putInt(w, WorkQueue.PHASE, phase | IDLE); - do { // enqueue + for (;;) { // try to enqueue w.stackPred = (int)pc; - } while (pc != (pc = U.compareAndExchangeLong( - this, CTL, pc, c = ((pc - RC_UNIT) & UMASK) | sp))); + if (pc == (pc = U.compareAndExchangeLong( + this, CTL, pc, c = ((pc - RC_UNIT) & UMASK) | sp))) + break; + else if ((c & RC_MASK) < (pc & RC_MASK)) { + w.phase = phase; // back out if lost to signal + idle = 0; + break; + } + } if (taken != 0) { w.nsteals += taken; if ((w.config & CLEAR_TLS) != 0 && (Thread.currentThread() instanceof ForkJoinWorkerThread f)) f.resetThreadLocals(); // (instanceof check always true) } - if (((e = runState) & STOP) != 0L || - ((e & SHUTDOWN) != 0L && (c & RC_MASK) == 0L && quiescent() > 0)) - releaseWaiters(); - else { // spin for approx 1 scan cost - int tc = (short)(c >>> TC_SHIFT); - int spins = Math.max((tc << 1) + tc, SPIN_WAITS); - while ((idle = w.phase & IDLE) != 0 && --spins != 0) - Thread.onSpinWait(); + if (idle != 0 && + ((e = runState) & STOP) == 0L && + ((e & SHUTDOWN) == 0L || (c & RC_MASK) > 0L || quiescent() <= 0)) { + for (int s = SPIN_WAITS; (idle = w.phase & IDLE) != 0 && --s != 0;) + Thread.onSpinWait(); // spin before rescan } } return idle; @@ -2057,43 +2060,46 @@ public class ForkJoinPool extends AbstractExecutorService * Awaits signal or termination. * * @param w the work queue - * @param phase w's current phase (must be inactive) * @return 0 if now active */ - private int awaitWork(WorkQueue w, int phase) { - int idle = IDLE; - if (w != null) { // always true; hoist checks + private int awaitWork(WorkQueue w) { + int idle = 0, phase; + if (w != null && (idle = (phase = w.phase) & IDLE) != 0) { int activePhase = phase + IDLE; long deadline = 0L, waitTime = (w.source == INVALID_ID) ? 0L : keepAlive; - LockSupport.setCurrentBlocker(this); - for (;;) { + do { + boolean trimmable = false; // use timed wait if trimmable + int spins = 0; + long d = 0L, c; Thread.interrupted(); // clear status if ((runState & STOP) != 0L) break; - boolean trimmable = false; // use timed wait if trimmable - long d = 0L, c; - if (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase) { - long now = System.currentTimeMillis(); - if (deadline == 0L) - deadline = waitTime + now; - if (deadline - now <= TIMEOUT_SLOP) { - if (tryTrim(w, c, activePhase)) - break; - continue; // lost race to trim + if ((int)(c = ctl) == activePhase) { + spins = SPIN_WAITS; // trim or spin at head + if ((c & RC_MASK) == 0L) { + long now = System.currentTimeMillis(); + if (deadline == 0L) + deadline = waitTime + now; + if (deadline - now <= TIMEOUT_SLOP) { + if (tryTrim(w, c, activePhase)) + break; + continue; // lost race to trim + } + d = deadline; + trimmable = true; } - d = deadline; - trimmable = true; } - if ((idle = w.phase & IDLE) == 0) + while ((idle = w.phase & IDLE) != 0 && spins-- != 0) + Thread.onSpinWait(); + if (idle == 0) break; + LockSupport.setCurrentBlocker(this); w.parking = 1; // enable unpark and recheck if ((idle = w.phase & IDLE) != 0) U.park(trimmable, d); w.parking = 0; // close unpark window - if (idle == 0 || (idle = w.phase & IDLE) == 0) - break; - } - LockSupport.setCurrentBlocker(null); + LockSupport.setCurrentBlocker(null); + } while (idle != 0 && (idle = w.phase & IDLE) != 0); } return idle; } @@ -2280,7 +2286,7 @@ public class ForkJoinPool extends AbstractExecutorService } if (U.compareAndSetReference(a, k, t, null)) { q.base = b + 1; - w.source = j; + U.putIntVolatile(w, WorkQueue.SOURCE, j); t.doExec(); w.source = wsrc; rescan = true; // restart at index r @@ -2352,6 +2358,7 @@ public class ForkJoinPool extends AbstractExecutorService if (U.compareAndSetReference( a, k, t, null)) { q.base = b + 1; + U.storeFence(); t.doExec(); locals = rescan = true; break scan; @@ -2435,7 +2442,7 @@ public class ForkJoinPool extends AbstractExecutorService } if (U.compareAndSetReference(a, k, t, null)) { q.base = nb; - w.source = j; + U.putIntVolatile(w, WorkQueue.SOURCE, j); t.doExec(); w.source = wsrc; rescan = locals = true; @@ -2553,7 +2560,7 @@ public class ForkJoinPool extends AbstractExecutorService ThreadLocalRandom.localInit(); // initialize caller's probe r = ThreadLocalRandom.getProbe(); } - for (;;) { + for (;; r = ThreadLocalRandom.advanceProbe(r)) { WorkQueue q; WorkQueue[] qs; int n, id, i, lock; if ((qs = queues) == null || (n = qs.length) <= 0) break; @@ -2570,10 +2577,20 @@ public class ForkJoinPool extends AbstractExecutorService q.phase = unlock; break; // check while q lock held } - q.push(task, signalIfEmpty ? this : null, unlock); + ForkJoinTask[] a = q.array; + int b = q.base, s = q.top, cap; + if (a == null || ((cap = a.length) - (s + 1 - b)) <= 0 || cap <= 0) + q.growAndPush(task, this, unlock); + else { + q.top = s + 1; + a[(cap - 1) & s] = task; + long pk = slotOffset((cap - 1) & (s - 1)); // predecessor index + q.phase = unlock; + if (U.getReferenceAcquire(a, pk) == null && signalIfEmpty) + signalWork(q, s); + } return; } - r = ThreadLocalRandom.advanceProbe(r); // move } throw new RejectedExecutionException(); } @@ -2582,7 +2599,7 @@ public class ForkJoinPool extends AbstractExecutorService Thread t; ForkJoinWorkerThread wt; if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) && (wt = (ForkJoinWorkerThread)t).pool == this) - wt.workQueue.push(task, signalIfEmpty ? this : null, 1); + wt.workQueue.push(task, signalIfEmpty ? this : null); else externalPush(task, signalIfEmpty, true); } diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java index b4b9ecc0c9b..702687a6d8e 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java @@ -642,7 +642,7 @@ public abstract class ForkJoinTask implements Future, Serializable { public final ForkJoinTask fork() { Thread t; ForkJoinWorkerThread wt; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) - ((wt = (ForkJoinWorkerThread)t).workQueue).push(this, wt.pool, 1); + ((wt = (ForkJoinWorkerThread)t).workQueue).push(this, wt.pool); else ForkJoinPool.common.externalPush(this, true, false); return this;