mirror of
https://github.com/openjdk/jdk.git
synced 2026-02-27 18:50:07 +00:00
8378268: Thread.join can wait on Thread, allows joinNanos to be removed
Reviewed-by: jpai, vklang
This commit is contained in:
parent
173153e1b2
commit
b13a291667
@ -1881,8 +1881,8 @@ public class Thread implements Runnable {
|
||||
* been {@link #start() started}.
|
||||
*
|
||||
* @implNote
|
||||
* For platform threads, the implementation uses a loop of {@code this.wait}
|
||||
* calls conditioned on {@code this.isAlive}. As a thread terminates the
|
||||
* This implementation uses a loop of {@code this.wait} calls
|
||||
* conditioned on {@code this.isAlive}. As a thread terminates the
|
||||
* {@code this.notifyAll} method is invoked. It is recommended that
|
||||
* applications not use {@code wait}, {@code notify}, or
|
||||
* {@code notifyAll} on {@code Thread} instances.
|
||||
@ -1901,13 +1901,12 @@ public class Thread implements Runnable {
|
||||
public final void join(long millis) throws InterruptedException {
|
||||
if (millis < 0)
|
||||
throw new IllegalArgumentException("timeout value is negative");
|
||||
|
||||
if (this instanceof VirtualThread vthread) {
|
||||
if (isAlive()) {
|
||||
long nanos = MILLISECONDS.toNanos(millis);
|
||||
vthread.joinNanos(nanos);
|
||||
}
|
||||
if (!isAlive())
|
||||
return;
|
||||
|
||||
// ensure there is a notifyAll to wake up waiters when this thread terminates
|
||||
if (this instanceof VirtualThread vthread) {
|
||||
vthread.beforeJoin();
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
@ -1936,8 +1935,8 @@ public class Thread implements Runnable {
|
||||
* been {@link #start() started}.
|
||||
*
|
||||
* @implNote
|
||||
* For platform threads, the implementation uses a loop of {@code this.wait}
|
||||
* calls conditioned on {@code this.isAlive}. As a thread terminates the
|
||||
* This implementation uses a loop of {@code this.wait} calls
|
||||
* conditioned on {@code this.isAlive}. As a thread terminates the
|
||||
* {@code this.notifyAll} method is invoked. It is recommended that
|
||||
* applications not use {@code wait}, {@code notify}, or
|
||||
* {@code notifyAll} on {@code Thread} instances.
|
||||
@ -1966,16 +1965,6 @@ public class Thread implements Runnable {
|
||||
throw new IllegalArgumentException("nanosecond timeout value out of range");
|
||||
}
|
||||
|
||||
if (this instanceof VirtualThread vthread) {
|
||||
if (isAlive()) {
|
||||
// convert arguments to a total in nanoseconds
|
||||
long totalNanos = MILLISECONDS.toNanos(millis);
|
||||
totalNanos += Math.min(Long.MAX_VALUE - totalNanos, nanos);
|
||||
vthread.joinNanos(totalNanos);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (nanos > 0 && millis < Long.MAX_VALUE) {
|
||||
millis++;
|
||||
}
|
||||
@ -2035,10 +2024,6 @@ public class Thread implements Runnable {
|
||||
if (nanos <= 0)
|
||||
return false;
|
||||
|
||||
if (this instanceof VirtualThread vthread) {
|
||||
return vthread.joinNanos(nanos);
|
||||
}
|
||||
|
||||
// convert to milliseconds
|
||||
long millis = MILLISECONDS.convert(nanos, NANOSECONDS);
|
||||
if (nanos > NANOSECONDS.convert(millis, MILLISECONDS)) {
|
||||
|
||||
@ -26,7 +26,6 @@ package java.lang;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
@ -68,7 +67,6 @@ final class VirtualThread extends BaseVirtualThread {
|
||||
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
|
||||
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
|
||||
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
|
||||
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
|
||||
private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
|
||||
|
||||
// scheduler and continuation
|
||||
@ -184,8 +182,8 @@ final class VirtualThread extends BaseVirtualThread {
|
||||
// carrier thread when mounted, accessed by VM
|
||||
private volatile Thread carrierThread;
|
||||
|
||||
// termination object when joining, created lazily if needed
|
||||
private volatile CountDownLatch termination;
|
||||
// true to notifyAll after this virtual thread terminates
|
||||
private volatile boolean notifyAllAfterTerminate;
|
||||
|
||||
/**
|
||||
* Returns the default scheduler.
|
||||
@ -677,11 +675,11 @@ final class VirtualThread extends BaseVirtualThread {
|
||||
assert carrierThread == null;
|
||||
setState(TERMINATED);
|
||||
|
||||
// notify anyone waiting for this virtual thread to terminate
|
||||
CountDownLatch termination = this.termination;
|
||||
if (termination != null) {
|
||||
assert termination.getCount() == 1;
|
||||
termination.countDown();
|
||||
// notifyAll to wakeup any threads waiting for this thread to terminate
|
||||
if (notifyAllAfterTerminate) {
|
||||
synchronized (this) {
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
// notify container
|
||||
@ -740,6 +738,13 @@ final class VirtualThread extends BaseVirtualThread {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked by Thread.join before a thread waits for this virtual thread to terminate.
|
||||
*/
|
||||
void beforeJoin() {
|
||||
notifyAllAfterTerminate = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parks until unparked or interrupted. If already unparked then the parking
|
||||
* permit is consumed and this method completes immediately (meaning it doesn't
|
||||
@ -999,36 +1004,6 @@ final class VirtualThread extends BaseVirtualThread {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits up to {@code nanos} nanoseconds for this virtual thread to terminate.
|
||||
* A timeout of {@code 0} means to wait forever.
|
||||
*
|
||||
* @throws InterruptedException if interrupted while waiting
|
||||
* @return true if the thread has terminated
|
||||
*/
|
||||
boolean joinNanos(long nanos) throws InterruptedException {
|
||||
if (state() == TERMINATED)
|
||||
return true;
|
||||
|
||||
// ensure termination object exists, then re-check state
|
||||
CountDownLatch termination = getTermination();
|
||||
if (state() == TERMINATED)
|
||||
return true;
|
||||
|
||||
// wait for virtual thread to terminate
|
||||
if (nanos == 0) {
|
||||
termination.await();
|
||||
} else {
|
||||
boolean terminated = termination.await(nanos, NANOSECONDS);
|
||||
if (!terminated) {
|
||||
// waiting time elapsed
|
||||
return false;
|
||||
}
|
||||
}
|
||||
assert state() == TERMINATED;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
void blockedOn(Interruptible b) {
|
||||
disableSuspendAndPreempt();
|
||||
@ -1239,20 +1214,6 @@ final class VirtualThread extends BaseVirtualThread {
|
||||
return obj == this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the termination object, creating it if needed.
|
||||
*/
|
||||
private CountDownLatch getTermination() {
|
||||
CountDownLatch termination = this.termination;
|
||||
if (termination == null) {
|
||||
termination = new CountDownLatch(1);
|
||||
if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
|
||||
termination = this.termination;
|
||||
}
|
||||
}
|
||||
return termination;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the lock object to synchronize on when accessing carrierThread.
|
||||
* The lock prevents carrierThread from being reset to null during unmount.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user