diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index 80718e4351a..adfc6d94d13 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -1176,7 +1176,7 @@ public class ForkJoinPool extends AbstractExecutorService // fields declared in order of their likely layout on most VMs final ForkJoinWorkerThread owner; // null if shared ForkJoinTask[] array; // the queued tasks; power of 2 size - volatile int base; // index of next slot for poll + int base; // index of next slot for poll final int config; // mode bits // fields otherwise causing more unnecessary false-sharing cache misses @@ -1241,21 +1241,23 @@ public class ForkJoinPool extends AbstractExecutorService } /** - * Pushes a task on an internal queue. Called only by owner. - * (Use pool.externalPush for external queues). + * Pushes a task. Called only by owner or if already locked * * @param task the task; caller must ensure nonnull - * @param pool the pool to signal if was previously empty or resized + * @param pool the pool to signal if was previously empty, else null + * @param unlock if not 1, phase unlock value * @throws RejectedExecutionException if array could not be resized */ - final void push(ForkJoinTask task, ForkJoinPool pool) { + final void push(ForkJoinTask task, ForkJoinPool pool, int unlock) { ForkJoinTask[] a = array; - int s = top, m, cap = (a == null) ? 0 : a.length; - if (cap <= s + 1 - base || (m = cap - 1) < 0) - growAndPush(task, pool, 1); + int b = base, s = top, cap, m; + if (a == null || (cap = a.length) <= s + 1 - b || (m = cap - 1) < 0) + growAndPush(task, pool, unlock); else { top = s + 1; U.putReferenceVolatile(a, slotOffset(m & s), task); + if (unlock != 1) // release external lock + U.putInt(this, PHASE, unlock); if (U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null && pool != null) pool.signalWork(this, s); // may have appeared empty @@ -1310,12 +1312,12 @@ public class ForkJoinPool extends AbstractExecutorService private ForkJoinTask nextLocalTask(int fifo) { ForkJoinTask t = null; ForkJoinTask[] a = array; - int s = top - 1, b, cap = (a == null) ? 0 : a.length; - if (s - (b = base) >= 0 && cap > 0) { + int b = base, s = top - 1, cap; + if (a != null && s - b >= 0 && (cap = a.length) > 0) { if (fifo == 0) { if ((t = (ForkJoinTask)U.getAndSetReference( a, slotOffset((cap - 1) & s), null)) != null) - U.putIntOpaque(this, TOP, s); + top = s; } else { do { if ((t = (ForkJoinTask)U.getAndSetReference( @@ -1325,7 +1327,7 @@ public class ForkJoinPool extends AbstractExecutorService } if (b == s) break; - while (b == (b = base)) + while (b == (b = U.getIntAcquire(this, BASE))) Thread.onSpinWait(); } while (s - b >= 0); } @@ -1337,7 +1339,10 @@ public class ForkJoinPool extends AbstractExecutorService * Takes next task, if one exists, using configured mode. */ final ForkJoinTask nextLocalTask() { - return nextLocalTask(config & FIFO); + U.loadFence(); // ensure ordering for external callers + ForkJoinTask t = nextLocalTask(config & FIFO); + U.storeFence(); + return t; } /** @@ -1405,7 +1410,7 @@ public class ForkJoinPool extends AbstractExecutorService Thread.onSpinWait(); // stalled } else if (U.compareAndSetReference(a, k, t, null)) { - base = nb; + U.putIntVolatile(this, BASE, nb); return t; } } @@ -1430,8 +1435,8 @@ public class ForkJoinPool extends AbstractExecutorService */ final void tryRemoveAndExec(ForkJoinTask task, boolean internal) { ForkJoinTask[] a = array; - int p = top, s = p - 1, cap = (a == null) ? 0 : a.length, d = p - base; - if (cap > 0) { + int b = base, p = top, s = p - 1, d = p - b, cap; + if (a != null && (cap = a.length) > 0) { for (int m = cap - 1, i = s; d > 0; --i, --d) { long k; boolean taken; ForkJoinTask t = (ForkJoinTask)U.getReference( @@ -1549,13 +1554,6 @@ public class ForkJoinPool extends AbstractExecutorService // misc - final int spinWaitPhase() { - int spins = SPIN_WAITS, f; - while (((f = phase) & IDLE) != 0 && --spins != 0) - Thread.onSpinWait(); - return f; - } - /** * Cancels all local tasks. Called only by owner. */ @@ -1823,35 +1821,27 @@ public class ForkJoinPool extends AbstractExecutorService * giving up on contention if q is nonull and signalled slot * already taken. * - * @param q, if nonnull, the WorkQueue containing signalled task - * @param qbase q's base index for the task + * @param src, if nonnull, the WorkQueue containing signalled task + * @param base src's base index for the task */ - final void signalWork(WorkQueue q, int qbase) { - int pc = parallelism; - for (long c = ctl;;) { - WorkQueue[] qs = queues; - long ac = (c + RC_UNIT) & RC_MASK, nc; - int sp = (int)c, i = sp & SMASK; - if ((short)(c >>> RC_SHIFT) >= pc) - break; - if (qs == null) - break; - if (qs.length <= i) - break; - WorkQueue w = qs[i], v = null; - if (sp == 0) { + final void signalWork(WorkQueue src, int base) { + int pc = parallelism, i, sp; // rely on caller sync for initial reads + long c = U.getLong(this, CTL); + WorkQueue[] qs = queues; + while ((short)(c >>> RC_SHIFT) < pc && qs != null && + qs.length > (i = (sp = (int)c) & SMASK)) { + WorkQueue v; long nc; + if (i == 0) { if ((short)(c >>> TC_SHIFT) >= pc) break; - nc = ((c + TC_UNIT) & TC_MASK) | ac; + v = null; + nc = ((c + TC_UNIT) & TC_MASK) | ((c + RC_UNIT) & RC_MASK); } - else if ((v = w) == null) + else if ((v = qs[i]) == null) break; else - nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac; - if (q != null && q.base - qbase > 0) - break; - if (c == (c = ctl) && - c == (c = U.compareAndExchangeLong(this, CTL, c, nc))) { + nc = (v.stackPred & LMASK) | ((c + RC_UNIT) & UMASK); + if (c == (c = U.compareAndExchangeLong(this, CTL, c, nc))) { if (v == null) createWorker(); else { @@ -1861,6 +1851,9 @@ public class ForkJoinPool extends AbstractExecutorService } break; } + qs = queues; + if (src != null && src.base - base > 0) + break; } } @@ -1938,14 +1931,17 @@ public class ForkJoinPool extends AbstractExecutorService */ final void runWorker(WorkQueue w) { if (w != null) { - WorkQueue[] qs; int n; int r = w.stackPred; // seed from registerWorker int fifo = (int)config & FIFO; int src = 0, idle = 0, rescans = 0, taken = 0; - while ((runState & STOP) == 0L && (qs = queues) != null && - (n = qs.length) > 0) { + for (;;) { + WorkQueue[] qs; + long e = runState; + int n = ((qs = queues) == null) ? 0 : qs.length; int i = r, step = (r >>> 16) | 1; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift + if ((e & STOP) != 0L || n <= 0) + break; scan: for (int j = n; j != 0; --j, i += step) { WorkQueue q; int qid; if ((q = qs[qid = i & (n - 1)]) != null) { @@ -1955,27 +1951,26 @@ public class ForkJoinPool extends AbstractExecutorService t = (ForkJoinTask)U.getReferenceAcquire( a, bp = slotOffset((cap - 1) & (b = q.base))); long np = slotOffset((nb = b + 1) & (cap - 1)); - if (q.base == b) { // else inconsistent + if (q.array == a && q.base == b && + U.getReference(a, bp) == t) { if (t == null) { - if (q.array == a) { // else resized - if (rescans > 0) // ran or stalled - break scan; - if (U.getReference(a, np) == null && - U.getReference(a, bp) == null && - (rescans >= 0 || q.top == b)) - break; - rescans = 1; // may be stalled - } + if (rescans > 0) // ran or stalled + break scan; + if (U.getReference(a, np) == null && + (rescans == 0 || q.top == b)) + break; // retry at most twice + ++rescans; // may be stalled } else if (idle != 0) { if ((idle = tryReactivate(w)) != 0) { - rescans = 1; // can't take yet - break scan; + rescans = 1; + break scan; // can't take yet } + rescans = 0; } else if (U.compareAndSetReference(a, bp, t, null)) { Object nt = U.getReference(a, np); - q.base = nb; + U.getAndSetInt(q, WorkQueue.BASE, nb); if (nt != null) // propagate signalWork(q, nb); rescans = 1; @@ -1987,47 +1982,72 @@ public class ForkJoinPool extends AbstractExecutorService } } } - if (rescans >= 0) + int phase; + if (rescans >= 0) { --rescans; + if (idle != 0) + idle = w.phase & IDLE; + } else if (idle == 0) { - deactivate(w, taken); - idle = IDLE; + if ((idle = deactivate(w, taken)) == 0) + rescans = 0; taken = 0; } - else if ((idle = awaitWork(w)) == 0) + else if ((idle = (phase = w.phase) & IDLE) == 0 || + (idle = awaitWork(w, phase)) == 0) rescans = 0; else break; } - if (taken != 0) - w.nsteals += taken; + w.nsteals += taken; } } /** - * Deactivates and enqueues worker + * Deactivates and enqueues worker, possibly backing out on signal + * contention. * * @param w the work queue * @param taken number of stolen tasks since last reactivation + * @return active status */ - private void deactivate(WorkQueue w, int taken) { + private int deactivate(WorkQueue w, int taken) { + int idle = IDLE; if (w != null) { // always true; hoist checks int phase = U.getInt(w, WorkQueue.PHASE); long sp = (phase + NEXTIDLE) & LMASK, pc = ctl, c; U.putInt(w, WorkQueue.PHASE, phase | IDLE); - do { + for (;;) { // try to enqueue w.stackPred = (int)pc; - } while (pc != (pc = U.compareAndExchangeLong( - this, CTL, pc, c = ((pc - RC_UNIT) & UMASK) | sp))); + if (pc == (pc = U.compareAndExchangeLong( + this, CTL, pc, c = ((pc - RC_UNIT) & UMASK) | sp))) + break; + if ((c & RC_MASK) < (pc & RC_MASK)) { + w.phase = phase; // back out if lost to signal + idle = 0; + break; + } + } if (taken != 0) { w.nsteals += taken; if ((w.config & CLEAR_TLS) != 0 && (Thread.currentThread() instanceof ForkJoinWorkerThread f)) f.resetThreadLocals(); // (instanceof check always true) + Thread.interrupted(); // clear status + } + if (idle != 0 && (runState & STOP) == 0L) { + if ((c & RC_MASK) == 0L) { + if (quiescent() <= 0) // check quiescent termination + idle = w.phase & IDLE; + } + else if ((idle = w.phase & IDLE) != 0) { + Thread.yield(); // reduce unproductive scanning + for (int s = SPIN_WAITS; (idle = w.phase & IDLE) != 0 && --s != 0;) + Thread.onSpinWait(); + } } - if ((c & RC_MASK) == 0L && (runState & SHUTDOWN) != 0L) - quiescent(); // may trigger quiescent termination } + return idle; } /** @@ -2055,43 +2075,47 @@ public class ForkJoinPool extends AbstractExecutorService * Awaits signal or termination. * * @param w the work queue + * @param phase w's (inactive) phase * @return 0 if now active */ - private int awaitWork(WorkQueue w) { - int idle = IDLE, phase; - if ((runState & STOP) == 0L && w != null && - (idle = (phase = w.spinWaitPhase()) & IDLE) != 0) { + private int awaitWork(WorkQueue w, int phase) { + int idle = IDLE; + if (w != null) { // always true; hoist checks int activePhase = phase + IDLE; long deadline = 0L, waitTime = (w.source == INVALID_ID) ? 0L : keepAlive; - LockSupport.setCurrentBlocker(this); - for (;;) { - Thread.interrupted(); // clear status - if ((runState & STOP) != 0L) - break; + while ((runState & STOP) == 0L) { boolean trimmable = false; // use timed wait if trimmable - long d = 0L, c; - if (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase) { - long now = System.currentTimeMillis(); - if (deadline == 0L) - deadline = waitTime + now; - if (deadline - now <= TIMEOUT_SLOP) { - if (tryTrim(w, c, activePhase)) - break; - continue; // lost race to trim + long d = 0L, c; // trim or spin at head + int spins = 1; + if ((int)(c = ctl) == activePhase) { + spins = SPIN_WAITS; + if ((c & RC_MASK) == 0L) { + long now = System.currentTimeMillis(); + if (deadline == 0L) + deadline = waitTime + now; + if (deadline - now <= TIMEOUT_SLOP) { + if (tryTrim(w, c, activePhase)) + break; + continue; // lost race to trim + } + d = deadline; + trimmable = true; } - d = deadline; - trimmable = true; } - if ((idle = w.phase & IDLE) == 0) + while ((idle = w.phase & IDLE) != 0 && --spins != 0) + Thread.onSpinWait(); + if (idle == 0) break; + LockSupport.setCurrentBlocker(this); w.parking = 1; // enable unpark and recheck if ((idle = w.phase & IDLE) != 0) U.park(trimmable, d); w.parking = 0; // close unpark window + LockSupport.setCurrentBlocker(null); if (idle == 0 || (idle = w.phase & IDLE) == 0) break; + Thread.interrupted(); // clear status for next park } - LockSupport.setCurrentBlocker(null); } return idle; } @@ -2326,7 +2350,7 @@ public class ForkJoinPool extends AbstractExecutorService if ((q = qs[j = r & SMASK & (n - 1)]) != null) { for (;;) { ForkJoinTask t; ForkJoinTask[] a; - int b, cap, nb; long k; + int b, cap; long k; boolean eligible = false; if ((a = q.array) == null || (cap = a.length) <= 0) break; @@ -2564,23 +2588,12 @@ public class ForkJoinPool extends AbstractExecutorService } ForkJoinTask[] a; if (q != null && (lock = q.tryLockPhase()) != 1) { - int unlock = lock + NEXTIDLE; - int s = q.top, cap = ((a = q.array) == null) ? 0 : a.length; - int m = (cap <= s + 1 - q.base) ? -1 : cap - 1; + int unlock = lock + NEXTIDLE; if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) { q.phase = unlock; - break; // check while q lock held - } - else if (m < 0) - q.growAndPush(task, this, unlock); - else { - q.top = s + 1; - a[m & s] = task; - q.phase = unlock; - if (U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null && - signalIfEmpty) - signalWork(q, s); + break; // check while q lock held } + q.push(task, signalIfEmpty ? this : null, unlock); return; } } @@ -2591,7 +2604,7 @@ public class ForkJoinPool extends AbstractExecutorService if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) && (q = (wt = (ForkJoinWorkerThread)t).workQueue) != null && wt.pool == this) - q.push(task, signalIfEmpty ? this : null); + q.push(task, signalIfEmpty ? this : null, 1); else externalPush(task, signalIfEmpty, true); } @@ -3079,8 +3092,8 @@ public class ForkJoinPool extends AbstractExecutorService * Package-private access to commonPool overriding zero parallelism */ static ForkJoinPool asyncCommonPool() { - ForkJoinPool cp; int p; - if ((p = (cp = common).parallelism) == 0) + ForkJoinPool cp; + if ((cp = common).parallelism == 0) U.compareAndSetInt(cp, PARALLELISM, 0, 2); return cp; } @@ -3856,7 +3869,7 @@ public class ForkJoinPool extends AbstractExecutorService * the pool. This method may be useful for tuning task * granularities.The returned count does not include scheduled * tasks that are not yet ready to execute, which are reported - * separately by method {@link getDelayedTaskCount}. + * separately by method {@link #getDelayedTaskCount}. * * @return the number of queued tasks * @see ForkJoinWorkerThread#getQueuedTaskCount() diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java index 702687a6d8e..b4b9ecc0c9b 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java @@ -642,7 +642,7 @@ public abstract class ForkJoinTask implements Future, Serializable { public final ForkJoinTask fork() { Thread t; ForkJoinWorkerThread wt; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) - ((wt = (ForkJoinWorkerThread)t).workQueue).push(this, wt.pool); + ((wt = (ForkJoinWorkerThread)t).workQueue).push(this, wt.pool, 1); else ForkJoinPool.common.externalPush(this, true, false); return this;