From ac619505e8d60bf9ac3f3cece4b9d0833b0e8e76 Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Tue, 23 Dec 2025 14:54:08 -0500
Subject: [PATCH] Check reworked ordering control
---
.../java/util/concurrent/ForkJoinPool.java | 343 ++++++++----------
.../java/util/concurrent/ForkJoinTask.java | 11 +-
2 files changed, 164 insertions(+), 190 deletions(-)
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index 479e874a644..86b3ed61d73 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -1078,6 +1078,7 @@ public class ForkJoinPool extends AbstractExecutorService
static final int DROPPED = 1 << 16; // removed from ctl counts
static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
static final int IDLE = 1 << 16; // phase seqlock/version count
+ static final int NEXTIDLE = IDLE << 1; // next IDLE phase bit
static final int MIN_QUEUES_SIZE = 1 << 4; // ensure external slots
/*
@@ -1188,33 +1189,26 @@ public class ForkJoinPool extends AbstractExecutorService
@jdk.internal.vm.annotation.Contended("w")
volatile int parking; // nonzero if parked in awaitWork
@jdk.internal.vm.annotation.Contended("w")
- volatile int source; // source queue id (or DROPPED)
+ int source; // source queue id (or DROPPED)
@jdk.internal.vm.annotation.Contended("w")
int nsteals; // number of steals from other queues
- // Support for atomic operations
+ // Support for atomic operations (also used by ForkJoinPool)
private static final Unsafe U;
- private static final long PHASE;
- private static final long BASE;
- private static final long TOP;
- private static final long ARRAY;
+ static final long PHASE;
+ static final long BASE;
+ static final long TOP;
+ static final long ARRAY;
- final void updateBase(int v) {
- U.putIntVolatile(this, BASE, v);
- }
- final void updateTop(int v) {
- U.putIntOpaque(this, TOP, v);
- }
- final void updateArray(ForkJoinTask>[] a) {
- U.getAndSetReference(this, ARRAY, a);
- }
- final void unlockPhase() {
- U.getAndAddInt(this, PHASE, IDLE);
- }
- final boolean tryLockPhase() { // seqlock acquire
+ /**
+ * SeqLock acquire for external queues.
+ * @return 1 if cannot acquire lock, else current phase
+ */
+ final int tryLockPhase() {
int p;
return (((p = phase) & IDLE) != 0 &&
- U.compareAndSetInt(this, PHASE, p, p + IDLE));
+ (p == U.compareAndExchangeInt(this, PHASE, p, p + IDLE))) ?
+ p : 1;
}
/**
@@ -1248,61 +1242,57 @@ public class ForkJoinPool extends AbstractExecutorService
/**
* Pushes a task. Called only by owner or if already locked
*
- * @param task the task; no-op if null
+ * @param task the task; caller must ensure nonnull
* @param pool the pool to signal if was previously empty, else null
- * @param internal if caller owns this queue
+ * @param unlock if not 1, phase unlock value
* @throws RejectedExecutionException if array could not be resized
*/
- final void push(ForkJoinTask> task, ForkJoinPool pool, boolean internal) {
- int s = top, b = base, m, cap, room; ForkJoinTask>[] a;
- if ((a = array) != null && (cap = a.length) > 0) { // else disabled
- if ((room = (m = cap - 1) - (s - b)) >= 0) {
- top = s + 1;
- long pos = slotOffset(m & s);
- if (!internal)
- U.putReference(a, pos, task); // inside lock
- else
- U.getAndSetReference(a, pos, task); // fully fenced
- if (room == 0)
- growArray(a, cap, s);
- }
- if (!internal)
- unlockPhase();
- if (room < 0)
- throw new RejectedExecutionException("Queue capacity exceeded");
- if (pool != null &&
- (room == 0 ||
- U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null))
- pool.signalWork(this, s); // may have appeared empty
+ final void push(ForkJoinTask> task, ForkJoinPool pool, int unlock) {
+ ForkJoinTask>[] a = array;
+ int b = base, s = top, m;
+ if (a != null &&
+ (a.length > s + 1 - b || (a = growArray()) != null) &&
+ (m = a.length - 1) >= 0) { // else rejected or disabled
+ top = s + 1;
+ U.putReferenceVolatile(a, slotOffset(m & s), task);
+ if (unlock != 1) // release external lock
+ U.putInt(this, PHASE, unlock);
+ if (U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null &&
+ pool != null)
+ pool.signalWork(this, s); // may have appeared empty
}
}
/**
* Resizes the queue array unless out of memory.
- * @param a old array
- * @param cap old array capacity
- * @param s current top
+ * @return new array, or throw on OOME
*/
- private void growArray(ForkJoinTask>[] a, int cap, int s) {
- int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
- ForkJoinTask>[] newArray = null;
- if (a != null && a.length == cap && cap > 0 && newCap > 0) {
+ private ForkJoinTask>[] growArray() {
+ ForkJoinTask>[] a; int cap, newCap;
+ if ((a = array) != null && (cap = a.length) > 0 &&
+ (newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2) > 0) {
+ ForkJoinTask>[] newArray = null;
try {
newArray = new ForkJoinTask>[newCap];
} catch (OutOfMemoryError ex) {
}
- if (newArray != null) { // else throw on next push
+ if (newArray != null) {
int mask = cap - 1, newMask = newCap - 1;
- for (int k = s, j = cap; j > 0; --j, --k) {
+ for (int k = top - 1, j = cap; j > 0; --j, --k) {
ForkJoinTask> u; // poll old, push to new
if ((u = (ForkJoinTask>)U.getAndSetReference(
a, slotOffset(k & mask), null)) == null)
break; // lost to pollers
newArray[k & newMask] = u;
}
- updateArray(newArray); // fully fenced
+ U.putReferenceVolatile(this, ARRAY, newArray);
+ return newArray;
}
}
+ int f = phase; // unlock if externally locked
+ if ((f & (IDLE | 1)) == 0)
+ phase = f + IDLE;
+ throw new RejectedExecutionException("Queue capacity exceeded");
}
/**
@@ -1314,7 +1304,7 @@ public class ForkJoinPool extends AbstractExecutorService
if ((a = array) != null && (cap = a.length) > 0 &&
U.getReference(a, k = slotOffset((cap - 1) & s)) != null &&
(t = (ForkJoinTask>)U.getAndSetReference(a, k, null)) != null)
- updateTop(s);
+ U.putIntOpaque(this, TOP, s);
return t;
}
@@ -1336,7 +1326,7 @@ public class ForkJoinPool extends AbstractExecutorService
}
else if ((t = (ForkJoinTask>)
U.getAndSetReference(a, k, null)) != null) {
- updateBase(nb);
+ U.putIntOpaque(this, BASE, nb);
break;
}
else
@@ -1361,16 +1351,16 @@ public class ForkJoinPool extends AbstractExecutorService
final boolean tryUnpush(ForkJoinTask> task, boolean internal) {
boolean taken = false;
ForkJoinTask>[] a = array;
- int p = top, s = p - 1, cap; long k;
+ int p = top, s = p - 1, lock = 0, cap; long k;
if (a != null && (cap = a.length) > 0 &&
U.getReference(a, k = slotOffset((cap - 1) & s)) == task &&
- (internal || tryLockPhase())) {
+ (internal || (lock = tryLockPhase()) != 1)) {
if (top == p && U.compareAndSetReference(a, k, task, null)) {
taken = true;
- updateTop(s);
+ U.putIntOpaque(this, TOP, s);
}
if (!internal)
- unlockPhase();
+ U.putIntRelease(this, PHASE, lock + NEXTIDLE);
}
return taken;
}
@@ -1416,7 +1406,7 @@ public class ForkJoinPool extends AbstractExecutorService
Thread.onSpinWait(); // stalled
}
else if (U.compareAndSetReference(a, k, t, null)) {
- updateBase(nb);
+ U.putIntVolatile(this, BASE, nb);
return t;
}
}
@@ -1474,25 +1464,26 @@ public class ForkJoinPool extends AbstractExecutorService
if (t == null)
break;
if (t == task) {
- if (!internal && !tryLockPhase())
+ int lock = 0;
+ if (!internal && (lock = tryLockPhase()) == 1)
break; // fail if locked
if (taken =
(top == p &&
U.compareAndSetReference(a, k, task, null))) {
if (i == s) // act as pop
- updateTop(s);
+ top = s;
else if (i == base) // act as poll
- updateBase(i + 1);
+ base = i + 1;
else { // swap with top
U.putReferenceVolatile(
a, k, (ForkJoinTask>)
U.getAndSetReference(
a, slotOffset(s & m), null));
- updateTop(s);
+ top = s;
}
}
if (!internal)
- unlockPhase();
+ U.putIntRelease(this, PHASE, lock + NEXTIDLE);
if (taken)
task.doExec();
break;
@@ -1531,14 +1522,15 @@ public class ForkJoinPool extends AbstractExecutorService
if ((f = f.completer) == null || --steps == 0)
break outer;
}
- if (!internal && !tryLockPhase())
+ int lock = 0;
+ if (!internal && (lock = tryLockPhase()) == 1)
break;
if (taken =
(top == p &&
U.compareAndSetReference(a, k, t, null)))
- updateTop(s);
+ top = s;
if (!internal)
- unlockPhase();
+ U.putIntRelease(this, PHASE, lock + NEXTIDLE);
if (!taken)
break;
t.doExec();
@@ -1573,7 +1565,7 @@ public class ForkJoinPool extends AbstractExecutorService
break;
if (base == b && t != null &&
U.compareAndSetReference(a, k, t, null)) {
- updateBase(b + 1);
+ base = b + 1;
t.doExec();
}
}
@@ -1672,46 +1664,15 @@ public class ForkJoinPool extends AbstractExecutorService
private static final Object POOLIDS_BASE;
private static final long POOLIDS;
- private boolean compareAndSetCtl(long c, long v) {
- return U.compareAndSetLong(this, CTL, c, v);
- }
- private long compareAndExchangeCtl(long c, long v) {
- return U.compareAndExchangeLong(this, CTL, c, v);
- }
- private long getAndAddCtl(long v) {
- return U.getAndAddLong(this, CTL, v);
- }
- private long incrementThreadIds() {
- return U.getAndAddLong(this, THREADIDS, 1L);
- }
- private static int getAndAddPoolIds(int x) {
- return U.getAndAddInt(POOLIDS_BASE, POOLIDS, x);
- }
- private int getAndSetParallelism(int v) {
- return U.getAndSetInt(this, PARALLELISM, v);
- }
- private int getParallelismOpaque() {
- return U.getIntOpaque(this, PARALLELISM);
- }
- private CountDownLatch cmpExTerminationSignal(CountDownLatch x) {
- return (CountDownLatch)
- U.compareAndExchangeReference(this, TERMINATION, null, x);
- }
+ // runState locking operations
- // runState operations
-
- private long getAndBitwiseOrRunState(long v) { // for status bits
- return U.getAndBitwiseOrLong(this, RUNSTATE, v);
- }
- private boolean casRunState(long c, long v) {
- return U.compareAndSetLong(this, RUNSTATE, c, v);
- }
private void unlockRunState() { // increment lock bit
U.getAndAddLong(this, RUNSTATE, RS_LOCK);
}
private long lockRunState() { // lock and return current state
long s, u; // locked when RS_LOCK set
- if (((s = runState) & RS_LOCK) == 0L && casRunState(s, u = s + RS_LOCK))
+ if (((s = runState) & RS_LOCK) == 0L &&
+ U.compareAndSetLong(this, RUNSTATE, s, u = s + RS_LOCK))
return u;
else
return spinLockRunState();
@@ -1720,7 +1681,7 @@ public class ForkJoinPool extends AbstractExecutorService
for (int waits = 0;;) {
long s, u;
if (((s = runState) & RS_LOCK) == 0L) {
- if (casRunState(s, u = s + RS_LOCK))
+ if (U.compareAndSetLong(this, RUNSTATE, s, u = s + RS_LOCK))
return u;
waits = 0;
} else if (waits < SPIN_WAITS) {
@@ -1775,7 +1736,7 @@ public class ForkJoinPool extends AbstractExecutorService
*/
final String nextWorkerThreadName() {
String prefix = workerNamePrefix;
- long tid = incrementThreadIds() + 1L;
+ long tid = U.getAndAddLong(this, THREADIDS, 1L) + 1L;
if (prefix == null) // commonPool has no prefix
prefix = "ForkJoinPool.commonPool-worker-";
return prefix.concat(Long.toString(tid));
@@ -1846,7 +1807,7 @@ public class ForkJoinPool extends AbstractExecutorService
releaseWaiters(); // ensure released
if (w == null || w.source != DROPPED) {
long c = ctl; // decrement counts
- do {} while (c != (c = compareAndExchangeCtl(
+ do {} while (c != (c = U.compareAndExchangeLong(this, CTL,
c, ((RC_MASK & (c - RC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(LMASK & c)))));
@@ -1875,14 +1836,16 @@ public class ForkJoinPool extends AbstractExecutorService
/**
* Releases an idle worker, or creates one if not enough exist,
- * giving up q is nonull and signalled slot already taken.
+ * giving up on contention if q is nonull and signalled slot
+ * already taken.
*
* @param q, if nonnull, the WorkQueue containing signalled task
* @param qbase q's base index for the task
*/
final void signalWork(WorkQueue q, int qbase) {
int pc = parallelism;
- for (long c = ctl;;) {
+ for (;;) {
+ long c = ctl;
WorkQueue[] qs = queues;
long ac = (c + RC_UNIT) & RC_MASK, nc;
int sp = (int)c, i = sp & SMASK;
@@ -1902,7 +1865,7 @@ public class ForkJoinPool extends AbstractExecutorService
break;
else
nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
- if (c == (c = compareAndExchangeCtl(c, nc))) {
+ if (U.compareAndSetLong(this, CTL, c, nc)) {
if (v == null)
createWorker();
else {
@@ -1926,7 +1889,7 @@ public class ForkJoinPool extends AbstractExecutorService
if ((sp = (int)c) == 0 || (qs = queues) == null ||
qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
break;
- if (c == (c = compareAndExchangeCtl(
+ if (c == (c = U.compareAndExchangeLong(this, CTL,
c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
(v.stackPred & LMASK))))) {
v.phase = sp;
@@ -1972,7 +1935,8 @@ public class ForkJoinPool extends AbstractExecutorService
return 0;
else if ((ds = delayScheduler) != null && !ds.canShutDown())
return 0;
- else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
+ else if (U.compareAndSetLong(this, CTL, c, c) &&
+ U.compareAndSetLong(this, RUNSTATE, e, e | STOP))
return 1; // enable termination
else
break; // restart
@@ -1991,6 +1955,7 @@ public class ForkJoinPool extends AbstractExecutorService
WorkQueue[] qs;
int r = w.stackPred; // seed from registerWorker
int fifo = (int)config & FIFO, rescans = 0, inactive = 0, taken = 0, n;
+ int src = -1; // last queue taken from
while ((runState & STOP) == 0L && (qs = queues) != null &&
(n = qs.length) > 0) {
int i = r, step = (r >>> 16) | 1;
@@ -2024,12 +1989,14 @@ public class ForkJoinPool extends AbstractExecutorService
}
}
else if (U.compareAndSetReference(a, bp, t, null)) {
- q.base = nb;
- w.source = qid;
- rescans = 1;
- ++taken;
- if (U.getReferenceAcquire(a, np) != null)
+ Object nt = U.getReference(a, np);
+ U.getAndSetInt(q, WorkQueue.BASE, nb);
+ if (nt != null)
signalWork(q, nb); // propagate
+ ++taken;
+ rescans = 1;
+ if (src != qid)
+ w.source = src = qid;
w.topLevelExec(t, fifo);
}
}
@@ -2042,8 +2009,10 @@ public class ForkJoinPool extends AbstractExecutorService
if ((inactive = deactivate(w, taken)) != 0)
taken = 0;
}
- else if (awaitWork(w) == 0)
+ else if (awaitWork(w) == 0) {
inactive = rescans = 0;
+ src = -1;
+ }
else
break;
}
@@ -2059,12 +2028,13 @@ public class ForkJoinPool extends AbstractExecutorService
*/
private int deactivate(WorkQueue w, int taken) {
int inactive = 0, phase;
- if (w != null && (inactive = (phase = w.phase) & IDLE) == 0) {
+ if (w != null && (inactive = // plain accesses until CAS
+ (phase = U.getInt(w, WorkQueue.PHASE)) & IDLE) == 0) {
long sp = (phase + (IDLE << 1)) & LMASK, pc, c;
- w.phase = phase | IDLE;
- w.stackPred = (int)(pc = ctl); // set ctl stack link
- if (!compareAndSetCtl( // try to enqueue
- pc, c = ((pc - RC_UNIT) & UMASK) | sp))
+ U.putInt(w, WorkQueue.PHASE, phase | IDLE);
+ w.stackPred = (int)(pc = ctl); // try to enqueue
+ if (!U.compareAndSetLong(this, CTL, pc,
+ c = ((pc - RC_UNIT) & UMASK) | sp))
w.phase = phase; // back out on contention
else {
if (taken != 0) {
@@ -2098,7 +2068,8 @@ public class ForkJoinPool extends AbstractExecutorService
int sp = w.stackPred, phase, activePhase; long c;
if ((inactive = (phase = w.phase) & IDLE) != 0 &&
(int)(c = ctl) == (activePhase = phase + IDLE) &&
- compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK))) {
+ U.compareAndSetLong(this, CTL, c,
+ (sp & LMASK) | ((c + RC_UNIT) & UMASK))) {
w.phase = activePhase;
inactive = 0;
}
@@ -2159,12 +2130,12 @@ public class ForkJoinPool extends AbstractExecutorService
int vp, i; WorkQueue[] vs; WorkQueue v;
long nc = ((w.stackPred & LMASK) |
((RC_MASK & c) | (TC_MASK & (c - TC_UNIT))));
- if (compareAndSetCtl(c, nc)) {
+ if (U.compareAndSetLong(this, CTL, c, nc)) {
w.source = DROPPED;
w.phase = activePhase;
if ((vp = (int)nc) != 0 && (vs = queues) != null &&
vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
- compareAndSetCtl( // try to wake up next waiter
+ U.compareAndSetLong(this, CTL, // try to wake up next waiter
nc, ((v.stackPred & LMASK) |
((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) {
v.source = INVALID_ID; // enable cascaded timeouts
@@ -2227,7 +2198,8 @@ public class ForkJoinPool extends AbstractExecutorService
WorkQueue[] qs; WorkQueue v; int i;
if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
(v = qs[i]) != null &&
- compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
+ U.compareAndSetLong(this, CTL, c,
+ (c & UMASK) | (v.stackPred & LMASK))) {
v.phase = sp;
if (v.parking != 0)
U.unpark(v.owner);
@@ -2235,17 +2207,18 @@ public class ForkJoinPool extends AbstractExecutorService
}
}
else if (active > minActive && total >= pc) { // reduce active workers
- if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK)))
+ if (U.compareAndSetLong(this, CTL, c,
+ ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK)))
stat = UNCOMPENSATE;
}
else if (total < maxTotal && total < MAX_CAP) { // try to expand pool
long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
if ((runState & STOP) != 0L) // terminating
stat = 0;
- else if (compareAndSetCtl(c, nc))
+ else if (U.compareAndSetLong(this, CTL, c, nc))
stat = createWorker() ? UNCOMPENSATE : 0;
}
- else if (!compareAndSetCtl(c, c)) // validate
+ else if (!U.compareAndSetLong(this, CTL, c, c)) // validate
;
else if ((sat = saturate) != null && sat.test(this))
stat = 0;
@@ -2259,7 +2232,7 @@ public class ForkJoinPool extends AbstractExecutorService
* Readjusts RC count; called from ForkJoinTask after blocking.
*/
final void uncompensate() {
- getAndAddCtl(RC_UNIT);
+ U.getAndAddLong(this, CTL, RC_UNIT);
}
/**
@@ -2330,7 +2303,7 @@ public class ForkJoinPool extends AbstractExecutorService
}
if (U.compareAndSetReference(a, k, t, null)) {
q.base = b + 1;
- w.source = j; // volatile write
+ w.source = j;
t.doExec();
w.source = wsrc;
rescan = true; // restart at index r
@@ -2401,7 +2374,7 @@ public class ForkJoinPool extends AbstractExecutorService
if (eligible) {
if (U.compareAndSetReference(
a, k, t, null)) {
- q.updateBase(b + 1);
+ q.base = b + 1;
t.doExec();
locals = rescan = true;
break scan;
@@ -2484,7 +2457,7 @@ public class ForkJoinPool extends AbstractExecutorService
}
if (U.compareAndSetReference(a, k, t, null)) {
q.base = nb;
- w.source = j; // volatile write
+ w.source = j;
t.doExec();
w.source = wsrc;
rescan = locals = true;
@@ -2595,14 +2568,15 @@ public class ForkJoinPool extends AbstractExecutorService
* @param rejectOnShutdown true if RejectedExecutionException
* should be thrown when shutdown
*/
- final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
+ final void externalPush(ForkJoinTask> task, boolean signalIfEmpty,
+ boolean rejectOnShutdown) {
int r;
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // initialize caller's probe
r = ThreadLocalRandom.getProbe();
}
for (;;) {
- WorkQueue q; WorkQueue[] qs; int n, id, i;
+ WorkQueue q; WorkQueue[] qs; int n, id, i, lock;
if ((qs = queues) == null || (n = qs.length) <= 0)
break;
if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
@@ -2612,31 +2586,27 @@ public class ForkJoinPool extends AbstractExecutorService
q = qs[i] = newq; // else lost race to install
unlockRunState();
}
- if (q != null && q.tryLockPhase()) {
+ if (q != null && (lock = q.tryLockPhase()) != 1) {
+ int unlock = lock + NEXTIDLE;
if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
- q.unlockPhase(); // check while q lock held
- break;
+ q.phase = unlock;
+ break; // check while q lock held
}
- return q;
+ q.push(task, signalIfEmpty ? this : null, unlock);
+ return;
}
r = ThreadLocalRandom.advanceProbe(r); // move
}
throw new RejectedExecutionException();
}
- private ForkJoinTask poolSubmit(boolean signalIfEmpty, ForkJoinTask task) {
- Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
+ private void poolSubmit(ForkJoinTask> task, boolean signalIfEmpty) {
+ Thread t; ForkJoinWorkerThread wt;
if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
- (wt = (ForkJoinWorkerThread)t).pool == this) {
- internal = true;
- q = wt.workQueue;
- }
- else { // find and lock queue
- internal = false;
- q = externalSubmissionQueue(true);
- }
- q.push(task, signalIfEmpty ? this : null, internal);
- return task;
+ (wt = (ForkJoinWorkerThread)t).pool == this)
+ wt.workQueue.push(task, signalIfEmpty ? this : null, 1);
+ else
+ externalPush(task, signalIfEmpty, true);
}
/**
@@ -2751,7 +2721,8 @@ public class ForkJoinPool extends AbstractExecutorService
else if ((e & STOP) != 0L)
now = true;
else if (now) {
- if (((ps = getAndBitwiseOrRunState(SHUTDOWN|STOP) & STOP)) == 0L) {
+ if (((ps = U.getAndBitwiseOrLong(this, RUNSTATE, SHUTDOWN | STOP)) &
+ STOP) == 0L) {
if ((ps & RS_LOCK) != 0L) {
spinLockRunState(); // ensure queues array stable after stop
unlockRunState();
@@ -2762,7 +2733,7 @@ public class ForkJoinPool extends AbstractExecutorService
else if ((isShutdown = (e & SHUTDOWN)) != 0L || enable) {
long quiet; DelayScheduler ds;
if (isShutdown == 0L)
- getAndBitwiseOrRunState(SHUTDOWN);
+ U.getAndBitwiseOrLong(this, RUNSTATE, SHUTDOWN);
if ((quiet = quiescent()) > 0)
now = true;
else if (quiet == 0 && (ds = delayScheduler) != null)
@@ -2778,7 +2749,7 @@ public class ForkJoinPool extends AbstractExecutorService
if (((e = runState) & CLEANED) == 0L) {
boolean clean = cleanQueues();
if (((e = runState) & CLEANED) == 0L && clean)
- e = getAndBitwiseOrRunState(CLEANED) | CLEANED;
+ e = U.getAndBitwiseOrLong(this, RUNSTATE, CLEANED) | CLEANED;
}
if ((e & TERMINATED) != 0L)
break;
@@ -2788,7 +2759,8 @@ public class ForkJoinPool extends AbstractExecutorService
break;
if ((e & CLEANED) != 0L) {
e |= TERMINATED;
- if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) {
+ if ((U.getAndBitwiseOrLong(this, RUNSTATE, TERMINATED) &
+ TERMINATED) == 0L) {
CountDownLatch done; SharedThreadContainer ctr;
if ((done = termination) != null)
done.countDown();
@@ -2826,7 +2798,7 @@ public class ForkJoinPool extends AbstractExecutorService
a, k = slotOffset((cap - 1) & (b = q.base)));
if (q.base == b && t != null &&
U.compareAndSetReference(a, k, t, null)) {
- q.updateBase(b + 1);
+ q.base = b + 1;
try {
t.cancel(false);
} catch (Throwable ignore) {
@@ -2867,7 +2839,8 @@ public class ForkJoinPool extends AbstractExecutorService
private CountDownLatch terminationSignal() {
CountDownLatch signal, s, u;
if ((signal = termination) == null)
- signal = ((u = cmpExTerminationSignal(
+ signal = ((u = (CountDownLatch)U.compareAndExchangeReference(
+ this, TERMINATION, null,
s = new CountDownLatch(1))) == null) ? s : u;
return signal;
}
@@ -3035,7 +3008,7 @@ public class ForkJoinPool extends AbstractExecutorService
(((long)maxSpares) << TC_SHIFT) |
(((long)minAvail) << RC_SHIFT));
this.queues = new WorkQueue[size];
- String pid = Integer.toString(getAndAddPoolIds(1) + 1);
+ String pid = Integer.toString(U.getAndAddInt(POOLIDS_BASE, POOLIDS, 1) + 1);
String name = "ForkJoinPool-" + pid;
this.poolName = name;
this.workerNamePrefix = name + "-worker-";
@@ -3145,7 +3118,7 @@ public class ForkJoinPool extends AbstractExecutorService
* scheduled for execution
*/
public T invoke(ForkJoinTask task) {
- poolSubmit(true, Objects.requireNonNull(task));
+ poolSubmit(Objects.requireNonNull(task), true);
try {
return task.join();
} catch (RuntimeException | Error unchecked) {
@@ -3164,7 +3137,7 @@ public class ForkJoinPool extends AbstractExecutorService
* scheduled for execution
*/
public void execute(ForkJoinTask> task) {
- poolSubmit(true, Objects.requireNonNull(task));
+ poolSubmit(Objects.requireNonNull(task), true);
}
// AbstractExecutorService methods
@@ -3177,9 +3150,9 @@ public class ForkJoinPool extends AbstractExecutorService
@Override
@SuppressWarnings("unchecked")
public void execute(Runnable task) {
- poolSubmit(true, (Objects.requireNonNull(task) instanceof ForkJoinTask>)
+ poolSubmit((Objects.requireNonNull(task) instanceof ForkJoinTask>)
? (ForkJoinTask) task // avoid re-wrap
- : new ForkJoinTask.RunnableExecuteAction(task));
+ : new ForkJoinTask.RunnableExecuteAction(task), true);
}
/**
@@ -3197,7 +3170,8 @@ public class ForkJoinPool extends AbstractExecutorService
* scheduled for execution
*/
public ForkJoinTask submit(ForkJoinTask task) {
- return poolSubmit(true, Objects.requireNonNull(task));
+ poolSubmit(Objects.requireNonNull(task), true);
+ return task;
}
/**
@@ -3208,11 +3182,12 @@ public class ForkJoinPool extends AbstractExecutorService
@Override
public ForkJoinTask submit(Callable task) {
Objects.requireNonNull(task);
- return poolSubmit(
- true,
+ ForkJoinTask t =
(Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedCallable(task) :
- new ForkJoinTask.AdaptedInterruptibleCallable(task));
+ new ForkJoinTask.AdaptedInterruptibleCallable(task);
+ poolSubmit(t, true);
+ return t;
}
/**
@@ -3223,11 +3198,12 @@ public class ForkJoinPool extends AbstractExecutorService
@Override
public ForkJoinTask submit(Runnable task, T result) {
Objects.requireNonNull(task);
- return poolSubmit(
- true,
+ ForkJoinTask t =
(Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedRunnable(task, result) :
- new ForkJoinTask.AdaptedInterruptibleRunnable(task, result));
+ new ForkJoinTask.AdaptedInterruptibleRunnable(task, result);
+ poolSubmit(t, true);
+ return t;
}
/**
@@ -3239,13 +3215,14 @@ public class ForkJoinPool extends AbstractExecutorService
@SuppressWarnings("unchecked")
public ForkJoinTask> submit(Runnable task) {
Objects.requireNonNull(task);
- return poolSubmit(
- true,
+ ForkJoinTask> t =
(task instanceof ForkJoinTask>) ?
(ForkJoinTask) task : // avoid re-wrap
((Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedRunnable(task, null) :
- new ForkJoinTask.AdaptedInterruptibleRunnable(task, null)));
+ new ForkJoinTask.AdaptedInterruptibleRunnable(task, null));
+ poolSubmit(t, true);
+ return t;
}
/**
@@ -3267,7 +3244,7 @@ public class ForkJoinPool extends AbstractExecutorService
*/
public ForkJoinTask externalSubmit(ForkJoinTask task) {
Objects.requireNonNull(task);
- externalSubmissionQueue(true).push(task, this, false);
+ externalPush(task, true, true);
return task;
}
@@ -3288,7 +3265,8 @@ public class ForkJoinPool extends AbstractExecutorService
* @since 19
*/
public ForkJoinTask lazySubmit(ForkJoinTask task) {
- return poolSubmit(false, Objects.requireNonNull(task));
+ poolSubmit(Objects.requireNonNull(task), false);
+ return task;
}
/**
@@ -3316,7 +3294,7 @@ public class ForkJoinPool extends AbstractExecutorService
throw new IllegalArgumentException();
if ((config & PRESET_SIZE) != 0)
throw new UnsupportedOperationException("Cannot override System property");
- if ((prevSize = getAndSetParallelism(size)) < size)
+ if ((prevSize = U.getAndSetInt(this, PARALLELISM, size)) < size)
signalWork(null, 0); // trigger worker activation
return prevSize;
}
@@ -3351,7 +3329,7 @@ public class ForkJoinPool extends AbstractExecutorService
for (Callable t : tasks) {
ForkJoinTask f = ForkJoinTask.adapt(t);
futures.add(f);
- poolSubmit(true, f);
+ poolSubmit(f, true);
}
for (int i = futures.size() - 1; i >= 0; --i)
((ForkJoinTask>)futures.get(i)).quietlyJoin();
@@ -3374,7 +3352,7 @@ public class ForkJoinPool extends AbstractExecutorService
for (Callable t : tasks) {
ForkJoinTask f = ForkJoinTask.adaptInterruptible(t);
futures.add(f);
- poolSubmit(true, f);
+ poolSubmit(f, true);
}
for (int i = futures.size() - 1; i >= 0; --i)
((ForkJoinTask>)futures.get(i))
@@ -3490,7 +3468,7 @@ public class ForkJoinPool extends AbstractExecutorService
* elapsed
*/
final void executeEnabledScheduledTask(ScheduledForkJoinTask> task) {
- externalSubmissionQueue(false).push(task, this, false);
+ externalPush(task, true, false);
}
/**
@@ -3731,7 +3709,8 @@ public class ForkJoinPool extends AbstractExecutorService
onTimeout.task = task =
new ForkJoinTask.CallableWithTimeout(callable, timeoutTask);
scheduleDelayedTask(timeoutTask);
- return poolSubmit(true, task);
+ poolSubmit(task, true);
+ return task;
}
/**
@@ -3778,7 +3757,7 @@ public class ForkJoinPool extends AbstractExecutorService
* @return the targeted parallelism level of this pool
*/
public int getParallelism() {
- return Math.max(getParallelismOpaque(), 1);
+ return Math.max(U.getIntOpaque(this, PARALLELISM), 1);
}
/**
@@ -4330,7 +4309,7 @@ public class ForkJoinPool extends AbstractExecutorService
done = blocker.block();
} finally {
if (comp > 0)
- getAndAddCtl(RC_UNIT);
+ U.getAndAddLong(this, CTL, RC_UNIT);
}
if (done)
break;
@@ -4357,7 +4336,7 @@ public class ForkJoinPool extends AbstractExecutorService
*/
void endCompensatedBlock(long post) {
if (post > 0L) {
- getAndAddCtl(post);
+ U.getAndAddLong(this, CTL, post);
}
}
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
index 137cac45ed0..b4b9ecc0c9b 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
@@ -641,15 +641,10 @@ public abstract class ForkJoinTask implements Future, Serializable {
*/
public final ForkJoinTask fork() {
Thread t; ForkJoinWorkerThread wt;
- ForkJoinPool p; ForkJoinPool.WorkQueue q; boolean internal;
- if (internal =
- (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
- q = (wt = (ForkJoinWorkerThread)t).workQueue;
- p = wt.pool;
- }
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ ((wt = (ForkJoinWorkerThread)t).workQueue).push(this, wt.pool, 1);
else
- q = (p = ForkJoinPool.common).externalSubmissionQueue(false);
- q.push(this, p, internal);
+ ForkJoinPool.common.externalPush(this, true, false);
return this;
}