diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index 479e874a644..86b3ed61d73 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -1078,6 +1078,7 @@ public class ForkJoinPool extends AbstractExecutorService static final int DROPPED = 1 << 16; // removed from ctl counts static final int UNCOMPENSATE = 1 << 16; // tryCompensate return static final int IDLE = 1 << 16; // phase seqlock/version count + static final int NEXTIDLE = IDLE << 1; // next IDLE phase bit static final int MIN_QUEUES_SIZE = 1 << 4; // ensure external slots /* @@ -1188,33 +1189,26 @@ public class ForkJoinPool extends AbstractExecutorService @jdk.internal.vm.annotation.Contended("w") volatile int parking; // nonzero if parked in awaitWork @jdk.internal.vm.annotation.Contended("w") - volatile int source; // source queue id (or DROPPED) + int source; // source queue id (or DROPPED) @jdk.internal.vm.annotation.Contended("w") int nsteals; // number of steals from other queues - // Support for atomic operations + // Support for atomic operations (also used by ForkJoinPool) private static final Unsafe U; - private static final long PHASE; - private static final long BASE; - private static final long TOP; - private static final long ARRAY; + static final long PHASE; + static final long BASE; + static final long TOP; + static final long ARRAY; - final void updateBase(int v) { - U.putIntVolatile(this, BASE, v); - } - final void updateTop(int v) { - U.putIntOpaque(this, TOP, v); - } - final void updateArray(ForkJoinTask[] a) { - U.getAndSetReference(this, ARRAY, a); - } - final void unlockPhase() { - U.getAndAddInt(this, PHASE, IDLE); - } - final boolean tryLockPhase() { // seqlock acquire + /** + * SeqLock acquire for external queues. + * @return 1 if cannot acquire lock, else current phase + */ + final int tryLockPhase() { int p; return (((p = phase) & IDLE) != 0 && - U.compareAndSetInt(this, PHASE, p, p + IDLE)); + (p == U.compareAndExchangeInt(this, PHASE, p, p + IDLE))) ? + p : 1; } /** @@ -1248,61 +1242,57 @@ public class ForkJoinPool extends AbstractExecutorService /** * Pushes a task. Called only by owner or if already locked * - * @param task the task; no-op if null + * @param task the task; caller must ensure nonnull * @param pool the pool to signal if was previously empty, else null - * @param internal if caller owns this queue + * @param unlock if not 1, phase unlock value * @throws RejectedExecutionException if array could not be resized */ - final void push(ForkJoinTask task, ForkJoinPool pool, boolean internal) { - int s = top, b = base, m, cap, room; ForkJoinTask[] a; - if ((a = array) != null && (cap = a.length) > 0) { // else disabled - if ((room = (m = cap - 1) - (s - b)) >= 0) { - top = s + 1; - long pos = slotOffset(m & s); - if (!internal) - U.putReference(a, pos, task); // inside lock - else - U.getAndSetReference(a, pos, task); // fully fenced - if (room == 0) - growArray(a, cap, s); - } - if (!internal) - unlockPhase(); - if (room < 0) - throw new RejectedExecutionException("Queue capacity exceeded"); - if (pool != null && - (room == 0 || - U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null)) - pool.signalWork(this, s); // may have appeared empty + final void push(ForkJoinTask task, ForkJoinPool pool, int unlock) { + ForkJoinTask[] a = array; + int b = base, s = top, m; + if (a != null && + (a.length > s + 1 - b || (a = growArray()) != null) && + (m = a.length - 1) >= 0) { // else rejected or disabled + top = s + 1; + U.putReferenceVolatile(a, slotOffset(m & s), task); + if (unlock != 1) // release external lock + U.putInt(this, PHASE, unlock); + if (U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null && + pool != null) + pool.signalWork(this, s); // may have appeared empty } } /** * Resizes the queue array unless out of memory. - * @param a old array - * @param cap old array capacity - * @param s current top + * @return new array, or throw on OOME */ - private void growArray(ForkJoinTask[] a, int cap, int s) { - int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2; - ForkJoinTask[] newArray = null; - if (a != null && a.length == cap && cap > 0 && newCap > 0) { + private ForkJoinTask[] growArray() { + ForkJoinTask[] a; int cap, newCap; + if ((a = array) != null && (cap = a.length) > 0 && + (newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2) > 0) { + ForkJoinTask[] newArray = null; try { newArray = new ForkJoinTask[newCap]; } catch (OutOfMemoryError ex) { } - if (newArray != null) { // else throw on next push + if (newArray != null) { int mask = cap - 1, newMask = newCap - 1; - for (int k = s, j = cap; j > 0; --j, --k) { + for (int k = top - 1, j = cap; j > 0; --j, --k) { ForkJoinTask u; // poll old, push to new if ((u = (ForkJoinTask)U.getAndSetReference( a, slotOffset(k & mask), null)) == null) break; // lost to pollers newArray[k & newMask] = u; } - updateArray(newArray); // fully fenced + U.putReferenceVolatile(this, ARRAY, newArray); + return newArray; } } + int f = phase; // unlock if externally locked + if ((f & (IDLE | 1)) == 0) + phase = f + IDLE; + throw new RejectedExecutionException("Queue capacity exceeded"); } /** @@ -1314,7 +1304,7 @@ public class ForkJoinPool extends AbstractExecutorService if ((a = array) != null && (cap = a.length) > 0 && U.getReference(a, k = slotOffset((cap - 1) & s)) != null && (t = (ForkJoinTask)U.getAndSetReference(a, k, null)) != null) - updateTop(s); + U.putIntOpaque(this, TOP, s); return t; } @@ -1336,7 +1326,7 @@ public class ForkJoinPool extends AbstractExecutorService } else if ((t = (ForkJoinTask) U.getAndSetReference(a, k, null)) != null) { - updateBase(nb); + U.putIntOpaque(this, BASE, nb); break; } else @@ -1361,16 +1351,16 @@ public class ForkJoinPool extends AbstractExecutorService final boolean tryUnpush(ForkJoinTask task, boolean internal) { boolean taken = false; ForkJoinTask[] a = array; - int p = top, s = p - 1, cap; long k; + int p = top, s = p - 1, lock = 0, cap; long k; if (a != null && (cap = a.length) > 0 && U.getReference(a, k = slotOffset((cap - 1) & s)) == task && - (internal || tryLockPhase())) { + (internal || (lock = tryLockPhase()) != 1)) { if (top == p && U.compareAndSetReference(a, k, task, null)) { taken = true; - updateTop(s); + U.putIntOpaque(this, TOP, s); } if (!internal) - unlockPhase(); + U.putIntRelease(this, PHASE, lock + NEXTIDLE); } return taken; } @@ -1416,7 +1406,7 @@ public class ForkJoinPool extends AbstractExecutorService Thread.onSpinWait(); // stalled } else if (U.compareAndSetReference(a, k, t, null)) { - updateBase(nb); + U.putIntVolatile(this, BASE, nb); return t; } } @@ -1474,25 +1464,26 @@ public class ForkJoinPool extends AbstractExecutorService if (t == null) break; if (t == task) { - if (!internal && !tryLockPhase()) + int lock = 0; + if (!internal && (lock = tryLockPhase()) == 1) break; // fail if locked if (taken = (top == p && U.compareAndSetReference(a, k, task, null))) { if (i == s) // act as pop - updateTop(s); + top = s; else if (i == base) // act as poll - updateBase(i + 1); + base = i + 1; else { // swap with top U.putReferenceVolatile( a, k, (ForkJoinTask) U.getAndSetReference( a, slotOffset(s & m), null)); - updateTop(s); + top = s; } } if (!internal) - unlockPhase(); + U.putIntRelease(this, PHASE, lock + NEXTIDLE); if (taken) task.doExec(); break; @@ -1531,14 +1522,15 @@ public class ForkJoinPool extends AbstractExecutorService if ((f = f.completer) == null || --steps == 0) break outer; } - if (!internal && !tryLockPhase()) + int lock = 0; + if (!internal && (lock = tryLockPhase()) == 1) break; if (taken = (top == p && U.compareAndSetReference(a, k, t, null))) - updateTop(s); + top = s; if (!internal) - unlockPhase(); + U.putIntRelease(this, PHASE, lock + NEXTIDLE); if (!taken) break; t.doExec(); @@ -1573,7 +1565,7 @@ public class ForkJoinPool extends AbstractExecutorService break; if (base == b && t != null && U.compareAndSetReference(a, k, t, null)) { - updateBase(b + 1); + base = b + 1; t.doExec(); } } @@ -1672,46 +1664,15 @@ public class ForkJoinPool extends AbstractExecutorService private static final Object POOLIDS_BASE; private static final long POOLIDS; - private boolean compareAndSetCtl(long c, long v) { - return U.compareAndSetLong(this, CTL, c, v); - } - private long compareAndExchangeCtl(long c, long v) { - return U.compareAndExchangeLong(this, CTL, c, v); - } - private long getAndAddCtl(long v) { - return U.getAndAddLong(this, CTL, v); - } - private long incrementThreadIds() { - return U.getAndAddLong(this, THREADIDS, 1L); - } - private static int getAndAddPoolIds(int x) { - return U.getAndAddInt(POOLIDS_BASE, POOLIDS, x); - } - private int getAndSetParallelism(int v) { - return U.getAndSetInt(this, PARALLELISM, v); - } - private int getParallelismOpaque() { - return U.getIntOpaque(this, PARALLELISM); - } - private CountDownLatch cmpExTerminationSignal(CountDownLatch x) { - return (CountDownLatch) - U.compareAndExchangeReference(this, TERMINATION, null, x); - } + // runState locking operations - // runState operations - - private long getAndBitwiseOrRunState(long v) { // for status bits - return U.getAndBitwiseOrLong(this, RUNSTATE, v); - } - private boolean casRunState(long c, long v) { - return U.compareAndSetLong(this, RUNSTATE, c, v); - } private void unlockRunState() { // increment lock bit U.getAndAddLong(this, RUNSTATE, RS_LOCK); } private long lockRunState() { // lock and return current state long s, u; // locked when RS_LOCK set - if (((s = runState) & RS_LOCK) == 0L && casRunState(s, u = s + RS_LOCK)) + if (((s = runState) & RS_LOCK) == 0L && + U.compareAndSetLong(this, RUNSTATE, s, u = s + RS_LOCK)) return u; else return spinLockRunState(); @@ -1720,7 +1681,7 @@ public class ForkJoinPool extends AbstractExecutorService for (int waits = 0;;) { long s, u; if (((s = runState) & RS_LOCK) == 0L) { - if (casRunState(s, u = s + RS_LOCK)) + if (U.compareAndSetLong(this, RUNSTATE, s, u = s + RS_LOCK)) return u; waits = 0; } else if (waits < SPIN_WAITS) { @@ -1775,7 +1736,7 @@ public class ForkJoinPool extends AbstractExecutorService */ final String nextWorkerThreadName() { String prefix = workerNamePrefix; - long tid = incrementThreadIds() + 1L; + long tid = U.getAndAddLong(this, THREADIDS, 1L) + 1L; if (prefix == null) // commonPool has no prefix prefix = "ForkJoinPool.commonPool-worker-"; return prefix.concat(Long.toString(tid)); @@ -1846,7 +1807,7 @@ public class ForkJoinPool extends AbstractExecutorService releaseWaiters(); // ensure released if (w == null || w.source != DROPPED) { long c = ctl; // decrement counts - do {} while (c != (c = compareAndExchangeCtl( + do {} while (c != (c = U.compareAndExchangeLong(this, CTL, c, ((RC_MASK & (c - RC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (LMASK & c))))); @@ -1875,14 +1836,16 @@ public class ForkJoinPool extends AbstractExecutorService /** * Releases an idle worker, or creates one if not enough exist, - * giving up q is nonull and signalled slot already taken. + * giving up on contention if q is nonull and signalled slot + * already taken. * * @param q, if nonnull, the WorkQueue containing signalled task * @param qbase q's base index for the task */ final void signalWork(WorkQueue q, int qbase) { int pc = parallelism; - for (long c = ctl;;) { + for (;;) { + long c = ctl; WorkQueue[] qs = queues; long ac = (c + RC_UNIT) & RC_MASK, nc; int sp = (int)c, i = sp & SMASK; @@ -1902,7 +1865,7 @@ public class ForkJoinPool extends AbstractExecutorService break; else nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac; - if (c == (c = compareAndExchangeCtl(c, nc))) { + if (U.compareAndSetLong(this, CTL, c, nc)) { if (v == null) createWorker(); else { @@ -1926,7 +1889,7 @@ public class ForkJoinPool extends AbstractExecutorService if ((sp = (int)c) == 0 || (qs = queues) == null || qs.length <= (i = sp & SMASK) || (v = qs[i]) == null) break; - if (c == (c = compareAndExchangeCtl( + if (c == (c = U.compareAndExchangeLong(this, CTL, c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) | (v.stackPred & LMASK))))) { v.phase = sp; @@ -1972,7 +1935,8 @@ public class ForkJoinPool extends AbstractExecutorService return 0; else if ((ds = delayScheduler) != null && !ds.canShutDown()) return 0; - else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) + else if (U.compareAndSetLong(this, CTL, c, c) && + U.compareAndSetLong(this, RUNSTATE, e, e | STOP)) return 1; // enable termination else break; // restart @@ -1991,6 +1955,7 @@ public class ForkJoinPool extends AbstractExecutorService WorkQueue[] qs; int r = w.stackPred; // seed from registerWorker int fifo = (int)config & FIFO, rescans = 0, inactive = 0, taken = 0, n; + int src = -1; // last queue taken from while ((runState & STOP) == 0L && (qs = queues) != null && (n = qs.length) > 0) { int i = r, step = (r >>> 16) | 1; @@ -2024,12 +1989,14 @@ public class ForkJoinPool extends AbstractExecutorService } } else if (U.compareAndSetReference(a, bp, t, null)) { - q.base = nb; - w.source = qid; - rescans = 1; - ++taken; - if (U.getReferenceAcquire(a, np) != null) + Object nt = U.getReference(a, np); + U.getAndSetInt(q, WorkQueue.BASE, nb); + if (nt != null) signalWork(q, nb); // propagate + ++taken; + rescans = 1; + if (src != qid) + w.source = src = qid; w.topLevelExec(t, fifo); } } @@ -2042,8 +2009,10 @@ public class ForkJoinPool extends AbstractExecutorService if ((inactive = deactivate(w, taken)) != 0) taken = 0; } - else if (awaitWork(w) == 0) + else if (awaitWork(w) == 0) { inactive = rescans = 0; + src = -1; + } else break; } @@ -2059,12 +2028,13 @@ public class ForkJoinPool extends AbstractExecutorService */ private int deactivate(WorkQueue w, int taken) { int inactive = 0, phase; - if (w != null && (inactive = (phase = w.phase) & IDLE) == 0) { + if (w != null && (inactive = // plain accesses until CAS + (phase = U.getInt(w, WorkQueue.PHASE)) & IDLE) == 0) { long sp = (phase + (IDLE << 1)) & LMASK, pc, c; - w.phase = phase | IDLE; - w.stackPred = (int)(pc = ctl); // set ctl stack link - if (!compareAndSetCtl( // try to enqueue - pc, c = ((pc - RC_UNIT) & UMASK) | sp)) + U.putInt(w, WorkQueue.PHASE, phase | IDLE); + w.stackPred = (int)(pc = ctl); // try to enqueue + if (!U.compareAndSetLong(this, CTL, pc, + c = ((pc - RC_UNIT) & UMASK) | sp)) w.phase = phase; // back out on contention else { if (taken != 0) { @@ -2098,7 +2068,8 @@ public class ForkJoinPool extends AbstractExecutorService int sp = w.stackPred, phase, activePhase; long c; if ((inactive = (phase = w.phase) & IDLE) != 0 && (int)(c = ctl) == (activePhase = phase + IDLE) && - compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK))) { + U.compareAndSetLong(this, CTL, c, + (sp & LMASK) | ((c + RC_UNIT) & UMASK))) { w.phase = activePhase; inactive = 0; } @@ -2159,12 +2130,12 @@ public class ForkJoinPool extends AbstractExecutorService int vp, i; WorkQueue[] vs; WorkQueue v; long nc = ((w.stackPred & LMASK) | ((RC_MASK & c) | (TC_MASK & (c - TC_UNIT)))); - if (compareAndSetCtl(c, nc)) { + if (U.compareAndSetLong(this, CTL, c, nc)) { w.source = DROPPED; w.phase = activePhase; if ((vp = (int)nc) != 0 && (vs = queues) != null && vs.length > (i = vp & SMASK) && (v = vs[i]) != null && - compareAndSetCtl( // try to wake up next waiter + U.compareAndSetLong(this, CTL, // try to wake up next waiter nc, ((v.stackPred & LMASK) | ((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) { v.source = INVALID_ID; // enable cascaded timeouts @@ -2227,7 +2198,8 @@ public class ForkJoinPool extends AbstractExecutorService WorkQueue[] qs; WorkQueue v; int i; if ((qs = queues) != null && qs.length > (i = sp & SMASK) && (v = qs[i]) != null && - compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) { + U.compareAndSetLong(this, CTL, c, + (c & UMASK) | (v.stackPred & LMASK))) { v.phase = sp; if (v.parking != 0) U.unpark(v.owner); @@ -2235,17 +2207,18 @@ public class ForkJoinPool extends AbstractExecutorService } } else if (active > minActive && total >= pc) { // reduce active workers - if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK))) + if (U.compareAndSetLong(this, CTL, c, + ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK))) stat = UNCOMPENSATE; } else if (total < maxTotal && total < MAX_CAP) { // try to expand pool long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); if ((runState & STOP) != 0L) // terminating stat = 0; - else if (compareAndSetCtl(c, nc)) + else if (U.compareAndSetLong(this, CTL, c, nc)) stat = createWorker() ? UNCOMPENSATE : 0; } - else if (!compareAndSetCtl(c, c)) // validate + else if (!U.compareAndSetLong(this, CTL, c, c)) // validate ; else if ((sat = saturate) != null && sat.test(this)) stat = 0; @@ -2259,7 +2232,7 @@ public class ForkJoinPool extends AbstractExecutorService * Readjusts RC count; called from ForkJoinTask after blocking. */ final void uncompensate() { - getAndAddCtl(RC_UNIT); + U.getAndAddLong(this, CTL, RC_UNIT); } /** @@ -2330,7 +2303,7 @@ public class ForkJoinPool extends AbstractExecutorService } if (U.compareAndSetReference(a, k, t, null)) { q.base = b + 1; - w.source = j; // volatile write + w.source = j; t.doExec(); w.source = wsrc; rescan = true; // restart at index r @@ -2401,7 +2374,7 @@ public class ForkJoinPool extends AbstractExecutorService if (eligible) { if (U.compareAndSetReference( a, k, t, null)) { - q.updateBase(b + 1); + q.base = b + 1; t.doExec(); locals = rescan = true; break scan; @@ -2484,7 +2457,7 @@ public class ForkJoinPool extends AbstractExecutorService } if (U.compareAndSetReference(a, k, t, null)) { q.base = nb; - w.source = j; // volatile write + w.source = j; t.doExec(); w.source = wsrc; rescan = locals = true; @@ -2595,14 +2568,15 @@ public class ForkJoinPool extends AbstractExecutorService * @param rejectOnShutdown true if RejectedExecutionException * should be thrown when shutdown */ - final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) { + final void externalPush(ForkJoinTask task, boolean signalIfEmpty, + boolean rejectOnShutdown) { int r; if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // initialize caller's probe r = ThreadLocalRandom.getProbe(); } for (;;) { - WorkQueue q; WorkQueue[] qs; int n, id, i; + WorkQueue q; WorkQueue[] qs; int n, id, i, lock; if ((qs = queues) == null || (n = qs.length) <= 0) break; if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) { @@ -2612,31 +2586,27 @@ public class ForkJoinPool extends AbstractExecutorService q = qs[i] = newq; // else lost race to install unlockRunState(); } - if (q != null && q.tryLockPhase()) { + if (q != null && (lock = q.tryLockPhase()) != 1) { + int unlock = lock + NEXTIDLE; if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) { - q.unlockPhase(); // check while q lock held - break; + q.phase = unlock; + break; // check while q lock held } - return q; + q.push(task, signalIfEmpty ? this : null, unlock); + return; } r = ThreadLocalRandom.advanceProbe(r); // move } throw new RejectedExecutionException(); } - private ForkJoinTask poolSubmit(boolean signalIfEmpty, ForkJoinTask task) { - Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal; + private void poolSubmit(ForkJoinTask task, boolean signalIfEmpty) { + Thread t; ForkJoinWorkerThread wt; if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) && - (wt = (ForkJoinWorkerThread)t).pool == this) { - internal = true; - q = wt.workQueue; - } - else { // find and lock queue - internal = false; - q = externalSubmissionQueue(true); - } - q.push(task, signalIfEmpty ? this : null, internal); - return task; + (wt = (ForkJoinWorkerThread)t).pool == this) + wt.workQueue.push(task, signalIfEmpty ? this : null, 1); + else + externalPush(task, signalIfEmpty, true); } /** @@ -2751,7 +2721,8 @@ public class ForkJoinPool extends AbstractExecutorService else if ((e & STOP) != 0L) now = true; else if (now) { - if (((ps = getAndBitwiseOrRunState(SHUTDOWN|STOP) & STOP)) == 0L) { + if (((ps = U.getAndBitwiseOrLong(this, RUNSTATE, SHUTDOWN | STOP)) & + STOP) == 0L) { if ((ps & RS_LOCK) != 0L) { spinLockRunState(); // ensure queues array stable after stop unlockRunState(); @@ -2762,7 +2733,7 @@ public class ForkJoinPool extends AbstractExecutorService else if ((isShutdown = (e & SHUTDOWN)) != 0L || enable) { long quiet; DelayScheduler ds; if (isShutdown == 0L) - getAndBitwiseOrRunState(SHUTDOWN); + U.getAndBitwiseOrLong(this, RUNSTATE, SHUTDOWN); if ((quiet = quiescent()) > 0) now = true; else if (quiet == 0 && (ds = delayScheduler) != null) @@ -2778,7 +2749,7 @@ public class ForkJoinPool extends AbstractExecutorService if (((e = runState) & CLEANED) == 0L) { boolean clean = cleanQueues(); if (((e = runState) & CLEANED) == 0L && clean) - e = getAndBitwiseOrRunState(CLEANED) | CLEANED; + e = U.getAndBitwiseOrLong(this, RUNSTATE, CLEANED) | CLEANED; } if ((e & TERMINATED) != 0L) break; @@ -2788,7 +2759,8 @@ public class ForkJoinPool extends AbstractExecutorService break; if ((e & CLEANED) != 0L) { e |= TERMINATED; - if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) { + if ((U.getAndBitwiseOrLong(this, RUNSTATE, TERMINATED) & + TERMINATED) == 0L) { CountDownLatch done; SharedThreadContainer ctr; if ((done = termination) != null) done.countDown(); @@ -2826,7 +2798,7 @@ public class ForkJoinPool extends AbstractExecutorService a, k = slotOffset((cap - 1) & (b = q.base))); if (q.base == b && t != null && U.compareAndSetReference(a, k, t, null)) { - q.updateBase(b + 1); + q.base = b + 1; try { t.cancel(false); } catch (Throwable ignore) { @@ -2867,7 +2839,8 @@ public class ForkJoinPool extends AbstractExecutorService private CountDownLatch terminationSignal() { CountDownLatch signal, s, u; if ((signal = termination) == null) - signal = ((u = cmpExTerminationSignal( + signal = ((u = (CountDownLatch)U.compareAndExchangeReference( + this, TERMINATION, null, s = new CountDownLatch(1))) == null) ? s : u; return signal; } @@ -3035,7 +3008,7 @@ public class ForkJoinPool extends AbstractExecutorService (((long)maxSpares) << TC_SHIFT) | (((long)minAvail) << RC_SHIFT)); this.queues = new WorkQueue[size]; - String pid = Integer.toString(getAndAddPoolIds(1) + 1); + String pid = Integer.toString(U.getAndAddInt(POOLIDS_BASE, POOLIDS, 1) + 1); String name = "ForkJoinPool-" + pid; this.poolName = name; this.workerNamePrefix = name + "-worker-"; @@ -3145,7 +3118,7 @@ public class ForkJoinPool extends AbstractExecutorService * scheduled for execution */ public T invoke(ForkJoinTask task) { - poolSubmit(true, Objects.requireNonNull(task)); + poolSubmit(Objects.requireNonNull(task), true); try { return task.join(); } catch (RuntimeException | Error unchecked) { @@ -3164,7 +3137,7 @@ public class ForkJoinPool extends AbstractExecutorService * scheduled for execution */ public void execute(ForkJoinTask task) { - poolSubmit(true, Objects.requireNonNull(task)); + poolSubmit(Objects.requireNonNull(task), true); } // AbstractExecutorService methods @@ -3177,9 +3150,9 @@ public class ForkJoinPool extends AbstractExecutorService @Override @SuppressWarnings("unchecked") public void execute(Runnable task) { - poolSubmit(true, (Objects.requireNonNull(task) instanceof ForkJoinTask) + poolSubmit((Objects.requireNonNull(task) instanceof ForkJoinTask) ? (ForkJoinTask) task // avoid re-wrap - : new ForkJoinTask.RunnableExecuteAction(task)); + : new ForkJoinTask.RunnableExecuteAction(task), true); } /** @@ -3197,7 +3170,8 @@ public class ForkJoinPool extends AbstractExecutorService * scheduled for execution */ public ForkJoinTask submit(ForkJoinTask task) { - return poolSubmit(true, Objects.requireNonNull(task)); + poolSubmit(Objects.requireNonNull(task), true); + return task; } /** @@ -3208,11 +3182,12 @@ public class ForkJoinPool extends AbstractExecutorService @Override public ForkJoinTask submit(Callable task) { Objects.requireNonNull(task); - return poolSubmit( - true, + ForkJoinTask t = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? new ForkJoinTask.AdaptedCallable(task) : - new ForkJoinTask.AdaptedInterruptibleCallable(task)); + new ForkJoinTask.AdaptedInterruptibleCallable(task); + poolSubmit(t, true); + return t; } /** @@ -3223,11 +3198,12 @@ public class ForkJoinPool extends AbstractExecutorService @Override public ForkJoinTask submit(Runnable task, T result) { Objects.requireNonNull(task); - return poolSubmit( - true, + ForkJoinTask t = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? new ForkJoinTask.AdaptedRunnable(task, result) : - new ForkJoinTask.AdaptedInterruptibleRunnable(task, result)); + new ForkJoinTask.AdaptedInterruptibleRunnable(task, result); + poolSubmit(t, true); + return t; } /** @@ -3239,13 +3215,14 @@ public class ForkJoinPool extends AbstractExecutorService @SuppressWarnings("unchecked") public ForkJoinTask submit(Runnable task) { Objects.requireNonNull(task); - return poolSubmit( - true, + ForkJoinTask t = (task instanceof ForkJoinTask) ? (ForkJoinTask) task : // avoid re-wrap ((Thread.currentThread() instanceof ForkJoinWorkerThread) ? new ForkJoinTask.AdaptedRunnable(task, null) : - new ForkJoinTask.AdaptedInterruptibleRunnable(task, null))); + new ForkJoinTask.AdaptedInterruptibleRunnable(task, null)); + poolSubmit(t, true); + return t; } /** @@ -3267,7 +3244,7 @@ public class ForkJoinPool extends AbstractExecutorService */ public ForkJoinTask externalSubmit(ForkJoinTask task) { Objects.requireNonNull(task); - externalSubmissionQueue(true).push(task, this, false); + externalPush(task, true, true); return task; } @@ -3288,7 +3265,8 @@ public class ForkJoinPool extends AbstractExecutorService * @since 19 */ public ForkJoinTask lazySubmit(ForkJoinTask task) { - return poolSubmit(false, Objects.requireNonNull(task)); + poolSubmit(Objects.requireNonNull(task), false); + return task; } /** @@ -3316,7 +3294,7 @@ public class ForkJoinPool extends AbstractExecutorService throw new IllegalArgumentException(); if ((config & PRESET_SIZE) != 0) throw new UnsupportedOperationException("Cannot override System property"); - if ((prevSize = getAndSetParallelism(size)) < size) + if ((prevSize = U.getAndSetInt(this, PARALLELISM, size)) < size) signalWork(null, 0); // trigger worker activation return prevSize; } @@ -3351,7 +3329,7 @@ public class ForkJoinPool extends AbstractExecutorService for (Callable t : tasks) { ForkJoinTask f = ForkJoinTask.adapt(t); futures.add(f); - poolSubmit(true, f); + poolSubmit(f, true); } for (int i = futures.size() - 1; i >= 0; --i) ((ForkJoinTask)futures.get(i)).quietlyJoin(); @@ -3374,7 +3352,7 @@ public class ForkJoinPool extends AbstractExecutorService for (Callable t : tasks) { ForkJoinTask f = ForkJoinTask.adaptInterruptible(t); futures.add(f); - poolSubmit(true, f); + poolSubmit(f, true); } for (int i = futures.size() - 1; i >= 0; --i) ((ForkJoinTask)futures.get(i)) @@ -3490,7 +3468,7 @@ public class ForkJoinPool extends AbstractExecutorService * elapsed */ final void executeEnabledScheduledTask(ScheduledForkJoinTask task) { - externalSubmissionQueue(false).push(task, this, false); + externalPush(task, true, false); } /** @@ -3731,7 +3709,8 @@ public class ForkJoinPool extends AbstractExecutorService onTimeout.task = task = new ForkJoinTask.CallableWithTimeout(callable, timeoutTask); scheduleDelayedTask(timeoutTask); - return poolSubmit(true, task); + poolSubmit(task, true); + return task; } /** @@ -3778,7 +3757,7 @@ public class ForkJoinPool extends AbstractExecutorService * @return the targeted parallelism level of this pool */ public int getParallelism() { - return Math.max(getParallelismOpaque(), 1); + return Math.max(U.getIntOpaque(this, PARALLELISM), 1); } /** @@ -4330,7 +4309,7 @@ public class ForkJoinPool extends AbstractExecutorService done = blocker.block(); } finally { if (comp > 0) - getAndAddCtl(RC_UNIT); + U.getAndAddLong(this, CTL, RC_UNIT); } if (done) break; @@ -4357,7 +4336,7 @@ public class ForkJoinPool extends AbstractExecutorService */ void endCompensatedBlock(long post) { if (post > 0L) { - getAndAddCtl(post); + U.getAndAddLong(this, CTL, post); } } diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java index 137cac45ed0..b4b9ecc0c9b 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java @@ -641,15 +641,10 @@ public abstract class ForkJoinTask implements Future, Serializable { */ public final ForkJoinTask fork() { Thread t; ForkJoinWorkerThread wt; - ForkJoinPool p; ForkJoinPool.WorkQueue q; boolean internal; - if (internal = - (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { - q = (wt = (ForkJoinWorkerThread)t).workQueue; - p = wt.pool; - } + if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) + ((wt = (ForkJoinWorkerThread)t).workQueue).push(this, wt.pool, 1); else - q = (p = ForkJoinPool.common).externalSubmissionQueue(false); - q.push(this, p, internal); + ForkJoinPool.common.externalPush(this, true, false); return this; }