diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index f289186e0ad..e83a92e5e6f 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -560,70 +560,89 @@ public class ForkJoinPool extends AbstractExecutorService * access (which is usually needed anyway). * * Signalling. Signals (in signalWork) cause new or reactivated - * workers to scan for tasks. SignalWork is invoked in two cases: - * (1) When a task is pushed onto an empty queue, and (2) When a - * worker takes a top-level task from a queue that has additional - * tasks. Together, these suffice in O(log(#threads)) steps to - * fully activate with at least enough workers, and ideally no - * more than required. This ideal is unobtainable: Callers do not - * know whether another worker will finish its current task and - * poll for others without need of a signal (which is otherwise an - * advantage of work-stealing vs other schemes), and also must - * conservatively estimate the triggering conditions of emptiness - * or non-emptiness; all of which usually cause more activations - * than necessary (see below). (Method signalWork is also used as - * failsafe in case of Thread failures in deregisterWorker, to - * activate or create a new worker to replace them). + * workers to scan for tasks. Method signalWork and its callers + * try to approximate the unattainable goal of having the right + * number of workers activated for the tasks at hand, but must err + * on the side of too many workers vs too few to avoid stalls: * - * Top-Level scheduling - * ==================== + * * If computations are purely tree structured, it suffices for + * every worker to activate another when it pushes a task into + * an empty queue, resulting in O(log(#threads)) steps to full + * activation. 
Emptiness must be conservatively approximated, + * which may result in unnecessary signals. Also, to reduce + * resource usage in some cases, at the expense of slower + * startup in others, activation of an idle thread is preferred + * over creating a new one, here and elsewhere. + * + * * At the other extreme, if "flat" tasks (those that do not in + * turn generate others) come in serially from only a single + * producer, each worker taking a task from a queue should + * propagate a signal if there are more tasks in that + * queue. This is equivalent to, but generally faster than, + * arranging for the stealer to take multiple tasks, re-pushing one or + * more on its own queue, and signalling (because its queue is + * empty), also resulting in logarithmic full activation + * time. If tasks do not engage in unbounded loops based on + * the actions of other workers with unknown dependencies, + * this form of propagation can be limited to one signal per + * activation (phase change). We distinguish the cases by + * further signalling only if the task is an InterruptibleTask + * (see below), which are the only supported forms of task that + * may do so. + * + * * Because we don't know about usage patterns (or most commonly, + * mixtures), we use both approaches, which present even more + * opportunities to over-signal. (Failure to distinguish these + * cases in terms of submission methods was arguably an early + * design mistake.) Note that in either of these contexts, + * signals may be (and often are) unnecessary because active + * workers continue scanning after running tasks without the + * need to be signalled (which is one reason work stealing is + * often faster than alternatives), so additional workers + * aren't needed. + * + * * For rapidly branching tasks that require full pool resources, + * oversignalling is OK, because signalWork will soon have no + * more workers to create or reactivate. 
But for others (mainly + * externally submitted tasks), overprovisioning may cause very + * noticeable slowdowns due to contention and resource + * wastage. We reduce impact by deactivating workers when + * queues don't have accessible tasks, but reactivating and + * rescanning if other tasks remain. + * + * * Despite these, signal contention and overhead effects still + * occur during ramp-up and ramp-down of small computations. * * Scanning. Method runWorker performs top-level scanning for (and * execution of) tasks by polling a pseudo-random permutation of * the array (by starting at a given index, and using a constant * cyclically exhaustive stride.) It uses the same basic polling * method as WorkQueue.poll(), but restarts with a different - * permutation on each rescan. The pseudorandom generator need - * not have high-quality statistical properties in the long + * permutation on each invocation. The pseudorandom generator + * need not have high-quality statistical properties in the long * term. We use Marsaglia XorShifts, seeded with the Weyl sequence - * from ThreadLocalRandom probes, which are cheap and suffice. + * from ThreadLocalRandom probes, which are cheap and + * suffice. Each queue's polling attempts to avoid becoming stuck + * when other scanners/pollers stall. Scans do not otherwise + * explicitly take into account core affinities, loads, cache + * localities, etc. However, they do exploit temporal locality + * (which usually approximates these) by preferring to re-poll + * from the same queue after a successful poll before trying + * others, which also reduces bookkeeping, cache traffic, and + * scanning overhead. But it also reduces fairness, which is + * partially counteracted by giving up on detected interference + * (which also reduces contention when too many workers try to + * take small tasks from the same queue). * * Deactivation. 
When no tasks are found by a worker in runWorker, - * it invokes deactivate, that first deactivates (to an IDLE - * phase). Avoiding missed signals during deactivation requires a - * (conservative) rescan, reactivating if there may be tasks to - * poll. Because idle workers are often not yet blocked (parked), - * we use a WorkQueue field to advertise that a waiter actually - * needs unparking upon signal. - * - * When tasks are constructed as (recursive) DAGs, top-level - * scanning is usually infrequent, and doesn't encounter most - * of the following problems addressed by runWorker and awaitWork: - * - * Locality. Polls are organized into "runs", continuing until - * empty or contended, while also minimizing interference by - * postponing bookeeping to ends of runs. This may reduce - * fairness. - * - * Contention. When many workers try to poll few queues, they - * often collide, generating CAS failures and disrupting locality - * of workers already running their tasks. This also leads to - * stalls when tasks cannot be taken because other workers have - * not finished poll operations, which is detected by reading - * ahead in queue arrays. In both cases, workers restart scans in a - * way that approximates randomized backoff. - * - * Oversignalling. When many short top-level tasks are present in - * a small number of queues, the above signalling strategy may - * activate many more workers than needed, worsening locality and - * contention problems, while also generating more global - * contention (field ctl is CASed on every activation and - * deactivation). We filter out (both in runWorker and - * signalWork) attempted signals that are surely not needed - * because the signalled tasks are already taken. - * - * Shutdown and Quiescence - * ======================= + * it tries to deactivate(), giving up (and rescanning) on "ctl" + * contention. 
To avoid missed signals during deactivation, the + * method rescans and reactivates if there may have been a missed + * signal during deactivation. To reduce false-alarm reactivations + * while doing so, we scan multiple times (analogously to method + * quiescent()) before trying to reactivate. Because idle workers + * are often not yet blocked (parked), we use a WorkQueue field to + * advertise that a waiter actually needs unparking upon signal. * * Quiescence. Workers scan looking for work, giving up when they * don't find any, without being sure that none are available. @@ -873,7 +892,9 @@ public class ForkJoinPool extends AbstractExecutorService * shutdown, runners are interrupted so they can cancel. Since * external joining callers never run these tasks, they must await * cancellation by others, which can occur along several different - * paths. + * paths. The inability to rely on caller-runs may also require + * extra signalling (resulting in scanning and contention), so this is + * done only conditionally in methods push and runWorker. * * Across these APIs, rules for reporting exceptions for tasks * with results accessed via join() differ from those via get(), @@ -940,13 +961,9 @@ public class ForkJoinPool extends AbstractExecutorService * less-contended applications. To help arrange this, some * non-reference fields are declared as "long" even when ints or * shorts would suffice. For class WorkQueue, an - * embedded @Contended isolates the very busy top index, along - * with status and bookkeeping fields written (mostly) by owners, - * that otherwise interfere with reading array and base - * fields. There are other variables commonly contributing to - * false-sharing-related performance issues (including fields of - * class Thread), but we can't do much about this except try to - * minimize access. + * embedded @Contended region segregates fields most heavily + * updated by owners from those most commonly read by stealers or + * other management. 
* * Initial sizing and resizing of WorkQueue arrays is an even more * delicate tradeoff because the best strategy systematically @@ -955,11 +972,13 @@ public class ForkJoinPool extends AbstractExecutorService * direct false-sharing and indirect cases due to GC bookkeeping * (cardmarks etc), and reduce the number of resizes, which are * not especially fast because they require atomic transfers. - * Currently, arrays are initialized to be just large enough to - * avoid resizing in most tree-structured tasks, but grow rapidly - * until large. (Maintenance note: any changes in fields, queues, - * or their uses, or JVM layout policies, must be accompanied by - * re-evaluation of these placement and sizing decisions.) + * Currently, arrays for workers are initialized to be just large + * enough to avoid resizing in most tree-structured tasks, but + * larger for external queues where both false-sharing problems + * and the need for resizing are more common. (Maintenance note: + * any changes in fields, queues, or their uses, or JVM layout + * policies, must be accompanied by re-evaluation of these + * placement and sizing decisions.) * * Style notes * =========== @@ -1042,11 +1061,17 @@ public class ForkJoinPool extends AbstractExecutorService static final int DEFAULT_COMMON_MAX_SPARES = 256; /** - * Initial capacity of work-stealing queue array. + * Initial capacity of work-stealing queue array for workers. * Must be a power of two, at least 2. See above. */ static final int INITIAL_QUEUE_CAPACITY = 1 << 6; + /** + * Initial capacity of work-stealing queue array for external queues. + * Must be a power of two, at least 2. See above. 
+ */ + static final int INITIAL_EXTERNAL_QUEUE_CAPACITY = 1 << 9; + // conversions among short, int, long static final int SMASK = 0xffff; // (unsigned) short bits static final long LMASK = 0xffffffffL; // lower 32 bits of long @@ -1186,11 +1211,11 @@ public class ForkJoinPool extends AbstractExecutorService @jdk.internal.vm.annotation.Contended("w") int stackPred; // pool stack (ctl) predecessor link @jdk.internal.vm.annotation.Contended("w") - volatile int parking; // nonzero if parked in awaitWork - @jdk.internal.vm.annotation.Contended("w") volatile int source; // source queue id (or DROPPED) @jdk.internal.vm.annotation.Contended("w") int nsteals; // number of steals from other queues + @jdk.internal.vm.annotation.Contended("w") + volatile int parking; // nonzero if parked in awaitWork // Support for atomic operations private static final Unsafe U; @@ -1223,11 +1248,11 @@ public class ForkJoinPool extends AbstractExecutorService */ WorkQueue(ForkJoinWorkerThread owner, int id, int cfg, boolean clearThreadLocals) { + array = new ForkJoinTask[owner == null ? + INITIAL_EXTERNAL_QUEUE_CAPACITY : + INITIAL_QUEUE_CAPACITY]; + this.owner = owner; this.config = (clearThreadLocals) ? 
cfg | CLEAR_TLS : cfg; - if ((this.owner = owner) == null) { - array = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; - phase = id | IDLE; - } } /** @@ -1254,27 +1279,27 @@ public class ForkJoinPool extends AbstractExecutorService * @throws RejectedExecutionException if array could not be resized */ final void push(ForkJoinTask task, ForkJoinPool pool, boolean internal) { - int s = top, b = base, m, cap, room; ForkJoinTask[] a, na; - if ((a = array) != null && (cap = a.length) > 0) { // else disabled - int k = (m = cap - 1) & s; - if ((room = m - (s - b)) >= 0) { + int s = top, b = base, m, cap, room; ForkJoinTask[] a; + if ((a = array) != null && (cap = a.length) > 0 && // else disabled + task != null) { + int pk = task.noUserHelp() + 1; // prev slot offset + if ((room = (m = cap - 1) - (s - b)) >= 0) { top = s + 1; - long pos = slotOffset(k); + long pos = slotOffset(m & s); if (!internal) U.putReference(a, pos, task); // inside lock else U.getAndSetReference(a, pos, task); // fully fenced - if (room == 0 && (na = growArray(a, cap, s)) != null) - k = ((a = na).length - 1) & s; // resize + if (room == 0) // resize + growArray(a, cap, s); } if (!internal) unlockPhase(); if (room < 0) throw new RejectedExecutionException("Queue capacity exceeded"); - if (pool != null && - (room == 0 || - U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null)) - pool.signalWork(a, k); // may have appeared empty + if ((room == 0 || U.getReferenceAcquire(a, slotOffset(m & (s - pk))) == null) && + pool != null) + pool.signalWork(); // may have appeared empty } } @@ -1283,12 +1308,11 @@ public class ForkJoinPool extends AbstractExecutorService * @param a old array * @param cap old array capacity * @param s current top - * @return new array, or null on failure */ - private ForkJoinTask[] growArray(ForkJoinTask[] a, int cap, int s) { - int newCap = (cap >= 1 << 16) ? 
cap << 1 : cap << 2; - ForkJoinTask[] newArray = null; + private void growArray(ForkJoinTask[] a, int cap, int s) { + int newCap = cap << 1; if (a != null && a.length == cap && cap > 0 && newCap > 0) { + ForkJoinTask[] newArray = null; try { newArray = new ForkJoinTask[newCap]; } catch (OutOfMemoryError ex) { @@ -1305,45 +1329,34 @@ public class ForkJoinPool extends AbstractExecutorService updateArray(newArray); // fully fenced } } - return newArray; } /** - * Takes next task, if one exists, in lifo order. + * Takes next task, if one exists, in order specified by mode, + * so acts as either local-pop or local-poll. Called only by owner. + * @param fifo nonzero if FIFO mode */ - private ForkJoinTask localPop() { + private ForkJoinTask nextLocalTask(int fifo) { ForkJoinTask t = null; - int s = top - 1, cap; long k; ForkJoinTask[] a; - if ((a = array) != null && (cap = a.length) > 0 && - U.getReference(a, k = slotOffset((cap - 1) & s)) != null && - (t = (ForkJoinTask)U.getAndSetReference(a, k, null)) != null) - updateTop(s); - return t; - } - - /** - * Takes next task, if one exists, in fifo order. 
- */ - private ForkJoinTask localPoll() { - ForkJoinTask t = null; - int p = top, cap; ForkJoinTask[] a; - if ((a = array) != null && (cap = a.length) > 0) { - for (int b = base; p - b > 0; ) { - int nb = b + 1; - long k = slotOffset((cap - 1) & b); - if (U.getReference(a, k) == null) { - if (nb == p) - break; // else base is lagging - while (b == (b = U.getIntAcquire(this, BASE))) - Thread.onSpinWait(); // spin to reduce memory traffic + ForkJoinTask[] a = array; + int b = base, p = top, cap; + if (p - b > 0 && a != null && (cap = a.length) > 0) { + for (int m = cap - 1, s, nb;;) { + if (fifo == 0 || (nb = b + 1) == p) { + if ((t = (ForkJoinTask)U.getAndSetReference( + a, slotOffset(m & (s = p - 1)), null)) != null) + updateTop(s); // else lost race for only task + break; } - else if ((t = (ForkJoinTask) - U.getAndSetReference(a, k, null)) != null) { + if ((t = (ForkJoinTask)U.getAndSetReference( + a, slotOffset(m & b), null)) != null) { updateBase(nb); break; } - else - b = base; + while (b == (b = U.getIntAcquire(this, BASE))) + Thread.onSpinWait(); // spin to reduce memory traffic + if (p - b <= 0) + break; } } return t; @@ -1351,9 +1364,10 @@ public class ForkJoinPool extends AbstractExecutorService /** * Takes next task, if one exists, using configured mode. + * (Always internal, never called for Common pool.) */ final ForkJoinTask nextLocalTask() { - return (config & FIFO) == 0 ? localPop() : localPoll(); + return nextLocalTask(config & FIFO); } /** @@ -1429,12 +1443,12 @@ public class ForkJoinPool extends AbstractExecutorService // specialized execution methods /** - * Runs the given task, as well as remaining local tasks + * Runs the given task, as well as remaining local tasks. */ final void topLevelExec(ForkJoinTask task, int fifo) { while (task != null) { task.doExec(); - task = (fifo != 0) ? 
localPoll() : localPop(); + task = nextLocalTask(fifo); } } @@ -1564,7 +1578,7 @@ public class ForkJoinPool extends AbstractExecutorService * Cancels all local tasks. Called only by owner. */ final void cancelTasks() { - for (ForkJoinTask t; (t = localPop()) != null; ) { + for (ForkJoinTask t; (t = nextLocalTask(0)) != null; ) { try { t.cancel(false); } catch (Throwable ignore) { @@ -1766,8 +1780,7 @@ public class ForkJoinPool extends AbstractExecutorService * @param w caller's WorkQueue */ final void registerWorker(WorkQueue w) { - if (w != null) { - w.array = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; + if (w != null && (runState & STOP) == 0L) { ThreadLocalRandom.localInit(); int seed = w.stackPred = ThreadLocalRandom.getProbe(); int phaseSeq = seed & ~((IDLE << 1) - 1); // initial phase tag @@ -1845,18 +1858,17 @@ public class ForkJoinPool extends AbstractExecutorService } if ((tryTerminate(false, false) & STOP) == 0L && phase != 0 && w != null && w.source != DROPPED) { + signalWork(); // possibly replace w.cancelTasks(); // clean queue - signalWork(null, 0); // possibly replace } if (ex != null) ForkJoinTask.rethrow(ex); } /** - * Releases an idle worker, or creates one if not enough exist, - * giving up if array a is nonnull and task at a[k] already taken. + * Releases an idle worker, or creates one if not enough exist. 
*/ - final void signalWork(ForkJoinTask[] a, int k) { + final void signalWork() { int pc = parallelism; for (long c = ctl;;) { WorkQueue[] qs = queues; @@ -1872,15 +1884,13 @@ public class ForkJoinPool extends AbstractExecutorService if (sp == 0) { if ((short)(c >>> TC_SHIFT) >= pc) break; - nc = ((c + TC_UNIT) & TC_MASK) | ac; + nc = ((c + TC_UNIT) & TC_MASK); } else if ((v = w) == null) break; else - nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac; - if (a != null && k < a.length && k >= 0 && a[k] == null) - break; - if (c == (c = ctl) && c == (c = compareAndExchangeCtl(c, nc))) { + nc = (v.stackPred & LMASK) | (c & TC_MASK); + if (c == (c = compareAndExchangeCtl(c, nc | ac))) { if (v == null) createWorker(); else { @@ -1963,196 +1973,178 @@ public class ForkJoinPool extends AbstractExecutorService * @param w caller's WorkQueue (may be null on failed initialization) */ final void runWorker(WorkQueue w) { - if (w != null && w.phase != 0) { // else unregistered - WorkQueue[] qs; - int r = w.stackPred; // seed from registerWorker - int fifo = (int)config & FIFO, rescans = 0, inactive = 0, taken = 0, n; - while ((runState & STOP) == 0L && (qs = queues) != null && - (n = qs.length) > 0) { - int i = r, step = (r >>> 16) | 1; + if (w != null) { + int phase = w.phase, r = w.stackPred; // seed from registerWorker + int fifo = w.config & FIFO, nsteals = 0, src = -1; + for (;;) { + WorkQueue[] qs; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift - scan: for (int j = n; j != 0; --j, i += step) { - WorkQueue q; int qid; - if ((q = qs[qid = i & (n - 1)]) != null) { - ForkJoinTask[] a; int cap; // poll queue - while ((a = q.array) != null && (cap = a.length) > 0) { - int b, nb, nk; long bp; ForkJoinTask t; + if ((runState & STOP) != 0L || (qs = queues) == null) + break; + int n = qs.length, i = r, step = (r >>> 16) | 1; + boolean rescan = false; + scan: for (int l = n; l > 0; --l, i += step) { // scan queues + int j, cap; WorkQueue q; ForkJoinTask[] a; + if ((q = qs[j = i 
& (n - 1)]) != null && + (a = q.array) != null && (cap = a.length) > 0) { + for (int m = cap - 1, pb = -1, b = q.base;;) { + ForkJoinTask t; long k; t = (ForkJoinTask)U.getReferenceAcquire( - a, bp = slotOffset((cap - 1) & (b = q.base))); - long np = slotOffset(nk = (nb = b + 1) & (cap - 1)); - if (q.base == b) { // else inconsistent - if (t == null) { - if (q.array == a) { // else resized - if (rescans > 0) // ran or stalled - break scan; - if (U.getReference(a, np) == null && - (rescans >= 0 || - (U.getReferenceAcquire(a, bp) == null && - q.top == q.base))) - break; - rescans = 1; // may be stalled + a, k = slotOffset(m & b)); + if (b != (b = q.base) || t == null || + !U.compareAndSetReference(a, k, t, null)) { + if (a[b & m] == null) { + if (rescan) // end of run + break scan; + if (a[(b + 1) & m] == null && + a[(b + 2) & m] == null) { + break; // probably empty } - } - else if (inactive != 0) { - if ((inactive = tryReactivate(w)) != 0) { - rescans = 1; // can't take yet + if (pb == (pb = b)) { // track progress + rescan = true; // stalled; reorder scan break scan; } } - else if (U.compareAndSetReference(a, bp, t, null)) { - q.base = nb; - Object nt = U.getReferenceAcquire(a, np); - w.source = qid; - rescans = 1; - ++taken; - if (nt != null && // confirm a[nk] - U.getReferenceAcquire(a, np) == nt) - signalWork(a, nk); // propagate - w.topLevelExec(t, fifo); - } + } + else { + boolean propagate; + int nb = q.base = b + 1, prevSrc = src; + w.nsteals = ++nsteals; + w.source = src = j; // volatile + rescan = true; + int nh = t.noUserHelp(); + if (propagate = + (prevSrc != src || nh != 0) && a[nb & m] != null) + signalWork(); + w.topLevelExec(t, fifo); + if ((b = q.base) != nb && !propagate) + break scan; // reduce interference } } } } - if (rescans >= 0) - --rescans; - else if (inactive == 0) { - if ((inactive = deactivate(w, taken)) != 0) - taken = 0; + if (!rescan) { + if (((phase = deactivate(w, phase)) & IDLE) != 0) + break; + src = -1; // re-enable propagation 
} - else if (awaitWork(w) == 0) - inactive = rescans = 0; - else - break; } } } /** - * Tries to deactivate worker, keeping active on contention + * Deactivates and if necessary awaits signal or termination. * - * @param w the work queue - * @param taken number of stolen tasks since last deactivation - * @return nonzero if inactive + * @param w the worker + * @param phase current phase + * @return current phase, with IDLE set if worker should exit */ - private int deactivate(WorkQueue w, int taken) { - int inactive = 0, phase; - if (w != null && (inactive = (phase = w.phase) & IDLE) == 0) { - long sp = (phase + (IDLE << 1)) & LMASK, pc, c; - w.phase = phase | IDLE; - w.stackPred = (int)(pc = ctl); // set ctl stack link - if (!compareAndSetCtl( // try to enqueue - pc, c = ((pc - RC_UNIT) & UMASK) | sp)) - w.phase = phase; // back out on contention - else { - if (taken != 0) { - w.nsteals += taken; - if ((w.config & CLEAR_TLS) != 0 && - (Thread.currentThread() instanceof ForkJoinWorkerThread f)) - f.resetThreadLocals(); // (instanceof check always true) - } - if (((c & RC_MASK) == 0L && quiescent() > 0) || taken == 0) - inactive = w.phase & IDLE; // check quiescent termination - else { // spin for approx 1 scan cost - int tc = (short)(c >>> TC_SHIFT); - int spins = Math.max((tc << 1) + tc, SPIN_WAITS); - while ((inactive = w.phase & IDLE) != 0 && --spins != 0) - Thread.onSpinWait(); - } - } - } - return inactive; - } + private int deactivate(WorkQueue w, int phase) { + if (w == null) // currently impossible + return IDLE; + int p = phase | IDLE, activePhase = phase + (IDLE << 1); + long pc = ctl, qc = (activePhase & LMASK) | ((pc - RC_UNIT) & UMASK); + int sp = w.stackPred = (int)pc; // set ctl stack link + w.phase = p; + if (!compareAndSetCtl(pc, qc)) // try to enqueue + return w.phase = phase; // back out on possible signal + int ac = (short)(qc >>> RC_SHIFT), n; long e; WorkQueue[] qs; + if (((e = runState) & STOP) != 0L || + ((e & SHUTDOWN) != 0L && ac == 0 && 
quiescent() > 0) || + (qs = queues) == null || (n = qs.length) <= 0) + return IDLE; // terminating - /** - * Reactivates worker w if it is currently top of ctl stack - * - * @param w the work queue - * @return 0 if now active - */ - private int tryReactivate(WorkQueue w) { - int inactive = 0; - if (w != null) { // always true; hoist checks - int sp = w.stackPred, phase, activePhase; long c; - if ((inactive = (phase = w.phase) & IDLE) != 0 && - (int)(c = ctl) == (activePhase = phase + IDLE) && - compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK))) { - w.phase = activePhase; - inactive = 0; - } + for (int prechecks = Math.min(ac, 2), // reactivation threshold + k = Math.max(n + (n << 1), SPIN_WAITS << 1);;) { + WorkQueue q; int cap; ForkJoinTask[] a; long c; + if (w.phase == activePhase) + return activePhase; + if (--k < 0) + return awaitWork(w, p); // block, drop, or exit + if ((q = qs[k & (n - 1)]) == null) + Thread.onSpinWait(); + else if ((a = q.array) != null && (cap = a.length) > 0 && + a[q.base & (cap - 1)] != null && --prechecks < 0 && + (int)(c = ctl) == activePhase && + compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK))) + return w.phase = activePhase; // reactivate } - return inactive; } /** * Awaits signal or termination. * * @param w the work queue - * @return 0 if now active + * @param p current phase (known to be idle) + * @return current phase, with IDLE set if worker should exit */ - private int awaitWork(WorkQueue w) { - int inactive = 0, phase; - if (w != null) { // always true; hoist checks - long waitTime = (w.source == INVALID_ID) ? 0L : keepAlive; - if ((inactive = (phase = w.phase) & IDLE) != 0) { + private int awaitWork(WorkQueue w, int p) { + if (w != null) { + ForkJoinWorkerThread t; long deadline; + if ((w.config & CLEAR_TLS) != 0 && (t = w.owner) != null) + t.resetThreadLocals(); // clear before reactivate + if ((ctl & RC_MASK) > 0L) + deadline = 0L; + else if ((deadline = + (((w.source != INVALID_ID) ? 
keepAlive : TIMEOUT_SLOP)) + + System.currentTimeMillis()) == 0L) + deadline = 1L; // avoid zero + int activePhase = p + IDLE; + if ((p = w.phase) != activePhase && (runState & STOP) == 0L) { LockSupport.setCurrentBlocker(this); - int activePhase = phase + IDLE; - for (long deadline = 0L;;) { - Thread.interrupted(); // clear status + w.parking = 1; // enable unpark + while ((p = w.phase) != activePhase) { + boolean trimmable = false; int trim; + Thread.interrupted(); // clear status if ((runState & STOP) != 0L) break; - boolean trimmable = false; // use timed wait if trimmable - long d = 0L, c; - if (((c = ctl) & RC_MASK) == 0L && (int)c == activePhase) { - long now = System.currentTimeMillis(); - if (deadline == 0L) - deadline = waitTime + now; - if (deadline - now <= TIMEOUT_SLOP) { - if (tryTrim(w, c, activePhase)) - break; - continue; // lost race to trim - } - d = deadline; - trimmable = true; + if (deadline != 0L) { + if ((trim = tryTrim(w, p, deadline)) > 0) + break; + else if (trim < 0) + deadline = 0L; + else + trimmable = true; } - w.parking = 1; // enable unpark and recheck - if ((inactive = w.phase & IDLE) != 0) - U.park(trimmable, d); - w.parking = 0; // close unpark window - if (inactive == 0 || (inactive = w.phase & IDLE) == 0) - break; + U.park(trimmable, deadline); } + w.parking = 0; LockSupport.setCurrentBlocker(null); } } - return inactive; + return p; } /** * Tries to remove and deregister worker after timeout, and release - * another to do the same unless new tasks are found. + * another to do the same. 
+ * @return > 0: trimmed, < 0 : not trimmable, else 0 */ - private boolean tryTrim(WorkQueue w, long c, int activePhase) { - if (w != null) { - int vp, i; WorkQueue[] vs; WorkQueue v; - long nc = ((w.stackPred & LMASK) | - ((RC_MASK & c) | (TC_MASK & (c - TC_UNIT)))); - if (compareAndSetCtl(c, nc)) { - w.source = DROPPED; - w.phase = activePhase; - if ((vp = (int)nc) != 0 && (vs = queues) != null && - vs.length > (i = vp & SMASK) && (v = vs[i]) != null && - compareAndSetCtl( // try to wake up next waiter - nc, ((v.stackPred & LMASK) | - ((UMASK & (nc + RC_UNIT)) | (nc & TC_MASK))))) { - v.source = INVALID_ID; // enable cascaded timeouts - v.phase = vp; - U.unpark(v.owner); - } - return true; + private int tryTrim(WorkQueue w, int phase, long deadline) { + long c, nc; int stat, activePhase, vp, i; WorkQueue[] vs; WorkQueue v; + if ((activePhase = phase + IDLE) != (int)(c = ctl) || w == null) + stat = -1; // no longer ctl top + else if (deadline - System.currentTimeMillis() >= TIMEOUT_SLOP) + stat = 0; // spurious wakeup + else if (!compareAndSetCtl( + c, nc = ((w.stackPred & LMASK) | (RC_MASK & c) | + (TC_MASK & (c - TC_UNIT))))) + stat = -1; // lost race to signaller + else { + stat = 1; + w.source = DROPPED; + w.phase = activePhase; + if ((vp = (int)nc) != 0 && (vs = queues) != null && + vs.length > (i = vp & SMASK) && (v = vs[i]) != null && + compareAndSetCtl( // try to wake up next waiter + nc, ((UMASK & (nc + RC_UNIT)) | + (nc & TC_MASK) | (v.stackPred & LMASK)))) { + v.source = INVALID_ID; // enable cascaded timeouts + v.phase = vp; + U.unpark(v.owner); } } - return false; + return stat; } /** @@ -2569,35 +2561,52 @@ public class ForkJoinPool extends AbstractExecutorService /** * Finds and locks a WorkQueue for an external submitter, or - * throws RejectedExecutionException if shutdown + * throws RejectedExecutionException if shutdown or terminating. 
+ * @param r current ThreadLocalRandom.getProbe() value * @param rejectOnShutdown true if RejectedExecutionException - * should be thrown when shutdown + * should be thrown when shutdown (else only if terminating) */ - final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) { - int r; - if ((r = ThreadLocalRandom.getProbe()) == 0) { - ThreadLocalRandom.localInit(); // initialize caller's probe + private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) { + int reuse; // nonzero if prefer create + if ((reuse = r) == 0) { + ThreadLocalRandom.localInit(); // initialize caller's probe r = ThreadLocalRandom.getProbe(); } - for (;;) { - WorkQueue q; WorkQueue[] qs; int n, id, i; - if ((qs = queues) == null || (n = qs.length) <= 0) + for (int probes = 0; ; ++probes) { + int n, i, id; WorkQueue[] qs; WorkQueue q; + if ((qs = queues) == null) + break; + if ((n = qs.length) <= 0) break; if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) { - WorkQueue newq = new WorkQueue(null, id, 0, false); - lockRunState(); - if (qs[i] == null && queues == qs) - q = qs[i] = newq; // else lost race to install + WorkQueue w = new WorkQueue(null, id, 0, false); + w.phase = id; + boolean reject = ((lockRunState() & SHUTDOWN) != 0 && + rejectOnShutdown); + if (!reject && queues == qs && qs[i] == null) + q = qs[i] = w; // else lost race to install unlockRunState(); - } - if (q != null && q.tryLockPhase()) { - if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) { - q.unlockPhase(); // check while q lock held + if (q != null) + return q; + if (reject) break; - } - return q; + reuse = 0; } - r = ThreadLocalRandom.advanceProbe(r); // move + if (reuse == 0 || !q.tryLockPhase()) { // move index + if (reuse == 0) { + if (probes >= n >> 1) + reuse = r; // stop prefering free slot + } + else if (q != null) + reuse = 0; // probe on collision + r = ThreadLocalRandom.advanceProbe(r); + } + else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) { + q.unlockPhase(); // 
check while q lock held + break; + } + else + return q; } throw new RejectedExecutionException(); } @@ -2611,12 +2620,24 @@ public class ForkJoinPool extends AbstractExecutorService } else { // find and lock queue internal = false; - q = externalSubmissionQueue(true); + q = submissionQueue(ThreadLocalRandom.getProbe(), true); } q.push(task, signalIfEmpty ? this : null, internal); return task; } + /** + * Returns queue for an external submission, bypassing call to + * submissionQueue if already established and unlocked. + */ + final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) { + WorkQueue[] qs; WorkQueue q; int n; + int r = ThreadLocalRandom.getProbe(); + return (((qs = queues) != null && (n = qs.length) > 0 && + (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 && + q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown)); + } + /** * Returns queue for an external thread, if one exists that has * possibly ever submitted to the given pool (nonzero probe), or @@ -3295,7 +3316,7 @@ public class ForkJoinPool extends AbstractExecutorService if ((config & PRESET_SIZE) != 0) throw new UnsupportedOperationException("Cannot override System property"); if ((prevSize = getAndSetParallelism(size)) < size) - signalWork(null, 0); // trigger worker activation + signalWork(); // trigger worker activation return prevSize; } diff --git a/test/jdk/java/util/concurrent/forkjoin/Starvation.java b/test/jdk/java/util/concurrent/forkjoin/Starvation.java index d864fa0ba33..8397e852ffa 100644 --- a/test/jdk/java/util/concurrent/forkjoin/Starvation.java +++ b/test/jdk/java/util/concurrent/forkjoin/Starvation.java @@ -28,7 +28,6 @@ */ import java.util.concurrent.Callable; import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ForkJoinTask; import java.util.concurrent.atomic.AtomicInteger; public class Starvation { @@ -43,7 +42,7 @@ public class Starvation { while (count.get() == c) Thread.onSpinWait(); return null; }}; - static void 
testSubmitExternalCallable() throws Exception { + public static void main(String[] args) throws Exception { try (var pool = new ForkJoinPool(2)) { for (int i = 0; i < 100_000; i++) { var future1 = pool.submit(new AwaitCount(i)); @@ -54,21 +53,4 @@ public class Starvation { } } } - - static void testSubmitAdaptedCallable() throws Exception { - try (var pool = new ForkJoinPool(2)) { - for (int i = 0; i < 100_000; i++) { - var future1 = pool.submit(new AwaitCount(i)); - var future2 = pool.submit(ForkJoinTask.adapt(noop)); - future2.get(); - count.set(i + 1); - future1.get(); - } - } - } - - public static void main(String[] args) throws Exception { - testSubmitExternalCallable(); - testSubmitAdaptedCallable(); - } }