diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index aa173b9f364..2dc1dc7556c 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -1215,7 +1215,7 @@ public class ForkJoinPool extends AbstractExecutorService
         @jdk.internal.vm.annotation.Contended("w")
         int nsteals;               // number of steals from other queues
         @jdk.internal.vm.annotation.Contended("w")
-        volatile int parking;      // nonzero if parked in awaitWork
+        volatile int parking;      // next phase if parked in awaitWork
 
         // Support for atomic operations
         private static final Unsafe U;
@@ -1338,24 +1338,24 @@ public class ForkJoinPool extends AbstractExecutorService
         private ForkJoinTask nextLocalTask(int fifo) {
             ForkJoinTask t = null;
             ForkJoinTask[] a = array;
-            int b = base, p = top, cap;
-            if (p - b > 0 && a != null && (cap = a.length) > 0) {
-                for (int m = cap - 1, s, nb;;) {
-                    if (fifo == 0 || (nb = b + 1) == p) {
-                        if ((t = (ForkJoinTask)U.getAndSetReference(
-                                 a, slotOffset(m & (s = p - 1)), null)) != null)
-                            updateTop(s);       // else lost race for only task
-                        break;
-                    }
+            int b = base, s = top - 1, cap;
+            if (a != null && s - b >= 0 && (cap = a.length) > 0) {
+                if (fifo == 0) {
                     if ((t = (ForkJoinTask)U.getAndSetReference(
-                             a, slotOffset(m & b), null)) != null) {
-                        updateBase(nb);
-                        break;
-                    }
-                    while (b == (b = U.getIntAcquire(this, BASE)))
-                        Thread.onSpinWait();    // spin to reduce memory traffic
-                    if (p - b <= 0)
-                        break;
+                             a, slotOffset((cap - 1) & s), null)) != null)
+                        updateTop(s);
+                } else {
+                    do {
+                        if ((t = (ForkJoinTask)U.getAndSetReference(
+                                 a, slotOffset((cap - 1) & b), null)) != null) {
+                            updateBase(b + 1);
+                            break;
+                        }
+                        if (b == s)
+                            break;
+                        while (b == (b = U.getIntAcquire(this, BASE)))
+                            Thread.onSpinWait();
+                    } while (s - b >= 0);
                 }
             }
             return t;
@@ -1894,7 +1894,7 @@ public class ForkJoinPool extends AbstractExecutorService
                         createWorker();
                     else {
                         v.phase = sp;
-                        if (v.parking != 0)
+                        if (v.parking == sp)
                             U.unpark(v.owner);
                     }
                     break;
@@ -1915,7 +1915,7 @@ public class ForkJoinPool extends AbstractExecutorService
                     c, ((UMASK & (c + RC_UNIT)) | (c & TC_MASK) |
                         (v.stackPred & LMASK))))) {
                 v.phase = sp;
-                if (v.parking != 0)
+                if (v.parking == sp)
                     U.unpark(v.owner);
             }
         }
