mirror of
https://github.com/openjdk/jdk.git
synced 2026-03-14 01:43:13 +00:00
Improve performance under oversubscription, step 1
This commit is contained in:
parent
1ba9fc6efd
commit
4f2adfcbc6
@ -1215,7 +1215,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
@jdk.internal.vm.annotation.Contended("w")
|
||||
int nsteals; // number of steals from other queues
|
||||
@jdk.internal.vm.annotation.Contended("w")
|
||||
volatile int parking; // nonzero if parked in awaitWork
|
||||
volatile int parking; // next phase if parked in awaitWork
|
||||
|
||||
// Support for atomic operations
|
||||
private static final Unsafe U;
|
||||
@ -1338,24 +1338,24 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
private ForkJoinTask<?> nextLocalTask(int fifo) {
|
||||
ForkJoinTask<?> t = null;
|
||||
ForkJoinTask<?>[] a = array;
|
||||
int b = base, p = top, cap;
|
||||
if (p - b > 0 && a != null && (cap = a.length) > 0) {
|
||||
for (int m = cap - 1, s, nb;;) {
|
||||
if (fifo == 0 || (nb = b + 1) == p) {
|
||||
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
|
||||
a, slotOffset(m & (s = p - 1)), null)) != null)
|
||||
updateTop(s); // else lost race for only task
|
||||
break;
|
||||
}
|
||||
int b = base, s = top - 1, cap;
|
||||
if (a != null && s - b >= 0 && (cap = a.length) > 0) {
|
||||
if (fifo == 0) {
|
||||
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
|
||||
a, slotOffset(m & b), null)) != null) {
|
||||
updateBase(nb);
|
||||
break;
|
||||
}
|
||||
while (b == (b = U.getIntAcquire(this, BASE)))
|
||||
Thread.onSpinWait(); // spin to reduce memory traffic
|
||||
if (p - b <= 0)
|
||||
break;
|
||||
a, slotOffset((cap - 1) & s), null)) != null)
|
||||
updateTop(s);
|
||||
} else {
|
||||
do {
|
||||
if ((t = (ForkJoinTask<?>)U.getAndSetReference(
|
||||
a, slotOffset((cap - 1) & b), null)) != null) {
|
||||
updateBase(b + 1);
|
||||
break;
|
||||
}
|
||||
if (b == s)
|
||||
break;
|
||||
while (b == (b = U.getIntAcquire(this, BASE)))
|
||||
Thread.onSpinWait();
|
||||
} while (s - b >= 0);
|
||||
}
|
||||
}
|
||||
return t;
|
||||
@ -1894,7 +1894,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
createWorker();
|
||||
else {
|
||||
v.phase = sp;
|
||||
if (v.parking != 0)
|
||||
if (v.parking == sp)
|
||||
U.unpark(v.owner);
|
||||
}
|
||||
break;
|
||||
@ -1915,7 +1915,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
|
||||
(v.stackPred & LMASK))))) {
|
||||
v.phase = sp;
|
||||
if (v.parking != 0)
|
||||
if (v.parking == sp)
|
||||
U.unpark(v.owner);
|
||||
}
|
||||
}
|
||||
@ -1983,46 +1983,51 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
int n = qs.length, i = r, step = (r >>> 16) | 1;
|
||||
boolean rescan = false;
|
||||
scan: for (int l = n; l > 0; --l, i += step) { // scan queues
|
||||
int j, cap; WorkQueue q; ForkJoinTask<?>[] a;
|
||||
if ((q = qs[j = i & (n - 1)]) != null &&
|
||||
int qid, cap; WorkQueue q; ForkJoinTask<?>[] a;
|
||||
if ((q = qs[qid = i & (n - 1)]) != null &&
|
||||
(a = q.array) != null && (cap = a.length) > 0) {
|
||||
for (int m = cap - 1, pb = -1, b = q.base;;) {
|
||||
ForkJoinTask<?> t; long k;
|
||||
t = (ForkJoinTask<?>)U.getReferenceAcquire(
|
||||
a, k = slotOffset(m & b));
|
||||
if (b != (b = q.base) || t == null ||
|
||||
!U.compareAndSetReference(a, k, t, null)) {
|
||||
if (b != (b = q.base)) { // inconsistent / busy
|
||||
if (src != qid) {
|
||||
rescan = true; // reduce interference
|
||||
break scan;
|
||||
}
|
||||
}
|
||||
else if (t == null ||
|
||||
!U.compareAndSetReference(a, k, t, null)) {
|
||||
if (a[b & m] == null) {
|
||||
if (rescan) // end of run
|
||||
break scan;
|
||||
if (a[(b + 1) & m] == null &&
|
||||
a[(b + 2) & m] == null) {
|
||||
break; // probably empty
|
||||
if (rescan) // end of run
|
||||
break scan;
|
||||
break; // probably empty
|
||||
}
|
||||
if (pb == (pb = b)) { // track progress
|
||||
rescan = true; // stalled; reorder scan
|
||||
if (src != qid || pb == (pb = b)) {
|
||||
rescan = true; // reorder scan
|
||||
break scan;
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
boolean propagate;
|
||||
int prevSrc = src, nb;
|
||||
long np = slotOffset((nb = b + 1) & m);
|
||||
long np = slotOffset((pb = b + 1) & m);
|
||||
boolean more = (U.getReferenceVolatile(a, np) != null);
|
||||
q.base = nb;
|
||||
q.base = pb;
|
||||
w.nsteals = ++nsteals;
|
||||
w.source = src = j; // volatile
|
||||
int prevSrc = src;
|
||||
w.source = src = qid; // volatile
|
||||
rescan = true;
|
||||
if (propagate =
|
||||
(more &&
|
||||
(prevSrc != src ||
|
||||
((j & 1) == 0) &&
|
||||
(fifo != 0 || t.noUserHelp() != 0)) &&
|
||||
U.getReference(a, np) != null))
|
||||
(prevSrc != qid ||
|
||||
((qid & 1) == 0 &&
|
||||
(fifo != 0 || t.noUserHelp() != 0)))))
|
||||
signalWork();
|
||||
w.topLevelExec(t, fifo);
|
||||
if ((b = q.base) != nb && !propagate)
|
||||
if ((b = q.base) != pb && !propagate)
|
||||
break scan; // reduce interference
|
||||
}
|
||||
}
|
||||
@ -2058,120 +2063,129 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
else if (qac < (pc & RC_MASK))
|
||||
return w.phase = phase; // back out if lost to signal
|
||||
}
|
||||
int ac = (short)(qac >>> RC_SHIFT), n; long e; WorkQueue[] qs;
|
||||
long e; WorkQueue[] qs; int n;
|
||||
if (((e = runState) & STOP) != 0L ||
|
||||
((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
|
||||
((e & SHUTDOWN) != 0L && qac == 0L && quiescent() > 0) ||
|
||||
(qs = queues) == null || (n = qs.length) <= 0)
|
||||
return IDLE; // terminating
|
||||
for (int prechecks = Math.min(ac, 2), // reactivation threshold
|
||||
k = Math.max(n + (n << 1), SPIN_WAITS << 1),
|
||||
i = 0; k-- > 0 ; ++i) {
|
||||
return IDLE; // terminating
|
||||
int prechecks = Math.min((short)(qac >>> RC_SHIFT), 2);
|
||||
for (int k = n + (n << 1), i = activePhase; k-- > 0 ; ++i) {
|
||||
WorkQueue q; int cap; ForkJoinTask<?>[] a;
|
||||
if (w.phase == activePhase)
|
||||
return activePhase;
|
||||
if ((q = qs[i & (n - 1)]) == null)
|
||||
Thread.onSpinWait();
|
||||
else if ((a = q.array) != null && (cap = a.length) > 0 &&
|
||||
a[q.base & (cap - 1)] != null) {
|
||||
WorkQueue v; int sp, j; long c;
|
||||
if (prechecks > 0)
|
||||
--prechecks;
|
||||
else if (((c = ctl) & RC_MASK) <= qac && (sp = (int)c) != 0 &&
|
||||
(j = sp & SMASK) < n && (v = qs[j]) != null) {
|
||||
long nc = (v.stackPred & LMASK) | ((c + RC_UNIT) & UMASK);
|
||||
if (sp != activePhase && w.phase == activePhase)
|
||||
return activePhase;
|
||||
if ((sp == activePhase || k < n) &&
|
||||
U.compareAndSetLong(this, CTL, c, nc)) {
|
||||
v.phase = sp;
|
||||
if (sp == activePhase)
|
||||
if ((q = qs[i & (n - 1)]) != null) {
|
||||
if (w.phase == activePhase)
|
||||
return activePhase;
|
||||
if ((a = q.array) != null && (cap = a.length) > 0 &&
|
||||
a[q.base & (cap - 1)] != null) {
|
||||
WorkQueue v; int sp, j; long c;
|
||||
if (prechecks > 0)
|
||||
--prechecks;
|
||||
else if (((c = ctl) & RC_MASK) <= qac && (sp = (int)c) != 0 &&
|
||||
(j = sp & SMASK) < n && (v = qs[j]) != null) {
|
||||
long nc = (v.stackPred & LMASK) | ((c + RC_UNIT) & UMASK);
|
||||
if (sp != activePhase && w.phase == activePhase)
|
||||
return activePhase;
|
||||
if (v.parking != 0)
|
||||
U.unpark(v.owner);
|
||||
break;
|
||||
if ((sp == activePhase || k < n) &&
|
||||
U.compareAndSetLong(this, CTL, c, nc)) {
|
||||
v.phase = sp;
|
||||
if (sp == activePhase)
|
||||
return activePhase;
|
||||
if (v.parking == sp)
|
||||
U.unpark(v.owner);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (k < n)
|
||||
k = n; // ensure re-encounter
|
||||
}
|
||||
if (k < n)
|
||||
k = n; // ensure re-encounter
|
||||
}
|
||||
}
|
||||
return (((phase = w.phase) & IDLE) == 0) ? phase : awaitWork(w, phase);
|
||||
return ((phase = w.phase) == activePhase) ? activePhase : awaitWork(w, phase);
|
||||
}
|
||||
|
||||
/**
|
||||
* Awaits signal or termination.
|
||||
*
|
||||
* @param w the work queue
|
||||
* @param p current phase (known to be idle)
|
||||
* @param phase current phase (known to be idle)
|
||||
* @return current phase, with IDLE set if worker should exit
|
||||
*/
|
||||
private int awaitWork(WorkQueue w, int p) {
|
||||
private int awaitWork(WorkQueue w, int phase) {
|
||||
if (w != null) {
|
||||
ForkJoinWorkerThread t; long deadline;
|
||||
if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
|
||||
t.resetThreadLocals(); // clear before reactivate
|
||||
if ((ctl & RC_MASK) > 0L)
|
||||
deadline = 0L;
|
||||
else if ((deadline =
|
||||
(((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
|
||||
System.currentTimeMillis()) == 0L)
|
||||
deadline = 1L; // avoid zero
|
||||
int activePhase = p + IDLE;
|
||||
if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
|
||||
LockSupport.setCurrentBlocker(this);
|
||||
w.parking = 1; // enable unpark
|
||||
while ((p = w.phase) != activePhase) {
|
||||
boolean trimmable = false; int trim;
|
||||
Thread.interrupted(); // clear status
|
||||
if ((runState & STOP) != 0L)
|
||||
break;
|
||||
if (deadline != 0L) {
|
||||
if ((trim = tryTrim(w, p, deadline)) > 0)
|
||||
int activePhase = phase + IDLE, parking = 0;
|
||||
long deadline = 0L;
|
||||
boolean trim = (w.source == INVALID_ID);
|
||||
if ((w.config & CLEAR_TLS) != 0 &&
|
||||
(Thread.currentThread() instanceof ForkJoinWorkerThread f))
|
||||
f.resetThreadLocals(); // (instanceof check always true)
|
||||
while ((phase = w.phase) != activePhase) {
|
||||
boolean trimmable = false; // true if at ctl head and quiescent
|
||||
long d = 0L, c;
|
||||
Thread.interrupted(); // clear status
|
||||
if ((runState & STOP) != 0L)
|
||||
break;
|
||||
if ((int)(c = ctl) == activePhase) {
|
||||
if ((c & RC_MASK) == 0L) {
|
||||
long now = System.currentTimeMillis();
|
||||
if (deadline == 0L)
|
||||
d = deadline = now + keepAlive;
|
||||
else if ((d = deadline) - now <= TIMEOUT_SLOP)
|
||||
trim = true;
|
||||
if (trim && tryTrim(w, c, activePhase))
|
||||
break;
|
||||
trim = false;
|
||||
trimmable = true;
|
||||
}
|
||||
if (parking == 0) { // spin at head for approx 1 scan cost
|
||||
int tc = ((short)(c >>> TC_SHIFT) | 1) & SMASK;
|
||||
int spins = tc + (tc << 1);
|
||||
while ((phase = w.phase) != activePhase && --spins != 0)
|
||||
Thread.onSpinWait();
|
||||
if (phase == activePhase)
|
||||
break;
|
||||
else if (trim < 0)
|
||||
deadline = 0L;
|
||||
else
|
||||
trimmable = true;
|
||||
}
|
||||
U.park(trimmable, deadline);
|
||||
}
|
||||
if (parking == 0) { // enable unpark
|
||||
LockSupport.setCurrentBlocker(this);
|
||||
w.parking = activePhase;
|
||||
if ((phase = w.phase) == activePhase)
|
||||
break;
|
||||
}
|
||||
U.park(trimmable, d);
|
||||
}
|
||||
if (parking != 0) {
|
||||
w.parking = 0;
|
||||
LockSupport.setCurrentBlocker(null);
|
||||
}
|
||||
}
|
||||
return p;
|
||||
return phase;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to remove and deregister worker after timeout, and release
|
||||
* another to do the same.
|
||||
* @return > 0: trimmed, < 0 : not trimmable, else 0
|
||||
* another to do the same unless new tasks are found.
|
||||
* @return true if trimmed
|
||||
*/
|
||||
private int tryTrim(WorkQueue w, int phase, long deadline) {
|
||||
long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
|
||||
if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
|
||||
stat = -1; // no longer ctl top
|
||||
else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
|
||||
stat = 0; // spurious wakeup
|
||||
else if (!compareAndSetCtl(
|
||||
c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
|
||||
(TC_MASK & (c - TC_UNIT)))))
|
||||
stat = -1; // lost race to signaller
|
||||
else {
|
||||
stat = 1;
|
||||
w.source = DROPPED;
|
||||
w.phase = activePhase;
|
||||
if ((vp = (int)nc) != 0 && (vs = queues) != null &&
|
||||
vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
|
||||
compareAndSetCtl( // try to wake up next waiter
|
||||
nc, ((UMASK & (nc + RC_UNIT)) |
|
||||
(nc & TC_MASK) | (v.stackPred & LMASK)))) {
|
||||
v.source = INVALID_ID; // enable cascaded timeouts
|
||||
v.phase = vp;
|
||||
U.unpark(v.owner);
|
||||
private boolean tryTrim(WorkQueue w, long c, int activePhase) {
|
||||
if (w != null) {
|
||||
int vp, i; WorkQueue[] vs; WorkQueue v;
|
||||
long nc = ((w.stackPred & LMASK) |
|
||||
((RC_MASK & c) | (TC_MASK & (c - TC_UNIT))));
|
||||
if (U.compareAndSetLong(this, CTL, c, nc)) {
|
||||
w.source = DROPPED;
|
||||
w.phase = activePhase;
|
||||
if ((vp = (int)nc) != 0 && (vs = queues) != null &&
|
||||
vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
|
||||
U.compareAndSetLong(this, CTL, // try to wake up next waiter
|
||||
nc, ((v.stackPred & LMASK) |
|
||||
((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) {
|
||||
v.source = INVALID_ID;
|
||||
v.phase = vp;
|
||||
U.unpark(v.owner);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return stat;
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -2226,7 +2240,7 @@ public class ForkJoinPool extends AbstractExecutorService
|
||||
(v = qs[i]) != null &&
|
||||
compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
|
||||
v.phase = sp;
|
||||
if (v.parking != 0)
|
||||
if (v.parking == sp)
|
||||
U.unpark(v.owner);
|
||||
stat = UNCOMPENSATE;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user