From a7f1d63f6d4635c15f0d9b3edaeb15ec55e65773 Mon Sep 17 00:00:00 2001 From: Doug Lea Date: Tue, 27 Jan 2026 07:06:26 -0500 Subject: [PATCH] Avoid yield, for performance test --- .../java/util/concurrent/ForkJoinPool.java | 144 +++++++++--------- 1 file changed, 68 insertions(+), 76 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index c1d50f520b0..a328e9b39a4 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -1055,7 +1055,6 @@ public class ForkJoinPool extends AbstractExecutorService // masks and sentinels for queue indices static final int MAX_CAP = 0x7fff; // max # workers static final int EXTERNAL_ID_MASK = 0x3ffe; // max external queue id - static final int INVALID_ID = 0x4000; // unused external queue id // pool.runState bits static final long STOP = 1L << 0; // terminating @@ -1193,7 +1192,7 @@ public class ForkJoinPool extends AbstractExecutorService @jdk.internal.vm.annotation.Contended("w") int nsteals; // number of steals from other queues @jdk.internal.vm.annotation.Contended("w") - int dropOnEmptyScan; // nonzero if trimmable + int trimStatus; // <0: drop on empty scan; >0 dropped, else 0 // Support for atomic operations private static final Unsafe U; @@ -1300,7 +1299,7 @@ public class ForkJoinPool extends AbstractExecutorService return; } } - --top; // back out + U.putIntOpaque(this, TOP, top - 1); // backout if (unlock != 1) phase = unlock; throw new RejectedExecutionException("Queue capacity exceeded"); @@ -1784,7 +1783,7 @@ public class ForkJoinPool extends AbstractExecutorService if (wt != null && (w = wt.workQueue) != null && (phase = w.phase) != 0 && (phase & IDLE) != 0) releaseWaiters(); // ensure released - if (w == null || w.source != DROPPED) { + if (w == null || w.trimStatus <= 0) { long c = ctl; // decrement counts do {} while (c != (c = U.compareAndExchangeLong(this, CTL, c, ((RC_MASK & (c - RC_UNIT)) | @@ -1805,7 +1804,7 @@ public class ForkJoinPool extends AbstractExecutorService } } if ((tryTerminate(false, false) & STOP) == 0L && - phase != 0 && w != null && w.source != DROPPED) { + phase != 0 && w != null && w.trimStatus <= 0) { w.cancelTasks(); // clean queue signalWork(null, 0); // possibly replace } @@ -1822,8 +1821,8 @@ public class ForkJoinPool extends AbstractExecutorService * @param base src's base index for the task */ final void signalWork(WorkQueue src, int base) { - int pc = parallelism, i, sp; // rely on caller sync for initial reads - long c = U.getLong(this, CTL); + int pc = parallelism, i, sp; + long c = ctl; WorkQueue[] qs = queues; while ((short)(c >>> RC_SHIFT) < pc && qs != null && qs.length > (i = (sp = (int)c) & SMASK)) { @@ -1848,7 +1847,7 @@ public class ForkJoinPool extends AbstractExecutorService break; } qs = queues; - if (src != null && src.base - base > 0) + if (src != null && src.base - base > 0) break; } } @@ -1943,7 +1942,7 @@ public class ForkJoinPool extends AbstractExecutorService if ((q = qs[qid = i & (n - 1)]) != null) { ForkJoinTask[] a; int pb = -1, ran = 0, cap; while ((a = q.array) != null && (cap = a.length) > 0) { - int b, nb; long bp, np, ps; ForkJoinTask t; + int b, nb; long bp, np; ForkJoinTask t; t = (ForkJoinTask)U.getReferenceAcquire( a, bp = slotOffset((cap - 1) & (b = q.base))); Object nt = U.getReference( @@ -1956,40 +1955,44 @@ public class ForkJoinPool extends AbstractExecutorService break scan; // can't take yet } else if (qb != b) { // inconsistent - if (taken == 0) + if (src < 0) break scan; // busy } else if (t == null) { if (U.getReference(a, bp) == null) { - if (ran != 0) // end run on this queue - break scan; - if (nt == null) // probably empty + if (nt == null) { // probably empty + if (ran != 0) // end run on this queue + break scan; break; + } if (pb == (pb = b)) break scan; // stalled; reorder scan } } else if (U.compareAndSetReference(a, bp, t, null)) { - nt = U.getReference(a, np); - U.getAndSetInt(q, WorkQueue.BASE, nb); - if (qid != (ps = src)) + Object rnt = U.getReference(a, np); // reread + U.getAndSetInt(q, WorkQueue.BASE, pb = nb); + boolean propagate = rnt != null && + (qid != src || + ((qid & 1) == 0 && (fifo != 0 || taken == 0))); + ran = 1; + ++taken; + if (qid != src) U.putIntOpaque(w, WorkQueue.SOURCE, src = qid); - if (nt != null && - (qid != ps || - ((qid & 1) == 0 && (fifo != 0 || taken == 0))) && - U.getReferenceAcquire(a, np) != null) - signalWork(q, nb); // propagate - ran = ++taken; + if (propagate && U.getReferenceAcquire(a, np) != null) + signalWork(q, nb); w.topLevelExec(t, fifo); } } } if (--j == 0) { // empty scan - if (idle == 0) { - idle = tryDeactivate(w, taken); + if (taken != 0) { + w.nsteals += taken; taken = 0; } - else if ((idle = awaitWork(w)) != 0) + else if (idle == 0) + idle = tryDeactivate(w); + else if ((idle = awaitWork(w, src)) != 0) break rescan; // trimmed or terminated else src = -1; @@ -2001,40 +2004,29 @@ public class ForkJoinPool extends AbstractExecutorService } /** - * Possibly deactivates or pauses worker + * Possibly deactivates worker * * @param w the work queue - * @param taken number of tasks taken since last activate * @return 0 if now active */ - private int tryDeactivate(WorkQueue w, int taken) { + private int tryDeactivate(WorkQueue w) { int idle = 0; - if (w != null) { // always true; hoist checks - if (taken != 0) { // rescan before deactivating - w.nsteals += taken; - if ((w.config & CLEAR_TLS) != 0 && - (Thread.currentThread() instanceof ForkJoinWorkerThread f)) - f.resetThreadLocals(); // (instanceof check always true) - if ((runState & (SHUTDOWN|STOP)) == 0L) - Thread.yield(); // pause before rescan - } - else { - int phase = U.getInt(w, WorkQueue.PHASE); - long sp = (phase + NEXTIDLE) & LMASK, pc = ctl; - U.putInt(w, WorkQueue.PHASE, phase | IDLE); - for (;;) { // enqueue - w.stackPred = (int)pc; - long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK; - if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) { - if (ac == 0L && (runState & (SHUTDOWN|STOP)) == SHUTDOWN) - quiescent(); // check quiescent termination - idle = w.phase & IDLE; - break; - } - else if (ac < (pc & RC_MASK)) { - w.phase = phase; // back out if lost to signal - break; - } + if (w != null) { // always true; hoist checks + int phase = U.getInt(w, WorkQueue.PHASE); + long sp = (phase + NEXTIDLE) & LMASK, pc = ctl; + U.putInt(w, WorkQueue.PHASE, phase | IDLE); + for (;;) { // enqueue + w.stackPred = (int)pc; + long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK; + if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) { + if (ac == 0L && (runState & (SHUTDOWN|STOP)) == SHUTDOWN) + quiescent(); // check quiescent termination + idle = w.phase & IDLE; + break; + } + else if (ac < (pc & RC_MASK)) { + w.phase = phase; // back out if lost to signal + break; } } } @@ -2068,36 +2060,36 @@ public class ForkJoinPool extends AbstractExecutorService * @param w the work queue * @return 0 if now active */ - private int awaitWork(WorkQueue w) { + private int awaitWork(WorkQueue w, int src) { int idle = 0, phase; if (w != null && (idle = (phase = w.phase) & IDLE) != 0) { + ForkJoinWorkerThread t; int activePhase = phase + IDLE, trim; - if ((trim = w.dropOnEmptyScan) != 0) - w.dropOnEmptyScan = 0; + if ((trim = w.trimStatus) != 0) + w.trimStatus = 0; + if (src >= 0 && (w.config & CLEAR_TLS) != 0 && (t = w.owner) != null) + t.resetThreadLocals(); // clear before reactivate for (long deadline = 0L;;) { Thread.interrupted(); // clear status if ((runState & STOP) != 0L) break; - boolean trimmable = false; + boolean trimmable = false; // true if at head of ctl and quiescent long d = 0L, c; - int ac = (short)((c = ctl) >>> RC_SHIFT); - int spins = ((short)(c >>> TC_SHIFT) | 1) & SMASK; // >= # workers - if ((int)c == activePhase) { // at head of ctl - spins += Math.max(spins << 1, SPIN_WAITS); // approx 1 scan cost - if (ac == 0) { // quiescent - long now = System.currentTimeMillis(); - if (deadline == 0L) - d = deadline = now + keepAlive; - else if ((d = deadline) - now <= TIMEOUT_SLOP) - trim = 1; - if (trim != 0 && tryTrim(w, c, activePhase)) - break; - trim = 0; - trimmable = true; - } + if (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase) { + long now = System.currentTimeMillis(); + if (deadline == 0L) + d = deadline = now + keepAlive; + else if ((d = deadline) - now <= TIMEOUT_SLOP) + trim = 1; + if (trim != 0 && tryTrim(w, c, activePhase)) + break; + trim = 0; + trimmable = true; } + int tc = ((short)(c >>> TC_SHIFT) | 1) & SMASK; // >= # workers + int spins = tc + Math.max(tc << 1, SPIN_WAITS); while ((idle = w.phase & IDLE) != 0 && --spins != 0) - Thread.onSpinWait(); + Thread.onSpinWait(); // spin for approx 1 scan cost if (idle == 0) break; LockSupport.setCurrentBlocker(this); @@ -2124,14 +2116,14 @@ public class ForkJoinPool extends AbstractExecutorService long nc = ((w.stackPred & LMASK) | ((RC_MASK & c) | (TC_MASK & (c - TC_UNIT)))); if (U.compareAndSetLong(this, CTL, c, nc)) { - w.source = DROPPED; + w.trimStatus = 1; w.phase = activePhase; if ((vp = (int)nc) != 0 && (vs = queues) != null && vs.length > (i = vp & SMASK) && (v = vs[i]) != null && U.compareAndSetLong(this, CTL, // try to wake up next waiter nc, ((v.stackPred & LMASK) | ((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) { - v.dropOnEmptyScan = 1; + v.trimStatus = -1; v.phase = vp; U.unpark(v.owner); }