mirror of
https://github.com/openjdk/jdk.git
synced 2026-01-28 12:09:14 +00:00
Simplify scan mode control by moving and reworking topLevelExec and throwing on trim
This commit is contained in:
parent
88f1466dc7
commit
a1e5ce94e0
@ -1168,6 +1168,12 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Exception thrown in tryTrim after idle timeout
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
static final class WorkerTrimmedException extends RuntimeException { }
|
||||
|
||||
/**
|
||||
* Queues supporting work-stealing as well as external task
|
||||
* submission. See above for descriptions and algorithms.
|
||||
@ -1309,23 +1315,20 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
* so acts as either local-pop or local-poll. Called only by owner.
|
||||
* @param fifo nonzero if FIFO mode
|
||||
*/
|
||||
private ForkJoinTask<?> nextLocalTask(int fifo) {
|
||||
final ForkJoinTask<?> nextLocalTask(int fifo) {
|
||||
ForkJoinTask<?> t = null;
|
||||
ForkJoinTask<?>[] a = array;
|
||||
int b = base, s = top - 1, cap;
|
||||
if (a != null && s - b >= 0 && (cap = a.length) > 0) {
|
||||
if (fifo == 0) {
|
||||
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
|
||||
a, slotOffset((cap - 1) & s), null)) != null) {
|
||||
top = s;
|
||||
U.storeFence();
|
||||
}
|
||||
a, slotOffset((cap - 1) & s), null)) != null)
|
||||
U.putIntOpaque(this, TOP, s);
|
||||
} else {
|
||||
do {
|
||||
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
|
||||
a, slotOffset((cap - 1) & b), null)) != null) {
|
||||
base = b + 1;
|
||||
U.storeFence();
|
||||
U.putIntVolatile(this, BASE, b + 1);
|
||||
break;
|
||||
}
|
||||
if (b == s)
|
||||
@ -1359,8 +1362,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
(internal || (lock = tryLockPhase()) != 1)) {
|
||||
if (top == p && U.compareAndSetReference(a, k, task, null)) {
|
||||
taken = true;
|
||||
top = s;
|
||||
U.storeFence();
|
||||
U.putIntOpaque(this, TOP, s);
|
||||
}
|
||||
if (!internal)
|
||||
phase = lock + NEXTIDLE;
|
||||
@ -1409,8 +1411,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
Thread.onSpinWait(); // stalled
|
||||
}
|
||||
else if (U.compareAndSetReference(a, k, t, null)) {
|
||||
base = nb;
|
||||
U.storeFence();
|
||||
U.putIntVolatile(this, BASE, nb);
|
||||
return t;
|
||||
}
|
||||
}
|
||||
@ -1419,16 +1420,6 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
|
||||
// specialized execution methods
|
||||
|
||||
/**
|
||||
* Runs the given task, as well as remaining local tasks
|
||||
*/
|
||||
final void topLevelExec(ForkJoinTask<?> task, int fifo) {
|
||||
while (task != null) {
|
||||
task.doExec();
|
||||
task = nextLocalTask(fifo);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deep form of tryUnpush: Traverses from top and removes and
|
||||
* runs task if present.
|
||||
@ -1508,10 +1499,8 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
break;
|
||||
if (taken =
|
||||
(top == p &&
|
||||
U.compareAndSetReference(a, k, t, null))) {
|
||||
top = s;
|
||||
U.storeFence();
|
||||
}
|
||||
U.compareAndSetReference(a, k, t, null)))
|
||||
U.putIntOpaque(this, TOP, s);
|
||||
if (!internal)
|
||||
phase = lock + NEXTIDLE;
|
||||
if (!taken)
|
||||
@ -1548,8 +1537,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
break;
|
||||
if (base == b && t != null &&
|
||||
U.compareAndSetReference(a, k, t, null)) {
|
||||
base = b + 1;
|
||||
U.storeFence();
|
||||
U.putIntVolatile(this, BASE, b + 1);
|
||||
t.doExec();
|
||||
}
|
||||
}
|
||||
@ -1787,6 +1775,8 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
|
||||
WorkQueue w = null; // null if not created
|
||||
int phase = 0; // 0 if not registered
|
||||
if (ex instanceof WorkerTrimmedException)
|
||||
ex = null;
|
||||
if (wt != null && (w = wt.workQueue) != null &&
|
||||
(phase = w.phase) != 0 && (phase & IDLE) != 0)
|
||||
releaseWaiters(); // ensure released
|
||||
@ -1828,8 +1818,8 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
* @param base src's base index for the task
|
||||
*/
|
||||
final void signalWork(WorkQueue src, int base) {
|
||||
int pc = parallelism, i, sp; // rely on caller sync for initial reads
|
||||
long c = U.getLong(this, CTL);
|
||||
int pc = parallelism, i, sp;
|
||||
long c = U.getLong(this, CTL); // rely on caller sync for initial reads
|
||||
WorkQueue[] qs = queues;
|
||||
while ((short)(c >>> RC_SHIFT) < pc && qs != null &&
|
||||
qs.length > (i = (sp = (int)c) & SMASK)) {
|
||||
@ -1935,14 +1925,14 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
final void runWorker(WorkQueue w) {
|
||||
if (w != null) {
|
||||
int r = w.stackPred; // seed from registerWorker
|
||||
int fifo = (int)config & FIFO;
|
||||
int src = 0, idle = 0, rescans = 0, taken = 0;
|
||||
for (;;) {
|
||||
int fifo = (int)config & FIFO, idle = 0;
|
||||
for (boolean scanned = false;;) {
|
||||
WorkQueue[] qs;
|
||||
long e = runState;
|
||||
int n = ((qs = queues) == null) ? 0 : qs.length;
|
||||
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
|
||||
int i = r, step = (r >>> 16) | 1;
|
||||
boolean found = false;
|
||||
if ((e & STOP) != 0L || n <= 0)
|
||||
break;
|
||||
scan: for (int j = n; j != 0; --j, i += step) {
|
||||
@ -1950,91 +1940,127 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
if ((q = qs[qid = i & (n - 1)]) != null) {
|
||||
ForkJoinTask<?>[] a; int cap; // poll queue
|
||||
while ((a = q.array) != null && (cap = a.length) > 0) {
|
||||
int b, nb; long bp; ForkJoinTask<?> t;
|
||||
int b, nb; long bp, np; ForkJoinTask<?> t;
|
||||
t = (ForkJoinTask<?>)U.getReferenceAcquire(
|
||||
a, bp = slotOffset((cap - 1) & (b = q.base)));
|
||||
long np = slotOffset((nb = b + 1) & (cap - 1));
|
||||
Object nt = U.getReference(
|
||||
a, np = slotOffset((cap - 1) & (nb = b + 1)));
|
||||
if (q.array == a && q.base == b &&
|
||||
U.getReference(a, bp) == t) {
|
||||
if (t == null) {
|
||||
if (rescans > 0) // ran or stalled
|
||||
break scan;
|
||||
if (U.getReference(a, np) == null &&
|
||||
(rescans == 0 || q.top == b))
|
||||
break; // retry at most twice
|
||||
++rescans; // may be stalled
|
||||
if (nt == null && (!scanned || q.top == b)) {
|
||||
found = false;
|
||||
break;
|
||||
}
|
||||
if (found)
|
||||
break scan; // stall check
|
||||
found = true;
|
||||
}
|
||||
else if (idle != 0) {
|
||||
if ((idle = tryReactivate(w)) != 0) {
|
||||
rescans = 1;
|
||||
found = true;
|
||||
if ((idle = tryReactivate(w)) != 0)
|
||||
break scan; // can't take yet
|
||||
}
|
||||
rescans = 0;
|
||||
}
|
||||
else if (U.compareAndSetReference(a, bp, t, null)) {
|
||||
Object nt = U.getReference(a, np);
|
||||
nt = U.getReference(a, np);
|
||||
U.getAndSetInt(q, WorkQueue.BASE, nb);
|
||||
if (nt != null) // propagate
|
||||
signalWork(q, nb);
|
||||
rescans = 1;
|
||||
if (taken++ == 0 || src != qid)
|
||||
w.source = src = qid;
|
||||
w.topLevelExec(t, fifo);
|
||||
topLevelExec(t, w, q, fifo, qid);
|
||||
found = true;
|
||||
break scan;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (rescans > 0)
|
||||
rescans = 0;
|
||||
if (found)
|
||||
scanned = false;
|
||||
else if (!scanned && idle == 0)
|
||||
scanned = true;
|
||||
else if (!scanned || idle == 0) {
|
||||
if ((idle = deactivate(w, idle)) == 0)
|
||||
scanned = false;
|
||||
}
|
||||
else {
|
||||
if (rescans == 0)
|
||||
rescans = -1;
|
||||
else if (idle == 0) {
|
||||
if ((idle = deactivate(w)) == 0)
|
||||
rescans = 0;
|
||||
}
|
||||
else if ((idle = rescans = awaitWork(w)) != 0)
|
||||
awaitWork(w);
|
||||
idle = 0;
|
||||
scanned = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void topLevelExec(ForkJoinTask<?> t, WorkQueue w, WorkQueue q,
|
||||
int fifo, int qid) {
|
||||
if (t != null && q != null && w != null) { // always true; hoist checks
|
||||
w.source = qid;
|
||||
int taken = 1;
|
||||
for (;;) {
|
||||
t.doExec();
|
||||
if ((t = w.nextLocalTask(fifo)) == null) {
|
||||
ForkJoinTask<?>[] a; int cap, b, nb; long bp, np;
|
||||
if ((a = q.array) == null || (cap = a.length) <= 0)
|
||||
break; // similar to runWorker scan
|
||||
t = (ForkJoinTask<?>)U.getReferenceAcquire(
|
||||
a, bp = slotOffset((cap - 1) & (b = q.base)));
|
||||
Object nt = U.getReference(
|
||||
a, np = slotOffset((cap - 1) & (nb = b + 1)));
|
||||
if (t == null || q.base != b ||
|
||||
!U.compareAndSetReference(a, bp, t, null))
|
||||
break;
|
||||
if (idle != 0) {
|
||||
int ns = taken;
|
||||
taken = 0;
|
||||
if ((idle = onEmptyScan(w, ns)) == 0)
|
||||
rescans = 0;
|
||||
}
|
||||
nt = U.getReference(a, np);
|
||||
U.getAndSetInt(q, WorkQueue.BASE, nb);
|
||||
++taken;
|
||||
if (nt != null && // prevent stalls
|
||||
(fifo != 0 || (t instanceof ForkJoinTask.InterruptibleTask)))
|
||||
signalWork(q, nb);
|
||||
}
|
||||
}
|
||||
w.nsteals += taken;
|
||||
if ((w.config & CLEAR_TLS) != 0 &&
|
||||
(Thread.currentThread() instanceof ForkJoinWorkerThread f))
|
||||
f.resetThreadLocals(); // (instanceof check always true)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deactivates and enqueues worker, possibly backing out on signal
|
||||
* contention.
|
||||
* Possibly deactivates and pauses worker
|
||||
*
|
||||
* @param w the work queue
|
||||
* @return active status
|
||||
* @param idle active status
|
||||
* @return 0 if now active
|
||||
*/
|
||||
private int deactivate(WorkQueue w) {
|
||||
int idle = 0;
|
||||
if (w != null) { // always true; hoist checks
|
||||
int phase = U.getInt(w, WorkQueue.PHASE);
|
||||
long sp = (phase + NEXTIDLE) & LMASK, pc = ctl;
|
||||
U.putInt(w, WorkQueue.PHASE, phase | IDLE);
|
||||
for (;;) { // try to enqueue
|
||||
w.stackPred = (int)pc;
|
||||
long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK;
|
||||
if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) {
|
||||
if (ac == 0L) // check quiescent termination
|
||||
quiescent();
|
||||
idle = w.phase & IDLE;
|
||||
break;
|
||||
}
|
||||
else if (ac < (pc & RC_MASK)) {
|
||||
w.phase = phase; // back out if lost to signal
|
||||
break;
|
||||
private int deactivate(WorkQueue w, int idle) {
|
||||
if (w != null) { // always true; hoist checks
|
||||
int phase = w.phase;
|
||||
if (idle != 0) // already deactivated
|
||||
idle = phase & IDLE;
|
||||
else { // try to deactivate
|
||||
long sp = (phase + NEXTIDLE) & LMASK, pc = ctl;
|
||||
U.putInt(w, WorkQueue.PHASE, phase | IDLE);
|
||||
for (;;) { // enqueue
|
||||
w.stackPred = (int)pc;
|
||||
long c = (pc - RC_UNIT) & UMASK | sp, ac = c & RC_MASK;
|
||||
if (pc == (pc = U.compareAndExchangeLong(this, CTL, pc, c))) {
|
||||
if (ac == 0L) // check quiescent termination
|
||||
quiescent();
|
||||
idle = w.phase & IDLE;
|
||||
break;
|
||||
}
|
||||
else if (ac < (pc & RC_MASK)) {
|
||||
w.phase = phase; // back out if lost to signal
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (idle != 0 && (runState & STOP) == 0L) {
|
||||
int noise = (phase ^ (phase >>> 16)) & (SPIN_WAITS - 1);
|
||||
int spins = (SPIN_WAITS << 1) | noise;
|
||||
Thread.yield(); // helps when oversubscribed
|
||||
while ((idle = w.phase & IDLE) != 0 && --spins > 0)
|
||||
Thread.onSpinWait();
|
||||
}
|
||||
}
|
||||
return idle;
|
||||
}
|
||||
@ -2060,80 +2086,52 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
return idle;
|
||||
}
|
||||
|
||||
/**
|
||||
* Spins and/or yields to reduce unproductive scanning
|
||||
*
|
||||
* @param w the work queue
|
||||
* @param taken number of stolen tasks since last empty scan
|
||||
* @return active status
|
||||
*/
|
||||
private int onEmptyScan(WorkQueue w, int taken) {
|
||||
int idle = 0;
|
||||
if (w != null) { // always true; hoist checks
|
||||
if (taken != 0) {
|
||||
w.nsteals += taken;
|
||||
if ((w.config & CLEAR_TLS) != 0 &&
|
||||
(Thread.currentThread() instanceof ForkJoinWorkerThread f))
|
||||
f.resetThreadLocals(); // (instanceof check always true)
|
||||
Thread.interrupted(); // clear status
|
||||
}
|
||||
if ((idle = w.phase & IDLE) != 0 && (runState & STOP) == 0) {
|
||||
if (taken == 0)
|
||||
Thread.yield();
|
||||
for (int s = SPIN_WAITS; (idle = w.phase & IDLE) != 0 && --s != 0;)
|
||||
Thread.onSpinWait();
|
||||
}
|
||||
}
|
||||
return idle;
|
||||
}
|
||||
|
||||
/**
|
||||
* Awaits signal or termination.
|
||||
*
|
||||
* @param w the work queue
|
||||
* @return 0 if now active
|
||||
* @throws WorkerTrimmedException on idle timeout
|
||||
*/
|
||||
private int awaitWork(WorkQueue w) {
|
||||
int idle = 0, phase;
|
||||
if (w != null && (idle = (phase = w.phase) & IDLE) != 0) {
|
||||
LockSupport.setCurrentBlocker(this);
|
||||
private void awaitWork(WorkQueue w) {
|
||||
int phase;
|
||||
if (w != null && ((phase = w.phase) & IDLE) != 0) {
|
||||
int activePhase = phase + IDLE;
|
||||
long deadline = 0L, waitTime = (w.source == INVALID_ID) ? 0L : keepAlive;
|
||||
while ((runState & STOP) == 0L) {
|
||||
boolean trimmable = false; // use timed wait if trimmable
|
||||
LockSupport.setCurrentBlocker(this);
|
||||
for (;;) {
|
||||
Thread.interrupted(); // clear status
|
||||
if ((runState & STOP) != 0L)
|
||||
break;
|
||||
int idle;
|
||||
long d = 0L, c; // trim at head
|
||||
boolean trimmable = false; // use timed wait if trimmable
|
||||
if ((int)(c = ctl) == activePhase && (c & RC_MASK) == 0L) {
|
||||
trimmable = true;
|
||||
long now = System.currentTimeMillis();
|
||||
if (deadline == 0L)
|
||||
deadline = waitTime + now;
|
||||
if (deadline - now <= TIMEOUT_SLOP) {
|
||||
if (tryTrim(w, c, activePhase))
|
||||
break;
|
||||
continue; // lost race to trim
|
||||
}
|
||||
d = deadline;
|
||||
trimmable = true;
|
||||
if ((d = deadline) - now <= TIMEOUT_SLOP)
|
||||
tryTrim(w, c, activePhase); // throws if trimmed
|
||||
}
|
||||
if ((idle = w.phase & IDLE) == 0)
|
||||
if ((w.phase & IDLE) == 0)
|
||||
break;
|
||||
w.parking = 1; // enable unpark and recheck
|
||||
if ((idle = w.phase & IDLE) != 0)
|
||||
U.park(trimmable, d);
|
||||
w.parking = 0; // close unpark window
|
||||
if (idle == 0 || (idle = w.phase & IDLE) == 0)
|
||||
if (idle == 0 || (w.phase & IDLE) == 0)
|
||||
break;
|
||||
Thread.interrupted(); // clear status for next park
|
||||
}
|
||||
LockSupport.setCurrentBlocker(null);
|
||||
}
|
||||
return idle;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to remove and deregister worker after timeout, and release
|
||||
* another to do the same unless new tasks are found.
|
||||
* @throws WorkerTrimmedException on idle timeout
|
||||
*/
|
||||
private boolean tryTrim(WorkQueue w, long c, int activePhase) {
|
||||
private void tryTrim(WorkQueue w, long c, int activePhase) {
|
||||
if (w != null) {
|
||||
int vp, i; WorkQueue[] vs; WorkQueue v;
|
||||
long nc = ((w.stackPred & LMASK) |
|
||||
@ -2150,10 +2148,10 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
v.phase = vp;
|
||||
U.unpark(v.owner);
|
||||
}
|
||||
return true;
|
||||
LockSupport.setCurrentBlocker(null);
|
||||
throw new WorkerTrimmedException();
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -2382,8 +2380,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
if (eligible) {
|
||||
if (U.compareAndSetReference(
|
||||
a, k, t, null)) {
|
||||
q.base = b + 1;
|
||||
U.storeFence();
|
||||
U.putIntVolatile(q, WorkQueue.BASE, b + 1);
|
||||
t.doExec();
|
||||
locals = rescan = true;
|
||||
break scan;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user