mirror of
https://github.com/openjdk/jdk.git
synced 2026-01-28 12:09:14 +00:00
Don't oversignal LIFO
This commit is contained in:
parent
02ddb13d7b
commit
04928c9449
@ -1065,7 +1065,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
static final long RS_LOCK = 1L << 4; // lowest seqlock bit
|
||||
|
||||
// spin/sleep limits for runState locking and elsewhere
|
||||
static final int SPIN_WAITS = 1 << 8; // max calls to onSpinWait
|
||||
static final int SPIN_WAITS = 1 << 7; // max calls to onSpinWait
|
||||
static final int MIN_SLEEP = 1 << 10; // approx 1 usec as nanos
|
||||
static final int MAX_SLEEP = 1 << 20; // approx 1 sec as nanos
|
||||
|
||||
@ -1168,12 +1168,6 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Exception thrown in tryTrim after idle timeout
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
static final class WorkerTrimmedException extends RuntimeException { }
|
||||
|
||||
/**
|
||||
* Queues supporting work-stealing as well as external task
|
||||
* submission. See above for descriptions and algorithms.
|
||||
@ -1195,9 +1189,11 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
@jdk.internal.vm.annotation.Contended("w")
|
||||
volatile int parking; // nonzero if parked in awaitWork
|
||||
@jdk.internal.vm.annotation.Contended("w")
|
||||
int source; // source queue id (or DROPPED)
|
||||
volatile int source; // source queue id (or DROPPED)
|
||||
@jdk.internal.vm.annotation.Contended("w")
|
||||
int nsteals; // number of steals from other queues
|
||||
@jdk.internal.vm.annotation.Contended("w")
|
||||
int dropOnEmptyScan; // nonzero if trimmable
|
||||
|
||||
// Support for atomic operations
|
||||
private static final Unsafe U;
|
||||
@ -1256,19 +1252,18 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
*/
|
||||
final void push(ForkJoinTask<?> task, ForkJoinPool pool, int unlock) {
|
||||
ForkJoinTask<?>[] a = array;
|
||||
int b = base, s = top, cap, m; long pos;
|
||||
if (a == null || (cap = a.length) <= s + 1 - b || (m = cap - 1) < 0)
|
||||
int b = base, s = top, p = top = s + 1, cap, m;
|
||||
if (a == null || (cap = a.length) <= p - b || (m = cap - 1) < 0)
|
||||
growAndPush(task, pool, unlock);
|
||||
else {
|
||||
top = s + 1;
|
||||
U.getAndSetReference(a, pos = slotOffset(m & s), task);
|
||||
U.getAndSetReference(a, slotOffset(m & s), task);
|
||||
Object pred = U.getReferenceAcquire(a, slotOffset(m & (s - 1)));
|
||||
if (unlock != 1) { // release external lock
|
||||
U.putInt(this, PHASE, unlock);
|
||||
U.storeFence();
|
||||
}
|
||||
if (pred == null && pool != null)
|
||||
pool.signalWork(a, pos); // may have appeared empty
|
||||
pool.signalWork(this, s); // may have appeared empty
|
||||
}
|
||||
}
|
||||
|
||||
@ -1288,7 +1283,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
} catch (OutOfMemoryError ex) {
|
||||
}
|
||||
if (newArray != null) {
|
||||
int s = top++, mask = cap - 1, newMask = newCap - 1;
|
||||
int s = top - 1, mask = cap - 1, newMask = newCap - 1;
|
||||
newArray[s & newMask] = task;
|
||||
for (int k = s - 1, j = cap; j > 0; --j, --k) {
|
||||
ForkJoinTask<?> u; // poll old, push to new
|
||||
@ -1297,15 +1292,15 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
break; // lost to pollers
|
||||
newArray[k & newMask] = u;
|
||||
}
|
||||
long pos = slotOffset(s & newMask);
|
||||
U.putReferenceVolatile(this, ARRAY, newArray);
|
||||
if (unlock != 1)
|
||||
phase = unlock;
|
||||
if (pool != null)
|
||||
pool.signalWork(newArray, pos);
|
||||
pool.signalWork(this, s);
|
||||
return;
|
||||
}
|
||||
}
|
||||
--top; // back out
|
||||
if (unlock != 1)
|
||||
phase = unlock;
|
||||
throw new RejectedExecutionException("Queue capacity exceeded");
|
||||
@ -1419,45 +1414,16 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries once to poll for a task
|
||||
* @param pool if non-null, pool to propagate signals
|
||||
*/
|
||||
private ForkJoinTask<?> tryPoll(ForkJoinPool pool) {
|
||||
ForkJoinTask<?>[] a; int cap, b, nb; long bp;
|
||||
if ((a = array) != null && (cap = a.length) > 0) {
|
||||
ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReferenceAcquire(
|
||||
a, bp = slotOffset((cap - 1) & (b = base)));
|
||||
long np = slotOffset((cap - 1) & (nb = b + 1));
|
||||
if (t != null && base == b &&
|
||||
U.compareAndSetReference(a, bp, t, null)) {
|
||||
Object nt = U.getReference(a, np);
|
||||
U.getAndSetInt(this, BASE, nb);
|
||||
if (pool != null && nt != null)
|
||||
pool.signalWork(a, np);
|
||||
return t;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// specialized execution methods
|
||||
|
||||
/**
|
||||
* Runs the given task, as well as remaining local tasks
|
||||
*/
|
||||
final int topLevelExec(ForkJoinTask<?> task, WorkQueue q, ForkJoinPool pool,
|
||||
int fifo) {
|
||||
ForkJoinPool p = (fifo == 0) ? null : pool;
|
||||
int taken = 1;
|
||||
final void topLevelExec(ForkJoinTask<?> task, int fifo) {
|
||||
while (task != null) {
|
||||
task.doExec();
|
||||
if ((task = nextLocalTask(fifo)) == null &&
|
||||
(q == null || (task = q.tryPoll(p)) == null))
|
||||
break;
|
||||
++taken;
|
||||
task = nextLocalTask(fifo);
|
||||
}
|
||||
return taken;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1577,7 +1543,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
break;
|
||||
if (base == b && t != null &&
|
||||
U.compareAndSetReference(a, k, t, null)) {
|
||||
U.putIntVolatile(this, BASE, b + 1);
|
||||
base = b + 1;
|
||||
t.doExec();
|
||||
}
|
||||
}
|
||||
@ -1761,7 +1727,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
* @param w caller's WorkQueue
|
||||
*/
|
||||
final void registerWorker(WorkQueue w) {
|
||||
if (w != null) {
|
||||
if (w != null && (runState & STOP) == 0L) {
|
||||
w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
|
||||
ThreadLocalRandom.localInit();
|
||||
int seed = w.stackPred = ThreadLocalRandom.getProbe();
|
||||
@ -1815,8 +1781,6 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
|
||||
WorkQueue w = null; // null if not created
|
||||
int phase = 0; // 0 if not registered
|
||||
if (ex instanceof WorkerTrimmedException)
|
||||
ex = null;
|
||||
if (wt != null && (w = wt.workQueue) != null &&
|
||||
(phase = w.phase) != 0 && (phase & IDLE) != 0)
|
||||
releaseWaiters(); // ensure released
|
||||
@ -1843,7 +1807,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
if ((tryTerminate(false, false) & STOP) == 0L &&
|
||||
phase != 0 && w != null && w.source != DROPPED) {
|
||||
w.cancelTasks(); // clean queue
|
||||
signalWork(null, 0L); // possibly replace
|
||||
signalWork(null, 0); // possibly replace
|
||||
}
|
||||
if (ex != null)
|
||||
ForkJoinTask.rethrow(ex);
|
||||
@ -1851,23 +1815,26 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
|
||||
/**
|
||||
* Releases an idle worker, or creates one if not enough exist,
|
||||
* giving up on contention if source slot already taken.
|
||||
* giving up on contention if q is nonull and signalled slot
|
||||
* already taken.
|
||||
*
|
||||
* @param src, if nonnull, the array containing signalled task
|
||||
* @param offset slot offset for the task
|
||||
* @param src, if nonnull, the WorkQueue containing signalled task
|
||||
* @param base src's base index for the task
|
||||
*/
|
||||
final void signalWork(ForkJoinTask<?>[] src, long offset) {
|
||||
final void signalWork(WorkQueue src, int base) {
|
||||
int pc = parallelism, i, sp; // rely on caller sync for initial reads
|
||||
long c = U.getLong(this, CTL);
|
||||
WorkQueue[] qs = queues;
|
||||
while ((short)(c >>> RC_SHIFT) < pc && qs != null &&
|
||||
qs.length > (i = (sp = (int)c) & SMASK)) {
|
||||
WorkQueue w = qs[i], v = null; long nc;
|
||||
if (i == 0 || w == null) {
|
||||
if (i == 0) {
|
||||
if ((short)(c >>> TC_SHIFT) >= pc)
|
||||
break;
|
||||
nc = ((c + TC_UNIT) & TC_MASK) | ((c + RC_UNIT) & RC_MASK);
|
||||
}
|
||||
else if ((v = w) == null)
|
||||
break;
|
||||
else
|
||||
nc = ((v = w).stackPred & LMASK) | ((c + RC_UNIT) & UMASK);
|
||||
if (c == (c = U.compareAndExchangeLong(this, CTL, c, nc))) {
|
||||
@ -1881,7 +1848,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
break;
|
||||
}
|
||||
qs = queues;
|
||||
if (src != null && U.getReference(src, offset) == null)
|
||||
if (src != null && src.base - base > 0)
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -1961,23 +1928,22 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
final void runWorker(WorkQueue w) {
|
||||
if (w != null) {
|
||||
int r = w.stackPred; // seed from registerWorker
|
||||
for (int fifo = (int)config & FIFO, idle = 0, taken = 0;;) {
|
||||
WorkQueue[] qs;
|
||||
long e = runState;
|
||||
int n = ((qs = queues) == null) ? 0 : qs.length;
|
||||
int fifo = (int)config & FIFO;
|
||||
rescan: for (int idle = 0, taken = 0, src = -1;;) {
|
||||
WorkQueue[] qs; int n;
|
||||
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
|
||||
int i = r, step = (r >>> 16) | 1;
|
||||
if ((e & STOP) != 0L || n <= 0) {
|
||||
long e = runState;
|
||||
if ((qs = queues) == null || (n = qs.length) <= 0 ||
|
||||
(e & STOP) != 0L) {
|
||||
w.nsteals += taken;
|
||||
break;
|
||||
}
|
||||
boolean rescan = false;
|
||||
scan: for (int j = n; j != 0; --j, i += step) {
|
||||
scan: for (int j = n, i = r, step = (r >>> 16) | 1; ; i += step) {
|
||||
WorkQueue q; int qid;
|
||||
if ((q = qs[qid = i & (n - 1)]) != null) {
|
||||
ForkJoinTask<?>[] a; int pb = -1, cap; // poll queue
|
||||
ForkJoinTask<?>[] a; int pb = -1, ran = 0, cap;
|
||||
while ((a = q.array) != null && (cap = a.length) > 0) {
|
||||
int b, nb; long bp, np; ForkJoinTask<?> t;
|
||||
int b, nb; long bp, np, ps; ForkJoinTask<?> t;
|
||||
t = (ForkJoinTask<?>)U.getReferenceAcquire(
|
||||
a, bp = slotOffset((cap - 1) & (b = q.base)));
|
||||
Object nt = U.getReference(
|
||||
@ -1986,54 +1952,65 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
if (idle != 0) {
|
||||
if (t == null && nt == null && q.top - qb <= 0)
|
||||
break;
|
||||
if ((idle = tryReactivate(w)) != 0) {
|
||||
rescan = true;
|
||||
if ((idle = tryReactivate(w)) != 0)
|
||||
break scan; // can't take yet
|
||||
}
|
||||
}
|
||||
else if (qb != b) // inconsistent/busy
|
||||
;
|
||||
else if (qb != b) { // inconsistent
|
||||
if (taken == 0)
|
||||
break scan; // busy
|
||||
}
|
||||
else if (t == null) {
|
||||
if (q.array == a && U.getReference(a, bp) == null) {
|
||||
if (nt == null)
|
||||
break;
|
||||
if (pb == (pb = b)) {
|
||||
rescan = true; // stalled; reorder scan
|
||||
if (U.getReference(a, bp) == null) {
|
||||
if (ran != 0) // end run on this queue
|
||||
break scan;
|
||||
}
|
||||
if (nt == null) // probably empty
|
||||
break;
|
||||
if (pb == (pb = b))
|
||||
break scan; // stalled; reorder scan
|
||||
}
|
||||
}
|
||||
else if (U.compareAndSetReference(a, bp, t, null)) {
|
||||
nt = U.getReference(a, np);
|
||||
U.getAndSetInt(q, WorkQueue.BASE, nb);
|
||||
if (nt != null) // propagate
|
||||
signalWork(a, np);
|
||||
w.source = qid;
|
||||
taken += w.topLevelExec(t, q, this, fifo);
|
||||
rescan = true;
|
||||
break scan;
|
||||
if (qid != (ps = src))
|
||||
U.putIntOpaque(w, WorkQueue.SOURCE, src = qid);
|
||||
if (nt != null &&
|
||||
(qid != ps ||
|
||||
((qid & 1) == 0 && (fifo != 0 || taken == 0))) &&
|
||||
U.getReferenceAcquire(a, np) != null)
|
||||
signalWork(q, nb); // propagate
|
||||
ran = ++taken;
|
||||
w.topLevelExec(t, fifo);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!rescan) {
|
||||
idle = onEmptyScan(w, idle, taken);
|
||||
taken = 0;
|
||||
if (--j == 0) { // empty scan
|
||||
if (idle == 0) {
|
||||
idle = tryDeactivate(w, taken);
|
||||
taken = 0;
|
||||
}
|
||||
else if ((idle = awaitWork(w)) != 0)
|
||||
break rescan; // trimmed or terminated
|
||||
else
|
||||
src = -1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Possibly deactivates, reactivates or pauses worker
|
||||
* Possibly deactivates or pauses worker
|
||||
*
|
||||
* @param w the work queue
|
||||
* @param taken number of tasks taken since last activate
|
||||
* @return 0 if now active
|
||||
*/
|
||||
private int onEmptyScan(WorkQueue w, int idle, int taken) {
|
||||
private int tryDeactivate(WorkQueue w, int taken) {
|
||||
int idle = 0;
|
||||
if (w != null) { // always true; hoist checks
|
||||
int phase = w.phase;
|
||||
if (taken != 0) {
|
||||
if (taken != 0) { // rescan before deactivating
|
||||
w.nsteals += taken;
|
||||
if ((w.config & CLEAR_TLS) != 0 &&
|
||||
(Thread.currentThread() instanceof ForkJoinWorkerThread f))
|
||||
@ -2041,7 +2018,8 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
if ((runState & (SHUTDOWN|STOP)) == 0L)
|
||||
Thread.yield(); // pause before rescan
|
||||
}
|
||||
else if (idle == 0) { // deactivate
|
||||
else {
|
||||
int phase = U.getInt(w, WorkQueue.PHASE);
|
||||
long sp = (phase + NEXTIDLE) & LMASK, pc = ctl;
|
||||
U.putInt(w, WorkQueue.PHASE, phase | IDLE);
|
||||
for (;;) { // enqueue
|
||||
@ -2059,10 +2037,6 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
}
|
||||
}
|
||||
else if ((idle = phase & IDLE) != 0) {
|
||||
awaitWork(w, phase);
|
||||
idle = 0;
|
||||
}
|
||||
}
|
||||
return idle;
|
||||
}
|
||||
@ -2092,29 +2066,35 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
* Awaits signal or termination.
|
||||
*
|
||||
* @param w the work queue
|
||||
* @throws WorkerTrimmedException on idle timeout
|
||||
* @return 0 if now active
|
||||
*/
|
||||
private void awaitWork(WorkQueue w, int phase) {
|
||||
if (w != null) {
|
||||
int activePhase = phase + IDLE;
|
||||
long deadline = 0L, waitTime = (w.source == INVALID_ID) ? 0L : keepAlive;
|
||||
for (;;) {
|
||||
long d = 0L, c; int idle;
|
||||
private int awaitWork(WorkQueue w) {
|
||||
int idle = 0, phase;
|
||||
if (w != null && (idle = (phase = w.phase) & IDLE) != 0) {
|
||||
int activePhase = phase + IDLE, trim;
|
||||
if ((trim = w.dropOnEmptyScan) != 0)
|
||||
w.dropOnEmptyScan = 0;
|
||||
for (long deadline = 0L;;) {
|
||||
Thread.interrupted(); // clear status
|
||||
if ((runState & STOP) != 0L)
|
||||
break;
|
||||
boolean trimmable = false; // use timed wait if trimmable
|
||||
int spins = ((short)((c = ctl) >>> TC_SHIFT) | 1) & SMASK;
|
||||
if ((int)c == activePhase) { // at head
|
||||
if ((c & RC_MASK) == 0L) {
|
||||
boolean trimmable = false;
|
||||
long d = 0L, c;
|
||||
int ac = (short)((c = ctl) >>> RC_SHIFT);
|
||||
int spins = ((short)(c >>> TC_SHIFT) | 1) & SMASK; // >= # workers
|
||||
if ((int)c == activePhase) { // at head of ctl
|
||||
spins += Math.max(spins << 1, SPIN_WAITS); // approx 1 scan cost
|
||||
if (ac == 0) { // quiescent
|
||||
long now = System.currentTimeMillis();
|
||||
if (deadline == 0L)
|
||||
deadline = waitTime + now;
|
||||
if ((d = deadline) - now <= TIMEOUT_SLOP)
|
||||
tryTrim(w, c, activePhase); // throws if trimmed
|
||||
d = deadline = now + keepAlive;
|
||||
else if ((d = deadline) - now <= TIMEOUT_SLOP)
|
||||
trim = 1;
|
||||
if (trim != 0 && tryTrim(w, c, activePhase))
|
||||
break;
|
||||
trim = 0;
|
||||
trimmable = true;
|
||||
}
|
||||
spins += SPIN_WAITS; // spin more
|
||||
}
|
||||
while ((idle = w.phase & IDLE) != 0 && --spins != 0)
|
||||
Thread.onSpinWait();
|
||||
@ -2126,18 +2106,19 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
U.park(trimmable, d);
|
||||
w.parking = 0; // close unpark window
|
||||
LockSupport.setCurrentBlocker(null);
|
||||
if (idle == 0 || (w.phase & IDLE) == 0)
|
||||
if (idle == 0 || (idle = w.phase & IDLE) == 0)
|
||||
break;
|
||||
}
|
||||
}
|
||||
return idle;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to remove and deregister worker after timeout, and release
|
||||
* another to do the same unless new tasks are found.
|
||||
* @throws WorkerTrimmedException on idle timeout
|
||||
* @return true if trimmed
|
||||
*/
|
||||
private void tryTrim(WorkQueue w, long c, int activePhase) {
|
||||
private boolean tryTrim(WorkQueue w, long c, int activePhase) {
|
||||
if (w != null) {
|
||||
int vp, i; WorkQueue[] vs; WorkQueue v;
|
||||
long nc = ((w.stackPred & LMASK) |
|
||||
@ -2150,13 +2131,14 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
U.compareAndSetLong(this, CTL, // try to wake up next waiter
|
||||
nc, ((v.stackPred & LMASK) |
|
||||
((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) {
|
||||
v.source = INVALID_ID; // enable cascaded timeouts
|
||||
v.dropOnEmptyScan = 1;
|
||||
v.phase = vp;
|
||||
U.unpark(v.owner);
|
||||
}
|
||||
throw new WorkerTrimmedException();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -2314,7 +2296,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
if (U.compareAndSetReference(a, k, t, null)) {
|
||||
q.base = b + 1;
|
||||
U.putIntVolatile(w, WorkQueue.SOURCE, j);
|
||||
w.source = j;
|
||||
t.doExec();
|
||||
w.source = wsrc;
|
||||
rescan = true; // restart at index r
|
||||
@ -2469,7 +2451,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
if (U.compareAndSetReference(a, k, t, null)) {
|
||||
q.base = nb;
|
||||
U.putIntVolatile(w, WorkQueue.SOURCE, j);
|
||||
w.source = j;
|
||||
t.doExec();
|
||||
w.source = wsrc;
|
||||
rescan = locals = true;
|
||||
@ -3307,7 +3289,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
if ((config & PRESET_SIZE) != 0)
|
||||
throw new UnsupportedOperationException("Cannot override System property");
|
||||
if ((prevSize = U.getAndSetInt(this, PARALLELISM, size)) < size)
|
||||
signalWork(null, 0L); // trigger worker activation
|
||||
signalWork(null, 0); // trigger worker activation
|
||||
return prevSize;
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user