diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index efe063d136f..7fcd6919b08 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -1065,7 +1065,7 @@ public class ForkJoinPool extends AbstractExecutorService static final long RS_LOCK = 1L << 4; // lowest seqlock bit // spin/sleep limits for runState locking and elsewhere - static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait + static final int SPIN_WAITS = 1 << 8; // max calls to onSpinWait static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos static final int MAX_SLEEP = 1 << 20; // approx 1 sec as nanos @@ -1256,19 +1256,19 @@ public class ForkJoinPool extends AbstractExecutorService */ final void push(ForkJoinTask task, ForkJoinPool pool, int unlock) { ForkJoinTask[] a = array; - int b = base, s = top, cap, m; + int b = base, s = top, cap, m; long pos; if (a == null || (cap = a.length) <= s + 1 - b || (m = cap - 1) < 0) growAndPush(task, pool, unlock); else { top = s + 1; - U.getAndSetReference(a, slotOffset(m & s), task); + U.getAndSetReference(a, pos = slotOffset(m & s), task); Object pred = U.getReferenceAcquire(a, slotOffset(m & (s - 1))); if (unlock != 1) { // release external lock U.putInt(this, PHASE, unlock); U.storeFence(); } if (pred == null && pool != null) - pool.signalWork(this, s); // may have appeared empty + pool.signalWork(a, pos); // may have appeared empty } } @@ -1297,11 +1297,12 @@ public class ForkJoinPool extends AbstractExecutorService break; // lost to pollers newArray[k & newMask] = u; } + long pos = slotOffset(s & newMask); U.putReferenceVolatile(this, ARRAY, newArray); if (unlock != 1) phase = unlock; if (pool != null) - pool.signalWork(this, s); + pool.signalWork(newArray, pos); return; } } @@ -1418,8 +1419,47 @@ public class ForkJoinPool extends AbstractExecutorService return null; } + /** + * Tries once to poll for a task + * @param pool if non-null, pool to propagate signals + */ + private ForkJoinTask tryPoll(ForkJoinPool pool) { + ForkJoinTask[] a; int cap, b, nb; long bp; + if ((a = array) != null && (cap = a.length) > 0) { + ForkJoinTask t = (ForkJoinTask)U.getReferenceAcquire( + a, bp = slotOffset((cap - 1) & (b = base))); + long np = slotOffset((cap - 1) & (nb = b + 1)); + if (t != null && base == b && + U.compareAndSetReference(a, bp, t, null)) { + Object nt = U.getReference(a, np); + U.getAndSetInt(this, BASE, nb); + if (pool != null && nt != null) + pool.signalWork(a, np); + return t; + } + } + return null; + } + // specialized execution methods + /** + * Runs the given task, as well as remaining local tasks + */ + final int topLevelExec(ForkJoinTask task, WorkQueue q, ForkJoinPool pool, + int fifo) { + ForkJoinPool p = (fifo == 0) ? null : pool; + int taken = 1; + while (task != null) { + task.doExec(); + if ((task = nextLocalTask(fifo)) == null && + (q == null || (task = q.tryPoll(p)) == null)) + break; + ++taken; + } + return taken; + } + /** * Deep form of tryUnpush: Traverses from top and removes and * runs task if present. @@ -1803,7 +1843,7 @@ public class ForkJoinPool extends AbstractExecutorService if ((tryTerminate(false, false) & STOP) == 0L && phase != 0 && w != null && w.source != DROPPED) { w.cancelTasks(); // clean queue - signalWork(null, 0); // possibly replace + signalWork(null, 0L); // possibly replace } if (ex != null) ForkJoinTask.rethrow(ex); @@ -1811,29 +1851,25 @@ public class ForkJoinPool extends AbstractExecutorService /** * Releases an idle worker, or creates one if not enough exist, - * giving up on contention if q is nonull and signalled slot - * already taken. + * giving up on contention if source slot already taken. * - * @param src, if nonnull, the WorkQueue containing signalled task - * @param base src's base index for the task + * @param src, if nonnull, the array containing signalled task + * @param offset slot offset for the task */ - final void signalWork(WorkQueue src, int base) { - int pc = parallelism, i, sp; - long c = U.getLong(this, CTL); // rely on caller sync for initial reads + final void signalWork(ForkJoinTask[] src, long offset) { + int pc = parallelism, i, sp; // rely on caller sync for initial reads + long c = U.getLong(this, CTL); WorkQueue[] qs = queues; while ((short)(c >>> RC_SHIFT) < pc && qs != null && qs.length > (i = (sp = (int)c) & SMASK)) { - WorkQueue v; long nc; - if (i == 0) { + WorkQueue w = qs[i], v = null; long nc; + if (i == 0 || w == null) { if ((short)(c >>> TC_SHIFT) >= pc) break; - v = null; nc = ((c + TC_UNIT) & TC_MASK) | ((c + RC_UNIT) & RC_MASK); } - else if ((v = qs[i]) == null) - break; else - nc = (v.stackPred & LMASK) | ((c + RC_UNIT) & UMASK); + nc = ((v = w).stackPred & LMASK) | ((c + RC_UNIT) & UMASK); if (c == (c = U.compareAndExchangeLong(this, CTL, c, nc))) { if (v == null) createWorker(); @@ -1845,7 +1881,7 @@ public class ForkJoinPool extends AbstractExecutorService break; } qs = queues; - if (src != null && src.base - base > 0) + if (src != null && U.getReference(src, offset) == null) break; } } @@ -1925,126 +1961,95 @@ public class ForkJoinPool extends AbstractExecutorService final void runWorker(WorkQueue w) { if (w != null) { int r = w.stackPred; // seed from registerWorker - int fifo = (int)config & FIFO, idle = 0; - for (boolean scanned = false;;) { + for (int fifo = (int)config & FIFO, idle = 0, taken = 0;;) { WorkQueue[] qs; long e = runState; int n = ((qs = queues) == null) ? 0 : qs.length; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift int i = r, step = (r >>> 16) | 1; - boolean found = false; - if ((e & STOP) != 0L || n <= 0) + if ((e & STOP) != 0L || n <= 0) { + w.nsteals += taken; break; + } + boolean rescan = false; scan: for (int j = n; j != 0; --j, i += step) { WorkQueue q; int qid; if ((q = qs[qid = i & (n - 1)]) != null) { - ForkJoinTask[] a; int cap; // poll queue + ForkJoinTask[] a; int pb = -1, cap; // poll queue while ((a = q.array) != null && (cap = a.length) > 0) { int b, nb; long bp, np; ForkJoinTask t; t = (ForkJoinTask)U.getReferenceAcquire( a, bp = slotOffset((cap - 1) & (b = q.base))); Object nt = U.getReference( a, np = slotOffset((cap - 1) & (nb = b + 1))); - if (q.array == a && q.base == b && - U.getReference(a, bp) == t) { - if (t == null) { - if (nt == null && (!scanned || q.top == b)) { - found = false; + int qb = q.base; // reread + if (idle != 0) { + if (t == null && nt == null && q.top - qb <= 0) + break; + if ((idle = tryReactivate(w)) != 0) { + rescan = true; + break scan; // can't take yet + } + } + else if (qb != b) // inconsistent/busy + ; + else if (t == null) { + if (q.array == a && U.getReference(a, bp) == null) { + if (nt == null) break; + if (pb == (pb = b)) { + rescan = true; // stalled; reorder scan + break scan; } - if (found) - break scan; // stall check - found = true; - } - else if (idle != 0) { - found = true; - if ((idle = tryReactivate(w)) != 0) - break scan; // can't take yet - } - else if (U.compareAndSetReference(a, bp, t, null)) { - nt = U.getReference(a, np); - U.getAndSetInt(q, WorkQueue.BASE, nb); - if (nt != null) // propagate - signalWork(q, nb); - topLevelExec(t, w, q, fifo, qid); - found = true; - break scan; } } + else if (U.compareAndSetReference(a, bp, t, null)) { + nt = U.getReference(a, np); + U.getAndSetInt(q, WorkQueue.BASE, nb); + if (nt != null) // propagate + signalWork(a, np); + w.source = qid; + taken += w.topLevelExec(t, q, this, fifo); + rescan = true; + break scan; + } } } } - if (found) - scanned = false; - else if (!scanned && idle == 0) - scanned = true; - else if (!scanned || idle == 0) { - if ((idle = deactivate(w, idle)) == 0) - scanned = false; - } - else { - awaitWork(w); - idle = 0; - scanned = false; + if (!rescan) { + idle = onEmptyScan(w, idle, taken); + taken = 0; } } } } - private void topLevelExec(ForkJoinTask t, WorkQueue w, WorkQueue q, - int fifo, int qid) { - if (t != null && q != null && w != null) { // always true; hoist checks - w.source = qid; - int taken = 1; - for (;;) { - t.doExec(); - if ((t = w.nextLocalTask(fifo)) == null) { - ForkJoinTask[] a; int cap, b, nb; long bp, np; - if ((a = q.array) == null || (cap = a.length) <= 0) - break; // similar to runWorker scan - t = (ForkJoinTask)U.getReferenceAcquire( - a, bp = slotOffset((cap - 1) & (b = q.base))); - Object nt = U.getReference( - a, np = slotOffset((cap - 1) & (nb = b + 1))); - if (t == null || q.base != b || - !U.compareAndSetReference(a, bp, t, null)) - break; - nt = U.getReference(a, np); - U.getAndSetInt(q, WorkQueue.BASE, nb); - ++taken; - if (nt != null && // prevent stalls - (fifo != 0 || (t instanceof ForkJoinTask.InterruptibleTask))) - signalWork(q, nb); - } - } - w.nsteals += taken; - if ((w.config & CLEAR_TLS) != 0 && - (Thread.currentThread() instanceof ForkJoinWorkerThread f)) - f.resetThreadLocals(); // (instanceof check always true) - } - } - /** - * Possibly deactivates and pauses worker + * Possibly deactivates, reactivates or pauses worker * * @param w the work queue - * @param idle active status * @return 0 if now active */ - private int deactivate(WorkQueue w, int idle) { + private int onEmptyScan(WorkQueue w, int idle, int taken) { if (w != null) { // always true; hoist checks int phase = w.phase; - if (idle != 0) // already deactivated - idle = phase & IDLE; - else { // try to deactivate + if (taken != 0) { + w.nsteals += taken; + if ((w.config & CLEAR_TLS) != 0 && + (Thread.currentThread() instanceof ForkJoinWorkerThread f)) + f.resetThreadLocals(); // (instanceof check always true) + if ((runState & (SHUTDOWN|STOP)) == 0L) + Thread.yield(); // pause before rescan + } + else if (idle == 0) { // deactivate long sp = (phase + NEXTIDLE) & LMASK, pc = ctl; U.putInt(w, WorkQueue.PHASE, phase | IDLE); for (;;) { // enqueue w.stackPred = (int)pc; long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK; if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) { - if (ac == 0L) // check quiescent termination - quiescent(); + if (ac == 0L && (runState & (SHUTDOWN|STOP)) == SHUTDOWN) + quiescent(); // check quiescent termination idle = w.phase & IDLE; break; } @@ -2054,12 +2059,9 @@ public class ForkJoinPool extends AbstractExecutorService } } } - if (idle != 0 && (runState & STOP) == 0L) { - int noise = (phase ^ (phase >>> 16)) & (SPIN_WAITS - 1); - int spins = (SPIN_WAITS << 1) | noise; - Thread.yield(); // helps when oversubscribed - while ((idle = w.phase & IDLE) != 0 && --spins > 0) - Thread.onSpinWait(); + else if ((idle = phase & IDLE) != 0) { + awaitWork(w, phase); + idle = 0; } } return idle; @@ -2092,37 +2094,41 @@ public class ForkJoinPool extends AbstractExecutorService * @param w the work queue * @throws WorkerTrimmedException on idle timeout */ - private void awaitWork(WorkQueue w) { - int phase; - if (w != null && ((phase = w.phase) & IDLE) != 0) { + private void awaitWork(WorkQueue w, int phase) { + if (w != null) { int activePhase = phase + IDLE; long deadline = 0L, waitTime = (w.source == INVALID_ID) ? 0L : keepAlive; - LockSupport.setCurrentBlocker(this); for (;;) { + long d = 0L, c; int idle; Thread.interrupted(); // clear status if ((runState & STOP) != 0L) break; - int idle; - long d = 0L, c; // trim at head boolean trimmable = false; // use timed wait if trimmable - if ((int)(c = ctl) == activePhase && (c & RC_MASK) == 0L) { - trimmable = true; - long now = System.currentTimeMillis(); - if (deadline == 0L) - deadline = waitTime + now; - if ((d = deadline) - now <= TIMEOUT_SLOP) - tryTrim(w, c, activePhase); // throws if trimmed + int spins = ((short)((c = ctl) >>> TC_SHIFT) | 1) & SMASK; + if ((int)c == activePhase) { // at head + if ((c & RC_MASK) == 0L) { + long now = System.currentTimeMillis(); + if (deadline == 0L) + deadline = waitTime + now; + if ((d = deadline) - now <= TIMEOUT_SLOP) + tryTrim(w, c, activePhase); // throws if trimmed + trimmable = true; + } + spins += SPIN_WAITS; // spin more } - if ((w.phase & IDLE) == 0) + while ((idle = w.phase & IDLE) != 0 && --spins != 0) + Thread.onSpinWait(); + if (idle == 0) break; + LockSupport.setCurrentBlocker(this); w.parking = 1; // enable unpark and recheck if ((idle = w.phase & IDLE) != 0) U.park(trimmable, d); w.parking = 0; // close unpark window + LockSupport.setCurrentBlocker(null); if (idle == 0 || (w.phase & IDLE) == 0) break; } - LockSupport.setCurrentBlocker(null); } } @@ -2148,7 +2154,6 @@ public class ForkJoinPool extends AbstractExecutorService v.phase = vp; U.unpark(v.owner); } - LockSupport.setCurrentBlocker(null); throw new WorkerTrimmedException(); } } @@ -2805,8 +2810,7 @@ public class ForkJoinPool extends AbstractExecutorService a, k = slotOffset((cap - 1) & (b = q.base))); if (q.base == b && t != null && U.compareAndSetReference(a, k, t, null)) { - q.base = b + 1; - U.storeFence(); + U.putIntVolatile(q, WorkQueue.BASE, b + 1); try { t.cancel(false); } catch (Throwable ignore) { @@ -3303,7 +3307,7 @@ public class ForkJoinPool extends AbstractExecutorService if ((config & PRESET_SIZE) != 0) throw new UnsupportedOperationException("Cannot override System property"); if ((prevSize = U.getAndSetInt(this, PARALLELISM, size)) < size) - signalWork(null, 0); // trigger worker activation + signalWork(null, 0L); // trigger worker activation return prevSize; }