From 8b0602dbed2f7ced190ec81753defab8a4bc316d Mon Sep 17 00:00:00 2001 From: Doug Lea Date: Mon, 31 Mar 2025 19:23:59 +0000 Subject: [PATCH] 8319447: Improve performance of delayed task handling Reviewed-by: vklang, alanb --- .../util/concurrent/CompletableFuture.java | 257 +++----- .../java/util/concurrent/DelayScheduler.java | 568 ++++++++++++++++ .../java/util/concurrent/ForkJoinPool.java | 614 +++++++++++++++--- .../java/util/concurrent/ForkJoinTask.java | 125 ++-- .../util/concurrent/SubmissionPublisher.java | 18 +- .../util/concurrent/ThreadLocalRandom.java | 47 +- .../util/concurrent/atomic/Striped64.java | 26 +- .../access/JavaUtilConcurrentTLRAccess.java | 2 + ...tableFutureOrTimeoutExceptionallyTest.java | 33 +- .../concurrent/tck/CompletableFutureTest.java | 13 +- .../concurrent/tck/ForkJoinPool20Test.java | 430 ++++++++++++ .../tck/SubmissionPublisherTest.java | 5 +- 12 files changed, 1765 insertions(+), 373 deletions(-) create mode 100644 src/java.base/share/classes/java/util/concurrent/DelayScheduler.java diff --git a/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java b/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java index d6904944050..7503c154ddb 100644 --- a/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java +++ b/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java @@ -46,6 +46,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.Objects; +import static java.util.concurrent.DelayScheduler.ScheduledForkJoinTask; /** * A {@link Future} that may be explicitly completed (setting its @@ -69,18 +70,15 @@ import java.util.Objects; * a completion method. * *
  • All async methods without an explicit Executor - * argument are performed using the {@link ForkJoinPool#commonPool()} - * (unless it does not support a parallelism level of at least two, in - * which case, a new Thread is created to run each task). This may be - * overridden for non-static methods in subclasses by defining method - * {@link #defaultExecutor()}. To simplify monitoring, debugging, - * and tracking, all generated asynchronous tasks are instances of the - * marker interface {@link AsynchronousCompletionTask}. Operations - * with time-delays can use adapter methods defined in this class, for - * example: {@code supplyAsync(supplier, delayedExecutor(timeout, - * timeUnit))}. To support methods with delays and timeouts, this - * class maintains at most one daemon thread for triggering and - * cancelling actions, not for running them. + * argument, as well as those involving delays are performed using the + * {@link ForkJoinPool#commonPool()}. The default async Executor may + * be overridden for non-static methods in subclasses by defining + * method {@link #defaultExecutor()}. To simplify monitoring, + * debugging, and tracking, all generated asynchronous tasks are + * instances of the marker interface {@link + * AsynchronousCompletionTask}. Operations with time-delays can use + * adapter methods defined in this class, for example: {@code + * supplyAsync(supplier, delayedExecutor(timeout, timeUnit))}. * *
  • All CompletionStage methods are implemented independently of * other public methods, so the behavior of one method is not impacted @@ -473,34 +471,11 @@ public class CompletableFuture implements Future, CompletionStage { public static interface AsynchronousCompletionTask { } - private static final boolean USE_COMMON_POOL = - (ForkJoinPool.getCommonPoolParallelism() > 1); - /** - * Default executor -- ForkJoinPool.commonPool() unless it cannot - * support parallelism. + * Default Executor */ - private static final Executor ASYNC_POOL = USE_COMMON_POOL ? - ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); - - /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ - private static final class ThreadPerTaskExecutor implements Executor { - public void execute(Runnable r) { - Objects.requireNonNull(r); - new Thread(r).start(); - } - } - - /** - * Null-checks user executor argument, and translates uses of - * commonPool to ASYNC_POOL in case parallelism disabled. - */ - static Executor screenExecutor(Executor e) { - if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool()) - return ASYNC_POOL; - if (e == null) throw new NullPointerException(); - return e; - } + private static final ForkJoinPool ASYNC_POOL = + ForkJoinPool.asyncCommonPool(); // ensures minimal parallelism // Modes for Completion.tryFire. Signedness matters. static final int SYNC = 0; @@ -702,8 +677,8 @@ public class CompletableFuture implements Future, CompletionStage { private CompletableFuture uniApplyStage( Executor e, Function f) { - if (f == null) throw new NullPointerException(); Object r; + Objects.requireNonNull(f); if ((r = result) != null) return uniApplyNow(r, e, f); CompletableFuture d = newIncompleteFuture(); @@ -775,8 +750,8 @@ public class CompletableFuture implements Future, CompletionStage { private CompletableFuture uniAcceptStage(Executor e, Consumer f) { - if (f == null) throw new NullPointerException(); Object r; + Objects.requireNonNull(f); if ((r = result) != null) return uniAcceptNow(r, e, f); CompletableFuture d = newIncompleteFuture(); @@ -843,8 +818,8 @@ public class CompletableFuture implements Future, CompletionStage { } private CompletableFuture uniRunStage(Executor e, Runnable f) { - if (f == null) throw new NullPointerException(); Object r; + Objects.requireNonNull(f); if ((r = result) != null) return uniRunNow(r, e, f); CompletableFuture d = newIncompleteFuture(); @@ -924,7 +899,7 @@ public class CompletableFuture implements Future, CompletionStage { private CompletableFuture uniWhenCompleteStage( Executor e, BiConsumer f) { - if (f == null) throw new NullPointerException(); + Objects.requireNonNull(f); CompletableFuture d = newIncompleteFuture(); Object r; if ((r = result) == null) @@ -987,7 +962,7 @@ public class CompletableFuture implements Future, CompletionStage { private CompletableFuture uniHandleStage( Executor e, BiFunction f) { - if (f == null) throw new NullPointerException(); + Objects.requireNonNull(f); CompletableFuture d = newIncompleteFuture(); Object r; if ((r = result) == null) @@ -1045,7 +1020,7 @@ public class CompletableFuture implements Future, CompletionStage { private CompletableFuture uniExceptionallyStage( Executor e, Function f) { - if (f == null) throw new NullPointerException(); + Objects.requireNonNull(f); CompletableFuture d = newIncompleteFuture(); Object r; if ((r = result) == null) @@ -1105,7 +1080,7 @@ public class CompletableFuture implements Future, CompletionStage { private CompletableFuture uniComposeExceptionallyStage( Executor e, Function> f) { - if (f == null) throw new NullPointerException(); + Objects.requireNonNull(f); CompletableFuture d = newIncompleteFuture(); Object r, s; Throwable x; if ((r = result) == null) @@ -1212,7 +1187,7 @@ public class CompletableFuture implements Future, CompletionStage { private CompletableFuture uniComposeStage( Executor e, Function> f) { - if (f == null) throw new NullPointerException(); + Objects.requireNonNull(f); CompletableFuture d = newIncompleteFuture(); Object r, s; Throwable x; if ((r = result) == null) @@ -1823,7 +1798,7 @@ public class CompletableFuture implements Future, CompletionStage { static CompletableFuture asyncSupplyStage(Executor e, Supplier f) { - if (f == null) throw new NullPointerException(); + Objects.requireNonNull(f); CompletableFuture d = new CompletableFuture(); e.execute(new AsyncSupply(d, f)); return d; @@ -1859,7 +1834,7 @@ public class CompletableFuture implements Future, CompletionStage { } static CompletableFuture asyncRunStage(Executor e, Runnable f) { - if (f == null) throw new NullPointerException(); + Objects.requireNonNull(f); CompletableFuture d = new CompletableFuture(); e.execute(new AsyncRun(d, f)); return d; @@ -2048,7 +2023,7 @@ public class CompletableFuture implements Future, CompletionStage { */ public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) { - return asyncSupplyStage(screenExecutor(executor), supplier); + return asyncSupplyStage(Objects.requireNonNull(executor), supplier); } /** @@ -2076,7 +2051,7 @@ public class CompletableFuture implements Future, CompletionStage { */ public static CompletableFuture runAsync(Runnable runnable, Executor executor) { - return asyncRunStage(screenExecutor(executor), runnable); + return asyncRunStage(Objects.requireNonNull(executor), runnable); } /** @@ -2241,7 +2216,7 @@ public class CompletableFuture implements Future, CompletionStage { * to transition to a completed state, else {@code false} */ public boolean completeExceptionally(Throwable ex) { - if (ex == null) throw new NullPointerException(); + Objects.requireNonNull(ex); boolean triggered = internalComplete(new AltResult(ex)); postComplete(); return triggered; @@ -2259,7 +2234,7 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture thenApplyAsync( Function fn, Executor executor) { - return uniApplyStage(screenExecutor(executor), fn); + return uniApplyStage(Objects.requireNonNull(executor), fn); } public CompletableFuture thenAccept(Consumer action) { @@ -2272,7 +2247,7 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture thenAcceptAsync(Consumer action, Executor executor) { - return uniAcceptStage(screenExecutor(executor), action); + return uniAcceptStage(Objects.requireNonNull(executor), action); } public CompletableFuture thenRun(Runnable action) { @@ -2285,7 +2260,7 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture thenRunAsync(Runnable action, Executor executor) { - return uniRunStage(screenExecutor(executor), action); + return uniRunStage(Objects.requireNonNull(executor), action); } public CompletableFuture thenCombine( @@ -2303,7 +2278,7 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture thenCombineAsync( CompletionStage other, BiFunction fn, Executor executor) { - return biApplyStage(screenExecutor(executor), other, fn); + return biApplyStage(Objects.requireNonNull(executor), other, fn); } public CompletableFuture thenAcceptBoth( @@ -2321,7 +2296,7 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture thenAcceptBothAsync( CompletionStage other, BiConsumer action, Executor executor) { - return biAcceptStage(screenExecutor(executor), other, action); + return biAcceptStage(Objects.requireNonNull(executor), other, action); } public CompletableFuture runAfterBoth(CompletionStage other, @@ -2337,7 +2312,7 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) { - return biRunStage(screenExecutor(executor), other, action); + return biRunStage(Objects.requireNonNull(executor), other, action); } public CompletableFuture applyToEither( @@ -2353,7 +2328,7 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture applyToEitherAsync( CompletionStage other, Function fn, Executor executor) { - return orApplyStage(screenExecutor(executor), other, fn); + return orApplyStage(Objects.requireNonNull(executor), other, fn); } public CompletableFuture acceptEither( @@ -2369,7 +2344,7 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture acceptEitherAsync( CompletionStage other, Consumer action, Executor executor) { - return orAcceptStage(screenExecutor(executor), other, action); + return orAcceptStage(Objects.requireNonNull(executor), other, action); } public CompletableFuture runAfterEither(CompletionStage other, @@ -2385,7 +2360,7 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor) { - return orRunStage(screenExecutor(executor), other, action); + return orRunStage(Objects.requireNonNull(executor), other, action); } public CompletableFuture thenCompose( @@ -2401,7 +2376,7 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture thenComposeAsync( Function> fn, Executor executor) { - return uniComposeStage(screenExecutor(executor), fn); + return uniComposeStage(Objects.requireNonNull(executor), fn); } public CompletableFuture whenComplete( @@ -2416,7 +2391,7 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture whenCompleteAsync( BiConsumer action, Executor executor) { - return uniWhenCompleteStage(screenExecutor(executor), action); + return uniWhenCompleteStage(Objects.requireNonNull(executor), action); } public CompletableFuture handle( @@ -2431,7 +2406,7 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture handleAsync( BiFunction fn, Executor executor) { - return uniHandleStage(screenExecutor(executor), fn); + return uniHandleStage(Objects.requireNonNull(executor), fn); } /** @@ -2461,7 +2436,7 @@ public class CompletableFuture implements Future, CompletionStage { */ public CompletableFuture exceptionallyAsync( Function fn, Executor executor) { - return uniExceptionallyStage(screenExecutor(executor), fn); + return uniExceptionallyStage(Objects.requireNonNull(executor), fn); } /** @@ -2486,7 +2461,7 @@ public class CompletableFuture implements Future, CompletionStage { public CompletableFuture exceptionallyComposeAsync( Function> fn, Executor executor) { - return uniComposeExceptionallyStage(screenExecutor(executor), fn); + return uniComposeExceptionallyStage(Objects.requireNonNull(executor), fn); } /* ------------- Arbitrary-arity constructions -------------- */ @@ -2652,8 +2627,7 @@ public class CompletableFuture implements Future, CompletionStage { * @throws NullPointerException if the exception is null */ public void obtrudeException(Throwable ex) { - if (ex == null) throw new NullPointerException(); - result = new AltResult(ex); + result = new AltResult(Objects.requireNonNull(ex)); postComplete(); } @@ -2784,9 +2758,8 @@ public class CompletableFuture implements Future, CompletionStage { */ public CompletableFuture completeAsync(Supplier supplier, Executor executor) { - if (supplier == null || executor == null) - throw new NullPointerException(); - executor.execute(new AsyncSupply(this, supplier)); + executor.execute(new AsyncSupply( + this, Objects.requireNonNull(supplier))); return this; } @@ -2817,11 +2790,8 @@ public class CompletableFuture implements Future, CompletionStage { * @since 9 */ public CompletableFuture orTimeout(long timeout, TimeUnit unit) { - if (unit == null) - throw new NullPointerException(); - if (result == null) - whenComplete(new Canceller(Delayer.delay(new Timeout(this), - timeout, unit))); + arrangeTimeout(unit.toNanos(timeout), // Implicit null-check of unit + new Timeout(this, null, true)); return this; } @@ -2839,15 +2809,53 @@ public class CompletableFuture implements Future, CompletionStage { */ public CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit) { - if (unit == null) - throw new NullPointerException(); - if (result == null) - whenComplete(new Canceller(Delayer.delay( - new DelayedCompleter(this, value), - timeout, unit))); + arrangeTimeout(unit.toNanos(timeout), + new Timeout(this, value, false)); return this; } + /** Action to complete (possibly exceptionally) on timeout */ + static final class Timeout implements Runnable { + final CompletableFuture f; + final U value; + final boolean exceptional; + Timeout(CompletableFuture f, U value, boolean exceptional) { + this.f = f; this.value = value; this.exceptional = exceptional; + } + public void run() { + if (f != null && !f.isDone()) { + if (exceptional) + f.completeExceptionally(new TimeoutException()); + else + f.complete(value); + } + } + } + + /** Action to cancel unneeded timeouts */ + static final class Canceller implements BiConsumer { + final Future f; + Canceller(Future f) { this.f = f; } + public void accept(Object ignore, Throwable ex) { + if (f != null) // currently never null + f.cancel(false); + } + } + + /** + * Schedules a timeout action, as well as whenComplete handling to + * cancel the action if not needed. + */ + private void arrangeTimeout(long nanoDelay, Timeout onTimeout) { + ForkJoinPool e = ASYNC_POOL; + if (result == null) { + ScheduledForkJoinTask t = new ScheduledForkJoinTask( + nanoDelay, 0L, true, onTimeout, null, e); + whenComplete(new Canceller(t)); + e.scheduleDelayedTask(t); + } + } + /** * Returns a new Executor that submits a task to the given base * executor after the given delay (or no delay if non-positive). @@ -2863,9 +2871,8 @@ public class CompletableFuture implements Future, CompletionStage { */ public static Executor delayedExecutor(long delay, TimeUnit unit, Executor executor) { - if (unit == null || executor == null) - throw new NullPointerException(); - return new DelayedExecutor(delay, unit, executor); + return new DelayedExecutor(unit.toNanos(delay), // implicit null check + Objects.requireNonNull(executor)); } /** @@ -2881,9 +2888,8 @@ public class CompletableFuture implements Future, CompletionStage { * @since 9 */ public static Executor delayedExecutor(long delay, TimeUnit unit) { - if (unit == null) - throw new NullPointerException(); - return new DelayedExecutor(delay, unit, ASYNC_POOL); + return new DelayedExecutor(unit.toNanos(delay), // implicit null check + ASYNC_POOL); } /** @@ -2910,8 +2916,7 @@ public class CompletableFuture implements Future, CompletionStage { * @since 9 */ public static CompletableFuture failedFuture(Throwable ex) { - if (ex == null) throw new NullPointerException(); - return new CompletableFuture(new AltResult(ex)); + return new CompletableFuture(new AltResult(Objects.requireNonNull(ex))); } /** @@ -2925,48 +2930,23 @@ public class CompletableFuture implements Future, CompletionStage { * @since 9 */ public static CompletionStage failedStage(Throwable ex) { - if (ex == null) throw new NullPointerException(); - return new MinimalStage(new AltResult(ex)); - } - - /** - * Singleton delay scheduler, used only for starting and - * cancelling tasks. - */ - static final class Delayer { - static ScheduledFuture delay(Runnable command, long delay, - TimeUnit unit) { - return delayer.schedule(command, delay, unit); - } - - static final class DaemonThreadFactory implements ThreadFactory { - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setDaemon(true); - t.setName("CompletableFutureDelayScheduler"); - return t; - } - } - - static final ScheduledThreadPoolExecutor delayer; - static { - (delayer = new ScheduledThreadPoolExecutor( - 1, new DaemonThreadFactory())). - setRemoveOnCancelPolicy(true); - } + return new MinimalStage(new AltResult(Objects.requireNonNull(ex))); } // Little class-ified lambdas to better support monitoring static final class DelayedExecutor implements Executor { - final long delay; - final TimeUnit unit; + final long nanoDelay; final Executor executor; - DelayedExecutor(long delay, TimeUnit unit, Executor executor) { - this.delay = delay; this.unit = unit; this.executor = executor; + DelayedExecutor(long nanoDelay, Executor executor) { + this.nanoDelay = nanoDelay; this.executor = executor; } public void execute(Runnable r) { - Delayer.delay(new TaskSubmitter(executor, r), delay, unit); + ForkJoinPool e = ASYNC_POOL; // Use immediate mode to relay task + e.scheduleDelayedTask( + new ScheduledForkJoinTask( + nanoDelay, 0L, true, + new TaskSubmitter(executor, r), null, e)); } } @@ -2981,37 +2961,6 @@ public class CompletableFuture implements Future, CompletionStage { public void run() { executor.execute(action); } } - /** Action to completeExceptionally on timeout */ - static final class Timeout implements Runnable { - final CompletableFuture f; - Timeout(CompletableFuture f) { this.f = f; } - public void run() { - if (f != null && !f.isDone()) - f.completeExceptionally(new TimeoutException()); - } - } - - /** Action to complete on timeout */ - static final class DelayedCompleter implements Runnable { - final CompletableFuture f; - final U u; - DelayedCompleter(CompletableFuture f, U u) { this.f = f; this.u = u; } - public void run() { - if (f != null) - f.complete(u); - } - } - - /** Action to cancel unneeded timeouts */ - static final class Canceller implements BiConsumer { - final Future f; - Canceller(Future f) { this.f = f; } - public void accept(Object ignore, Throwable ex) { - if (f != null && !f.isDone()) - f.cancel(false); - } - } - /** * A subclass that just throws UOE for most non-CompletionStage methods. */ diff --git a/src/java.base/share/classes/java/util/concurrent/DelayScheduler.java b/src/java.base/share/classes/java/util/concurrent/DelayScheduler.java new file mode 100644 index 00000000000..358d3b69f1e --- /dev/null +++ b/src/java.base/share/classes/java/util/concurrent/DelayScheduler.java @@ -0,0 +1,568 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package java.util.concurrent; + +import java.util.Arrays; +import jdk.internal.misc.Unsafe; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * An add-on for ForkJoinPools that provides scheduling for + * delayed and periodic tasks + */ +final class DelayScheduler extends Thread { + + /* + * A DelayScheduler maintains a 4-ary heap (see + * https://en.wikipedia.org/wiki/D-ary_heap) based on trigger + * times (field ScheduledForkJoinTask.when) along with a pending + * queue of tasks submitted by other threads. When enabled (their + * delays elapse), tasks are relayed to the pool, or run directly + * if task.isImmediate. Immediate mode is designed for internal + * jdk usages in which the (non-blocking) action is to cancel, + * unblock or differently relay another async task (in some cases, + * this relies on user code to trigger async tasks). If + * processing encounters resource failures (possible when growing + * heap or ForkJoinPool WorkQueue arrays), tasks are cancelled. + * + * To reduce memory contention, the heap is maintained solely via + * local variables in method loop() (forcing noticeable code + * sprawl), recording only the current heap size when blocked to + * allow method canShutDown to conservatively check emptiness, and + * to report approximate current size for monitoring. + * + * The pending queue uses a design similar to ForkJoinTask.Aux + * queues: Incoming requests prepend (Treiber-stack-style) to the + * pending list. The scheduler thread takes and nulls out the + * entire list per step to process them as a batch. The pending + * queue may encounter contention and retries among requesters, + * but much less so versus the scheduler. It is possible to use + * multiple pending queues to reduce this form of contention but + * it doesn't seem worthwhile even under heavy loads. + * + * The implementation relies on the scheduler being a non-virtual + * final Thread subclass. Field "active" records whether the + * scheduler may have any pending tasks (and/or shutdown actions) + * to process, otherwise parking either indefinitely or until the + * next task deadline. Incoming pending tasks ensure active + * status, unparking if necessary. The scheduler thread sets + * status to inactive when there is apparently no work, and then + * rechecks before actually parking. The active field takes on a + * negative value on termination, as a sentinel used in pool + * termination checks as well as to suppress reactivation after + * terminating. + * + * We avoid the need for auxilliary data structures by embedding + * pending queue links, heap indices, and pool references inside + * ScheduledForkJoinTasks. (We use the same structure for both + * Runnable and Callable versions, since including an extra field + * in either case doesn't hurt -- it seems mildly preferable for + * these objects to be larger than other kinds of tasks to reduce + * false sharing during possibly frequent bookkeeping updates.) To + * reduce GC pressure and memory retention, these are nulled out + * as soon as possible. + * + * The implementation is designed to accommodate usages in which + * many or even most tasks are cancelled before executing (which + * is typical with IO-based timeouts). The use of a 4-ary heap (in + * which each element has up to 4 children) improves locality, and + * reduces the need for array movement and memory writes compared + * to a standard binary heap, at the expense of more expensive + * replace() operations (with about half the writes but twice the + * reads). Especially in the presence of cancellations, this is + * often faster because the replace method removes cancelled tasks + * seen while performing sift-down operations, in which case these + * elements are not further recorded or accessed, even before + * processing the removal request generated by method + * ScheduledForkJoinTask.cancel() (which is then a no-op or not + * generated at all). + * + * To ensure that comparisons do not encounter integer wraparound + * errors, times are offset with the most negative possible value + * (nanoTimeOffset) determined during static initialization. + * Negative delays are screened out before use. + * + * Upon noticing pool shutdown, delayed and/or periodic tasks are + * purged according to pool configuration and policy; the + * scheduler then tries to terminate the pool if the heap is + * empty. The asynchronicity of these steps with respect to pool + * runState weakens guarantees about exactly when purged tasks + * report isCancelled to callers (they do not run, but there may + * be a lag setting their status). + */ + + ForkJoinPool pool; // read once and detached upon starting + volatile ScheduledForkJoinTask pending; // submitted adds and removes + volatile int active; // 0: inactive, -1: stopped, +1: running + int restingSize; // written only before parking + volatile int cancelDelayedTasksOnShutdown; // policy control + int pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; + int pad8, pad9, padA, padB, padC, padD, padE; // reduce false sharing + + private static final int INITIAL_HEAP_CAPACITY = 1 << 6; + private static final int POOL_STOPPING = 1; // must match ForkJoinPool + static final long nanoTimeOffset = // Most negative possible time base + Math.min(System.nanoTime(), 0L) + Long.MIN_VALUE; + + private static final Unsafe U; // for atomic operations + private static final long ACTIVE; + private static final long PENDING; + static { + U = Unsafe.getUnsafe(); + Class klass = DelayScheduler.class; + ACTIVE = U.objectFieldOffset(klass, "active"); + PENDING = U.objectFieldOffset(klass, "pending"); + } + + DelayScheduler(ForkJoinPool p, String name) { + super(name); + setDaemon(true); + pool = p; + } + + /** + * Returns System.nanoTime() with nanoTimeOffset + */ + static final long now() { + return nanoTimeOffset + System.nanoTime(); + } + + /** + * Ensures the scheduler is not parked unless stopped. + * Returns negative if already stopped + */ + final int signal() { + int state; + if ((state = active) == 0 && U.getAndBitwiseOrInt(this, ACTIVE, 1) == 0) + U.unpark(this); + return state; + } + + /** + * Inserts the task (if nonnull) to pending queue, to add, + * remove, or ignore depending on task status when processed. + */ + final void pend(ScheduledForkJoinTask task) { + ScheduledForkJoinTask f = pending; + if (task != null) { + do {} while ( + f != (f = (ScheduledForkJoinTask) + U.compareAndExchangeReference( + this, PENDING, task.nextPending = f, task))); + signal(); + } + } + + /** + * Returns true if (momentarily) inactive and heap is empty + */ + final boolean canShutDown() { + return (active <= 0 && restingSize <= 0); + } + + /** + * Returns the number of elements in heap when last idle + */ + final int lastStableSize() { + return (active < 0) ? 0 : restingSize; + } + + /** + * Turns on cancelDelayedTasksOnShutdown policy + */ + final void cancelDelayedTasksOnShutdown() { + cancelDelayedTasksOnShutdown = 1; + signal(); + } + + /** + * Sets up and runs scheduling loop + */ + public final void run() { + ForkJoinPool p = pool; + pool = null; // detach + if (p == null) // failed initialization + active = -1; + else { + try { + loop(p); + } finally { + restingSize = 0; + active = -1; + p.tryStopIfShutdown(this); + } + } + } + + /** + * After initialization, repeatedly: + * 1. If apparently no work, + * if active, set tentatively inactive, + * else park until next trigger time, or indefinitely if none + * 2. Process pending tasks in batches, to add or remove from heap + * 3. Check for shutdown, either exiting or preparing for shutdown when empty + * 4. Trigger all enabled tasks by submitting them to pool or run if immediate + */ + private void loop(ForkJoinPool p) { + if (p != null) { // currently always true + ScheduledForkJoinTask[] h = // heap array + new ScheduledForkJoinTask[INITIAL_HEAP_CAPACITY]; + int cap = h.length, n = 0, prevRunStatus = 0; // n is heap size + long parkTime = 0L; // zero for untimed park + for (;;) { // loop until stopped + ScheduledForkJoinTask q, t; int runStatus; + if ((q = pending) == null) { + restingSize = n; + if (active != 0) // deactivate and recheck + U.compareAndSetInt(this, ACTIVE, 1, 0); + else { + Thread.interrupted(); // clear before park + U.park(false, parkTime); + } + q = pending; + } + + while (q != null && // process pending tasks + (t = (ScheduledForkJoinTask) + U.getAndSetReference(this, PENDING, null)) != null) { + ScheduledForkJoinTask next; + do { + int i; + if ((next = t.nextPending) != null) + t.nextPending = null; + if ((i = t.heapIndex) >= 0) { + t.heapIndex = -1; // remove cancelled task + if (i < cap && h[i] == t) + n = replace(h, i, n); + } + else if (n >= cap || n < 0) + t.trySetCancelled(); // couldn't resize + else { + long d = t.when; // add and sift up + if (t.status >= 0) { + ScheduledForkJoinTask parent; + int k = n++, pk, newCap; + while (k > 0 && + (parent = h[pk = (k - 1) >>> 2]) != null && + (parent.when > d)) { + parent.heapIndex = k; + h[k] = parent; + k = pk; + } + t.heapIndex = k; + h[k] = t; + if (n >= cap && (newCap = cap << 1) > cap) { + ScheduledForkJoinTask[] a = null; + try { // try to resize + a = Arrays.copyOf(h, newCap); + } catch (Error | RuntimeException ex) { + } + if (a != null && a.length == newCap) { + cap = newCap; + h = a; // else keep using old array + } + } + } + } + } while ((t = next) != null); + q = pending; + } + + if ((runStatus = p.shutdownStatus(this)) != 0) { + if ((n = tryStop(p, h, n, runStatus, prevRunStatus)) < 0) + break; + prevRunStatus = runStatus; + } + + parkTime = 0L; + if (n > 0 && h.length > 0) { // submit enabled tasks + long now = now(); + do { + ScheduledForkJoinTask f; int stat; + if ((f = h[0]) != null) { + long d = f.when - now; + if ((stat = f.status) >= 0 && d > 0L) { + parkTime = d; + break; + } + f.heapIndex = -1; + if (stat < 0) + ; // already cancelled + else if (f.isImmediate) + f.doExec(); + else { + try { + p.executeEnabledScheduledTask(f); + } + catch (Error | RuntimeException ex) { + f.trySetCancelled(); + } + } + } + } while ((n = replace(h, 0, n)) > 0); + } + } + } + } + + /** + * Replaces removed heap element at index k, along with other + * cancelled nodes found while doing so. + * @return current heap size + */ + private static int replace(ScheduledForkJoinTask[] h, int k, int n) { + if (h != null && h.length >= n) { // hoist checks + while (k >= 0 && n > k) { + int alsoReplace = -1; // non-negative if cancelled task seen + ScheduledForkJoinTask t = null, u; + long d = 0L; + while (--n > k) { // find uncancelled replacement + if ((u = h[n]) != null) { + h[n] = null; + d = u.when; + if (u.status >= 0) { + t = u; + break; + } + u.heapIndex = -1; + } + } + if (t != null) { // sift down + for (int cs; (cs = (k << 2) + 1) < n; ) { + ScheduledForkJoinTask leastChild = null, c; + int leastIndex = 0; + long leastValue = Long.MAX_VALUE; + for (int ck = cs, j = 4;;) { // at most 4 children + if ((c = h[ck]) == null) + break; + long cd = c.when; + if (c.status < 0 && alsoReplace < 0) { + alsoReplace = ck; // at most once per pass + c.heapIndex = -1; + } + else if (leastChild == null || cd < leastValue) { + leastValue = cd; + leastIndex = ck; + leastChild = c; + } + if (--j == 0 || ++ck >= n) + break; + } + if (leastChild == null || d <= leastValue) + break; + leastChild.heapIndex = k; + h[k] = leastChild; + k = leastIndex; + } + t.heapIndex = k; + } + h[k] = t; + k = alsoReplace; + } + } + return n; + } + + /** + * Call only when pool run status is nonzero. Possibly cancels + * tasks and stops during pool shutdown and termination. If called + * when shutdown but not stopping, removes tasks according to + * policy if not already done so, and if not empty or pool not + * terminating, returns. Otherwise, cancels all tasks in heap and + * pending queue. + * @return negative if stop, else current heap size. + */ + private int tryStop(ForkJoinPool p, ScheduledForkJoinTask[] h, int n, + int runStatus, int prevRunStatus) { + if ((runStatus & POOL_STOPPING) == 0) { + if (n > 0) { + if (cancelDelayedTasksOnShutdown != 0) { + cancelAll(h, n); + n = 0; + } + else if (prevRunStatus == 0 && h != null && h.length >= n) { + ScheduledForkJoinTask t; int stat; // remove periodic tasks + for (int i = n - 1; i >= 0; --i) { + if ((t = h[i]) != null && + ((stat = t.status) < 0 || t.nextDelay != 0L)) { + t.heapIndex = -1; + if (stat >= 0) + t.trySetCancelled(); + n = replace(h, i, n); + } + } + } + } + if (n > 0 || p == null || !p.tryStopIfShutdown(this)) + return n; // check for quiescent shutdown + } + if (n > 0) + cancelAll(h, n); + for (ScheduledForkJoinTask a = (ScheduledForkJoinTask) + U.getAndSetReference(this, PENDING, null); + a != null; a = a.nextPending) + a.trySetCancelled(); // clear pending requests + return -1; + } + + private static void cancelAll(ScheduledForkJoinTask[] h, int n) { + if (h != null && h.length >= n) { + ScheduledForkJoinTask t; + for (int i = 0; i < n; ++i) { + if ((t = h[i]) != null) { + h[i] = null; + t.heapIndex = -1; + t.trySetCancelled(); + } + } + } + } + + /** + * Task class for DelayScheduler operations + */ + @SuppressWarnings("serial") // Not designed to be serializable + static final class ScheduledForkJoinTask + extends ForkJoinTask.InterruptibleTask + implements ScheduledFuture { + ForkJoinPool pool; // nulled out after use + Runnable runnable; // at most one of runnable or callable nonnull + Callable callable; + T result; + ScheduledForkJoinTask nextPending; // for DelayScheduler pending queue + long when; // nanoTime-based trigger time + final long nextDelay; // 0: once; <0: fixedDelay; >0: fixedRate + int heapIndex; // if non-negative, index on heap + final boolean isImmediate; // run by scheduler vs submitted when ready + + /** + * Creates a new ScheduledForkJoinTask + * @param delay initial delay, in nanoseconds + * @param nextDelay 0 for one-shot, negative for fixed delay, + * positive for fixed rate, in nanoseconds + * @param isImmediate if action is to be performed + * by scheduler versus submitting to a WorkQueue + * @param runnable action (null if implementing callable version) + * @param callable function (null if implementing runnable versions) + * @param pool the pool for resubmissions and cancellations + * (disabled if null) + */ + public ScheduledForkJoinTask(long delay, long nextDelay, + boolean isImmediate, Runnable runnable, + Callable callable, ForkJoinPool pool) { + this.when = DelayScheduler.now() + Math.max(delay, 0L); + this.heapIndex = -1; + this.nextDelay = nextDelay; + this.isImmediate = isImmediate; + this.runnable = runnable; + this.callable = callable; + this.pool = pool; + } + + public void schedule() { // relay to pool, to allow independent use + ForkJoinPool p; + if ((p = pool) != null) // else already run + p.scheduleDelayedTask(this); + } + + // InterruptibleTask methods + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + final Object adaptee() { return (runnable != null) ? runnable : callable; } + + final T compute() throws Exception { + Callable c; Runnable r; + T res = null; + if ((r = runnable) != null) + r.run(); + else if ((c = callable) != null) + res = c.call(); + return res; + } + + final boolean postExec() { // possibly resubmit + long d; ForkJoinPool p; DelayScheduler ds; + if ((d = nextDelay) != 0L && // is periodic + status >= 0 && // not abnormally completed + (p = pool) != null && (ds = p.delayScheduler) != null) { + if (p.shutdownStatus(ds) == 0) { + heapIndex = -1; + if (d < 0L) + when = DelayScheduler.now() - d; + else + when += d; + ds.pend(this); + return false; + } + trySetCancelled(); // pool is shutdown + } + pool = null; // reduce memory retention + runnable = null; + callable = null; + return true; + } + + public final boolean cancel(boolean mayInterruptIfRunning) { + int s; ForkJoinPool p; DelayScheduler ds; + if ((s = trySetCancelled()) < 0) + return ((s & (ABNORMAL | THROWN)) == ABNORMAL); + if ((p = pool) != null && + !interruptIfRunning(mayInterruptIfRunning)) { + pool = null; + runnable = null; + callable = null; + if (heapIndex >= 0 && nextPending == null && + (ds = p.delayScheduler) != null) + ds.pend(this); // for heap cleanup + } + return true; + } + + + // ScheduledFuture methods + public final long getDelay(TimeUnit unit) { + return unit.convert(when - DelayScheduler.now(), NANOSECONDS); + } + public int compareTo(Delayed other) { // never used internally + long diff = (other instanceof ScheduledForkJoinTask t) ? + when - t.when : // avoid nanoTime calls and conversions + getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); + return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; + } + } + +} + diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index 61e9f0e3ffa..ec46f61291d 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -42,6 +42,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.LockSupport; @@ -50,6 +51,7 @@ import jdk.internal.access.JavaUtilConcurrentFJPAccess; import jdk.internal.access.SharedSecrets; import jdk.internal.misc.Unsafe; import jdk.internal.vm.SharedThreadContainer; +import static java.util.concurrent.DelayScheduler.ScheduledForkJoinTask; /** * An {@link ExecutorService} for running {@link ForkJoinTask}s. @@ -133,11 +135,34 @@ import jdk.internal.vm.SharedThreadContainer; * * * + *

    Additionally, this class supports {@link + * ScheduledExecutorService} methods to delay or periodically execute + * tasks, as well as method {@link #submitWithTimeout} to cancel tasks + * that take too long. The scheduled functions or actions may create + * and invoke other {@linkplain ForkJoinTask ForkJoinTasks}. Delayed + * actions become enabled and behave as ordinary submitted + * tasks when their delays elapse. Scheduling methods return + * {@linkplain ForkJoinTask ForkJoinTasks} that implement the {@link + * ScheduledFuture} interface. Resource exhaustion encountered after + * initial submission results in task cancellation. When time-based + * methods are used, shutdown policies match the default policies of + * class {@link ScheduledThreadPoolExecutor}: upon {@link #shutdown}, + * existing periodic tasks will not re-execute, and the pool + * terminates when quiescent and existing delayed tasks + * complete. Method {@link #cancelDelayedTasksOnShutdown} may be used + * to disable all delayed tasks upon shutdown, and method {@link + * #shutdownNow} may be used to instead unconditionally initiate pool + * termination. Monitoring methods such as {@link #getQueuedTaskCount} + * do not include scheduled tasks that are not yet enabled to execute, + * which are reported separately by method {@link + * #getDelayedTaskCount}. + * *

    The parameters used to construct the common pool may be controlled by * setting the following {@linkplain System#getProperty system properties}: *

      *
    • {@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism} - * - the parallelism level, a non-negative integer + * - the parallelism level, a non-negative integer. Usage is discouraged. + * Use {@link #setParallelism} instead. *
    • {@systemProperty java.util.concurrent.ForkJoinPool.common.threadFactory} * - the class name of a {@link ForkJoinWorkerThreadFactory}. * The {@linkplain ClassLoader#getSystemClassLoader() system class loader} @@ -155,10 +180,11 @@ import jdk.internal.vm.SharedThreadContainer; * {@linkplain Thread#getContextClassLoader() thread context class loader}. * * Upon any error in establishing these settings, default parameters - * are used. It is possible to disable or limit the use of threads in - * the common pool by setting the parallelism property to zero, and/or - * using a factory that may return {@code null}. However doing so may - * cause unjoined tasks to never be executed. + * are used. It is possible to disable use of threads by using a + * factory that may return {@code null}, in which case some tasks may + * never execute. While possible, it is strongly discouraged to set + * the parallelism property to zero, which may be internally + * overridden in the presence of intrinsically async tasks. * * @implNote This implementation restricts the maximum number of * running threads to 32767. Attempts to create pools with greater @@ -171,7 +197,8 @@ import jdk.internal.vm.SharedThreadContainer; * @since 1.7 * @author Doug Lea */ -public class ForkJoinPool extends AbstractExecutorService { +public class ForkJoinPool extends AbstractExecutorService + implements ScheduledExecutorService { /* * Implementation Overview @@ -215,6 +242,7 @@ public class ForkJoinPool extends AbstractExecutorService { * 3. Completion-based tasks (mainly CountedCompleters) * 4. CommonPool and parallelStream support * 5. InterruptibleTasks for externally submitted tasks + * 6. Support ScheduledExecutorService methods * * Most changes involve adaptions of base algorithms using * combinations of static and dynamic bitwise mode settings (both @@ -359,18 +387,18 @@ public class ForkJoinPool extends AbstractExecutorService { * WorkQueues are also used in a similar way for tasks submitted * to the pool. We cannot mix these tasks in the same queues used * by workers. Instead, we randomly associate submission queues - * with submitting threads, using a form of hashing. The - * ThreadLocalRandom probe value serves as a hash code for - * choosing existing queues, and may be randomly repositioned upon - * contention with other submitters. In essence, submitters act - * like workers except that they are restricted to executing local - * tasks that they submitted (or when known, subtasks thereof). - * Insertion of tasks in shared mode requires a lock. We use only - * a simple spinlock (as one role of field "phase") because - * submitters encountering a busy queue move to a different - * position to use or create other queues. They (spin) block when - * registering new queues, or indirectly elsewhere, by revisiting - * later. + * with submitting threads (or carriers when using VirtualThreads) + * using a form of hashing. The ThreadLocalRandom probe value + * serves as a hash code for choosing existing queues, and may be + * randomly repositioned upon contention with other submitters. + * In essence, submitters act like workers except that they are + * restricted to executing local tasks that they submitted (or + * when known, subtasks thereof). Insertion of tasks in shared + * mode requires a lock. We use only a simple spinlock (as one + * role of field "phase") because submitters encountering a busy + * queue move to a different position to use or create other + * queues. They (spin) block when registering new queues, or + * indirectly elsewhere, by revisiting later. * * Management * ========== @@ -826,7 +854,13 @@ public class ForkJoinPool extends AbstractExecutorService { * fashion or use CountedCompleters (as is true for jdk * parallelStreams). Support infiltrates several methods, * including those that retry helping steps until we are sure that - * none apply if there are no workers. + * none apply if there are no workers. To deal with conflicting + * requirements, uses of the commonPool that require async because + * caller-runs need not apply, ensure threads are enabled (by + * setting parallelism) via method asyncCommonPool before + * proceeding. (In principle, overriding zero parallelism needs to + * ensure at least one worker, but due to other backward + * compatibility contraints, ensures two.) * * As a more appropriate default in managed environments, unless * overridden by system properties, we use workers of subclass @@ -851,7 +885,8 @@ public class ForkJoinPool extends AbstractExecutorService { * To comply with ExecutorService specs, we use subclasses of * abstract class InterruptibleTask for tasks that require * stronger interruption and cancellation guarantees. External - * submitters never run these tasks, even if in the common pool. + * submitters never run these tasks, even if in the common pool + * (as indicated by ForkJoinTask.noUserHelp status bit). * InterruptibleTasks include a "runner" field (implemented * similarly to FutureTask) to support cancel(true). Upon pool * shutdown, runners are interrupted so they can cancel. Since @@ -880,6 +915,27 @@ public class ForkJoinPool extends AbstractExecutorService { * writing, virtual thread bodies are by default run as some form * of InterruptibleTask. * + * DelayScheduler + * ================ + * + * This class supports ScheduledExecutorService methods by + * creating and starting a DelayScheduler on first use of these + * methods (via startDelayScheduler). The scheduler operates + * independently in its own thread, relaying tasks to the pool to + * execute when their delays elapse (see method + * executeEnabledScheduledTask). The only other interactions with + * the delayScheduler are to control shutdown and maintain + * shutdown-related policies in methods quiescent() and + * tryTerminate(). In particular, processing must deal with cases + * in which tasks are submitted before shutdown, but not enabled + * until afterwards, in which case they must bypass some screening + * to be allowed to run. Conversely, the DelayScheduler checks + * runState status and when enabled, completes termination, using + * only methods shutdownStatus and tryStopIfShutdown. All of these + * methods are final and have signatures referencing + * DelaySchedulers, so cannot conflict with those of any existing + * FJP subclasses. + * * Memory placement * ================ * @@ -950,7 +1006,9 @@ public class ForkJoinPool extends AbstractExecutorService { * Nearly all explicit checks lead to bypass/return, not exception * throws, because they may legitimately arise during shutdown. A * few unusual loop constructions encourage (with varying - * effectiveness) JVMs about where (not) to place safepoints. + * effectiveness) JVMs about where (not) to place safepoints. All + * public methods screen arguments (mainly null checks) before + * creating or executing tasks. * * There is a lot of representation-level coupling among classes * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The @@ -1045,6 +1103,7 @@ public class ForkJoinPool extends AbstractExecutorService { static final int DROPPED = 1 << 16; // removed from ctl counts static final int UNCOMPENSATE = 1 << 16; // tryCompensate return static final int IDLE = 1 << 16; // phase seqlock/version count + static final int MIN_QUEUES_SIZE = 1 << 4; // ensure external slots /* * Bits and masks for ctl and bounds are packed with 4 16 bit subfields: @@ -1214,7 +1273,7 @@ public class ForkJoinPool extends AbstractExecutorService { /** * Pushes a task. Called only by owner or if already locked * - * @param task the task. Caller must ensure non-null. + * @param task the task; no-op if null * @param pool the pool to signal if was previously empty, else null * @param internal if caller owns this queue * @throws RejectedExecutionException if array could not be resized @@ -1222,7 +1281,9 @@ public class ForkJoinPool extends AbstractExecutorService { final void push(ForkJoinTask task, ForkJoinPool pool, boolean internal) { int s = top, b = base, m, cap, room; ForkJoinTask[] a; - if ((a = array) != null && (cap = a.length) > 0) { // else disabled + if ((a = array) != null && (cap = a.length) > 0 && // else disabled + task != null) { + int pk = task.noUserHelp() + 1; // prev slot offset if ((room = (m = cap - 1) - (s - b)) >= 0) { top = s + 1; long pos = slotOffset(m & s); @@ -1237,10 +1298,7 @@ public class ForkJoinPool extends AbstractExecutorService { unlockPhase(); if (room < 0) throw new RejectedExecutionException("Queue capacity exceeded"); - if ((room == 0 || // pad for InterruptibleTasks - a[m & (s - ((internal || task == null || - task.getClass().getSuperclass() != - interruptibleTaskClass) ? 1 : 2))] == null) && + if ((room == 0 || a[m & (s - pk)] == null) && pool != null) pool.signalWork(); // may have appeared empty } @@ -1579,11 +1637,6 @@ public class ForkJoinPool extends AbstractExecutorService { */ static volatile RuntimePermission modifyThreadPermission; - /** - * Cached for faster type tests. - */ - static final Class interruptibleTaskClass; - /** * For VirtualThread intrinsics */ @@ -1596,12 +1649,15 @@ public class ForkJoinPool extends AbstractExecutorService { final UncaughtExceptionHandler ueh; // per-worker UEH final SharedThreadContainer container; final String workerNamePrefix; // null for common pool + final String poolName; + volatile DelayScheduler delayScheduler; // lazily constructed WorkQueue[] queues; // main registry volatile long runState; // versioned, lockable final long keepAlive; // milliseconds before dropping if idle final long config; // static configuration bits volatile long stealCount; // collects worker nsteals volatile long threadIds; // for worker thread names + @jdk.internal.vm.annotation.Contended("fjpctl") // segregate volatile long ctl; // main pool control @jdk.internal.vm.annotation.Contended("fjpctl") // colocate @@ -1885,6 +1941,7 @@ public class ForkJoinPool extends AbstractExecutorService { long phaseSum = 0L; boolean swept = false; for (long e, prevRunState = 0L; ; prevRunState = e) { + DelayScheduler ds; long c = ctl; if (((e = runState) & STOP) != 0L) return 1; // terminating @@ -1907,6 +1964,8 @@ public class ForkJoinPool extends AbstractExecutorService { } else if ((e & SHUTDOWN) == 0) return 0; + else if ((ds = delayScheduler) != null && !ds.canShutDown()) + return 0; else if (compareAndSetCtl(c, c) && casRunState(e, e | STOP)) return 1; // enable termination else @@ -1957,15 +2016,13 @@ public class ForkJoinPool extends AbstractExecutorService { } else { boolean propagate; - int nb = q.base = b + 1; + int nb = q.base = b + 1, prevSrc = src; w.nsteals = ++nsteals; - w.source = j; // volatile + w.source = src = j; // volatile rescan = true; + int nh = t.noUserHelp(); if (propagate = - ((src != (src = j) || - t.getClass().getSuperclass() == - interruptibleTaskClass) && - a[nb & m] != null)) + (prevSrc != src || nh != 0) && a[nb & m] != null) signalWork(); w.topLevelExec(t, fifo); if ((b = q.base) != nb && !propagate) @@ -2004,8 +2061,8 @@ public class ForkJoinPool extends AbstractExecutorService { ((e & SHUTDOWN) != 0L && ac == 0 && quiescent() > 0) || (qs = queues) == null || (n = qs.length) <= 0) return IDLE; // terminating - int prechecks = Math.min(ac, 2); // reactivation threshold - for (int k = Math.max(n << 2, SPIN_WAITS << 1);;) { + int k = Math.max(n << 2, SPIN_WAITS << 1); + for (int prechecks = k / n;;) { // reactivation threshold WorkQueue q; int cap; ForkJoinTask[] a; long c; if (w.phase == activePhase) return activePhase; @@ -2014,7 +2071,7 @@ public class ForkJoinPool extends AbstractExecutorService { if ((q = qs[k & (n - 1)]) == null) Thread.onSpinWait(); else if ((a = q.array) != null && (cap = a.length) > 0 && - a[q.base & (cap - 1)] != null && --prechecks < 0 && + a[q.base & (cap - 1)] != null && --prechecks <= 0 && (int)(c = ctl) == activePhase && compareAndSetCtl(c, (sp & LMASK) | ((c + RC_UNIT) & UMASK))) return w.phase = activePhase; // reactivate @@ -2513,37 +2570,45 @@ public class ForkJoinPool extends AbstractExecutorService { * Finds and locks a WorkQueue for an external submitter, or * throws RejectedExecutionException if shutdown or terminating. * @param r current ThreadLocalRandom.getProbe() value - * @param isSubmit false if this is for a common pool fork + * @param rejectOnShutdown true if RejectedExecutionException + * should be thrown when shutdown (else only if terminating) */ - private WorkQueue submissionQueue(int r) { - if (r == 0) { + private WorkQueue submissionQueue(int r, boolean rejectOnShutdown) { + int reuse; // nonzero if prefer create + if ((reuse = r) == 0) { ThreadLocalRandom.localInit(); // initialize caller's probe r = ThreadLocalRandom.getProbe(); } - for (;;) { - int n, i, id; WorkQueue[] qs; WorkQueue q, w = null; + for (int probes = 0; ; ++probes) { + int n, i, id; WorkQueue[] qs; WorkQueue q; if ((qs = queues) == null) break; if ((n = qs.length) <= 0) break; if ((q = qs[i = (id = r & EXTERNAL_ID_MASK) & (n - 1)]) == null) { - if (w == null) - w = new WorkQueue(null, id, 0, false); + WorkQueue w = new WorkQueue(null, id, 0, false); w.phase = id; - long isShutdown = lockRunState() & SHUTDOWN; - if (isShutdown == 0L && queues == qs && qs[i] == null) { - q = qs[i] = w; // else retry - w = null; - } + boolean reject = ((lockRunState() & SHUTDOWN) != 0 && + rejectOnShutdown); + if (!reject && queues == qs && qs[i] == null) + q = qs[i] = w; // else lost race to install unlockRunState(); if (q != null) return q; - if (isShutdown != 0L) + if (reject) break; + reuse = 0; } - else if (!q.tryLockPhase()) // move index + if (reuse == 0 || !q.tryLockPhase()) { // move index + if (reuse == 0) { + if (probes >= n >> 1) + reuse = r; // stop prefering free slot + } + else if (q != null) + reuse = 0; // probe on collision r = ThreadLocalRandom.advanceProbe(r); - else if ((runState & SHUTDOWN) != 0L) { + } + else if (rejectOnShutdown && (runState & SHUTDOWN) != 0L) { q.unlockPhase(); // check while q lock held break; } @@ -2553,7 +2618,7 @@ public class ForkJoinPool extends AbstractExecutorService { throw new RejectedExecutionException(); } - private void poolSubmit(boolean signalIfEmpty, ForkJoinTask task) { + private ForkJoinTask poolSubmit(boolean signalIfEmpty, ForkJoinTask task) { Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal; if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) && (wt = (ForkJoinWorkerThread)t).pool == this) { @@ -2562,21 +2627,22 @@ public class ForkJoinPool extends AbstractExecutorService { } else { // find and lock queue internal = false; - q = submissionQueue(ThreadLocalRandom.getProbe()); + q = submissionQueue(ThreadLocalRandom.getProbe(), true); } q.push(task, signalIfEmpty ? this : null, internal); + return task; } /** * Returns queue for an external submission, bypassing call to * submissionQueue if already established and unlocked. */ - final WorkQueue externalSubmissionQueue() { + final WorkQueue externalSubmissionQueue(boolean rejectOnShutdown) { WorkQueue[] qs; WorkQueue q; int n; int r = ThreadLocalRandom.getProbe(); return (((qs = queues) != null && (n = qs.length) > 0 && (q = qs[r & EXTERNAL_ID_MASK & (n - 1)]) != null && r != 0 && - q.tryLockPhase()) ? q : submissionQueue(r)); + q.tryLockPhase()) ? q : submissionQueue(r, rejectOnShutdown)); } /** @@ -2700,14 +2766,20 @@ public class ForkJoinPool extends AbstractExecutorService { } } else if ((isShutdown = (e & SHUTDOWN)) != 0L || enable) { + long quiet; DelayScheduler ds; if (isShutdown == 0L) getAndBitwiseOrRunState(SHUTDOWN); - if (quiescent() > 0) + if ((quiet = quiescent()) > 0) now = true; + else if (quiet == 0 && (ds = delayScheduler) != null) + ds.signal(); } if (now) { + DelayScheduler ds; releaseWaiters(); + if ((ds = delayScheduler) != null) + ds.signal(); for (;;) { if (((e = runState) & CLEANED) == 0L) { boolean clean = cleanQueues(); @@ -2718,6 +2790,8 @@ public class ForkJoinPool extends AbstractExecutorService { break; if (ctl != 0L) // else loop if didn't finish cleaning break; + if ((ds = delayScheduler) != null && ds.signal() >= 0) + break; if ((e & CLEANED) != 0L) { e |= TERMINATED; if ((getAndBitwiseOrRunState(TERMINATED) & TERMINATED) == 0L) { @@ -2888,12 +2962,9 @@ public class ForkJoinPool extends AbstractExecutorService { * event-style asynchronous tasks. For default value, use {@code * false}. * - * @param corePoolSize the number of threads to keep in the pool - * (unless timed out after an elapsed keep-alive). Normally (and - * by default) this is the same value as the parallelism level, - * but may be set to a larger value to reduce dynamic overhead if - * tasks regularly block. Using a smaller value (for example - * {@code 0}) has the same effect as the default. + * @param corePoolSize ignored: used in previous releases of this + * class but no longer applicable. Using {@code 0} maintains + * compatibility across releases. * * @param maximumPoolSize the maximum number of threads allowed. * When the maximum is reached, attempts to replace blocked @@ -2957,7 +3028,8 @@ public class ForkJoinPool extends AbstractExecutorService { throw new IllegalArgumentException(); if (factory == null || unit == null) throw new NullPointerException(); - int size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1)); + int size = Math.max(MIN_QUEUES_SIZE, + 1 << (33 - Integer.numberOfLeadingZeros(p - 1))); this.parallelism = p; this.factory = factory; this.ueh = handler; @@ -2971,6 +3043,7 @@ public class ForkJoinPool extends AbstractExecutorService { this.queues = new WorkQueue[size]; String pid = Integer.toString(getAndAddPoolIds(1) + 1); String name = "ForkJoinPool-" + pid; + this.poolName = name; this.workerNamePrefix = name + "-worker-"; this.container = SharedThreadContainer.create(name); } @@ -2980,6 +3053,7 @@ public class ForkJoinPool extends AbstractExecutorService { * overridden by system properties */ private ForkJoinPool(byte forCommonPoolOnly) { + String name = "ForkJoinPool.commonPool"; ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory; UncaughtExceptionHandler handler = null; int maxSpares = DEFAULT_COMMON_MAX_SPARES; @@ -3013,7 +3087,9 @@ public class ForkJoinPool extends AbstractExecutorService { if (preset == 0) pc = Math.max(1, Runtime.getRuntime().availableProcessors() - 1); int p = Math.min(pc, MAX_CAP); - int size = (p == 0) ? 1 : 1 << (33 - Integer.numberOfLeadingZeros(p-1)); + int size = Math.max(MIN_QUEUES_SIZE, + (p == 0) ? 1 : + 1 << (33 - Integer.numberOfLeadingZeros(p-1))); this.parallelism = p; this.config = ((preset & LMASK) | (((long)maxSpares) << TC_SHIFT) | (1L << RC_SHIFT)); @@ -3022,8 +3098,9 @@ public class ForkJoinPool extends AbstractExecutorService { this.keepAlive = DEFAULT_KEEPALIVE; this.saturate = null; this.workerNamePrefix = null; + this.poolName = name; this.queues = new WorkQueue[size]; - this.container = SharedThreadContainer.create("ForkJoinPool.commonPool"); + this.container = SharedThreadContainer.create(name); } /** @@ -3044,6 +3121,16 @@ public class ForkJoinPool extends AbstractExecutorService { return common; } + /** + * Package-private access to commonPool overriding zero parallelism + */ + static ForkJoinPool asyncCommonPool() { + ForkJoinPool cp; int p; + if ((p = (cp = common).parallelism) == 0) + U.compareAndSetInt(cp, PARALLELISM, 0, 2); + return cp; + } + // Execution methods /** @@ -3064,8 +3151,7 @@ public class ForkJoinPool extends AbstractExecutorService { * scheduled for execution */ public T invoke(ForkJoinTask task) { - Objects.requireNonNull(task); - poolSubmit(true, task); + poolSubmit(true, Objects.requireNonNull(task)); try { return task.join(); } catch (RuntimeException | Error unchecked) { @@ -3084,8 +3170,7 @@ public class ForkJoinPool extends AbstractExecutorService { * scheduled for execution */ public void execute(ForkJoinTask task) { - Objects.requireNonNull(task); - poolSubmit(true, task); + poolSubmit(true, Objects.requireNonNull(task)); } // AbstractExecutorService methods @@ -3098,7 +3183,7 @@ public class ForkJoinPool extends AbstractExecutorService { @Override @SuppressWarnings("unchecked") public void execute(Runnable task) { - poolSubmit(true, (task instanceof ForkJoinTask) + poolSubmit(true, (Objects.requireNonNull(task) instanceof ForkJoinTask) ? (ForkJoinTask) task // avoid re-wrap : new ForkJoinTask.RunnableExecuteAction(task)); } @@ -3118,9 +3203,7 @@ public class ForkJoinPool extends AbstractExecutorService { * scheduled for execution */ public ForkJoinTask submit(ForkJoinTask task) { - Objects.requireNonNull(task); - poolSubmit(true, task); - return task; + return poolSubmit(true, Objects.requireNonNull(task)); } /** @@ -3130,12 +3213,12 @@ public class ForkJoinPool extends AbstractExecutorService { */ @Override public ForkJoinTask submit(Callable task) { - ForkJoinTask t = + Objects.requireNonNull(task); + return poolSubmit( + true, (Thread.currentThread() instanceof ForkJoinWorkerThread) ? new ForkJoinTask.AdaptedCallable(task) : - new ForkJoinTask.AdaptedInterruptibleCallable(task); - poolSubmit(true, t); - return t; + new ForkJoinTask.AdaptedInterruptibleCallable(task)); } /** @@ -3145,12 +3228,12 @@ public class ForkJoinPool extends AbstractExecutorService { */ @Override public ForkJoinTask submit(Runnable task, T result) { - ForkJoinTask t = + Objects.requireNonNull(task); + return poolSubmit( + true, (Thread.currentThread() instanceof ForkJoinWorkerThread) ? new ForkJoinTask.AdaptedRunnable(task, result) : - new ForkJoinTask.AdaptedInterruptibleRunnable(task, result); - poolSubmit(true, t); - return t; + new ForkJoinTask.AdaptedInterruptibleRunnable(task, result)); } /** @@ -3161,13 +3244,14 @@ public class ForkJoinPool extends AbstractExecutorService { @Override @SuppressWarnings("unchecked") public ForkJoinTask submit(Runnable task) { - ForkJoinTask f = (task instanceof ForkJoinTask) ? + Objects.requireNonNull(task); + return poolSubmit( + true, + (task instanceof ForkJoinTask) ? (ForkJoinTask) task : // avoid re-wrap ((Thread.currentThread() instanceof ForkJoinWorkerThread) ? new ForkJoinTask.AdaptedRunnable(task, null) : - new ForkJoinTask.AdaptedInterruptibleRunnable(task, null)); - poolSubmit(true, f); - return f; + new ForkJoinTask.AdaptedInterruptibleRunnable(task, null))); } /** @@ -3189,7 +3273,7 @@ public class ForkJoinPool extends AbstractExecutorService { */ public ForkJoinTask externalSubmit(ForkJoinTask task) { Objects.requireNonNull(task); - externalSubmissionQueue().push(task, this, false); + externalSubmissionQueue(true).push(task, this, false); return task; } @@ -3210,9 +3294,7 @@ public class ForkJoinPool extends AbstractExecutorService { * @since 19 */ public ForkJoinTask lazySubmit(ForkJoinTask task) { - Objects.requireNonNull(task); - poolSubmit(false, task); - return task; + return poolSubmit(false, Objects.requireNonNull(task)); } /** @@ -3350,6 +3432,322 @@ public class ForkJoinPool extends AbstractExecutorService { .invokeAny(tasks, this, true, unit.toNanos(timeout)); } + // Support for delayed tasks + + /** + * Returns STOP and SHUTDOWN status (zero if neither), masking or + * truncating out other bits. + */ + final int shutdownStatus(DelayScheduler ds) { + return (int)(runState & (SHUTDOWN | STOP)); + } + + /** + * Tries to stop and possibly terminate if already enabled, return success. + */ + final boolean tryStopIfShutdown(DelayScheduler ds) { + return (tryTerminate(false, false) & STOP) != 0L; + } + + /** + * Creates and starts DelayScheduler + */ + private DelayScheduler startDelayScheduler() { + DelayScheduler ds; + if ((ds = delayScheduler) == null) { + boolean start = false; + String name = poolName + "-delayScheduler"; + if (workerNamePrefix == null) + asyncCommonPool(); // override common parallelism zero + lockRunState(); + try { + if ((ds = delayScheduler) == null) { + ds = delayScheduler = new DelayScheduler(this, name); + start = true; + } + } finally { + unlockRunState(); + } + if (start) { // start outside of lock + // exceptions on start passed to (external) callers + SharedThreadContainer ctr; + if ((ctr = container) != null) + ctr.start(ds); + else + ds.start(); + } + } + return ds; + } + + /** + * Arranges execution of a ScheduledForkJoinTask whose delay has + * elapsed + */ + final void executeEnabledScheduledTask(ScheduledForkJoinTask task) { + externalSubmissionQueue(false).push(task, this, false); + } + + /** + * Arranges delayed execution of a ScheduledForkJoinTask via the + * DelayScheduler, creating and starting it if necessary. + * @return the task + */ + final ScheduledForkJoinTask scheduleDelayedTask(ScheduledForkJoinTask task) { + DelayScheduler ds; + if (((ds = delayScheduler) == null && + (ds = startDelayScheduler()) == null) || + (runState & SHUTDOWN) != 0L) + throw new RejectedExecutionException(); + ds.pend(task); + return task; + } + + /** + * Submits a one-shot task that becomes enabled after the given + * delay. At that point it will execute unless explicitly + * cancelled, or fail to execute (eventually reporting + * cancellation) when encountering resource exhaustion, or the + * pool is {@link #shutdownNow}, or is {@link #shutdown} when + * otherwise quiescent and {@link #cancelDelayedTasksOnShutdown} + * is in effect. + * + * @param command the task to execute + * @param delay the time from now to delay execution + * @param unit the time unit of the delay parameter + * @return a ForkJoinTask implementing the ScheduledFuture + * interface, whose {@code get()} method will return + * {@code null} upon normal completion. + * @throws RejectedExecutionException if the pool is shutdown or + * submission encounters resource exhaustion. + * @throws NullPointerException if command or unit is null + * @since 25 + */ + public ScheduledFuture schedule(Runnable command, + long delay, TimeUnit unit) { + return scheduleDelayedTask( + new ScheduledForkJoinTask( + unit.toNanos(delay), 0L, false, // implicit null check of unit + Objects.requireNonNull(command), null, this)); + } + + /** + * Submits a value-returning one-shot task that becomes enabled + * after the given delay. At that point it will execute unless + * explicitly cancelled, or fail to execute (eventually reporting + * cancellation) when encountering resource exhaustion, or the + * pool is {@link #shutdownNow}, or is {@link #shutdown} when + * otherwise quiescent and {@link #cancelDelayedTasksOnShutdown} + * is in effect. + * + * @param callable the function to execute + * @param delay the time from now to delay execution + * @param unit the time unit of the delay parameter + * @param the type of the callable's result + * @return a ForkJoinTask implementing the ScheduledFuture + * interface, whose {@code get()} method will return the + * value from the callable upon normal completion. + * @throws RejectedExecutionException if the pool is shutdown or + * submission encounters resource exhaustion. + * @throws NullPointerException if command or unit is null + * @since 25 + */ + public ScheduledFuture schedule(Callable callable, + long delay, TimeUnit unit) { + return scheduleDelayedTask( + new ScheduledForkJoinTask( + unit.toNanos(delay), 0L, false, null, // implicit null check of unit + Objects.requireNonNull(callable), this)); + } + + /** + * Submits a periodic action that becomes enabled first after the + * given initial delay, and subsequently with the given period; + * that is, executions will commence after + * {@code initialDelay}, then {@code initialDelay + period}, then + * {@code initialDelay + 2 * period}, and so on. + * + *

      The sequence of task executions continues indefinitely until + * one of the following exceptional completions occur: + *

        + *
      • The task is {@linkplain Future#cancel explicitly cancelled} + *
      • Method {@link #shutdownNow} is called + *
      • Method {@link #shutdown} is called and the pool is + * otherwise quiescent, in which case existing executions continue + * but subsequent executions do not. + *
      • An execution or the task encounters resource exhaustion. + *
      • An execution of the task throws an exception. In this case + * calling {@link Future#get() get} on the returned future will throw + * {@link ExecutionException}, holding the exception as its cause. + *
      + * Subsequent executions are suppressed. Subsequent calls to + * {@link Future#isDone isDone()} on the returned future will + * return {@code true}. + * + *

      If any execution of this task takes longer than its period, then + * subsequent executions may start late, but will not concurrently + * execute. + * @param command the task to execute + * @param initialDelay the time to delay first execution + * @param period the period between successive executions + * @param unit the time unit of the initialDelay and period parameters + * @return a ForkJoinTask implementing the ScheduledFuture + * interface. The future's {@link Future#get() get()} + * method will never return normally, and will throw an + * exception upon task cancellation or abnormal + * termination of a task execution. + * @throws RejectedExecutionException if the pool is shutdown or + * submission encounters resource exhaustion. + * @throws NullPointerException if command or unit is null + * @throws IllegalArgumentException if period less than or equal to zero + * @since 25 + */ + public ScheduledFuture scheduleAtFixedRate(Runnable command, + long initialDelay, + long period, TimeUnit unit) { + if (period <= 0L) + throw new IllegalArgumentException(); + return scheduleDelayedTask( + new ScheduledForkJoinTask( + unit.toNanos(initialDelay), // implicit null check of unit + unit.toNanos(period), false, + Objects.requireNonNull(command), null, this)); + } + + /** + * Submits a periodic action that becomes enabled first after the + * given initial delay, and subsequently with the given delay + * between the termination of one execution and the commencement of + * the next. + *

      The sequence of task executions continues indefinitely until + * one of the following exceptional completions occur: + *

        + *
      • The task is {@linkplain Future#cancel explicitly cancelled} + *
      • Method {@link #shutdownNow} is called + *
      • Method {@link #shutdown} is called and the pool is + * otherwise quiescent, in which case existing executions continue + * but subsequent executions do not. + *
      • An execution or the task encounters resource exhaustion. + *
      • An execution of the task throws an exception. In this case + * calling {@link Future#get() get} on the returned future will throw + * {@link ExecutionException}, holding the exception as its cause. + *
      + * Subsequent executions are suppressed. Subsequent calls to + * {@link Future#isDone isDone()} on the returned future will + * return {@code true}. + * @param command the task to execute + * @param initialDelay the time to delay first execution + * @param delay the delay between the termination of one + * execution and the commencement of the next + * @param unit the time unit of the initialDelay and delay parameters + * @return a ForkJoinTask implementing the ScheduledFuture + * interface. The future's {@link Future#get() get()} + * method will never return normally, and will throw an + * exception upon task cancellation or abnormal + * termination of a task execution. + * @throws RejectedExecutionException if the pool is shutdown or + * submission encounters resource exhaustion. + * @throws NullPointerException if command or unit is null + * @throws IllegalArgumentException if delay less than or equal to zero + * @since 25 + */ + public ScheduledFuture scheduleWithFixedDelay(Runnable command, + long initialDelay, + long delay, TimeUnit unit) { + if (delay <= 0L) + throw new IllegalArgumentException(); + return scheduleDelayedTask( + new ScheduledForkJoinTask( + unit.toNanos(initialDelay), // implicit null check of unit + -unit.toNanos(delay), false, // negative for fixed delay + Objects.requireNonNull(command), null, this)); + } + + /** + * Body of a task performed on timeout of another task + */ + static final class TimeoutAction implements Runnable { + // set after construction, nulled after use + ForkJoinTask.CallableWithTimeout task; + Consumer> action; + TimeoutAction(Consumer> action) { + this.action = action; + } + public void run() { + ForkJoinTask.CallableWithTimeout t = task; + Consumer> a = action; + task = null; + action = null; + if (t != null && t.status >= 0) { + if (a == null) + t.cancel(true); + else { + a.accept(t); + t.interruptIfRunning(true); + } + } + } + } + + /** + * Submits a task executing the given function, cancelling the + * task or performing a given timeoutAction if not completed + * within the given timeout period. If the optional {@code + * timeoutAction} is null, the task is cancelled (via {@code + * cancel(true)}. Otherwise, the action is applied and the task + * may be interrupted if running. Actions may include {@link + * ForkJoinTask#complete} to set a replacement value or {@link + * ForkJoinTask#completeExceptionally} to throw an appropriate + * exception. Note that these can succeed only if the task has + * not already completed when the timeoutAction executes. + * + * @param callable the function to execute + * @param the type of the callable's result + * @param timeout the time to wait before cancelling if not completed + * @param timeoutAction if nonnull, an action to perform on + * timeout, otherwise the default action is to cancel using + * {@code cancel(true)}. + * @param unit the time unit of the timeout parameter + * @return a Future that can be used to extract result or cancel + * @throws RejectedExecutionException if the task cannot be + * scheduled for execution + * @throws NullPointerException if callable or unit is null + * @since 25 + */ + public ForkJoinTask submitWithTimeout(Callable callable, + long timeout, TimeUnit unit, + Consumer> timeoutAction) { + ForkJoinTask.CallableWithTimeout task; TimeoutAction onTimeout; + Objects.requireNonNull(callable); + ScheduledForkJoinTask timeoutTask = + new ScheduledForkJoinTask( + unit.toNanos(timeout), 0L, true, + onTimeout = new TimeoutAction(timeoutAction), null, this); + onTimeout.task = task = + new ForkJoinTask.CallableWithTimeout(callable, timeoutTask); + scheduleDelayedTask(timeoutTask); + return poolSubmit(true, task); + } + + /** + * Arranges that scheduled tasks that are not executing and have + * not already been enabled for execution will not be executed and + * will be cancelled upon {@link #shutdown} (unless this pool is + * the {@link #commonPool()} which never shuts down). This method + * may be invoked either before {@link #shutdown} to take effect + * upon the next call, or afterwards to cancel such tasks, which + * may then allow termination. Note that subsequent executions of + * periodic tasks are always disabled upon shutdown, so this + * method applies meaningfully only to non-periodic tasks. + * @since 25 + */ + public void cancelDelayedTasksOnShutdown() { + DelayScheduler ds; + if ((ds = delayScheduler) != null || + (ds = startDelayScheduler()) != null) + ds.cancelDelayedTasksOnShutdown(); + } + /** * Returns the factory used for constructing new workers. * @@ -3485,14 +3883,16 @@ public class ForkJoinPool extends AbstractExecutorService { * to the pool that have not begun executing). This value is only * an approximation, obtained by iterating across all threads in * the pool. This method may be useful for tuning task - * granularities. + * granularities.The returned count does not include scheduled + * tasks that are not yet ready to execute, which are reported + * separately by method {@link getDelayedTaskCount}. * * @return the number of queued tasks * @see ForkJoinWorkerThread#getQueuedTaskCount() */ public long getQueuedTaskCount() { WorkQueue[] qs; WorkQueue q; - int count = 0; + long count = 0; if ((runState & TERMINATED) == 0L && (qs = queues) != null) { for (int i = 1; i < qs.length; i += 2) { if ((q = qs[i]) != null) @@ -3521,6 +3921,20 @@ public class ForkJoinPool extends AbstractExecutorService { return count; } + /** + * Returns an estimate of the number of delayed (including + * periodic) tasks scheduled in this pool that are not yet ready + * to submit for execution. The returned value is inaccurate while + * delayed tasks are being processed. + * + * @return an estimate of the number of delayed tasks + * @since 25 + */ + public long getDelayedTaskCount() { + DelayScheduler ds; + return ((ds = delayScheduler) == null ? 0 : ds.lastStableSize()); + } + /** * Returns {@code true} if there are any tasks submitted to this * pool that have not yet begun executing. @@ -3584,6 +3998,7 @@ public class ForkJoinPool extends AbstractExecutorService { */ public String toString() { // Use a single pass through queues to collect counts + DelayScheduler ds; long e = runState; long st = stealCount; long qt = 0L, ss = 0L; int rc = 0; @@ -3603,7 +4018,8 @@ public class ForkJoinPool extends AbstractExecutorService { } } } - + String delayed = ((ds = delayScheduler) == null ? "" : + ", delayed = " + ds.lastStableSize()); int pc = parallelism; long c = ctl; int tc = (short)(c >>> TC_SHIFT); @@ -3623,6 +4039,7 @@ public class ForkJoinPool extends AbstractExecutorService { ", steals = " + st + ", tasks = " + qt + ", submissions = " + ss + + delayed + "]"; } @@ -3948,6 +4365,7 @@ public class ForkJoinPool extends AbstractExecutorService { @Override protected RunnableFuture newTaskFor(Runnable runnable, T value) { + Objects.requireNonNull(runnable); return (Thread.currentThread() instanceof ForkJoinWorkerThread) ? new ForkJoinTask.AdaptedRunnable(runnable, value) : new ForkJoinTask.AdaptedInterruptibleRunnable(runnable, value); @@ -3955,6 +4373,7 @@ public class ForkJoinPool extends AbstractExecutorService { @Override protected RunnableFuture newTaskFor(Callable callable) { + Objects.requireNonNull(callable); return (Thread.currentThread() instanceof ForkJoinWorkerThread) ? new ForkJoinTask.AdaptedCallable(callable) : new ForkJoinTask.AdaptedInterruptibleCallable(callable); @@ -3982,7 +4401,6 @@ public class ForkJoinPool extends AbstractExecutorService { if ((scale & (scale - 1)) != 0) throw new Error("array index scale not a power of two"); - interruptibleTaskClass = ForkJoinTask.InterruptibleTask.class; Class dep = LockSupport.class; // ensure loaded // allow access to non-public methods JLA = SharedSecrets.getJavaLangAccess(); diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java index ec91e33975d..5f2dcb30d41 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java @@ -273,6 +273,8 @@ public abstract class ForkJoinTask implements Future, Serializable { static final int ABNORMAL = 1 << 16; static final int THROWN = 1 << 17; static final int HAVE_EXCEPTION = DONE | ABNORMAL | THROWN; + static final int NUH_BIT = 24; // no external caller helping + static final int NO_USER_HELP = 1 << NUH_BIT; static final int MARKER = 1 << 30; // utility marker static final int SMASK = 0xffff; // short bits for tags static final int UNCOMPENSATE = 1 << 16; // helpJoin sentinel @@ -292,6 +294,12 @@ public abstract class ForkJoinTask implements Future, Serializable { private boolean casStatus(int c, int v) { return U.compareAndSetInt(this, STATUS, c, v); } + final int noUserHelp() { // nonvolatile read; return 0 or 1 + return (U.getInt(this, STATUS) & NO_USER_HELP) >>> NUH_BIT; + } + final void setNoUserHelp() { // for use in constructors only + U.putInt(this, STATUS, NO_USER_HELP); + } // Support for waiting and signalling @@ -330,14 +338,9 @@ public abstract class ForkJoinTask implements Future, Serializable { */ final int trySetCancelled() { int s; - for (;;) { - if ((s = status) < 0) - break; - if (casStatus(s, s | (DONE | ABNORMAL))) { - signalWaiters(); - break; - } - } + if ((s = status) >= 0 && + (s = getAndBitwiseOrStatus(DONE | ABNORMAL)) >= 0) + signalWaiters(); return s; } @@ -481,7 +484,7 @@ public abstract class ForkJoinTask implements Future, Serializable { */ private int awaitDone(boolean interruptible, long deadline) { ForkJoinWorkerThread wt; ForkJoinPool p; ForkJoinPool.WorkQueue q; - Thread t; boolean internal; int s; + Thread t; boolean internal; int s, ss; if (internal = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { p = (wt = (ForkJoinWorkerThread)t).pool; @@ -492,7 +495,7 @@ public abstract class ForkJoinTask implements Future, Serializable { return (((s = (p == null) ? 0 : ((this instanceof CountedCompleter) ? p.helpComplete(this, q, internal) : - (this instanceof InterruptibleTask) && !internal ? status : + !internal && ((ss = status) & NO_USER_HELP) != 0 ? ss : p.helpJoin(this, q, internal))) < 0)) ? s : awaitDone(internal ? p : null, s, interruptible, deadline); } @@ -642,7 +645,7 @@ public abstract class ForkJoinTask implements Future, Serializable { p = wt.pool; } else - q = (p = ForkJoinPool.common).externalSubmissionQueue(); + q = (p = ForkJoinPool.common).externalSubmissionQueue(false); q.push(this, p, internal); return this; } @@ -1160,7 +1163,7 @@ public abstract class ForkJoinTask implements Future, Serializable { */ public void reinitialize() { aux = null; - status = 0; + status &= NO_USER_HELP; } /** @@ -1414,7 +1417,8 @@ public abstract class ForkJoinTask implements Future, Serializable { * @return the task */ public static ForkJoinTask adapt(Runnable runnable) { - return new AdaptedRunnableAction(runnable); + return new AdaptedRunnableAction( + Objects.requireNonNull(runnable)); } /** @@ -1428,7 +1432,8 @@ public abstract class ForkJoinTask implements Future, Serializable { * @return the task */ public static ForkJoinTask adapt(Runnable runnable, T result) { - return new AdaptedRunnable(runnable, result); + return new AdaptedRunnable( + Objects.requireNonNull(runnable), result); } /** @@ -1442,7 +1447,8 @@ public abstract class ForkJoinTask implements Future, Serializable { * @return the task */ public static ForkJoinTask adapt(Callable callable) { - return new AdaptedCallable(callable); + return new AdaptedCallable( + Objects.requireNonNull(callable)); } /** @@ -1460,7 +1466,8 @@ public abstract class ForkJoinTask implements Future, Serializable { * @since 19 */ public static ForkJoinTask adaptInterruptible(Callable callable) { - return new AdaptedInterruptibleCallable(callable); + return new AdaptedInterruptibleCallable( + Objects.requireNonNull(callable)); } /** @@ -1479,7 +1486,8 @@ public abstract class ForkJoinTask implements Future, Serializable { * @since 22 */ public static ForkJoinTask adaptInterruptible(Runnable runnable, T result) { - return new AdaptedInterruptibleRunnable(runnable, result); + return new AdaptedInterruptibleRunnable( + Objects.requireNonNull(runnable), result); } /** @@ -1497,7 +1505,8 @@ public abstract class ForkJoinTask implements Future, Serializable { * @since 22 */ public static ForkJoinTask adaptInterruptible(Runnable runnable) { - return new AdaptedInterruptibleRunnable(runnable, null); + return new AdaptedInterruptibleRunnable( + Objects.requireNonNull(runnable), null); } // Serialization support @@ -1556,7 +1565,6 @@ public abstract class ForkJoinTask implements Future, Serializable { @SuppressWarnings("serial") // Conditionally serializable T result; AdaptedRunnable(Runnable runnable, T result) { - Objects.requireNonNull(runnable); this.runnable = runnable; this.result = result; // OK to set this even before completion } @@ -1578,7 +1586,6 @@ public abstract class ForkJoinTask implements Future, Serializable { @SuppressWarnings("serial") // Conditionally serializable final Runnable runnable; AdaptedRunnableAction(Runnable runnable) { - Objects.requireNonNull(runnable); this.runnable = runnable; } public final Void getRawResult() { return null; } @@ -1601,7 +1608,6 @@ public abstract class ForkJoinTask implements Future, Serializable { @SuppressWarnings("serial") // Conditionally serializable T result; AdaptedCallable(Callable callable) { - Objects.requireNonNull(callable); this.callable = callable; } public final T getRawResult() { return result; } @@ -1636,6 +1642,9 @@ public abstract class ForkJoinTask implements Future, Serializable { abstract static class InterruptibleTask extends ForkJoinTask implements RunnableFuture { transient volatile Thread runner; + InterruptibleTask() { + setNoUserHelp(); + } abstract T compute() throws Exception; public final boolean exec() { Thread.interrupted(); @@ -1655,20 +1664,29 @@ public abstract class ForkJoinTask implements Future, Serializable { } finally { runner = null; } + return postExec(); + } + boolean postExec() { // cleanup and return completion status to doExec return true; } + final boolean interruptIfRunning(boolean enabled) { + Thread t; + if ((t = runner) == null) // return false if not running + return false; + if (enabled) { + try { + t.interrupt(); + } catch (Throwable ignore) { + } + } + return true; + } public boolean cancel(boolean mayInterruptIfRunning) { - Thread t; - if (trySetCancelled() >= 0) { - if (mayInterruptIfRunning && (t = runner) != null) { - try { - t.interrupt(); - } catch (Throwable ignore) { - } - } - return true; - } - return isCancelled(); + int s; + if ((s = trySetCancelled()) < 0) + return ((s & (ABNORMAL | THROWN)) == ABNORMAL); + interruptIfRunning(mayInterruptIfRunning); + return true; } public final void run() { quietlyInvoke(); } Object adaptee() { return null; } // for printing and diagnostics @@ -1690,7 +1708,6 @@ public abstract class ForkJoinTask implements Future, Serializable { @SuppressWarnings("serial") // Conditionally serializable T result; AdaptedInterruptibleCallable(Callable callable) { - Objects.requireNonNull(callable); this.callable = callable; } public final T getRawResult() { return result; } @@ -1709,7 +1726,6 @@ public abstract class ForkJoinTask implements Future, Serializable { @SuppressWarnings("serial") // Conditionally serializable final T result; AdaptedInterruptibleRunnable(Runnable runnable, T result) { - Objects.requireNonNull(runnable); this.runnable = runnable; this.result = result; } @@ -1727,7 +1743,6 @@ public abstract class ForkJoinTask implements Future, Serializable { @SuppressWarnings("serial") // Conditionally serializable final Runnable runnable; RunnableExecuteAction(Runnable runnable) { - Objects.requireNonNull(runnable); this.runnable = runnable; } public final Void getRawResult() { return null; } @@ -1793,9 +1808,11 @@ public abstract class ForkJoinTask implements Future, Serializable { throw new NullPointerException(); InvokeAnyTask t = null; // list of submitted tasks try { - for (Callable c : tasks) + for (Callable c : tasks) { + Objects.requireNonNull(c); pool.execute((ForkJoinTask) (t = new InvokeAnyTask(c, this, t))); + } return timed ? get(nanos, TimeUnit.NANOSECONDS) : get(); } finally { for (; t != null; t = t.pred) @@ -1822,7 +1839,6 @@ public abstract class ForkJoinTask implements Future, Serializable { final InvokeAnyTask pred; // to traverse on cancellation InvokeAnyTask(Callable callable, InvokeAnyRoot root, InvokeAnyTask pred) { - Objects.requireNonNull(callable); this.callable = callable; this.root = root; this.pred = pred; @@ -1857,4 +1873,39 @@ public abstract class ForkJoinTask implements Future, Serializable { public final void setRawResult(Void v) { } final Object adaptee() { return callable; } } + + /** + * Adapter for Callable-based interruptible tasks with timeout actions. + */ + @SuppressWarnings("serial") // Conditionally serializable + static final class CallableWithTimeout extends InterruptibleTask { + Callable callable; // nulled out after use + ForkJoinTask timeoutAction; + T result; + CallableWithTimeout(Callable callable, + ForkJoinTask timeoutAction) { + this.callable = callable; + this.timeoutAction = timeoutAction; + } + public final T getRawResult() { return result; } + public final void setRawResult(T v) { result = v; } + final Object adaptee() { return callable; } + final T compute() throws Exception { + Callable c; + return ((c = callable) != null) ? c.call() : null; + } + final boolean postExec() { // cancel timeout action + ForkJoinTask t; + callable = null; + if ((t = timeoutAction) != null) { + timeoutAction = null; + try { + t.cancel(false); + } catch (Error | RuntimeException ex) { + } + } + return true; + } + } + } diff --git a/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java b/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java index 388241b115d..3a8f3746637 100644 --- a/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java +++ b/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java @@ -206,22 +206,6 @@ public class SubmissionPublisher implements Publisher, (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1; } - // default Executor setup; nearly the same as CompletableFuture - - /** - * Default executor -- ForkJoinPool.commonPool() unless it cannot - * support parallelism. - */ - private static final Executor ASYNC_POOL = - (ForkJoinPool.getCommonPoolParallelism() > 1) ? - ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); - - /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ - private static final class ThreadPerTaskExecutor implements Executor { - ThreadPerTaskExecutor() {} // prevent access constructor creation - public void execute(Runnable r) { new Thread(r).start(); } - } - /** * Clients (BufferedSubscriptions) are maintained in a linked list * (via their "next" fields). This works well for publish loops. @@ -316,7 +300,7 @@ public class SubmissionPublisher implements Publisher, * Flow.Subscriber#onNext(Object) onNext}. */ public SubmissionPublisher() { - this(ASYNC_POOL, Flow.defaultBufferSize(), null); + this(ForkJoinPool.asyncCommonPool(), Flow.defaultBufferSize(), null); } /** diff --git a/src/java.base/share/classes/java/util/concurrent/ThreadLocalRandom.java b/src/java.base/share/classes/java/util/concurrent/ThreadLocalRandom.java index 19eb3122947..3713d616a3a 100644 --- a/src/java.base/share/classes/java/util/concurrent/ThreadLocalRandom.java +++ b/src/java.base/share/classes/java/util/concurrent/ThreadLocalRandom.java @@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.DoubleStream; import java.util.stream.IntStream; import java.util.stream.LongStream; +import jdk.internal.access.JavaLangAccess; import jdk.internal.access.JavaUtilConcurrentTLRAccess; import jdk.internal.access.SharedSecrets; import jdk.internal.util.random.RandomSupport; @@ -158,11 +159,19 @@ public final class ThreadLocalRandom extends Random { * rely on (static) atomic generators to initialize the values. */ static final void localInit() { - int p = probeGenerator.addAndGet(PROBE_INCREMENT); - int probe = (p == 0) ? 1 : p; // skip 0 long seed = RandomSupport.mixMurmur64(seeder.getAndAdd(SEEDER_INCREMENT)); - Thread t = Thread.currentThread(); + Thread t = Thread.currentThread(), carrier; U.putLong(t, SEED, seed); + int probe = 0; // if virtual, share probe with carrier + if ((carrier = JLA.currentCarrierThread()) != t && + (probe = U.getInt(carrier, PROBE)) == 0) { + seed = RandomSupport.mixMurmur64(seeder.getAndAdd(SEEDER_INCREMENT)); + U.putLong(carrier, SEED, seed); + } + if (probe == 0 && (probe = probeGenerator.addAndGet(PROBE_INCREMENT)) == 0) + probe = 1; // skip 0 + if (carrier != t) + U.putInt(carrier, PROBE, probe); U.putInt(t, PROBE, probe); } @@ -233,16 +242,18 @@ public final class ThreadLocalRandom extends Random { * the classes that use them. Briefly, a thread's "probe" value is * a non-zero hash code that (probably) does not collide with * other existing threads with respect to any power of two - * collision space. When it does collide, it is pseudo-randomly - * adjusted (using a Marsaglia XorShift). The nextSecondarySeed - * method is used in the same contexts as ThreadLocalRandom, but - * only for transient usages such as random adaptive spin/block - * sequences for which a cheap RNG suffices and for which it could - * in principle disrupt user-visible statistical properties of the - * main ThreadLocalRandom if we were to use it. + * collision space, based on carrier threads in the case of + * VirtualThreads to reduce the expected collision rate. When it + * does collide, it is pseudo-randomly adjusted (using a Marsaglia + * XorShift). The nextSecondarySeed method is used in the same + * contexts as ThreadLocalRandom, but only for transient usages + * such as random adaptive spin/block sequences for which a cheap + * RNG suffices and for which it could in principle disrupt + * user-visible statistical properties of the main + * ThreadLocalRandom if we were to use it. * - * Note: Because of package-protection issues, versions of some - * these methods also appear in some subpackage classes. + * Note: jdk SharedSecrets are used enable use in jdk classes + * outside this package. */ /** @@ -251,7 +262,7 @@ public final class ThreadLocalRandom extends Random { * can be used to force initialization on zero return. */ static final int getProbe() { - return U.getInt(Thread.currentThread(), PROBE); + return U.getInt(JLA.currentCarrierThread(), PROBE); } /** @@ -262,7 +273,7 @@ public final class ThreadLocalRandom extends Random { probe ^= probe << 13; // xorshift probe ^= probe >>> 17; probe ^= probe << 5; - U.putInt(Thread.currentThread(), PROBE, probe); + U.putInt(JLA.currentCarrierThread(), PROBE, probe); return probe; } @@ -378,6 +389,8 @@ public final class ThreadLocalRandom extends Random { = new AtomicLong(RandomSupport.mixMurmur64(System.currentTimeMillis()) ^ RandomSupport.mixMurmur64(System.nanoTime())); + private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); + // used by ScopedValue private static class Access { static { @@ -386,6 +399,12 @@ public final class ThreadLocalRandom extends Random { public int nextSecondaryThreadLocalRandomSeed() { return nextSecondarySeed(); } + public int getThreadLocalRandomProbe() { + return getProbe(); + } + public int advanceThreadLocalRandomProbe(int r) { + return advanceProbe(r); + } } ); } diff --git a/src/java.base/share/classes/java/util/concurrent/atomic/Striped64.java b/src/java.base/share/classes/java/util/concurrent/atomic/Striped64.java index ad230f62ab6..e743947a6ff 100644 --- a/src/java.base/share/classes/java/util/concurrent/atomic/Striped64.java +++ b/src/java.base/share/classes/java/util/concurrent/atomic/Striped64.java @@ -43,6 +43,8 @@ import java.util.Arrays; import java.util.concurrent.ThreadLocalRandom; import java.util.function.DoubleBinaryOperator; import java.util.function.LongBinaryOperator; +import jdk.internal.access.SharedSecrets; +import jdk.internal.access.JavaUtilConcurrentTLRAccess; /** * A package-local class holding common representation and mechanics @@ -188,24 +190,18 @@ abstract class Striped64 extends Number { } /** - * Returns the probe value for the current thread. - * Duplicated from ThreadLocalRandom because of packaging restrictions. + * Returns the ThreadLocalRandom probe value for the current carrier thread. */ static final int getProbe() { - return (int) THREAD_PROBE.get(Thread.currentThread()); + return TLR.getThreadLocalRandomProbe(); } /** * Pseudo-randomly advances and records the given probe value for the - * given thread. - * Duplicated from ThreadLocalRandom because of packaging restrictions. + * given carrier thread. */ static final int advanceProbe(int probe) { - probe ^= probe << 13; // xorshift - probe ^= probe >>> 17; - probe ^= probe << 5; - THREAD_PROBE.set(Thread.currentThread(), probe); - return probe; + return TLR.advanceThreadLocalRandomProbe(probe); } /** @@ -371,21 +367,17 @@ abstract class Striped64 extends Number { } } + private static final JavaUtilConcurrentTLRAccess TLR = + SharedSecrets.getJavaUtilConcurrentTLRAccess(); + // VarHandle mechanics private static final VarHandle BASE; private static final VarHandle CELLSBUSY; - private static final VarHandle THREAD_PROBE; static { MethodHandles.Lookup l1 = MethodHandles.lookup(); BASE = MhUtil.findVarHandle(l1, "base", long.class); CELLSBUSY = MhUtil.findVarHandle(l1, "cellsBusy", int.class); - try { - MethodHandles.Lookup l2 = MethodHandles.privateLookupIn(Thread.class, l1); - THREAD_PROBE = MhUtil.findVarHandle(l2, "threadLocalRandomProbe", int.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } } } diff --git a/src/java.base/share/classes/jdk/internal/access/JavaUtilConcurrentTLRAccess.java b/src/java.base/share/classes/jdk/internal/access/JavaUtilConcurrentTLRAccess.java index 5683146e721..29434e823a4 100644 --- a/src/java.base/share/classes/jdk/internal/access/JavaUtilConcurrentTLRAccess.java +++ b/src/java.base/share/classes/jdk/internal/access/JavaUtilConcurrentTLRAccess.java @@ -27,4 +27,6 @@ package jdk.internal.access; public interface JavaUtilConcurrentTLRAccess { int nextSecondaryThreadLocalRandomSeed(); + int getThreadLocalRandomProbe(); + int advanceThreadLocalRandomProbe(int r); } diff --git a/test/jdk/java/util/concurrent/CompletableFuture/CompletableFutureOrTimeoutExceptionallyTest.java b/test/jdk/java/util/concurrent/CompletableFuture/CompletableFutureOrTimeoutExceptionallyTest.java index 9c6b5b6e5cd..210a791aad0 100644 --- a/test/jdk/java/util/concurrent/CompletableFuture/CompletableFutureOrTimeoutExceptionallyTest.java +++ b/test/jdk/java/util/concurrent/CompletableFuture/CompletableFutureOrTimeoutExceptionallyTest.java @@ -30,39 +30,25 @@ */ import java.time.Duration; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; class CompletableFutureOrTimeoutExceptionallyTest { - static final BlockingQueue delayerQueue; - static { - try { - var delayerClass = Class.forName("java.util.concurrent.CompletableFuture$Delayer", - true, - CompletableFuture.class.getClassLoader()); - var delayerField = delayerClass.getDeclaredField("delayer"); - delayerField.setAccessible(true); - delayerQueue = ((ScheduledThreadPoolExecutor)delayerField.get(null)).getQueue(); - } catch (Throwable t) { - throw new ExceptionInInitializerError(t); - } - } - + // updated February 2025 to adapt to CompletableFuture DelayScheduler changes /** * Test that orTimeout task is cancelled if the CompletableFuture is completed Exceptionally */ @Test void testOrTimeoutWithCompleteExceptionallyDoesNotLeak() throws InterruptedException { - assertTrue(delayerQueue.peek() == null); + ForkJoinPool delayer = ForkJoinPool.commonPool(); + assertEquals(delayer.getDelayedTaskCount(), 0); var future = new CompletableFuture<>().orTimeout(12, TimeUnit.HOURS); - assertTrue(delayerQueue.peek() != null); future.completeExceptionally(new RuntimeException("This is fine")); - while (delayerQueue.peek() != null) { + while (delayer.getDelayedTaskCount() > 0) { Thread.sleep(100); }; } @@ -72,12 +58,13 @@ class CompletableFutureOrTimeoutExceptionallyTest { */ @Test void testCompleteOnTimeoutWithCompleteExceptionallyDoesNotLeak() throws InterruptedException { - assertTrue(delayerQueue.peek() == null); + ForkJoinPool delayer = ForkJoinPool.commonPool(); + assertEquals(delayer.getDelayedTaskCount(), 0); var future = new CompletableFuture<>().completeOnTimeout(null, 12, TimeUnit.HOURS); - assertTrue(delayerQueue.peek() != null); future.completeExceptionally(new RuntimeException("This is fine")); - while (delayerQueue.peek() != null) { + while (delayer.getDelayedTaskCount() > 0) { Thread.sleep(100); }; } + } diff --git a/test/jdk/java/util/concurrent/tck/CompletableFutureTest.java b/test/jdk/java/util/concurrent/tck/CompletableFutureTest.java index 60da228d175..de3d1dd1050 100644 --- a/test/jdk/java/util/concurrent/tck/CompletableFutureTest.java +++ b/test/jdk/java/util/concurrent/tck/CompletableFutureTest.java @@ -32,6 +32,7 @@ * http://creativecommons.org/publicdomain/zero/1.0/ */ +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.CompletableFuture.completedFuture; @@ -101,7 +102,7 @@ public class CompletableFutureTest extends JSR166TestCase { assertNull(result); try { - f.get(randomExpiredTimeout(), randomTimeUnit()); + f.get(1, NANOSECONDS); shouldThrow(); } catch (TimeoutException success) {} @@ -658,8 +659,6 @@ public class CompletableFutureTest extends JSR166TestCase { } } - static final boolean defaultExecutorIsCommonPool - = ForkJoinPool.getCommonPoolParallelism() > 1; /** * Permits the testing of parallel code for the 3 different @@ -750,8 +749,7 @@ public class CompletableFutureTest extends JSR166TestCase { }, ASYNC { public void checkExecutionMode() { - mustEqual(defaultExecutorIsCommonPool, - (ForkJoinPool.commonPool() == ForkJoinTask.getPool())); + mustEqual(ForkJoinPool.commonPool(), ForkJoinTask.getPool()); } public CompletableFuture runAsync(Runnable a) { return CompletableFuture.runAsync(a); @@ -3794,10 +3792,7 @@ public class CompletableFutureTest extends JSR166TestCase { CompletableFuture f = new CompletableFuture<>(); Executor e = f.defaultExecutor(); Executor c = ForkJoinPool.commonPool(); - if (ForkJoinPool.getCommonPoolParallelism() > 1) - assertSame(e, c); - else - assertNotSame(e, c); + assertSame(e, c); } /** diff --git a/test/jdk/java/util/concurrent/tck/ForkJoinPool20Test.java b/test/jdk/java/util/concurrent/tck/ForkJoinPool20Test.java index b35ab064d53..cfde352abdd 100644 --- a/test/jdk/java/util/concurrent/tck/ForkJoinPool20Test.java +++ b/test/jdk/java/util/concurrent/tck/ForkJoinPool20Test.java @@ -31,13 +31,35 @@ * http://creativecommons.org/publicdomain/zero/1.0/ */ +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; +import java.util.stream.Stream; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import junit.framework.Test; import junit.framework.TestSuite; @@ -216,4 +238,412 @@ public class ForkJoinPool20Test extends JSR166TestCase { } return worker; } + + // additions for ScheduledExecutorService + + + /** + * delayed schedule of callable successfully executes after delay + */ + public void testSchedule1() throws Exception { + final ForkJoinPool p = new ForkJoinPool(2); + try (PoolCleaner cleaner = cleaner(p)) { + final long startTime = System.nanoTime(); + final CountDownLatch done = new CountDownLatch(1); + Callable task = new CheckedCallable<>() { + public Boolean realCall() { + done.countDown(); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + return Boolean.TRUE; + }}; + Future f = p.schedule(task, timeoutMillis(), MILLISECONDS); + assertSame(Boolean.TRUE, f.get()); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + assertEquals(0L, done.getCount()); + } + } + + /** + * delayed schedule of callable successfully executes after delay + * even if shutdown. + */ + public void testSchedule1b() throws Exception { + final ForkJoinPool p = new ForkJoinPool(2); + try (PoolCleaner cleaner = cleaner(p)) { + final long startTime = System.nanoTime(); + final CountDownLatch done = new CountDownLatch(1); + Callable task = new CheckedCallable<>() { + public Boolean realCall() { + done.countDown(); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + return Boolean.TRUE; + }}; + Future f = p.schedule(task, timeoutMillis(), MILLISECONDS); + p.shutdown(); + assertSame(Boolean.TRUE, f.get()); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + assertEquals(0L, done.getCount()); + } + } + + /** + * delayed schedule of runnable successfully executes after delay + */ + public void testSchedule3() throws Exception { + final ForkJoinPool p = new ForkJoinPool(2); + try (PoolCleaner cleaner = cleaner(p)) { + final long startTime = System.nanoTime(); + final CountDownLatch done = new CountDownLatch(1); + Runnable task = new CheckedRunnable() { + public void realRun() { + done.countDown(); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + }}; + Future f = p.schedule(task, timeoutMillis(), MILLISECONDS); + await(done); + assertNull(f.get(LONG_DELAY_MS, MILLISECONDS)); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + } + } + + /** + * scheduleAtFixedRate executes runnable after given initial delay + */ + public void testSchedule4() throws Exception { + final ForkJoinPool p = new ForkJoinPool(2); + try (PoolCleaner cleaner = cleaner(p)) { + final long startTime = System.nanoTime(); + final CountDownLatch done = new CountDownLatch(1); + Runnable task = new CheckedRunnable() { + public void realRun() { + done.countDown(); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + }}; + ScheduledFuture f = + p.scheduleAtFixedRate(task, timeoutMillis(), + LONG_DELAY_MS, MILLISECONDS); + await(done); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + f.cancel(true); + } + } + + /** + * scheduleAtFixedRate with 0 initial delay re-rexecutes + */ + public void testSchedule4a() throws Exception { + final ForkJoinPool p = new ForkJoinPool(2); + try (PoolCleaner cleaner = cleaner(p)) { + final long startTime = System.nanoTime(); + final CountDownLatch done = new CountDownLatch(2); + Runnable task = new Runnable() { + public void run() { + done.countDown(); + }}; + ScheduledFuture f = + p.scheduleAtFixedRate(task, 0L, timeoutMillis(), + MILLISECONDS); + await(done); + f.cancel(true); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + } + } + + /** + * scheduleWithFixedDelay executes runnable after given initial delay + */ + public void testSchedule5() throws Exception { + final ForkJoinPool p = new ForkJoinPool(2); + try (PoolCleaner cleaner = cleaner(p)) { + final long startTime = System.nanoTime(); + final CountDownLatch done = new CountDownLatch(1); + Runnable task = new CheckedRunnable() { + public void realRun() { + done.countDown(); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + }}; + ScheduledFuture f = + p.scheduleWithFixedDelay(task, timeoutMillis(), + LONG_DELAY_MS, MILLISECONDS); + await(done); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + f.cancel(true); + } + } + + /** + * scheduleWithFixedDelay with 0 initial delay re-rexecutes + */ + public void testSchedule5a() throws Exception { + final ForkJoinPool p = new ForkJoinPool(2); + try (PoolCleaner cleaner = cleaner(p)) { + final long startTime = System.nanoTime(); + final CountDownLatch done = new CountDownLatch(2); + Runnable task = new Runnable() { + public void run() { + done.countDown(); + }}; + ScheduledFuture f = + p.scheduleWithFixedDelay(task, 0L, timeoutMillis(), + MILLISECONDS); + await(done); + assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); + f.cancel(true); + } + } + + static class RunnableCounter implements Runnable { + AtomicInteger count = new AtomicInteger(0); + public void run() { count.getAndIncrement(); } + } + + /** + * scheduleAtFixedRate executes series of tasks at given rate. + * Eventually, it must hold that: + * cycles - 1 <= elapsedMillis/delay < cycles + * Additionally, periodic tasks are not run after shutdown. + */ + public void testFixedRateSequence() throws InterruptedException { + final ForkJoinPool p = new ForkJoinPool(4); + try (PoolCleaner cleaner = cleaner(p)) { + for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) { + final long startTime = System.nanoTime(); + final int cycles = 8; + final CountDownLatch done = new CountDownLatch(cycles); + final Runnable task = new CheckedRunnable() { + public void realRun() { done.countDown(); }}; + final ScheduledFuture periodicTask = + p.scheduleAtFixedRate(task, 0, delay, MILLISECONDS); + final int totalDelayMillis = (cycles - 1) * delay; + await(done, totalDelayMillis + LONG_DELAY_MS); + final long elapsedMillis = millisElapsedSince(startTime); + assertTrue(elapsedMillis >= totalDelayMillis); + if (elapsedMillis <= cycles * delay) + return; + periodicTask.cancel(true); // retry with longer delay + } + fail("unexpected execution rate"); + } + } + /** + * scheduleWithFixedDelay executes series of tasks with given + * period. Eventually, it must hold that each task starts at + * least delay and at most 2 * delay after the termination of the + * previous task. Additionally, periodic tasks are not run after + * shutdown. + */ + public void testFixedDelaySequence() throws InterruptedException { + final ForkJoinPool p = new ForkJoinPool(1); + try (PoolCleaner cleaner = cleaner(p)) { + for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) { + final long startTime = System.nanoTime(); + final AtomicLong previous = new AtomicLong(startTime); + final AtomicBoolean tryLongerDelay = new AtomicBoolean(false); + final int cycles = 8; + final CountDownLatch done = new CountDownLatch(cycles); + final int d = delay; + final Runnable task = new CheckedRunnable() { + public void realRun() { + long now = System.nanoTime(); + long elapsedMillis + = NANOSECONDS.toMillis(now - previous.get()); + if (elapsedMillis >= (done.getCount() == cycles ? d : 2 * d)) + tryLongerDelay.set(true); + previous.set(now); + assertTrue(done.getCount() > 0); + done.countDown(); + }}; + final ScheduledFuture periodicTask = + p.scheduleWithFixedDelay(task, 0, delay, MILLISECONDS); + final int totalDelayMillis = (cycles - 1) * delay; + await(done, totalDelayMillis + cycles * LONG_DELAY_MS); + final long elapsedMillis = millisElapsedSince(startTime); + assertTrue(elapsedMillis >= totalDelayMillis); + if (!tryLongerDelay.get()) + return; + periodicTask.cancel(true); // retry with longer delay + } + fail("unexpected execution rate"); + } + } + + /** + * Submitting null tasks throws NullPointerException + */ + public void testNullTaskSubmission() { + final ForkJoinPool p = new ForkJoinPool(1); + try (PoolCleaner cleaner = cleaner(p)) { + assertNullTaskSubmissionThrowsNullPointerException(p); + } + } + + /** + * Submitted tasks are rejected when shutdown + */ + public void testSubmittedTasksRejectedWhenShutdown() throws InterruptedException { + final ForkJoinPool p = new ForkJoinPool(4); + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + final CountDownLatch threadsStarted = new CountDownLatch(p.getParallelism()); + final CountDownLatch done = new CountDownLatch(1); + final Runnable r = () -> { + threadsStarted.countDown(); + for (;;) { + try { + done.await(); + return; + } catch (InterruptedException shutdownNowDeliberatelyIgnored) {} + }}; + final Callable c = () -> { + threadsStarted.countDown(); + for (;;) { + try { + done.await(); + return Boolean.TRUE; + } catch (InterruptedException shutdownNowDeliberatelyIgnored) {} + }}; + + try (PoolCleaner cleaner = cleaner(p, done)) { + for (int i = p.getParallelism(); i-- > 0; ) { + switch (rnd.nextInt(4)) { + case 0: p.execute(r); break; + case 1: assertFalse(p.submit(r).isDone()); break; + case 2: assertFalse(p.submit(r, Boolean.TRUE).isDone()); break; + case 3: assertFalse(p.submit(c).isDone()); break; + } + } + + await(threadsStarted); + p.shutdown(); + done.countDown(); // release blocking tasks + assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); + } + } + + /** + * A fixed delay task with overflowing period should not prevent a + * one-shot task from executing. + * https://bugs.openjdk.org/browse/JDK-8051859 + */ + @SuppressWarnings("FutureReturnValueIgnored") + public void testScheduleWithFixedDelay_overflow() throws Exception { + final CountDownLatch delayedDone = new CountDownLatch(1); + final CountDownLatch immediateDone = new CountDownLatch(1); + final ForkJoinPool p = new ForkJoinPool(2); + try (PoolCleaner cleaner = cleaner(p)) { + final Runnable delayed = () -> { + delayedDone.countDown(); + p.submit(() -> immediateDone.countDown()); + }; + p.scheduleWithFixedDelay(delayed, 0L, Long.MAX_VALUE, SECONDS); + await(delayedDone); + await(immediateDone); + } + } + + /** + * shutdownNow cancels tasks that were not run + */ + public void testShutdownNow_delayedTasks() throws InterruptedException { + final ForkJoinPool p = new ForkJoinPool(2); + List> tasks = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + Runnable r = new NoOpRunnable(); + tasks.add(p.schedule(r, 9, SECONDS)); + tasks.add(p.scheduleAtFixedRate(r, 9, 9, SECONDS)); + tasks.add(p.scheduleWithFixedDelay(r, 9, 9, SECONDS)); + } + p.shutdownNow(); + assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); + for (ScheduledFuture task : tasks) { + assertTrue(task.isDone()); + } + assertTrue(p.isTerminated()); + } + + /** + * submitWithTimeout (eventually) cancels task after timeout + */ + public void testSubmitWithTimeoutCancels() throws InterruptedException { + final ForkJoinPool p = ForkJoinPool.commonPool(); + Callable c = new Callable() { + public Boolean call() throws Exception { + Thread.sleep(LONGER_DELAY_MS); return Boolean.TRUE; }}; + ForkJoinTask task = p.submitWithTimeout(c, 1, NANOSECONDS, null); + Thread.sleep(timeoutMillis()); + assertTrue(task.isCancelled()); + } + + static final class SubmitWithTimeoutException extends RuntimeException {} + + /** + * submitWithTimeout using complete completes after timeout + */ + public void testSubmitWithCompleterTimeoutCompletes() throws InterruptedException { + final ForkJoinPool p = ForkJoinPool.commonPool(); + Callable c = new Callable() { + public Item call() throws Exception { + Thread.sleep(LONGER_DELAY_MS); return one; }}; + ForkJoinTask task = p.submitWithTimeout( + c, 1, NANOSECONDS, + (ForkJoinTask t) -> + t.complete(two)); + Thread.sleep(timeoutMillis()); + assertEquals(task.join(), two); + } + + /** + * submitWithTimeout using completeExceptionally throws after timeout + */ + public void testSubmitWithTimeoutThrows() throws InterruptedException { + final ForkJoinPool p = ForkJoinPool.commonPool(); + Callable c = new Callable() { + public Boolean call() throws Exception { + Thread.sleep(LONGER_DELAY_MS); return Boolean.TRUE; }}; + ForkJoinTask task = p.submitWithTimeout( + c, 1, NANOSECONDS, + (ForkJoinTask t) -> + t.completeExceptionally(new SubmitWithTimeoutException())); + Thread.sleep(timeoutMillis()); + try { + task.join(); + shouldThrow(); + } + catch (Exception ex) { + assertTrue(ex instanceof SubmitWithTimeoutException); + } + } + + /** + * submitWithTimeout doesn't cancel if completed before timeout + */ + public void testSubmitWithTimeout_NoTimeout() throws InterruptedException { + final ForkJoinPool p = ForkJoinPool.commonPool(); + Callable c = new Callable() { + public Boolean call() throws Exception { + return Boolean.TRUE; }}; + ForkJoinTask task = p.submitWithTimeout(c, LONGER_DELAY_MS, MILLISECONDS, null); + Thread.sleep(timeoutMillis()); + assertFalse(task.isCancelled()); + assertEquals(task.join(), Boolean.TRUE); + } + + /** + * A delayed task completes (possibly abnormally) if shutdown after + * calling cancelDelayedTasksOnShutdown() + */ + public void testCancelDelayedTasksOnShutdown() throws Exception { + final ForkJoinPool p = new ForkJoinPool(2); + p.cancelDelayedTasksOnShutdown(); + try (PoolCleaner cleaner = cleaner(p)) { + Callable task = new CheckedCallable<>() { + public Boolean realCall() { + return Boolean.TRUE; + }}; + Future f = p.schedule(task, LONGER_DELAY_MS, MILLISECONDS); + p.shutdown(); + assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); + assertTrue(f.isDone()); + } + } + } diff --git a/test/jdk/java/util/concurrent/tck/SubmissionPublisherTest.java b/test/jdk/java/util/concurrent/tck/SubmissionPublisherTest.java index c47e66c6727..9f87ff5cf64 100644 --- a/test/jdk/java/util/concurrent/tck/SubmissionPublisherTest.java +++ b/test/jdk/java/util/concurrent/tck/SubmissionPublisherTest.java @@ -178,10 +178,7 @@ public class SubmissionPublisherTest extends JSR166TestCase { checkInitialState(p); assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize()); Executor e = p.getExecutor(), c = ForkJoinPool.commonPool(); - if (ForkJoinPool.getCommonPoolParallelism() > 1) - assertSame(e, c); - else - assertNotSame(e, c); + assertSame(e, c); } /**