8364764: java/nio/channels/vthread/BlockingChannelOps.java subtests timed out

Reviewed-by: jpai
This commit is contained in:
Alan Bateman 2025-08-22 08:16:55 +00:00
parent 8e44856992
commit f0498c2aed

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -21,7 +21,7 @@
* questions.
*/
/**
/*
* @test id=default
* @bug 8284161
* @summary Test virtual threads doing blocking I/O on NIO channels
@ -29,7 +29,7 @@
* @run junit BlockingChannelOps
*/
/**
/*
* @test id=poller-modes
* @requires (os.family == "linux") | (os.family == "mac")
* @library /test/lib
@ -37,7 +37,7 @@
* @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps
*/
/**
/*
* @test id=no-vmcontinuations
* @requires vm.continuations
* @library /test/lib
@ -62,6 +62,7 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.locks.LockSupport;
import jdk.test.lib.thread.VThreadRunner;
import org.junit.jupiter.api.Test;
@ -161,6 +162,22 @@ class BlockingChannelOps {
});
}
/**
* SocketChannel shutdownInput while virtual thread blocked in read.
*/
@Test
void testSocketChannelReadAsyncShutdownInput() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();
runAfterParkedAsync(sc::shutdownInput);
int n = sc.read(ByteBuffer.allocate(100));
assertEquals(-1, n);
assertTrue(sc.isOpen());
}
});
}
/**
* Virtual thread interrupted while blocked in SocketChannel read.
*/
@ -190,13 +207,15 @@ class BlockingChannelOps {
@Test
void testSocketChannelWriteAsyncClose() throws Exception {
VThreadRunner.run(() -> {
boolean retry = true;
while (retry) {
boolean done = false;
while (!done) {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();
// close sc when current thread blocks in write
runAfterParkedAsync(sc::close);
runAfterParkedAsync(sc::close, true);
// write until channel is closed
try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
@ -206,30 +225,59 @@ class BlockingChannelOps {
}
} catch (AsynchronousCloseException expected) {
// closed when blocked in write
retry = false;
done = true;
} catch (ClosedChannelException e) {
// closed when not blocked in write, need to retry test
// closed but not blocked in write, need to retry test
System.err.format("%s, need to retry!%n", e);
}
}
}
});
}
/**
* SocketChannel shutdownOutput while virtual thread blocked in write.
*/
@Test
void testSocketChannelWriteAsyncShutdownOutput() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();
// shutdown output when current thread blocks in write
runAfterParkedAsync(sc::shutdownOutput);
try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
int n = sc.write(bb);
assertTrue(n > 0);
bb.clear();
}
} catch (ClosedChannelException e) {
// expected
}
assertTrue(sc.isOpen());
}
});
}
/**
* Virtual thread interrupted while blocked in SocketChannel write.
*/
@Test
void testSocketChannelWriteInterrupt() throws Exception {
VThreadRunner.run(() -> {
boolean retry = true;
while (retry) {
boolean done = false;
while (!done) {
try (var connection = new Connection()) {
SocketChannel sc = connection.channel1();
// interrupt current thread when it blocks in write
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
runAfterParkedAsync(thisThread::interrupt, true);
// write until channel is closed
try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
@ -240,9 +288,10 @@ class BlockingChannelOps {
} catch (ClosedByInterruptException e) {
// closed when blocked in write
assertTrue(Thread.interrupted());
retry = false;
done = true;
} catch (ClosedChannelException e) {
// closed when not blocked in write, need to retry test
// closed but not blocked in write, need to retry test
System.err.format("%s, need to retry!%n", e);
}
}
}
@ -734,14 +783,16 @@ class BlockingChannelOps {
@Test
void testPipeWriteAsyncClose() throws Exception {
VThreadRunner.run(() -> {
boolean retry = true;
while (retry) {
boolean done = false;
while (!done) {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {
// close sink when current thread blocks in write
runAfterParkedAsync(sink::close);
runAfterParkedAsync(sink::close, true);
// write until channel is closed
try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
@ -751,9 +802,10 @@ class BlockingChannelOps {
}
} catch (AsynchronousCloseException e) {
// closed when blocked in write
retry = false;
done = true;
} catch (ClosedChannelException e) {
// closed when not blocked in write, need to retry test
// closed but not blocked in write, need to retry test
System.err.format("%s, need to retry!%n", e);
}
}
}
@ -766,16 +818,17 @@ class BlockingChannelOps {
@Test
void testPipeWriteInterrupt() throws Exception {
VThreadRunner.run(() -> {
boolean retry = true;
while (retry) {
boolean done = false;
while (!done) {
Pipe p = Pipe.open();
try (Pipe.SinkChannel sink = p.sink();
Pipe.SourceChannel source = p.source()) {
// interrupt current thread when it blocks in write
Thread thisThread = Thread.currentThread();
runAfterParkedAsync(thisThread::interrupt);
runAfterParkedAsync(thisThread::interrupt, true);
// write until channel is closed
try {
ByteBuffer bb = ByteBuffer.allocate(100*1024);
for (;;) {
@ -786,9 +839,10 @@ class BlockingChannelOps {
} catch (ClosedByInterruptException expected) {
// closed when blocked in write
assertTrue(Thread.interrupted());
retry = false;
done = true;
} catch (ClosedChannelException e) {
// closed when not blocked in write, need to retry test
// closed but not blocked in write, need to retry test
System.err.format("%s, need to retry!%n", e);
}
}
}
@ -848,26 +902,50 @@ class BlockingChannelOps {
}
/**
* Runs the given task asynchronously after the current virtual thread has parked.
* Runs the given task asynchronously after the current virtual thread parks.
* @param writing if the thread will block in write
* @return the thread started to run the task
*/
static Thread runAfterParkedAsync(ThrowingRunnable task) {
private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) {
Thread target = Thread.currentThread();
if (!target.isVirtual())
throw new WrongThreadException();
return Thread.ofPlatform().daemon().start(() -> {
try {
Thread.State state = target.getState();
while (state != Thread.State.WAITING
&& state != Thread.State.TIMED_WAITING) {
// wait for target thread to park
while (!isWaiting(target)) {
Thread.sleep(20);
state = target.getState();
}
Thread.sleep(20); // give a bit more time to release carrier
// if the target thread is parked in write then we nudge it a few times
// to avoid wakeup with some bytes written
if (writing) {
for (int i = 0; i < 3; i++) {
LockSupport.unpark(target);
while (!isWaiting(target)) {
Thread.sleep(20);
}
}
}
task.run();
} catch (Exception e) {
e.printStackTrace();
}
});
}
private static Thread runAfterParkedAsync(ThrowingRunnable task) {
return runAfterParkedAsync(task, false);
}
/**
* Return true if the given Thread is parked.
*/
private static boolean isWaiting(Thread target) {
Thread.State state = target.getState();
assertNotEquals(Thread.State.TERMINATED, state);
return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING);
}
}