callable, ForkJoinPool pool) {
+ this.when = DelayScheduler.now() + Math.max(delay, 0L);
+ this.heapIndex = -1;
+ this.nextDelay = nextDelay;
+ this.isImmediate = isImmediate;
+ this.runnable = runnable;
+ this.callable = callable;
+ this.pool = pool;
+ }
+
+ public void schedule() { // relay to pool, to allow independent use
+ ForkJoinPool p;
+ if ((p = pool) != null) // else already run
+ p.scheduleDelayedTask(this);
+ }
+
+ // InterruptibleTask methods
+ public final T getRawResult() { return result; }
+ public final void setRawResult(T v) { result = v; }
+ final Object adaptee() { return (runnable != null) ? runnable : callable; }
+
+ final T compute() throws Exception {
+ Callable extends T> c; Runnable r;
+ T res = null;
+ if ((r = runnable) != null)
+ r.run();
+ else if ((c = callable) != null)
+ res = c.call();
+ return res;
+ }
+
+ final boolean postExec() { // possibly resubmit
+ long d; ForkJoinPool p; DelayScheduler ds;
+ if ((d = nextDelay) != 0L && // is periodic
+ status >= 0 && // not abnormally completed
+ (p = pool) != null && (ds = p.delayScheduler) != null) {
+ if (p.shutdownStatus(ds) == 0) {
+ heapIndex = -1;
+ if (d < 0L)
+ when = DelayScheduler.now() - d;
+ else
+ when += d;
+ ds.pend(this);
+ return false;
+ }
+ trySetCancelled(); // pool is shutdown
+ }
+ pool = null; // reduce memory retention
+ runnable = null;
+ callable = null;
+ return true;
+ }
+
+ public final boolean cancel(boolean mayInterruptIfRunning) {
+ int s; ForkJoinPool p; DelayScheduler ds;
+ if ((s = trySetCancelled()) < 0)
+ return ((s & (ABNORMAL | THROWN)) == ABNORMAL);
+ if ((p = pool) != null &&
+ !interruptIfRunning(mayInterruptIfRunning)) {
+ pool = null;
+ runnable = null;
+ callable = null;
+ if (heapIndex >= 0 && nextPending == null &&
+ (ds = p.delayScheduler) != null)
+ ds.pend(this); // for heap cleanup
+ }
+ return true;
+ }
+
+
+ // ScheduledFuture methods
+ public final long getDelay(TimeUnit unit) {
+ return unit.convert(when - DelayScheduler.now(), NANOSECONDS);
+ }
+ public int compareTo(Delayed other) { // never used internally
+ long diff = (other instanceof ScheduledForkJoinTask> t) ?
+ when - t.when : // avoid nanoTime calls and conversions
+ getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
+ return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
+ }
+ }
+
+}
+
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
index 61e9f0e3ffa..ec46f61291d 100644
--- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
+++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
@@ -42,6 +42,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;
@@ -50,6 +51,7 @@ import jdk.internal.access.JavaUtilConcurrentFJPAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.Unsafe;
import jdk.internal.vm.SharedThreadContainer;
+import static java.util.concurrent.DelayScheduler.ScheduledForkJoinTask;
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
@@ -133,11 +135,34 @@ import jdk.internal.vm.SharedThreadContainer;
*
*
*
+ * Additionally, this class supports {@link
+ * ScheduledExecutorService} methods to delay or periodically execute
+ * tasks, as well as method {@link #submitWithTimeout} to cancel tasks
+ * that take too long. The scheduled functions or actions may create
+ * and invoke other {@linkplain ForkJoinTask ForkJoinTasks}. Delayed
+ * actions become enabled and behave as ordinary submitted
+ * tasks when their delays elapse. Scheduling methods return
+ * {@linkplain ForkJoinTask ForkJoinTasks} that implement the {@link
+ * ScheduledFuture} interface. Resource exhaustion encountered after
+ * initial submission results in task cancellation. When time-based
+ * methods are used, shutdown policies match the default policies of
+ * class {@link ScheduledThreadPoolExecutor}: upon {@link #shutdown},
+ * existing periodic tasks will not re-execute, and the pool
+ * terminates when quiescent and existing delayed tasks
+ * complete. Method {@link #cancelDelayedTasksOnShutdown} may be used
+ * to disable all delayed tasks upon shutdown, and method {@link
+ * #shutdownNow} may be used to instead unconditionally initiate pool
+ * termination. Monitoring methods such as {@link #getQueuedTaskCount}
+ * do not include scheduled tasks that are not yet enabled to execute,
+ * which are reported separately by method {@link
+ * #getDelayedTaskCount}.
+ *
*
The parameters used to construct the common pool may be controlled by
* setting the following {@linkplain System#getProperty system properties}:
*
* - {@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism}
- * - the parallelism level, a non-negative integer
+ * - the parallelism level, a non-negative integer. Usage is discouraged.
+ * Use {@link #setParallelism} instead.
*
- {@systemProperty java.util.concurrent.ForkJoinPool.common.threadFactory}
* - the class name of a {@link ForkJoinWorkerThreadFactory}.
* The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
@@ -155,10 +180,11 @@ import jdk.internal.vm.SharedThreadContainer;
* {@linkplain Thread#getContextClassLoader() thread context class loader}.
*
* Upon any error in establishing these settings, default parameters
- * are used. It is possible to disable or limit the use of threads in
- * the common pool by setting the parallelism property to zero, and/or
- * using a factory that may return {@code null}. However doing so may
- * cause unjoined tasks to never be executed.
+ * are used. It is possible to disable use of threads by using a
+ * factory that may return {@code null}, in which case some tasks may
+ * never execute. While possible, it is strongly discouraged to set
+ * the parallelism property to zero, which may be internally
+ * overridden in the presence of intrinsically async tasks.
*
* @implNote This implementation restricts the maximum number of
* running threads to 32767. Attempts to create pools with greater
@@ -171,7 +197,8 @@ import jdk.internal.vm.SharedThreadContainer;
* @since 1.7
* @author Doug Lea
*/
-public class ForkJoinPool extends AbstractExecutorService {
+public class ForkJoinPool extends AbstractExecutorService
+ implements ScheduledExecutorService {
/*
* Implementation Overview
@@ -215,6 +242,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* 3. Completion-based tasks (mainly CountedCompleters)
* 4. CommonPool and parallelStream support
* 5. InterruptibleTasks for externally submitted tasks
+ * 6. Support ScheduledExecutorService methods
*
* Most changes involve adaptions of base algorithms using
* combinations of static and dynamic bitwise mode settings (both
@@ -359,18 +387,18 @@ public class ForkJoinPool extends AbstractExecutorService {
* WorkQueues are also used in a similar way for tasks submitted
* to the pool. We cannot mix these tasks in the same queues used
* by workers. Instead, we randomly associate submission queues
- * with submitting threads, using a form of hashing. The
- * ThreadLocalRandom probe value serves as a hash code for
- * choosing existing queues, and may be randomly repositioned upon
- * contention with other submitters. In essence, submitters act
- * like workers except that they are restricted to executing local
- * tasks that they submitted (or when known, subtasks thereof).
- * Insertion of tasks in shared mode requires a lock. We use only
- * a simple spinlock (as one role of field "phase") because
- * submitters encountering a busy queue move to a different
- * position to use or create other queues. They (spin) block when
- * registering new queues, or indirectly elsewhere, by revisiting
- * later.
+ * with submitting threads (or carriers when using VirtualThreads)
+ * using a form of hashing. The ThreadLocalRandom probe value
+ * serves as a hash code for choosing existing queues, and may be
+ * randomly repositioned upon contention with other submitters.
+ * In essence, submitters act like workers except that they are
+ * restricted to executing local tasks that they submitted (or
+ * when known, subtasks thereof). Insertion of tasks in shared
+ * mode requires a lock. We use only a simple spinlock (as one
+ * role of field "phase") because submitters encountering a busy
+ * queue move to a different position to use or create other
+ * queues. They (spin) block when registering new queues, or
+ * indirectly elsewhere, by revisiting later.
*
* Management
* ==========
@@ -826,7 +854,13 @@ public class ForkJoinPool extends AbstractExecutorService {
* fashion or use CountedCompleters (as is true for jdk
* parallelStreams). Support infiltrates several methods,
* including those that retry helping steps until we are sure that
- * none apply if there are no workers.
+ * none apply if there are no workers. To deal with conflicting
+ * requirements, uses of the commonPool that require async because
+ * caller-runs need not apply, ensure threads are enabled (by
+ * setting parallelism) via method asyncCommonPool before
+ * proceeding. (In principle, overriding zero parallelism needs to
+ * ensure at least one worker, but due to other backward
+ * compatibility contraints, ensures two.)
*
* As a more appropriate default in managed environments, unless
* overridden by system properties, we use workers of subclass
@@ -851,7 +885,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* To comply with ExecutorService specs, we use subclasses of
* abstract class InterruptibleTask for tasks that require
* stronger interruption and cancellation guarantees. External
- * submitters never run these tasks, even if in the common pool.
+ * submitters never run these tasks, even if in the common pool
+ * (as indicated by ForkJoinTask.noUserHelp status bit).
* InterruptibleTasks include a "runner" field (implemented
* similarly to FutureTask) to support cancel(true). Upon pool
* shutdown, runners are interrupted so they can cancel. Since
@@ -880,6 +915,27 @@ public class ForkJoinPool extends AbstractExecutorService {
* writing, virtual thread bodies are by default run as some form
* of InterruptibleTask.
*
+ * DelayScheduler
+ * ================
+ *
+ * This class supports ScheduledExecutorService methods by
+ * creating and starting a DelayScheduler on first use of these
+ * methods (via startDelayScheduler). The scheduler operates
+ * independently in its own thread, relaying tasks to the pool to
+ * execute when their delays elapse (see method
+ * executeEnabledScheduledTask). The only other interactions with
+ * the delayScheduler are to control shutdown and maintain
+ * shutdown-related policies in methods quiescent() and
+ * tryTerminate(). In particular, processing must deal with cases
+ * in which tasks are submitted before shutdown, but not enabled
+ * until afterwards, in which case they must bypass some screening
+ * to be allowed to run. Conversely, the DelayScheduler checks
+ * runState status and when enabled, completes termination, using
+ * only methods shutdownStatus and tryStopIfShutdown. All of these
+ * methods are final and have signatures referencing
+ * DelaySchedulers, so cannot conflict with those of any existing
+ * FJP subclasses.
+ *
* Memory placement
* ================
*
@@ -950,7 +1006,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* Nearly all explicit checks lead to bypass/return, not exception
* throws, because they may legitimately arise during shutdown. A
* few unusual loop constructions encourage (with varying
- * effectiveness) JVMs about where (not) to place safepoints.
+ * effectiveness) JVMs about where (not) to place safepoints. All
+ * public methods screen arguments (mainly null checks) before
+ * creating or executing tasks.
*
* There is a lot of representation-level coupling among classes
* ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
@@ -1045,6 +1103,7 @@ public class ForkJoinPool extends AbstractExecutorService {
static final int DROPPED = 1 << 16; // removed from ctl counts
static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
static final int IDLE = 1 << 16; // phase seqlock/version count
+ static final int MIN_QUEUES_SIZE = 1 << 4; // ensure external slots
/*
* Bits and masks for ctl and bounds are packed with 4 16 bit subfields:
@@ -1214,7 +1273,7 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Pushes a task. Called only by owner or if already locked
*
- * @param task the task. Caller must ensure non-null.
+ * @param task the task; no-op if null
* @param pool the pool to signal if was previously empty, else null
* @param internal if caller owns this queue
* @throws RejectedExecutionException if array could not be resized
@@ -1222,7 +1281,9 @@ public class ForkJoinPool extends AbstractExecutorService {
final void push(ForkJoinTask> task, ForkJoinPool pool,
boolean internal) {
int s = top, b = base, m, cap, room; ForkJoinTask>[] a;
- if ((a = array) != null && (cap = a.length) > 0) { // else disabled
+ if ((a = array) != null && (cap = a.length) > 0 && // else disabled
+ task != null) {
+ int pk = task.noUserHelp() + 1; // prev slot offset
if ((room = (m = cap - 1) - (s - b)) >= 0) {
top = s + 1;
long pos = slotOffset(m & s);
@@ -1237,10 +1298,7 @@ public class ForkJoinPool extends AbstractExecutorService {
unlockPhase();
if (room < 0)
throw new RejectedExecutionException("Queue capacity exceeded");
- if ((room == 0 || // pad for InterruptibleTasks
- a[m & (s - ((internal || task == null ||
- task.getClass().getSuperclass() !=
- interruptibleTaskClass) ? 1 : 2))] == null) &&
+ if ((room == 0 || a[m & (s - pk)] == null) &&
pool != null)
pool.signalWork(); // may have appeared empty
}
@@ -1579,11 +1637,6 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
static volatile RuntimePermission modifyThreadPermission;
- /**
- * Cached for faster type tests.
- */
- static final Class> interruptibleTaskClass;
-
/**
* For VirtualThread intrinsics
*/
@@ -1596,12 +1649,15 @@ public class ForkJoinPool extends AbstractExecutorService {
final UncaughtExceptionHandler ueh; // per-worker UEH
final SharedThreadContainer container;
final String workerNamePrefix; // null for common pool
+ final String poolName;
+ volatile DelayScheduler delayScheduler; // lazily constructed
WorkQueue[] queues; // main registry
volatile long runState; // versioned, lockable
final long keepAlive; // milliseconds before dropping if idle
final long config; // static configuration bits
volatile long stealCount; // collects worker nsteals
volatile long threadIds; // for worker thread names
+
@jdk.internal.vm.annotation.Contended("fjpctl") // segregate
volatile long ctl; // main pool control
@jdk.internal.vm.annotation.Contended("fjpctl") // colocate
@@ -1885,6 +1941,7 @@ public class ForkJoinPool extends AbstractExecutorService {
long phaseSum = 0L;
boolean swept = false;
for (long e, prevRunState = 0L; ; prevRunState = e) {
+ DelayScheduler ds;
long c = ctl;
if (((e = runState) & STOP) != 0L)
return 1; // terminating
@@ -1907,6 +1964,8 @@ public class ForkJoinPool extends AbstractExecutorService {
}
else if ((e & SHUTDOWN) == 0)
return 0;
+ else if ((ds = delayScheduler) != null && !ds.canShutDown())
+ return 0;
else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
return 1; // enable termination
else
@@ -1957,15 +2016,13 @@ public class ForkJoinPool extends AbstractExecutorService {
}
else {
boolean propagate;
- int nb = q.base = b + 1;
+ int nb = q.base = b + 1, prevSrc = src;
w.nsteals = ++nsteals;
- w.source = j; // volatile
+ w.source = src = j; // volatile
rescan = true;
+ int nh = t.noUserHelp();
if (propagate =
- ((src != (src = j) ||
- t.getClass().getSuperclass() ==
- interruptibleTaskClass) &&
- a[nb & m] != null))
+ (prevSrc != src || nh != 0) && a[nb & m] != null)
signalWork();
w.topLevelExec(t, fifo);
if ((b = q.base) != nb && !propagate)
@@ -2004,8 +2061,8 @@ public class ForkJoinPool extends AbstractExecutorService {
((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
(qs = queues) == null || (n = qs.length) <= 0)
return IDLE; // terminating
- int prechecks = Math.min(ac, 2); // reactivation threshold
- for (int k = Math.max(n << 2, SPIN_WAITS << 1);;) {
+ int k = Math.max(n << 2, SPIN_WAITS << 1);
+ for (int prechecks = k / n;;) { // reactivation threshold
WorkQueue q; int cap; ForkJoinTask>[] a; long c;
if (w.phase == activePhase)
return activePhase;
@@ -2014,7 +2071,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if ((q = qs[k & (n - 1)]) == null)
Thread.onSpinWait();
else if ((a = q.array) != null && (cap = a.length) > 0 &&
- a[q.base & (cap - 1)] != null && --prechecks < 0 &&
+ a[q.base & (cap - 1)] != null && --prechecks <= 0 &&
(int)(c = ctl) == activePhase &&
compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK)))
return w.phase = activePhase; // reactivate
@@ -2513,37 +2570,45 @@ public class ForkJoinPool extends AbstractExecutorService {
* Finds and locks a WorkQueue for an external submitter, or
* throws RejectedExecutionException if shutdown or terminating.
* @param r current ThreadLocalRandom.getProbe() value
- * @param isSubmit false if this is for a common pool fork
+ * @param rejectOnShutdown true if RejectedExecutionException
+ * should be thrown when shutdown (else only if terminating)
*/
- private WorkQueue submissionQueue(int r) {
- if (r == 0) {
+ private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) {
+ int reuse; // nonzero if prefer create
+ if ((reuse = r) == 0) {
ThreadLocalRandom.localInit(); // initialize caller's probe
r = ThreadLocalRandom.getProbe();
}
- for (;;) {
- int n, i, id; WorkQueue[] qs; WorkQueue q, w = null;
+ for (int probes = 0; ; ++probes) {
+ int n, i, id; WorkQueue[] qs; WorkQueue q;
if ((qs = queues) == null)
break;
if ((n = qs.length) <= 0)
break;
if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
- if (w == null)
- w = new WorkQueue(null, id, 0, false);
+ WorkQueue w = new WorkQueue(null, id, 0, false);
w.phase = id;
- long isShutdown = lockRunState() & SHUTDOWN;
- if (isShutdown == 0L && queues == qs && qs[i] == null) {
- q = qs[i] = w; // else retry
- w = null;
- }
+ boolean reject = ((lockRunState() & SHUTDOWN) != 0 &&
+ rejectOnShutdown);
+ if (!reject && queues == qs && qs[i] == null)
+ q = qs[i] = w; // else lost race to install
unlockRunState();
if (q != null)
return q;
- if (isShutdown != 0L)
+ if (reject)
break;
+ reuse = 0;
}
- else if (!q.tryLockPhase()) // move index
+ if (reuse == 0 || !q.tryLockPhase()) { // move index
+ if (reuse == 0) {
+ if (probes >= n >> 1)
+ reuse = r; // stop prefering free slot
+ }
+ else if (q != null)
+ reuse = 0; // probe on collision
r = ThreadLocalRandom.advanceProbe(r);
- else if ((runState & SHUTDOWN) != 0L) {
+ }
+ else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
q.unlockPhase(); // check while q lock held
break;
}
@@ -2553,7 +2618,7 @@ public class ForkJoinPool extends AbstractExecutorService {
throw new RejectedExecutionException();
}
- private void poolSubmit(boolean signalIfEmpty, ForkJoinTask> task) {
+ private ForkJoinTask poolSubmit(boolean signalIfEmpty, ForkJoinTask task) {
Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
(wt = (ForkJoinWorkerThread)t).pool == this) {
@@ -2562,21 +2627,22 @@ public class ForkJoinPool extends AbstractExecutorService {
}
else { // find and lock queue
internal = false;
- q = submissionQueue(ThreadLocalRandom.getProbe());
+ q = submissionQueue(ThreadLocalRandom.getProbe(), true);
}
q.push(task, signalIfEmpty ? this : null, internal);
+ return task;
}
/**
* Returns queue for an external submission, bypassing call to
* submissionQueue if already established and unlocked.
*/
- final WorkQueue externalSubmissionQueue() {
+ final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
WorkQueue[] qs; WorkQueue q; int n;
int r = ThreadLocalRandom.getProbe();
return (((qs = queues) != null && (n = qs.length) > 0 &&
(q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
- q.tryLockPhase()) ? q : submissionQueue(r));
+ q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown));
}
/**
@@ -2700,14 +2766,20 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
else if ((isShutdown = (e & SHUTDOWN)) != 0L || enable) {
+ long quiet; DelayScheduler ds;
if (isShutdown == 0L)
getAndBitwiseOrRunState(SHUTDOWN);
- if (quiescent() > 0)
+ if ((quiet = quiescent()) > 0)
now = true;
+ else if (quiet == 0 && (ds = delayScheduler) != null)
+ ds.signal();
}
if (now) {
+ DelayScheduler ds;
releaseWaiters();
+ if ((ds = delayScheduler) != null)
+ ds.signal();
for (;;) {
if (((e = runState) & CLEANED) == 0L) {
boolean clean = cleanQueues();
@@ -2718,6 +2790,8 @@ public class ForkJoinPool extends AbstractExecutorService {
break;
if (ctl != 0L) // else loop if didn't finish cleaning
break;
+ if ((ds = delayScheduler) != null && ds.signal() >= 0)
+ break;
if ((e & CLEANED) != 0L) {
e |= TERMINATED;
if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) {
@@ -2888,12 +2962,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* event-style asynchronous tasks. For default value, use {@code
* false}.
*
- * @param corePoolSize the number of threads to keep in the pool
- * (unless timed out after an elapsed keep-alive). Normally (and
- * by default) this is the same value as the parallelism level,
- * but may be set to a larger value to reduce dynamic overhead if
- * tasks regularly block. Using a smaller value (for example
- * {@code 0}) has the same effect as the default.
+ * @param corePoolSize ignored: used in previous releases of this
+ * class but no longer applicable. Using {@code 0} maintains
+ * compatibility across releases.
*
* @param maximumPoolSize the maximum number of threads allowed.
* When the maximum is reached, attempts to replace blocked
@@ -2957,7 +3028,8 @@ public class ForkJoinPool extends AbstractExecutorService {
throw new IllegalArgumentException();
if (factory == null || unit == null)
throw new NullPointerException();
- int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
+ int size = Math.max(MIN_QUEUES_SIZE,
+ 1 << (33 - Integer.numberOfLeadingZeros(p - 1)));
this.parallelism = p;
this.factory = factory;
this.ueh = handler;
@@ -2971,6 +3043,7 @@ public class ForkJoinPool extends AbstractExecutorService {
this.queues = new WorkQueue[size];
String pid = Integer.toString(getAndAddPoolIds(1) + 1);
String name = "ForkJoinPool-" + pid;
+ this.poolName = name;
this.workerNamePrefix = name + "-worker-";
this.container = SharedThreadContainer.create(name);
}
@@ -2980,6 +3053,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* overridden by system properties
*/
private ForkJoinPool(byte forCommonPoolOnly) {
+ String name = "ForkJoinPool.commonPool";
ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory;
UncaughtExceptionHandler handler = null;
int maxSpares = DEFAULT_COMMON_MAX_SPARES;
@@ -3013,7 +3087,9 @@ public class ForkJoinPool extends AbstractExecutorService {
if (preset == 0)
pc = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
int p = Math.min(pc, MAX_CAP);
- int size = (p == 0) ? 1 : 1 << (33 - Integer.numberOfLeadingZeros(p-1));
+ int size = Math.max(MIN_QUEUES_SIZE,
+ (p == 0) ? 1 :
+ 1 << (33 - Integer.numberOfLeadingZeros(p-1)));
this.parallelism = p;
this.config = ((preset & LMASK) | (((long)maxSpares) << TC_SHIFT) |
(1L << RC_SHIFT));
@@ -3022,8 +3098,9 @@ public class ForkJoinPool extends AbstractExecutorService {
this.keepAlive = DEFAULT_KEEPALIVE;
this.saturate = null;
this.workerNamePrefix = null;
+ this.poolName = name;
this.queues = new WorkQueue[size];
- this.container = SharedThreadContainer.create("ForkJoinPool.commonPool");
+ this.container = SharedThreadContainer.create(name);
}
/**
@@ -3044,6 +3121,16 @@ public class ForkJoinPool extends AbstractExecutorService {
return common;
}
+ /**
+ * Package-private access to commonPool overriding zero parallelism
+ */
+ static ForkJoinPool asyncCommonPool() {
+ ForkJoinPool cp; int p;
+ if ((p = (cp = common).parallelism) == 0)
+ U.compareAndSetInt(cp, PARALLELISM, 0, 2);
+ return cp;
+ }
+
// Execution methods
/**
@@ -3064,8 +3151,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public T invoke(ForkJoinTask task) {
- Objects.requireNonNull(task);
- poolSubmit(true, task);
+ poolSubmit(true, Objects.requireNonNull(task));
try {
return task.join();
} catch (RuntimeException | Error unchecked) {
@@ -3084,8 +3170,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public void execute(ForkJoinTask> task) {
- Objects.requireNonNull(task);
- poolSubmit(true, task);
+ poolSubmit(true, Objects.requireNonNull(task));
}
// AbstractExecutorService methods
@@ -3098,7 +3183,7 @@ public class ForkJoinPool extends AbstractExecutorService {
@Override
@SuppressWarnings("unchecked")
public void execute(Runnable task) {
- poolSubmit(true, (task instanceof ForkJoinTask>)
+ poolSubmit(true, (Objects.requireNonNull(task) instanceof ForkJoinTask>)
? (ForkJoinTask) task // avoid re-wrap
: new ForkJoinTask.RunnableExecuteAction(task));
}
@@ -3118,9 +3203,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public ForkJoinTask submit(ForkJoinTask task) {
- Objects.requireNonNull(task);
- poolSubmit(true, task);
- return task;
+ return poolSubmit(true, Objects.requireNonNull(task));
}
/**
@@ -3130,12 +3213,12 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
@Override
public ForkJoinTask submit(Callable task) {
- ForkJoinTask t =
+ Objects.requireNonNull(task);
+ return poolSubmit(
+ true,
(Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedCallable(task) :
- new ForkJoinTask.AdaptedInterruptibleCallable(task);
- poolSubmit(true, t);
- return t;
+ new ForkJoinTask.AdaptedInterruptibleCallable(task));
}
/**
@@ -3145,12 +3228,12 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
@Override
public ForkJoinTask submit(Runnable task, T result) {
- ForkJoinTask t =
+ Objects.requireNonNull(task);
+ return poolSubmit(
+ true,
(Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedRunnable(task, result) :
- new ForkJoinTask.AdaptedInterruptibleRunnable(task, result);
- poolSubmit(true, t);
- return t;
+ new ForkJoinTask.AdaptedInterruptibleRunnable(task, result));
}
/**
@@ -3161,13 +3244,14 @@ public class ForkJoinPool extends AbstractExecutorService {
@Override
@SuppressWarnings("unchecked")
public ForkJoinTask> submit(Runnable task) {
- ForkJoinTask> f = (task instanceof ForkJoinTask>) ?
+ Objects.requireNonNull(task);
+ return poolSubmit(
+ true,
+ (task instanceof ForkJoinTask>) ?
(ForkJoinTask) task : // avoid re-wrap
((Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedRunnable(task, null) :
- new ForkJoinTask.AdaptedInterruptibleRunnable(task, null));
- poolSubmit(true, f);
- return f;
+ new ForkJoinTask.AdaptedInterruptibleRunnable(task, null)));
}
/**
@@ -3189,7 +3273,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public ForkJoinTask externalSubmit(ForkJoinTask task) {
Objects.requireNonNull(task);
- externalSubmissionQueue().push(task, this, false);
+ externalSubmissionQueue(true).push(task, this, false);
return task;
}
@@ -3210,9 +3294,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @since 19
*/
public ForkJoinTask lazySubmit(ForkJoinTask task) {
- Objects.requireNonNull(task);
- poolSubmit(false, task);
- return task;
+ return poolSubmit(false, Objects.requireNonNull(task));
}
/**
@@ -3350,6 +3432,322 @@ public class ForkJoinPool extends AbstractExecutorService {
.invokeAny(tasks, this, true, unit.toNanos(timeout));
}
+ // Support for delayed tasks
+
+ /**
+ * Returns STOP and SHUTDOWN status (zero if neither), masking or
+ * truncating out other bits.
+ */
+ final int shutdownStatus(DelayScheduler ds) {
+ return (int)(runState & (SHUTDOWN | STOP));
+ }
+
+ /**
+ * Tries to stop and possibly terminate if already enabled, return success.
+ */
+ final boolean tryStopIfShutdown(DelayScheduler ds) {
+ return (tryTerminate(false, false) & STOP) != 0L;
+ }
+
+ /**
+ * Creates and starts DelayScheduler
+ */
+ private DelayScheduler startDelayScheduler() {
+ DelayScheduler ds;
+ if ((ds = delayScheduler) == null) {
+ boolean start = false;
+ String name = poolName + "-delayScheduler";
+ if (workerNamePrefix == null)
+ asyncCommonPool(); // override common parallelism zero
+ lockRunState();
+ try {
+ if ((ds = delayScheduler) == null) {
+ ds = delayScheduler = new DelayScheduler(this, name);
+ start = true;
+ }
+ } finally {
+ unlockRunState();
+ }
+ if (start) { // start outside of lock
+ // exceptions on start passed to (external) callers
+ SharedThreadContainer ctr;
+ if ((ctr = container) != null)
+ ctr.start(ds);
+ else
+ ds.start();
+ }
+ }
+ return ds;
+ }
+
+ /**
+ * Arranges execution of a ScheduledForkJoinTask whose delay has
+ * elapsed
+ */
+ final void executeEnabledScheduledTask(ScheduledForkJoinTask> task) {
+ externalSubmissionQueue(false).push(task, this, false);
+ }
+
+ /**
+ * Arranges delayed execution of a ScheduledForkJoinTask via the
+ * DelayScheduler, creating and starting it if necessary.
+ * @return the task
+ */
+ final ScheduledForkJoinTask