mirror of
https://github.com/openjdk/jdk.git
synced 2026-05-09 21:19:38 +00:00
8380109: Implement JEP 533: Structured Concurrency (Seventh Preview)
Reviewed-by: vklang
This commit is contained in:
parent
e44bda91b4
commit
4f3edc376a
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2024, 2026, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -33,6 +33,8 @@ import java.util.NoSuchElementException;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.StructuredTaskScope.Joiner;
|
||||
import java.util.concurrent.StructuredTaskScope.Subtask;
|
||||
import java.util.concurrent.StructuredTaskScope.CancelledByTimeoutException;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import jdk.internal.invoke.MhUtil;
|
||||
|
||||
@ -66,15 +68,21 @@ class Joiners {
|
||||
* A joiner that returns a list of all results when all subtasks complete
|
||||
* successfully. Cancels the scope if any subtask fails.
|
||||
*/
|
||||
static final class AllSuccessful<T> implements Joiner<T, List<T>> {
|
||||
static final class AllSuccessful<T, R_X extends Throwable> implements Joiner<T, List<T>, R_X> {
|
||||
private static final VarHandle FIRST_EXCEPTION =
|
||||
MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
|
||||
|
||||
private final Function<Throwable, R_X> esf;
|
||||
|
||||
// list of forked subtasks, created lazily, only accessed by owner thread
|
||||
private List<Subtask<T>> subtasks;
|
||||
|
||||
private volatile Throwable firstException;
|
||||
|
||||
AllSuccessful(Function<Throwable, R_X> esf) {
|
||||
this.esf = Objects.requireNonNull(esf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onFork(Subtask<T> subtask) {
|
||||
ensureUnavailable(subtask);
|
||||
@ -94,11 +102,11 @@ class Joiners {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<T> result() throws Throwable {
|
||||
public List<T> result() throws R_X {
|
||||
Throwable ex = firstException;
|
||||
try {
|
||||
if (ex != null) {
|
||||
throw ex;
|
||||
throw esf.apply(ex);
|
||||
}
|
||||
return (subtasks != null)
|
||||
? subtasks.stream().map(Subtask::get).toList()
|
||||
@ -107,22 +115,37 @@ class Joiners {
|
||||
subtasks = null; // allow subtasks to be GC'ed
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<T> timeout() throws R_X {
|
||||
try {
|
||||
throw esf.apply(new CancelledByTimeoutException());
|
||||
} finally {
|
||||
subtasks = null; // allow subtasks to be GC'ed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A joiner that returns the result of the first subtask to complete successfully.
|
||||
* Cancels the scope if any subtasks succeeds.
|
||||
*/
|
||||
static final class AnySuccessful<T> implements Joiner<T, T> {
|
||||
static final class AnySuccessful<T, R_X extends Throwable> implements Joiner<T, T, R_X> {
|
||||
private static final VarHandle SUBTASK =
|
||||
MhUtil.findVarHandle(MethodHandles.lookup(), "subtask", Subtask.class);
|
||||
|
||||
private final Function<Throwable, R_X> esf;
|
||||
|
||||
// UNAVAILABLE < FAILED < SUCCESS
|
||||
private static final Comparator<Subtask.State> SUBTASK_STATE_COMPARATOR =
|
||||
Comparator.comparingInt(AnySuccessful::stateToInt);
|
||||
|
||||
private volatile Subtask<T> subtask;
|
||||
|
||||
AnySuccessful(Function<Throwable, R_X> esf) {
|
||||
this.esf = Objects.requireNonNull(esf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps a Subtask.State to an int that can be compared.
|
||||
*/
|
||||
@ -148,28 +171,39 @@ class Joiners {
|
||||
}
|
||||
|
||||
@Override
|
||||
public T result() throws Throwable {
|
||||
public T result() throws R_X {
|
||||
Subtask<T> subtask = this.subtask;
|
||||
if (subtask == null) {
|
||||
throw new NoSuchElementException("No subtasks completed");
|
||||
throw esf.apply(new NoSuchElementException("No subtasks completed"));
|
||||
}
|
||||
return switch (subtask.state()) {
|
||||
case SUCCESS -> subtask.get();
|
||||
case FAILED -> throw subtask.exception();
|
||||
case FAILED -> throw esf.apply(subtask.exception());
|
||||
default -> throw new InternalError();
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public T timeout() throws R_X {
|
||||
throw esf.apply(new CancelledByTimeoutException());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A joiner that that waits for all successful subtasks. Cancels the scope if any
|
||||
* subtask fails.
|
||||
*/
|
||||
static final class AwaitSuccessful<T> implements Joiner<T, Void> {
|
||||
static final class AwaitSuccessful<T, R_X extends Throwable> implements Joiner<T, Void, R_X> {
|
||||
private static final VarHandle FIRST_EXCEPTION =
|
||||
MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);
|
||||
|
||||
private final Function<Throwable, R_X> esf;
|
||||
private volatile Throwable firstException;
|
||||
|
||||
AwaitSuccessful(Function<Throwable, R_X> esf) {
|
||||
this.esf = Objects.requireNonNull(esf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onComplete(Subtask<T> subtask) {
|
||||
Subtask.State state = ensureCompleted(subtask);
|
||||
@ -179,26 +213,31 @@ class Joiners {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void result() throws Throwable {
|
||||
public Void result() throws R_X {
|
||||
Throwable ex = firstException;
|
||||
if (ex != null) {
|
||||
throw ex;
|
||||
throw esf.apply(ex);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void timeout() throws R_X {
|
||||
throw esf.apply(new CancelledByTimeoutException());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A joiner that returns a list of all subtasks.
|
||||
*/
|
||||
static final class AllSubtasks<T> implements Joiner<T, List<Subtask<T>>> {
|
||||
private final Predicate<Subtask<T>> isDone;
|
||||
static final class AllSubtasks<T> implements Joiner<T, List<Subtask<T>>, RuntimeException> {
|
||||
private final Predicate<? super Subtask<T>> isDone;
|
||||
|
||||
// list of forked subtasks, created lazily, only accessed by owner thread
|
||||
private List<Subtask<T>> subtasks;
|
||||
|
||||
AllSubtasks(Predicate<Subtask<T>> isDone) {
|
||||
AllSubtasks(Predicate<? super Subtask<T>> isDone) {
|
||||
this.isDone = Objects.requireNonNull(isDone);
|
||||
}
|
||||
|
||||
@ -218,11 +257,6 @@ class Joiners {
|
||||
return isDone.test(subtask);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout() {
|
||||
// do nothing, this joiner does not throw TimeoutException
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Subtask<T>> result() {
|
||||
if (subtasks != null) {
|
||||
@ -233,5 +267,10 @@ class Joiners {
|
||||
return List.of();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Subtask<T>> timeout() {
|
||||
return result();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2024, 2026, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -36,11 +36,12 @@ import jdk.internal.vm.annotation.Stable;
|
||||
/**
|
||||
* StructuredTaskScope implementation.
|
||||
*/
|
||||
final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
final class StructuredTaskScopeImpl<T, R, R_X extends Throwable>
|
||||
implements StructuredTaskScope<T, R, R_X> {
|
||||
private static final VarHandle CANCELLED =
|
||||
MhUtil.findVarHandle(MethodHandles.lookup(), "cancelled", boolean.class);
|
||||
|
||||
private final Joiner<? super T, ? extends R> joiner;
|
||||
private final Joiner<? super T, ? extends R, R_X> joiner;
|
||||
private final ThreadFactory threadFactory;
|
||||
private final ThreadFlock flock;
|
||||
|
||||
@ -61,7 +62,7 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
private volatile boolean timeoutExpired;
|
||||
|
||||
@SuppressWarnings("this-escape")
|
||||
private StructuredTaskScopeImpl(Joiner<? super T, ? extends R> joiner,
|
||||
private StructuredTaskScopeImpl(Joiner<? super T, ? extends R, R_X> joiner,
|
||||
ThreadFactory threadFactory,
|
||||
String name) {
|
||||
this.joiner = joiner;
|
||||
@ -74,12 +75,12 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
* and with configuration that is the result of applying the given function to the
|
||||
* default configuration.
|
||||
*/
|
||||
static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T, ? extends R> joiner,
|
||||
UnaryOperator<Configuration> configOperator) {
|
||||
static <T, R, R_X extends Throwable> StructuredTaskScope<T, R, R_X>
|
||||
open(Joiner<? super T, ? extends R, R_X> joiner, UnaryOperator<Configuration> configOperator) {
|
||||
Objects.requireNonNull(joiner);
|
||||
|
||||
var config = (ConfigImpl) configOperator.apply(ConfigImpl.defaultConfig());
|
||||
var scope = new StructuredTaskScopeImpl<T, R>(joiner, config.threadFactory(), config.name());
|
||||
var scope = new StructuredTaskScopeImpl<T, R, R_X>(joiner, config.threadFactory(), config.name());
|
||||
|
||||
// schedule timeout
|
||||
Duration timeout = config.timeout();
|
||||
@ -173,7 +174,7 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
private <U extends T> void onComplete(SubtaskImpl<U> subtask) {
|
||||
assert subtask.state() != Subtask.State.UNAVAILABLE;
|
||||
@SuppressWarnings("unchecked")
|
||||
var j = (Joiner<U, ? extends R>) joiner;
|
||||
var j = (Joiner<U, ? extends R, ? extends Throwable>) joiner;
|
||||
if (j.onComplete(subtask)) {
|
||||
cancel();
|
||||
}
|
||||
@ -192,7 +193,7 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
|
||||
// notify joiner, even if cancelled
|
||||
@SuppressWarnings("unchecked")
|
||||
var j = (Joiner<U, ? extends R>) joiner;
|
||||
var j = (Joiner<U, ? extends R, ? extends Throwable>) joiner;
|
||||
if (j.onFork(subtask)) {
|
||||
cancel();
|
||||
}
|
||||
@ -228,7 +229,7 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public R join() throws InterruptedException {
|
||||
public R join() throws R_X, InterruptedException {
|
||||
ensureOwner();
|
||||
if (state >= ST_JOIN_COMPLETED) {
|
||||
throw new IllegalStateException("Already joined or scope is closed");
|
||||
@ -245,19 +246,13 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
// all subtasks completed or scope cancelled
|
||||
state = ST_JOIN_COMPLETED;
|
||||
|
||||
// invoke joiner onTimeout if timeout expired
|
||||
// invoke joiner result() or timeout() method
|
||||
if (timeoutExpired) {
|
||||
cancel(); // ensure cancelled before calling onTimeout
|
||||
joiner.onTimeout();
|
||||
cancel(); // ensure cancelled before calling joiner
|
||||
return joiner.timeout();
|
||||
} else {
|
||||
cancelTimeout();
|
||||
}
|
||||
|
||||
// invoke joiner to get result
|
||||
try {
|
||||
return joiner.result();
|
||||
} catch (Throwable e) {
|
||||
throw new FailedException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -310,12 +305,12 @@ final class StructuredTaskScopeImpl<T, R> implements StructuredTaskScope<T, R> {
|
||||
}
|
||||
}
|
||||
|
||||
private final StructuredTaskScopeImpl<? super T, ?> scope;
|
||||
private final StructuredTaskScopeImpl<? super T, ?, ?> scope;
|
||||
private final Callable<? extends T> task;
|
||||
private volatile Object result;
|
||||
@Stable private Thread thread;
|
||||
|
||||
SubtaskImpl(StructuredTaskScopeImpl<? super T, ?> scope, Callable<? extends T> task) {
|
||||
SubtaskImpl(StructuredTaskScopeImpl<? super T, ?, ?> scope, Callable<? extends T> task) {
|
||||
this.scope = scope;
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
@ -64,7 +64,7 @@ public @interface PreviewFeature {
|
||||
* Values should be annotated with the feature's {@code JEP}.
|
||||
*/
|
||||
public enum Feature {
|
||||
@JEP(number=525, title="Structured Concurrency", status="Sixth Preview")
|
||||
@JEP(number=533, title="Structured Concurrency", status="Seventh Preview")
|
||||
STRUCTURED_CONCURRENCY,
|
||||
@JEP(number = 526, title = "Lazy Constants", status = "Second Preview")
|
||||
LAZY_CONSTANTS,
|
||||
|
||||
@ -25,26 +25,26 @@
|
||||
* @test id=default
|
||||
* @summary Stress ScopedValue stack overflow recovery path
|
||||
* @enablePreview
|
||||
* @run main/othervm/timeout=300 StressStackOverflow
|
||||
* @run main/othervm/timeout=300 ${test.main.class}
|
||||
*/
|
||||
|
||||
/*
|
||||
* @test id=no-TieredCompilation
|
||||
* @enablePreview
|
||||
* @run main/othervm/timeout=300 -XX:-TieredCompilation StressStackOverflow
|
||||
* @run main/othervm/timeout=300 -XX:-TieredCompilation ${test.main.class}
|
||||
*/
|
||||
|
||||
/*
|
||||
* @test id=TieredStopAtLevel1
|
||||
* @enablePreview
|
||||
* @run main/othervm/timeout=300 -XX:TieredStopAtLevel=1 StressStackOverflow
|
||||
* @run main/othervm/timeout=300 -XX:TieredStopAtLevel=1 ${test.main.class}
|
||||
*/
|
||||
|
||||
/*
|
||||
* @test id=no-vmcontinuations
|
||||
* @requires vm.continuations
|
||||
* @enablePreview
|
||||
* @run main/othervm/timeout=300 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations StressStackOverflow
|
||||
* @run main/othervm/timeout=300 -XX:+UnlockExperimentalVMOptions -XX:-VMContinuations ${test.main.class}
|
||||
*/
|
||||
|
||||
import java.lang.ScopedValue.CallableOp;
|
||||
@ -52,7 +52,6 @@ import java.time.Duration;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.StructureViolationException;
|
||||
import java.util.concurrent.StructuredTaskScope;
|
||||
import java.util.concurrent.StructuredTaskScope.Joiner;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class StressStackOverflow {
|
||||
@ -170,7 +169,7 @@ public class StressStackOverflow {
|
||||
void runInNewThread(Runnable op) {
|
||||
var threadFactory
|
||||
= (ThreadLocalRandom.current().nextBoolean() ? Thread.ofPlatform() : Thread.ofVirtual()).factory();
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(threadFactory))) {
|
||||
try (var scope = StructuredTaskScope.open(cf -> cf.withThreadFactory(threadFactory))) {
|
||||
var handle = scope.fork(() -> {
|
||||
op.run();
|
||||
return null;
|
||||
@ -187,7 +186,7 @@ public class StressStackOverflow {
|
||||
public void run() {
|
||||
try {
|
||||
ScopedValue.where(inheritedValue, 42).where(el, 0).run(() -> {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
|
||||
try (var scope = StructuredTaskScope.open()) {
|
||||
try {
|
||||
if (ThreadLocalRandom.current().nextBoolean()) {
|
||||
// Repeatedly test Scoped Values set by ScopedValue::call(), get(), and run()
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2023, 2025, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2023, 2026, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -25,7 +25,7 @@
|
||||
* @test
|
||||
* @summary Stress test of StructuredTaskScope cancellation with running and starting threads
|
||||
* @enablePreview
|
||||
* @run junit StressCancellation
|
||||
* @run junit ${test.main.class}
|
||||
*/
|
||||
|
||||
import java.time.Duration;
|
||||
@ -65,7 +65,7 @@ class StressCancellation {
|
||||
@ParameterizedTest
|
||||
@MethodSource("testCases")
|
||||
void test(ThreadFactory factory, int beforeCancel, int afterCancel) throws Exception {
|
||||
var joiner = new Joiner<Boolean, Void>() {
|
||||
var joiner = new Joiner<Boolean, Void, RuntimeException>() {
|
||||
@Override
|
||||
public boolean onComplete(Subtask<Boolean> subtask) {
|
||||
boolean cancel = subtask.get();
|
||||
@ -75,6 +75,10 @@ class StressCancellation {
|
||||
public Void result() {
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
public Void timeout() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
try (var scope = StructuredTaskScope.open(joiner, cf -> cf.withThreadFactory(factory))) {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022, 2025, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2022, 2026, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -27,11 +27,10 @@
|
||||
* @summary Test thread dumps with StructuredTaskScope
|
||||
* @enablePreview
|
||||
* @library /test/lib
|
||||
* @run junit/othervm StructuredThreadDumpTest
|
||||
* @run junit/othervm ${test.main.class}
|
||||
*/
|
||||
|
||||
import java.util.concurrent.StructuredTaskScope;
|
||||
import java.util.concurrent.StructuredTaskScope.Joiner;
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.nio.file.Files;
|
||||
@ -54,7 +53,7 @@ class StructuredThreadDumpTest {
|
||||
*/
|
||||
@Test
|
||||
void testTree() throws Exception {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName("scope"))) {
|
||||
try (var scope = StructuredTaskScope.open(cf -> cf.withName("scope"))) {
|
||||
Thread thread1 = fork(scope, "child-scope-A");
|
||||
Thread thread2 = fork(scope, "child-scope-B");
|
||||
try {
|
||||
@ -68,15 +67,15 @@ class StructuredThreadDumpTest {
|
||||
|
||||
// check parents
|
||||
assertFalse(rootContainer.parent().isPresent());
|
||||
assertTrue(container1.parent().get() == rootContainer);
|
||||
assertTrue(container2.parent().get() == container1);
|
||||
assertTrue(container3.parent().get() == container1);
|
||||
assertSame(rootContainer, container1.parent().get());
|
||||
assertSame(container1, container2.parent().get());
|
||||
assertSame(container1, container3.parent().get());
|
||||
|
||||
// check owners
|
||||
assertFalse(rootContainer.owner().isPresent());
|
||||
assertTrue(container1.owner().getAsLong() == Thread.currentThread().threadId());
|
||||
assertTrue(container2.owner().getAsLong() == thread1.threadId());
|
||||
assertTrue(container3.owner().getAsLong() == thread2.threadId());
|
||||
assertEquals(Thread.currentThread().threadId(), container1.owner().getAsLong());
|
||||
assertEquals(thread1.threadId(), container2.owner().getAsLong());
|
||||
assertEquals(thread2.threadId(), container3.owner().getAsLong());
|
||||
|
||||
// thread1 and threads2 should be in threads array of "scope"
|
||||
container1.findThread(thread1.threadId()).orElseThrow();
|
||||
@ -96,10 +95,10 @@ class StructuredThreadDumpTest {
|
||||
*/
|
||||
@Test
|
||||
void testNested() throws Exception {
|
||||
try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName("scope-A"))) {
|
||||
try (var scope1 = StructuredTaskScope.open(cf -> cf.withName("scope-A"))) {
|
||||
Thread thread1 = fork(scope1);
|
||||
|
||||
try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withName("scope-B"))) {
|
||||
try (var scope2 = StructuredTaskScope.open(cf -> cf.withName("scope-B"))) {
|
||||
Thread thread2 = fork(scope2);
|
||||
try {
|
||||
ThreadDump threadDump = threadDump();
|
||||
@ -111,14 +110,14 @@ class StructuredThreadDumpTest {
|
||||
|
||||
// check parents
|
||||
assertFalse(rootContainer.parent().isPresent());
|
||||
assertTrue(container1.parent().get() == rootContainer);
|
||||
assertTrue(container2.parent().get() == container1);
|
||||
assertSame(rootContainer, container1.parent().get());
|
||||
assertSame(container1, container2.parent().get());
|
||||
|
||||
// check owners
|
||||
long tid = Thread.currentThread().threadId();
|
||||
assertFalse(rootContainer.owner().isPresent());
|
||||
assertTrue(container1.owner().getAsLong() == tid);
|
||||
assertTrue(container2.owner().getAsLong() == tid);
|
||||
assertEquals(tid, container1.owner().getAsLong());
|
||||
assertEquals(tid, container2.owner().getAsLong());
|
||||
|
||||
// thread1 should be in threads array of "scope-A"
|
||||
container1.findThread(thread1.threadId()).orElseThrow();
|
||||
@ -160,7 +159,7 @@ class StructuredThreadDumpTest {
|
||||
* Forks a subtask in the given scope that parks, returning the Thread that executes
|
||||
* the subtask.
|
||||
*/
|
||||
private static Thread fork(StructuredTaskScope<Object, Void> scope) throws Exception {
|
||||
private static Thread fork(StructuredTaskScope<?, ?, ?> scope) throws Exception {
|
||||
var ref = new AtomicReference<Thread>();
|
||||
scope.fork(() -> {
|
||||
ref.set(Thread.currentThread());
|
||||
@ -178,12 +177,11 @@ class StructuredThreadDumpTest {
|
||||
* Forks a subtask in the given scope. The subtask creates a new child scope with
|
||||
* the given name, then parks. This method returns Thread that executes the subtask.
|
||||
*/
|
||||
private static Thread fork(StructuredTaskScope<Object, Void> scope,
|
||||
private static Thread fork(StructuredTaskScope<Object, Void, ?> scope,
|
||||
String childScopeName) throws Exception {
|
||||
var ref = new AtomicReference<Thread>();
|
||||
scope.fork(() -> {
|
||||
try (var childScope = StructuredTaskScope.open(Joiner.awaitAll(),
|
||||
cf -> cf.withName(childScopeName))) {
|
||||
try (var childScope = StructuredTaskScope.open(cf -> cf.withName(childScopeName))) {
|
||||
ref.set(Thread.currentThread());
|
||||
LockSupport.park();
|
||||
}
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022, 2025, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2022, 2026, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -26,12 +26,12 @@
|
||||
* @bug 8284199 8296779 8306647
|
||||
* @summary Basic tests for StructuredTaskScope with scoped values
|
||||
* @enablePreview
|
||||
* @run junit WithScopedValue
|
||||
* @run junit ${test.main.class}
|
||||
*/
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.StructuredTaskScope;
|
||||
import java.util.concurrent.StructuredTaskScope.Subtask;
|
||||
import java.util.concurrent.StructuredTaskScope.Joiner;
|
||||
import java.util.concurrent.StructureViolationException;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@ -56,8 +56,7 @@ class WithScopedValue {
|
||||
void testForkInheritsScopedValue1(ThreadFactory factory) throws Exception {
|
||||
ScopedValue<String> name = ScopedValue.newInstance();
|
||||
String value = ScopedValue.where(name, "x").call(() -> {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
try (var scope = StructuredTaskScope.open(cf -> cf.withThreadFactory(factory))) {
|
||||
Subtask<String> subtask = scope.fork(() -> {
|
||||
return name.get(); // child should read "x"
|
||||
});
|
||||
@ -76,11 +75,9 @@ class WithScopedValue {
|
||||
void testForkInheritsScopedValue2(ThreadFactory factory) throws Exception {
|
||||
ScopedValue<String> name = ScopedValue.newInstance();
|
||||
String value = ScopedValue.where(name, "x").call(() -> {
|
||||
try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
try (var scope1 = StructuredTaskScope.open(cf -> cf.withThreadFactory(factory))) {
|
||||
Subtask<String> subtask1 = scope1.fork(() -> {
|
||||
try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
try (var scope2 = StructuredTaskScope.open(cf -> cf.withThreadFactory(factory))) {
|
||||
Subtask<String> subtask2 = scope2.fork(() -> {
|
||||
return name.get(); // grandchild should read "x"
|
||||
});
|
||||
@ -103,15 +100,13 @@ class WithScopedValue {
|
||||
void testForkInheritsScopedValue3(ThreadFactory factory) throws Exception {
|
||||
ScopedValue<String> name = ScopedValue.newInstance();
|
||||
String value = ScopedValue.where(name, "x").call(() -> {
|
||||
try (var scope1 = StructuredTaskScope.open(Joiner.awaitAll(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
try (var scope1 = StructuredTaskScope.open(cf -> cf.withThreadFactory(factory))) {
|
||||
Subtask<String> subtask1 = scope1.fork(() -> {
|
||||
assertEquals(name.get(), "x"); // child should read "x"
|
||||
|
||||
// rebind name to "y"
|
||||
String grandchildValue = ScopedValue.where(name, "y").call(() -> {
|
||||
try (var scope2 = StructuredTaskScope.open(Joiner.awaitAll(),
|
||||
cf -> cf.withThreadFactory(factory))) {
|
||||
try (var scope2 = StructuredTaskScope.open(cf -> cf.withThreadFactory(factory))) {
|
||||
Subtask<String> subtask2 = scope2.fork(() -> {
|
||||
return name.get(); // grandchild should read "y"
|
||||
});
|
||||
@ -137,21 +132,21 @@ class WithScopedValue {
|
||||
void testStructureViolation1() throws Exception {
|
||||
ScopedValue<String> name = ScopedValue.newInstance();
|
||||
class Box {
|
||||
StructuredTaskScope<Object, Void> scope;
|
||||
StructuredTaskScope<Void, Void, ExecutionException> scope;
|
||||
}
|
||||
var box = new Box();
|
||||
try {
|
||||
try {
|
||||
ScopedValue.where(name, "x").run(() -> {
|
||||
box.scope = StructuredTaskScope.open(Joiner.awaitAll());
|
||||
box.scope = StructuredTaskScope.open();
|
||||
});
|
||||
fail();
|
||||
} catch (StructureViolationException expected) { }
|
||||
|
||||
// underlying flock should be closed and fork should fail to start a thread
|
||||
StructuredTaskScope<Object, Void> scope = box.scope;
|
||||
StructuredTaskScope<Void, Void, ExecutionException> scope = box.scope;
|
||||
AtomicBoolean ran = new AtomicBoolean();
|
||||
Subtask<Object> subtask = scope.fork(() -> {
|
||||
Subtask<Void> subtask = scope.fork(() -> {
|
||||
ran.set(true);
|
||||
return null;
|
||||
});
|
||||
@ -159,7 +154,7 @@ class WithScopedValue {
|
||||
assertEquals(Subtask.State.UNAVAILABLE, subtask.state());
|
||||
assertFalse(ran.get());
|
||||
} finally {
|
||||
StructuredTaskScope<Object, Void> scope = box.scope;
|
||||
StructuredTaskScope<Void, Void, ExecutionException> scope = box.scope;
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
}
|
||||
@ -172,7 +167,7 @@ class WithScopedValue {
|
||||
@Test
|
||||
void testStructureViolation2() throws Exception {
|
||||
ScopedValue<String> name = ScopedValue.newInstance();
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
|
||||
try (var scope = StructuredTaskScope.open()) {
|
||||
ScopedValue.where(name, "x").run(() -> {
|
||||
assertThrows(StructureViolationException.class, scope::close);
|
||||
});
|
||||
@ -185,7 +180,7 @@ class WithScopedValue {
|
||||
@Test
|
||||
void testStructureViolation3() throws Exception {
|
||||
ScopedValue<String> name = ScopedValue.newInstance();
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
|
||||
try (var scope = StructuredTaskScope.open()) {
|
||||
ScopedValue.where(name, "x").run(() -> {
|
||||
assertThrows(StructureViolationException.class,
|
||||
() -> scope.fork(() -> "foo"));
|
||||
@ -203,7 +198,7 @@ class WithScopedValue {
|
||||
|
||||
// rebind
|
||||
ScopedValue.where(name1, "x").run(() -> {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
|
||||
try (var scope = StructuredTaskScope.open()) {
|
||||
ScopedValue.where(name1, "y").run(() -> {
|
||||
assertThrows(StructureViolationException.class,
|
||||
() -> scope.fork(() -> "foo"));
|
||||
@ -213,7 +208,7 @@ class WithScopedValue {
|
||||
|
||||
// new binding
|
||||
ScopedValue.where(name1, "x").run(() -> {
|
||||
try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) {
|
||||
try (var scope = StructuredTaskScope.open()) {
|
||||
ScopedValue.where(name2, "y").run(() -> {
|
||||
assertThrows(StructureViolationException.class,
|
||||
() -> scope.fork(() -> "foo"));
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2024, 2026, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -27,7 +27,7 @@
|
||||
* @enablePreview
|
||||
* @summary Implement Subject.current and Subject.callAs using scoped values.
|
||||
* Need enablePreview to use StructuredTaskScope.
|
||||
* @run main/othervm CallAsWithScopedValue true
|
||||
* @run main/othervm ${test.main.class} true
|
||||
*/
|
||||
import com.sun.security.auth.UserPrincipal;
|
||||
|
||||
@ -65,8 +65,7 @@ public class CallAsWithScopedValue {
|
||||
|
||||
// Observable in structured concurrency in SV mode, but not in ACC mode
|
||||
Subject.callAs(subject, () -> {
|
||||
var joiner = StructuredTaskScope.Joiner.awaitAll();
|
||||
try (var scope = StructuredTaskScope.open(joiner)) {
|
||||
try (var scope = StructuredTaskScope.open()) {
|
||||
scope.fork(() -> check(3, Subject.current(), usv ? "Duke" : null));
|
||||
scope.join();
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user