8319447: Improve performance of delayed task handling

Reviewed-by: vklang, alanb
This commit is contained in:
Doug Lea 2025-03-31 19:23:59 +00:00
parent fe8bd75621
commit 8b0602dbed
12 changed files with 1765 additions and 373 deletions

View File

@ -46,6 +46,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.Objects;
import static java.util.concurrent.DelayScheduler.ScheduledForkJoinTask;
/**
* A {@link Future} that may be explicitly completed (setting its
@ -69,18 +70,15 @@ import java.util.Objects;
* a completion method.
*
* <li>All <em>async</em> methods without an explicit Executor
* argument are performed using the {@link ForkJoinPool#commonPool()}
* (unless it does not support a parallelism level of at least two, in
* which case, a new Thread is created to run each task). This may be
* overridden for non-static methods in subclasses by defining method
* {@link #defaultExecutor()}. To simplify monitoring, debugging,
* and tracking, all generated asynchronous tasks are instances of the
* marker interface {@link AsynchronousCompletionTask}. Operations
* with time-delays can use adapter methods defined in this class, for
* example: {@code supplyAsync(supplier, delayedExecutor(timeout,
* timeUnit))}. To support methods with delays and timeouts, this
* class maintains at most one daemon thread for triggering and
* cancelling actions, not for running them.
* argument, as well as those involving delays are performed using the
* {@link ForkJoinPool#commonPool()}. The default async Executor may
* be overridden for non-static methods in subclasses by defining
* method {@link #defaultExecutor()}. To simplify monitoring,
* debugging, and tracking, all generated asynchronous tasks are
* instances of the marker interface {@link
* AsynchronousCompletionTask}. Operations with time-delays can use
* adapter methods defined in this class, for example: {@code
* supplyAsync(supplier, delayedExecutor(timeout, timeUnit))}.
*
* <li>All CompletionStage methods are implemented independently of
* other public methods, so the behavior of one method is not impacted
@ -473,34 +471,11 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public static interface AsynchronousCompletionTask {
}
private static final boolean USE_COMMON_POOL =
(ForkJoinPool.getCommonPoolParallelism() > 1);
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
* Default Executor
*/
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
private static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
Objects.requireNonNull(r);
new Thread(r).start();
}
}
/**
* Null-checks user executor argument, and translates uses of
* commonPool to ASYNC_POOL in case parallelism disabled.
*/
static Executor screenExecutor(Executor e) {
if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool())
return ASYNC_POOL;
if (e == null) throw new NullPointerException();
return e;
}
private static final ForkJoinPool ASYNC_POOL =
ForkJoinPool.asyncCommonPool(); // ensures minimal parallelism
// Modes for Completion.tryFire. Signedness matters.
static final int SYNC = 0;
@ -702,8 +677,8 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
Object r;
Objects.requireNonNull(f);
if ((r = result) != null)
return uniApplyNow(r, e, f);
CompletableFuture<V> d = newIncompleteFuture();
@ -775,8 +750,8 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
private CompletableFuture<Void> uniAcceptStage(Executor e,
Consumer<? super T> f) {
if (f == null) throw new NullPointerException();
Object r;
Objects.requireNonNull(f);
if ((r = result) != null)
return uniAcceptNow(r, e, f);
CompletableFuture<Void> d = newIncompleteFuture();
@ -843,8 +818,8 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
Object r;
Objects.requireNonNull(f);
if ((r = result) != null)
return uniRunNow(r, e, f);
CompletableFuture<Void> d = newIncompleteFuture();
@ -924,7 +899,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
private CompletableFuture<T> uniWhenCompleteStage(
Executor e, BiConsumer<? super T, ? super Throwable> f) {
if (f == null) throw new NullPointerException();
Objects.requireNonNull(f);
CompletableFuture<T> d = newIncompleteFuture();
Object r;
if ((r = result) == null)
@ -987,7 +962,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
private <V> CompletableFuture<V> uniHandleStage(
Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
if (f == null) throw new NullPointerException();
Objects.requireNonNull(f);
CompletableFuture<V> d = newIncompleteFuture();
Object r;
if ((r = result) == null)
@ -1045,7 +1020,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
private CompletableFuture<T> uniExceptionallyStage(
Executor e, Function<Throwable, ? extends T> f) {
if (f == null) throw new NullPointerException();
Objects.requireNonNull(f);
CompletableFuture<T> d = newIncompleteFuture();
Object r;
if ((r = result) == null)
@ -1105,7 +1080,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
private CompletableFuture<T> uniComposeExceptionallyStage(
Executor e, Function<Throwable, ? extends CompletionStage<T>> f) {
if (f == null) throw new NullPointerException();
Objects.requireNonNull(f);
CompletableFuture<T> d = newIncompleteFuture();
Object r, s; Throwable x;
if ((r = result) == null)
@ -1212,7 +1187,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
private <V> CompletableFuture<V> uniComposeStage(
Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
if (f == null) throw new NullPointerException();
Objects.requireNonNull(f);
CompletableFuture<V> d = newIncompleteFuture();
Object r, s; Throwable x;
if ((r = result) == null)
@ -1823,7 +1798,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
Objects.requireNonNull(f);
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
@ -1859,7 +1834,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
Objects.requireNonNull(f);
CompletableFuture<Void> d = new CompletableFuture<Void>();
e.execute(new AsyncRun(d, f));
return d;
@ -2048,7 +2023,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
return asyncSupplyStage(Objects.requireNonNull(executor), supplier);
}
/**
@ -2076,7 +2051,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
*/
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
return asyncRunStage(Objects.requireNonNull(executor), runnable);
}
/**
@ -2241,7 +2216,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
* to transition to a completed state, else {@code false}
*/
public boolean completeExceptionally(Throwable ex) {
if (ex == null) throw new NullPointerException();
Objects.requireNonNull(ex);
boolean triggered = internalComplete(new AltResult(ex));
postComplete();
return triggered;
@ -2259,7 +2234,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
return uniApplyStage(Objects.requireNonNull(executor), fn);
}
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
@ -2272,7 +2247,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
return uniAcceptStage(Objects.requireNonNull(executor), action);
}
public CompletableFuture<Void> thenRun(Runnable action) {
@ -2285,7 +2260,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
return uniRunStage(Objects.requireNonNull(executor), action);
}
public <U,V> CompletableFuture<V> thenCombine(
@ -2303,7 +2278,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
return biApplyStage(Objects.requireNonNull(executor), other, fn);
}
public <U> CompletableFuture<Void> thenAcceptBoth(
@ -2321,7 +2296,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
return biAcceptStage(Objects.requireNonNull(executor), other, action);
}
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
@ -2337,7 +2312,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
return biRunStage(Objects.requireNonNull(executor), other, action);
}
public <U> CompletableFuture<U> applyToEither(
@ -2353,7 +2328,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
return orApplyStage(Objects.requireNonNull(executor), other, fn);
}
public CompletableFuture<Void> acceptEither(
@ -2369,7 +2344,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor) {
return orAcceptStage(screenExecutor(executor), other, action);
return orAcceptStage(Objects.requireNonNull(executor), other, action);
}
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
@ -2385,7 +2360,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return orRunStage(screenExecutor(executor), other, action);
return orRunStage(Objects.requireNonNull(executor), other, action);
}
public <U> CompletableFuture<U> thenCompose(
@ -2401,7 +2376,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
return uniComposeStage(screenExecutor(executor), fn);
return uniComposeStage(Objects.requireNonNull(executor), fn);
}
public CompletableFuture<T> whenComplete(
@ -2416,7 +2391,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
return uniWhenCompleteStage(Objects.requireNonNull(executor), action);
}
public <U> CompletableFuture<U> handle(
@ -2431,7 +2406,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
return uniHandleStage(Objects.requireNonNull(executor), fn);
}
/**
@ -2461,7 +2436,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
*/
public CompletableFuture<T> exceptionallyAsync(
Function<Throwable, ? extends T> fn, Executor executor) {
return uniExceptionallyStage(screenExecutor(executor), fn);
return uniExceptionallyStage(Objects.requireNonNull(executor), fn);
}
/**
@ -2486,7 +2461,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public CompletableFuture<T> exceptionallyComposeAsync(
Function<Throwable, ? extends CompletionStage<T>> fn,
Executor executor) {
return uniComposeExceptionallyStage(screenExecutor(executor), fn);
return uniComposeExceptionallyStage(Objects.requireNonNull(executor), fn);
}
/* ------------- Arbitrary-arity constructions -------------- */
@ -2652,8 +2627,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
* @throws NullPointerException if the exception is null
*/
public void obtrudeException(Throwable ex) {
if (ex == null) throw new NullPointerException();
result = new AltResult(ex);
result = new AltResult(Objects.requireNonNull(ex));
postComplete();
}
@ -2784,9 +2758,8 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
*/
public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier,
Executor executor) {
if (supplier == null || executor == null)
throw new NullPointerException();
executor.execute(new AsyncSupply<T>(this, supplier));
executor.execute(new AsyncSupply<T>(
this, Objects.requireNonNull(supplier)));
return this;
}
@ -2817,11 +2790,8 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
* @since 9
*/
public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
if (unit == null)
throw new NullPointerException();
if (result == null)
whenComplete(new Canceller(Delayer.delay(new Timeout(this),
timeout, unit)));
arrangeTimeout(unit.toNanos(timeout), // Implicit null-check of unit
new Timeout<T>(this, null, true));
return this;
}
@ -2839,15 +2809,53 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
*/
public CompletableFuture<T> completeOnTimeout(T value, long timeout,
TimeUnit unit) {
if (unit == null)
throw new NullPointerException();
if (result == null)
whenComplete(new Canceller(Delayer.delay(
new DelayedCompleter<T>(this, value),
timeout, unit)));
arrangeTimeout(unit.toNanos(timeout),
new Timeout<T>(this, value, false));
return this;
}
/** Action to complete (possibly exceptionally) on timeout */
static final class Timeout<U> implements Runnable {
final CompletableFuture<U> f;
final U value;
final boolean exceptional;
Timeout(CompletableFuture<U> f, U value, boolean exceptional) {
this.f = f; this.value = value; this.exceptional = exceptional;
}
public void run() {
if (f != null && !f.isDone()) {
if (exceptional)
f.completeExceptionally(new TimeoutException());
else
f.complete(value);
}
}
}
/** Action to cancel unneeded timeouts */
static final class Canceller implements BiConsumer<Object, Throwable> {
final Future<?> f;
Canceller(Future<?> f) { this.f = f; }
public void accept(Object ignore, Throwable ex) {
if (f != null) // currently never null
f.cancel(false);
}
}
/**
* Schedules a timeout action, as well as whenComplete handling to
* cancel the action if not needed.
*/
private <U> void arrangeTimeout(long nanoDelay, Timeout<U> onTimeout) {
ForkJoinPool e = ASYNC_POOL;
if (result == null) {
ScheduledForkJoinTask<Void> t = new ScheduledForkJoinTask<Void>(
nanoDelay, 0L, true, onTimeout, null, e);
whenComplete(new Canceller(t));
e.scheduleDelayedTask(t);
}
}
/**
* Returns a new Executor that submits a task to the given base
* executor after the given delay (or no delay if non-positive).
@ -2863,9 +2871,8 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
*/
public static Executor delayedExecutor(long delay, TimeUnit unit,
Executor executor) {
if (unit == null || executor == null)
throw new NullPointerException();
return new DelayedExecutor(delay, unit, executor);
return new DelayedExecutor(unit.toNanos(delay), // implicit null check
Objects.requireNonNull(executor));
}
/**
@ -2881,9 +2888,8 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
* @since 9
*/
public static Executor delayedExecutor(long delay, TimeUnit unit) {
if (unit == null)
throw new NullPointerException();
return new DelayedExecutor(delay, unit, ASYNC_POOL);
return new DelayedExecutor(unit.toNanos(delay), // implicit null check
ASYNC_POOL);
}
/**
@ -2910,8 +2916,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
* @since 9
*/
public static <U> CompletableFuture<U> failedFuture(Throwable ex) {
if (ex == null) throw new NullPointerException();
return new CompletableFuture<U>(new AltResult(ex));
return new CompletableFuture<U>(new AltResult(Objects.requireNonNull(ex)));
}
/**
@ -2925,48 +2930,23 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
* @since 9
*/
public static <U> CompletionStage<U> failedStage(Throwable ex) {
if (ex == null) throw new NullPointerException();
return new MinimalStage<U>(new AltResult(ex));
}
/**
* Singleton delay scheduler, used only for starting and
* cancelling tasks.
*/
static final class Delayer {
static ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
return delayer.schedule(command, delay, unit);
}
static final class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureDelayScheduler");
return t;
}
}
static final ScheduledThreadPoolExecutor delayer;
static {
(delayer = new ScheduledThreadPoolExecutor(
1, new DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
return new MinimalStage<U>(new AltResult(Objects.requireNonNull(ex)));
}
// Little class-ified lambdas to better support monitoring
static final class DelayedExecutor implements Executor {
final long delay;
final TimeUnit unit;
final long nanoDelay;
final Executor executor;
DelayedExecutor(long delay, TimeUnit unit, Executor executor) {
this.delay = delay; this.unit = unit; this.executor = executor;
DelayedExecutor(long nanoDelay, Executor executor) {
this.nanoDelay = nanoDelay; this.executor = executor;
}
public void execute(Runnable r) {
Delayer.delay(new TaskSubmitter(executor, r), delay, unit);
ForkJoinPool e = ASYNC_POOL; // Use immediate mode to relay task
e.scheduleDelayedTask(
new ScheduledForkJoinTask<Void>(
nanoDelay, 0L, true,
new TaskSubmitter(executor, r), null, e));
}
}
@ -2981,37 +2961,6 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public void run() { executor.execute(action); }
}
/** Action to completeExceptionally on timeout */
static final class Timeout implements Runnable {
final CompletableFuture<?> f;
Timeout(CompletableFuture<?> f) { this.f = f; }
public void run() {
if (f != null && !f.isDone())
f.completeExceptionally(new TimeoutException());
}
}
/** Action to complete on timeout */
static final class DelayedCompleter<U> implements Runnable {
final CompletableFuture<U> f;
final U u;
DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; }
public void run() {
if (f != null)
f.complete(u);
}
}
/** Action to cancel unneeded timeouts */
static final class Canceller implements BiConsumer<Object, Throwable> {
final Future<?> f;
Canceller(Future<?> f) { this.f = f; }
public void accept(Object ignore, Throwable ex) {
if (f != null && !f.isDone())
f.cancel(false);
}
}
/**
* A subclass that just throws UOE for most non-CompletionStage methods.
*/

View File

@ -0,0 +1,568 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import java.util.Arrays;
import jdk.internal.misc.Unsafe;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* An add-on for ForkJoinPools that provides scheduling for
* delayed and periodic tasks
*/
final class DelayScheduler extends Thread {
/*
* A DelayScheduler maintains a 4-ary heap (see
* https://en.wikipedia.org/wiki/D-ary_heap) based on trigger
* times (field ScheduledForkJoinTask.when) along with a pending
* queue of tasks submitted by other threads. When enabled (their
* delays elapse), tasks are relayed to the pool, or run directly
* if task.isImmediate. Immediate mode is designed for internal
* jdk usages in which the (non-blocking) action is to cancel,
* unblock or differently relay another async task (in some cases,
* this relies on user code to trigger async tasks). If
* processing encounters resource failures (possible when growing
* heap or ForkJoinPool WorkQueue arrays), tasks are cancelled.
*
* To reduce memory contention, the heap is maintained solely via
* local variables in method loop() (forcing noticeable code
* sprawl), recording only the current heap size when blocked to
* allow method canShutDown to conservatively check emptiness, and
* to report approximate current size for monitoring.
*
* The pending queue uses a design similar to ForkJoinTask.Aux
* queues: Incoming requests prepend (Treiber-stack-style) to the
* pending list. The scheduler thread takes and nulls out the
* entire list per step to process them as a batch. The pending
* queue may encounter contention and retries among requesters,
* but much less so versus the scheduler. It is possible to use
* multiple pending queues to reduce this form of contention but
* it doesn't seem worthwhile even under heavy loads.
*
* The implementation relies on the scheduler being a non-virtual
* final Thread subclass. Field "active" records whether the
* scheduler may have any pending tasks (and/or shutdown actions)
* to process, otherwise parking either indefinitely or until the
* next task deadline. Incoming pending tasks ensure active
* status, unparking if necessary. The scheduler thread sets
* status to inactive when there is apparently no work, and then
* rechecks before actually parking. The active field takes on a
* negative value on termination, as a sentinel used in pool
* termination checks as well as to suppress reactivation after
* terminating.
*
* We avoid the need for auxilliary data structures by embedding
* pending queue links, heap indices, and pool references inside
* ScheduledForkJoinTasks. (We use the same structure for both
* Runnable and Callable versions, since including an extra field
* in either case doesn't hurt -- it seems mildly preferable for
* these objects to be larger than other kinds of tasks to reduce
* false sharing during possibly frequent bookkeeping updates.) To
* reduce GC pressure and memory retention, these are nulled out
* as soon as possible.
*
* The implementation is designed to accommodate usages in which
* many or even most tasks are cancelled before executing (which
* is typical with IO-based timeouts). The use of a 4-ary heap (in
* which each element has up to 4 children) improves locality, and
* reduces the need for array movement and memory writes compared
* to a standard binary heap, at the expense of more expensive
* replace() operations (with about half the writes but twice the
* reads). Especially in the presence of cancellations, this is
* often faster because the replace method removes cancelled tasks
* seen while performing sift-down operations, in which case these
* elements are not further recorded or accessed, even before
* processing the removal request generated by method
* ScheduledForkJoinTask.cancel() (which is then a no-op or not
* generated at all).
*
* To ensure that comparisons do not encounter integer wraparound
* errors, times are offset with the most negative possible value
* (nanoTimeOffset) determined during static initialization.
* Negative delays are screened out before use.
*
* Upon noticing pool shutdown, delayed and/or periodic tasks are
* purged according to pool configuration and policy; the
* scheduler then tries to terminate the pool if the heap is
* empty. The asynchronicity of these steps with respect to pool
* runState weakens guarantees about exactly when purged tasks
* report isCancelled to callers (they do not run, but there may
* be a lag setting their status).
*/
ForkJoinPool pool; // read once and detached upon starting
volatile ScheduledForkJoinTask<?> pending; // submitted adds and removes
volatile int active; // 0: inactive, -1: stopped, +1: running
int restingSize; // written only before parking
volatile int cancelDelayedTasksOnShutdown; // policy control
int pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
int pad8, pad9, padA, padB, padC, padD, padE; // reduce false sharing
private static final int INITIAL_HEAP_CAPACITY = 1 << 6;
private static final int POOL_STOPPING = 1; // must match ForkJoinPool
static final long nanoTimeOffset = // Most negative possible time base
Math.min(System.nanoTime(), 0L) + Long.MIN_VALUE;
private static final Unsafe U; // for atomic operations
private static final long ACTIVE;
private static final long PENDING;
static {
U = Unsafe.getUnsafe();
Class<DelayScheduler> klass = DelayScheduler.class;
ACTIVE = U.objectFieldOffset(klass, "active");
PENDING = U.objectFieldOffset(klass, "pending");
}
DelayScheduler(ForkJoinPool p, String name) {
super(name);
setDaemon(true);
pool = p;
}
/**
* Returns System.nanoTime() with nanoTimeOffset
*/
static final long now() {
return nanoTimeOffset + System.nanoTime();
}
/**
* Ensures the scheduler is not parked unless stopped.
* Returns negative if already stopped
*/
final int signal() {
int state;
if ((state = active) == 0 && U.getAndBitwiseOrInt(this, ACTIVE, 1) == 0)
U.unpark(this);
return state;
}
/**
* Inserts the task (if nonnull) to pending queue, to add,
* remove, or ignore depending on task status when processed.
*/
final void pend(ScheduledForkJoinTask<?> task) {
ScheduledForkJoinTask<?> f = pending;
if (task != null) {
do {} while (
f != (f = (ScheduledForkJoinTask<?>)
U.compareAndExchangeReference(
this, PENDING, task.nextPending = f, task)));
signal();
}
}
/**
* Returns true if (momentarily) inactive and heap is empty
*/
final boolean canShutDown() {
return (active <= 0 && restingSize <= 0);
}
/**
* Returns the number of elements in heap when last idle
*/
final int lastStableSize() {
return (active < 0) ? 0 : restingSize;
}
/**
* Turns on cancelDelayedTasksOnShutdown policy
*/
final void cancelDelayedTasksOnShutdown() {
cancelDelayedTasksOnShutdown = 1;
signal();
}
/**
* Sets up and runs scheduling loop
*/
public final void run() {
ForkJoinPool p = pool;
pool = null; // detach
if (p == null) // failed initialization
active = -1;
else {
try {
loop(p);
} finally {
restingSize = 0;
active = -1;
p.tryStopIfShutdown(this);
}
}
}
/**
* After initialization, repeatedly:
* 1. If apparently no work,
* if active, set tentatively inactive,
* else park until next trigger time, or indefinitely if none
* 2. Process pending tasks in batches, to add or remove from heap
* 3. Check for shutdown, either exiting or preparing for shutdown when empty
* 4. Trigger all enabled tasks by submitting them to pool or run if immediate
*/
private void loop(ForkJoinPool p) {
if (p != null) { // currently always true
ScheduledForkJoinTask<?>[] h = // heap array
new ScheduledForkJoinTask<?>[INITIAL_HEAP_CAPACITY];
int cap = h.length, n = 0, prevRunStatus = 0; // n is heap size
long parkTime = 0L; // zero for untimed park
for (;;) { // loop until stopped
ScheduledForkJoinTask<?> q, t; int runStatus;
if ((q = pending) == null) {
restingSize = n;
if (active != 0) // deactivate and recheck
U.compareAndSetInt(this, ACTIVE, 1, 0);
else {
Thread.interrupted(); // clear before park
U.park(false, parkTime);
}
q = pending;
}
while (q != null && // process pending tasks
(t = (ScheduledForkJoinTask<?>)
U.getAndSetReference(this, PENDING, null)) != null) {
ScheduledForkJoinTask<?> next;
do {
int i;
if ((next = t.nextPending) != null)
t.nextPending = null;
if ((i = t.heapIndex) >= 0) {
t.heapIndex = -1; // remove cancelled task
if (i < cap && h[i] == t)
n = replace(h, i, n);
}
else if (n >= cap || n < 0)
t.trySetCancelled(); // couldn't resize
else {
long d = t.when; // add and sift up
if (t.status >= 0) {
ScheduledForkJoinTask<?> parent;
int k = n++, pk, newCap;
while (k > 0 &&
(parent = h[pk = (k - 1) >>> 2]) != null &&
(parent.when > d)) {
parent.heapIndex = k;
h[k] = parent;
k = pk;
}
t.heapIndex = k;
h[k] = t;
if (n >= cap && (newCap = cap << 1) > cap) {
ScheduledForkJoinTask<?>[] a = null;
try { // try to resize
a = Arrays.copyOf(h, newCap);
} catch (Error | RuntimeException ex) {
}
if (a != null && a.length == newCap) {
cap = newCap;
h = a; // else keep using old array
}
}
}
}
} while ((t = next) != null);
q = pending;
}
if ((runStatus = p.shutdownStatus(this)) != 0) {
if ((n = tryStop(p, h, n, runStatus, prevRunStatus)) < 0)
break;
prevRunStatus = runStatus;
}
parkTime = 0L;
if (n > 0 && h.length > 0) { // submit enabled tasks
long now = now();
do {
ScheduledForkJoinTask<?> f; int stat;
if ((f = h[0]) != null) {
long d = f.when - now;
if ((stat = f.status) >= 0 && d > 0L) {
parkTime = d;
break;
}
f.heapIndex = -1;
if (stat < 0)
; // already cancelled
else if (f.isImmediate)
f.doExec();
else {
try {
p.executeEnabledScheduledTask(f);
}
catch (Error | RuntimeException ex) {
f.trySetCancelled();
}
}
}
} while ((n = replace(h, 0, n)) > 0);
}
}
}
}
/**
* Replaces removed heap element at index k, along with other
* cancelled nodes found while doing so.
* @return current heap size
*/
private static int replace(ScheduledForkJoinTask<?>[] h, int k, int n) {
if (h != null && h.length >= n) { // hoist checks
while (k >= 0 && n > k) {
int alsoReplace = -1; // non-negative if cancelled task seen
ScheduledForkJoinTask<?> t = null, u;
long d = 0L;
while (--n > k) { // find uncancelled replacement
if ((u = h[n]) != null) {
h[n] = null;
d = u.when;
if (u.status >= 0) {
t = u;
break;
}
u.heapIndex = -1;
}
}
if (t != null) { // sift down
for (int cs; (cs = (k << 2) + 1) < n; ) {
ScheduledForkJoinTask<?> leastChild = null, c;
int leastIndex = 0;
long leastValue = Long.MAX_VALUE;
for (int ck = cs, j = 4;;) { // at most 4 children
if ((c = h[ck]) == null)
break;
long cd = c.when;
if (c.status < 0 && alsoReplace < 0) {
alsoReplace = ck; // at most once per pass
c.heapIndex = -1;
}
else if (leastChild == null || cd < leastValue) {
leastValue = cd;
leastIndex = ck;
leastChild = c;
}
if (--j == 0 || ++ck >= n)
break;
}
if (leastChild == null || d <= leastValue)
break;
leastChild.heapIndex = k;
h[k] = leastChild;
k = leastIndex;
}
t.heapIndex = k;
}
h[k] = t;
k = alsoReplace;
}
}
return n;
}
/**
* Call only when pool run status is nonzero. Possibly cancels
* tasks and stops during pool shutdown and termination. If called
* when shutdown but not stopping, removes tasks according to
* policy if not already done so, and if not empty or pool not
* terminating, returns. Otherwise, cancels all tasks in heap and
* pending queue.
* @return negative if stop, else current heap size.
*/
private int tryStop(ForkJoinPool p, ScheduledForkJoinTask<?>[] h, int n,
int runStatus, int prevRunStatus) {
if ((runStatus & POOL_STOPPING) == 0) {
if (n > 0) {
if (cancelDelayedTasksOnShutdown != 0) {
cancelAll(h, n);
n = 0;
}
else if (prevRunStatus == 0 && h != null && h.length >= n) {
ScheduledForkJoinTask<?> t; int stat; // remove periodic tasks
for (int i = n - 1; i >= 0; --i) {
if ((t = h[i]) != null &&
((stat = t.status) < 0 || t.nextDelay != 0L)) {
t.heapIndex = -1;
if (stat >= 0)
t.trySetCancelled();
n = replace(h, i, n);
}
}
}
}
if (n > 0 || p == null || !p.tryStopIfShutdown(this))
return n; // check for quiescent shutdown
}
if (n > 0)
cancelAll(h, n);
for (ScheduledForkJoinTask<?> a = (ScheduledForkJoinTask<?>)
U.getAndSetReference(this, PENDING, null);
a != null; a = a.nextPending)
a.trySetCancelled(); // clear pending requests
return -1;
}
private static void cancelAll(ScheduledForkJoinTask<?>[] h, int n) {
if (h != null && h.length >= n) {
ScheduledForkJoinTask<?> t;
for (int i = 0; i < n; ++i) {
if ((t = h[i]) != null) {
h[i] = null;
t.heapIndex = -1;
t.trySetCancelled();
}
}
}
}
/**
* Task class for DelayScheduler operations
*/
@SuppressWarnings("serial") // Not designed to be serializable
static final class ScheduledForkJoinTask<T>
extends ForkJoinTask.InterruptibleTask<T>
implements ScheduledFuture<T> {
ForkJoinPool pool; // nulled out after use
Runnable runnable; // at most one of runnable or callable nonnull
Callable<? extends T> callable;
T result;
ScheduledForkJoinTask<?> nextPending; // for DelayScheduler pending queue
long when; // nanoTime-based trigger time
final long nextDelay; // 0: once; <0: fixedDelay; >0: fixedRate
int heapIndex; // if non-negative, index on heap
final boolean isImmediate; // run by scheduler vs submitted when ready
/**
* Creates a new ScheduledForkJoinTask
* @param delay initial delay, in nanoseconds
* @param nextDelay 0 for one-shot, negative for fixed delay,
* positive for fixed rate, in nanoseconds
* @param isImmediate if action is to be performed
* by scheduler versus submitting to a WorkQueue
* @param runnable action (null if implementing callable version)
* @param callable function (null if implementing runnable versions)
* @param pool the pool for resubmissions and cancellations
* (disabled if null)
*/
public ScheduledForkJoinTask(long delay, long nextDelay,
boolean isImmediate, Runnable runnable,
Callable<T> callable, ForkJoinPool pool) {
this.when = DelayScheduler.now() + Math.max(delay, 0L);
this.heapIndex = -1;
this.nextDelay = nextDelay;
this.isImmediate = isImmediate;
this.runnable = runnable;
this.callable = callable;
this.pool = pool;
}
public void schedule() { // relay to pool, to allow independent use
ForkJoinPool p;
if ((p = pool) != null) // else already run
p.scheduleDelayedTask(this);
}
// InterruptibleTask methods
public final T getRawResult() { return result; }
public final void setRawResult(T v) { result = v; }
final Object adaptee() { return (runnable != null) ? runnable : callable; }
final T compute() throws Exception {
Callable<? extends T> c; Runnable r;
T res = null;
if ((r = runnable) != null)
r.run();
else if ((c = callable) != null)
res = c.call();
return res;
}
final boolean postExec() { // possibly resubmit
long d; ForkJoinPool p; DelayScheduler ds;
if ((d = nextDelay) != 0L && // is periodic
status >= 0 && // not abnormally completed
(p = pool) != null && (ds = p.delayScheduler) != null) {
if (p.shutdownStatus(ds) == 0) {
heapIndex = -1;
if (d < 0L)
when = DelayScheduler.now() - d;
else
when += d;
ds.pend(this);
return false;
}
trySetCancelled(); // pool is shutdown
}
pool = null; // reduce memory retention
runnable = null;
callable = null;
return true;
}
public final boolean cancel(boolean mayInterruptIfRunning) {
int s; ForkJoinPool p; DelayScheduler ds;
if ((s = trySetCancelled()) < 0)
return ((s & (ABNORMAL | THROWN)) == ABNORMAL);
if ((p = pool) != null &&
!interruptIfRunning(mayInterruptIfRunning)) {
pool = null;
runnable = null;
callable = null;
if (heapIndex >= 0 && nextPending == null &&
(ds = p.delayScheduler) != null)
ds.pend(this); // for heap cleanup
}
return true;
}
// ScheduledFuture methods
public final long getDelay(TimeUnit unit) {
return unit.convert(when - DelayScheduler.now(), NANOSECONDS);
}
public int compareTo(Delayed other) { // never used internally
long diff = (other instanceof ScheduledForkJoinTask<?> t) ?
when - t.when : // avoid nanoTime calls and conversions
getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
}
}

View File

@ -42,6 +42,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;
@ -50,6 +51,7 @@ import jdk.internal.access.JavaUtilConcurrentFJPAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.Unsafe;
import jdk.internal.vm.SharedThreadContainer;
import static java.util.concurrent.DelayScheduler.ScheduledForkJoinTask;
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
@ -133,11 +135,34 @@ import jdk.internal.vm.SharedThreadContainer;
* </tr>
* </table>
*
* <p>Additionally, this class supports {@link
* ScheduledExecutorService} methods to delay or periodically execute
* tasks, as well as method {@link #submitWithTimeout} to cancel tasks
* that take too long. The scheduled functions or actions may create
* and invoke other {@linkplain ForkJoinTask ForkJoinTasks}. Delayed
* actions become <em>enabled</em> and behave as ordinary submitted
* tasks when their delays elapse. Scheduling methods return
* {@linkplain ForkJoinTask ForkJoinTasks} that implement the {@link
* ScheduledFuture} interface. Resource exhaustion encountered after
* initial submission results in task cancellation. When time-based
* methods are used, shutdown policies match the default policies of
* class {@link ScheduledThreadPoolExecutor}: upon {@link #shutdown},
* existing periodic tasks will not re-execute, and the pool
* terminates when quiescent and existing delayed tasks
* complete. Method {@link #cancelDelayedTasksOnShutdown} may be used
* to disable all delayed tasks upon shutdown, and method {@link
* #shutdownNow} may be used to instead unconditionally initiate pool
* termination. Monitoring methods such as {@link #getQueuedTaskCount}
* do not include scheduled tasks that are not yet enabled to execute,
* which are reported separately by method {@link
* #getDelayedTaskCount}.
*
* <p>The parameters used to construct the common pool may be controlled by
* setting the following {@linkplain System#getProperty system properties}:
* <ul>
* <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism}
* - the parallelism level, a non-negative integer
* - the parallelism level, a non-negative integer. Usage is discouraged.
* Use {@link #setParallelism} instead.
* <li>{@systemProperty java.util.concurrent.ForkJoinPool.common.threadFactory}
* - the class name of a {@link ForkJoinWorkerThreadFactory}.
* The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
@ -155,10 +180,11 @@ import jdk.internal.vm.SharedThreadContainer;
* {@linkplain Thread#getContextClassLoader() thread context class loader}.
*
* Upon any error in establishing these settings, default parameters
* are used. It is possible to disable or limit the use of threads in
* the common pool by setting the parallelism property to zero, and/or
* using a factory that may return {@code null}. However doing so may
* cause unjoined tasks to never be executed.
* are used. It is possible to disable use of threads by using a
* factory that may return {@code null}, in which case some tasks may
* never execute. While possible, it is strongly discouraged to set
* the parallelism property to zero, which may be internally
* overridden in the presence of intrinsically async tasks.
*
* @implNote This implementation restricts the maximum number of
* running threads to 32767. Attempts to create pools with greater
@ -171,7 +197,8 @@ import jdk.internal.vm.SharedThreadContainer;
* @since 1.7
* @author Doug Lea
*/
public class ForkJoinPool extends AbstractExecutorService {
public class ForkJoinPool extends AbstractExecutorService
implements ScheduledExecutorService {
/*
* Implementation Overview
@ -215,6 +242,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* 3. Completion-based tasks (mainly CountedCompleters)
* 4. CommonPool and parallelStream support
* 5. InterruptibleTasks for externally submitted tasks
* 6. Support ScheduledExecutorService methods
*
* Most changes involve adaptions of base algorithms using
* combinations of static and dynamic bitwise mode settings (both
@ -359,18 +387,18 @@ public class ForkJoinPool extends AbstractExecutorService {
* WorkQueues are also used in a similar way for tasks submitted
* to the pool. We cannot mix these tasks in the same queues used
* by workers. Instead, we randomly associate submission queues
* with submitting threads, using a form of hashing. The
* ThreadLocalRandom probe value serves as a hash code for
* choosing existing queues, and may be randomly repositioned upon
* contention with other submitters. In essence, submitters act
* like workers except that they are restricted to executing local
* tasks that they submitted (or when known, subtasks thereof).
* Insertion of tasks in shared mode requires a lock. We use only
* a simple spinlock (as one role of field "phase") because
* submitters encountering a busy queue move to a different
* position to use or create other queues. They (spin) block when
* registering new queues, or indirectly elsewhere, by revisiting
* later.
* with submitting threads (or carriers when using VirtualThreads)
* using a form of hashing. The ThreadLocalRandom probe value
* serves as a hash code for choosing existing queues, and may be
* randomly repositioned upon contention with other submitters.
* In essence, submitters act like workers except that they are
* restricted to executing local tasks that they submitted (or
* when known, subtasks thereof). Insertion of tasks in shared
* mode requires a lock. We use only a simple spinlock (as one
* role of field "phase") because submitters encountering a busy
* queue move to a different position to use or create other
* queues. They (spin) block when registering new queues, or
* indirectly elsewhere, by revisiting later.
*
* Management
* ==========
@ -826,7 +854,13 @@ public class ForkJoinPool extends AbstractExecutorService {
* fashion or use CountedCompleters (as is true for jdk
* parallelStreams). Support infiltrates several methods,
* including those that retry helping steps until we are sure that
* none apply if there are no workers.
* none apply if there are no workers. To deal with conflicting
* requirements, uses of the commonPool that require async because
* caller-runs need not apply, ensure threads are enabled (by
* setting parallelism) via method asyncCommonPool before
* proceeding. (In principle, overriding zero parallelism needs to
* ensure at least one worker, but due to other backward
* compatibility contraints, ensures two.)
*
* As a more appropriate default in managed environments, unless
* overridden by system properties, we use workers of subclass
@ -851,7 +885,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* To comply with ExecutorService specs, we use subclasses of
* abstract class InterruptibleTask for tasks that require
* stronger interruption and cancellation guarantees. External
* submitters never run these tasks, even if in the common pool.
* submitters never run these tasks, even if in the common pool
* (as indicated by ForkJoinTask.noUserHelp status bit).
* InterruptibleTasks include a "runner" field (implemented
* similarly to FutureTask) to support cancel(true). Upon pool
* shutdown, runners are interrupted so they can cancel. Since
@ -880,6 +915,27 @@ public class ForkJoinPool extends AbstractExecutorService {
* writing, virtual thread bodies are by default run as some form
* of InterruptibleTask.
*
* DelayScheduler
* ================
*
* This class supports ScheduledExecutorService methods by
* creating and starting a DelayScheduler on first use of these
* methods (via startDelayScheduler). The scheduler operates
* independently in its own thread, relaying tasks to the pool to
* execute when their delays elapse (see method
* executeEnabledScheduledTask). The only other interactions with
* the delayScheduler are to control shutdown and maintain
* shutdown-related policies in methods quiescent() and
* tryTerminate(). In particular, processing must deal with cases
* in which tasks are submitted before shutdown, but not enabled
* until afterwards, in which case they must bypass some screening
* to be allowed to run. Conversely, the DelayScheduler checks
* runState status and when enabled, completes termination, using
* only methods shutdownStatus and tryStopIfShutdown. All of these
* methods are final and have signatures referencing
* DelaySchedulers, so cannot conflict with those of any existing
* FJP subclasses.
*
* Memory placement
* ================
*
@ -950,7 +1006,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* Nearly all explicit checks lead to bypass/return, not exception
* throws, because they may legitimately arise during shutdown. A
* few unusual loop constructions encourage (with varying
* effectiveness) JVMs about where (not) to place safepoints.
* effectiveness) JVMs about where (not) to place safepoints. All
* public methods screen arguments (mainly null checks) before
* creating or executing tasks.
*
* There is a lot of representation-level coupling among classes
* ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
@ -1045,6 +1103,7 @@ public class ForkJoinPool extends AbstractExecutorService {
static final int DROPPED = 1 << 16; // removed from ctl counts
static final int UNCOMPENSATE = 1 << 16; // tryCompensate return
static final int IDLE = 1 << 16; // phase seqlock/version count
static final int MIN_QUEUES_SIZE = 1 << 4; // ensure external slots
/*
* Bits and masks for ctl and bounds are packed with 4 16 bit subfields:
@ -1214,7 +1273,7 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Pushes a task. Called only by owner or if already locked
*
* @param task the task. Caller must ensure non-null.
* @param task the task; no-op if null
* @param pool the pool to signal if was previously empty, else null
* @param internal if caller owns this queue
* @throws RejectedExecutionException if array could not be resized
@ -1222,7 +1281,9 @@ public class ForkJoinPool extends AbstractExecutorService {
final void push(ForkJoinTask<?> task, ForkJoinPool pool,
boolean internal) {
int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
if ((a = array) != null && (cap = a.length) > 0) { // else disabled
if ((a = array) != null && (cap = a.length) > 0 && // else disabled
task != null) {
int pk = task.noUserHelp() + 1; // prev slot offset
if ((room = (m = cap - 1) - (s - b)) >= 0) {
top = s + 1;
long pos = slotOffset(m & s);
@ -1237,10 +1298,7 @@ public class ForkJoinPool extends AbstractExecutorService {
unlockPhase();
if (room < 0)
throw new RejectedExecutionException("Queue capacity exceeded");
if ((room == 0 || // pad for InterruptibleTasks
a[m & (s - ((internal || task == null ||
task.getClass().getSuperclass() !=
interruptibleTaskClass) ? 1 : 2))] == null) &&
if ((room == 0 || a[m & (s - pk)] == null) &&
pool != null)
pool.signalWork(); // may have appeared empty
}
@ -1579,11 +1637,6 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
static volatile RuntimePermission modifyThreadPermission;
/**
* Cached for faster type tests.
*/
static final Class<?> interruptibleTaskClass;
/**
* For VirtualThread intrinsics
*/
@ -1596,12 +1649,15 @@ public class ForkJoinPool extends AbstractExecutorService {
final UncaughtExceptionHandler ueh; // per-worker UEH
final SharedThreadContainer container;
final String workerNamePrefix; // null for common pool
final String poolName;
volatile DelayScheduler delayScheduler; // lazily constructed
WorkQueue[] queues; // main registry
volatile long runState; // versioned, lockable
final long keepAlive; // milliseconds before dropping if idle
final long config; // static configuration bits
volatile long stealCount; // collects worker nsteals
volatile long threadIds; // for worker thread names
@jdk.internal.vm.annotation.Contended("fjpctl") // segregate
volatile long ctl; // main pool control
@jdk.internal.vm.annotation.Contended("fjpctl") // colocate
@ -1885,6 +1941,7 @@ public class ForkJoinPool extends AbstractExecutorService {
long phaseSum = 0L;
boolean swept = false;
for (long e, prevRunState = 0L; ; prevRunState = e) {
DelayScheduler ds;
long c = ctl;
if (((e = runState) & STOP) != 0L)
return 1; // terminating
@ -1907,6 +1964,8 @@ public class ForkJoinPool extends AbstractExecutorService {
}
else if ((e & SHUTDOWN) == 0)
return 0;
else if ((ds = delayScheduler) != null && !ds.canShutDown())
return 0;
else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP))
return 1; // enable termination
else
@ -1957,15 +2016,13 @@ public class ForkJoinPool extends AbstractExecutorService {
}
else {
boolean propagate;
int nb = q.base = b + 1;
int nb = q.base = b + 1, prevSrc = src;
w.nsteals = ++nsteals;
w.source = j; // volatile
w.source = src = j; // volatile
rescan = true;
int nh = t.noUserHelp();
if (propagate =
((src != (src = j) ||
t.getClass().getSuperclass() ==
interruptibleTaskClass) &&
a[nb & m] != null))
(prevSrc != src || nh != 0) && a[nb & m] != null)
signalWork();
w.topLevelExec(t, fifo);
if ((b = q.base) != nb && !propagate)
@ -2004,8 +2061,8 @@ public class ForkJoinPool extends AbstractExecutorService {
((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) ||
(qs = queues) == null || (n = qs.length) <= 0)
return IDLE; // terminating
int prechecks = Math.min(ac, 2); // reactivation threshold
for (int k = Math.max(n << 2, SPIN_WAITS << 1);;) {
int k = Math.max(n << 2, SPIN_WAITS << 1);
for (int prechecks = k / n;;) { // reactivation threshold
WorkQueue q; int cap; ForkJoinTask<?>[] a; long c;
if (w.phase == activePhase)
return activePhase;
@ -2014,7 +2071,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if ((q = qs[k & (n - 1)]) == null)
Thread.onSpinWait();
else if ((a = q.array) != null && (cap = a.length) > 0 &&
a[q.base & (cap - 1)] != null && --prechecks < 0 &&
a[q.base & (cap - 1)] != null && --prechecks <= 0 &&
(int)(c = ctl) == activePhase &&
compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK)))
return w.phase = activePhase; // reactivate
@ -2513,37 +2570,45 @@ public class ForkJoinPool extends AbstractExecutorService {
* Finds and locks a WorkQueue for an external submitter, or
* throws RejectedExecutionException if shutdown or terminating.
* @param r current ThreadLocalRandom.getProbe() value
* @param isSubmit false if this is for a common pool fork
* @param rejectOnShutdown true if RejectedExecutionException
* should be thrown when shutdown (else only if terminating)
*/
private WorkQueue submissionQueue(int r) {
if (r == 0) {
private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) {
int reuse; // nonzero if prefer create
if ((reuse = r) == 0) {
ThreadLocalRandom.localInit(); // initialize caller's probe
r = ThreadLocalRandom.getProbe();
}
for (;;) {
int n, i, id; WorkQueue[] qs; WorkQueue q, w = null;
for (int probes = 0; ; ++probes) {
int n, i, id; WorkQueue[] qs; WorkQueue q;
if ((qs = queues) == null)
break;
if ((n = qs.length) <= 0)
break;
if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) {
if (w == null)
w = new WorkQueue(null, id, 0, false);
WorkQueue w = new WorkQueue(null, id, 0, false);
w.phase = id;
long isShutdown = lockRunState() & SHUTDOWN;
if (isShutdown == 0L && queues == qs && qs[i] == null) {
q = qs[i] = w; // else retry
w = null;
}
boolean reject = ((lockRunState() & SHUTDOWN) != 0 &&
rejectOnShutdown);
if (!reject && queues == qs && qs[i] == null)
q = qs[i] = w; // else lost race to install
unlockRunState();
if (q != null)
return q;
if (isShutdown != 0L)
if (reject)
break;
reuse = 0;
}
else if (!q.tryLockPhase()) // move index
if (reuse == 0 || !q.tryLockPhase()) { // move index
if (reuse == 0) {
if (probes >= n >> 1)
reuse = r; // stop prefering free slot
}
else if (q != null)
reuse = 0; // probe on collision
r = ThreadLocalRandom.advanceProbe(r);
else if ((runState & SHUTDOWN) != 0L) {
}
else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) {
q.unlockPhase(); // check while q lock held
break;
}
@ -2553,7 +2618,7 @@ public class ForkJoinPool extends AbstractExecutorService {
throw new RejectedExecutionException();
}
private void poolSubmit(boolean signalIfEmpty, ForkJoinTask<?> task) {
private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {
Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
(wt = (ForkJoinWorkerThread)t).pool == this) {
@ -2562,21 +2627,22 @@ public class ForkJoinPool extends AbstractExecutorService {
}
else { // find and lock queue
internal = false;
q = submissionQueue(ThreadLocalRandom.getProbe());
q = submissionQueue(ThreadLocalRandom.getProbe(), true);
}
q.push(task, signalIfEmpty ? this : null, internal);
return task;
}
/**
* Returns queue for an external submission, bypassing call to
* submissionQueue if already established and unlocked.
*/
final WorkQueue externalSubmissionQueue() {
final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) {
WorkQueue[] qs; WorkQueue q; int n;
int r = ThreadLocalRandom.getProbe();
return (((qs = queues) != null && (n = qs.length) > 0 &&
(q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 &&
q.tryLockPhase()) ? q : submissionQueue(r));
q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown));
}
/**
@ -2700,14 +2766,20 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
else if ((isShutdown = (e & SHUTDOWN)) != 0L || enable) {
long quiet; DelayScheduler ds;
if (isShutdown == 0L)
getAndBitwiseOrRunState(SHUTDOWN);
if (quiescent() > 0)
if ((quiet = quiescent()) > 0)
now = true;
else if (quiet == 0 && (ds = delayScheduler) != null)
ds.signal();
}
if (now) {
DelayScheduler ds;
releaseWaiters();
if ((ds = delayScheduler) != null)
ds.signal();
for (;;) {
if (((e = runState) & CLEANED) == 0L) {
boolean clean = cleanQueues();
@ -2718,6 +2790,8 @@ public class ForkJoinPool extends AbstractExecutorService {
break;
if (ctl != 0L) // else loop if didn't finish cleaning
break;
if ((ds = delayScheduler) != null && ds.signal() >= 0)
break;
if ((e & CLEANED) != 0L) {
e |= TERMINATED;
if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) {
@ -2888,12 +2962,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* event-style asynchronous tasks. For default value, use {@code
* false}.
*
* @param corePoolSize the number of threads to keep in the pool
* (unless timed out after an elapsed keep-alive). Normally (and
* by default) this is the same value as the parallelism level,
* but may be set to a larger value to reduce dynamic overhead if
* tasks regularly block. Using a smaller value (for example
* {@code 0}) has the same effect as the default.
* @param corePoolSize ignored: used in previous releases of this
* class but no longer applicable. Using {@code 0} maintains
* compatibility across releases.
*
* @param maximumPoolSize the maximum number of threads allowed.
* When the maximum is reached, attempts to replace blocked
@ -2957,7 +3028,8 @@ public class ForkJoinPool extends AbstractExecutorService {
throw new IllegalArgumentException();
if (factory == null || unit == null)
throw new NullPointerException();
int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
int size = Math.max(MIN_QUEUES_SIZE,
1 << (33 - Integer.numberOfLeadingZeros(p - 1)));
this.parallelism = p;
this.factory = factory;
this.ueh = handler;
@ -2971,6 +3043,7 @@ public class ForkJoinPool extends AbstractExecutorService {
this.queues = new WorkQueue[size];
String pid = Integer.toString(getAndAddPoolIds(1) + 1);
String name = "ForkJoinPool-" + pid;
this.poolName = name;
this.workerNamePrefix = name + "-worker-";
this.container = SharedThreadContainer.create(name);
}
@ -2980,6 +3053,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* overridden by system properties
*/
private ForkJoinPool(byte forCommonPoolOnly) {
String name = "ForkJoinPool.commonPool";
ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory;
UncaughtExceptionHandler handler = null;
int maxSpares = DEFAULT_COMMON_MAX_SPARES;
@ -3013,7 +3087,9 @@ public class ForkJoinPool extends AbstractExecutorService {
if (preset == 0)
pc = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
int p = Math.min(pc, MAX_CAP);
int size = (p == 0) ? 1 : 1 << (33 - Integer.numberOfLeadingZeros(p-1));
int size = Math.max(MIN_QUEUES_SIZE,
(p == 0) ? 1 :
1 << (33 - Integer.numberOfLeadingZeros(p-1)));
this.parallelism = p;
this.config = ((preset & LMASK) | (((long)maxSpares) << TC_SHIFT) |
(1L << RC_SHIFT));
@ -3022,8 +3098,9 @@ public class ForkJoinPool extends AbstractExecutorService {
this.keepAlive = DEFAULT_KEEPALIVE;
this.saturate = null;
this.workerNamePrefix = null;
this.poolName = name;
this.queues = new WorkQueue[size];
this.container = SharedThreadContainer.create("ForkJoinPool.commonPool");
this.container = SharedThreadContainer.create(name);
}
/**
@ -3044,6 +3121,16 @@ public class ForkJoinPool extends AbstractExecutorService {
return common;
}
/**
* Package-private access to commonPool overriding zero parallelism
*/
static ForkJoinPool asyncCommonPool() {
ForkJoinPool cp; int p;
if ((p = (cp = common).parallelism) == 0)
U.compareAndSetInt(cp, PARALLELISM, 0, 2);
return cp;
}
// Execution methods
/**
@ -3064,8 +3151,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public <T> T invoke(ForkJoinTask<T> task) {
Objects.requireNonNull(task);
poolSubmit(true, task);
poolSubmit(true, Objects.requireNonNull(task));
try {
return task.join();
} catch (RuntimeException | Error unchecked) {
@ -3084,8 +3170,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public void execute(ForkJoinTask<?> task) {
Objects.requireNonNull(task);
poolSubmit(true, task);
poolSubmit(true, Objects.requireNonNull(task));
}
// AbstractExecutorService methods
@ -3098,7 +3183,7 @@ public class ForkJoinPool extends AbstractExecutorService {
@Override
@SuppressWarnings("unchecked")
public void execute(Runnable task) {
poolSubmit(true, (task instanceof ForkJoinTask<?>)
poolSubmit(true, (Objects.requireNonNull(task) instanceof ForkJoinTask<?>)
? (ForkJoinTask<Void>) task // avoid re-wrap
: new ForkJoinTask.RunnableExecuteAction(task));
}
@ -3118,9 +3203,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
Objects.requireNonNull(task);
poolSubmit(true, task);
return task;
return poolSubmit(true, Objects.requireNonNull(task));
}
/**
@ -3130,12 +3213,12 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
@Override
public <T> ForkJoinTask<T> submit(Callable<T> task) {
ForkJoinTask<T> t =
Objects.requireNonNull(task);
return poolSubmit(
true,
(Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedCallable<T>(task) :
new ForkJoinTask.AdaptedInterruptibleCallable<T>(task);
poolSubmit(true, t);
return t;
new ForkJoinTask.AdaptedInterruptibleCallable<T>(task));
}
/**
@ -3145,12 +3228,12 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
@Override
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
ForkJoinTask<T> t =
Objects.requireNonNull(task);
return poolSubmit(
true,
(Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedRunnable<T>(task, result) :
new ForkJoinTask.AdaptedInterruptibleRunnable<T>(task, result);
poolSubmit(true, t);
return t;
new ForkJoinTask.AdaptedInterruptibleRunnable<T>(task, result));
}
/**
@ -3161,13 +3244,14 @@ public class ForkJoinPool extends AbstractExecutorService {
@Override
@SuppressWarnings("unchecked")
public ForkJoinTask<?> submit(Runnable task) {
ForkJoinTask<?> f = (task instanceof ForkJoinTask<?>) ?
Objects.requireNonNull(task);
return poolSubmit(
true,
(task instanceof ForkJoinTask<?>) ?
(ForkJoinTask<Void>) task : // avoid re-wrap
((Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedRunnable<Void>(task, null) :
new ForkJoinTask.AdaptedInterruptibleRunnable<Void>(task, null));
poolSubmit(true, f);
return f;
new ForkJoinTask.AdaptedInterruptibleRunnable<Void>(task, null)));
}
/**
@ -3189,7 +3273,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
Objects.requireNonNull(task);
externalSubmissionQueue().push(task, this, false);
externalSubmissionQueue(true).push(task, this, false);
return task;
}
@ -3210,9 +3294,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @since 19
*/
public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) {
Objects.requireNonNull(task);
poolSubmit(false, task);
return task;
return poolSubmit(false, Objects.requireNonNull(task));
}
/**
@ -3350,6 +3432,322 @@ public class ForkJoinPool extends AbstractExecutorService {
.invokeAny(tasks, this, true, unit.toNanos(timeout));
}
// Support for delayed tasks
/**
* Returns STOP and SHUTDOWN status (zero if neither), masking or
* truncating out other bits.
*/
final int shutdownStatus(DelayScheduler ds) {
return (int)(runState & (SHUTDOWN | STOP));
}
/**
* Tries to stop and possibly terminate if already enabled, return success.
*/
final boolean tryStopIfShutdown(DelayScheduler ds) {
return (tryTerminate(false, false) & STOP) != 0L;
}
/**
* Creates and starts DelayScheduler
*/
private DelayScheduler startDelayScheduler() {
DelayScheduler ds;
if ((ds = delayScheduler) == null) {
boolean start = false;
String name = poolName + "-delayScheduler";
if (workerNamePrefix == null)
asyncCommonPool(); // override common parallelism zero
lockRunState();
try {
if ((ds = delayScheduler) == null) {
ds = delayScheduler = new DelayScheduler(this, name);
start = true;
}
} finally {
unlockRunState();
}
if (start) { // start outside of lock
// exceptions on start passed to (external) callers
SharedThreadContainer ctr;
if ((ctr = container) != null)
ctr.start(ds);
else
ds.start();
}
}
return ds;
}
/**
* Arranges execution of a ScheduledForkJoinTask whose delay has
* elapsed
*/
final void executeEnabledScheduledTask(ScheduledForkJoinTask<?> task) {
externalSubmissionQueue(false).push(task, this, false);
}
/**
* Arranges delayed execution of a ScheduledForkJoinTask via the
* DelayScheduler, creating and starting it if necessary.
* @return the task
*/
final <T> ScheduledForkJoinTask<T> scheduleDelayedTask(ScheduledForkJoinTask<T> task) {
DelayScheduler ds;
if (((ds = delayScheduler) == null &&
(ds = startDelayScheduler()) == null) ||
(runState & SHUTDOWN) != 0L)
throw new RejectedExecutionException();
ds.pend(task);
return task;
}
/**
* Submits a one-shot task that becomes enabled after the given
* delay. At that point it will execute unless explicitly
* cancelled, or fail to execute (eventually reporting
* cancellation) when encountering resource exhaustion, or the
* pool is {@link #shutdownNow}, or is {@link #shutdown} when
* otherwise quiescent and {@link #cancelDelayedTasksOnShutdown}
* is in effect.
*
* @param command the task to execute
* @param delay the time from now to delay execution
* @param unit the time unit of the delay parameter
* @return a ForkJoinTask implementing the ScheduledFuture
* interface, whose {@code get()} method will return
* {@code null} upon normal completion.
* @throws RejectedExecutionException if the pool is shutdown or
* submission encounters resource exhaustion.
* @throws NullPointerException if command or unit is null
* @since 25
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit) {
return scheduleDelayedTask(
new ScheduledForkJoinTask<Void>(
unit.toNanos(delay), 0L, false, // implicit null check of unit
Objects.requireNonNull(command), null, this));
}
/**
* Submits a value-returning one-shot task that becomes enabled
* after the given delay. At that point it will execute unless
* explicitly cancelled, or fail to execute (eventually reporting
* cancellation) when encountering resource exhaustion, or the
* pool is {@link #shutdownNow}, or is {@link #shutdown} when
* otherwise quiescent and {@link #cancelDelayedTasksOnShutdown}
* is in effect.
*
* @param callable the function to execute
* @param delay the time from now to delay execution
* @param unit the time unit of the delay parameter
* @param <V> the type of the callable's result
* @return a ForkJoinTask implementing the ScheduledFuture
* interface, whose {@code get()} method will return the
* value from the callable upon normal completion.
* @throws RejectedExecutionException if the pool is shutdown or
* submission encounters resource exhaustion.
* @throws NullPointerException if command or unit is null
* @since 25
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit) {
return scheduleDelayedTask(
new ScheduledForkJoinTask<V>(
unit.toNanos(delay), 0L, false, null, // implicit null check of unit
Objects.requireNonNull(callable), this));
}
/**
* Submits a periodic action that becomes enabled first after the
* given initial delay, and subsequently with the given period;
* that is, executions will commence after
* {@code initialDelay}, then {@code initialDelay + period}, then
* {@code initialDelay + 2 * period}, and so on.
*
* <p>The sequence of task executions continues indefinitely until
* one of the following exceptional completions occur:
* <ul>
* <li>The task is {@linkplain Future#cancel explicitly cancelled}
* <li>Method {@link #shutdownNow} is called
* <li>Method {@link #shutdown} is called and the pool is
* otherwise quiescent, in which case existing executions continue
* but subsequent executions do not.
* <li>An execution or the task encounters resource exhaustion.
* <li>An execution of the task throws an exception. In this case
* calling {@link Future#get() get} on the returned future will throw
* {@link ExecutionException}, holding the exception as its cause.
* </ul>
* Subsequent executions are suppressed. Subsequent calls to
* {@link Future#isDone isDone()} on the returned future will
* return {@code true}.
*
* <p>If any execution of this task takes longer than its period, then
* subsequent executions may start late, but will not concurrently
* execute.
* @param command the task to execute
* @param initialDelay the time to delay first execution
* @param period the period between successive executions
* @param unit the time unit of the initialDelay and period parameters
* @return a ForkJoinTask implementing the ScheduledFuture
* interface. The future's {@link Future#get() get()}
* method will never return normally, and will throw an
* exception upon task cancellation or abnormal
* termination of a task execution.
* @throws RejectedExecutionException if the pool is shutdown or
* submission encounters resource exhaustion.
* @throws NullPointerException if command or unit is null
* @throws IllegalArgumentException if period less than or equal to zero
* @since 25
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period, TimeUnit unit) {
if (period <= 0L)
throw new IllegalArgumentException();
return scheduleDelayedTask(
new ScheduledForkJoinTask<Void>(
unit.toNanos(initialDelay), // implicit null check of unit
unit.toNanos(period), false,
Objects.requireNonNull(command), null, this));
}
/**
* Submits a periodic action that becomes enabled first after the
* given initial delay, and subsequently with the given delay
* between the termination of one execution and the commencement of
* the next.
* <p>The sequence of task executions continues indefinitely until
* one of the following exceptional completions occur:
* <ul>
* <li>The task is {@linkplain Future#cancel explicitly cancelled}
* <li>Method {@link #shutdownNow} is called
* <li>Method {@link #shutdown} is called and the pool is
* otherwise quiescent, in which case existing executions continue
* but subsequent executions do not.
* <li>An execution or the task encounters resource exhaustion.
* <li>An execution of the task throws an exception. In this case
* calling {@link Future#get() get} on the returned future will throw
* {@link ExecutionException}, holding the exception as its cause.
* </ul>
* Subsequent executions are suppressed. Subsequent calls to
* {@link Future#isDone isDone()} on the returned future will
* return {@code true}.
* @param command the task to execute
* @param initialDelay the time to delay first execution
* @param delay the delay between the termination of one
* execution and the commencement of the next
* @param unit the time unit of the initialDelay and delay parameters
* @return a ForkJoinTask implementing the ScheduledFuture
* interface. The future's {@link Future#get() get()}
* method will never return normally, and will throw an
* exception upon task cancellation or abnormal
* termination of a task execution.
* @throws RejectedExecutionException if the pool is shutdown or
* submission encounters resource exhaustion.
* @throws NullPointerException if command or unit is null
* @throws IllegalArgumentException if delay less than or equal to zero
* @since 25
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay, TimeUnit unit) {
if (delay <= 0L)
throw new IllegalArgumentException();
return scheduleDelayedTask(
new ScheduledForkJoinTask<Void>(
unit.toNanos(initialDelay), // implicit null check of unit
-unit.toNanos(delay), false, // negative for fixed delay
Objects.requireNonNull(command), null, this));
}
/**
* Body of a task performed on timeout of another task
*/
static final class TimeoutAction<V> implements Runnable {
// set after construction, nulled after use
ForkJoinTask.CallableWithTimeout<V> task;
Consumer<? super ForkJoinTask<V>> action;
TimeoutAction(Consumer<? super ForkJoinTask<V>> action) {
this.action = action;
}
public void run() {
ForkJoinTask.CallableWithTimeout<V> t = task;
Consumer<? super ForkJoinTask<V>> a = action;
task = null;
action = null;
if (t != null && t.status >= 0) {
if (a == null)
t.cancel(true);
else {
a.accept(t);
t.interruptIfRunning(true);
}
}
}
}
/**
* Submits a task executing the given function, cancelling the
* task or performing a given timeoutAction if not completed
* within the given timeout period. If the optional {@code
* timeoutAction} is null, the task is cancelled (via {@code
* cancel(true)}. Otherwise, the action is applied and the task
* may be interrupted if running. Actions may include {@link
* ForkJoinTask#complete} to set a replacement value or {@link
* ForkJoinTask#completeExceptionally} to throw an appropriate
* exception. Note that these can succeed only if the task has
* not already completed when the timeoutAction executes.
*
* @param callable the function to execute
* @param <V> the type of the callable's result
* @param timeout the time to wait before cancelling if not completed
* @param timeoutAction if nonnull, an action to perform on
* timeout, otherwise the default action is to cancel using
* {@code cancel(true)}.
* @param unit the time unit of the timeout parameter
* @return a Future that can be used to extract result or cancel
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if callable or unit is null
* @since 25
*/
public <V> ForkJoinTask<V> submitWithTimeout(Callable<V> callable,
long timeout, TimeUnit unit,
Consumer<? super ForkJoinTask<V>> timeoutAction) {
ForkJoinTask.CallableWithTimeout<V> task; TimeoutAction<V> onTimeout;
Objects.requireNonNull(callable);
ScheduledForkJoinTask<Void> timeoutTask =
new ScheduledForkJoinTask<Void>(
unit.toNanos(timeout), 0L, true,
onTimeout = new TimeoutAction<V>(timeoutAction), null, this);
onTimeout.task = task =
new ForkJoinTask.CallableWithTimeout<V>(callable, timeoutTask);
scheduleDelayedTask(timeoutTask);
return poolSubmit(true, task);
}
/**
* Arranges that scheduled tasks that are not executing and have
* not already been enabled for execution will not be executed and
* will be cancelled upon {@link #shutdown} (unless this pool is
* the {@link #commonPool()} which never shuts down). This method
* may be invoked either before {@link #shutdown} to take effect
* upon the next call, or afterwards to cancel such tasks, which
* may then allow termination. Note that subsequent executions of
* periodic tasks are always disabled upon shutdown, so this
* method applies meaningfully only to non-periodic tasks.
* @since 25
*/
public void cancelDelayedTasksOnShutdown() {
DelayScheduler ds;
if ((ds = delayScheduler) != null ||
(ds = startDelayScheduler()) != null)
ds.cancelDelayedTasksOnShutdown();
}
/**
* Returns the factory used for constructing new workers.
*
@ -3485,14 +3883,16 @@ public class ForkJoinPool extends AbstractExecutorService {
* to the pool that have not begun executing). This value is only
* an approximation, obtained by iterating across all threads in
* the pool. This method may be useful for tuning task
* granularities.
* granularities.The returned count does not include scheduled
* tasks that are not yet ready to execute, which are reported
* separately by method {@link getDelayedTaskCount}.
*
* @return the number of queued tasks
* @see ForkJoinWorkerThread#getQueuedTaskCount()
*/
public long getQueuedTaskCount() {
WorkQueue[] qs; WorkQueue q;
int count = 0;
long count = 0;
if ((runState & TERMINATED) == 0L && (qs = queues) != null) {
for (int i = 1; i < qs.length; i += 2) {
if ((q = qs[i]) != null)
@ -3521,6 +3921,20 @@ public class ForkJoinPool extends AbstractExecutorService {
return count;
}
/**
* Returns an estimate of the number of delayed (including
* periodic) tasks scheduled in this pool that are not yet ready
* to submit for execution. The returned value is inaccurate while
* delayed tasks are being processed.
*
* @return an estimate of the number of delayed tasks
* @since 25
*/
public long getDelayedTaskCount() {
DelayScheduler ds;
return ((ds = delayScheduler) == null ? 0 : ds.lastStableSize());
}
/**
* Returns {@code true} if there are any tasks submitted to this
* pool that have not yet begun executing.
@ -3584,6 +3998,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public String toString() {
// Use a single pass through queues to collect counts
DelayScheduler ds;
long e = runState;
long st = stealCount;
long qt = 0L, ss = 0L; int rc = 0;
@ -3603,7 +4018,8 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
}
String delayed = ((ds = delayScheduler) == null ? "" :
", delayed = " + ds.lastStableSize());
int pc = parallelism;
long c = ctl;
int tc = (short)(c >>> TC_SHIFT);
@ -3623,6 +4039,7 @@ public class ForkJoinPool extends AbstractExecutorService {
", steals = " + st +
", tasks = " + qt +
", submissions = " + ss +
delayed +
"]";
}
@ -3948,6 +4365,7 @@ public class ForkJoinPool extends AbstractExecutorService {
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
Objects.requireNonNull(runnable);
return (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedRunnable<T>(runnable, value) :
new ForkJoinTask.AdaptedInterruptibleRunnable<T>(runnable, value);
@ -3955,6 +4373,7 @@ public class ForkJoinPool extends AbstractExecutorService {
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
Objects.requireNonNull(callable);
return (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
new ForkJoinTask.AdaptedCallable<T>(callable) :
new ForkJoinTask.AdaptedInterruptibleCallable<T>(callable);
@ -3982,7 +4401,6 @@ public class ForkJoinPool extends AbstractExecutorService {
if ((scale & (scale - 1)) != 0)
throw new Error("array index scale not a power of two");
interruptibleTaskClass = ForkJoinTask.InterruptibleTask.class;
Class<?> dep = LockSupport.class; // ensure loaded
// allow access to non-public methods
JLA = SharedSecrets.getJavaLangAccess();

View File

@ -273,6 +273,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
static final int ABNORMAL = 1 << 16;
static final int THROWN = 1 << 17;
static final int HAVE_EXCEPTION = DONE | ABNORMAL | THROWN;
static final int NUH_BIT = 24; // no external caller helping
static final int NO_USER_HELP = 1 << NUH_BIT;
static final int MARKER = 1 << 30; // utility marker
static final int SMASK = 0xffff; // short bits for tags
static final int UNCOMPENSATE = 1 << 16; // helpJoin sentinel
@ -292,6 +294,12 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
private boolean casStatus(int c, int v) {
return U.compareAndSetInt(this, STATUS, c, v);
}
final int noUserHelp() { // nonvolatile read; return 0 or 1
return (U.getInt(this, STATUS) & NO_USER_HELP) >>> NUH_BIT;
}
final void setNoUserHelp() { // for use in constructors only
U.putInt(this, STATUS, NO_USER_HELP);
}
// Support for waiting and signalling
@ -330,14 +338,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
final int trySetCancelled() {
int s;
for (;;) {
if ((s = status) < 0)
break;
if (casStatus(s, s | (DONE | ABNORMAL))) {
signalWaiters();
break;
}
}
if ((s = status) >= 0 &&
(s = getAndBitwiseOrStatus(DONE | ABNORMAL)) >= 0)
signalWaiters();
return s;
}
@ -481,7 +484,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
private int awaitDone(boolean interruptible, long deadline) {
ForkJoinWorkerThread wt; ForkJoinPool p; ForkJoinPool.WorkQueue q;
Thread t; boolean internal; int s;
Thread t; boolean internal; int s, ss;
if (internal =
(t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
p = (wt = (ForkJoinWorkerThread)t).pool;
@ -492,7 +495,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
return (((s = (p == null) ? 0 :
((this instanceof CountedCompleter) ?
p.helpComplete(this, q, internal) :
(this instanceof InterruptibleTask) && !internal ? status :
!internal && ((ss = status) & NO_USER_HELP) != 0 ? ss :
p.helpJoin(this, q, internal))) < 0)) ? s :
awaitDone(internal ? p : null, s, interruptible, deadline);
}
@ -642,7 +645,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
p = wt.pool;
}
else
q = (p = ForkJoinPool.common).externalSubmissionQueue();
q = (p = ForkJoinPool.common).externalSubmissionQueue(false);
q.push(this, p, internal);
return this;
}
@ -1160,7 +1163,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public void reinitialize() {
aux = null;
status = 0;
status &= NO_USER_HELP;
}
/**
@ -1414,7 +1417,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the task
*/
public static ForkJoinTask<?> adapt(Runnable runnable) {
return new AdaptedRunnableAction(runnable);
return new AdaptedRunnableAction(
Objects.requireNonNull(runnable));
}
/**
@ -1428,7 +1432,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the task
*/
public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
return new AdaptedRunnable<T>(runnable, result);
return new AdaptedRunnable<T>(
Objects.requireNonNull(runnable), result);
}
/**
@ -1442,7 +1447,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the task
*/
public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
return new AdaptedCallable<T>(callable);
return new AdaptedCallable<T>(
Objects.requireNonNull(callable));
}
/**
@ -1460,7 +1466,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @since 19
*/
public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {
return new AdaptedInterruptibleCallable<T>(callable);
return new AdaptedInterruptibleCallable<T>(
Objects.requireNonNull(callable));
}
/**
@ -1479,7 +1486,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @since 22
*/
public static <T> ForkJoinTask<T> adaptInterruptible(Runnable runnable, T result) {
return new AdaptedInterruptibleRunnable<T>(runnable, result);
return new AdaptedInterruptibleRunnable<T>(
Objects.requireNonNull(runnable), result);
}
/**
@ -1497,7 +1505,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @since 22
*/
public static ForkJoinTask<?> adaptInterruptible(Runnable runnable) {
return new AdaptedInterruptibleRunnable<Void>(runnable, null);
return new AdaptedInterruptibleRunnable<Void>(
Objects.requireNonNull(runnable), null);
}
// Serialization support
@ -1556,7 +1565,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
@SuppressWarnings("serial") // Conditionally serializable
T result;
AdaptedRunnable(Runnable runnable, T result) {
Objects.requireNonNull(runnable);
this.runnable = runnable;
this.result = result; // OK to set this even before completion
}
@ -1578,7 +1586,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
@SuppressWarnings("serial") // Conditionally serializable
final Runnable runnable;
AdaptedRunnableAction(Runnable runnable) {
Objects.requireNonNull(runnable);
this.runnable = runnable;
}
public final Void getRawResult() { return null; }
@ -1601,7 +1608,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
@SuppressWarnings("serial") // Conditionally serializable
T result;
AdaptedCallable(Callable<? extends T> callable) {
Objects.requireNonNull(callable);
this.callable = callable;
}
public final T getRawResult() { return result; }
@ -1636,6 +1642,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
abstract static class InterruptibleTask<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
transient volatile Thread runner;
InterruptibleTask() {
setNoUserHelp();
}
abstract T compute() throws Exception;
public final boolean exec() {
Thread.interrupted();
@ -1655,20 +1664,29 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
} finally {
runner = null;
}
return postExec();
}
boolean postExec() { // cleanup and return completion status to doExec
return true;
}
final boolean interruptIfRunning(boolean enabled) {
Thread t;
if ((t = runner) == null) // return false if not running
return false;
if (enabled) {
try {
t.interrupt();
} catch (Throwable ignore) {
}
}
return true;
}
public boolean cancel(boolean mayInterruptIfRunning) {
Thread t;
if (trySetCancelled() >= 0) {
if (mayInterruptIfRunning && (t = runner) != null) {
try {
t.interrupt();
} catch (Throwable ignore) {
}
}
return true;
}
return isCancelled();
int s;
if ((s = trySetCancelled()) < 0)
return ((s & (ABNORMAL | THROWN)) == ABNORMAL);
interruptIfRunning(mayInterruptIfRunning);
return true;
}
public final void run() { quietlyInvoke(); }
Object adaptee() { return null; } // for printing and diagnostics
@ -1690,7 +1708,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
@SuppressWarnings("serial") // Conditionally serializable
T result;
AdaptedInterruptibleCallable(Callable<? extends T> callable) {
Objects.requireNonNull(callable);
this.callable = callable;
}
public final T getRawResult() { return result; }
@ -1709,7 +1726,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
@SuppressWarnings("serial") // Conditionally serializable
final T result;
AdaptedInterruptibleRunnable(Runnable runnable, T result) {
Objects.requireNonNull(runnable);
this.runnable = runnable;
this.result = result;
}
@ -1727,7 +1743,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
@SuppressWarnings("serial") // Conditionally serializable
final Runnable runnable;
RunnableExecuteAction(Runnable runnable) {
Objects.requireNonNull(runnable);
this.runnable = runnable;
}
public final Void getRawResult() { return null; }
@ -1793,9 +1808,11 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
throw new NullPointerException();
InvokeAnyTask<T> t = null; // list of submitted tasks
try {
for (Callable<T> c : tasks)
for (Callable<T> c : tasks) {
Objects.requireNonNull(c);
pool.execute((ForkJoinTask<?>)
(t = new InvokeAnyTask<T>(c, this, t)));
}
return timed ? get(nanos, TimeUnit.NANOSECONDS) : get();
} finally {
for (; t != null; t = t.pred)
@ -1822,7 +1839,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
final InvokeAnyTask<T> pred; // to traverse on cancellation
InvokeAnyTask(Callable<T> callable, InvokeAnyRoot<T> root,
InvokeAnyTask<T> pred) {
Objects.requireNonNull(callable);
this.callable = callable;
this.root = root;
this.pred = pred;
@ -1857,4 +1873,39 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
public final void setRawResult(Void v) { }
final Object adaptee() { return callable; }
}
/**
* Adapter for Callable-based interruptible tasks with timeout actions.
*/
@SuppressWarnings("serial") // Conditionally serializable
static final class CallableWithTimeout<T> extends InterruptibleTask<T> {
Callable<? extends T> callable; // nulled out after use
ForkJoinTask<?> timeoutAction;
T result;
CallableWithTimeout(Callable<? extends T> callable,
ForkJoinTask<?> timeoutAction) {
this.callable = callable;
this.timeoutAction = timeoutAction;
}
public final T getRawResult() { return result; }
public final void setRawResult(T v) { result = v; }
final Object adaptee() { return callable; }
final T compute() throws Exception {
Callable<? extends T> c;
return ((c = callable) != null) ? c.call() : null;
}
final boolean postExec() { // cancel timeout action
ForkJoinTask<?> t;
callable = null;
if ((t = timeoutAction) != null) {
timeoutAction = null;
try {
t.cancel(false);
} catch (Error | RuntimeException ex) {
}
}
return true;
}
}
}

View File

@ -206,22 +206,6 @@ public class SubmissionPublisher<T> implements Publisher<T>,
(n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
}
// default Executor setup; nearly the same as CompletableFuture
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor ASYNC_POOL =
(ForkJoinPool.getCommonPoolParallelism() > 1) ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
private static final class ThreadPerTaskExecutor implements Executor {
ThreadPerTaskExecutor() {} // prevent access constructor creation
public void execute(Runnable r) { new Thread(r).start(); }
}
/**
* Clients (BufferedSubscriptions) are maintained in a linked list
* (via their "next" fields). This works well for publish loops.
@ -316,7 +300,7 @@ public class SubmissionPublisher<T> implements Publisher<T>,
* Flow.Subscriber#onNext(Object) onNext}.
*/
public SubmissionPublisher() {
this(ASYNC_POOL, Flow.defaultBufferSize(), null);
this(ForkJoinPool.asyncCommonPool(), Flow.defaultBufferSize(), null);
}
/**

View File

@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.JavaUtilConcurrentTLRAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.util.random.RandomSupport;
@ -158,11 +159,19 @@ public final class ThreadLocalRandom extends Random {
* rely on (static) atomic generators to initialize the values.
*/
static final void localInit() {
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
int probe = (p == 0) ? 1 : p; // skip 0
long seed = RandomSupport.mixMurmur64(seeder.getAndAdd(SEEDER_INCREMENT));
Thread t = Thread.currentThread();
Thread t = Thread.currentThread(), carrier;
U.putLong(t, SEED, seed);
int probe = 0; // if virtual, share probe with carrier
if ((carrier = JLA.currentCarrierThread()) != t &&
(probe = U.getInt(carrier, PROBE)) == 0) {
seed = RandomSupport.mixMurmur64(seeder.getAndAdd(SEEDER_INCREMENT));
U.putLong(carrier, SEED, seed);
}
if (probe == 0 && (probe = probeGenerator.addAndGet(PROBE_INCREMENT)) == 0)
probe = 1; // skip 0
if (carrier != t)
U.putInt(carrier, PROBE, probe);
U.putInt(t, PROBE, probe);
}
@ -233,16 +242,18 @@ public final class ThreadLocalRandom extends Random {
* the classes that use them. Briefly, a thread's "probe" value is
* a non-zero hash code that (probably) does not collide with
* other existing threads with respect to any power of two
* collision space. When it does collide, it is pseudo-randomly
* adjusted (using a Marsaglia XorShift). The nextSecondarySeed
* method is used in the same contexts as ThreadLocalRandom, but
* only for transient usages such as random adaptive spin/block
* sequences for which a cheap RNG suffices and for which it could
* in principle disrupt user-visible statistical properties of the
* main ThreadLocalRandom if we were to use it.
* collision space, based on carrier threads in the case of
* VirtualThreads to reduce the expected collision rate. When it
* does collide, it is pseudo-randomly adjusted (using a Marsaglia
* XorShift). The nextSecondarySeed method is used in the same
* contexts as ThreadLocalRandom, but only for transient usages
* such as random adaptive spin/block sequences for which a cheap
* RNG suffices and for which it could in principle disrupt
* user-visible statistical properties of the main
* ThreadLocalRandom if we were to use it.
*
* Note: Because of package-protection issues, versions of some
* these methods also appear in some subpackage classes.
* Note: jdk SharedSecrets are used enable use in jdk classes
* outside this package.
*/
/**
@ -251,7 +262,7 @@ public final class ThreadLocalRandom extends Random {
* can be used to force initialization on zero return.
*/
static final int getProbe() {
return U.getInt(Thread.currentThread(), PROBE);
return U.getInt(JLA.currentCarrierThread(), PROBE);
}
/**
@ -262,7 +273,7 @@ public final class ThreadLocalRandom extends Random {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
U.putInt(Thread.currentThread(), PROBE, probe);
U.putInt(JLA.currentCarrierThread(), PROBE, probe);
return probe;
}
@ -378,6 +389,8 @@ public final class ThreadLocalRandom extends Random {
= new AtomicLong(RandomSupport.mixMurmur64(System.currentTimeMillis()) ^
RandomSupport.mixMurmur64(System.nanoTime()));
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
// used by ScopedValue
private static class Access {
static {
@ -386,6 +399,12 @@ public final class ThreadLocalRandom extends Random {
public int nextSecondaryThreadLocalRandomSeed() {
return nextSecondarySeed();
}
public int getThreadLocalRandomProbe() {
return getProbe();
}
public int advanceThreadLocalRandomProbe(int r) {
return advanceProbe(r);
}
}
);
}

View File

@ -43,6 +43,8 @@ import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.DoubleBinaryOperator;
import java.util.function.LongBinaryOperator;
import jdk.internal.access.SharedSecrets;
import jdk.internal.access.JavaUtilConcurrentTLRAccess;
/**
* A package-local class holding common representation and mechanics
@ -188,24 +190,18 @@ abstract class Striped64 extends Number {
}
/**
* Returns the probe value for the current thread.
* Duplicated from ThreadLocalRandom because of packaging restrictions.
* Returns the ThreadLocalRandom probe value for the current carrier thread.
*/
static final int getProbe() {
return (int) THREAD_PROBE.get(Thread.currentThread());
return TLR.getThreadLocalRandomProbe();
}
/**
* Pseudo-randomly advances and records the given probe value for the
* given thread.
* Duplicated from ThreadLocalRandom because of packaging restrictions.
* given carrier thread.
*/
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
THREAD_PROBE.set(Thread.currentThread(), probe);
return probe;
return TLR.advanceThreadLocalRandomProbe(probe);
}
/**
@ -371,21 +367,17 @@ abstract class Striped64 extends Number {
}
}
private static final JavaUtilConcurrentTLRAccess TLR =
SharedSecrets.getJavaUtilConcurrentTLRAccess();
// VarHandle mechanics
private static final VarHandle BASE;
private static final VarHandle CELLSBUSY;
private static final VarHandle THREAD_PROBE;
static {
MethodHandles.Lookup l1 = MethodHandles.lookup();
BASE = MhUtil.findVarHandle(l1, "base", long.class);
CELLSBUSY = MhUtil.findVarHandle(l1, "cellsBusy", int.class);
try {
MethodHandles.Lookup l2 = MethodHandles.privateLookupIn(Thread.class, l1);
THREAD_PROBE = MhUtil.findVarHandle(l2, "threadLocalRandomProbe", int.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}
}

View File

@ -27,4 +27,6 @@ package jdk.internal.access;
public interface JavaUtilConcurrentTLRAccess {
int nextSecondaryThreadLocalRandomSeed();
int getThreadLocalRandomProbe();
int advanceThreadLocalRandomProbe(int r);
}

View File

@ -30,39 +30,25 @@
*/
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
class CompletableFutureOrTimeoutExceptionallyTest {
static final BlockingQueue<Runnable> delayerQueue;
static {
try {
var delayerClass = Class.forName("java.util.concurrent.CompletableFuture$Delayer",
true,
CompletableFuture.class.getClassLoader());
var delayerField = delayerClass.getDeclaredField("delayer");
delayerField.setAccessible(true);
delayerQueue = ((ScheduledThreadPoolExecutor)delayerField.get(null)).getQueue();
} catch (Throwable t) {
throw new ExceptionInInitializerError(t);
}
}
// updated February 2025 to adapt to CompletableFuture DelayScheduler changes
/**
* Test that orTimeout task is cancelled if the CompletableFuture is completed Exceptionally
*/
@Test
void testOrTimeoutWithCompleteExceptionallyDoesNotLeak() throws InterruptedException {
assertTrue(delayerQueue.peek() == null);
ForkJoinPool delayer = ForkJoinPool.commonPool();
assertEquals(delayer.getDelayedTaskCount(), 0);
var future = new CompletableFuture<>().orTimeout(12, TimeUnit.HOURS);
assertTrue(delayerQueue.peek() != null);
future.completeExceptionally(new RuntimeException("This is fine"));
while (delayerQueue.peek() != null) {
while (delayer.getDelayedTaskCount() > 0) {
Thread.sleep(100);
};
}
@ -72,12 +58,13 @@ class CompletableFutureOrTimeoutExceptionallyTest {
*/
@Test
void testCompleteOnTimeoutWithCompleteExceptionallyDoesNotLeak() throws InterruptedException {
assertTrue(delayerQueue.peek() == null);
ForkJoinPool delayer = ForkJoinPool.commonPool();
assertEquals(delayer.getDelayedTaskCount(), 0);
var future = new CompletableFuture<>().completeOnTimeout(null, 12, TimeUnit.HOURS);
assertTrue(delayerQueue.peek() != null);
future.completeExceptionally(new RuntimeException("This is fine"));
while (delayerQueue.peek() != null) {
while (delayer.getDelayedTaskCount() > 0) {
Thread.sleep(100);
};
}
}

View File

@ -32,6 +32,7 @@
* http://creativecommons.org/publicdomain/zero/1.0/
*/
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.concurrent.CompletableFuture.completedFuture;
@ -101,7 +102,7 @@ public class CompletableFutureTest extends JSR166TestCase {
assertNull(result);
try {
f.get(randomExpiredTimeout(), randomTimeUnit());
f.get(1, NANOSECONDS);
shouldThrow();
}
catch (TimeoutException success) {}
@ -658,8 +659,6 @@ public class CompletableFutureTest extends JSR166TestCase {
}
}
static final boolean defaultExecutorIsCommonPool
= ForkJoinPool.getCommonPoolParallelism() > 1;
/**
* Permits the testing of parallel code for the 3 different
@ -750,8 +749,7 @@ public class CompletableFutureTest extends JSR166TestCase {
},
ASYNC {
public void checkExecutionMode() {
mustEqual(defaultExecutorIsCommonPool,
(ForkJoinPool.commonPool() == ForkJoinTask.getPool()));
mustEqual(ForkJoinPool.commonPool(), ForkJoinTask.getPool());
}
public CompletableFuture<Void> runAsync(Runnable a) {
return CompletableFuture.runAsync(a);
@ -3794,10 +3792,7 @@ public class CompletableFutureTest extends JSR166TestCase {
CompletableFuture<Item> f = new CompletableFuture<>();
Executor e = f.defaultExecutor();
Executor c = ForkJoinPool.commonPool();
if (ForkJoinPool.getCommonPoolParallelism() > 1)
assertSame(e, c);
else
assertNotSame(e, c);
assertSame(e, c);
}
/**

View File

@ -31,13 +31,35 @@
* http://creativecommons.org/publicdomain/zero/1.0/
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import junit.framework.Test;
import junit.framework.TestSuite;
@ -216,4 +238,412 @@ public class ForkJoinPool20Test extends JSR166TestCase {
}
return worker;
}
// additions for ScheduledExecutorService
/**
* delayed schedule of callable successfully executes after delay
*/
public void testSchedule1() throws Exception {
final ForkJoinPool p = new ForkJoinPool(2);
try (PoolCleaner cleaner = cleaner(p)) {
final long startTime = System.nanoTime();
final CountDownLatch done = new CountDownLatch(1);
Callable<Boolean> task = new CheckedCallable<>() {
public Boolean realCall() {
done.countDown();
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
return Boolean.TRUE;
}};
Future<Boolean> f = p.schedule(task, timeoutMillis(), MILLISECONDS);
assertSame(Boolean.TRUE, f.get());
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
assertEquals(0L, done.getCount());
}
}
/**
* delayed schedule of callable successfully executes after delay
* even if shutdown.
*/
public void testSchedule1b() throws Exception {
final ForkJoinPool p = new ForkJoinPool(2);
try (PoolCleaner cleaner = cleaner(p)) {
final long startTime = System.nanoTime();
final CountDownLatch done = new CountDownLatch(1);
Callable<Boolean> task = new CheckedCallable<>() {
public Boolean realCall() {
done.countDown();
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
return Boolean.TRUE;
}};
Future<Boolean> f = p.schedule(task, timeoutMillis(), MILLISECONDS);
p.shutdown();
assertSame(Boolean.TRUE, f.get());
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
assertEquals(0L, done.getCount());
}
}
/**
* delayed schedule of runnable successfully executes after delay
*/
public void testSchedule3() throws Exception {
final ForkJoinPool p = new ForkJoinPool(2);
try (PoolCleaner cleaner = cleaner(p)) {
final long startTime = System.nanoTime();
final CountDownLatch done = new CountDownLatch(1);
Runnable task = new CheckedRunnable() {
public void realRun() {
done.countDown();
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
}};
Future<?> f = p.schedule(task, timeoutMillis(), MILLISECONDS);
await(done);
assertNull(f.get(LONG_DELAY_MS, MILLISECONDS));
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
}
}
/**
* scheduleAtFixedRate executes runnable after given initial delay
*/
public void testSchedule4() throws Exception {
final ForkJoinPool p = new ForkJoinPool(2);
try (PoolCleaner cleaner = cleaner(p)) {
final long startTime = System.nanoTime();
final CountDownLatch done = new CountDownLatch(1);
Runnable task = new CheckedRunnable() {
public void realRun() {
done.countDown();
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
}};
ScheduledFuture<?> f =
p.scheduleAtFixedRate(task, timeoutMillis(),
LONG_DELAY_MS, MILLISECONDS);
await(done);
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
f.cancel(true);
}
}
/**
* scheduleAtFixedRate with 0 initial delay re-rexecutes
*/
public void testSchedule4a() throws Exception {
final ForkJoinPool p = new ForkJoinPool(2);
try (PoolCleaner cleaner = cleaner(p)) {
final long startTime = System.nanoTime();
final CountDownLatch done = new CountDownLatch(2);
Runnable task = new Runnable() {
public void run() {
done.countDown();
}};
ScheduledFuture<?> f =
p.scheduleAtFixedRate(task, 0L, timeoutMillis(),
MILLISECONDS);
await(done);
f.cancel(true);
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
}
}
/**
* scheduleWithFixedDelay executes runnable after given initial delay
*/
public void testSchedule5() throws Exception {
final ForkJoinPool p = new ForkJoinPool(2);
try (PoolCleaner cleaner = cleaner(p)) {
final long startTime = System.nanoTime();
final CountDownLatch done = new CountDownLatch(1);
Runnable task = new CheckedRunnable() {
public void realRun() {
done.countDown();
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
}};
ScheduledFuture<?> f =
p.scheduleWithFixedDelay(task, timeoutMillis(),
LONG_DELAY_MS, MILLISECONDS);
await(done);
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
f.cancel(true);
}
}
/**
* scheduleWithFixedDelay with 0 initial delay re-rexecutes
*/
public void testSchedule5a() throws Exception {
final ForkJoinPool p = new ForkJoinPool(2);
try (PoolCleaner cleaner = cleaner(p)) {
final long startTime = System.nanoTime();
final CountDownLatch done = new CountDownLatch(2);
Runnable task = new Runnable() {
public void run() {
done.countDown();
}};
ScheduledFuture<?> f =
p.scheduleWithFixedDelay(task, 0L, timeoutMillis(),
MILLISECONDS);
await(done);
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
f.cancel(true);
}
}
static class RunnableCounter implements Runnable {
AtomicInteger count = new AtomicInteger(0);
public void run() { count.getAndIncrement(); }
}
/**
* scheduleAtFixedRate executes series of tasks at given rate.
* Eventually, it must hold that:
* cycles - 1 <= elapsedMillis/delay < cycles
* Additionally, periodic tasks are not run after shutdown.
*/
public void testFixedRateSequence() throws InterruptedException {
final ForkJoinPool p = new ForkJoinPool(4);
try (PoolCleaner cleaner = cleaner(p)) {
for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
final long startTime = System.nanoTime();
final int cycles = 8;
final CountDownLatch done = new CountDownLatch(cycles);
final Runnable task = new CheckedRunnable() {
public void realRun() { done.countDown(); }};
final ScheduledFuture<?> periodicTask =
p.scheduleAtFixedRate(task, 0, delay, MILLISECONDS);
final int totalDelayMillis = (cycles - 1) * delay;
await(done, totalDelayMillis + LONG_DELAY_MS);
final long elapsedMillis = millisElapsedSince(startTime);
assertTrue(elapsedMillis >= totalDelayMillis);
if (elapsedMillis <= cycles * delay)
return;
periodicTask.cancel(true); // retry with longer delay
}
fail("unexpected execution rate");
}
}
/**
* scheduleWithFixedDelay executes series of tasks with given
* period. Eventually, it must hold that each task starts at
* least delay and at most 2 * delay after the termination of the
* previous task. Additionally, periodic tasks are not run after
* shutdown.
*/
public void testFixedDelaySequence() throws InterruptedException {
final ForkJoinPool p = new ForkJoinPool(1);
try (PoolCleaner cleaner = cleaner(p)) {
for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
final long startTime = System.nanoTime();
final AtomicLong previous = new AtomicLong(startTime);
final AtomicBoolean tryLongerDelay = new AtomicBoolean(false);
final int cycles = 8;
final CountDownLatch done = new CountDownLatch(cycles);
final int d = delay;
final Runnable task = new CheckedRunnable() {
public void realRun() {
long now = System.nanoTime();
long elapsedMillis
= NANOSECONDS.toMillis(now - previous.get());
if (elapsedMillis >= (done.getCount() == cycles ? d : 2 * d))
tryLongerDelay.set(true);
previous.set(now);
assertTrue(done.getCount() > 0);
done.countDown();
}};
final ScheduledFuture<?> periodicTask =
p.scheduleWithFixedDelay(task, 0, delay, MILLISECONDS);
final int totalDelayMillis = (cycles - 1) * delay;
await(done, totalDelayMillis + cycles * LONG_DELAY_MS);
final long elapsedMillis = millisElapsedSince(startTime);
assertTrue(elapsedMillis >= totalDelayMillis);
if (!tryLongerDelay.get())
return;
periodicTask.cancel(true); // retry with longer delay
}
fail("unexpected execution rate");
}
}
/**
* Submitting null tasks throws NullPointerException
*/
public void testNullTaskSubmission() {
final ForkJoinPool p = new ForkJoinPool(1);
try (PoolCleaner cleaner = cleaner(p)) {
assertNullTaskSubmissionThrowsNullPointerException(p);
}
}
/**
* Submitted tasks are rejected when shutdown
*/
public void testSubmittedTasksRejectedWhenShutdown() throws InterruptedException {
final ForkJoinPool p = new ForkJoinPool(4);
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
final CountDownLatch threadsStarted = new CountDownLatch(p.getParallelism());
final CountDownLatch done = new CountDownLatch(1);
final Runnable r = () -> {
threadsStarted.countDown();
for (;;) {
try {
done.await();
return;
} catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
}};
final Callable<Boolean> c = () -> {
threadsStarted.countDown();
for (;;) {
try {
done.await();
return Boolean.TRUE;
} catch (InterruptedException shutdownNowDeliberatelyIgnored) {}
}};
try (PoolCleaner cleaner = cleaner(p, done)) {
for (int i = p.getParallelism(); i-- > 0; ) {
switch (rnd.nextInt(4)) {
case 0: p.execute(r); break;
case 1: assertFalse(p.submit(r).isDone()); break;
case 2: assertFalse(p.submit(r, Boolean.TRUE).isDone()); break;
case 3: assertFalse(p.submit(c).isDone()); break;
}
}
await(threadsStarted);
p.shutdown();
done.countDown(); // release blocking tasks
assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
}
}
/**
* A fixed delay task with overflowing period should not prevent a
* one-shot task from executing.
* https://bugs.openjdk.org/browse/JDK-8051859
*/
@SuppressWarnings("FutureReturnValueIgnored")
public void testScheduleWithFixedDelay_overflow() throws Exception {
final CountDownLatch delayedDone = new CountDownLatch(1);
final CountDownLatch immediateDone = new CountDownLatch(1);
final ForkJoinPool p = new ForkJoinPool(2);
try (PoolCleaner cleaner = cleaner(p)) {
final Runnable delayed = () -> {
delayedDone.countDown();
p.submit(() -> immediateDone.countDown());
};
p.scheduleWithFixedDelay(delayed, 0L, Long.MAX_VALUE, SECONDS);
await(delayedDone);
await(immediateDone);
}
}
/**
* shutdownNow cancels tasks that were not run
*/
public void testShutdownNow_delayedTasks() throws InterruptedException {
final ForkJoinPool p = new ForkJoinPool(2);
List<ScheduledFuture<?>> tasks = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Runnable r = new NoOpRunnable();
tasks.add(p.schedule(r, 9, SECONDS));
tasks.add(p.scheduleAtFixedRate(r, 9, 9, SECONDS));
tasks.add(p.scheduleWithFixedDelay(r, 9, 9, SECONDS));
}
p.shutdownNow();
assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
for (ScheduledFuture<?> task : tasks) {
assertTrue(task.isDone());
}
assertTrue(p.isTerminated());
}
/**
* submitWithTimeout (eventually) cancels task after timeout
*/
public void testSubmitWithTimeoutCancels() throws InterruptedException {
final ForkJoinPool p = ForkJoinPool.commonPool();
Callable<Boolean> c = new Callable<Boolean>() {
public Boolean call() throws Exception {
Thread.sleep(LONGER_DELAY_MS); return Boolean.TRUE; }};
ForkJoinTask<?> task = p.submitWithTimeout(c, 1, NANOSECONDS, null);
Thread.sleep(timeoutMillis());
assertTrue(task.isCancelled());
}
static final class SubmitWithTimeoutException extends RuntimeException {}
/**
* submitWithTimeout using complete completes after timeout
*/
public void testSubmitWithCompleterTimeoutCompletes() throws InterruptedException {
final ForkJoinPool p = ForkJoinPool.commonPool();
Callable<Item> c = new Callable<Item>() {
public Item call() throws Exception {
Thread.sleep(LONGER_DELAY_MS); return one; }};
ForkJoinTask<Item> task = p.submitWithTimeout(
c, 1, NANOSECONDS,
(ForkJoinTask<Item> t) ->
t.complete(two));
Thread.sleep(timeoutMillis());
assertEquals(task.join(), two);
}
/**
* submitWithTimeout using completeExceptionally throws after timeout
*/
public void testSubmitWithTimeoutThrows() throws InterruptedException {
final ForkJoinPool p = ForkJoinPool.commonPool();
Callable<Boolean> c = new Callable<Boolean>() {
public Boolean call() throws Exception {
Thread.sleep(LONGER_DELAY_MS); return Boolean.TRUE; }};
ForkJoinTask<Boolean> task = p.submitWithTimeout(
c, 1, NANOSECONDS,
(ForkJoinTask<Boolean> t) ->
t.completeExceptionally(new SubmitWithTimeoutException()));
Thread.sleep(timeoutMillis());
try {
task.join();
shouldThrow();
}
catch (Exception ex) {
assertTrue(ex instanceof SubmitWithTimeoutException);
}
}
/**
* submitWithTimeout doesn't cancel if completed before timeout
*/
public void testSubmitWithTimeout_NoTimeout() throws InterruptedException {
final ForkJoinPool p = ForkJoinPool.commonPool();
Callable<Boolean> c = new Callable<Boolean>() {
public Boolean call() throws Exception {
return Boolean.TRUE; }};
ForkJoinTask<?> task = p.submitWithTimeout(c, LONGER_DELAY_MS, MILLISECONDS, null);
Thread.sleep(timeoutMillis());
assertFalse(task.isCancelled());
assertEquals(task.join(), Boolean.TRUE);
}
/**
* A delayed task completes (possibly abnormally) if shutdown after
* calling cancelDelayedTasksOnShutdown()
*/
public void testCancelDelayedTasksOnShutdown() throws Exception {
final ForkJoinPool p = new ForkJoinPool(2);
p.cancelDelayedTasksOnShutdown();
try (PoolCleaner cleaner = cleaner(p)) {
Callable<Boolean> task = new CheckedCallable<>() {
public Boolean realCall() {
return Boolean.TRUE;
}};
Future<Boolean> f = p.schedule(task, LONGER_DELAY_MS, MILLISECONDS);
p.shutdown();
assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
assertTrue(f.isDone());
}
}
}

View File

@ -178,10 +178,7 @@ public class SubmissionPublisherTest extends JSR166TestCase {
checkInitialState(p);
assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
if (ForkJoinPool.getCommonPoolParallelism() > 1)
assertSame(e, c);
else
assertNotSame(e, c);
assertSame(e, c);
}
/**