diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index f59b22eda36..efe063d136f 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -1168,6 +1168,12 @@ public class ForkJoinPool extends AbstractExecutorService } } + /** + * Exception thrown in tryTrim after idle timeout + */ + @SuppressWarnings("serial") + static final class WorkerTrimmedException extends RuntimeException { } + /** * Queues supporting work-stealing as well as external task * submission. See above for descriptions and algorithms. @@ -1309,23 +1315,20 @@ public class ForkJoinPool extends AbstractExecutorService * so acts as either local-pop or local-poll. Called only by owner. * @param fifo nonzero if FIFO mode */ - private ForkJoinTask nextLocalTask(int fifo) { + final ForkJoinTask nextLocalTask(int fifo) { ForkJoinTask t = null; ForkJoinTask[] a = array; int b = base, s = top - 1, cap; if (a != null && s - b >= 0 && (cap = a.length) > 0) { if (fifo == 0) { if ((t = (ForkJoinTask)U.getAndSetReference( - a, slotOffset((cap - 1) & s), null)) != null) { - top = s; - U.storeFence(); - } + a, slotOffset((cap - 1) & s), null)) != null) + U.putIntOpaque(this, TOP, s); } else { do { if ((t = (ForkJoinTask)U.getAndSetReference( a, slotOffset((cap - 1) & b), null)) != null) { - base = b + 1; - U.storeFence(); + U.putIntVolatile(this, BASE, b + 1); break; } if (b == s) @@ -1359,8 +1362,7 @@ public class ForkJoinPool extends AbstractExecutorService (internal || (lock = tryLockPhase()) != 1)) { if (top == p && U.compareAndSetReference(a, k, task, null)) { taken = true; - top = s; - U.storeFence(); + U.putIntOpaque(this, TOP, s); } if (!internal) phase = lock + NEXTIDLE; @@ -1409,8 +1411,7 @@ public class ForkJoinPool extends AbstractExecutorService Thread.onSpinWait(); // stalled } else if (U.compareAndSetReference(a, k, t, null)) { - base = nb; - U.storeFence(); + U.putIntVolatile(this, BASE, nb); return t; } } @@ -1419,16 +1420,6 @@ public class ForkJoinPool extends AbstractExecutorService // specialized execution methods - /** - * Runs the given task, as well as remaining local tasks - */ - final void topLevelExec(ForkJoinTask task, int fifo) { - while (task != null) { - task.doExec(); - task = nextLocalTask(fifo); - } - } - /** * Deep form of tryUnpush: Traverses from top and removes and * runs task if present. @@ -1508,10 +1499,8 @@ public class ForkJoinPool extends AbstractExecutorService break; if (taken = (top == p && - U.compareAndSetReference(a, k, t, null))) { - top = s; - U.storeFence(); - } + U.compareAndSetReference(a, k, t, null))) + U.putIntOpaque(this, TOP, s); if (!internal) phase = lock + NEXTIDLE; if (!taken) @@ -1548,8 +1537,7 @@ public class ForkJoinPool extends AbstractExecutorService break; if (base == b && t != null && U.compareAndSetReference(a, k, t, null)) { - base = b + 1; - U.storeFence(); + U.putIntVolatile(this, BASE, b + 1); t.doExec(); } } @@ -1787,6 +1775,8 @@ public class ForkJoinPool extends AbstractExecutorService final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; // null if not created int phase = 0; // 0 if not registered + if (ex instanceof WorkerTrimmedException) + ex = null; if (wt != null && (w = wt.workQueue) != null && (phase = w.phase) != 0 && (phase & IDLE) != 0) releaseWaiters(); // ensure released @@ -1828,8 +1818,8 @@ public class ForkJoinPool extends AbstractExecutorService * @param base src's base index for the task */ final void signalWork(WorkQueue src, int base) { - int pc = parallelism, i, sp; // rely on caller sync for initial reads - long c = U.getLong(this, CTL); + int pc = parallelism, i, sp; + long c = U.getLong(this, CTL); // rely on caller sync for initial reads WorkQueue[] qs = queues; while ((short)(c >>> RC_SHIFT) < pc && qs != null && qs.length > (i = (sp = (int)c) & SMASK)) { @@ -1935,14 +1925,14 @@ public class ForkJoinPool extends AbstractExecutorService final void runWorker(WorkQueue w) { if (w != null) { int r = w.stackPred; // seed from registerWorker - int fifo = (int)config & FIFO; - int src = 0, idle = 0, rescans = 0, taken = 0; - for (;;) { + int fifo = (int)config & FIFO, idle = 0; + for (boolean scanned = false;;) { WorkQueue[] qs; long e = runState; int n = ((qs = queues) == null) ? 0 : qs.length; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift int i = r, step = (r >>> 16) | 1; + boolean found = false; if ((e & STOP) != 0L || n <= 0) break; scan: for (int j = n; j != 0; --j, i += step) { @@ -1950,91 +1940,127 @@ public class ForkJoinPool extends AbstractExecutorService if ((q = qs[qid = i & (n - 1)]) != null) { ForkJoinTask[] a; int cap; // poll queue while ((a = q.array) != null && (cap = a.length) > 0) { - int b, nb; long bp; ForkJoinTask t; + int b, nb; long bp, np; ForkJoinTask t; t = (ForkJoinTask)U.getReferenceAcquire( a, bp = slotOffset((cap - 1) & (b = q.base))); - long np = slotOffset((nb = b + 1) & (cap - 1)); + Object nt = U.getReference( + a, np = slotOffset((cap - 1) & (nb = b + 1))); if (q.array == a && q.base == b && U.getReference(a, bp) == t) { if (t == null) { - if (rescans > 0) // ran or stalled - break scan; - if (U.getReference(a, np) == null && - (rescans == 0 || q.top == b)) - break; // retry at most twice - ++rescans; // may be stalled + if (nt == null && (!scanned || q.top == b)) { + found = false; + break; + } + if (found) + break scan; // stall check + found = true; } else if (idle != 0) { - if ((idle = tryReactivate(w)) != 0) { - rescans = 1; + found = true; + if ((idle = tryReactivate(w)) != 0) break scan; // can't take yet - } - rescans = 0; } else if (U.compareAndSetReference(a, bp, t, null)) { - Object nt = U.getReference(a, np); + nt = U.getReference(a, np); U.getAndSetInt(q, WorkQueue.BASE, nb); if (nt != null) // propagate signalWork(q, nb); - rescans = 1; - if (taken++ == 0 || src != qid) - w.source = src = qid; - w.topLevelExec(t, fifo); + topLevelExec(t, w, q, fifo, qid); + found = true; + break scan; } } } } } - if (rescans > 0) - rescans = 0; + if (found) + scanned = false; + else if (!scanned && idle == 0) + scanned = true; + else if (!scanned || idle == 0) { + if ((idle = deactivate(w, idle)) == 0) + scanned = false; + } else { - if (rescans == 0) - rescans = -1; - else if (idle == 0) { - if ((idle = deactivate(w)) == 0) - rescans = 0; - } - else if ((idle = rescans = awaitWork(w)) != 0) + awaitWork(w); + idle = 0; + scanned = false; + } + } + } + } + + private void topLevelExec(ForkJoinTask t, WorkQueue w, WorkQueue q, + int fifo, int qid) { + if (t != null && q != null && w != null) { // always true; hoist checks + w.source = qid; + int taken = 1; + for (;;) { + t.doExec(); + if ((t = w.nextLocalTask(fifo)) == null) { + ForkJoinTask[] a; int cap, b, nb; long bp, np; + if ((a = q.array) == null || (cap = a.length) <= 0) + break; // similar to runWorker scan + t = (ForkJoinTask)U.getReferenceAcquire( + a, bp = slotOffset((cap - 1) & (b = q.base))); + Object nt = U.getReference( + a, np = slotOffset((cap - 1) & (nb = b + 1))); + if (t == null || q.base != b || + !U.compareAndSetReference(a, bp, t, null)) break; - if (idle != 0) { - int ns = taken; - taken = 0; - if ((idle = onEmptyScan(w, ns)) == 0) - rescans = 0; - } + nt = U.getReference(a, np); + U.getAndSetInt(q, WorkQueue.BASE, nb); + ++taken; + if (nt != null && // prevent stalls + (fifo != 0 || (t instanceof ForkJoinTask.InterruptibleTask))) + signalWork(q, nb); } } w.nsteals += taken; + if ((w.config & CLEAR_TLS) != 0 && + (Thread.currentThread() instanceof ForkJoinWorkerThread f)) + f.resetThreadLocals(); // (instanceof check always true) } } /** - * Deactivates and enqueues worker, possibly backing out on signal - * contention. + * Possibly deactivates and pauses worker * * @param w the work queue - * @return active status + * @param idle active status + * @return 0 if now active */ - private int deactivate(WorkQueue w) { - int idle = 0; - if (w != null) { // always true; hoist checks - int phase = U.getInt(w, WorkQueue.PHASE); - long sp = (phase + NEXTIDLE) & LMASK, pc = ctl; - U.putInt(w, WorkQueue.PHASE, phase | IDLE); - for (;;) { // try to enqueue - w.stackPred = (int)pc; - long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK; - if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) { - if (ac == 0L) // check quiescent termination - quiescent(); - idle = w.phase & IDLE; - break; - } - else if (ac < (pc & RC_MASK)) { - w.phase = phase; // back out if lost to signal - break; + private int deactivate(WorkQueue w, int idle) { + if (w != null) { // always true; hoist checks + int phase = w.phase; + if (idle != 0) // already deactivated + idle = phase & IDLE; + else { // try to deactivate + long sp = (phase + NEXTIDLE) & LMASK, pc = ctl; + U.putInt(w, WorkQueue.PHASE, phase | IDLE); + for (;;) { // enqueue + w.stackPred = (int)pc; + long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK; + if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) { + if (ac == 0L) // check quiescent termination + quiescent(); + idle = w.phase & IDLE; + break; + } + else if (ac < (pc & RC_MASK)) { + w.phase = phase; // back out if lost to signal + break; + } } } + if (idle != 0 && (runState & STOP) == 0L) { + int noise = (phase ^ (phase >>> 16)) & (SPIN_WAITS - 1); + int spins = (SPIN_WAITS << 1) | noise; + Thread.yield(); // helps when oversubscribed + while ((idle = w.phase & IDLE) != 0 && --spins > 0) + Thread.onSpinWait(); + } } return idle; } @@ -2060,80 +2086,52 @@ public class ForkJoinPool extends AbstractExecutorService return idle; } - /** - * Spins and/or yields to reduce unproductive scanning - * - * @param w the work queue - * @param taken number of stolen tasks since last empty scan - * @return active status - */ - private int onEmptyScan(WorkQueue w, int taken) { - int idle = 0; - if (w != null) { // always true; hoist checks - if (taken != 0) { - w.nsteals += taken; - if ((w.config & CLEAR_TLS) != 0 && - (Thread.currentThread() instanceof ForkJoinWorkerThread f)) - f.resetThreadLocals(); // (instanceof check always true) - Thread.interrupted(); // clear status - } - if ((idle = w.phase & IDLE) != 0 && (runState & STOP) == 0) { - if (taken == 0) - Thread.yield(); - for (int s = SPIN_WAITS; (idle = w.phase & IDLE) != 0 && --s != 0;) - Thread.onSpinWait(); - } - } - return idle; - } - /** * Awaits signal or termination. * * @param w the work queue - * @return 0 if now active + * @throws WorkerTrimmedException on idle timeout */ - private int awaitWork(WorkQueue w) { - int idle = 0, phase; - if (w != null && (idle = (phase = w.phase) & IDLE) != 0) { - LockSupport.setCurrentBlocker(this); + private void awaitWork(WorkQueue w) { + int phase; + if (w != null && ((phase = w.phase) & IDLE) != 0) { int activePhase = phase + IDLE; long deadline = 0L, waitTime = (w.source == INVALID_ID) ? 0L : keepAlive; - while ((runState & STOP) == 0L) { - boolean trimmable = false; // use timed wait if trimmable + LockSupport.setCurrentBlocker(this); + for (;;) { + Thread.interrupted(); // clear status + if ((runState & STOP) != 0L) + break; + int idle; long d = 0L, c; // trim at head + boolean trimmable = false; // use timed wait if trimmable if ((int)(c = ctl) == activePhase && (c & RC_MASK) == 0L) { + trimmable = true; long now = System.currentTimeMillis(); if (deadline == 0L) deadline = waitTime + now; - if (deadline - now <= TIMEOUT_SLOP) { - if (tryTrim(w, c, activePhase)) - break; - continue; // lost race to trim - } - d = deadline; - trimmable = true; + if ((d = deadline) - now <= TIMEOUT_SLOP) + tryTrim(w, c, activePhase); // throws if trimmed } - if ((idle = w.phase & IDLE) == 0) + if ((w.phase & IDLE) == 0) break; w.parking = 1; // enable unpark and recheck if ((idle = w.phase & IDLE) != 0) U.park(trimmable, d); w.parking = 0; // close unpark window - if (idle == 0 || (idle = w.phase & IDLE) == 0) + if (idle == 0 || (w.phase & IDLE) == 0) break; - Thread.interrupted(); // clear status for next park } LockSupport.setCurrentBlocker(null); } - return idle; } /** * Tries to remove and deregister worker after timeout, and release * another to do the same unless new tasks are found. + * @throws WorkerTrimmedException on idle timeout */ - private boolean tryTrim(WorkQueue w, long c, int activePhase) { + private void tryTrim(WorkQueue w, long c, int activePhase) { if (w != null) { int vp, i; WorkQueue[] vs; WorkQueue v; long nc = ((w.stackPred & LMASK) | @@ -2150,10 +2148,10 @@ public class ForkJoinPool extends AbstractExecutorService v.phase = vp; U.unpark(v.owner); } - return true; + LockSupport.setCurrentBlocker(null); + throw new WorkerTrimmedException(); } } - return false; } /** @@ -2382,8 +2380,7 @@ public class ForkJoinPool extends AbstractExecutorService if (eligible) { if (U.compareAndSetReference( a, k, t, null)) { - q.base = b + 1; - U.storeFence(); + U.putIntVolatile(q, WorkQueue.BASE, b + 1); t.doExec(); locals = rescan = true; break scan;