Avoid Thread.yield when deactivating workers, for performance testing

This commit is contained in:
Doug Lea 2026-01-27 07:06:26 -05:00
parent 04928c9449
commit a7f1d63f6d

View File

@ -1055,7 +1055,6 @@ public class ForkJoinPool extends AbstractExecutorService
// masks and sentinels for queue indices
static final int MAX_CAP = 0x7fff; // max # workers
static final int EXTERNAL_ID_MASK = 0x3ffe; // max external queue id
static final int INVALID_ID = 0x4000; // unused external queue id
// pool.runState bits
static final long STOP = 1L << 0; // terminating
@ -1193,7 +1192,7 @@ public class ForkJoinPool extends AbstractExecutorService
@jdk.internal.vm.annotation.Contended("w")
int nsteals; // number of steals from other queues
@jdk.internal.vm.annotation.Contended("w")
int dropOnEmptyScan; // nonzero if trimmable
int trimStatus; // <0: drop on empty scan; >0: dropped; else 0
// Support for atomic operations
private static final Unsafe U;
@ -1300,7 +1299,7 @@ public class ForkJoinPool extends AbstractExecutorService
return;
}
}
--top; // back out
U.putIntOpaque(this, TOP, top - 1); // backout
if (unlock != 1)
phase = unlock;
throw new RejectedExecutionException("Queue capacity exceeded");
@ -1784,7 +1783,7 @@ public class ForkJoinPool extends AbstractExecutorService
if (wt != null && (w = wt.workQueue) != null &&
(phase = w.phase) != 0 && (phase & IDLE) != 0)
releaseWaiters(); // ensure released
if (w == null || w.source != DROPPED) {
if (w == null || w.trimStatus <= 0) {
long c = ctl; // decrement counts
do {} while (c != (c = U.compareAndExchangeLong(this, CTL,
c, ((RC_MASK & (c - RC_UNIT)) |
@ -1805,7 +1804,7 @@ public class ForkJoinPool extends AbstractExecutorService
}
}
if ((tryTerminate(false, false) & STOP) == 0L &&
phase != 0 && w != null && w.source != DROPPED) {
phase != 0 && w != null && w.trimStatus <= 0) {
w.cancelTasks(); // clean queue
signalWork(null, 0); // possibly replace
}
@ -1822,8 +1821,8 @@ public class ForkJoinPool extends AbstractExecutorService
* @param base src's base index for the task
*/
final void signalWork(WorkQueue src, int base) {
int pc = parallelism, i, sp; // rely on caller sync for initial reads
long c = U.getLong(this, CTL);
int pc = parallelism, i, sp;
long c = ctl;
WorkQueue[] qs = queues;
while ((short)(c >>> RC_SHIFT) < pc && qs != null &&
qs.length > (i = (sp = (int)c) & SMASK)) {
@ -1848,7 +1847,7 @@ public class ForkJoinPool extends AbstractExecutorService
break;
}
qs = queues;
if (src != null && src.base - base > 0)
if (src != null && src.base - base > 0)
break;
}
}
@ -1943,7 +1942,7 @@ public class ForkJoinPool extends AbstractExecutorService
if ((q = qs[qid = i & (n - 1)]) != null) {
ForkJoinTask<?>[] a; int pb = -1, ran = 0, cap;
while ((a = q.array) != null && (cap = a.length) > 0) {
int b, nb; long bp, np, ps; ForkJoinTask<?> t;
int b, nb; long bp, np; ForkJoinTask<?> t;
t = (ForkJoinTask<?>)U.getReferenceAcquire(
a, bp = slotOffset((cap - 1) & (b = q.base)));
Object nt = U.getReference(
@ -1956,40 +1955,44 @@ public class ForkJoinPool extends AbstractExecutorService
break scan; // can't take yet
}
else if (qb != b) { // inconsistent
if (taken == 0)
if (src < 0)
break scan; // busy
}
else if (t == null) {
if (U.getReference(a, bp) == null) {
if (ran != 0) // end run on this queue
break scan;
if (nt == null) // probably empty
if (nt == null) { // probably empty
if (ran != 0) // end run on this queue
break scan;
break;
}
if (pb == (pb = b))
break scan; // stalled; reorder scan
}
}
else if (U.compareAndSetReference(a, bp, t, null)) {
nt = U.getReference(a, np);
U.getAndSetInt(q, WorkQueue.BASE, nb);
if (qid != (ps = src))
Object rnt = U.getReference(a, np); // reread
U.getAndSetInt(q, WorkQueue.BASE, pb = nb);
boolean propagate = rnt != null &&
(qid != src ||
((qid & 1) == 0 && (fifo != 0 || taken == 0)));
ran = 1;
++taken;
if (qid != src)
U.putIntOpaque(w, WorkQueue.SOURCE, src = qid);
if (nt != null &&
(qid != ps ||
((qid & 1) == 0 && (fifo != 0 || taken == 0))) &&
U.getReferenceAcquire(a, np) != null)
signalWork(q, nb); // propagate
ran = ++taken;
if (propagate && U.getReferenceAcquire(a, np) != null)
signalWork(q, nb);
w.topLevelExec(t, fifo);
}
}
}
if (--j == 0) { // empty scan
if (idle == 0) {
idle = tryDeactivate(w, taken);
if (taken != 0) {
w.nsteals += taken;
taken = 0;
}
else if ((idle = awaitWork(w)) != 0)
else if (idle == 0)
idle = tryDeactivate(w);
else if ((idle = awaitWork(w, src)) != 0)
break rescan; // trimmed or terminated
else
src = -1;
@ -2001,40 +2004,29 @@ public class ForkJoinPool extends AbstractExecutorService
}
/**
* Possibly deactivates or pauses worker
* Possibly deactivates worker
*
* @param w the work queue
* @param taken number of tasks taken since last activate
* @return 0 if now active
*/
private int tryDeactivate(WorkQueue w, int taken) {
private int tryDeactivate(WorkQueue w) {
int idle = 0;
if (w != null) { // always true; hoist checks
if (taken != 0) { // rescan before deactivating
w.nsteals += taken;
if ((w.config & CLEAR_TLS) != 0 &&
(Thread.currentThread() instanceof ForkJoinWorkerThread f))
f.resetThreadLocals(); // (instanceof check always true)
if ((runState & (SHUTDOWN|STOP)) == 0L)
Thread.yield(); // pause before rescan
}
else {
int phase = U.getInt(w, WorkQueue.PHASE);
long sp = (phase + NEXTIDLE) & LMASK, pc = ctl;
U.putInt(w, WorkQueue.PHASE, phase | IDLE);
for (;;) { // enqueue
w.stackPred = (int)pc;
long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK;
if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) {
if (ac == 0L && (runState & (SHUTDOWN|STOP)) == SHUTDOWN)
quiescent(); // check quiescent termination
idle = w.phase & IDLE;
break;
}
else if (ac < (pc & RC_MASK)) {
w.phase = phase; // back out if lost to signal
break;
}
if (w != null) { // always true; hoist checks
int phase = U.getInt(w, WorkQueue.PHASE);
long sp = (phase + NEXTIDLE) & LMASK, pc = ctl;
U.putInt(w, WorkQueue.PHASE, phase | IDLE);
for (;;) { // enqueue
w.stackPred = (int)pc;
long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK;
if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) {
if (ac == 0L && (runState & (SHUTDOWN|STOP)) == SHUTDOWN)
quiescent(); // check quiescent termination
idle = w.phase & IDLE;
break;
}
else if (ac < (pc & RC_MASK)) {
w.phase = phase; // back out if lost to signal
break;
}
}
}
@ -2068,36 +2060,36 @@ public class ForkJoinPool extends AbstractExecutorService
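* @param w the work queue
* @param src the caller's current source queue id, or negative if none;
* if nonnegative and CLEAR_TLS is configured, the owner's thread locals
* are reset before waiting
* @return 0 if now active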
private int awaitWork(WorkQueue w) {
private int awaitWork(WorkQueue w, int src) {
int idle = 0, phase;
if (w != null && (idle = (phase = w.phase) & IDLE) != 0) {
ForkJoinWorkerThread t;
int activePhase = phase + IDLE, trim;
if ((trim = w.dropOnEmptyScan) != 0)
w.dropOnEmptyScan = 0;
if ((trim = w.trimStatus) != 0)
w.trimStatus = 0;
if (src >= 0 && (w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
t.resetThreadLocals(); // clear before reactivate
for (long deadline = 0L;;) {
Thread.interrupted(); // clear status
if ((runState & STOP) != 0L)
break;
boolean trimmable = false;
boolean trimmable = false; // true if at head of ctl and quiescent
long d = 0L, c;
int ac = (short)((c = ctl) >>> RC_SHIFT);
int spins = ((short)(c >>> TC_SHIFT) | 1) & SMASK; // >= # workers
if ((int)c == activePhase) { // at head of ctl
spins += Math.max(spins << 1, SPIN_WAITS); // approx 1 scan cost
if (ac == 0) { // quiescent
long now = System.currentTimeMillis();
if (deadline == 0L)
d = deadline = now + keepAlive;
else if ((d = deadline) - now <= TIMEOUT_SLOP)
trim = 1;
if (trim != 0 && tryTrim(w, c, activePhase))
break;
trim = 0;
trimmable = true;
}
if (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase) {
long now = System.currentTimeMillis();
if (deadline == 0L)
d = deadline = now + keepAlive;
else if ((d = deadline) - now <= TIMEOUT_SLOP)
trim = 1;
if (trim != 0 && tryTrim(w, c, activePhase))
break;
trim = 0;
trimmable = true;
}
int tc = ((short)(c >>> TC_SHIFT) | 1) & SMASK; // >= # workers
int spins = tc + Math.max(tc << 1, SPIN_WAITS);
while ((idle = w.phase & IDLE) != 0 && --spins != 0)
Thread.onSpinWait();
Thread.onSpinWait(); // spin for approx 1 scan cost
if (idle == 0)
break;
LockSupport.setCurrentBlocker(this);
@ -2124,14 +2116,14 @@ public class ForkJoinPool extends AbstractExecutorService
long nc = ((w.stackPred & LMASK) |
((RC_MASK & c) | (TC_MASK & (c - TC_UNIT))));
if (U.compareAndSetLong(this, CTL, c, nc)) {
w.source = DROPPED;
w.trimStatus = 1;
w.phase = activePhase;
if ((vp = (int)nc) != 0 && (vs = queues) != null &&
vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
U.compareAndSetLong(this, CTL, // try to wake up next waiter
nc, ((v.stackPred & LMASK) |
((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) {
v.dropOnEmptyScan = 1;
v.trimStatus = -1;
v.phase = vp;
U.unpark(v.owner);
}