Improve ramp-up/down

This commit is contained in:
Doug Lea 2026-04-27 08:07:02 -04:00
parent d6890230a1
commit c451e61e67

View File

@ -1210,7 +1210,9 @@ public class ForkJoinPool extends AbstractExecutorService
int nsteals; // number of steals from other queues
@jdk.internal.vm.annotation.Contended("w")
volatile int parking; // next phase if parked in awaitWork
@jdk.internal.vm.annotation.Contended("w")
int prevSteals; // to track steals across phases
// Support for atomic operations
private static final Unsafe U;
private static final long PHASE;
@ -1968,24 +1970,25 @@ public class ForkJoinPool extends AbstractExecutorService
if (w != null) {
int phase = w.phase, r = w.stackPred; // seed from registerWorker
int fifo = w.config & FIFO, nsteals = 0, src = -1;
for (;;) {
WorkQueue[] qs;
for (int i = r;;) { // i is scan origin
long e = runState;
WorkQueue[] qs = queues;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
if ((runState & STOP) != 0L || (qs = queues) == null)
int step = (r >>> 16) | 1, n;
if ((e & STOP) != 0L || qs == null || (n = qs.length) <= 0)
break;
int n = qs.length, i = r, step = (r >>> 16) | 1;
boolean rescan = false;
scan: for (int l = n; l > 0; --l, i += step) { // scan queues
boolean rescan = false; // scan queues
scan: for (int j = (src < 0) ? n : n << 1; j != 0; --j, i += step) {
int qid, cap; WorkQueue q; ForkJoinTask<?>[] a;
if ((q = qs[qid = i & (n - 1)]) != null &&
(a = q.array) != null && (cap = a.length) > 0) {
for (int m = cap - 1, pb = -1, b = q.base;;) {
ForkJoinTask<?> t; long k;
for (int m = cap - 1, pb = -1, b = q.base; ; pb = b) {
ForkJoinTask<?> t; long k; int nb;
t = (ForkJoinTask<?>)U.getReferenceAcquire(
a, k = slotOffset(m & b));
if (b != (b = q.base)) { // inconsistent / busy
if (b != (b = q.base)) { // inconsistent / busy
if (src != qid) {
rescan = true; // reduce interference
rescan = true; // reduce interference
break scan;
}
}
@ -1994,69 +1997,66 @@ public class ForkJoinPool extends AbstractExecutorService
if (a[b & m] == null) {
if (a[(b + 1) & m] == null &&
a[(b + 2) & m] == null) {
if (rescan) // end of run
if (rescan) // end of run
break scan;
break; // probably empty
break; // probably empty
}
if (src != qid || pb == (pb = b)) {
rescan = true; // reorder scan
break scan;
if (src != qid || pb == b) {
rescan = true;
break scan; // reorder scan
}
}
}
else {
boolean propagate;
long np = slotOffset((pb = b + 1) & m);
boolean more = (U.getReferenceVolatile(a, np) != null);
q.base = pb;
Object nt = U.getReferenceVolatile(
a, slotOffset((nb = b + 1) & m));
q.base = nb;
w.nsteals = ++nsteals;
int prevSrc = src;
w.source = src = qid; // volatile
rescan = true;
if (propagate =
(more &&
(prevSrc != qid ||
((qid & 1) == 0 &&
(fifo != 0 || t.noUserHelp() != 0)))))
signalWork();
w.source = src = qid; // volatile
if (nt != null &&
(qid != prevSrc ||
((qid & 1) == 0 &&
(fifo != 0 || t.noUserHelp() != 0))))
signalWork(); // propagate
w.topLevelExec(t, fifo);
if ((b = q.base) != pb && !propagate)
break scan; // reduce interference
rescan = true;
b = q.base;
}
}
}
}
if (!rescan) {
if (((phase = deactivate(w, phase)) & IDLE) != 0)
break;
src = -1; // re-enable propagation
i = r; // move unless deactivated
if (!rescan && runState == e &&
phase != (phase = deactivate(w, phase))) {
if ((phase & IDLE) != 0)
break; // terminated
if (src >= 0) {
i = src; // restart at last src
src = -1; // re-enable propagation
}
}
}
}
}
/**
* Deactivates and if necessary awaits signal or termination.
* Deactivates unless contended and if necessary awaits signal or termination.
*
* @param w the worker
* @param phase current phase
* @return current phase, with IDLE set if worker should exit
*/
private int deactivate(WorkQueue w, int phase) {
if (w == null) // currently impossible
if (w == null) // currently impossible
return IDLE;
w.phase = phase | IDLE;
int activePhase = phase + (IDLE << 1);
long pc = ctl, qc, qac;
for (;;) { // enqueue
w.stackPred = (int)pc;
qac = (qc = (pc - RC_UNIT) & UMASK | (activePhase & LMASK)) & RC_MASK;
if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, qc)))
break;
else if (qac < (pc & RC_MASK))
return w.phase = phase; // back out if lost to signal
}
long e; WorkQueue[] qs; int n;
long pc = ctl, qc = ((pc - RC_UNIT) & UMASK) | (activePhase & LMASK);
w.stackPred = (int)pc; // try to enqueue
w.phase = phase | IDLE;
if (!U.compareAndSetLong(this, CTL, pc, qc))
return w.phase = phase; // back out on contention
long qac = qc & RC_MASK, e; WorkQueue[] qs; int n;
if (((e = runState) & STOP) != 0L ||
((e & SHUTDOWN) != 0L && qac == 0L && quiescent() > 0) ||
(qs = queues) == null || (n = qs.length) <= 0)
@ -2092,60 +2092,63 @@ public class ForkJoinPool extends AbstractExecutorService
}
}
}
return ((phase = w.phase) == activePhase) ? activePhase : awaitWork(w, phase);
return awaitWork(w);
}
/**
* Awaits signal or termination.
*
* @param w the work queue
* @param phase current phase (known to be idle)
* @return current phase, with IDLE set if worker should exit
*/
private int awaitWork(WorkQueue w, int phase) {
if (w != null) {
int activePhase = phase + IDLE, parking = 0;
private int awaitWork(WorkQueue w) {
int phase = IDLE;
if ((runState & STOP) == 0L && w != null && ((phase = w.phase) & IDLE) != 0) {
boolean trim = false;
int ns = w.nsteals, nstolen = ns - w.prevSteals;
if (nstolen != 0) {
w.prevSteals = ns;
if ((w.config & CLEAR_TLS) != 0 &&
(Thread.currentThread() instanceof ForkJoinWorkerThread f))
f.resetThreadLocals(); // (instanceof check always true)
}
else if (w.source == INVALID_ID)
trim = true;
long deadline = 0L;
boolean trim = (w.source == INVALID_ID);
if ((w.config & CLEAR_TLS) != 0 &&
(Thread.currentThread() instanceof ForkJoinWorkerThread f))
f.resetThreadLocals(); // (instanceof check always true)
int parkEnabled = 0, activePhase = phase + IDLE;
while ((phase = w.phase) != activePhase) {
boolean trimmable = false; // true if at ctl head and quiescent
long d = 0L, c;
boolean head; // true if at ctl head
boolean trimmable = false; // true if at head and quiescent
Thread.interrupted(); // clear status
if ((runState & STOP) != 0L)
break;
if ((int)(c = ctl) == activePhase) {
if ((c & RC_MASK) == 0L) {
long now = System.currentTimeMillis();
if (deadline == 0L)
d = deadline = now + keepAlive;
else if ((d = deadline) - now <= TIMEOUT_SLOP)
trim = true;
if (trim && tryTrim(w, c, activePhase))
break;
trim = false;
trimmable = true;
}
if (parking == 0) { // spin at head for approx 1 scan cost
int tc = ((short)(c >>> TC_SHIFT) | 1) & SMASK;
int spins = tc + (tc << 1);
while ((phase = w.phase) != activePhase && --spins != 0)
Thread.onSpinWait();
if (phase == activePhase)
break;
}
long d = 0L, c = ctl;
if ((head = ((int)c == activePhase)) && (c & RC_MASK) == 0L) {
long now = System.currentTimeMillis();
if (deadline == 0L)
d = deadline = now + keepAlive;
else if ((d = deadline) - now <= TIMEOUT_SLOP)
trim = true;
if (trim && tryTrim(w, c, activePhase))
break;
trim = false;
trimmable = true;
}
if (parking == 0) { // enable unpark
int spins = (parkEnabled != 0 || (nstolen == 0 && !head)) ? 1 :
SPIN_WAITS + (((short)(c >>> TC_SHIFT)) & SMASK);
while ((phase = w.phase) != activePhase && --spins != 0)
Thread.onSpinWait(); // spin if ran or at head
if (phase == activePhase)
break;
if (parkEnabled == 0) {
LockSupport.setCurrentBlocker(this);
w.parking = parking = activePhase;
w.parking = parkEnabled = activePhase;
if ((phase = w.phase) == activePhase)
break;
}
U.park(trimmable, d);
}
if (parking != 0) {
if (parkEnabled != 0) {
w.parking = 0;
LockSupport.setCurrentBlocker(null);
}