mirror of
https://github.com/openjdk/jdk.git
synced 2026-04-21 12:20:29 +00:00
Split external push
This commit is contained in:
parent
2a5807473b
commit
4328645c97
@ -1198,6 +1198,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
private static final long PHASE;
|
||||
private static final long BASE;
|
||||
private static final long TOP;
|
||||
private static final long SOURCE;
|
||||
private static final long ARRAY;
|
||||
|
||||
/**
|
||||
@ -1240,34 +1241,35 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
|
||||
/**
|
||||
* Pushes a task. Called only by owner or if already locked
|
||||
* Pushes a task on an internal queue. Called only by owner.
|
||||
* (Use pool.externalPush for external queues).
|
||||
*
|
||||
* @param task the task; caller must ensure nonnull
|
||||
* @param pool the pool to signal if was previously empty, else null
|
||||
* @param unlock if not 1, phase unlock value
|
||||
* @param pool the pool to signal if was previously empty or resized
|
||||
|
||||
* @throws RejectedExecutionException if array could not be resized
|
||||
*/
|
||||
final void push(ForkJoinTask<?> task, ForkJoinPool pool, int unlock) {
|
||||
final void push(ForkJoinTask<?> task, ForkJoinPool pool) {
|
||||
ForkJoinTask<?>[] a = array;
|
||||
int b = base, s = top, room, m;
|
||||
if (a != null &&
|
||||
((room = a.length - (s + 1 - b)) > 0 || (a = growArray()) != null) &&
|
||||
(m = a.length - 1) >= 0) { // else rejected or disabled
|
||||
int b = base, s = top, cap, m;
|
||||
if (a == null || ((cap = a.length) - (s + 1 - b)) <= 0 || cap <= 0)
|
||||
growAndPush(task, pool, 1);
|
||||
else {
|
||||
top = s + 1;
|
||||
U.putReferenceVolatile(a, slotOffset(m & s), task);
|
||||
if (unlock != 1) // release external lock
|
||||
U.putInt(this, PHASE, unlock);
|
||||
if ((U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null ||
|
||||
room <= 0) && pool != null)
|
||||
U.putReferenceVolatile(a, slotOffset((m = cap - 1) & s), task);
|
||||
if (U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null &&
|
||||
pool != null)
|
||||
pool.signalWork(this, s); // may have appeared empty
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resizes the queue array unless out of memory.
|
||||
* @return new array, or throw on OOME
|
||||
* Resizes the queue array and pushes unless out of memory.
|
||||
* @param task the task; caller must ensure nonnull
|
||||
* @param pool the pool to signal upon resize
|
||||
* @param unlock if not 1, phase unlock value
|
||||
*/
|
||||
private ForkJoinTask<?>[] growArray() {
|
||||
final void growAndPush(ForkJoinTask<?> task, ForkJoinPool pool, int unlock) {
|
||||
ForkJoinTask<?>[] a; int cap, newCap;
|
||||
if ((a = array) != null && (cap = a.length) > 0 &&
|
||||
(newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2) > 0) {
|
||||
@ -1277,8 +1279,9 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
} catch (OutOfMemoryError ex) {
|
||||
}
|
||||
if (newArray != null) {
|
||||
int mask = cap - 1, newMask = newCap - 1;
|
||||
for (int k = top - 1, j = cap; j > 0; --j, --k) {
|
||||
int s = top++, mask = cap - 1, newMask = newCap - 1;
|
||||
newArray[s & newMask] = task;
|
||||
for (int k = s - 1, j = cap; j > 0; --j, --k) {
|
||||
ForkJoinTask<?> u; // poll old, push to new
|
||||
if ((u = (ForkJoinTask<?>)U.getAndSetReference(
|
||||
a, slotOffset(k & mask), null)) == null)
|
||||
@ -1286,12 +1289,15 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
newArray[k & newMask] = u;
|
||||
}
|
||||
U.putReferenceVolatile(this, ARRAY, newArray);
|
||||
return newArray;
|
||||
if (unlock != 1)
|
||||
phase = unlock;
|
||||
if (pool != null)
|
||||
pool.signalWork(this, s);
|
||||
return;
|
||||
}
|
||||
}
|
||||
int f = phase; // unlock if externally locked
|
||||
if ((f & (IDLE | 1)) == 0)
|
||||
phase = f + IDLE;
|
||||
if (unlock != 1)
|
||||
phase = unlock;
|
||||
throw new RejectedExecutionException("Queue capacity exceeded");
|
||||
}
|
||||
|
||||
@ -1308,12 +1314,12 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
if (fifo == 0) {
|
||||
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
|
||||
a, slotOffset((cap - 1) & s), null)) != null)
|
||||
top = s;
|
||||
U.putIntOpaque(this, TOP, s);
|
||||
} else {
|
||||
do {
|
||||
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
|
||||
a, slotOffset((cap - 1) & b), null)) != null) {
|
||||
base = b + 1;
|
||||
U.putIntVolatile(this, BASE, b + 1);
|
||||
break;
|
||||
}
|
||||
if (b == s)
|
||||
@ -1330,10 +1336,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
* Takes next task, if one exists, using configured mode.
|
||||
*/
|
||||
final ForkJoinTask<?> nextLocalTask() {
|
||||
U.loadFence(); // ensure ordering for external callers
|
||||
ForkJoinTask<?> t = nextLocalTask(config & FIFO);
|
||||
U.storeFence();
|
||||
return t;
|
||||
return nextLocalTask(config & FIFO);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1452,6 +1455,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
a, slotOffset(s & m), null));
|
||||
top = s;
|
||||
}
|
||||
U.storeFence();
|
||||
}
|
||||
if (!internal)
|
||||
phase = lock + NEXTIDLE;
|
||||
@ -1499,7 +1503,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
if (taken =
|
||||
(top == p &&
|
||||
U.compareAndSetReference(a, k, t, null)))
|
||||
top = s;
|
||||
U.putIntOpaque(this, TOP, s);
|
||||
if (!internal)
|
||||
phase = lock + NEXTIDLE;
|
||||
if (!taken)
|
||||
@ -1536,7 +1540,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
break;
|
||||
if (base == b && t != null &&
|
||||
U.compareAndSetReference(a, k, t, null)) {
|
||||
base = b + 1;
|
||||
U.putIntVolatile(this, BASE, b + 1);
|
||||
t.doExec();
|
||||
}
|
||||
}
|
||||
@ -1573,6 +1577,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
PHASE = U.objectFieldOffset(klass, "phase");
|
||||
BASE = U.objectFieldOffset(klass, "base");
|
||||
TOP = U.objectFieldOffset(klass, "top");
|
||||
SOURCE = U.objectFieldOffset(klass, "source");
|
||||
ARRAY = U.objectFieldOffset(klass, "array");
|
||||
}
|
||||
}
|
||||
@ -1907,8 +1912,10 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
else if ((ds = delayScheduler) != null && !ds.canShutDown())
|
||||
return 0;
|
||||
else if (U.compareAndSetLong(this, CTL, c, c) &&
|
||||
U.compareAndSetLong(this, RUNSTATE, e, e | STOP))
|
||||
U.compareAndSetLong(this, RUNSTATE, e, e | STOP)) {
|
||||
releaseWaiters();
|
||||
return 1; // enable termination
|
||||
}
|
||||
else
|
||||
break; // restart
|
||||
}
|
||||
@ -1926,7 +1933,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
WorkQueue[] qs; int n;
|
||||
int r = w.stackPred; // seed from registerWorker
|
||||
int fifo = (int)config & FIFO;
|
||||
int src = -1, idle = 0, rescans = 0, taken = 0;
|
||||
int src = 0, idle = 0, rescans = 0, taken = 0;
|
||||
while ((runState & STOP) == 0L && (qs = queues) != null &&
|
||||
(n = qs.length) > 0) {
|
||||
int i = r, step = (r >>> 16) | 1;
|
||||
@ -1965,8 +1972,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
if (nt != null)
|
||||
signalWork(q, nb); // propagate
|
||||
rescans = 1;
|
||||
++taken;
|
||||
if (src != qid)
|
||||
if (taken++ == 0 || src != qid)
|
||||
w.source = src = qid;
|
||||
w.topLevelExec(t, fifo);
|
||||
}
|
||||
@ -1975,21 +1981,14 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
if (rescans >= 0)
|
||||
--rescans;
|
||||
else if ((runState & STOP) != 0L)
|
||||
break;
|
||||
else if (idle == 0) {
|
||||
idle = deactivate(w, taken);
|
||||
taken = 0;
|
||||
}
|
||||
else {
|
||||
int phase;
|
||||
if ((idle = (phase = w.phase) & IDLE) != 0) {
|
||||
if ((idle = awaitWork(w, phase)) != 0)
|
||||
break;
|
||||
src = -1;
|
||||
}
|
||||
else if ((idle = awaitWork(w)) == 0)
|
||||
rescans = 0;
|
||||
}
|
||||
else
|
||||
break;
|
||||
}
|
||||
if (taken != 0)
|
||||
w.nsteals += taken;
|
||||
@ -1997,7 +1996,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
|
||||
/**
|
||||
* Deactivates and enqueues worker
|
||||
* Deactivates and enqueues worker, backing out on signal
|
||||
*
|
||||
* @param w the work queue
|
||||
* @param taken number of stolen tasks since last reactivation
|
||||
@ -2009,24 +2008,28 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
int phase = U.getInt(w, WorkQueue.PHASE);
|
||||
long sp = (phase + NEXTIDLE) & LMASK, pc = ctl, c, e;
|
||||
U.putInt(w, WorkQueue.PHASE, phase | IDLE);
|
||||
do { // enqueue
|
||||
for (;;) { // try to enqueue
|
||||
w.stackPred = (int)pc;
|
||||
} while (pc != (pc = U.compareAndExchangeLong(
|
||||
this, CTL, pc, c = ((pc - RC_UNIT) & UMASK) | sp)));
|
||||
if (pc == (pc = U.compareAndExchangeLong(
|
||||
this, CTL, pc, c = ((pc - RC_UNIT) & UMASK) | sp)))
|
||||
break;
|
||||
else if ((c & RC_MASK) < (pc & RC_MASK)) {
|
||||
w.phase = phase; // back out if lost to signal
|
||||
idle = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (taken != 0) {
|
||||
w.nsteals += taken;
|
||||
if ((w.config & CLEAR_TLS) != 0 &&
|
||||
(Thread.currentThread() instanceof ForkJoinWorkerThread f))
|
||||
f.resetThreadLocals(); // (instanceof check always true)
|
||||
}
|
||||
if (((e = runState) & STOP) != 0L ||
|
||||
((e & SHUTDOWN) != 0L && (c & RC_MASK) == 0L && quiescent() > 0))
|
||||
releaseWaiters();
|
||||
else { // spin for approx 1 scan cost
|
||||
int tc = (short)(c >>> TC_SHIFT);
|
||||
int spins = Math.max((tc << 1) + tc, SPIN_WAITS);
|
||||
while ((idle = w.phase & IDLE) != 0 && --spins != 0)
|
||||
Thread.onSpinWait();
|
||||
if (idle != 0 &&
|
||||
((e = runState) & STOP) == 0L &&
|
||||
((e & SHUTDOWN) == 0L || (c & RC_MASK) > 0L || quiescent() <= 0)) {
|
||||
for (int s = SPIN_WAITS; (idle = w.phase & IDLE) != 0 && --s != 0;)
|
||||
Thread.onSpinWait(); // spin before rescan
|
||||
}
|
||||
}
|
||||
return idle;
|
||||
@ -2057,43 +2060,46 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
* Awaits signal or termination.
|
||||
*
|
||||
* @param w the work queue
|
||||
* @param phase w's current phase (must be inactive)
|
||||
* @return 0 if now active
|
||||
*/
|
||||
private int awaitWork(WorkQueue w, int phase) {
|
||||
int idle = IDLE;
|
||||
if (w != null) { // always true; hoist checks
|
||||
private int awaitWork(WorkQueue w) {
|
||||
int idle = 0, phase;
|
||||
if (w != null && (idle = (phase = w.phase) & IDLE) != 0) {
|
||||
int activePhase = phase + IDLE;
|
||||
long deadline = 0L, waitTime = (w.source == INVALID_ID) ? 0L : keepAlive;
|
||||
LockSupport.setCurrentBlocker(this);
|
||||
for (;;) {
|
||||
do {
|
||||
boolean trimmable = false; // use timed wait if trimmable
|
||||
int spins = 0;
|
||||
long d = 0L, c;
|
||||
Thread.interrupted(); // clear status
|
||||
if ((runState & STOP) != 0L)
|
||||
break;
|
||||
boolean trimmable = false; // use timed wait if trimmable
|
||||
long d = 0L, c;
|
||||
if (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase) {
|
||||
long now = System.currentTimeMillis();
|
||||
if (deadline == 0L)
|
||||
deadline = waitTime + now;
|
||||
if (deadline - now <= TIMEOUT_SLOP) {
|
||||
if (tryTrim(w, c, activePhase))
|
||||
break;
|
||||
continue; // lost race to trim
|
||||
if ((int)(c = ctl) == activePhase) {
|
||||
spins = SPIN_WAITS; // trim or spin at head
|
||||
if ((c & RC_MASK) == 0L) {
|
||||
long now = System.currentTimeMillis();
|
||||
if (deadline == 0L)
|
||||
deadline = waitTime + now;
|
||||
if (deadline - now <= TIMEOUT_SLOP) {
|
||||
if (tryTrim(w, c, activePhase))
|
||||
break;
|
||||
continue; // lost race to trim
|
||||
}
|
||||
d = deadline;
|
||||
trimmable = true;
|
||||
}
|
||||
d = deadline;
|
||||
trimmable = true;
|
||||
}
|
||||
if ((idle = w.phase & IDLE) == 0)
|
||||
while ((idle = w.phase & IDLE) != 0 && spins-- != 0)
|
||||
Thread.onSpinWait();
|
||||
if (idle == 0)
|
||||
break;
|
||||
LockSupport.setCurrentBlocker(this);
|
||||
w.parking = 1; // enable unpark and recheck
|
||||
if ((idle = w.phase & IDLE) != 0)
|
||||
U.park(trimmable, d);
|
||||
w.parking = 0; // close unpark window
|
||||
if (idle == 0 || (idle = w.phase & IDLE) == 0)
|
||||
break;
|
||||
}
|
||||
LockSupport.setCurrentBlocker(null);
|
||||
LockSupport.setCurrentBlocker(null);
|
||||
} while (idle != 0 && (idle = w.phase & IDLE) != 0);
|
||||
}
|
||||
return idle;
|
||||
}
|
||||
@ -2280,7 +2286,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
if (U.compareAndSetReference(a, k, t, null)) {
|
||||
q.base = b + 1;
|
||||
w.source = j;
|
||||
U.putIntVolatile(w, WorkQueue.SOURCE, j);
|
||||
t.doExec();
|
||||
w.source = wsrc;
|
||||
rescan = true; // restart at index r
|
||||
@ -2352,6 +2358,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
if (U.compareAndSetReference(
|
||||
a, k, t, null)) {
|
||||
q.base = b + 1;
|
||||
U.storeFence();
|
||||
t.doExec();
|
||||
locals = rescan = true;
|
||||
break scan;
|
||||
@ -2435,7 +2442,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
}
|
||||
if (U.compareAndSetReference(a, k, t, null)) {
|
||||
q.base = nb;
|
||||
w.source = j;
|
||||
U.putIntVolatile(w, WorkQueue.SOURCE, j);
|
||||
t.doExec();
|
||||
w.source = wsrc;
|
||||
rescan = locals = true;
|
||||
@ -2553,7 +2560,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
ThreadLocalRandom.localInit(); // initialize caller's probe
|
||||
r = ThreadLocalRandom.getProbe();
|
||||
}
|
||||
for (;;) {
|
||||
for (;; r = ThreadLocalRandom.advanceProbe(r)) {
|
||||
WorkQueue q; WorkQueue[] qs; int n, id, i, lock;
|
||||
if ((qs = queues) == null || (n = qs.length) <= 0)
|
||||
break;
|
||||
@ -2570,10 +2577,20 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
q.phase = unlock;
|
||||
break; // check while q lock held
|
||||
}
|
||||
q.push(task, signalIfEmpty ? this : null, unlock);
|
||||
ForkJoinTask<?>[] a = q.array;
|
||||
int b = q.base, s = q.top, cap;
|
||||
if (a == null || ((cap = a.length) - (s + 1 - b)) <= 0 || cap <= 0)
|
||||
q.growAndPush(task, this, unlock);
|
||||
else {
|
||||
q.top = s + 1;
|
||||
a[(cap - 1) & s] = task;
|
||||
long pk = slotOffset((cap - 1) & (s - 1)); // predecessor index
|
||||
q.phase = unlock;
|
||||
if (U.getReferenceAcquire(a, pk) == null && signalIfEmpty)
|
||||
signalWork(q, s);
|
||||
}
|
||||
return;
|
||||
}
|
||||
r = ThreadLocalRandom.advanceProbe(r); // move
|
||||
}
|
||||
throw new RejectedExecutionException();
|
||||
}
|
||||
@ -2582,7 +2599,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
Thread t; ForkJoinWorkerThread wt;
|
||||
if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
|
||||
(wt = (ForkJoinWorkerThread)t).pool == this)
|
||||
wt.workQueue.push(task, signalIfEmpty ? this : null, 1);
|
||||
wt.workQueue.push(task, signalIfEmpty ? this : null);
|
||||
else
|
||||
externalPush(task, signalIfEmpty, true);
|
||||
}
|
||||
|
||||
@ -642,7 +642,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
||||
public final ForkJoinTask<V> fork() {
|
||||
Thread t; ForkJoinWorkerThread wt;
|
||||
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
|
||||
((wt = (ForkJoinWorkerThread)t).workQueue).push(this, wt.pool, 1);
|
||||
((wt = (ForkJoinWorkerThread)t).workQueue).push(this, wt.pool);
|
||||
else
|
||||
ForkJoinPool.common.externalPush(this, true, false);
|
||||
return this;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user