From 04928c9449535717672adbdd019221cd8fced8f9 Mon Sep 17 00:00:00 2001 From: Doug Lea Date: Sun, 25 Jan 2026 15:03:32 -0500 Subject: [PATCH] Don't oversignal LIFO --- .../java/util/concurrent/ForkJoinPool.java | 224 ++++++++---------- 1 file changed, 103 insertions(+), 121 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index 7fcd6919b08..c1d50f520b0 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -1065,7 +1065,7 @@ public class ForkJoinPool extends AbstractExecutorService static final long RS_LOCK = 1L << 4; // lowest seqlock bit // spin/sleep limits for runState locking and elsewhere - static final int SPIN_WAITS = 1 << 8; // max calls to onSpinWait + static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos static final int MAX_SLEEP = 1 << 20; // approx 1 sec as nanos @@ -1168,12 +1168,6 @@ public class ForkJoinPool extends AbstractExecutorService } } - /** - * Exception thrown in tryTrim after idle timeout - */ - @SuppressWarnings("serial") - static final class WorkerTrimmedException extends RuntimeException { } - /** * Queues supporting work-stealing as well as external task * submission. See above for descriptions and algorithms. @@ -1195,9 +1189,11 @@ public class ForkJoinPool extends AbstractExecutorService @jdk.internal.vm.annotation.Contended("w") volatile int parking; // nonzero if parked in awaitWork @jdk.internal.vm.annotation.Contended("w") - int source; // source queue id (or DROPPED) + volatile int source; // source queue id (or DROPPED) @jdk.internal.vm.annotation.Contended("w") int nsteals; // number of steals from other queues + @jdk.internal.vm.annotation.Contended("w") + int dropOnEmptyScan; // nonzero if trimmable // Support for atomic operations private static final Unsafe U; @@ -1256,19 +1252,18 @@ public class ForkJoinPool extends AbstractExecutorService */ final void push(ForkJoinTask task, ForkJoinPool pool, int unlock) { ForkJoinTask[] a = array; - int b = base, s = top, cap, m; long pos; - if (a == null || (cap = a.length) <= s + 1 - b || (m = cap - 1) < 0) + int b = base, s = top, p = top = s + 1, cap, m; + if (a == null || (cap = a.length) <= p - b || (m = cap - 1) < 0) growAndPush(task, pool, unlock); else { - top = s + 1; - U.getAndSetReference(a, pos = slotOffset(m & s), task); + U.getAndSetReference(a, slotOffset(m & s), task); Object pred = U.getReferenceAcquire(a, slotOffset(m & (s - 1))); if (unlock != 1) { // release external lock U.putInt(this, PHASE, unlock); U.storeFence(); } if (pred == null && pool != null) - pool.signalWork(a, pos); // may have appeared empty + pool.signalWork(this, s); // may have appeared empty } } @@ -1288,7 +1283,7 @@ public class ForkJoinPool extends AbstractExecutorService } catch (OutOfMemoryError ex) { } if (newArray != null) { - int s = top++, mask = cap - 1, newMask = newCap - 1; + int s = top - 1, mask = cap - 1, newMask = newCap - 1; newArray[s & newMask] = task; for (int k = s - 1, j = cap; j > 0; --j, --k) { ForkJoinTask u; // poll old, push to new @@ -1297,15 +1292,15 @@ public class ForkJoinPool extends AbstractExecutorService break; // lost to pollers newArray[k & newMask] = u; } - long pos = slotOffset(s & newMask); U.putReferenceVolatile(this, ARRAY, newArray); if (unlock != 1) phase = unlock; if (pool != null) - pool.signalWork(newArray, pos); + pool.signalWork(this, s); return; } } + --top; // back out if (unlock != 1) phase = unlock; throw new RejectedExecutionException("Queue capacity exceeded"); @@ -1419,45 +1414,16 @@ public class ForkJoinPool extends AbstractExecutorService return null; } - /** - * Tries once to poll for a task - * @param pool if non-null, pool to propagate signals - */ - private ForkJoinTask tryPoll(ForkJoinPool pool) { - ForkJoinTask[] a; int cap, b, nb; long bp; - if ((a = array) != null && (cap = a.length) > 0) { - ForkJoinTask t = (ForkJoinTask)U.getReferenceAcquire( - a, bp = slotOffset((cap - 1) & (b = base))); - long np = slotOffset((cap - 1) & (nb = b + 1)); - if (t != null && base == b && - U.compareAndSetReference(a, bp, t, null)) { - Object nt = U.getReference(a, np); - U.getAndSetInt(this, BASE, nb); - if (pool != null && nt != null) - pool.signalWork(a, np); - return t; - } - } - return null; - } - // specialized execution methods /** * Runs the given task, as well as remaining local tasks */ - final int topLevelExec(ForkJoinTask task, WorkQueue q, ForkJoinPool pool, - int fifo) { - ForkJoinPool p = (fifo == 0) ? null : pool; - int taken = 1; + final void topLevelExec(ForkJoinTask task, int fifo) { while (task != null) { task.doExec(); - if ((task = nextLocalTask(fifo)) == null && - (q == null || (task = q.tryPoll(p)) == null)) - break; - ++taken; + task = nextLocalTask(fifo); } - return taken; } /** @@ -1577,7 +1543,7 @@ public class ForkJoinPool extends AbstractExecutorService break; if (base == b && t != null && U.compareAndSetReference(a, k, t, null)) { - U.putIntVolatile(this, BASE, b + 1); + base = b + 1; t.doExec(); } } @@ -1761,7 +1727,7 @@ public class ForkJoinPool extends AbstractExecutorService * @param w caller's WorkQueue */ final void registerWorker(WorkQueue w) { - if (w != null) { + if (w != null && (runState & STOP) == 0L) { w.array = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; ThreadLocalRandom.localInit(); int seed = w.stackPred = ThreadLocalRandom.getProbe(); @@ -1815,8 +1781,6 @@ public class ForkJoinPool extends AbstractExecutorService final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; // null if not created int phase = 0; // 0 if not registered - if (ex instanceof WorkerTrimmedException) - ex = null; if (wt != null && (w = wt.workQueue) != null && (phase = w.phase) != 0 && (phase & IDLE) != 0) releaseWaiters(); // ensure released @@ -1843,7 +1807,7 @@ public class ForkJoinPool extends AbstractExecutorService if ((tryTerminate(false, false) & STOP) == 0L && phase != 0 && w != null && w.source != DROPPED) { w.cancelTasks(); // clean queue - signalWork(null, 0L); // possibly replace + signalWork(null, 0); // possibly replace } if (ex != null) ForkJoinTask.rethrow(ex); @@ -1851,23 +1815,26 @@ public class ForkJoinPool extends AbstractExecutorService /** * Releases an idle worker, or creates one if not enough exist, - * giving up on contention if source slot already taken. + * giving up on contention if q is nonull and signalled slot + * already taken. * - * @param src, if nonnull, the array containing signalled task - * @param offset slot offset for the task + * @param src, if nonnull, the WorkQueue containing signalled task + * @param base src's base index for the task */ - final void signalWork(ForkJoinTask[] src, long offset) { + final void signalWork(WorkQueue src, int base) { int pc = parallelism, i, sp; // rely on caller sync for initial reads long c = U.getLong(this, CTL); WorkQueue[] qs = queues; while ((short)(c >>> RC_SHIFT) < pc && qs != null && qs.length > (i = (sp = (int)c) & SMASK)) { WorkQueue w = qs[i], v = null; long nc; - if (i == 0 || w == null) { + if (i == 0) { if ((short)(c >>> TC_SHIFT) >= pc) break; nc = ((c + TC_UNIT) & TC_MASK) | ((c + RC_UNIT) & RC_MASK); } + else if ((v = w) == null) + break; else nc = ((v = w).stackPred & LMASK) | ((c + RC_UNIT) & UMASK); if (c == (c = U.compareAndExchangeLong(this, CTL, c, nc))) { @@ -1881,7 +1848,7 @@ public class ForkJoinPool extends AbstractExecutorService break; } qs = queues; - if (src != null && U.getReference(src, offset) == null) + if (src != null && src.base - base > 0) break; } } @@ -1961,23 +1928,22 @@ public class ForkJoinPool extends AbstractExecutorService final void runWorker(WorkQueue w) { if (w != null) { int r = w.stackPred; // seed from registerWorker - for (int fifo = (int)config & FIFO, idle = 0, taken = 0;;) { - WorkQueue[] qs; - long e = runState; - int n = ((qs = queues) == null) ? 0 : qs.length; + int fifo = (int)config & FIFO; + rescan: for (int idle = 0, taken = 0, src = -1;;) { + WorkQueue[] qs; int n; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift - int i = r, step = (r >>> 16) | 1; - if ((e & STOP) != 0L || n <= 0) { + long e = runState; + if ((qs = queues) == null || (n = qs.length) <= 0 || + (e & STOP) != 0L) { w.nsteals += taken; break; } - boolean rescan = false; - scan: for (int j = n; j != 0; --j, i += step) { + scan: for (int j = n, i = r, step = (r >>> 16) | 1; ; i += step) { WorkQueue q; int qid; if ((q = qs[qid = i & (n - 1)]) != null) { - ForkJoinTask[] a; int pb = -1, cap; // poll queue + ForkJoinTask[] a; int pb = -1, ran = 0, cap; while ((a = q.array) != null && (cap = a.length) > 0) { - int b, nb; long bp, np; ForkJoinTask t; + int b, nb; long bp, np, ps; ForkJoinTask t; t = (ForkJoinTask)U.getReferenceAcquire( a, bp = slotOffset((cap - 1) & (b = q.base))); Object nt = U.getReference( @@ -1986,54 +1952,65 @@ public class ForkJoinPool extends AbstractExecutorService if (idle != 0) { if (t == null && nt == null && q.top - qb <= 0) break; - if ((idle = tryReactivate(w)) != 0) { - rescan = true; + if ((idle = tryReactivate(w)) != 0) break scan; // can't take yet - } } - else if (qb != b) // inconsistent/busy - ; + else if (qb != b) { // inconsistent + if (taken == 0) + break scan; // busy + } else if (t == null) { - if (q.array == a && U.getReference(a, bp) == null) { - if (nt == null) - break; - if (pb == (pb = b)) { - rescan = true; // stalled; reorder scan + if (U.getReference(a, bp) == null) { + if (ran != 0) // end run on this queue break scan; - } + if (nt == null) // probably empty + break; + if (pb == (pb = b)) + break scan; // stalled; reorder scan } } else if (U.compareAndSetReference(a, bp, t, null)) { nt = U.getReference(a, np); U.getAndSetInt(q, WorkQueue.BASE, nb); - if (nt != null) // propagate - signalWork(a, np); - w.source = qid; - taken += w.topLevelExec(t, q, this, fifo); - rescan = true; - break scan; + if (qid != (ps = src)) + U.putIntOpaque(w, WorkQueue.SOURCE, src = qid); + if (nt != null && + (qid != ps || + ((qid & 1) == 0 && (fifo != 0 || taken == 0))) && + U.getReferenceAcquire(a, np) != null) + signalWork(q, nb); // propagate + ran = ++taken; + w.topLevelExec(t, fifo); } } } - } - if (!rescan) { - idle = onEmptyScan(w, idle, taken); - taken = 0; + if (--j == 0) { // empty scan + if (idle == 0) { + idle = tryDeactivate(w, taken); + taken = 0; + } + else if ((idle = awaitWork(w)) != 0) + break rescan; // trimmed or terminated + else + src = -1; + break; + } } } } } /** - * Possibly deactivates, reactivates or pauses worker + * Possibly deactivates or pauses worker * * @param w the work queue + * @param taken number of tasks taken since last activate * @return 0 if now active */ - private int onEmptyScan(WorkQueue w, int idle, int taken) { + private int tryDeactivate(WorkQueue w, int taken) { + int idle = 0; if (w != null) { // always true; hoist checks - int phase = w.phase; - if (taken != 0) { + if (taken != 0) { // rescan before deactivating w.nsteals += taken; if ((w.config & CLEAR_TLS) != 0 && (Thread.currentThread() instanceof ForkJoinWorkerThread f)) @@ -2041,7 +2018,8 @@ public class ForkJoinPool extends AbstractExecutorService if ((runState & (SHUTDOWN|STOP)) == 0L) Thread.yield(); // pause before rescan } - else if (idle == 0) { // deactivate + else { + int phase = U.getInt(w, WorkQueue.PHASE); long sp = (phase + NEXTIDLE) & LMASK, pc = ctl; U.putInt(w, WorkQueue.PHASE, phase | IDLE); for (;;) { // enqueue @@ -2059,10 +2037,6 @@ public class ForkJoinPool extends AbstractExecutorService } } } - else if ((idle = phase & IDLE) != 0) { - awaitWork(w, phase); - idle = 0; - } } return idle; } @@ -2092,29 +2066,35 @@ public class ForkJoinPool extends AbstractExecutorService * Awaits signal or termination. * * @param w the work queue - * @throws WorkerTrimmedException on idle timeout + * @return 0 if now active */ - private void awaitWork(WorkQueue w, int phase) { - if (w != null) { - int activePhase = phase + IDLE; - long deadline = 0L, waitTime = (w.source == INVALID_ID) ? 0L : keepAlive; - for (;;) { - long d = 0L, c; int idle; + private int awaitWork(WorkQueue w) { + int idle = 0, phase; + if (w != null && (idle = (phase = w.phase) & IDLE) != 0) { + int activePhase = phase + IDLE, trim; + if ((trim = w.dropOnEmptyScan) != 0) + w.dropOnEmptyScan = 0; + for (long deadline = 0L;;) { Thread.interrupted(); // clear status if ((runState & STOP) != 0L) break; - boolean trimmable = false; // use timed wait if trimmable - int spins = ((short)((c = ctl) >>> TC_SHIFT) | 1) & SMASK; - if ((int)c == activePhase) { // at head - if ((c & RC_MASK) == 0L) { + boolean trimmable = false; + long d = 0L, c; + int ac = (short)((c = ctl) >>> RC_SHIFT); + int spins = ((short)(c >>> TC_SHIFT) | 1) & SMASK; // >= # workers + if ((int)c == activePhase) { // at head of ctl + spins += Math.max(spins << 1, SPIN_WAITS); // approx 1 scan cost + if (ac == 0) { // quiescent long now = System.currentTimeMillis(); if (deadline == 0L) - deadline = waitTime + now; - if ((d = deadline) - now <= TIMEOUT_SLOP) - tryTrim(w, c, activePhase); // throws if trimmed + d = deadline = now + keepAlive; + else if ((d = deadline) - now <= TIMEOUT_SLOP) + trim = 1; + if (trim != 0 && tryTrim(w, c, activePhase)) + break; + trim = 0; trimmable = true; } - spins += SPIN_WAITS; // spin more } while ((idle = w.phase & IDLE) != 0 && --spins != 0) Thread.onSpinWait(); @@ -2126,18 +2106,19 @@ public class ForkJoinPool extends AbstractExecutorService U.park(trimmable, d); w.parking = 0; // close unpark window LockSupport.setCurrentBlocker(null); - if (idle == 0 || (w.phase & IDLE) == 0) + if (idle == 0 || (idle = w.phase & IDLE) == 0) break; } } + return idle; } /** * Tries to remove and deregister worker after timeout, and release * another to do the same unless new tasks are found. - * @throws WorkerTrimmedException on idle timeout + * @return true if trimmed */ - private void tryTrim(WorkQueue w, long c, int activePhase) { + private boolean tryTrim(WorkQueue w, long c, int activePhase) { if (w != null) { int vp, i; WorkQueue[] vs; WorkQueue v; long nc = ((w.stackPred & LMASK) | @@ -2150,13 +2131,14 @@ public class ForkJoinPool extends AbstractExecutorService U.compareAndSetLong(this, CTL, // try to wake up next waiter nc, ((v.stackPred & LMASK) | ((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) { - v.source = INVALID_ID; // enable cascaded timeouts + v.dropOnEmptyScan = 1; v.phase = vp; U.unpark(v.owner); } - throw new WorkerTrimmedException(); + return true; } } + return false; } /** @@ -2314,7 +2296,7 @@ public class ForkJoinPool extends AbstractExecutorService } if (U.compareAndSetReference(a, k, t, null)) { q.base = b + 1; - U.putIntVolatile(w, WorkQueue.SOURCE, j); + w.source = j; t.doExec(); w.source = wsrc; rescan = true; // restart at index r @@ -2469,7 +2451,7 @@ public class ForkJoinPool extends AbstractExecutorService } if (U.compareAndSetReference(a, k, t, null)) { q.base = nb; - U.putIntVolatile(w, WorkQueue.SOURCE, j); + w.source = j; t.doExec(); w.source = wsrc; rescan = locals = true; @@ -3307,7 +3289,7 @@ public class ForkJoinPool extends AbstractExecutorService if ((config & PRESET_SIZE) != 0) throw new UnsupportedOperationException("Cannot override System property"); if ((prevSize = U.getAndSetInt(this, PARALLELISM, size)) < size) - signalWork(null, 0L); // trigger worker activation + signalWork(null, 0); // trigger worker activation return prevSize; }