From c451e61e67d0f0902568f41ee581b08c1fb8a504 Mon Sep 17 00:00:00 2001 From: Doug Lea Date: Mon, 27 Apr 2026 08:07:02 -0400 Subject: [PATCH] Improve ramp-up/down --- .../java/util/concurrent/ForkJoinPool.java | 165 +++++++++--------- 1 file changed, 84 insertions(+), 81 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index 5fa5465f24c..2b6e5cd84a4 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -1210,7 +1210,9 @@ public class ForkJoinPool extends AbstractExecutorService int nsteals; // number of steals from other queues @jdk.internal.vm.annotation.Contended("w") volatile int parking; // next phase if parked in awaitWork - + @jdk.internal.vm.annotation.Contended("w") + int prevSteals; // to track steals across phases + // Support for atomic operations private static final Unsafe U; private static final long PHASE; @@ -1968,24 +1970,25 @@ public class ForkJoinPool extends AbstractExecutorService if (w != null) { int phase = w.phase, r = w.stackPred; // seed from registerWorker int fifo = w.config & FIFO, nsteals = 0, src = -1; - for (;;) { - WorkQueue[] qs; + for (int i = r;;) { // i is scan origin + long e = runState; + WorkQueue[] qs = queues; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift - if ((runState & STOP) != 0L || (qs = queues) == null) + int step = (r >>> 16) | 1, n; + if ((e & STOP) != 0L || qs == null || (n = qs.length) <= 0) break; - int n = qs.length, i = r, step = (r >>> 16) | 1; - boolean rescan = false; - scan: for (int l = n; l > 0; --l, i += step) { // scan queues + boolean rescan = false; // scan queues + scan: for (int j = (src < 0) ? n : n << 1; j != 0; --j, i += step) { int qid, cap; WorkQueue q; ForkJoinTask[] a; if ((q = qs[qid = i & (n - 1)]) != null && (a = q.array) != null && (cap = a.length) > 0) { - for (int m = cap - 1, pb = -1, b = q.base;;) { - ForkJoinTask t; long k; + for (int m = cap - 1, pb = -1, b = q.base; ; pb = b) { + ForkJoinTask t; long k; int nb; t = (ForkJoinTask)U.getReferenceAcquire( a, k = slotOffset(m & b)); - if (b != (b = q.base)) { // inconsistent / busy + if (b != (b = q.base)) { // inconsistent / busy if (src != qid) { - rescan = true; // reduce interference + rescan = true; // reduce interference break scan; } } @@ -1994,69 +1997,66 @@ public class ForkJoinPool extends AbstractExecutorService if (a[b & m] == null) { if (a[(b + 1) & m] == null && a[(b + 2) & m] == null) { - if (rescan) // end of run + if (rescan) // end of run break scan; - break; // probably empty + break; // probably empty } - if (src != qid || pb == (pb = b)) { - rescan = true; // reorder scan - break scan; + if (src != qid || pb == b) { + rescan = true; + break scan; // reorder scan } } } else { - boolean propagate; - long np = slotOffset((pb = b + 1) & m); - boolean more = (U.getReferenceVolatile(a, np) != null); - q.base = pb; + Object nt = U.getReferenceVolatile( + a, slotOffset((nb = b + 1) & m)); + q.base = nb; w.nsteals = ++nsteals; int prevSrc = src; - w.source = src = qid; // volatile - rescan = true; - if (propagate = - (more && - (prevSrc != qid || - ((qid & 1) == 0 && - (fifo != 0 || t.noUserHelp() != 0))))) - signalWork(); + w.source = src = qid; // volatile + if (nt != null && + (qid != prevSrc || + ((qid & 1) == 0 && + (fifo != 0 || t.noUserHelp() != 0)))) + signalWork(); // propagate w.topLevelExec(t, fifo); - if ((b = q.base) != pb && !propagate) - break scan; // reduce interference + rescan = true; + b = q.base; } } } } - if (!rescan) { - if (((phase = deactivate(w, phase)) & IDLE) != 0) - break; - src = -1; // re-enable propagation + i = r; // move unless deactivated + if (!rescan && runState == e && + phase != (phase = deactivate(w, phase))) { + if ((phase & IDLE) != 0) + break; // terminated + if (src >= 0) { + i = src; // restart at last src + src = -1; // re-enable propagation + } } } } } /** - * Deactivates and if necessary awaits signal or termination. + * Deactivates unless contended and if necessary awaits signal or termination. * * @param w the worker * @param phase current phase * @return current phase, with IDLE set if worker should exit */ private int deactivate(WorkQueue w, int phase) { - if (w == null) // currently impossible + if (w == null) // currently impossible return IDLE; - w.phase = phase | IDLE; int activePhase = phase + (IDLE << 1); - long pc = ctl, qc, qac; - for (;;) { // enqueue - w.stackPred = (int)pc; - qac = (qc = (pc - RC_UNIT) & UMASK | (activePhase & LMASK)) & RC_MASK; - if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, qc))) - break; - else if (qac < (pc & RC_MASK)) - return w.phase = phase; // back out if lost to signal - } - long e; WorkQueue[] qs; int n; + long pc = ctl, qc = ((pc - RC_UNIT) & UMASK) | (activePhase & LMASK); + w.stackPred = (int)pc; // try to enqueue + w.phase = phase | IDLE; + if (!U.compareAndSetLong(this, CTL, pc, qc)) + return w.phase = phase; // back out on contention + long qac = qc & RC_MASK, e; WorkQueue[] qs; int n; if (((e = runState) & STOP) != 0L || ((e & SHUTDOWN) != 0L && qac == 0L && quiescent() > 0) || (qs = queues) == null || (n = qs.length) <= 0) @@ -2092,60 +2092,63 @@ public class ForkJoinPool extends AbstractExecutorService } } } - return ((phase = w.phase) == activePhase) ? activePhase : awaitWork(w, phase); + return awaitWork(w); } /** * Awaits signal or termination. * * @param w the work queue - * @param phase current phase (known to be idle) * @return current phase, with IDLE set if worker should exit */ - private int awaitWork(WorkQueue w, int phase) { - if (w != null) { - int activePhase = phase + IDLE, parking = 0; + private int awaitWork(WorkQueue w) { + int phase = IDLE; + if ((runState & STOP) == 0L && w != null && ((phase = w.phase) & IDLE) != 0) { + boolean trim = false; + int ns = w.nsteals, nstolen = ns - w.prevSteals; + if (nstolen != 0) { + w.prevSteals = ns; + if ((w.config & CLEAR_TLS) != 0 && + (Thread.currentThread() instanceof ForkJoinWorkerThread f)) + f.resetThreadLocals(); // (instanceof check always true) + } + else if (w.source == INVALID_ID) + trim = true; long deadline = 0L; - boolean trim = (w.source == INVALID_ID); - if ((w.config & CLEAR_TLS) != 0 && - (Thread.currentThread() instanceof ForkJoinWorkerThread f)) - f.resetThreadLocals(); // (instanceof check always true) + int parkEnabled = 0, activePhase = phase + IDLE; while ((phase = w.phase) != activePhase) { - boolean trimmable = false; // true if at ctl head and quiescent - long d = 0L, c; + boolean head; // true if at ctl head + boolean trimmable = false; // true if at head and quiescent Thread.interrupted(); // clear status if ((runState & STOP) != 0L) break; - if ((int)(c = ctl) == activePhase) { - if ((c & RC_MASK) == 0L) { - long now = System.currentTimeMillis(); - if (deadline == 0L) - d = deadline = now + keepAlive; - else if ((d = deadline) - now <= TIMEOUT_SLOP) - trim = true; - if (trim && tryTrim(w, c, activePhase)) - break; - trim = false; - trimmable = true; - } - if (parking == 0) { // spin at head for approx 1 scan cost - int tc = ((short)(c >>> TC_SHIFT) | 1) & SMASK; - int spins = tc + (tc << 1); - while ((phase = w.phase) != activePhase && --spins != 0) - Thread.onSpinWait(); - if (phase == activePhase) - break; - } + long d = 0L, c = ctl; + if ((head = ((int)c == activePhase)) && (c & RC_MASK) == 0L) { + long now = System.currentTimeMillis(); + if (deadline == 0L) + d = deadline = now + keepAlive; + else if ((d = deadline) - now <= TIMEOUT_SLOP) + trim = true; + if (trim && tryTrim(w, c, activePhase)) + break; + trim = false; + trimmable = true; } - if (parking == 0) { // enable unpark + int spins = (parkEnabled != 0 || (nstolen == 0 && !head)) ? 1 : + SPIN_WAITS + (((short)(c >>> TC_SHIFT)) & SMASK); + while ((phase = w.phase) != activePhase && --spins != 0) + Thread.onSpinWait(); // spin if ran or at head + if (phase == activePhase) + break; + if (parkEnabled == 0) { LockSupport.setCurrentBlocker(this); - w.parking = parking = activePhase; + w.parking = parkEnabled = activePhase; if ((phase = w.phase) == activePhase) break; } U.park(trimmable, d); } - if (parking != 0) { + if (parkEnabled != 0) { w.parking = 0; LockSupport.setCurrentBlocker(null); }