diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index 80ac3a9a71f..0f04f9a891a 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -1933,66 +1933,62 @@ public class ForkJoinPool extends AbstractExecutorService if (w != null) { int r = w.stackPred; // seed from registerWorker int fifo = (int)config & FIFO; - for (int idle = 0, taken = 0, src = -1;;) { + for (int nsteals = 0, idle = 0, src = -1;;) { WorkQueue[] qs; int n, j; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift long e = runState; if ((qs = queues) == null || (j = ((n = qs.length) << 1)) <= 0 || // 2 sweeps per scan - (e & STOP) != 0L) { - w.nsteals += taken; + (e & STOP) != 0L) return; - } scan: for (int i = r, step = (r >>> 16) | 1; ; i += step) { WorkQueue q; int qid; if ((q = qs[qid = i & (n - 1)]) != null) { - for (int pb = -1, found = 0;;) { - ForkJoinTask t; - ForkJoinTask[] a = q.array; - int b = q.base, cap, nb; - if (a == null || (cap = a.length) <= 0) - break; - long np = slotOffset((cap - 1) & (nb = b + 1)), bp; + ForkJoinTask[] a; int pb = -1, taken = 0, cap; + while ((a = q.array) != null && (cap = a.length) > 0) { + ForkJoinTask t; int b, nb, prevSrc; long bp; t = (ForkJoinTask)U.getReferenceAcquire( - a, bp = slotOffset((cap - 1) & b)); - if (q.array != a || q.base != b || - U.getReference(a, bp) != t) - ; // inconsistent - else if (t == null || - (idle == 0 && - !U.compareAndSetReference(a, bp, t, null))) { - if (U.getReference(a, np) == null) { - if (found != 0) // end of run + a, bp = slotOffset((cap - 1) & (b = q.base))); + if (q.array == a && q.base == b) { // else inconsistent + long np = slotOffset((cap - 1) & (nb = b + 1)); + if (t == null || + (idle == 0 && + !U.compareAndSetReference(a, bp, t, null))) { + if (taken != 0) { // end of run + w.nsteals = nsteals += taken; break scan; - break; // probably empty + } + if (U.getReference(a, bp) == null) { + if (U.getReference(a, np) == null) + break; // probably empty + if (pb == (pb = b)) + break scan; // stalled; reorder scan + } + } + else if (idle != 0) { + if ((idle = tryReactivate(w)) != 0) + break scan; // can't take yet + } + else { + q.base = nb; + Object nt = U.getReferenceVolatile(a, np); + if (qid != (prevSrc = src)) + w.source = src = qid; + ++taken; + if (nt != null && (prevSrc < 0 || fifo != 0) && + U.getReference(a, np) != null) + signalWork(q, nb); + w.topLevelExec(t, fifo); } - if (pb == (pb = b)) // track progress - break scan; // stalled; reorder scan - } - else if (idle != 0) { - found = 1; - if ((idle = tryReactivate(w)) != 0) - break scan; // can't take yet - } - else { - boolean propagate; - q.base = nb; - Object nt = U.getReferenceVolatile(a, np); - if (qid != src) - w.source = src = qid; - found = 1; - if (propagate = - ((++taken == 1 || fifo != 0) && - nt != null && U.getReference(a, np) != null)) - signalWork(q, nb); - w.topLevelExec(t, fifo); } } } if (--j == 0) { // empty scan - if ((idle = deactivate(w, idle, taken)) == -1) + if (idle == 0) + idle = tryDeactivate(w, src); + else if ((idle = awaitWork(w)) != 0) return; // trimmed or terminated - taken = 0; + src = -1; break; } } @@ -2004,50 +2000,51 @@ public class ForkJoinPool extends AbstractExecutorService * Possibly deactivates worker. * * @param w the work queue - * @param idle current idle status - * @param taken number of tasks stolen since last call - * @return idle status or -1 for exit + * @return idle status */ - private int deactivate(WorkQueue w, int idle, int taken) { + private int tryDeactivate(WorkQueue w, int src) { + int idle = 0; if (w != null) { // always true; hoist checks - int phase = w.phase; - if (taken != 0) { // postpone - w.nsteals += taken; - if ((w.config & CLEAR_TLS) != 0 && - (Thread.currentThread() instanceof ForkJoinWorkerThread f)) - f.resetThreadLocals(); // (instanceof check always true) - Thread.interrupted(); // clear status - } - else if (idle == 0) { - long sp = (phase + NEXTIDLE) & LMASK, pc = ctl; - U.putInt(w, WorkQueue.PHASE, phase | IDLE); - for (;;) { // enqueue - w.stackPred = (int)pc; - long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK; - if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) { - if (ac <= 0L && (runState & (SHUTDOWN|STOP)) == SHUTDOWN) + if (src >= 0 && (w.config & CLEAR_TLS) != 0 && + (Thread.currentThread() instanceof ForkJoinWorkerThread f)) + f.resetThreadLocals(); // (instanceof check always true) + int phase = U.getInt(w, WorkQueue.PHASE); + long sp = (phase + NEXTIDLE) & LMASK, pc = ctl; + U.putInt(w, WorkQueue.PHASE, phase | IDLE); + for (;;) { // enqueue + w.stackPred = (int)pc; + long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK; + if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) { + if (ac == 0L) { + if ((runState & (SHUTDOWN|STOP)) == SHUTDOWN) quiescent(); // check quiescent termination idle = w.phase & IDLE; - break; } - else if (ac < (pc & RC_MASK)) { - w.phase = phase; // back out if lost to signal - break; + else { + int tc = ((short)(c >>> TC_SHIFT) | 1) & SMASK; + int spins = tc + Math.max(tc << 1, SPIN_WAITS); + while ((idle = w.phase & IDLE) != 0 && --spins != 0) + Thread.onSpinWait(); // spin for approx 1 scan cost } + break; + } + else if (ac < (pc & RC_MASK)) { + w.phase = phase; // back out if lost to signal + break; } } - else if ((idle = phase & IDLE) != 0) - idle = awaitWork(w, phase); } return idle; } + /** * Reactivates worker w if it is currently top of ctl stack * * @param w the work queue * @return 0 if now active */ + @jdk.internal.vm.annotation.DontInline // avoid prefetch of contending memory private int tryReactivate(WorkQueue w) { int idle = 0; if (w != null) { // always true; hoist checks @@ -2070,12 +2067,11 @@ public class ForkJoinPool extends AbstractExecutorService * Awaits signal or termination. * * @param w the work queue - * @param phase current (inactive) phase * @return 0 if now active else -1 */ - private int awaitWork(WorkQueue w, int phase) { - int idle = IDLE; - if (w != null) { // always true; hoist checks + private int awaitWork(WorkQueue w) { + int idle = IDLE, phase; + if (w != null && (idle = (phase = w.phase) & IDLE) != 0) { int activePhase = phase + IDLE, trim; if ((trim = w.trimStatus) != 0) w.trimStatus = 0; @@ -2107,7 +2103,7 @@ public class ForkJoinPool extends AbstractExecutorService Thread.interrupted(); // clear status for next park } } - return (idle != 0) ? -1 : 0; + return idle; } /**