mirror of
https://github.com/openjdk/jdk.git
synced 2026-04-19 03:17:40 +00:00
reunify push; improve contention vs activation vs park balance
This commit is contained in:
parent
d2b6c7c082
commit
400a413020
@ -1176,7 +1176,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
// fields declared in order of their likely layout on most VMs
|
||||
final ForkJoinWorkerThread owner; // null if shared
|
||||
ForkJoinTask<?>[] array; // the queued tasks; power of 2 size
|
||||
volatile int base; // index of next slot for poll
|
||||
int base; // index of next slot for poll
|
||||
final int config; // mode bits
|
||||
|
||||
// fields otherwise causing more unnecessary false-sharing cache misses
|
||||
@ -1241,21 +1241,23 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
|
||||
/**
|
||||
* Pushes a task on an internal queue. Called only by owner.
|
||||
* (Use pool.externalPush for external queues).
|
||||
* Pushes a task. Called only by owner or if already locked
|
||||
*
|
||||
* @param task the task; caller must ensure nonnull
|
||||
* @param pool the pool to signal if was previously empty or resized
|
||||
* @param pool the pool to signal if was previously empty, else null
|
||||
* @param unlock if not 1, phase unlock value
|
||||
* @throws RejectedExecutionException if array could not be resized
|
||||
*/
|
||||
final void push(ForkJoinTask<?> task, ForkJoinPool pool) {
|
||||
final void push(ForkJoinTask<?> task, ForkJoinPool pool, int unlock) {
|
||||
ForkJoinTask<?>[] a = array;
|
||||
int s = top, m, cap = (a == null) ? 0 : a.length;
|
||||
if (cap <= s + 1 - base || (m = cap - 1) < 0)
|
||||
growAndPush(task, pool, 1);
|
||||
int b = base, s = top, cap, m;
|
||||
if (a == null || (cap = a.length) <= s + 1 - b || (m = cap - 1) < 0)
|
||||
growAndPush(task, pool, unlock);
|
||||
else {
|
||||
top = s + 1;
|
||||
U.putReferenceVolatile(a, slotOffset(m & s), task);
|
||||
if (unlock != 1) // release external lock
|
||||
U.putInt(this, PHASE, unlock);
|
||||
if (U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null &&
|
||||
pool != null)
|
||||
pool.signalWork(this, s); // may have appeared empty
|
||||
@ -1310,12 +1312,12 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
private ForkJoinTask<?> nextLocalTask(int fifo) {
|
||||
ForkJoinTask<?> t = null;
|
||||
ForkJoinTask<?>[] a = array;
|
||||
int s = top - 1, b, cap = (a == null) ? 0 : a.length;
|
||||
if (s - (b = base) >= 0 && cap > 0) {
|
||||
int b = base, s = top - 1, cap;
|
||||
if (a != null && s - b >= 0 && (cap = a.length) > 0) {
|
||||
if (fifo == 0) {
|
||||
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
|
||||
a, slotOffset((cap - 1) & s), null)) != null)
|
||||
U.putIntOpaque(this, TOP, s);
|
||||
top = s;
|
||||
} else {
|
||||
do {
|
||||
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
|
||||
@ -1325,7 +1327,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
if (b == s)
|
||||
break;
|
||||
while (b == (b = base))
|
||||
while (b == (b = U.getIntAcquire(this, BASE)))
|
||||
Thread.onSpinWait();
|
||||
} while (s - b >= 0);
|
||||
}
|
||||
@ -1337,7 +1339,10 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
* Takes next task, if one exists, using configured mode.
|
||||
*/
|
||||
final ForkJoinTask<?> nextLocalTask() {
|
||||
return nextLocalTask(config & FIFO);
|
||||
U.loadFence(); // ensure ordering for external callers
|
||||
ForkJoinTask<?> t = nextLocalTask(config & FIFO);
|
||||
U.storeFence();
|
||||
return t;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1405,7 +1410,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
Thread.onSpinWait(); // stalled
|
||||
}
|
||||
else if (U.compareAndSetReference(a, k, t, null)) {
|
||||
base = nb;
|
||||
U.putIntVolatile(this, BASE, nb);
|
||||
return t;
|
||||
}
|
||||
}
|
||||
@ -1430,8 +1435,8 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
*/
|
||||
final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {
|
||||
ForkJoinTask<?>[] a = array;
|
||||
int p = top, s = p - 1, cap = (a == null) ? 0 : a.length, d = p - base;
|
||||
if (cap > 0) {
|
||||
int b = base, p = top, s = p - 1, d = p - b, cap;
|
||||
if (a != null && (cap = a.length) > 0) {
|
||||
for (int m = cap - 1, i = s; d > 0; --i, --d) {
|
||||
long k; boolean taken;
|
||||
ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference(
|
||||
@ -1549,13 +1554,6 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
|
||||
// misc
|
||||
|
||||
final int spinWaitPhase() {
|
||||
int spins = SPIN_WAITS, f;
|
||||
while (((f = phase) & IDLE) != 0 && --spins != 0)
|
||||
Thread.onSpinWait();
|
||||
return f;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels all local tasks. Called only by owner.
|
||||
*/
|
||||
@ -1823,35 +1821,27 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
* giving up on contention if q is nonnull and signalled slot
|
||||
* already taken.
|
||||
*
|
||||
* @param q, if nonnull, the WorkQueue containing signalled task
|
||||
* @param qbase q's base index for the task
|
||||
* @param src, if nonnull, the WorkQueue containing signalled task
|
||||
* @param base src's base index for the task
|
||||
*/
|
||||
final void signalWork(WorkQueue q, int qbase) {
|
||||
int pc = parallelism;
|
||||
for (long c = ctl;;) {
|
||||
WorkQueue[] qs = queues;
|
||||
long ac = (c + RC_UNIT) & RC_MASK, nc;
|
||||
int sp = (int)c, i = sp & SMASK;
|
||||
if ((short)(c >>> RC_SHIFT) >= pc)
|
||||
break;
|
||||
if (qs == null)
|
||||
break;
|
||||
if (qs.length <= i)
|
||||
break;
|
||||
WorkQueue w = qs[i], v = null;
|
||||
if (sp == 0) {
|
||||
final void signalWork(WorkQueue src, int base) {
|
||||
int pc = parallelism, i, sp; // rely on caller sync for initial reads
|
||||
long c = U.getLong(this, CTL);
|
||||
WorkQueue[] qs = queues;
|
||||
while ((short)(c >>> RC_SHIFT) < pc && qs != null &&
|
||||
qs.length > (i = (sp = (int)c) & SMASK)) {
|
||||
WorkQueue v; long nc;
|
||||
if (i == 0) {
|
||||
if ((short)(c >>> TC_SHIFT) >= pc)
|
||||
break;
|
||||
nc = ((c + TC_UNIT) & TC_MASK) | ac;
|
||||
v = null;
|
||||
nc = ((c + TC_UNIT) & TC_MASK) | ((c + RC_UNIT) & RC_MASK);
|
||||
}
|
||||
else if ((v = w) == null)
|
||||
else if ((v = qs[i]) == null)
|
||||
break;
|
||||
else
|
||||
nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
|
||||
if (q != null && q.base - qbase > 0)
|
||||
break;
|
||||
if (c == (c = ctl) &&
|
||||
c == (c = U.compareAndExchangeLong(this, CTL, c, nc))) {
|
||||
nc = (v.stackPred & LMASK) | ((c + RC_UNIT) & UMASK);
|
||||
if (c == (c = U.compareAndExchangeLong(this, CTL, c, nc))) {
|
||||
if (v == null)
|
||||
createWorker();
|
||||
else {
|
||||
@ -1861,6 +1851,9 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
break;
|
||||
}
|
||||
qs = queues;
|
||||
if (src != null && src.base - base > 0)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1938,14 +1931,17 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
*/
|
||||
final void runWorker(WorkQueue w) {
|
||||
if (w != null) {
|
||||
WorkQueue[] qs; int n;
|
||||
int r = w.stackPred; // seed from registerWorker
|
||||
int fifo = (int)config & FIFO;
|
||||
int src = 0, idle = 0, rescans = 0, taken = 0;
|
||||
while ((runState & STOP) == 0L && (qs = queues) != null &&
|
||||
(n = qs.length) > 0) {
|
||||
for (;;) {
|
||||
WorkQueue[] qs;
|
||||
long e = runState;
|
||||
int n = ((qs = queues) == null) ? 0 : qs.length;
|
||||
int i = r, step = (r >>> 16) | 1;
|
||||
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
|
||||
if ((e & STOP) != 0L || n <= 0)
|
||||
break;
|
||||
scan: for (int j = n; j != 0; --j, i += step) {
|
||||
WorkQueue q; int qid;
|
||||
if ((q = qs[qid = i & (n - 1)]) != null) {
|
||||
@ -1955,27 +1951,26 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
t = (ForkJoinTask<?>)U.getReferenceAcquire(
|
||||
a, bp = slotOffset((cap - 1) & (b = q.base)));
|
||||
long np = slotOffset((nb = b + 1) & (cap - 1));
|
||||
if (q.base == b) { // else inconsistent
|
||||
if (q.array == a && q.base == b &&
|
||||
U.getReference(a, bp) == t) {
|
||||
if (t == null) {
|
||||
if (q.array == a) { // else resized
|
||||
if (rescans > 0) // ran or stalled
|
||||
break scan;
|
||||
if (U.getReference(a, np) == null &&
|
||||
U.getReference(a, bp) == null &&
|
||||
(rescans >= 0 || q.top == b))
|
||||
break;
|
||||
rescans = 1; // may be stalled
|
||||
}
|
||||
if (rescans > 0) // ran or stalled
|
||||
break scan;
|
||||
if (U.getReference(a, np) == null &&
|
||||
(rescans == 0 || q.top == b))
|
||||
break; // retry at most twice
|
||||
++rescans; // may be stalled
|
||||
}
|
||||
else if (idle != 0) {
|
||||
if ((idle = tryReactivate(w)) != 0) {
|
||||
rescans = 1; // can't take yet
|
||||
break scan;
|
||||
rescans = 1;
|
||||
break scan; // can't take yet
|
||||
}
|
||||
rescans = 0;
|
||||
}
|
||||
else if (U.compareAndSetReference(a, bp, t, null)) {
|
||||
Object nt = U.getReference(a, np);
|
||||
q.base = nb;
|
||||
U.getAndSetInt(q, WorkQueue.BASE, nb);
|
||||
if (nt != null) // propagate
|
||||
signalWork(q, nb);
|
||||
rescans = 1;
|
||||
@ -1987,47 +1982,72 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
}
|
||||
}
|
||||
if (rescans >= 0)
|
||||
int phase;
|
||||
if (rescans >= 0) {
|
||||
--rescans;
|
||||
if (idle != 0)
|
||||
idle = w.phase & IDLE;
|
||||
}
|
||||
else if (idle == 0) {
|
||||
deactivate(w, taken);
|
||||
idle = IDLE;
|
||||
if ((idle = deactivate(w, taken)) == 0)
|
||||
rescans = 0;
|
||||
taken = 0;
|
||||
}
|
||||
else if ((idle = awaitWork(w)) == 0)
|
||||
else if ((idle = (phase = w.phase) & IDLE) == 0 ||
|
||||
(idle = awaitWork(w, phase)) == 0)
|
||||
rescans = 0;
|
||||
else
|
||||
break;
|
||||
}
|
||||
if (taken != 0)
|
||||
w.nsteals += taken;
|
||||
w.nsteals += taken;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deactivates and enqueues worker
|
||||
* Deactivates and enqueues worker, possibly backing out on signal
|
||||
* contention.
|
||||
*
|
||||
* @param w the work queue
|
||||
* @param taken number of stolen tasks since last reactivation
|
||||
* @return active status
|
||||
*/
|
||||
private void deactivate(WorkQueue w, int taken) {
|
||||
private int deactivate(WorkQueue w, int taken) {
|
||||
int idle = IDLE;
|
||||
if (w != null) { // always true; hoist checks
|
||||
int phase = U.getInt(w, WorkQueue.PHASE);
|
||||
long sp = (phase + NEXTIDLE) & LMASK, pc = ctl, c;
|
||||
U.putInt(w, WorkQueue.PHASE, phase | IDLE);
|
||||
do {
|
||||
for (;;) { // try to enqueue
|
||||
w.stackPred = (int)pc;
|
||||
} while (pc != (pc = U.compareAndExchangeLong(
|
||||
this, CTL, pc, c = ((pc - RC_UNIT) & UMASK) | sp)));
|
||||
if (pc == (pc = U.compareAndExchangeLong(
|
||||
this, CTL, pc, c = ((pc - RC_UNIT) & UMASK) | sp)))
|
||||
break;
|
||||
if ((c & RC_MASK) < (pc & RC_MASK)) {
|
||||
w.phase = phase; // back out if lost to signal
|
||||
idle = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (taken != 0) {
|
||||
w.nsteals += taken;
|
||||
if ((w.config & CLEAR_TLS) != 0 &&
|
||||
(Thread.currentThread() instanceof ForkJoinWorkerThread f))
|
||||
f.resetThreadLocals(); // (instanceof check always true)
|
||||
Thread.interrupted(); // clear status
|
||||
}
|
||||
if (idle != 0 && (runState & STOP) == 0L) {
|
||||
if ((c & RC_MASK) == 0L) {
|
||||
if (quiescent() <= 0) // check quiescent termination
|
||||
idle = w.phase & IDLE;
|
||||
}
|
||||
else if ((idle = w.phase & IDLE) != 0) {
|
||||
Thread.yield(); // reduce unproductive scanning
|
||||
for (int s = SPIN_WAITS; (idle = w.phase & IDLE) != 0 && --s != 0;)
|
||||
Thread.onSpinWait();
|
||||
}
|
||||
}
|
||||
if ((c & RC_MASK) == 0L && (runState & SHUTDOWN) != 0L)
|
||||
quiescent(); // may trigger quiescent termination
|
||||
}
|
||||
return idle;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -2055,43 +2075,47 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
* Awaits signal or termination.
|
||||
*
|
||||
* @param w the work queue
|
||||
* @param phase w's (inactive) phase
|
||||
* @return 0 if now active
|
||||
*/
|
||||
private int awaitWork(WorkQueue w) {
|
||||
int idle = IDLE, phase;
|
||||
if ((runState & STOP) == 0L && w != null &&
|
||||
(idle = (phase = w.spinWaitPhase()) & IDLE) != 0) {
|
||||
private int awaitWork(WorkQueue w, int phase) {
|
||||
int idle = IDLE;
|
||||
if (w != null) { // always true; hoist checks
|
||||
int activePhase = phase + IDLE;
|
||||
long deadline = 0L, waitTime = (w.source == INVALID_ID) ? 0L : keepAlive;
|
||||
LockSupport.setCurrentBlocker(this);
|
||||
for (;;) {
|
||||
Thread.interrupted(); // clear status
|
||||
if ((runState & STOP) != 0L)
|
||||
break;
|
||||
while ((runState & STOP) == 0L) {
|
||||
boolean trimmable = false; // use timed wait if trimmable
|
||||
long d = 0L, c;
|
||||
if (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase) {
|
||||
long now = System.currentTimeMillis();
|
||||
if (deadline == 0L)
|
||||
deadline = waitTime + now;
|
||||
if (deadline - now <= TIMEOUT_SLOP) {
|
||||
if (tryTrim(w, c, activePhase))
|
||||
break;
|
||||
continue; // lost race to trim
|
||||
long d = 0L, c; // trim or spin at head
|
||||
int spins = 1;
|
||||
if ((int)(c = ctl) == activePhase) {
|
||||
spins = SPIN_WAITS;
|
||||
if ((c & RC_MASK) == 0L) {
|
||||
long now = System.currentTimeMillis();
|
||||
if (deadline == 0L)
|
||||
deadline = waitTime + now;
|
||||
if (deadline - now <= TIMEOUT_SLOP) {
|
||||
if (tryTrim(w, c, activePhase))
|
||||
break;
|
||||
continue; // lost race to trim
|
||||
}
|
||||
d = deadline;
|
||||
trimmable = true;
|
||||
}
|
||||
d = deadline;
|
||||
trimmable = true;
|
||||
}
|
||||
if ((idle = w.phase & IDLE) == 0)
|
||||
while ((idle = w.phase & IDLE) != 0 && --spins != 0)
|
||||
Thread.onSpinWait();
|
||||
if (idle == 0)
|
||||
break;
|
||||
LockSupport.setCurrentBlocker(this);
|
||||
w.parking = 1; // enable unpark and recheck
|
||||
if ((idle = w.phase & IDLE) != 0)
|
||||
U.park(trimmable, d);
|
||||
w.parking = 0; // close unpark window
|
||||
LockSupport.setCurrentBlocker(null);
|
||||
if (idle == 0 || (idle = w.phase & IDLE) == 0)
|
||||
break;
|
||||
Thread.interrupted(); // clear status for next park
|
||||
}
|
||||
LockSupport.setCurrentBlocker(null);
|
||||
}
|
||||
return idle;
|
||||
}
|
||||
@ -2326,7 +2350,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
if ((q = qs[j = r & SMASK & (n - 1)]) != null) {
|
||||
for (;;) {
|
||||
ForkJoinTask<?> t; ForkJoinTask<?>[] a;
|
||||
int b, cap, nb; long k;
|
||||
int b, cap; long k;
|
||||
boolean eligible = false;
|
||||
if ((a = q.array) == null || (cap = a.length) <= 0)
|
||||
break;
|
||||
@ -2564,23 +2588,12 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
ForkJoinTask<?>[] a;
|
||||
if (q != null && (lock = q.tryLockPhase()) != 1) {
|
||||
int unlock = lock + NEXTIDLE;
|
||||
int s = q.top, cap = ((a = q.array) == null) ? 0 : a.length;
|
||||
int m = (cap <= s + 1 - q.base) ? -1 : cap - 1;
|
||||
int unlock = lock + NEXTIDLE;
|
||||
if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
|
||||
q.phase = unlock;
|
||||
break; // check while q lock held
|
||||
}
|
||||
else if (m < 0)
|
||||
q.growAndPush(task, this, unlock);
|
||||
else {
|
||||
q.top = s + 1;
|
||||
a[m & s] = task;
|
||||
q.phase = unlock;
|
||||
if (U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null &&
|
||||
signalIfEmpty)
|
||||
signalWork(q, s);
|
||||
break; // check while q lock held
|
||||
}
|
||||
q.push(task, signalIfEmpty ? this : null, unlock);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -2591,7 +2604,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
|
||||
(q = (wt = (ForkJoinWorkerThread)t).workQueue) != null &&
|
||||
wt.pool == this)
|
||||
q.push(task, signalIfEmpty ? this : null);
|
||||
q.push(task, signalIfEmpty ? this : null, 1);
|
||||
else
|
||||
externalPush(task, signalIfEmpty, true);
|
||||
}
|
||||
@ -3079,8 +3092,8 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
* Package-private access to commonPool overriding zero parallelism
|
||||
*/
|
||||
static ForkJoinPool asyncCommonPool() {
|
||||
ForkJoinPool cp; int p;
|
||||
if ((p = (cp = common).parallelism) == 0)
|
||||
ForkJoinPool cp;
|
||||
if ((cp = common).parallelism == 0)
|
||||
U.compareAndSetInt(cp, PARALLELISM, 0, 2);
|
||||
return cp;
|
||||
}
|
||||
@ -3856,7 +3869,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
* the pool. This method may be useful for tuning task
|
||||
* granularities. The returned count does not include scheduled
|
||||
* tasks that are not yet ready to execute, which are reported
|
||||
* separately by method {@link getDelayedTaskCount}.
|
||||
* separately by method {@link #getDelayedTaskCount}.
|
||||
*
|
||||
* @return the number of queued tasks
|
||||
* @see ForkJoinWorkerThread#getQueuedTaskCount()
|
||||
|
||||
@ -642,7 +642,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
||||
public final ForkJoinTask<V> fork() {
|
||||
Thread t; ForkJoinWorkerThread wt;
|
||||
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
|
||||
((wt = (ForkJoinWorkerThread)t).workQueue).push(this, wt.pool);
|
||||
((wt = (ForkJoinWorkerThread)t).workQueue).push(this, wt.pool, 1);
|
||||
else
|
||||
ForkJoinPool.common.externalPush(this, true, false);
|
||||
return this;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user