Check reworked ordering control

This commit is contained in:
Doug Lea 2025-12-23 14:54:08 -05:00
parent b0b37d385a
commit ac619505e8
2 changed files with 164 additions and 190 deletions

View File

@ -1078,6 +1078,7 @@ public class ForkJoinPool extends AbstractExecutorService
static final int DROPPED = 1 << 16; // removed from ctl counts
static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
static final int IDLE = 1 << 16; // phase seqlock/version count
static final int NEXTIDLE = IDLE << 1; // next IDLE phase bit
static final int MIN_QUEUES_SIZE = 1 << 4; // ensure external slots
/*
@ -1188,33 +1189,26 @@ public class ForkJoinPool extends AbstractExecutorService
@jdk.internal.vm.annotation.Contended("w")
volatile int parking; // nonzero if parked in awaitWork
@jdk.internal.vm.annotation.Contended("w")
volatile int source; // source queue id (or DROPPED)
int source; // source queue id (or DROPPED)
@jdk.internal.vm.annotation.Contended("w")
int nsteals; // number of steals from other queues
// Support for atomic operations
// Support for atomic operations (also used by ForkJoinPool)
private static final Unsafe U;
private static final long PHASE;
private static final long BASE;
private static final long TOP;
private static final long ARRAY;
static final long PHASE;
static final long BASE;
static final long TOP;
static final long ARRAY;
final void updateBase(int v) {
U.putIntVolatile(this, BASE, v);
}
final void updateTop(int v) {
U.putIntOpaque(this, TOP, v);
}
final void updateArray(ForkJoinTask<?>[] a) {
U.getAndSetReference(this, ARRAY, a);
}
final void unlockPhase() {
U.getAndAddInt(this, PHASE, IDLE);
}
final boolean tryLockPhase() { // seqlock acquire
/**
* SeqLock acquire for external queues.
* @return 1 if cannot acquire lock, else current phase
*/
final int tryLockPhase() {
int p;
return (((p = phase) & IDLE) != 0 &&
U.compareAndSetInt(this, PHASE, p, p + IDLE));
(p == U.compareAndExchangeInt(this, PHASE, p, p + IDLE))) ?
p : 1;
}
/**
@ -1248,61 +1242,57 @@ public class ForkJoinPool extends AbstractExecutorService
/**
* Pushes a task. Called only by owner or if already locked
*
* @param task the task; no-op if null
* @param task the task; caller must ensure nonnull
* @param pool the pool to signal if was previously empty, else null
* @param internal if caller owns this queue
* @param unlock if not 1, phase unlock value
* @throws RejectedExecutionException if array could not be resized
*/
final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
if ((a = array) != null && (cap = a.length) > 0) { // else disabled
if ((room = (m = cap - 1) - (s - b)) >= 0) {
top = s + 1;
long pos = slotOffset(m & s);
if (!internal)
U.putReference(a, pos, task); // inside lock
else
U.getAndSetReference(a, pos, task); // fully fenced
if (room == 0)
growArray(a, cap, s);
}
if (!internal)
unlockPhase();
if (room < 0)
throw new RejectedExecutionException("Queue capacity exceeded");
if (pool != null &&
(room == 0 ||
U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null))
pool.signalWork(this, s); // may have appeared empty
final void push(ForkJoinTask<?> task, ForkJoinPool pool, int unlock) {
ForkJoinTask<?>[] a = array;
int b = base, s = top, m;
if (a != null &&
(a.length > s + 1 - b || (a = growArray()) != null) &&
(m = a.length - 1) >= 0) { // else rejected or disabled
top = s + 1;
U.putReferenceVolatile(a, slotOffset(m & s), task);
if (unlock != 1) // release external lock
U.putInt(this, PHASE, unlock);
if (U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null &&
pool != null)
pool.signalWork(this, s); // may have appeared empty
}
}
/**
* Resizes the queue array unless out of memory.
* @param a old array
* @param cap old array capacity
* @param s current top
* @return new array, or throw on OOME
*/
private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
ForkJoinTask<?>[] newArray = null;
if (a != null && a.length == cap && cap > 0 && newCap > 0) {
private ForkJoinTask<?>[] growArray() {
ForkJoinTask<?>[] a; int cap, newCap;
if ((a = array) != null && (cap = a.length) > 0 &&
(newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2) > 0) {
ForkJoinTask<?>[] newArray = null;
try {
newArray = new ForkJoinTask<?>[newCap];
} catch (OutOfMemoryError ex) {
}
if (newArray != null) { // else throw on next push
if (newArray != null) {
int mask = cap - 1, newMask = newCap - 1;
for (int k = s, j = cap; j > 0; --j, --k) {
for (int k = top - 1, j = cap; j > 0; --j, --k) {
ForkJoinTask<?> u; // poll old, push to new
if ((u = (ForkJoinTask<?>)U.getAndSetReference(
a, slotOffset(k & mask), null)) == null)
break; // lost to pollers
newArray[k & newMask] = u;
}
updateArray(newArray); // fully fenced
U.putReferenceVolatile(this, ARRAY, newArray);
return newArray;
}
}
int f = phase; // unlock if externally locked
if ((f & (IDLE | 1)) == 0)
phase = f + IDLE;
throw new RejectedExecutionException("Queue capacity exceeded");
}
/**
@ -1314,7 +1304,7 @@ public class ForkJoinPool extends AbstractExecutorService
if ((a = array) != null && (cap = a.length) > 0 &&
U.getReference(a, k = slotOffset((cap - 1) & s)) != null &&
(t = (ForkJoinTask<?>)U.getAndSetReference(a, k, null)) != null)
updateTop(s);
U.putIntOpaque(this, TOP, s);
return t;
}
@ -1336,7 +1326,7 @@ public class ForkJoinPool extends AbstractExecutorService
}
else if ((t = (ForkJoinTask<?>)
U.getAndSetReference(a, k, null)) != null) {
updateBase(nb);
U.putIntOpaque(this, BASE, nb);
break;
}
else
@ -1361,16 +1351,16 @@ public class ForkJoinPool extends AbstractExecutorService
final boolean tryUnpush(ForkJoinTask<?> task, boolean internal) {
boolean taken = false;
ForkJoinTask<?>[] a = array;
int p = top, s = p - 1, cap; long k;
int p = top, s = p - 1, lock = 0, cap; long k;
if (a != null && (cap = a.length) > 0 &&
U.getReference(a, k = slotOffset((cap - 1) & s)) == task &&
(internal || tryLockPhase())) {
(internal || (lock = tryLockPhase()) != 1)) {
if (top == p && U.compareAndSetReference(a, k, task, null)) {
taken = true;
updateTop(s);
U.putIntOpaque(this, TOP, s);
}
if (!internal)
unlockPhase();
U.putIntRelease(this, PHASE, lock + NEXTIDLE);
}
return taken;
}
@ -1416,7 +1406,7 @@ public class ForkJoinPool extends AbstractExecutorService
Thread.onSpinWait(); // stalled
}
else if (U.compareAndSetReference(a, k, t, null)) {
updateBase(nb);
U.putIntVolatile(this, BASE, nb);
return t;
}
}
@ -1474,25 +1464,26 @@ public class ForkJoinPool extends AbstractExecutorService
if (t == null)
break;
if (t == task) {
if (!internal && !tryLockPhase())
int lock = 0;
if (!internal && (lock = tryLockPhase()) == 1)
break; // fail if locked
if (taken =
(top == p &&
U.compareAndSetReference(a, k, task, null))) {
if (i == s) // act as pop
updateTop(s);
top = s;
else if (i == base) // act as poll
updateBase(i + 1);
base = i + 1;
else { // swap with top
U.putReferenceVolatile(
a, k, (ForkJoinTask<?>)
U.getAndSetReference(
a, slotOffset(s & m), null));
updateTop(s);
top = s;
}
}
if (!internal)
unlockPhase();
U.putIntRelease(this, PHASE, lock + NEXTIDLE);
if (taken)
task.doExec();
break;
@ -1531,14 +1522,15 @@ public class ForkJoinPool extends AbstractExecutorService
if ((f = f.completer) == null || --steps == 0)
break outer;
}
if (!internal && !tryLockPhase())
int lock = 0;
if (!internal && (lock = tryLockPhase()) == 1)
break;
if (taken =
(top == p &&
U.compareAndSetReference(a, k, t, null)))
updateTop(s);
top = s;
if (!internal)
unlockPhase();
U.putIntRelease(this, PHASE, lock + NEXTIDLE);
if (!taken)
break;
t.doExec();
@ -1573,7 +1565,7 @@ public class ForkJoinPool extends AbstractExecutorService
break;
if (base == b && t != null &&
U.compareAndSetReference(a, k, t, null)) {
updateBase(b + 1);
base = b + 1;
t.doExec();
}
}
@ -1672,46 +1664,15 @@ public class ForkJoinPool extends AbstractExecutorService
private static final Object POOLIDS_BASE;
private static final long POOLIDS;
private boolean compareAndSetCtl(long c, long v) {
return U.compareAndSetLong(this, CTL, c, v);
}
private long compareAndExchangeCtl(long c, long v) {
return U.compareAndExchangeLong(this, CTL, c, v);
}
private long getAndAddCtl(long v) {
return U.getAndAddLong(this, CTL, v);
}
private long incrementThreadIds() {
return U.getAndAddLong(this, THREADIDS, 1L);
}
private static int getAndAddPoolIds(int x) {
return U.getAndAddInt(POOLIDS_BASE, POOLIDS, x);
}
private int getAndSetParallelism(int v) {
return U.getAndSetInt(this, PARALLELISM, v);
}
private int getParallelismOpaque() {
return U.getIntOpaque(this, PARALLELISM);
}
private CountDownLatch cmpExTerminationSignal(CountDownLatch x) {
return (CountDownLatch)
U.compareAndExchangeReference(this, TERMINATION, null, x);
}
// runState locking operations
// runState operations
private long getAndBitwiseOrRunState(long v) { // for status bits
return U.getAndBitwiseOrLong(this, RUNSTATE, v);
}
private boolean casRunState(long c, long v) {
return U.compareAndSetLong(this, RUNSTATE, c, v);
}
private void unlockRunState() { // increment lock bit
U.getAndAddLong(this, RUNSTATE, RS_LOCK);
}
private long lockRunState() { // lock and return current state
long s, u; // locked when RS_LOCK set
if (((s = runState) & RS_LOCK) == 0L && casRunState(s, u = s + RS_LOCK))
if (((s = runState) & RS_LOCK) == 0L &&
U.compareAndSetLong(this, RUNSTATE, s, u = s + RS_LOCK))
return u;
else
return spinLockRunState();
@ -1720,7 +1681,7 @@ public class ForkJoinPool extends AbstractExecutorService
for (int waits = 0;;) {
long s, u;
if (((s = runState) & RS_LOCK) == 0L) {
if (casRunState(s, u = s + RS_LOCK))
if (U.compareAndSetLong(this, RUNSTATE, s, u = s + RS_LOCK))
return u;
waits = 0;
} else if (waits < SPIN_WAITS) {
@ -1775,7 +1736,7 @@ public class ForkJoinPool extends AbstractExecutorService
*/
final String nextWorkerThreadName() {
String prefix = workerNamePrefix;
long tid = incrementThreadIds() + 1L;
long tid = U.getAndAddLong(this, THREADIDS, 1L) + 1L;
if (prefix == null) // commonPool has no prefix
prefix = "ForkJoinPool.commonPool-worker-";
return prefix.concat(Long.toString(tid));
@ -1846,7 +1807,7 @@ public class ForkJoinPool extends AbstractExecutorService
releaseWaiters(); // ensure released
if (w == null || w.source != DROPPED) {
long c = ctl; // decrement counts
do {} while (c != (c = compareAndExchangeCtl(
do {} while (c != (c = U.compareAndExchangeLong(this, CTL,
c, ((RC_MASK & (c - RC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(LMASK & c)))));
@ -1875,14 +1836,16 @@ public class ForkJoinPool extends AbstractExecutorService
/**
* Releases an idle worker, or creates one if not enough exist,
* giving up q is nonull and signalled slot already taken.
* giving up on contention if q is nonull and signalled slot
* already taken.
*
* @param q, if nonnull, the WorkQueue containing signalled task
* @param qbase q's base index for the task
*/
final void signalWork(WorkQueue q, int qbase) {
int pc = parallelism;
for (long c = ctl;;) {
for (;;) {
long c = ctl;
WorkQueue[] qs = queues;
long ac = (c + RC_UNIT) & RC_MASK, nc;
int sp = (int)c, i = sp & SMASK;
@ -1902,7 +1865,7 @@ public class ForkJoinPool extends AbstractExecutorService
break;
else
nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
if (c == (c = compareAndExchangeCtl(c, nc))) {
if (U.compareAndSetLong(this, CTL, c, nc)) {
if (v == null)
createWorker();
else {
@ -1926,7 +1889,7 @@ public class ForkJoinPool extends AbstractExecutorService
if ((sp = (int)c) == 0 || (qs = queues) == null ||
qs.length <= (i = sp & SMASK) || (v = qs[i]) == null)
break;
if (c == (c = compareAndExchangeCtl(
if (c == (c = U.compareAndExchangeLong(this, CTL,
c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
(v.stackPred & LMASK))))) {
v.phase = sp;
@ -1972,7 +1935,8 @@ public class ForkJoinPool extends AbstractExecutorService
return 0;
else if ((ds = delayScheduler) != null && !ds.canShutDown())
return 0;
else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
else if (U.compareAndSetLong(this, CTL, c, c) &&
U.compareAndSetLong(this, RUNSTATE, e, e | STOP))
return 1; // enable termination
else
break; // restart
@ -1991,6 +1955,7 @@ public class ForkJoinPool extends AbstractExecutorService
WorkQueue[] qs;
int r = w.stackPred; // seed from registerWorker
int fifo = (int)config & FIFO, rescans = 0, inactive = 0, taken = 0, n;
int src = -1; // last queue taken from
while ((runState & STOP) == 0L && (qs = queues) != null &&
(n = qs.length) > 0) {
int i = r, step = (r >>> 16) | 1;
@ -2024,12 +1989,14 @@ public class ForkJoinPool extends AbstractExecutorService
}
}
else if (U.compareAndSetReference(a, bp, t, null)) {
q.base = nb;
w.source = qid;
rescans = 1;
++taken;
if (U.getReferenceAcquire(a, np) != null)
Object nt = U.getReference(a, np);
U.getAndSetInt(q, WorkQueue.BASE, nb);
if (nt != null)
signalWork(q, nb); // propagate
++taken;
rescans = 1;
if (src != qid)
w.source = src = qid;
w.topLevelExec(t, fifo);
}
}
@ -2042,8 +2009,10 @@ public class ForkJoinPool extends AbstractExecutorService
if ((inactive = deactivate(w, taken)) != 0)
taken = 0;
}
else if (awaitWork(w) == 0)
else if (awaitWork(w) == 0) {
inactive = rescans = 0;
src = -1;
}
else
break;
}
@ -2059,12 +2028,13 @@ public class ForkJoinPool extends AbstractExecutorService
*/
private int deactivate(WorkQueue w, int taken) {
int inactive = 0, phase;
if (w != null && (inactive = (phase = w.phase) & IDLE) == 0) {
if (w != null && (inactive = // plain accesses until CAS
(phase = U.getInt(w, WorkQueue.PHASE)) & IDLE) == 0) {
long sp = (phase + (IDLE << 1)) & LMASK, pc, c;
w.phase = phase | IDLE;
w.stackPred = (int)(pc = ctl); // set ctl stack link
if (!compareAndSetCtl( // try to enqueue
pc, c = ((pc - RC_UNIT) & UMASK) | sp))
U.putInt(w, WorkQueue.PHASE, phase | IDLE);
w.stackPred = (int)(pc = ctl); // try to enqueue
if (!U.compareAndSetLong(this, CTL, pc,
c = ((pc - RC_UNIT) & UMASK) | sp))
w.phase = phase; // back out on contention
else {
if (taken != 0) {
@ -2098,7 +2068,8 @@ public class ForkJoinPool extends AbstractExecutorService
int sp = w.stackPred, phase, activePhase; long c;
if ((inactive = (phase = w.phase) & IDLE) != 0 &&
(int)(c = ctl) == (activePhase = phase + IDLE) &&
compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK))) {
U.compareAndSetLong(this, CTL, c,
(sp & LMASK) | ((c + RC_UNIT) & UMASK))) {
w.phase = activePhase;
inactive = 0;
}
@ -2159,12 +2130,12 @@ public class ForkJoinPool extends AbstractExecutorService
int vp, i; WorkQueue[] vs; WorkQueue v;
long nc = ((w.stackPred & LMASK) |
((RC_MASK & c) | (TC_MASK & (c - TC_UNIT))));
if (compareAndSetCtl(c, nc)) {
if (U.compareAndSetLong(this, CTL, c, nc)) {
w.source = DROPPED;
w.phase = activePhase;
if ((vp = (int)nc) != 0 && (vs = queues) != null &&
vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
compareAndSetCtl( // try to wake up next waiter
U.compareAndSetLong(this, CTL, // try to wake up next waiter
nc, ((v.stackPred & LMASK) |
((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) {
v.source = INVALID_ID; // enable cascaded timeouts
@ -2227,7 +2198,8 @@ public class ForkJoinPool extends AbstractExecutorService
WorkQueue[] qs; WorkQueue v; int i;
if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
(v = qs[i]) != null &&
compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
U.compareAndSetLong(this, CTL, c,
(c & UMASK) | (v.stackPred & LMASK))) {
v.phase = sp;
if (v.parking != 0)
U.unpark(v.owner);
@ -2235,17 +2207,18 @@ public class ForkJoinPool extends AbstractExecutorService
}
}
else if (active > minActive && total >= pc) { // reduce active workers
if (compareAndSetCtl(c, ((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK)))
if (U.compareAndSetLong(this, CTL, c,
((c - RC_UNIT) & RC_MASK) | (c & ~RC_MASK)))
stat = UNCOMPENSATE;
}
else if (total < maxTotal && total < MAX_CAP) { // try to expand pool
long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
if ((runState & STOP) != 0L) // terminating
stat = 0;
else if (compareAndSetCtl(c, nc))
else if (U.compareAndSetLong(this, CTL, c, nc))
stat = createWorker() ? UNCOMPENSATE : 0;
}
else if (!compareAndSetCtl(c, c)) // validate
else if (!U.compareAndSetLong(this, CTL, c, c)) // validate
;
else if ((sat = saturate) != null && sat.test(this))
stat = 0;
@ -2259,7 +2232,7 @@ public class ForkJoinPool extends AbstractExecutorService
* Readjusts RC count; called from ForkJoinTask after blocking.
*/
final void uncompensate() {
getAndAddCtl(RC_UNIT);
U.getAndAddLong(this, CTL, RC_UNIT);
}
/**
@ -2330,7 +2303,7 @@ public class ForkJoinPool extends AbstractExecutorService
}
if (U.compareAndSetReference(a, k, t, null)) {
q.base = b + 1;
w.source = j; // volatile write
w.source = j;
t.doExec();
w.source = wsrc;
rescan = true; // restart at index r
@ -2401,7 +2374,7 @@ public class ForkJoinPool extends AbstractExecutorService
if (eligible) {
if (U.compareAndSetReference(
a, k, t, null)) {
q.updateBase(b + 1);
q.base = b + 1;
t.doExec();
locals = rescan = true;
break scan;
@ -2484,7 +2457,7 @@ public class ForkJoinPool extends AbstractExecutorService
}
if (U.compareAndSetReference(a, k, t, null)) {
q.base = nb;
w.source = j; // volatile write
w.source = j;
t.doExec();
w.source = wsrc;
rescan = locals = true;
@ -2595,14 +2568,15 @@ public class ForkJoinPool extends AbstractExecutorService
* @param rejectOnShutdown true if RejectedExecutionException
* should be thrown when shutdown
*/
final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
final void externalPush(ForkJoinTask<?> task, boolean signalIfEmpty,
boolean rejectOnShutdown) {
int r;
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // initialize caller's probe
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue q; WorkQueue[] qs; int n, id, i;
WorkQueue q; WorkQueue[] qs; int n, id, i, lock;
if ((qs = queues) == null || (n = qs.length) <= 0)
break;
if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
@ -2612,31 +2586,27 @@ public class ForkJoinPool extends AbstractExecutorService
q = qs[i] = newq; // else lost race to install
unlockRunState();
}
if (q != null && q.tryLockPhase()) {
if (q != null && (lock = q.tryLockPhase()) != 1) {
int unlock = lock + NEXTIDLE;
if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
q.unlockPhase(); // check while q lock held
break;
q.phase = unlock;
break; // check while q lock held
}
return q;
q.push(task, signalIfEmpty ? this : null, unlock);
return;
}
r = ThreadLocalRandom.advanceProbe(r); // move
}
throw new RejectedExecutionException();
}
private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
private void poolSubmit(ForkJoinTask<?> task, boolean signalIfEmpty) {
Thread t; ForkJoinWorkerThread wt;
if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
(wt = (ForkJoinWorkerThread)t).pool == this) {
internal = true;
q = wt.workQueue;
}
else { // find and lock queue
internal = false;
q = externalSubmissionQueue(true);
}
q.push(task, signalIfEmpty ? this : null, internal);
return task;
(wt = (ForkJoinWorkerThread)t).pool == this)
wt.workQueue.push(task, signalIfEmpty ? this : null, 1);
else
externalPush(task, signalIfEmpty, true);
}
/**
@ -2751,7 +2721,8 @@ public class ForkJoinPool extends AbstractExecutorService
else if ((e & STOP) != 0L)
now = true;
else if (now) {
if (((ps = getAndBitwiseOrRunState(SHUTDOWN|STOP) & STOP)) == 0L) {
if (((ps = U.getAndBitwiseOrLong(this, RUNSTATE, SHUTDOWN | STOP)) &
STOP) == 0L) {
if ((ps & RS_LOCK) != 0L) {
spinLockRunState(); // ensure queues array stable after stop
unlockRunState();
@ -2762,7 +2733,7 @@ public class ForkJoinPool extends AbstractExecutorService
else if ((isShutdown = (e & SHUTDOWN)) != 0L || enable) {
long quiet; DelayScheduler ds;
if (isShutdown == 0L)
getAndBitwiseOrRunState(SHUTDOWN);
U.getAndBitwiseOrLong(this, RUNSTATE, SHUTDOWN);
if ((quiet = quiescent()) > 0)
now = true;
else if (quiet == 0 && (ds = delayScheduler) != null)
@ -2778,7 +2749,7 @@ public class ForkJoinPool extends AbstractExecutorService
if (((e = runState) & CLEANED) == 0L) {
boolean clean = cleanQueues();
if (((e = runState) & CLEANED) == 0L && clean)
e = getAndBitwiseOrRunState(CLEANED) | CLEANED;
e = U.getAndBitwiseOrLong(this, RUNSTATE, CLEANED) | CLEANED;
}
if ((e & TERMINATED) != 0L)
break;
@ -2788,7 +2759,8 @@ public class ForkJoinPool extends AbstractExecutorService
break;
if ((e & CLEANED) != 0L) {
e |= TERMINATED;
if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) {
if ((U.getAndBitwiseOrLong(this, RUNSTATE, TERMINATED) &
TERMINATED) == 0L) {
CountDownLatch done; SharedThreadContainer ctr;
if ((done = termination) != null)
done.countDown();
@ -2826,7 +2798,7 @@ public class ForkJoinPool extends AbstractExecutorService
a, k = slotOffset((cap - 1) & (b = q.base)));
if (q.base == b && t != null &&
U.compareAndSetReference(a, k, t, null)) {
q.updateBase(b + 1);
q.base = b + 1;
try {
t.cancel(false);
} catch (Throwable ignore) {
@ -2867,7 +2839,8 @@ public class ForkJoinPool extends AbstractExecutorService
private CountDownLatch terminationSignal() {
CountDownLatch signal, s, u;
if ((signal = termination) == null)
signal = ((u = cmpExTerminationSignal(
signal = ((u = (CountDownLatch)U.compareAndExchangeReference(
this, TERMINATION, null,
s = new CountDownLatch(1))) == null) ? s : u;
return signal;
}
@ -3035,7 +3008,7 @@ public class ForkJoinPool extends AbstractExecutorService
(((long)maxSpares) << TC_SHIFT) |
(((long)minAvail) << RC_SHIFT));
this.queues = new WorkQueue[size];
String pid = Integer.toString(getAndAddPoolIds(1) + 1);
String pid = Integer.toString(U.getAndAddInt(POOLIDS_BASE, POOLIDS, 1) + 1);
String name = "ForkJoinPool-" + pid;
this.poolName = name;
this.workerNamePrefix = name + "-worker-";
@ -3145,7 +3118,7 @@ public class ForkJoinPool extends AbstractExecutorService
* scheduled for execution
*/
public <T> T invoke(ForkJoinTask<T> task) {
poolSubmit(true, Objects.requireNonNull(task));
poolSubmit(Objects.requireNonNull(task), true);
try {
return task.join();
} catch (RuntimeException | Error unchecked) {
@ -3164,7 +3137,7 @@ public class ForkJoinPool extends AbstractExecutorService
* scheduled for execution
*/
public void execute(ForkJoinTask<?> task) {
poolSubmit(true, Objects.requireNonNull(task));
poolSubmit(Objects.requireNonNull(task), true);
}
// AbstractExecutorService methods
@ -3177,9 +3150,9 @@ public class ForkJoinPool extends AbstractExecutorService
@Override
@SuppressWarnings("unchecked")
public void execute(Runnable task) {
poolSubmit(true, (Objects.requireNonNull(task) instanceof ForkJoinTask<?>)
poolSubmit((Objects.requireNonNull(task) instanceof ForkJoinTask<?>)
? (ForkJoinTask<Void>) task // avoid re-wrap
: new ForkJoinTask.RunnableExecuteAction(task));
: new ForkJoinTask.RunnableExecuteAction(task), true);
}
/**
@ -3197,7 +3170,8 @@ public class ForkJoinPool extends AbstractExecutorService
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
return poolSubmit(true, Objects.requireNonNull(task));
poolSubmit(Objects.requireNonNull(task), true);
return task;
}
/**
@ -3208,11 +3182,12 @@ public class ForkJoinPool extends AbstractExecutorService
@Override
public <T> ForkJoinTask<T> submit(Callable<T> task) {
Objects.requireNonNull(task);
return poolSubmit(
true,
ForkJoinTask<T> t =
(Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedCallable<T>(task) :
new ForkJoinTask.AdaptedInterruptibleCallable<T>(task));
new ForkJoinTask.AdaptedInterruptibleCallable<T>(task);
poolSubmit(t, true);
return t;
}
/**
@ -3223,11 +3198,12 @@ public class ForkJoinPool extends AbstractExecutorService
@Override
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
Objects.requireNonNull(task);
return poolSubmit(
true,
ForkJoinTask<T> t =
(Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedRunnable<T>(task, result) :
new ForkJoinTask.AdaptedInterruptibleRunnable<T>(task, result));
new ForkJoinTask.AdaptedInterruptibleRunnable<T>(task, result);
poolSubmit(t, true);
return t;
}
/**
@ -3239,13 +3215,14 @@ public class ForkJoinPool extends AbstractExecutorService
@SuppressWarnings("unchecked")
public ForkJoinTask<?> submit(Runnable task) {
Objects.requireNonNull(task);
return poolSubmit(
true,
ForkJoinTask<?> t =
(task instanceof ForkJoinTask<?>) ?
(ForkJoinTask<Void>) task : // avoid re-wrap
((Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedRunnable<Void>(task, null) :
new ForkJoinTask.AdaptedInterruptibleRunnable<Void>(task, null)));
new ForkJoinTask.AdaptedInterruptibleRunnable<Void>(task, null));
poolSubmit(t, true);
return t;
}
/**
@ -3267,7 +3244,7 @@ public class ForkJoinPool extends AbstractExecutorService
*/
public <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
Objects.requireNonNull(task);
externalSubmissionQueue(true).push(task, this, false);
externalPush(task, true, true);
return task;
}
@ -3288,7 +3265,8 @@ public class ForkJoinPool extends AbstractExecutorService
* @since 19
*/
public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) {
return poolSubmit(false, Objects.requireNonNull(task));
poolSubmit(Objects.requireNonNull(task), false);
return task;
}
/**
@ -3316,7 +3294,7 @@ public class ForkJoinPool extends AbstractExecutorService
throw new IllegalArgumentException();
if ((config & PRESET_SIZE) != 0)
throw new UnsupportedOperationException("Cannot override System property");
if ((prevSize = getAndSetParallelism(size)) < size)
if ((prevSize = U.getAndSetInt(this, PARALLELISM, size)) < size)
signalWork(null, 0); // trigger worker activation
return prevSize;
}
@ -3351,7 +3329,7 @@ public class ForkJoinPool extends AbstractExecutorService
for (Callable<T> t : tasks) {
ForkJoinTask<T> f = ForkJoinTask.adapt(t);
futures.add(f);
poolSubmit(true, f);
poolSubmit(f, true);
}
for (int i = futures.size() - 1; i >= 0; --i)
((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
@ -3374,7 +3352,7 @@ public class ForkJoinPool extends AbstractExecutorService
for (Callable<T> t : tasks) {
ForkJoinTask<T> f = ForkJoinTask.adaptInterruptible(t);
futures.add(f);
poolSubmit(true, f);
poolSubmit(f, true);
}
for (int i = futures.size() - 1; i >= 0; --i)
((ForkJoinTask<?>)futures.get(i))
@ -3490,7 +3468,7 @@ public class ForkJoinPool extends AbstractExecutorService
* elapsed
*/
final void executeEnabledScheduledTask(ScheduledForkJoinTask<?> task) {
externalSubmissionQueue(false).push(task, this, false);
externalPush(task, true, false);
}
/**
@ -3731,7 +3709,8 @@ public class ForkJoinPool extends AbstractExecutorService
onTimeout.task = task =
new ForkJoinTask.CallableWithTimeout<V>(callable, timeoutTask);
scheduleDelayedTask(timeoutTask);
return poolSubmit(true, task);
poolSubmit(task, true);
return task;
}
/**
@ -3778,7 +3757,7 @@ public class ForkJoinPool extends AbstractExecutorService
* @return the targeted parallelism level of this pool
*/
public int getParallelism() {
return Math.max(getParallelismOpaque(), 1);
return Math.max(U.getIntOpaque(this, PARALLELISM), 1);
}
/**
@ -4330,7 +4309,7 @@ public class ForkJoinPool extends AbstractExecutorService
done = blocker.block();
} finally {
if (comp > 0)
getAndAddCtl(RC_UNIT);
U.getAndAddLong(this, CTL, RC_UNIT);
}
if (done)
break;
@ -4357,7 +4336,7 @@ public class ForkJoinPool extends AbstractExecutorService
*/
void endCompensatedBlock(long post) {
if (post > 0L) {
getAndAddCtl(post);
U.getAndAddLong(this, CTL, post);
}
}

View File

@ -641,15 +641,10 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public final ForkJoinTask<V> fork() {
Thread t; ForkJoinWorkerThread wt;
ForkJoinPool p; ForkJoinPool.WorkQueue q; boolean internal;
if (internal =
(t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
q = (wt = (ForkJoinWorkerThread)t).workQueue;
p = wt.pool;
}
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((wt = (ForkJoinWorkerThread)t).workQueue).push(this, wt.pool, 1);
else
q = (p = ForkJoinPool.common).externalSubmissionQueue(false);
q.push(this, p, internal);
ForkJoinPool.common.externalPush(this, true, false);
return this;
}