@@ -1983,46 +1983,51 @@ public class ForkJoinPool extends AbstractExecutorService
                 int n = qs.length, i = r, step = (r >>> 16) | 1;
                 boolean rescan = false;
                 scan: for (int l = n; l > 0; --l, i += step) {  // scan queues
-                    int j, cap; WorkQueue q; ForkJoinTask[] a;
-                    if ((q = qs[j = i & (n - 1)]) != null &&
+                    int qid, cap; WorkQueue q; ForkJoinTask[] a;
+                    if ((q = qs[qid = i & (n - 1)]) != null &&
                         (a = q.array) != null && (cap = a.length) > 0) {
                         for (int m = cap - 1, pb = -1, b = q.base;;) {
                             ForkJoinTask t; long k;
                             t = (ForkJoinTask)U.getReferenceAcquire(
                                 a, k = slotOffset(m & b));
-                            if (b != (b = q.base) || t == null ||
-                                !U.compareAndSetReference(a, k, t, null)) {
+                            if (b != (b = q.base)) {      // inconsistent / busy
+                                if (src != qid) {
+                                    rescan = true;        // reduce interference
+                                    break scan;
+                                }
+                            }
+                            else if (t == null ||
+                                     !U.compareAndSetReference(a, k, t, null)) {
                                 if (a[b & m] == null) {
-                                    if (rescan)           // end of run
-                                        break scan;
                                     if (a[(b + 1) & m] == null &&
                                         a[(b + 2) & m] == null) {
-                                        break;            // probably empty
+                                        if (rescan)       // end of run
+                                            break scan;
+                                        break;            // probably empty
                                     }
-                                    if (pb == (pb = b)) { // track progress
-                                        rescan = true;    // stalled; reorder scan
+                                    if (src != qid || pb == (pb = b)) {
+                                        rescan = true;    // reorder scan
                                         break scan;
                                     }
                                 }
                             }
                             else {
                                 boolean propagate;
-                                int prevSrc = src, nb;
-                                long np = slotOffset((nb = b + 1) & m);
+                                long np = slotOffset((pb = b + 1) & m);
                                 boolean more = (U.getReferenceVolatile(a, np) != null);
-                                q.base = nb;
+                                q.base = pb;
                                 w.nsteals = ++nsteals;
-                                w.source = src = j;       // volatile
+                                int prevSrc = src;
+                                w.source = src = qid;     // volatile
                                 rescan = true;
                                 if (propagate =
                                     (more &&
-                                     (prevSrc != src ||
-                                      ((j & 1) == 0) &&
-                                      (fifo != 0 || t.noUserHelp() != 0)) &&
-                                     U.getReference(a, np) != null))
+                                     (prevSrc != qid ||
+                                      ((qid & 1) == 0 &&
+                                       (fifo != 0 || t.noUserHelp() != 0)))))
                                     signalWork();
                                 w.topLevelExec(t, fifo);
-                                if ((b = q.base) != nb && !propagate)
+                                if ((b = q.base) != pb && !propagate)
                                     break scan;          // reduce interference
                             }
                         }
@@ -2058,120 +2063,130 @@ public class ForkJoinPool extends AbstractExecutorService
             else if (qac < (pc & RC_MASK))
                 return w.phase = phase;               // back out if lost to signal
         }
-        int ac = (short)(qac >>> RC_SHIFT), n; long e; WorkQueue[] qs;
+        long e; WorkQueue[] qs; int n;
         if (((e = runState) & STOP) != 0L ||
-            ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
+            ((e & SHUTDOWN) != 0L && qac == 0L && quiescent() > 0) ||
             (qs = queues) == null || (n = qs.length) <= 0)
-            return IDLE;                              // terminating
-        for (int prechecks = Math.min(ac, 2),         // reactivation threshold
-             k = Math.max(n + (n << 1), SPIN_WAITS << 1),
-             i = 0; k-- > 0 ; ++i) {
+            return IDLE;                      // terminating
+        int prechecks = Math.min((short)(qac >>> RC_SHIFT), 2);
+        for (int k = n + (n << 1), i = activePhase; k-- > 0 ; ++i) {
             WorkQueue q; int cap; ForkJoinTask[] a;
-            if (w.phase == activePhase)
-                return activePhase;
-            if ((q = qs[i & (n - 1)]) == null)
-                Thread.onSpinWait();
-            else if ((a = q.array) != null && (cap = a.length) > 0 &&
-                     a[q.base & (cap - 1)] != null) {
-                WorkQueue v; int sp, j; long c;
-                if (prechecks > 0)
-                    --prechecks;
-                else if (((c = ctl) & RC_MASK) <= qac && (sp = (int)c) != 0 &&
-                         (j = sp & SMASK) < n && (v = qs[j]) != null) {
-                    long nc = (v.stackPred & LMASK) | ((c + RC_UNIT) & UMASK);
-                    if (sp != activePhase && w.phase == activePhase)
-                        return activePhase;
-                    if ((sp == activePhase || k < n) &&
-                        U.compareAndSetLong(this, CTL, c, nc)) {
-                        v.phase = sp;
-                        if (sp == activePhase)
+            if ((q = qs[i & (n - 1)]) != null) {
+                if (w.phase == activePhase)
+                    return activePhase;
+                if ((a = q.array) != null && (cap = a.length) > 0 &&
+                    a[q.base & (cap - 1)] != null) {
+                    WorkQueue v; int sp, j; long c;
+                    if (prechecks > 0)
+                        --prechecks;
+                    else if (((c = ctl) & RC_MASK) <= qac && (sp = (int)c) != 0 &&
+                             (j = sp & SMASK) < n && (v = qs[j]) != null) {
+                        long nc = (v.stackPred & LMASK) | ((c + RC_UNIT) & UMASK);
+                        if (sp != activePhase && w.phase == activePhase)
                             return activePhase;
-                        if (v.parking != 0)
-                            U.unpark(v.owner);
-                        break;
+                        if ((sp == activePhase || k < n) &&
+                            U.compareAndSetLong(this, CTL, c, nc)) {
+                            v.phase = sp;
+                            if (sp == activePhase)
+                                return activePhase;
+                            if (v.parking == sp)
+                                U.unpark(v.owner);
+                            break;
+                        }
                     }
+                    if (k < n)
+                        k = n;                // ensure re-encounter
                 }
-                if (k < n)
-                    k = n;                            // ensure re-encounter
             }
         }
-        return (((phase = w.phase) & IDLE) == 0) ? phase : awaitWork(w, phase);
+        return ((phase = w.phase) == activePhase) ? activePhase : awaitWork(w, phase);
     }
 
     /**
      * Awaits signal or termination.
      *
      * @param w the work queue
-     * @param p current phase (known to be idle)
+     * @param phase current phase (known to be idle)
      * @return current phase, with IDLE set if worker should exit
      */
-    private int awaitWork(WorkQueue w, int p) {
+    private int awaitWork(WorkQueue w, int phase) {
         if (w != null) {
-            ForkJoinWorkerThread t; long deadline;
-            if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null)
-                t.resetThreadLocals();          // clear before reactivate
-            if ((ctl & RC_MASK) > 0L)
-                deadline = 0L;
-            else if ((deadline =
-                      (((w.source != INVALID_ID) ? keepAlive : TIMEOUT_SLOP)) +
-                      System.currentTimeMillis()) == 0L)
-                deadline = 1L;                 // avoid zero
-            int activePhase = p + IDLE;
-            if ((p = w.phase) != activePhase && (runState & STOP) == 0L) {
-                LockSupport.setCurrentBlocker(this);
-                w.parking = 1;                 // enable unpark
-                while ((p = w.phase) != activePhase) {
-                    boolean trimmable = false; int trim;
-                    Thread.interrupted();      // clear status
-                    if ((runState & STOP) != 0L)
-                        break;
-                    if (deadline != 0L) {
-                        if ((trim = tryTrim(w, p, deadline)) > 0)
+            int activePhase = phase + IDLE, parking = 0;
+            long deadline = 0L;
+            boolean trim = (w.source == INVALID_ID);
+            if ((w.config & CLEAR_TLS) != 0 &&
+                (Thread.currentThread() instanceof ForkJoinWorkerThread f))
+                f.resetThreadLocals();         // (instanceof check always true)
+            while ((phase = w.phase) != activePhase) {
+                boolean trimmable = false;     // true if at ctl head and quiescent
+                long d = 0L, c;
+                Thread.interrupted();          // clear status
+                if ((runState & STOP) != 0L)
+                    break;
+                if ((int)(c = ctl) == activePhase) {
+                    if ((c & RC_MASK) == 0L) {
+                        long now = System.currentTimeMillis();
+                        if (deadline == 0L)
+                            d = deadline = now + keepAlive;
+                        else if ((d = deadline) - now <= TIMEOUT_SLOP)
+                            trim = true;
+                        if (trim && tryTrim(w, c, activePhase))
+                            break;
+                        trim = false;
+                        trimmable = true;
+                    }
+                    if (parking == 0) {        // spin at head for approx 1 scan cost
+                        int tc = ((short)(c >>> TC_SHIFT) | 1) & SMASK;
+                        int spins = tc + (tc << 1);
+                        while ((phase = w.phase) != activePhase && --spins != 0)
+                            Thread.onSpinWait();
+                        if (phase == activePhase)
                             break;
-                        else if (trim < 0)
-                            deadline = 0L;
-                        else
-                            trimmable = true;
                     }
-                    U.park(trimmable, deadline);
                 }
+                if (parking == 0) {            // enable unpark
+                    parking = 1;               // spin/arm once; lets the cleanup below run
+                    LockSupport.setCurrentBlocker(this);
+                    w.parking = activePhase;
+                    if ((phase = w.phase) == activePhase)
+                        break;
+                }
+                U.park(trimmable, d);
+            }
+            if (parking != 0) {
                 w.parking = 0;
                 LockSupport.setCurrentBlocker(null);
             }
         }
-        return p;
+        return phase;
     }
 
     /**
      * Tries to remove and deregister worker after timeout, and release
-     * another to do the same.
-     * @return > 0: trimmed, < 0 : not trimmable, else 0
+     * another to do the same unless new tasks are found.
+     * @return true if trimmed
      */
-    private int tryTrim(WorkQueue w, int phase, long deadline) {
-        long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v;
-        if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null)
-            stat = -1;                      // no longer ctl top
-        else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP)
-            stat = 0;                       // spurious wakeup
-        else if (!compareAndSetCtl(
-                     c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) |
-                              (TC_MASK & (c - TC_UNIT)))))
-            stat = -1;                      // lost race to signaller
-        else {
-            stat = 1;
-            w.source = DROPPED;
-            w.phase = activePhase;
-            if ((vp = (int)nc) != 0 && (vs = queues) != null &&
-                vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
-                compareAndSetCtl(           // try to wake up next waiter
-                    nc, ((UMASK & (nc + RC_UNIT)) |
-                         (nc & TC_MASK) | (v.stackPred & LMASK)))) {
-                v.source = INVALID_ID;      // enable cascaded timeouts
-                v.phase = vp;
-                U.unpark(v.owner);
+    private boolean tryTrim(WorkQueue w, long c, int activePhase) {
+        if (w != null) {
+            int vp, i; WorkQueue[] vs; WorkQueue v;
+            long nc = ((w.stackPred & LMASK) |
+                       ((RC_MASK & c) | (TC_MASK & (c - TC_UNIT))));
+            if (U.compareAndSetLong(this, CTL, c, nc)) {
+                w.source = DROPPED;
+                w.phase = activePhase;
+                if ((vp = (int)nc) != 0 && (vs = queues) != null &&
+                    vs.length > (i = vp & SMASK) && (v = vs[i]) != null &&
+                    U.compareAndSetLong(this, CTL, // try to wake up next waiter
+                        nc, ((v.stackPred & LMASK) |
+                             ((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) {
+                    v.source = INVALID_ID;
+                    v.phase = vp;
+                    U.unpark(v.owner);
+                }
+                return true;
             }
         }
-        return stat;
+        return false;
     }
 
     /**
@@ -2226,7 +2241,7 @@ public class ForkJoinPool extends AbstractExecutorService
                         (v = qs[i]) != null &&
                         compareAndSetCtl(c, (c & UMASK) | (v.stackPred & LMASK))) {
                         v.phase = sp;
-                        if (v.parking != 0)
+                        if (v.parking == sp)
                             U.unpark(v.owner);
                         stat = UNCOMPENSATE;
                     }