More merge fixes

Doug Lea 2026-03-09 17:11:22 -04:00
parent 7a1697eda4
commit caeb66bbff


@ -561,89 +561,70 @@ public class ForkJoinPool extends AbstractExecutorService
* access (which is usually needed anyway).
*
* Signalling. Signals (in signalWork) cause new or reactivated
* workers to scan for tasks. SignalWork is invoked in two cases:
* (1) When a task is pushed onto an empty queue, and (2) When a
* worker takes a top-level task from a queue that has additional
* tasks. Together, these suffice in O(log(#threads)) steps to
* fully activate with at least enough workers, and ideally no
* more than required. This ideal is unobtainable: Callers do not
* know whether another worker will finish its current task and
* poll for others without need of a signal (which is otherwise an
* advantage of work-stealing vs other schemes), and also must
* conservatively estimate the triggering conditions of emptiness
* or non-emptiness; all of which usually cause more activations
* than necessary (see below). (Method signalWork is also used as
* failsafe in case of Thread failures in deregisterWorker, to
* activate or create a new worker to replace them).
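
The two trigger cases reduce to a simple shape, sketched below with invented names (`SignalSketch` is illustrative only, not the jsr166 logic, and ignores all concurrency):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative only: shows when the two signalWork trigger cases
// fire; not the jsr166 implementation, and ignores concurrency.
public class SignalSketch {
    final Queue<Runnable> queue = new ArrayDeque<>();
    int signals; // count of (would-be) signalWork calls

    // Case (1): a push onto an empty queue signals, since no
    // worker may be scanning it yet.
    void push(Runnable task) {
        boolean wasEmpty = queue.isEmpty();
        queue.add(task);
        if (wasEmpty)
            signals++; // would call signalWork()
    }

    // Case (2): a worker taking a top-level task signals again if
    // the queue still holds more tasks, propagating activation.
    Runnable take() {
        Runnable t = queue.poll();
        if (t != null && !queue.isEmpty())
            signals++; // would call signalWork()
        return t;
    }
}
```

Each taker that leaves work behind wakes one more worker, so activation fans out roughly like a binary tree, which is where the O(log(#threads)) bound above comes from.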
*
* Top-Level scheduling
* ====================
*
* Scanning. Method runWorker performs top-level scanning for (and
* execution of) tasks by polling a pseudo-random permutation of
* the array (by starting at a given index, and using a constant
 * cyclically exhaustive stride). It uses the same basic polling
* method as WorkQueue.poll(), but restarts with a different
* permutation on each rescan. The pseudorandom generator need
* not have high-quality statistical properties in the long
* term. We use Marsaglia XorShifts, seeded with the Weyl sequence
* from ThreadLocalRandom probes, which are cheap and suffice.
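
The scan order described here can be sketched as follows (a simplified, single-threaded illustration; `ScanOrder` and its method names are invented):

```java
// Illustrative scan-order sketch: a Marsaglia XorShift step picks
// a start index, and an odd stride (coprime to the power-of-two
// table length) makes the cyclic walk visit every slot exactly once.
public class ScanOrder {
    static int xorshift(int r) { // one Marsaglia XorShift step
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    // Returns the visit order over n slots (n must be a power of two).
    static int[] order(int seed, int n) {
        int r = xorshift(seed == 0 ? 1 : seed); // avoid the zero fixed point
        int start = r & (n - 1);
        int stride = ((r >>> 16) | 1) & (n - 1); // odd => cyclically exhaustive
        int[] idx = new int[n];
        for (int i = 0; i < n; i++)
            idx[i] = (start + i * stride) & (n - 1);
        return idx;
    }
}
```

Because the stride is odd and the length is a power of two, they are coprime, so the walk is a full permutation of the slots.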
*
* Deactivation. When no tasks are found by a worker in runWorker,
 * it invokes deactivate, which first deactivates (to an IDLE
* phase). Avoiding missed signals during deactivation requires a
* (conservative) rescan, reactivating if there may be tasks to
* poll. Because idle workers are often not yet blocked (parked),
* we use a WorkQueue field to advertise that a waiter actually
* needs unparking upon signal.
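
The deactivate-then-rescan idiom can be reduced to a small sketch (`Deactivation` is a hypothetical sequential stand-in; the real code uses CAS on "ctl" and park/unpark):

```java
import java.util.function.BooleanSupplier;

// Schematic of the deactivate-then-recheck idiom used to avoid
// missed signals; hypothetical names, not the jsr166 implementation.
public class Deactivation {
    static final int ACTIVE = 0, IDLE = 1;
    int phase = ACTIVE;
    boolean needsUnpark; // advertised so a signaller knows to unpark

    // Returns true if the worker went idle (and would park), false if
    // the conservative rescan found possible work and it reactivated.
    boolean tryDeactivate(BooleanSupplier queuesMayHaveTasks) {
        phase = IDLE;                      // publish idleness first
        if (queuesMayHaveTasks.getAsBoolean()) {
            phase = ACTIVE;                // a signal could have been missed;
            return false;                  // reactivate and rescan instead
        }
        needsUnpark = true;                // advertise, then block (park elided)
        return true;
    }
}
```

Publishing idleness before the rescan is the essential ordering: any task pushed after that point either signals the now-idle worker or is seen by the rescan.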
*
* When tasks are constructed as (recursive) DAGs, top-level
* scanning is usually infrequent, and doesn't encounter most
* of the following problems addressed by runWorker and awaitWork:
*
* Locality. Polls are organized into "runs", continuing until
* empty or contended, while also minimizing interference by
 * postponing bookkeeping to ends of runs. This may reduce
* fairness.
*
* Contention. When many workers try to poll few queues, they
* often collide, generating CAS failures and disrupting locality
* of workers already running their tasks. This also leads to
* stalls when tasks cannot be taken because other workers have
* not finished poll operations, which is detected by reading
* ahead in queue arrays. In both cases, workers restart scans in a
* way that approximates randomized backoff.
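
A minimal sketch of a bounded, randomized restart window (hypothetical names; the real code folds this into its scan loop rather than calling a helper):

```java
// Sketch of a restart policy that approximates randomized backoff:
// each detected collision widens a bounded window of probe slots to
// skip before rescanning from a fresh pseudorandom origin.
// Hypothetical; not the jsr166 implementation.
public class ScanBackoff {
    // rnd is a caller-supplied XorShift value; collisions counts
    // consecutive CAS failures or read-ahead stalls.
    static int skip(int collisions, int rnd) {
        int window = 1 << Math.min(collisions, 8); // bounded exponential window
        return (rnd >>> 16) & (window - 1);        // uniform-ish in [0, window)
    }
}
```

Randomizing within a window that grows with the collision count spreads contending scanners apart without a global coordination point.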
*
* Oversignalling. When many short top-level tasks are present in
* a small number of queues, the above signalling strategy may
* activate many more workers than needed, worsening locality and
* contention problems, while also generating more global
* contention (field ctl is CASed on every activation and
* deactivation). We filter out (both in runWorker and
* signalWork) attempted signals that are surely not needed
* because the signalled tasks are already taken.
*
* Shutdown and Quiescence
* =======================
*
* Quiescence. Workers scan looking for work, giving up when they
* don't find any, without being sure that none are available.
@ -893,9 +874,7 @@ public class ForkJoinPool extends AbstractExecutorService
* shutdown, runners are interrupted so they can cancel. Since
* external joining callers never run these tasks, they must await
* cancellation by others, which can occur along several different
* paths.
*
* Across these APIs, rules for reporting exceptions for tasks
* with results accessed via join() differ from those via get(),
@ -962,9 +941,13 @@ public class ForkJoinPool extends AbstractExecutorService
* less-contended applications. To help arrange this, some
* non-reference fields are declared as "long" even when ints or
* shorts would suffice. For class WorkQueue, an
* embedded @Contended isolates the very busy top index, along
* with status and bookkeeping fields written (mostly) by owners,
* that otherwise interfere with reading array and base
* fields. There are other variables commonly contributing to
* false-sharing-related performance issues (including fields of
* class Thread), but we can't do much about this except try to
* minimize access.
*
* Initial sizing and resizing of WorkQueue arrays is an even more
* delicate tradeoff because the best strategy systematically
@ -973,13 +956,11 @@ public class ForkJoinPool extends AbstractExecutorService
* direct false-sharing and indirect cases due to GC bookkeeping
* (cardmarks etc), and reduce the number of resizes, which are
* not especially fast because they require atomic transfers.
* Currently, arrays are initialized to be just large enough to
* avoid resizing in most tree-structured tasks, but grow rapidly
* until large. (Maintenance note: any changes in fields, queues,
* or their uses, or JVM layout policies, must be accompanied by
* re-evaluation of these placement and sizing decisions.)
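
The masked power-of-two indexing and doubling growth can be sketched as follows (simplified and single-threaded; `PowQueue` is a hypothetical name, and the real resize uses atomic transfers):

```java
// Sketch of power-of-two sizing with masked indexing and doubling
// growth, in the style of work-stealing queue arrays; illustrative
// only, with no concurrency control.
public class PowQueue {
    static final int INITIAL_CAPACITY = 1 << 6; // matches the constant below
    Object[] array = new Object[INITIAL_CAPACITY];
    int base, top; // monotonically increasing; slot = index & (length - 1)

    int capacity() { return array.length; }

    void push(Object task) {
        Object[] a = array;
        if (top - base >= a.length)
            a = grow();
        a[top++ & (a.length - 1)] = task;
    }

    Object poll() { // FIFO take from base, as a stealer would
        if (top == base) return null;
        Object[] a = array;
        int i = base++ & (a.length - 1);
        Object t = a[i];
        a[i] = null;
        return t;
    }

    Object[] grow() { // doubling keeps the mask trick valid
        Object[] a = array, b = new Object[a.length << 1];
        for (int i = base; i < top; i++)          // re-place each element at
            b[i & (b.length - 1)] = a[i & (a.length - 1)]; // its new masked slot
        return array = b;
    }
}
```

Because capacities are powers of two, `index & (length - 1)` replaces a modulus, and doubling preserves the invariant needed for that mask.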
*
* Style notes
* ===========
@ -1062,17 +1043,11 @@ public class ForkJoinPool extends AbstractExecutorService
static final int DEFAULT_COMMON_MAX_SPARES = 256;
/**
* Initial capacity of work-stealing queue array.
* Must be a power of two, at least 2. See above.
*/
static final int INITIAL_QUEUE_CAPACITY = 1 << 6;
// conversions among short, int, long
static final int SMASK = 0xffff; // (unsigned) short bits
static final long LMASK = 0xffffffffL; // lower 32 bits of long
@ -1243,11 +1218,11 @@ public class ForkJoinPool extends AbstractExecutorService
*/
WorkQueue(ForkJoinWorkerThread owner, int id, int cfg,
boolean clearThreadLocals) {
this.config = (clearThreadLocals) ? cfg | CLEAR_TLS : cfg;
if ((this.owner = owner) == null) {
array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
phase = id | IDLE;
}
}
/**