diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index 0f04f9a891a..fda2f87da08 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -1931,15 +1931,14 @@ public class ForkJoinPool extends AbstractExecutorService */ final void runWorker(WorkQueue w) { if (w != null) { - int r = w.stackPred; // seed from registerWorker + int phase = w.phase, r = w.stackPred; // seed from registerWorker int fifo = (int)config & FIFO; - for (int nsteals = 0, idle = 0, src = -1;;) { + for (int nsteals = 0, src = -1;;) { WorkQueue[] qs; int n, j; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift long e = runState; - if ((qs = queues) == null || - (j = ((n = qs.length) << 1)) <= 0 || // 2 sweeps per scan - (e & STOP) != 0L) + if ((qs = queues) == null || (e & STOP) != 0L || + (j = ((n = qs.length) << 1)) <= 0) // 2 sweeps per scan return; scan: for (int i = r, step = (r >>> 16) | 1; ; i += step) { WorkQueue q; int qid; @@ -1952,8 +1951,7 @@ public class ForkJoinPool extends AbstractExecutorService if (q.array == a && q.base == b) { // else inconsistent long np = slotOffset((cap - 1) & (nb = b + 1)); if (t == null || - (idle == 0 && - !U.compareAndSetReference(a, bp, t, null))) { + !U.compareAndSetReference(a, bp, t, null)) { if (taken != 0) { // end of run w.nsteals = nsteals += taken; break scan; @@ -1965,17 +1963,14 @@ public class ForkJoinPool extends AbstractExecutorService break scan; // stalled; reorder scan } } - else if (idle != 0) { - if ((idle = tryReactivate(w)) != 0) - break scan; // can't take yet - } else { q.base = nb; Object nt = U.getReferenceVolatile(a, np); if (qid != (prevSrc = src)) w.source = src = qid; ++taken; - if (nt != null && (prevSrc < 0 || fifo != 0) && + if (nt != null && + (qid != prevSrc || fifo != 0) && U.getReference(a, np) != null) signalWork(q, nb); w.topLevelExec(t, fifo); @@ -1984,9 +1979,7 @@ public class ForkJoinPool extends AbstractExecutorService } } if (--j == 0) { // empty scan - if (idle == 0) - idle = tryDeactivate(w, src); - else if ((idle = awaitWork(w)) != 0) + if (((phase = deactivate(w, phase)) & IDLE) != 0) return; // trimmed or terminated src = -1; break; @@ -2000,83 +1993,67 @@ public class ForkJoinPool extends AbstractExecutorService * Possibly deactivates worker. * * @param w the work queue - * @return idle status + * @param p current phase + * @return current phase, with IDLE set if worker should exit */ - private int tryDeactivate(WorkQueue w, int src) { - int idle = 0; - if (w != null) { // always true; hoist checks - if (src >= 0 && (w.config & CLEAR_TLS) != 0 && - (Thread.currentThread() instanceof ForkJoinWorkerThread f)) - f.resetThreadLocals(); // (instanceof check always true) - int phase = U.getInt(w, WorkQueue.PHASE); - long sp = (phase + NEXTIDLE) & LMASK, pc = ctl; - U.putInt(w, WorkQueue.PHASE, phase | IDLE); - for (;;) { // enqueue - w.stackPred = (int)pc; - long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK; - if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) { - if (ac == 0L) { - if ((runState & (SHUTDOWN|STOP)) == SHUTDOWN) - quiescent(); // check quiescent termination - idle = w.phase & IDLE; - } - else { - int tc = ((short)(c >>> TC_SHIFT) | 1) & SMASK; - int spins = tc + Math.max(tc << 1, SPIN_WAITS); - while ((idle = w.phase & IDLE) != 0 && --spins != 0) - Thread.onSpinWait(); // spin for approx 1 scan cost - } - break; - } - else if (ac < (pc & RC_MASK)) { - w.phase = phase; // back out if lost to signal - break; - } - } + private int deactivate(WorkQueue w, int p) { + if (w == null) // currently impossible + return IDLE; + int activePhase = p + NEXTIDLE, sp; + long ap = activePhase & LMASK, pc = ctl, qc; + U.putInt(w, WorkQueue.PHASE, p | IDLE); + for (;;) { // enqueue + sp = w.stackPred = (int)pc; + if (pc == (pc = U.compareAndExchangeLong( + this, CTL, pc, qc = (pc - RC_UNIT) & UMASK | ap))) + break; + else if ((qc & RC_MASK) < (pc & RC_MASK)) + return w.phase = p; // back out if lost to signal } - return idle; - } + int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs; + if (((e = runState) & STOP) != 0L || + ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) || + (qs = queues) == null || (n = qs.length) <= 0) + return IDLE; // terminating - - /** - * Reactivates worker w if it is currently top of ctl stack - * - * @param w the work queue - * @return 0 if now active - */ - @jdk.internal.vm.annotation.DontInline // avoid prefetch of contending memory - private int tryReactivate(WorkQueue w) { - int idle = 0; - if (w != null) { // always true; hoist checks - int sp = w.stackPred, phase, activePhase; long c; - if ((idle = (phase = w.phase) & IDLE) != 0 && - (activePhase = phase + IDLE) == (int)(c = ctl)) { - if (U.compareAndSetLong(this, CTL, c, - (sp & LMASK) | ((c + RC_UNIT) & UMASK))) { - w.phase = activePhase; - idle = 0; - } - else - idle = w.phase & IDLE; - } + for (int prechecks = Math.min(ac, 2), // reactivation threshold + k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) { + WorkQueue q; int cap; ForkJoinTask[] a; long c; + if ((p = w.phase) == activePhase) + return activePhase; + if (--k < 0) + return awaitWork(w, p); // block, drop, or exit + if ((q = qs[k & (n - 1)]) == null) + Thread.onSpinWait(); + else if ((a = q.array) != null && (cap = a.length) > 0 && + a[q.base & (cap - 1)] != null && --prechecks < 0 && + (int)(c = ctl) == activePhase && + U.compareAndSetLong(this, CTL, c, + (sp & LMASK) | ((c + RC_UNIT) & UMASK))) + return w.phase = activePhase; // reactivate } - return idle; } /** * Awaits signal or termination. * * @param w the work queue - * @return 0 if now active else -1 + * @param p current phase (known to be idle) + * @return current phase, with IDLE set if worker should exit */ - private int awaitWork(WorkQueue w) { - int idle = IDLE, phase; - if (w != null && (idle = (phase = w.phase) & IDLE) != 0) { - int activePhase = phase + IDLE, trim; + private int awaitWork(WorkQueue w, int p) { + if (w != null) { + long deadline = 0L; + int activePhase = p + IDLE, trim; if ((trim = w.trimStatus) != 0) w.trimStatus = 0; - long deadline = 0L; - while ((runState & STOP) == 0L) { + if ((w.config & CLEAR_TLS) != 0 && // clear before reactivate + (Thread.currentThread() instanceof ForkJoinWorkerThread f)) + f.resetThreadLocals(); // (instanceof check always true) + for (;;) { + Thread.interrupted(); // clear status + if ((runState & STOP) != 0L) + break; boolean trimmable = false; // true if at ctl head and quiescent long d = 0L, c; if ((int)(c = ctl) == activePhase && (c & RC_MASK) == 0L) { @@ -2090,20 +2067,19 @@ public class ForkJoinPool extends AbstractExecutorService trim = 0; trimmable = true; } - if ((idle = w.phase & IDLE) == 0) + if ((p = w.phase) == activePhase) break; LockSupport.setCurrentBlocker(this); w.parking = 1; // enable unpark and recheck - if ((idle = w.phase & IDLE) != 0) + if (w.phase != activePhase) U.park(trimmable, d); w.parking = 0; // close unpark window LockSupport.setCurrentBlocker(null); - if (idle == 0 || (idle = w.phase & IDLE) == 0) + if ((p = w.phase) == activePhase) break; - Thread.interrupted(); // clear status for next park } } - return idle; + return p; } /**