mirror of
https://github.com/openjdk/jdk.git
synced 2026-03-15 18:33:41 +00:00
8367857: Implement JEP 525: Structured Concurrency (Sixth Preview)
Reviewed-by: vklang
This commit is contained in:
parent
72989e0fac
commit
0bae56b614
@ -34,7 +34,6 @@ import java.util.Objects;
|
||||
import java.util.concurrent.StructuredTaskScope.Joiner;
|
||||
import java.util.concurrent.StructuredTaskScope.Subtask;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
import jdk.internal.invoke.MhUtil;
|
||||
|
||||
/**
|
||||
@ -64,29 +63,30 @@ class Joiners {
|
||||
}
|
||||
|
||||
/**
|
||||
* A joiner that returns a stream of all subtasks when all subtasks complete
|
||||
* A joiner that returns a list of all results when all subtasks complete
|
||||
* successfully. Cancels the scope if any subtask fails.
|
||||
*/
|
||||
static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {
|
||||
static final class AllSuccessful<T> implements Joiner<T, List<T>> {
|
||||
private static final VarHandle FIRST_EXCEPTION =
|
||||
MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
|
||||
|
||||
// list of forked subtasks, only accessed by owner thread
|
||||
private final List<Subtask<T>> subtasks = new ArrayList<>();
|
||||
// list of forked subtasks, created lazily, only accessed by owner thread
|
||||
private List<Subtask<T>> subtasks;
|
||||
|
||||
private volatile Throwable firstException;
|
||||
|
||||
@Override
|
||||
public boolean onFork(Subtask<? extends T> subtask) {
|
||||
public boolean onFork(Subtask<T> subtask) {
|
||||
ensureUnavailable(subtask);
|
||||
@SuppressWarnings("unchecked")
|
||||
var s = (Subtask<T>) subtask;
|
||||
subtasks.add(s);
|
||||
if (subtasks == null) {
|
||||
subtasks = new ArrayList<>();
|
||||
}
|
||||
subtasks.add(subtask);
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onComplete(Subtask<? extends T> subtask) {
|
||||
public boolean onComplete(Subtask<T> subtask) {
|
||||
Subtask.State state = ensureCompleted(subtask);
|
||||
return (state == Subtask.State.FAILED)
|
||||
&& (firstException == null)
|
||||
@ -94,12 +94,17 @@ class Joiners {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<Subtask<T>> result() throws Throwable {
|
||||
public List<T> result() throws Throwable {
|
||||
Throwable ex = firstException;
|
||||
if (ex != null) {
|
||||
throw ex;
|
||||
} else {
|
||||
return subtasks.stream();
|
||||
try {
|
||||
if (ex != null) {
|
||||
throw ex;
|
||||
}
|
||||
return (subtasks != null)
|
||||
? subtasks.stream().map(Subtask::get).toList()
|
||||
: List.of();
|
||||
} finally {
|
||||
subtasks = null; // allow subtasks to be GC'ed
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -130,7 +135,7 @@ class Joiners {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onComplete(Subtask<? extends T> subtask) {
|
||||
public boolean onComplete(Subtask<T> subtask) {
|
||||
Subtask.State state = ensureCompleted(subtask);
|
||||
Subtask<T> s;
|
||||
while (((s = this.subtask) == null)
|
||||
@ -166,7 +171,7 @@ class Joiners {
|
||||
private volatile Throwable firstException;
|
||||
|
||||
@Override
|
||||
public boolean onComplete(Subtask<? extends T> subtask) {
|
||||
public boolean onComplete(Subtask<T> subtask) {
|
||||
Subtask.State state = ensureCompleted(subtask);
|
||||
return (state == Subtask.State.FAILED)
|
||||
&& (firstException == null)
|
||||
@ -185,36 +190,48 @@ class Joiners {
|
||||
}
|
||||
|
||||
/**
|
||||
* A joiner that returns a stream of all subtasks.
|
||||
* A joiner that returns a list of all subtasks.
|
||||
*/
|
||||
static final class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
|
||||
private final Predicate<Subtask<? extends T>> isDone;
|
||||
static final class AllSubtasks<T> implements Joiner<T, List<Subtask<T>>> {
|
||||
private final Predicate<Subtask<T>> isDone;
|
||||
|
||||
// list of forked subtasks, only accessed by owner thread
|
||||
private final List<Subtask<T>> subtasks = new ArrayList<>();
|
||||
// list of forked subtasks, created lazily, only accessed by owner thread
|
||||
private List<Subtask<T>> subtasks;
|
||||
|
||||
AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
|
||||
AllSubtasks(Predicate<Subtask<T>> isDone) {
|
||||
this.isDone = Objects.requireNonNull(isDone);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onFork(Subtask<? extends T> subtask) {
|
||||
public boolean onFork(Subtask<T> subtask) {
|
||||
ensureUnavailable(subtask);
|
||||
@SuppressWarnings("unchecked")
|
||||
var s = (Subtask<T>) subtask;
|
||||
subtasks.add(s);
|
||||
if (subtasks == null) {
|
||||
subtasks = new ArrayList<>();
|
||||
}
|
||||
subtasks.add(subtask);
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onComplete(Subtask<? extends T> subtask) {
|
||||
public boolean onComplete(Subtask<T> subtask) {
|
||||
ensureCompleted(subtask);
|
||||
return isDone.test(subtask);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<Subtask<T>> result() {
|
||||
return subtasks.stream();
|
||||
public void onTimeout() {
|
||||
// do nothing, this joiner does not throw TimeoutException
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Subtask<T>> result() {
|
||||
if (subtasks != null) {
|
||||
List<Subtask<T>> result = List.copyOf(subtasks);
|
||||
subtasks = null; // allow subtasks to be GC'ed
|
||||
return result;
|
||||
} else {
|
||||
return List.of();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -25,10 +25,10 @@
|
||||
package java.util.concurrent;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.function.Function;
|
||||
import java.util.List;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.function.UnaryOperator;
|
||||
import jdk.internal.javac.PreviewFeature;
|
||||
|
||||
/**
|
||||
@ -51,9 +51,9 @@ import jdk.internal.javac.PreviewFeature;
|
||||
* To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may
|
||||
* only be invoked by the <em>owner thread</em> (the thread that opened the {@code
|
||||
* StructuredTaskScope}), the {@code fork} method may not be called after {@code join},
|
||||
* the {@code join} method may only be invoked once, and the {@code close} method throws
|
||||
* an exception after closing if the owner did not invoke the {@code join} method after
|
||||
* forking subtasks.
|
||||
* the {@code join} method must be invoked to get the outcome after forking subtasks, and
|
||||
* the {@code close} method throws an exception after closing if the owner did not invoke
|
||||
* the {@code join} method after forking subtasks.
|
||||
*
|
||||
* <p> As a first example, consider a task that splits into two subtasks to concurrently
|
||||
* fetch resources from two URL locations "left" and "right". Both subtasks may complete
|
||||
@ -107,10 +107,10 @@ import jdk.internal.javac.PreviewFeature;
|
||||
* implements the desired policy. A {@code Joiner} handles subtask completion and produces
|
||||
* the outcome for the {@link #join() join} method. In the example above, {@code join}
|
||||
* returns {@code null}. Depending on the {@code Joiner}, {@code join} may return a
|
||||
* result, a stream of elements, or some other object. The {@code Joiner} interface defines
|
||||
* result, a list of elements, or some other object. The {@code Joiner} interface defines
|
||||
* factory methods to create {@code Joiner}s for some common cases.
|
||||
*
|
||||
* <p> A {@code Joiner} may <a id="Cancallation">cancel</a> the scope (sometimes called
|
||||
* <p> A {@code Joiner} may <a id="Cancellation">cancel</a> the scope (sometimes called
|
||||
* "short-circuiting") when some condition is reached that does not require the result of
|
||||
* subtasks that are still executing. Cancelling the scope prevents new threads from being
|
||||
* started to execute further subtasks, {@linkplain Thread#interrupt() interrupts} the
|
||||
@ -124,13 +124,13 @@ import jdk.internal.javac.PreviewFeature;
|
||||
* <p> Now consider another example that splits into two subtasks. In this example,
|
||||
* each subtask produces a {@code String} result and the task is only interested in
|
||||
* the result from the first subtask to complete successfully. The example uses {@link
|
||||
* Joiner#anySuccessfulResultOrThrow() Joiner.anySuccessfulResultOrThrow()} to
|
||||
* create a {@code Joiner} that makes available the result of the first subtask to
|
||||
* complete successfully. The type parameter in the example is "{@code String}" so that
|
||||
* only subtasks that return a {@code String} can be forked.
|
||||
* Joiner#anySuccessfulOrThrow() Joiner.anySuccessfulOrThrow()} to create a {@code Joiner}
|
||||
* that makes available the result of the first subtask to complete successfully. The type
|
||||
* parameter in the example is "{@code String}" so that only subtasks that return a
|
||||
* {@code String} can be forked.
|
||||
* {@snippet lang=java :
|
||||
* // @link substring="open" target="#open(Joiner)" :
|
||||
* try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow())) {
|
||||
* try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow())) {
|
||||
*
|
||||
* scope.fork(callable1);
|
||||
* scope.fork(callable2);
|
||||
@ -154,8 +154,9 @@ import jdk.internal.javac.PreviewFeature;
|
||||
* the {@code Joiner} and usage. Some {@code Joiner} implementations are suited to subtasks
|
||||
* that return results of the same type and where the {@code join} method returns a result
|
||||
* for the task to use. Code that forks subtasks that return results of different
|
||||
* types, and uses a {@code Joiner} such as {@code Joiner.awaitAllSuccessfulOrThrow()} that
|
||||
* does not return a result, will use {@link Subtask#get() Subtask.get()} after joining.
|
||||
* types, and uses a {@code Joiner} such as {@link Joiner#awaitAllSuccessfulOrThrow()
|
||||
* awaitAllSuccessfulOrThrow} that does not return a result, will use {@link Subtask#get()
|
||||
* Subtask.get()} after joining.
|
||||
*
|
||||
* <h2>Exception handling</h2>
|
||||
*
|
||||
@ -198,23 +199,22 @@ import jdk.internal.javac.PreviewFeature;
|
||||
*
|
||||
* <h2>Configuration</h2>
|
||||
*
|
||||
*
|
||||
* A {@code StructuredTaskScope} is opened with {@linkplain Configuration configuration}
|
||||
* that consists of a {@link ThreadFactory} to create threads, an optional name for
|
||||
* monitoring and management purposes, and an optional timeout.
|
||||
* that consists of a {@link ThreadFactory} to create threads, an optional name for the
|
||||
* scope, and an optional timeout. The name is intended for monitoring and management
|
||||
* purposes.
|
||||
*
|
||||
* <p> The {@link #open()} and {@link #open(Joiner)} methods create a {@code StructuredTaskScope}
|
||||
* with the <a id="DefaultConfiguration"> <em>default configuration</em></a>. The default
|
||||
* configuration has a {@code ThreadFactory} that creates unnamed
|
||||
* <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
|
||||
* is unnamed for monitoring and management purposes, and has no timeout.
|
||||
* configuration has a {@code ThreadFactory} that creates unnamed {@linkplain
|
||||
* Thread##virtual-threads virtual threads}, does not name the scope, and has no timeout.
|
||||
*
|
||||
* <p> The 2-arg {@link #open(Joiner, Function) open} method can be used to create a
|
||||
* {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for
|
||||
* the purposes of monitoring and management, or has a timeout that cancels the scope if
|
||||
* the timeout expires before or while waiting for subtasks to complete. The {@code open}
|
||||
* method is called with a {@linkplain Function function} that is applied to the default
|
||||
* configuration and returns a {@link Configuration Configuration} for the
|
||||
* <p> The 2-arg {@link #open(Joiner, UnaryOperator) open} method can be used to create a
|
||||
* {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, is named for
|
||||
* monitoring and management purposes, or has a timeout that cancels the scope if the
|
||||
* timeout expires before or while waiting for subtasks to complete. The {@code open}
|
||||
* method is called with an {@linkplain UnaryOperator operator} that is applied to the
|
||||
* default configuration and returns a {@link Configuration Configuration} for the
|
||||
* {@code StructuredTaskScope} under construction.
|
||||
*
|
||||
* <p> The following example opens a new {@code StructuredTaskScope} with a {@code
|
||||
@ -237,9 +237,9 @@ import jdk.internal.javac.PreviewFeature;
|
||||
*
|
||||
* <p> A second example sets a timeout, represented by a {@link Duration}. The timeout
|
||||
* starts when the new scope is opened. If the timeout expires before the {@code join}
|
||||
* method has completed then the scope is <a href="#Cancallation">cancelled</a>. This
|
||||
* interrupts the threads executing the two subtasks and causes the {@link #join() join}
|
||||
* method to throw {@link TimeoutException}.
|
||||
* method has completed then the scope is {@linkplain ##Cancellation cancelled} (this
|
||||
* interrupts the threads executing the two subtasks), and the {@code join} method
|
||||
* throws {@link TimeoutException TimeoutException}.
|
||||
* {@snippet lang=java :
|
||||
* Duration timeout = Duration.ofSeconds(10);
|
||||
*
|
||||
@ -251,9 +251,7 @@ import jdk.internal.javac.PreviewFeature;
|
||||
* scope.fork(callable1);
|
||||
* scope.fork(callable2);
|
||||
*
|
||||
* List<String> result = scope.join()
|
||||
* .map(Subtask::get)
|
||||
* .toList();
|
||||
* List<String> results = scope.join();
|
||||
*
|
||||
* }
|
||||
* }
|
||||
@ -314,11 +312,10 @@ import jdk.internal.javac.PreviewFeature;
|
||||
* });
|
||||
* }
|
||||
*
|
||||
* <p> A scoped value inherited into a subtask may be
|
||||
* <a href="{@docRoot}/java.base/java/lang/ScopedValue.html#rebind">rebound</a> to a new
|
||||
* value in the subtask for the bounded execution of some method executed in the subtask.
|
||||
* When the method completes, the value of the {@code ScopedValue} reverts to its previous
|
||||
* value, the value inherited from the thread executing the task.
|
||||
* <p> A scoped value inherited into a subtask may be {@linkplain ScopedValue##rebind
|
||||
* rebound} to a new value in the subtask for the bounded execution of some method executed
|
||||
* in the subtask. When the method completes, the value of the {@code ScopedValue} reverts
|
||||
* to its previous value, the value inherited from the thread executing the task.
|
||||
*
|
||||
* <p> A subtask may execute code that itself opens a new {@code StructuredTaskScope}.
|
||||
* A task executing in thread T1 opens a {@code StructuredTaskScope} and forks a
|
||||
@ -331,11 +328,15 @@ import jdk.internal.javac.PreviewFeature;
|
||||
*
|
||||
* <h2>Memory consistency effects</h2>
|
||||
*
|
||||
* <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to
|
||||
* {@linkplain #fork forking} of a subtask
|
||||
* <a href="{@docRoot}/java.base/java/util/concurrent/package-summary.html#MemoryVisibility">
|
||||
* <i>happen-before</i></a> any actions taken by that subtask, which in turn
|
||||
* <i>happen-before</i> the subtask result is {@linkplain Subtask#get() retrieved}.
|
||||
* <p> Actions in the owner thread of a {@code StructuredTaskScope} prior to {@linkplain
|
||||
* #fork forking} of a subtask {@linkplain java.util.concurrent##MemoryVisibility
|
||||
* <i>happen-before</i>} any actions taken by the thread that executes the subtask, which
|
||||
* in turn <i>happen-before</i> actions in any thread that successfully obtains the
|
||||
* subtask outcome with {@link Subtask#get() Subtask.get()} or {@link Subtask#exception()
|
||||
* Subtask.exception()}. If a subtask's outcome contributes to the result or exception
|
||||
* from {@link #join()}, then any actions taken by the thread executing that subtask
|
||||
* <i>happen-before</i> the owner thread returns from {@code join} with a result or
|
||||
* {@link FailedException FailedException}.
|
||||
*
|
||||
* <h2>General exceptions</h2>
|
||||
*
|
||||
@ -405,16 +406,20 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* {@link #fork(Runnable) fork(Runnable)} then {@code null} is returned.
|
||||
*
|
||||
* <p> Code executing in the scope owner thread can use this method to get the
|
||||
* result of a successful subtask only after it has {@linkplain #join() joined}.
|
||||
* result of a successful subtask after it has {@linkplain #join() joined}.
|
||||
*
|
||||
* <p> Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask)
|
||||
* onComplete} method should test that the {@linkplain #state() subtask state} is
|
||||
* {@link State#SUCCESS SUCCESS} before using this method to get the result.
|
||||
*
|
||||
* <p> This method may be invoked by any thread after the scope owner has joined.
|
||||
* The only case where this method can be used to get the result before the scope
|
||||
* owner has joined is when called from the {@code onComplete(Subtask)} method.
|
||||
*
|
||||
* @return the possibly-null result
|
||||
* @throws IllegalStateException if the subtask has not completed, did not complete
|
||||
* successfully, or the current thread is the scope owner invoking this
|
||||
* method before {@linkplain #join() joining}
|
||||
* @throws IllegalStateException if the subtask has not completed or did not
|
||||
* complete successfully, or this method if invoked outside the context of the
|
||||
* {@code onComplete(Subtask)} method before the owner thread has joined
|
||||
* @see State#SUCCESS
|
||||
*/
|
||||
T get();
|
||||
@ -427,15 +432,19 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* exception or error thrown by the {@link Runnable#run() run} method is returned.
|
||||
*
|
||||
* <p> Code executing in the scope owner thread can use this method to get the
|
||||
* exception thrown by a failed subtask only after it has {@linkplain #join() joined}.
|
||||
* exception thrown by a failed subtask after it has {@linkplain #join() joined}.
|
||||
*
|
||||
* <p> Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask)
|
||||
* onComplete} method should test that the {@linkplain #state() subtask state} is
|
||||
* {@link State#FAILED FAILED} before using this method to get the exception.
|
||||
*
|
||||
* @throws IllegalStateException if the subtask has not completed, completed with
|
||||
* a result, or the current thread is the scope owner invoking this method
|
||||
* before {@linkplain #join() joining}
|
||||
* <p> This method may be invoked by any thread after the scope owner has joined.
|
||||
* The only case where this method can be used to get the exception before the scope
|
||||
* owner has joined is when called from the {@code onComplete(Subtask)} method.
|
||||
*
|
||||
* @throws IllegalStateException if the subtask has not completed or completed
|
||||
* with a result, or this method if invoked outside the context of the {@code
|
||||
* onComplete(Subtask)} method before the owner thread has joined
|
||||
* @see State#FAILED
|
||||
*/
|
||||
Throwable exception();
|
||||
@ -449,22 +458,22 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* <p> Joiner defines static methods to create {@code Joiner} objects for common cases:
|
||||
* <ul>
|
||||
* <li> {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Joiner}
|
||||
* that yields a stream of the completed subtasks for {@code join} to return when
|
||||
* all subtasks complete successfully. It cancels the scope and causes {@code join}
|
||||
* to throw if any subtask fails.
|
||||
* <li> {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a
|
||||
* {@code Joiner} that yields the result of the first subtask to succeed for {@code
|
||||
* join} to return. It causes {@code join} to throw if all subtasks fail.
|
||||
* that yields a list of all results for {@code join} to return when all subtasks
|
||||
* complete successfully. It cancels the scope and causes {@code join} to throw if
|
||||
* any subtask fails.
|
||||
* <li> {@link #anySuccessfulOrThrow() anySuccessfulOrThrow()} creates a {@code Joiner}
|
||||
* that yields the result of the first subtask to succeed for {@code join} to return.
|
||||
* It causes {@code join} to throw if all subtasks fail.
|
||||
* <li> {@link #awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} creates a
|
||||
* {@code Joiner} that waits for all successful subtasks. It cancels the scope and
|
||||
* causes {@code join} to throw if any subtask fails.
|
||||
* <li> {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all
|
||||
* subtasks. It does not cancel the scope or cause {@code join} to throw.
|
||||
* subtasks to complete. It does not cancel the scope or cause {@code join} to throw.
|
||||
* </ul>
|
||||
*
|
||||
* <p> In addition to the methods to create {@code Joiner} objects for common cases,
|
||||
* the {@link #allUntil(Predicate) allUntil(Predicate)} method is defined to create a
|
||||
* {@code Joiner} that yields a stream of all subtasks. It is created with a {@link
|
||||
* {@code Joiner} that yields a list of all subtasks. It is created with a {@link
|
||||
* Predicate Predicate} that determines if the scope should continue or be cancelled.
|
||||
* This {@code Joiner} can be built upon to create custom policies that cancel the
|
||||
* scope based on some condition.
|
||||
@ -472,22 +481,34 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* <p> More advanced policies can be developed by implementing the {@code Joiner}
|
||||
* interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked.
|
||||
* The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a
|
||||
* result or exception. These methods return a {@code boolean} to indicate if scope
|
||||
* result or exception. These methods return a {@code boolean} to indicate if the scope
|
||||
* should be cancelled. These methods can be used to collect subtasks, results, or
|
||||
* exceptions, and control when to cancel the scope. The {@link #result()} method
|
||||
* must be implemented to produce the result (or exception) for the {@code join}
|
||||
* method.
|
||||
*
|
||||
* <p> If a {@code StructuredTaskScope} is opened with a {@linkplain
|
||||
* Configuration#withTimeout(Duration) timeout}, and the timeout expires before or
|
||||
* while waiting in {@link StructuredTaskScope#join() join()}, then the scope is
|
||||
* {@linkplain StructuredTaskScope##Cancellation cancelled}, and the {@code Joiners}'s
|
||||
* {@link #onTimeout()} method is invoked to notify the {@code Joiner} and optionally
|
||||
* throw {@link TimeoutException TimeoutException}. If the {@code onTimeout()} method
|
||||
* does not throw then the {@code join()} method will invoke the {@link #result()}
|
||||
* method to produce a result. This result may be based on the outcome of subtasks
|
||||
* that completed before the timeout expired.
|
||||
*
|
||||
* <p> Unless otherwise specified, passing a {@code null} argument to a method
|
||||
* in this class will cause a {@link NullPointerException} to be thrown.
|
||||
*
|
||||
* @implSpec Implementations of this interface must be thread safe. The {@link
|
||||
* #onComplete(Subtask)} method defined by this interface may be invoked by several
|
||||
* threads concurrently.
|
||||
* @implSpec Implementations of this interface must be thread-safe. The {@link
|
||||
* #onComplete(Subtask)} method may be invoked concurrently, as multiple subtasks can
|
||||
* complete at the same time. Additionally, the {@code onComplete} method may be
|
||||
* called concurrently with the scope owner thread invoking the {@link #onFork(Subtask)}
|
||||
* or {@link #onTimeout()} methods.
|
||||
*
|
||||
* @apiNote It is very important that a new {@code Joiner} object is created for each
|
||||
* {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with
|
||||
* different scopes or re-used after a task is closed.
|
||||
* different scopes or re-used after a scope is closed.
|
||||
*
|
||||
* <p> Designing a {@code Joiner} should take into account the code at the use-site
|
||||
* where the results from the {@link StructuredTaskScope#join() join} method are
|
||||
@ -502,13 +523,11 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* @see #open(Joiner)
|
||||
*/
|
||||
@PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
|
||||
@FunctionalInterface
|
||||
interface Joiner<T, R> {
|
||||
/**
|
||||
* Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable)
|
||||
* fork(Runnable)} when forking a subtask. The method is invoked from the task
|
||||
* owner thread. The method is invoked before a thread is created to run the
|
||||
* subtask.
|
||||
* fork(Runnable)} when forking a subtask. The method is invoked before a thread
|
||||
* is created to run the subtask.
|
||||
*
|
||||
* @implSpec The default implementation throws {@code NullPointerException} if the
|
||||
* subtask is {@code null}. It throws {@code IllegalArgumentException} if the
|
||||
@ -521,7 +540,7 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* @param subtask the subtask
|
||||
* @return {@code true} to cancel the scope, otherwise {@code false}
|
||||
*/
|
||||
default boolean onFork(Subtask<? extends T> subtask) {
|
||||
default boolean onFork(Subtask<T> subtask) {
|
||||
if (subtask.state() != Subtask.State.UNAVAILABLE) {
|
||||
throw new IllegalArgumentException("Subtask not in UNAVAILABLE state");
|
||||
}
|
||||
@ -544,13 +563,34 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* @param subtask the subtask
|
||||
* @return {@code true} to cancel the scope, otherwise {@code false}
|
||||
*/
|
||||
default boolean onComplete(Subtask<? extends T> subtask) {
|
||||
default boolean onComplete(Subtask<T> subtask) {
|
||||
if (subtask.state() == Subtask.State.UNAVAILABLE) {
|
||||
throw new IllegalArgumentException("Subtask has not completed");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked by the {@link #join() join()} method if the scope was opened with a
|
||||
* timeout, and the timeout expires before or while waiting in the {@code join}
|
||||
* method.
|
||||
*
|
||||
* @implSpec The default implementation throws {@link TimeoutException TimeoutException}.
|
||||
*
|
||||
* @apiNote This method is intended for {@code Joiner} implementations that do not
|
||||
* throw {@link TimeoutException TimeoutException}, or require a notification when
|
||||
* the timeout expires before or while waiting in {@code join}.
|
||||
*
|
||||
* <p> This method is invoked by the {@code join} method. It should not be
|
||||
* invoked directly.
|
||||
*
|
||||
* @throws TimeoutException for {@code join} to throw
|
||||
* @since 26
|
||||
*/
|
||||
default void onTimeout() {
|
||||
throw new TimeoutException();
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked by the {@link #join() join()} method to produce the result (or exception)
|
||||
* after waiting for all subtasks to complete or the scope cancelled. The result
|
||||
@ -560,9 +600,7 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
*
|
||||
* <p> In normal usage, this method will be called at most once by the {@code join}
|
||||
* method to produce the result (or exception). The behavior of this method when
|
||||
* invoked directly, and invoked more than once, is undefined. Where possible, an
|
||||
* implementation should return an equal result (or throw the same exception) on
|
||||
* second or subsequent calls to produce the outcome.
|
||||
* invoked directly is undefined.
|
||||
*
|
||||
* @apiNote This method is invoked by the {@code join} method. It should not be
|
||||
* invoked directly.
|
||||
@ -573,15 +611,17 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
R result() throws Throwable;
|
||||
|
||||
/**
|
||||
* {@return a new Joiner object that yields a stream of all subtasks when all
|
||||
* {@return a new Joiner object that yields a list of all results when all
|
||||
* subtasks complete successfully}
|
||||
* The {@code Joiner} <a href="StructuredTaskScope.html#Cancallation">cancels</a>
|
||||
* The {@code Joiner} {@linkplain StructuredTaskScope##Cancellation cancels}
|
||||
* the scope and causes {@code join} to throw if any subtask fails.
|
||||
*
|
||||
* <p> If all subtasks complete successfully, the joiner's {@link Joiner#result()}
|
||||
* method returns a stream of all subtasks in the order that they were forked.
|
||||
* If any subtask failed then the {@code result} method throws the exception from
|
||||
* the first subtask to fail.
|
||||
* <p> If all subtasks complete successfully then the joiner's {@link
|
||||
* Joiner#result()} method returns a list of all results, in the order that the
|
||||
* subtasks were forked, for the {@link StructuredTaskScope#join() join()} to return.
|
||||
* If the scope was opened with a {@linkplain Configuration#withTimeout(Duration)
|
||||
* timeout}, and the timeout expires before or while waiting for all subtasks to
|
||||
* complete, then the {@code join} method throws {@code TimeoutException}.
|
||||
*
|
||||
* @apiNote Joiners returned by this method are suited to cases where all subtasks
|
||||
* return a result of the same type. Joiners returned by {@link
|
||||
@ -590,7 +630,7 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
*
|
||||
* @param <T> the result type of subtasks
|
||||
*/
|
||||
static <T> Joiner<T, Stream<Subtask<T>>> allSuccessfulOrThrow() {
|
||||
static <T> Joiner<T, List<T>> allSuccessfulOrThrow() {
|
||||
return new Joiners.AllSuccessful<>();
|
||||
}
|
||||
|
||||
@ -599,25 +639,33 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* completed successfully}
|
||||
* The {@code Joiner} causes {@code join} to throw if all subtasks fail.
|
||||
*
|
||||
* <p> The joiner's {@link Joiner#result()} method returns the result of a subtask
|
||||
* that completed successfully. If all subtasks fail then the {@code result} method
|
||||
* throws the exception from one of the failed subtasks. The {@code result} method
|
||||
* throws {@code NoSuchElementException} if no subtasks were forked.
|
||||
* <p> The joiner's {@link Joiner#result()} method returns the result of a subtask,
|
||||
* that completed successfully, for the {@link StructuredTaskScope#join() join()}
|
||||
* to return. If all subtasks fail then the {@code result} method throws the
|
||||
* exception from one of the failed subtasks. The {@code result} method throws
|
||||
* {@code NoSuchElementException} if no subtasks were forked. If the scope was
|
||||
* opened with a {@linkplain Configuration#withTimeout(Duration) timeout}, and
|
||||
* the timeout expires before or while waiting for any subtask to complete
|
||||
* successfully, then the {@code join} method throws {@code TimeoutException}.
|
||||
*
|
||||
* @param <T> the result type of subtasks
|
||||
* @since 26
|
||||
*/
|
||||
static <T> Joiner<T, T> anySuccessfulResultOrThrow() {
|
||||
static <T> Joiner<T, T> anySuccessfulOrThrow() {
|
||||
return new Joiners.AnySuccessful<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@return a new Joiner object that waits for subtasks to complete successfully}
|
||||
* The {@code Joiner} <a href="StructuredTaskScope.html#Cancallation">cancels</a>
|
||||
* The {@code Joiner} {@linkplain StructuredTaskScope##Cancellation cancels}
|
||||
* the scope and causes {@code join} to throw if any subtask fails.
|
||||
*
|
||||
* <p> The joiner's {@link Joiner#result() result} method returns {@code null}
|
||||
* if all subtasks complete successfully, or throws the exception from the first
|
||||
* subtask to fail.
|
||||
* subtask to fail. If the scope was opened with a {@linkplain
|
||||
* Configuration#withTimeout(Duration) timeout}, and the timeout expires before or
|
||||
* while waiting for all subtasks to complete, then the {@code join} method throws
|
||||
* {@code TimeoutException}.
|
||||
*
|
||||
* @apiNote Joiners returned by this method are suited to cases where subtasks
|
||||
* return results of different types. Joiners returned by {@link #allSuccessfulOrThrow()}
|
||||
@ -634,6 +682,9 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* The {@code Joiner} does not cancel the scope if a subtask fails.
|
||||
*
|
||||
* <p> The joiner's {@link Joiner#result() result} method returns {@code null}.
|
||||
* If the scope was opened with a {@linkplain Configuration#withTimeout(Duration)
|
||||
* timeout}, and the timeout expires before or while waiting for all subtasks to
|
||||
* complete, then the {@code join} method throws {@code TimeoutException}.
|
||||
*
|
||||
* @apiNote This Joiner is useful for cases where subtasks make use of
|
||||
* <em>side-effects</em> rather than return results or fail with exceptions.
|
||||
@ -668,33 +719,40 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
}
|
||||
|
||||
/**
|
||||
* {@return a new Joiner object that yields a stream of all subtasks when all
|
||||
* {@return a new Joiner object that yields a list of all subtasks when all
|
||||
* subtasks complete or a predicate returns {@code true} to cancel the scope}
|
||||
*
|
||||
* <p> The joiner's {@link Joiner#onComplete(Subtask)} method invokes the
|
||||
* predicate's {@link Predicate#test(Object) test} method with the subtask that
|
||||
* completed successfully or failed with an exception. If the {@code test} method
|
||||
* returns {@code true} then <a href="StructuredTaskScope.html#Cancallation">
|
||||
* the scope is cancelled</a>. The {@code test} method must be thread safe as it
|
||||
* <p> The joiner's {@link #onComplete(Subtask)} method invokes the predicate's
|
||||
* {@link Predicate#test(Object) test} method with the subtask that completed
|
||||
* successfully or failed with an exception. If the {@code test} method
|
||||
* returns {@code true} then {@linkplain StructuredTaskScope##Cancellation
|
||||
* the scope is cancelled}. The {@code test} method must be thread safe as it
|
||||
* may be invoked concurrently from several threads. If the {@code test} method
|
||||
* completes with an exception or error, then the thread that executed the subtask
|
||||
* invokes the {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler}
|
||||
* with the exception or error before the thread terminates.
|
||||
*
|
||||
* <p> The joiner's {@link #result()} method returns the stream of all subtasks,
|
||||
* in fork order. The stream may contain subtasks that have completed
|
||||
* <p> The joiner's {@link #result()} method returns the list of all subtasks,
|
||||
* in fork order. The list may contain subtasks that have completed
|
||||
* (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED}
|
||||
* state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state
|
||||
* if the scope was cancelled before all subtasks were forked or completed.
|
||||
*
|
||||
* <p> The joiner's {@link #onTimeout()} method does nothing. If configured with
|
||||
* a {@linkplain Configuration#withTimeout(Duration) timeout}, and the timeout
|
||||
* expires before or while waiting in {@link StructuredTaskScope#join() join},
|
||||
* then the {@link #result()} method returns the list of all subtasks.
|
||||
* Subtasks that did not complete before the timeout expired will be in the
|
||||
* {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state.
|
||||
*
|
||||
* <p> The following example uses this method to create a {@code Joiner} that
|
||||
* <a href="StructuredTaskScope.html#Cancallation">cancels</a> the scope when
|
||||
* two or more subtasks fail.
|
||||
* {@linkplain StructuredTaskScope##Cancellation cancels} the scope when two or
|
||||
* more subtasks fail.
|
||||
* {@snippet lang=java :
|
||||
* class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
|
||||
* class CancelAfterTwoFailures<T> implements Predicate<Subtask<T>> {
|
||||
* private final AtomicInteger failedCount = new AtomicInteger();
|
||||
* @Override
|
||||
* public boolean test(Subtask<? extends T> subtask) {
|
||||
* public boolean test(Subtask<T> subtask) {
|
||||
* return subtask.state() == Subtask.State.FAILED
|
||||
* && failedCount.incrementAndGet() >= 2;
|
||||
* }
|
||||
@ -710,15 +768,30 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* <T> List<Subtask<T>> invokeAll(Collection<Callable<T>> tasks) throws InterruptedException {
|
||||
* try (var scope = StructuredTaskScope.open(Joiner.<T>allUntil(_ -> false))) {
|
||||
* tasks.forEach(scope::fork);
|
||||
* return scope.join().toList();
|
||||
* return scope.join();
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* <p> The following example uses {@code allUntil} to get the results of all
|
||||
* subtasks that complete successfully within a timeout period.
|
||||
* {@snippet lang=java :
|
||||
* <T> List<T> invokeAll(Collection<Callable<T>> tasks, Duration timeout) throws InterruptedException {
|
||||
* try (var scope = StructuredTaskScope.open(Joiner.<T>allUntil(_ -> false), cf -> cf.withTimeout(timeout))) {
|
||||
* tasks.forEach(scope::fork);
|
||||
* return scope.join()
|
||||
* .stream()
|
||||
* .filter(s -> s.state() == Subtask.State.SUCCESS)
|
||||
* .map(Subtask::get)
|
||||
* .toList();
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* @param isDone the predicate to evaluate completed subtasks
|
||||
* @param <T> the result type of subtasks
|
||||
*/
|
||||
static <T> Joiner<T, Stream<Subtask<T>>> allUntil(Predicate<Subtask<? extends T>> isDone) {
|
||||
static <T> Joiner<T, List<Subtask<T>>> allUntil(Predicate<Subtask<T>> isDone) {
|
||||
return new Joiners.AllSubtasks<>(isDone);
|
||||
}
|
||||
}
|
||||
@ -727,19 +800,13 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* Represents the configuration for a {@code StructuredTaskScope}.
|
||||
*
|
||||
* <p> The configuration for a {@code StructuredTaskScope} consists of a {@link
|
||||
* ThreadFactory} to create threads, an optional name for the purposes of monitoring
|
||||
* and management, and an optional timeout.
|
||||
* ThreadFactory} to create threads, an optional name for the scope, and an optional
|
||||
* timeout. The name is intended for monitoring and management purposes.
|
||||
*
|
||||
* <p> Creating a {@code StructuredTaskScope} with {@link #open()} or {@link #open(Joiner)}
|
||||
* uses the <a href="StructuredTaskScope.html#DefaultConfiguration">default
|
||||
* configuration</a>. The default configuration consists of a thread factory that
|
||||
* creates unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">
|
||||
* virtual threads</a>, no name for monitoring and management purposes, and no timeout.
|
||||
*
|
||||
* <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, Function)
|
||||
* open} method allows a different configuration to be used. The function specified
|
||||
* <p> Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, UnaryOperator)
|
||||
* open} method allows a different configuration to be used. The operator specified
|
||||
* to the {@code open} method is applied to the default configuration and returns the
|
||||
* configuration for the {@code StructuredTaskScope} under construction. The function
|
||||
* configuration for the {@code StructuredTaskScope} under construction. The operator
|
||||
* can use the {@code with-} prefixed methods defined here to specify the components
|
||||
* of the configuration to use.
|
||||
*
|
||||
@ -756,17 +823,17 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* a scope to create threads when {@linkplain #fork(Callable) forking} subtasks.
|
||||
* @param threadFactory the thread factory
|
||||
*
|
||||
* @apiNote The thread factory will typically create
|
||||
* <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
|
||||
* maybe with names for monitoring purposes, an {@linkplain Thread.UncaughtExceptionHandler
|
||||
* uncaught exception handler}, or other properties configured.
|
||||
* @apiNote The thread factory will typically create {@linkplain Thread##virtual-threads
|
||||
* virtual threads}, maybe with names for monitoring purposes, an {@linkplain
|
||||
* Thread.UncaughtExceptionHandler uncaught exception handler}, or other properties
|
||||
* configured.
|
||||
*
|
||||
* @see #fork(Callable)
|
||||
*/
|
||||
Configuration withThreadFactory(ThreadFactory threadFactory);
|
||||
|
||||
/**
|
||||
* {@return a new {@code Configuration} object with the given name}
|
||||
* {@return a new {@code Configuration} object with the given scope name}
|
||||
* The other components are the same as this object. A scope is optionally
|
||||
* named for the purposes of monitoring and management.
|
||||
* @param name the name
|
||||
@ -783,6 +850,7 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* compute the timeout for this method.
|
||||
*
|
||||
* @see #join()
|
||||
* @see Joiner#onTimeout()
|
||||
*/
|
||||
Configuration withTimeout(Duration timeout);
|
||||
}
|
||||
@ -809,11 +877,13 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
}
|
||||
|
||||
/**
|
||||
* Exception thrown by {@link #join()} if the scope was created with a timeout and
|
||||
* the timeout expired before or while waiting in {@code join}.
|
||||
* Exception thrown by {@link #join()} if the scope was opened with a timeout,
|
||||
* the timeout expired before or while waiting in {@code join}, and the {@link
|
||||
* Joiner#onTimeout() Joiner.onTimeout} method throws this exception.
|
||||
*
|
||||
* @since 25
|
||||
* @see Configuration#withTimeout(Duration)
|
||||
* @see Joiner#onTimeout()
|
||||
*/
|
||||
@PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY)
|
||||
final class TimeoutException extends RuntimeException {
|
||||
@ -828,14 +898,14 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
|
||||
/**
|
||||
* Opens a new {@code StructuredTaskScope} to use the given {@code Joiner} object and
|
||||
* with configuration that is the result of applying the given function to the
|
||||
* <a href="#DefaultConfiguration">default configuration</a>.
|
||||
* with configuration that is the result of applying the given operator to the
|
||||
* {@linkplain ##DefaultConfiguration default configuration}.
|
||||
*
|
||||
* <p> The {@code configFunction} is called with the default configuration and returns
|
||||
* the configuration for the new scope. The function may, for example, set the
|
||||
* <p> The {@code configOperator} is called with the default configuration and returns
|
||||
* the configuration for the new scope. The operator may, for example, set the
|
||||
* {@linkplain Configuration#withThreadFactory(ThreadFactory) ThreadFactory} or set a
|
||||
* {@linkplain Configuration#withTimeout(Duration) timeout}. If the function completes
|
||||
* with an exception or error then it is propagated by this method. If the function
|
||||
* {@linkplain Configuration#withTimeout(Duration) timeout}. If the operator completes
|
||||
* with an exception or error then it is propagated by this method. If the operator
|
||||
* returns {@code null} then {@code NullPointerException} is thrown.
|
||||
*
|
||||
* <p> If a {@code ThreadFactory} is set then its {@link ThreadFactory#newThread(Runnable)
|
||||
@ -845,8 +915,9 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
*
|
||||
* <p> If a {@linkplain Configuration#withTimeout(Duration) timeout} is set then it
|
||||
* starts when the scope is opened. If the timeout expires before the scope has
|
||||
* {@linkplain #join() joined} then the scope is <a href="#Cancallation">cancelled</a>
|
||||
* and the {@code join} method throws {@link TimeoutException}.
|
||||
* {@linkplain #join() joined} then the scope is {@linkplain ##Cancellation cancelled}
|
||||
* and the {@code Joiner}'s {@link Joiner#onTimeout() onTimeout()} method is invoked
|
||||
* to optionally throw {@link TimeoutException TimeoutException}.
|
||||
*
|
||||
* <p> The new scope is owned by the current thread. Only code executing in this
|
||||
* thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or
|
||||
@ -856,27 +927,27 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* value} bindings for inheritance by threads started in the scope.
|
||||
*
|
||||
* @param joiner the joiner
|
||||
* @param configFunction a function to produce the configuration
|
||||
* @param configOperator the operator to produce the configuration
|
||||
* @return a new scope
|
||||
* @param <T> the result type of subtasks executed in the scope
|
||||
* @param <R> the result type of the scope
|
||||
* @since 25
|
||||
* @since 26
|
||||
*/
|
||||
static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
|
||||
Function<Configuration, Configuration> configFunction) {
|
||||
return StructuredTaskScopeImpl.open(joiner, configFunction);
|
||||
UnaryOperator<Configuration> configOperator) {
|
||||
return StructuredTaskScopeImpl.open(joiner, configOperator);
|
||||
}
|
||||
|
||||
/**
|
||||
* Opens a new {@code StructuredTaskScope}to use the given {@code Joiner} object. The
|
||||
* scope is created with the <a href="#DefaultConfiguration">default configuration</a>.
|
||||
* scope is created with the {@linkplain ##DefaultConfiguration default configuration}.
|
||||
* The default configuration has a {@code ThreadFactory} that creates unnamed
|
||||
* <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>,
|
||||
* is unnamed for monitoring and management purposes, and has no timeout.
|
||||
* {@linkplain Thread##irtual-threads virtual threads}, does not name the scope, and
|
||||
* has no timeout.
|
||||
*
|
||||
* @implSpec
|
||||
* This factory method is equivalent to invoking the 2-arg open method with the given
|
||||
* joiner and the {@linkplain Function#identity() identity function}.
|
||||
* joiner and the {@linkplain UnaryOperator#identity() identity operator}.
|
||||
*
|
||||
* @param joiner the joiner
|
||||
* @return a new scope
|
||||
@ -885,7 +956,7 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* @since 25
|
||||
*/
|
||||
static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner) {
|
||||
return open(joiner, Function.identity());
|
||||
return open(joiner, UnaryOperator.identity());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -897,22 +968,22 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* It throws {@link FailedException} if any subtask fails, with the exception from
|
||||
* the first subtask to fail as the cause.
|
||||
*
|
||||
* <p> The scope is created with the <a href="#DefaultConfiguration">default
|
||||
* configuration</a>. The default configuration has a {@code ThreadFactory} that creates
|
||||
* unnamed <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual
|
||||
* threads</a>, is unnamed for monitoring and management purposes, and has no timeout.
|
||||
* <p> The scope is created with the {@linkplain ##DefaultConfiguration default
|
||||
* configuration}. The default configuration has a {@code ThreadFactory} that creates
|
||||
* unnamed {@linkplain Thread##virtual-threads virtual threads}, does not name the
|
||||
* scope, and has no timeout.
|
||||
*
|
||||
* @implSpec
|
||||
* This factory method is equivalent to invoking the 2-arg open method with a joiner
|
||||
* created with {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()}
|
||||
* and the {@linkplain Function#identity() identity function}.
|
||||
* and the {@linkplain UnaryOperator#identity() identity operator}.
|
||||
*
|
||||
* @param <T> the result type of subtasks
|
||||
* @return a new scope
|
||||
* @since 25
|
||||
*/
|
||||
static <T> StructuredTaskScope<T, Void> open() {
|
||||
return open(Joiner.awaitAllSuccessfulOrThrow(), Function.identity());
|
||||
return open(Joiner.awaitAllSuccessfulOrThrow(), UnaryOperator.identity());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -926,7 +997,7 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
* method with the subtask in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state.
|
||||
* If the {@code onFork} completes with an exception or error then it is propagated by
|
||||
* the {@code fork} method without creating a thread. If the scope is already
|
||||
* <a href="#Cancallation">cancelled</a>, or {@code onFork} returns {@code true} to
|
||||
* {@linkplain ##Cancellation cancelled}, or {@code onFork} returns {@code true} to
|
||||
* cancel the scope, then this method returns the {@code Subtask}, in the
|
||||
* {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, without creating a thread to
|
||||
* execute the subtask.
|
||||
@ -993,32 +1064,42 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
|
||||
/**
|
||||
* Returns the result, or throws, after waiting for all subtasks to complete or
|
||||
* the scope to be <a href="#Cancallation">cancelled</a>.
|
||||
* the scope to be {@linkplain ##Cancellation cancelled}.
|
||||
*
|
||||
* <p> This method waits for all subtasks started in this scope to complete or the
|
||||
* scope to be cancelled. If a {@linkplain Configuration#withTimeout(Duration) timeout}
|
||||
* is configured and the timeout expires before or while waiting, then the scope is
|
||||
* cancelled and {@link TimeoutException TimeoutException} is thrown. Once finished
|
||||
* waiting, the {@code Joiner}'s {@link Joiner#result() result()} method is invoked
|
||||
* to get the result or throw an exception. If the {@code result()} method throws
|
||||
* then this method throws {@code FailedException} with the exception as the cause.
|
||||
* scope to be cancelled. Once finished waiting, the {@code Joiner}'s {@link
|
||||
* Joiner#result() result()} method is invoked to get the result or throw an exception.
|
||||
* If the {@code result()} method throws then {@code join()} throws
|
||||
* {@code FailedException} with the exception from the {@code Joiner} as the cause.
|
||||
*
|
||||
* <p> This method may only be invoked by the scope owner, and only once.
|
||||
* <p> If a {@linkplain Configuration#withTimeout(Duration) timeout} is configured,
|
||||
* and the timeout expires before or while waiting, then the scope is cancelled and
|
||||
* the {@code Joiner}'s {@link Joiner#onTimeout() onTimeout()} method is invoked
|
||||
* before calling the {@code Joiner}'s {@code result()} method. If the {@code onTimeout()}
|
||||
* method throws {@link TimeoutException TimeoutException} (or throws any exception
|
||||
* or error), then it is propagated by this method. If the {@code onTimeout()} method
|
||||
* does not throw then the {@code Joiner}'s {@code result()} method is invoked to
|
||||
* get the result or throw.
|
||||
*
|
||||
* <p> This method may only be invoked by the scope owner. Once the result or
|
||||
* exception outcome is obtained, this method may not be invoked again. The only
|
||||
* case where the method may be called again is where {@code InterruptedException}
|
||||
* is thrown while waiting.
|
||||
*
|
||||
* @return the result
|
||||
* @throws WrongThreadException if the current thread is not the scope owner
|
||||
* @throws IllegalStateException if already joined or this scope is closed
|
||||
* @throws FailedException if the <i>outcome</i> is an exception, thrown with the
|
||||
* exception from {@link Joiner#result() Joiner.result()} as the cause
|
||||
* @throws TimeoutException if a timeout is set and the timeout expires before or
|
||||
* while waiting
|
||||
* @throws TimeoutException if a timeout is set, the timeout expires before or while
|
||||
* waiting, and {@link Joiner#onTimeout() Joiner.onTimeout()} throws this exception
|
||||
* @throws InterruptedException if interrupted while waiting
|
||||
* @since 25
|
||||
*/
|
||||
R join() throws InterruptedException;
|
||||
|
||||
/**
|
||||
* {@return {@code true} if this scope is <a href="#Cancallation">cancelled</a> or in
|
||||
* {@return {@code true} if this scope is {@linkplain ##Cancellation cancelled} or in
|
||||
* the process of being cancelled, otherwise {@code false}}
|
||||
*
|
||||
* <p> Cancelling the scope prevents new threads from starting in the scope and
|
||||
@ -1038,11 +1119,11 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
/**
|
||||
* Closes this scope.
|
||||
*
|
||||
* <p> This method first <a href="#Cancallation">cancels</a> the scope, if not
|
||||
* <p> This method first {@linkplain ##Cancellation cancels} the scope, if not
|
||||
* already cancelled. This interrupts the threads executing unfinished subtasks. This
|
||||
* method then waits for all threads to finish. If interrupted while waiting then it
|
||||
* will continue to wait until the threads finish, before completing with the interrupt
|
||||
* status set.
|
||||
* will continue to wait until the threads finish, before completing with the
|
||||
* {@linkplain Thread#isInterrupted() interrupted status} set.
|
||||
*
|
||||
* <p> This method may only be invoked by the scope owner. If the scope
|
||||
* is already closed then the scope owner invoking this method has no effect.
|
||||
@ -1069,4 +1150,4 @@ public sealed interface StructuredTaskScope<T, R>
|
||||
*/
|
||||
@Override
|
||||
void close();
|
||||
}
|
||||
}
|
||||
|
||||
@ -28,9 +28,10 @@ import java.lang.invoke.MethodHandles;
|
||||
import java.lang.invoke.VarHandle;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.UnaryOperator;
|
||||
import jdk.internal.misc.ThreadFlock;
|
||||
import jdk.internal.invoke.MhUtil;
|
||||
import jdk.internal.vm.annotation.Stable;
|
||||
|
||||
/**
|
||||
* StructuredTaskScope implementation.
|
||||
@ -43,20 +44,19 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
private final ThreadFactory threadFactory;
|
||||
private final ThreadFlock flock;
|
||||
|
||||
// state, only accessed by owner thread
|
||||
private static final int ST_NEW = 0,
|
||||
ST_FORKED = 1, // subtasks forked, need to join
|
||||
// scope state, set by owner thread, read by any thread
|
||||
private static final int ST_FORKED = 1, // subtasks forked, need to join
|
||||
ST_JOIN_STARTED = 2, // join started, can no longer fork
|
||||
ST_JOIN_COMPLETED = 3, // join completed
|
||||
ST_CLOSED = 4; // closed
|
||||
private int state;
|
||||
|
||||
// timer task, only accessed by owner thread
|
||||
private Future<?> timerTask;
|
||||
private volatile int state;
|
||||
|
||||
// set or read by any thread
|
||||
private volatile boolean cancelled;
|
||||
|
||||
// timer task, only accessed by owner thread
|
||||
private Future<?> timerTask;
|
||||
|
||||
// set by the timer thread, read by the owner thread
|
||||
private volatile boolean timeoutExpired;
|
||||
|
||||
@ -67,7 +67,6 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
this.joiner = joiner;
|
||||
this.threadFactory = threadFactory;
|
||||
this.flock = ThreadFlock.open((name != null) ? name : Objects.toIdentityString(this));
|
||||
this.state = ST_NEW;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -76,10 +75,10 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
* default configuration.
|
||||
*/
|
||||
static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
|
||||
Function<Configuration, Configuration> configFunction) {
|
||||
UnaryOperator<Configuration> configOperator) {
|
||||
Objects.requireNonNull(joiner);
|
||||
|
||||
var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig());
|
||||
var config = (ConfigImpl) configOperator.apply(ConfigImpl.defaultConfig());
|
||||
var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
|
||||
|
||||
// schedule timeout
|
||||
@ -109,23 +108,10 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws IllegalStateException if already joined or scope is closed.
|
||||
* Returns true if join has been invoked and there is an outcome.
|
||||
*/
|
||||
private void ensureNotJoined() {
|
||||
assert Thread.currentThread() == flock.owner();
|
||||
if (state > ST_FORKED) {
|
||||
throw new IllegalStateException("Already joined or scope is closed");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws IllegalStateException if invoked by the owner thread and the owner thread
|
||||
* has not joined.
|
||||
*/
|
||||
private void ensureJoinedIfOwner() {
|
||||
if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) {
|
||||
throw new IllegalStateException("join not called");
|
||||
}
|
||||
private boolean isJoinCompleted() {
|
||||
return state >= ST_JOIN_COMPLETED;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -184,9 +170,11 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
/**
|
||||
* Invoked by the thread for a subtask when the subtask completes before scope is cancelled.
|
||||
*/
|
||||
private void onComplete(SubtaskImpl<? extends T> subtask) {
|
||||
private <U extends T> void onComplete(SubtaskImpl<U> subtask) {
|
||||
assert subtask.state() != Subtask.State.UNAVAILABLE;
|
||||
if (joiner.onComplete(subtask)) {
|
||||
@SuppressWarnings("unchecked")
|
||||
var j = (Joiner<U, ? extends R>) joiner;
|
||||
if (j.onComplete(subtask)) {
|
||||
cancel();
|
||||
}
|
||||
}
|
||||
@ -195,12 +183,17 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
|
||||
Objects.requireNonNull(task);
|
||||
ensureOwner();
|
||||
ensureNotJoined();
|
||||
int s = state;
|
||||
if (s > ST_FORKED) {
|
||||
throw new IllegalStateException("join already called or scope is closed");
|
||||
}
|
||||
|
||||
var subtask = new SubtaskImpl<U>(this, task);
|
||||
|
||||
// notify joiner, even if cancelled
|
||||
if (joiner.onFork(subtask)) {
|
||||
@SuppressWarnings("unchecked")
|
||||
var j = (Joiner<U, ? extends R>) joiner;
|
||||
if (j.onFork(subtask)) {
|
||||
cancel();
|
||||
}
|
||||
|
||||
@ -212,6 +205,7 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
}
|
||||
|
||||
// attempt to start the thread
|
||||
subtask.setThread(thread);
|
||||
try {
|
||||
flock.start(thread);
|
||||
} catch (IllegalStateException e) {
|
||||
@ -221,7 +215,9 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
}
|
||||
|
||||
// force owner to join
|
||||
state = ST_FORKED;
|
||||
if (s < ST_FORKED) {
|
||||
state = ST_FORKED;
|
||||
}
|
||||
return subtask;
|
||||
}
|
||||
|
||||
@ -234,23 +230,29 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
@Override
|
||||
public R join() throws InterruptedException {
|
||||
ensureOwner();
|
||||
ensureNotJoined();
|
||||
|
||||
// join started
|
||||
state = ST_JOIN_STARTED;
|
||||
if (state >= ST_JOIN_COMPLETED) {
|
||||
throw new IllegalStateException("Already joined or scope is closed");
|
||||
}
|
||||
|
||||
// wait for all subtasks, the scope to be cancelled, or interrupt
|
||||
flock.awaitAll();
|
||||
|
||||
// throw if timeout expired
|
||||
if (timeoutExpired) {
|
||||
throw new TimeoutException();
|
||||
try {
|
||||
flock.awaitAll();
|
||||
} catch (InterruptedException e) {
|
||||
state = ST_JOIN_STARTED; // joining not completed, prevent new forks
|
||||
throw e;
|
||||
}
|
||||
cancelTimeout();
|
||||
|
||||
// all subtasks completed or cancelled
|
||||
// all subtasks completed or scope cancelled
|
||||
state = ST_JOIN_COMPLETED;
|
||||
|
||||
// invoke joiner onTimeout if timeout expired
|
||||
if (timeoutExpired) {
|
||||
cancel(); // ensure cancelled before calling onTimeout
|
||||
joiner.onTimeout();
|
||||
} else {
|
||||
cancelTimeout();
|
||||
}
|
||||
|
||||
// invoke joiner to get result
|
||||
try {
|
||||
return joiner.result();
|
||||
@ -311,14 +313,37 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
private final StructuredTaskScopeImpl<? super T, ?> scope;
|
||||
private final Callable<? extends T> task;
|
||||
private volatile Object result;
|
||||
@Stable private Thread thread;
|
||||
|
||||
SubtaskImpl(StructuredTaskScopeImpl<? super T, ?> scope, Callable<? extends T> task) {
|
||||
this.scope = scope;
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the thread for this subtask.
|
||||
*/
|
||||
void setThread(Thread thread) {
|
||||
assert thread.getState() == Thread.State.NEW;
|
||||
this.thread = thread;
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws IllegalStateException if the caller thread is not the subtask and
|
||||
* the scope owner has not joined.
|
||||
*/
|
||||
private void ensureJoinedIfNotSubtask() {
|
||||
if (Thread.currentThread() != thread && !scope.isJoinCompleted()) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (Thread.currentThread() != thread) {
|
||||
throw new WrongThreadException();
|
||||
}
|
||||
|
||||
T result = null;
|
||||
Throwable ex = null;
|
||||
try {
|
||||
@ -355,7 +380,7 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
|
||||
@Override
|
||||
public T get() {
|
||||
scope.ensureJoinedIfOwner();
|
||||
ensureJoinedIfNotSubtask();
|
||||
Object result = this.result;
|
||||
if (result instanceof AltResult) {
|
||||
if (result == RESULT_NULL) return null;
|
||||
@ -370,7 +395,7 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
|
||||
@Override
|
||||
public Throwable exception() {
|
||||
scope.ensureJoinedIfOwner();
|
||||
ensureJoinedIfNotSubtask();
|
||||
Object result = this.result;
|
||||
if (result instanceof AltResult alt && alt.state() == State.FAILED) {
|
||||
return alt.exception();
|
||||
|
||||
@ -83,7 +83,7 @@ public @interface PreviewFeature {
|
||||
// the bootstrap JDK, the CLASSFILE_API enum constant became eligible for removal.
|
||||
|
||||
//---
|
||||
@JEP(number=505, title="Structured Concurrency", status="Fifth Preview")
|
||||
@JEP(number=525, title="Structured Concurrency", status="Sixth Preview")
|
||||
STRUCTURED_CONCURRENCY,
|
||||
@JEP(number = 502, title = "Stable Values", status = "Preview")
|
||||
STABLE_VALUES,
|
||||
|
||||
@ -67,7 +67,7 @@ class StressCancellation {
|
||||
void test(ThreadFactory factory, int beforeCancel, int afterCancel) throws Exception {
|
||||
var joiner = new Joiner<Boolean, Void>() {
|
||||
@Override
|
||||
public boolean onComplete(Subtask<? extends Boolean> subtask) {
|
||||
public boolean onComplete(Subtask<Boolean> subtask) {
|
||||
boolean cancel = subtask.get();
|
||||
return cancel;
|
||||
}
|
||||
|
||||
@ -61,8 +61,8 @@ import java.util.concurrent.StructureViolationException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.UnaryOperator;
|
||||
import java.util.stream.Stream;
|
||||
import static java.lang.Thread.State.*;
|
||||
|
||||
@ -209,7 +209,7 @@ class StructuredTaskScopeTest {
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@MethodSource("factories")
|
||||
void testForkAfterJoin1(ThreadFactory factory) throws Exception {
|
||||
void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
scope.join();
|
||||
@ -222,7 +222,7 @@ class StructuredTaskScopeTest {
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@MethodSource("factories")
|
||||
void testForkAfterJoin2(ThreadFactory factory) throws Exception {
|
||||
void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
scope.fork(() -> "foo");
|
||||
@ -232,16 +232,15 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
|
||||
/**
|
||||
* Test fork after join throws.
|
||||
* Test fork after join interrupted.
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@MethodSource("factories")
|
||||
void testForkAfterJoinThrows(ThreadFactory factory) throws Exception {
|
||||
void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
var latch = new CountDownLatch(1);
|
||||
var subtask1 = scope.fork(() -> {
|
||||
latch.await();
|
||||
Thread.sleep(Duration.ofDays(1));
|
||||
return "foo";
|
||||
});
|
||||
|
||||
@ -254,6 +253,25 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test fork after join timeout.
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@MethodSource("factories")
|
||||
void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
|
||||
cf -> cf.withThreadFactory(factory)
|
||||
.withTimeout(Duration.ofMillis(100)))) {
|
||||
awaitCancelled(scope);
|
||||
|
||||
// join throws
|
||||
assertThrows(TimeoutException.class, scope::join);
|
||||
|
||||
// fork should throw
|
||||
assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test fork after task scope is cancelled. This test uses a custom Joiner to
|
||||
* cancel execution.
|
||||
@ -296,9 +314,11 @@ class StructuredTaskScopeTest {
|
||||
/**
|
||||
* Test fork after task scope is closed.
|
||||
*/
|
||||
@Test
|
||||
void testForkAfterClose() {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
|
||||
@ParameterizedTest
|
||||
@MethodSource("factories")
|
||||
void testForkAfterClose(ThreadFactory factory) {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
scope.close();
|
||||
assertThrows(IllegalStateException.class, () -> scope.fork(() -> null));
|
||||
}
|
||||
@ -366,7 +386,7 @@ class StructuredTaskScopeTest {
|
||||
*/
|
||||
@Test
|
||||
void testJoinAfterJoin2() throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow())) {
|
||||
scope.fork(() -> { throw new FooException(); });
|
||||
Throwable ex = assertThrows(FailedException.class, scope::join);
|
||||
assertTrue(ex.getCause() instanceof FooException);
|
||||
@ -378,12 +398,39 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test join after join interrupted.
|
||||
*/
|
||||
@Test
|
||||
void testJoinAfterJoinInterrupted() throws Exception {
|
||||
try (var scope = StructuredTaskScope.open()) {
|
||||
var latch = new CountDownLatch(1);
|
||||
var subtask = scope.fork(() -> {
|
||||
latch.await();
|
||||
return "foo";
|
||||
});
|
||||
|
||||
// join throws InterruptedException
|
||||
Thread.currentThread().interrupt();
|
||||
assertThrows(InterruptedException.class, scope::join);
|
||||
|
||||
latch.countDown();
|
||||
|
||||
// retry join to get result
|
||||
scope.join();
|
||||
assertEquals("foo", subtask.get());
|
||||
|
||||
// retry after otbaining result
|
||||
assertThrows(IllegalStateException.class, scope::join);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test join after join completed with a timeout.
|
||||
*/
|
||||
@Test
|
||||
void testJoinAfterJoin3() throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
|
||||
void testJoinAfterJoinTimeout() throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(),
|
||||
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
|
||||
// wait for scope to be cancelled by timeout
|
||||
awaitCancelled(scope);
|
||||
@ -396,6 +443,35 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test join invoked from Joiner.onTimeout.
|
||||
*/
|
||||
@Test
|
||||
void testJoinInOnTimeout() throws Exception {
|
||||
Thread owner = Thread.currentThread();
|
||||
var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
|
||||
|
||||
var joiner = new Joiner<String, Void>() {
|
||||
@Override
|
||||
public void onTimeout() {
|
||||
assertTrue(Thread.currentThread() == owner);
|
||||
var scope = scopeRef.get();
|
||||
assertThrows(IllegalStateException.class, scope::join);
|
||||
}
|
||||
@Override
|
||||
public Void result() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
try (var scope = StructuredTaskScope.open(joiner,
|
||||
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
|
||||
awaitCancelled(scope);
|
||||
scopeRef.set(scope);
|
||||
scope.join(); // invokes onTimeout
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test join method is owner confined.
|
||||
*/
|
||||
@ -434,7 +510,7 @@ class StructuredTaskScopeTest {
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
|
||||
Subtask<String> subtask = scope.fork(() -> {
|
||||
Thread.sleep(60_000);
|
||||
Thread.sleep(Duration.ofDays(1));
|
||||
return "foo";
|
||||
});
|
||||
|
||||
@ -457,10 +533,8 @@ class StructuredTaskScopeTest {
|
||||
void testInterruptJoin2(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
|
||||
var latch = new CountDownLatch(1);
|
||||
Subtask<String> subtask = scope.fork(() -> {
|
||||
Thread.sleep(60_000);
|
||||
Thread.sleep(Duration.ofDays(1));
|
||||
return "foo";
|
||||
});
|
||||
|
||||
@ -879,7 +953,7 @@ class StructuredTaskScopeTest {
|
||||
void testOnForkThrows() throws Exception {
|
||||
var joiner = new Joiner<String, Void>() {
|
||||
@Override
|
||||
public boolean onFork(Subtask<? extends String> subtask) {
|
||||
public boolean onFork(Subtask<String> subtask) {
|
||||
throw new FooException();
|
||||
}
|
||||
@Override
|
||||
@ -899,7 +973,7 @@ class StructuredTaskScopeTest {
|
||||
void testOnForkCancelsExecution() throws Exception {
|
||||
var joiner = new Joiner<String, Void>() {
|
||||
@Override
|
||||
public boolean onFork(Subtask<? extends String> subtask) {
|
||||
public boolean onFork(Subtask<String> subtask) {
|
||||
return true;
|
||||
}
|
||||
@Override
|
||||
@ -922,7 +996,7 @@ class StructuredTaskScopeTest {
|
||||
void testOnCompleteThrows() throws Exception {
|
||||
var joiner = new Joiner<String, Void>() {
|
||||
@Override
|
||||
public boolean onComplete(Subtask<? extends String> subtask) {
|
||||
public boolean onComplete(Subtask<String> subtask) {
|
||||
throw new FooException();
|
||||
}
|
||||
@Override
|
||||
@ -949,7 +1023,7 @@ class StructuredTaskScopeTest {
|
||||
void testOnCompleteCancelsExecution() throws Exception {
|
||||
var joiner = new Joiner<String, Void>() {
|
||||
@Override
|
||||
public boolean onComplete(Subtask<? extends String> subtask) {
|
||||
public boolean onComplete(Subtask<String> subtask) {
|
||||
return true;
|
||||
}
|
||||
@Override
|
||||
@ -965,6 +1039,66 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.onTimeout invoked by owner thread when timeout expires.
|
||||
*/
|
||||
@Test
|
||||
void testOnTimeoutInvoked() throws Exception {
|
||||
var scopeRef = new AtomicReference<StructuredTaskScope<?, ?>>();
|
||||
Thread owner = Thread.currentThread();
|
||||
var invokeCount = new AtomicInteger();
|
||||
var joiner = new Joiner<String, Void>() {
|
||||
@Override
|
||||
public void onTimeout() {
|
||||
assertTrue(Thread.currentThread() == owner);
|
||||
assertTrue(scopeRef.get().isCancelled());
|
||||
invokeCount.incrementAndGet();
|
||||
}
|
||||
@Override
|
||||
public Void result() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
try (var scope = StructuredTaskScope.open(joiner,
|
||||
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
|
||||
scopeRef.set(scope);
|
||||
scope.fork(() -> {
|
||||
Thread.sleep(Duration.ofDays(1));
|
||||
return null;
|
||||
});
|
||||
scope.join();
|
||||
assertEquals(1, invokeCount.get());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.onTimeout throwing an excepiton.
|
||||
*/
|
||||
@Test
|
||||
void testOnTimeoutThrows() throws Exception {
|
||||
var joiner = new Joiner<String, Void>() {
|
||||
@Override
|
||||
public void onTimeout() {
|
||||
throw new FooException();
|
||||
}
|
||||
@Override
|
||||
public Void result() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
try (var scope = StructuredTaskScope.open(joiner,
|
||||
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
|
||||
// wait for scope to be cancelled by timeout
|
||||
awaitCancelled(scope);
|
||||
|
||||
// join should throw FooException on first usage
|
||||
assertThrows(FooException.class, scope::join);
|
||||
|
||||
// retry after onTimeout fails
|
||||
assertThrows(IllegalStateException.class, scope::join);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test toString.
|
||||
*/
|
||||
@ -990,19 +1124,27 @@ class StructuredTaskScopeTest {
|
||||
void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
|
||||
Subtask<String> subtask = scope.fork(() -> "foo");
|
||||
|
||||
// before join
|
||||
// before join, owner thread
|
||||
assertThrows(IllegalStateException.class, subtask::get);
|
||||
assertThrows(IllegalStateException.class, subtask::exception);
|
||||
|
||||
// before join, another thread
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
|
||||
|
||||
scope.join();
|
||||
|
||||
// after join
|
||||
assertEquals(Subtask.State.SUCCESS, subtask.state());
|
||||
|
||||
// after join, owner thread
|
||||
assertEquals("foo", subtask.get());
|
||||
assertThrows(IllegalStateException.class, subtask::exception);
|
||||
|
||||
// after join, another thread
|
||||
assertEquals("foo", callInOtherThread(subtask::get));
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1017,16 +1159,25 @@ class StructuredTaskScopeTest {
|
||||
|
||||
Subtask<String> subtask = scope.fork(() -> { throw new FooException(); });
|
||||
|
||||
// before join
|
||||
// before join, owner thread
|
||||
assertThrows(IllegalStateException.class, subtask::get);
|
||||
assertThrows(IllegalStateException.class, subtask::exception);
|
||||
|
||||
// before join, another thread
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
|
||||
|
||||
scope.join();
|
||||
|
||||
// after join
|
||||
assertEquals(Subtask.State.FAILED, subtask.state());
|
||||
|
||||
// after join, owner thread
|
||||
assertThrows(IllegalStateException.class, subtask::get);
|
||||
assertTrue(subtask.exception() instanceof FooException);
|
||||
|
||||
// after join, another thread
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
|
||||
assertTrue(callInOtherThread(subtask::exception) instanceof FooException);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1042,20 +1193,29 @@ class StructuredTaskScopeTest {
|
||||
Thread.sleep(Duration.ofDays(1));
|
||||
return null;
|
||||
});
|
||||
|
||||
// before join
|
||||
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
|
||||
|
||||
// before join, owner thread
|
||||
assertThrows(IllegalStateException.class, subtask::get);
|
||||
assertThrows(IllegalStateException.class, subtask::exception);
|
||||
|
||||
// before join, another thread
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
|
||||
|
||||
// attempt join, join throws
|
||||
Thread.currentThread().interrupt();
|
||||
assertThrows(InterruptedException.class, scope::join);
|
||||
|
||||
// after join
|
||||
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
|
||||
|
||||
// after join, owner thread
|
||||
assertThrows(IllegalStateException.class, subtask::get);
|
||||
assertThrows(IllegalStateException.class, subtask::exception);
|
||||
|
||||
// before join, another thread
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1071,17 +1231,25 @@ class StructuredTaskScopeTest {
|
||||
|
||||
var subtask = scope.fork(() -> "foo");
|
||||
|
||||
// before join
|
||||
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
|
||||
// before join, owner thread
|
||||
assertThrows(IllegalStateException.class, subtask::get);
|
||||
assertThrows(IllegalStateException.class, subtask::exception);
|
||||
|
||||
// before join, another thread
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
|
||||
|
||||
scope.join();
|
||||
|
||||
// after join
|
||||
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
|
||||
|
||||
// after join, owner thread
|
||||
assertThrows(IllegalStateException.class, subtask::get);
|
||||
assertThrows(IllegalStateException.class, subtask::exception);
|
||||
|
||||
// before join, another thread
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get));
|
||||
assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1115,8 +1283,8 @@ class StructuredTaskScopeTest {
|
||||
@Test
|
||||
void testAllSuccessfulOrThrow1() throws Throwable {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) {
|
||||
var subtasks = scope.join().toList();
|
||||
assertTrue(subtasks.isEmpty());
|
||||
var results = scope.join();
|
||||
assertTrue(results.isEmpty());
|
||||
}
|
||||
}
|
||||
|
||||
@ -1128,12 +1296,10 @@ class StructuredTaskScopeTest {
|
||||
void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
var subtask1 = scope.fork(() -> "foo");
|
||||
var subtask2 = scope.fork(() -> "bar");
|
||||
var subtasks = scope.join().toList();
|
||||
assertEquals(List.of(subtask1, subtask2), subtasks);
|
||||
assertEquals("foo", subtask1.get());
|
||||
assertEquals("bar", subtask2.get());
|
||||
scope.fork(() -> "foo");
|
||||
scope.fork(() -> "bar");
|
||||
var results = scope.join();
|
||||
assertEquals(List.of("foo", "bar"), results);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1157,11 +1323,52 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.anySuccessfulResultOrThrow() with no subtasks.
|
||||
* Test Joiner.allSuccessfulOrThrow() with a timeout.
|
||||
*/
|
||||
@Test
|
||||
void testAnySuccessfulResultOrThrow1() throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) {
|
||||
void testAllSuccessfulOrThrow4() throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow(),
|
||||
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
|
||||
scope.fork(() -> "foo");
|
||||
scope.fork(() -> {
|
||||
Thread.sleep(Duration.ofDays(1));
|
||||
return "bar";
|
||||
});
|
||||
assertThrows(TimeoutException.class, scope::join);
|
||||
|
||||
// retry after join throws TimeoutException
|
||||
assertThrows(IllegalStateException.class, scope::join);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.allSuccessfulOrThrow() yields an unmodifiable list.
|
||||
*/
|
||||
@Test
|
||||
void testAllSuccessfulOrThrow5() throws Exception {
|
||||
// empty list
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow())) {
|
||||
var results = scope.join();
|
||||
assertEquals(0, results.size());
|
||||
assertThrows(UnsupportedOperationException.class, () -> results.add("foo"));
|
||||
}
|
||||
|
||||
// non-empty list
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>allSuccessfulOrThrow())) {
|
||||
scope.fork(() -> "foo");
|
||||
var results = scope.join();
|
||||
assertEquals(1, results.size());
|
||||
assertThrows(UnsupportedOperationException.class, () -> results.add("foo"));
|
||||
assertThrows(UnsupportedOperationException.class, () -> results.add("bar"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.anySuccessfulOrThrow() with no subtasks.
|
||||
*/
|
||||
@Test
|
||||
void testAnySuccessfulOrThrow1() throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow())) {
|
||||
try {
|
||||
scope.join();
|
||||
} catch (FailedException e) {
|
||||
@ -1171,12 +1378,12 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully.
|
||||
* Test Joiner.anySuccessfulOrThrow() with a subtask that completes successfully.
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@MethodSource("factories")
|
||||
void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
|
||||
void testAnySuccessfulOrThrow2(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
scope.fork(() -> "foo");
|
||||
String result = scope.join();
|
||||
@ -1185,13 +1392,13 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully
|
||||
* Test Joiner.anySuccessfulOrThrow() with a subtask that completes successfully
|
||||
* with a null result.
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@MethodSource("factories")
|
||||
void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
|
||||
void testAnySuccessfulOrThrow3(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
scope.fork(() -> null);
|
||||
String result = scope.join();
|
||||
@ -1200,13 +1407,13 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully
|
||||
* Test Joiner.anySuccessfulOrThrow() with a subtask that complete succcessfully
|
||||
* and a subtask that fails.
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@MethodSource("factories")
|
||||
void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulResultOrThrow(),
|
||||
void testAnySuccessfulOrThrow4(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
scope.fork(() -> "foo");
|
||||
scope.fork(() -> { throw new FooException(); });
|
||||
@ -1216,12 +1423,12 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails.
|
||||
* Test Joiner.anySuccessfulOrThrow() with a subtask that fails.
|
||||
*/
|
||||
@ParameterizedTest
|
||||
@MethodSource("factories")
|
||||
void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(),
|
||||
void testAnySuccessfulOrThrow5(ThreadFactory factory) throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
scope.fork(() -> { throw new FooException(); });
|
||||
Throwable ex = assertThrows(FailedException.class, scope::join);
|
||||
@ -1229,6 +1436,25 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.anySuccessfulOrThrow() with a timeout.
|
||||
*/
|
||||
@Test
|
||||
void anySuccessfulOrThrow6() throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>anySuccessfulOrThrow(),
|
||||
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
|
||||
scope.fork(() -> { throw new FooException(); });
|
||||
scope.fork(() -> {
|
||||
Thread.sleep(Duration.ofDays(1));
|
||||
return "bar";
|
||||
});
|
||||
assertThrows(TimeoutException.class, scope::join);
|
||||
|
||||
// retry after join throws TimeoutException
|
||||
assertThrows(IllegalStateException.class, scope::join);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks.
|
||||
*/
|
||||
@ -1276,6 +1502,25 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.awaitAllSuccessfulOrThrow() with a timeout.
|
||||
*/
|
||||
@Test
|
||||
void testAwaitSuccessfulOrThrow4() throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAllSuccessfulOrThrow(),
|
||||
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
|
||||
scope.fork(() -> "foo");
|
||||
scope.fork(() -> {
|
||||
Thread.sleep(Duration.ofDays(1));
|
||||
return "bar";
|
||||
});
|
||||
assertThrows(TimeoutException.class, scope::join);
|
||||
|
||||
// retry after join throws TimeoutException
|
||||
assertThrows(IllegalStateException.class, scope::join);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.awaitAll() with no subtasks.
|
||||
*/
|
||||
@ -1322,6 +1567,25 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.awaitAll() with a timeout.
|
||||
*/
|
||||
@Test
|
||||
void testAwaitAll4() throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>awaitAll(),
|
||||
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
|
||||
scope.fork(() -> "foo");
|
||||
scope.fork(() -> {
|
||||
Thread.sleep(Duration.ofDays(1));
|
||||
return "bar";
|
||||
});
|
||||
assertThrows(TimeoutException.class, scope::join);
|
||||
|
||||
// retry after join throws TimeoutException
|
||||
assertThrows(IllegalStateException.class, scope::join);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.allUntil(Predicate) with no subtasks.
|
||||
*/
|
||||
@ -1329,7 +1593,7 @@ class StructuredTaskScopeTest {
|
||||
void testAllUntil1() throws Throwable {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) {
|
||||
var subtasks = scope.join();
|
||||
assertEquals(0, subtasks.count());
|
||||
assertEquals(0, subtasks.size());
|
||||
}
|
||||
}
|
||||
|
||||
@ -1345,11 +1609,9 @@ class StructuredTaskScopeTest {
|
||||
var subtask1 = scope.fork(() -> "foo");
|
||||
var subtask2 = scope.fork(() -> { throw new FooException(); });
|
||||
|
||||
var subtasks = scope.join().toList();
|
||||
assertEquals(2, subtasks.size());
|
||||
var subtasks = scope.join();
|
||||
assertEquals(List.of(subtask1, subtask2), subtasks);
|
||||
|
||||
assertSame(subtask1, subtasks.get(0));
|
||||
assertSame(subtask2, subtasks.get(1));
|
||||
assertEquals("foo", subtask1.get());
|
||||
assertTrue(subtask2.exception() instanceof FooException);
|
||||
}
|
||||
@ -1370,11 +1632,9 @@ class StructuredTaskScopeTest {
|
||||
return "bar";
|
||||
});
|
||||
|
||||
var subtasks = scope.join().toList();
|
||||
var subtasks = scope.join();
|
||||
assertEquals(List.of(subtask1, subtask2), subtasks);
|
||||
|
||||
assertEquals(2, subtasks.size());
|
||||
assertSame(subtask1, subtasks.get(0));
|
||||
assertSame(subtask2, subtasks.get(1));
|
||||
assertEquals("foo", subtask1.get());
|
||||
assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
|
||||
}
|
||||
@ -1388,10 +1648,10 @@ class StructuredTaskScopeTest {
|
||||
void testAllUntil4(ThreadFactory factory) throws Exception {
|
||||
|
||||
// cancel execution after two or more failures
|
||||
class CancelAfterTwoFailures<T> implements Predicate<Subtask<? extends T>> {
|
||||
class CancelAfterTwoFailures<T> implements Predicate<Subtask<T>> {
|
||||
final AtomicInteger failedCount = new AtomicInteger();
|
||||
@Override
|
||||
public boolean test(Subtask<? extends T> subtask) {
|
||||
public boolean test(Subtask<T> subtask) {
|
||||
return subtask.state() == Subtask.State.FAILED
|
||||
&& failedCount.incrementAndGet() >= 2;
|
||||
}
|
||||
@ -1409,7 +1669,7 @@ class StructuredTaskScopeTest {
|
||||
Thread.sleep(Duration.ofMillis(20));
|
||||
}
|
||||
|
||||
var subtasks = scope.join().toList();
|
||||
var subtasks = scope.join();
|
||||
assertEquals(forkCount, subtasks.size());
|
||||
|
||||
long failedCount = subtasks.stream()
|
||||
@ -1437,6 +1697,59 @@ class StructuredTaskScopeTest {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.allUntil(Predicate) with a timeout.
|
||||
*/
|
||||
@Test
|
||||
void testAllUntil6() throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false),
|
||||
cf -> cf.withTimeout(Duration.ofMillis(100)))) {
|
||||
var subtask1 = scope.fork(() -> "foo");
|
||||
var subtask2 = scope.fork(() -> {
|
||||
Thread.sleep(Duration.ofDays(1));
|
||||
return "bar";
|
||||
});
|
||||
|
||||
// TimeoutException should not be thrown
|
||||
var subtasks = scope.join();
|
||||
|
||||
// stream should have two elements, subtask1 may or may not have completed
|
||||
assertEquals(List.of(subtask1, subtask2), subtasks);
|
||||
assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
|
||||
|
||||
// retry after join throws TimeoutException
|
||||
assertThrows(IllegalStateException.class, scope::join);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner.allUntil(Predicate) yields an unmodifiable list.
|
||||
*/
|
||||
@Test
|
||||
void testAllUntil7() throws Exception {
|
||||
Subtask<String> subtask1;
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
|
||||
subtask1 = scope.fork(() -> "?");
|
||||
scope.join();
|
||||
}
|
||||
|
||||
// empty list
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
|
||||
var subtasks = scope.join();
|
||||
assertEquals(0, subtasks.size());
|
||||
assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask1));
|
||||
}
|
||||
|
||||
// non-empty list
|
||||
try (var scope = StructuredTaskScope.open(Joiner.<String>allUntil(s -> false))) {
|
||||
var subtask2 = scope.fork(() -> "foo");
|
||||
var subtasks = scope.join();
|
||||
assertEquals(1, subtasks.size());
|
||||
assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask1));
|
||||
assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask2));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Joiner default methods.
|
||||
*/
|
||||
@ -1454,13 +1767,14 @@ class StructuredTaskScopeTest {
|
||||
assertEquals(Subtask.State.UNAVAILABLE, subtask2.state());
|
||||
|
||||
// Joiner that does not override default methods
|
||||
Joiner<Object, Void> joiner = () -> null;
|
||||
Joiner<String, Void> joiner = () -> null;
|
||||
assertThrows(NullPointerException.class, () -> joiner.onFork(null));
|
||||
assertThrows(NullPointerException.class, () -> joiner.onComplete(null));
|
||||
assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1));
|
||||
assertFalse(joiner.onFork(subtask2));
|
||||
assertFalse(joiner.onComplete(subtask1));
|
||||
assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2));
|
||||
assertThrows(TimeoutException.class, joiner::onTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1481,7 +1795,7 @@ class StructuredTaskScopeTest {
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> Joiner.allSuccessfulOrThrow().onComplete(subtask));
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> Joiner.anySuccessfulResultOrThrow().onComplete(subtask));
|
||||
() -> Joiner.anySuccessfulOrThrow().onComplete(subtask));
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask));
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
@ -1497,7 +1811,7 @@ class StructuredTaskScopeTest {
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> Joiner.allSuccessfulOrThrow().onFork(subtask));
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> Joiner.anySuccessfulResultOrThrow().onFork(subtask));
|
||||
() -> Joiner.anySuccessfulOrThrow().onFork(subtask));
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask));
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
@ -1523,7 +1837,7 @@ class StructuredTaskScopeTest {
|
||||
*/
|
||||
@Test
|
||||
void testConfigMethods() throws Exception {
|
||||
Function<Configuration, Configuration> testConfig = cf -> {
|
||||
UnaryOperator<Configuration> configOperator = cf -> {
|
||||
var name = "duke";
|
||||
var threadFactory = Thread.ofPlatform().factory();
|
||||
var timeout = Duration.ofSeconds(10);
|
||||
@ -1548,7 +1862,7 @@ class StructuredTaskScopeTest {
|
||||
|
||||
return cf;
|
||||
};
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
@ -1597,9 +1911,9 @@ class StructuredTaskScopeTest {
|
||||
assertThrows(NullPointerException.class,
|
||||
() -> Joiner.allSuccessfulOrThrow().onComplete(null));
|
||||
assertThrows(NullPointerException.class,
|
||||
() -> Joiner.anySuccessfulResultOrThrow().onFork(null));
|
||||
() -> Joiner.anySuccessfulOrThrow().onFork(null));
|
||||
assertThrows(NullPointerException.class,
|
||||
() -> Joiner.anySuccessfulResultOrThrow().onComplete(null));
|
||||
() -> Joiner.anySuccessfulOrThrow().onComplete(null));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1629,12 +1943,12 @@ class StructuredTaskScopeTest {
|
||||
final AtomicInteger onForkCount = new AtomicInteger();
|
||||
final AtomicInteger onCompleteCount = new AtomicInteger();
|
||||
@Override
|
||||
public boolean onFork(Subtask<? extends T> subtask) {
|
||||
public boolean onFork(Subtask<T> subtask) {
|
||||
onForkCount.incrementAndGet();
|
||||
return false;
|
||||
}
|
||||
@Override
|
||||
public boolean onComplete(Subtask<? extends T> subtask) {
|
||||
public boolean onComplete(Subtask<T> subtask) {
|
||||
onCompleteCount.incrementAndGet();
|
||||
return false;
|
||||
}
|
||||
@ -1658,12 +1972,12 @@ class StructuredTaskScopeTest {
|
||||
final AtomicInteger onForkCount = new AtomicInteger();
|
||||
final AtomicInteger onCompleteCount = new AtomicInteger();
|
||||
@Override
|
||||
public boolean onFork(Subtask<? extends T> subtask) {
|
||||
public boolean onFork(Subtask<T> subtask) {
|
||||
onForkCount.incrementAndGet();
|
||||
return false;
|
||||
}
|
||||
@Override
|
||||
public boolean onComplete(Subtask<? extends T> subtask) {
|
||||
public boolean onComplete(Subtask<T> subtask) {
|
||||
onCompleteCount.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
@ -1755,6 +2069,40 @@ class StructuredTaskScopeTest {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls a result returning task from another thread.
|
||||
*/
|
||||
private <V> V callInOtherThread(Callable<V> task) throws Exception {
|
||||
var result = new AtomicReference<V>();
|
||||
var exc = new AtomicReference<Exception>();
|
||||
Thread thread = Thread.ofVirtual().start(() -> {
|
||||
try {
|
||||
result.set(task.call());
|
||||
} catch (Exception e) {
|
||||
exc.set(e);
|
||||
}
|
||||
});
|
||||
boolean interrupted = false;
|
||||
boolean terminated = false;
|
||||
while (!terminated) {
|
||||
try {
|
||||
thread.join();
|
||||
terminated = true;
|
||||
} catch (InterruptedException e) {
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
if (interrupted) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
Exception e = exc.get();
|
||||
if (e != null) {
|
||||
throw e;
|
||||
} else {
|
||||
return result.get();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the given stack trace contains an element for the given class
|
||||
* and method name.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user