From f0498c2aed761d4023917bc9cd1f852a02ce977a Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Fri, 22 Aug 2025 08:16:55 +0000 Subject: [PATCH] 8364764: java/nio/channels/vthread/BlockingChannelOps.java subtests timed out Reviewed-by: jpai --- .../channels/vthread/BlockingChannelOps.java | 140 ++++++++++++++---- 1 file changed, 109 insertions(+), 31 deletions(-) diff --git a/test/jdk/java/nio/channels/vthread/BlockingChannelOps.java b/test/jdk/java/nio/channels/vthread/BlockingChannelOps.java index 7ff02cdfea4..1cdd090d1be 100644 --- a/test/jdk/java/nio/channels/vthread/BlockingChannelOps.java +++ b/test/jdk/java/nio/channels/vthread/BlockingChannelOps.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -21,7 +21,7 @@ * questions. */ -/** +/* * @test id=default * @bug 8284161 * @summary Test virtual threads doing blocking I/O on NIO channels @@ -29,7 +29,7 @@ * @run junit BlockingChannelOps */ -/** +/* * @test id=poller-modes * @requires (os.family == "linux") | (os.family == "mac") * @library /test/lib @@ -37,7 +37,7 @@ * @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps */ -/** +/* * @test id=no-vmcontinuations * @requires vm.continuations * @library /test/lib @@ -62,6 +62,7 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; +import java.util.concurrent.locks.LockSupport; import jdk.test.lib.thread.VThreadRunner; import org.junit.jupiter.api.Test; @@ -161,6 +162,22 @@ class BlockingChannelOps { }); } + /** + * SocketChannel shutdownInput while virtual thread blocked in read. + */ + @Test + void testSocketChannelReadAsyncShutdownInput() throws Exception { + VThreadRunner.run(() -> { + try (var connection = new Connection()) { + SocketChannel sc = connection.channel1(); + runAfterParkedAsync(sc::shutdownInput); + int n = sc.read(ByteBuffer.allocate(100)); + assertEquals(-1, n); + assertTrue(sc.isOpen()); + } + }); + } + /** * Virtual thread interrupted while blocked in SocketChannel read. */ @@ -190,13 +207,15 @@ class BlockingChannelOps { @Test void testSocketChannelWriteAsyncClose() throws Exception { VThreadRunner.run(() -> { - boolean retry = true; - while (retry) { + boolean done = false; + while (!done) { try (var connection = new Connection()) { SocketChannel sc = connection.channel1(); // close sc when current thread blocks in write - runAfterParkedAsync(sc::close); + runAfterParkedAsync(sc::close, true); + + // write until channel is closed try { ByteBuffer bb = ByteBuffer.allocate(100*1024); for (;;) { @@ -206,30 +225,59 @@ class BlockingChannelOps { } } catch (AsynchronousCloseException expected) { // closed when blocked in write - retry = false; + done = true; } catch (ClosedChannelException e) { - // closed when not blocked in write, need to retry test + // closed but not blocked in write, need to retry test + System.err.format("%s, need to retry!%n", e); } } } }); } + + /** + * SocketChannel shutdownOutput while virtual thread blocked in write. + */ + @Test + void testSocketChannelWriteAsyncShutdownOutput() throws Exception { + VThreadRunner.run(() -> { + try (var connection = new Connection()) { + SocketChannel sc = connection.channel1(); + + // shutdown output when current thread blocks in write + runAfterParkedAsync(sc::shutdownOutput); + try { + ByteBuffer bb = ByteBuffer.allocate(100*1024); + for (;;) { + int n = sc.write(bb); + assertTrue(n > 0); + bb.clear(); + } + } catch (ClosedChannelException e) { + // expected + } + assertTrue(sc.isOpen()); + } + }); + } + /** * Virtual thread interrupted while blocked in SocketChannel write. */ @Test void testSocketChannelWriteInterrupt() throws Exception { VThreadRunner.run(() -> { - boolean retry = true; - while (retry) { + boolean done = false; + while (!done) { try (var connection = new Connection()) { SocketChannel sc = connection.channel1(); // interrupt current thread when it blocks in write Thread thisThread = Thread.currentThread(); - runAfterParkedAsync(thisThread::interrupt); + runAfterParkedAsync(thisThread::interrupt, true); + // write until channel is closed try { ByteBuffer bb = ByteBuffer.allocate(100*1024); for (;;) { @@ -240,9 +288,10 @@ class BlockingChannelOps { } catch (ClosedByInterruptException e) { // closed when blocked in write assertTrue(Thread.interrupted()); - retry = false; + done = true; } catch (ClosedChannelException e) { - // closed when not blocked in write, need to retry test + // closed but not blocked in write, need to retry test + System.err.format("%s, need to retry!%n", e); } } } @@ -734,14 +783,16 @@ class BlockingChannelOps { @Test void testPipeWriteAsyncClose() throws Exception { VThreadRunner.run(() -> { - boolean retry = true; - while (retry) { + boolean done = false; + while (!done) { Pipe p = Pipe.open(); try (Pipe.SinkChannel sink = p.sink(); Pipe.SourceChannel source = p.source()) { // close sink when current thread blocks in write - runAfterParkedAsync(sink::close); + runAfterParkedAsync(sink::close, true); + + // write until channel is closed try { ByteBuffer bb = ByteBuffer.allocate(100*1024); for (;;) { @@ -751,9 +802,10 @@ class BlockingChannelOps { } } catch (AsynchronousCloseException e) { // closed when blocked in write - retry = false; + done = true; } catch (ClosedChannelException e) { - // closed when not blocked in write, need to retry test + // closed but not blocked in write, need to retry test + System.err.format("%s, need to retry!%n", e); } } } @@ -766,16 +818,17 @@ class BlockingChannelOps { @Test void testPipeWriteInterrupt() throws Exception { VThreadRunner.run(() -> { - boolean retry = true; - while (retry) { + boolean done = false; + while (!done) { Pipe p = Pipe.open(); try (Pipe.SinkChannel sink = p.sink(); Pipe.SourceChannel source = p.source()) { // interrupt current thread when it blocks in write Thread thisThread = Thread.currentThread(); - runAfterParkedAsync(thisThread::interrupt); + runAfterParkedAsync(thisThread::interrupt, true); + // write until channel is closed try { ByteBuffer bb = ByteBuffer.allocate(100*1024); for (;;) { @@ -786,9 +839,10 @@ class BlockingChannelOps { } catch (ClosedByInterruptException expected) { // closed when blocked in write assertTrue(Thread.interrupted()); - retry = false; + done = true; } catch (ClosedChannelException e) { - // closed when not blocked in write, need to retry test + // closed but not blocked in write, need to retry test + System.err.format("%s, need to retry!%n", e); } } } @@ -848,26 +902,50 @@ class BlockingChannelOps { } /** - * Runs the given task asynchronously after the current virtual thread has parked. + * Runs the given task asynchronously after the current virtual thread parks. + * @param writing if the thread will block in write * @return the thread started to run the task */ - static Thread runAfterParkedAsync(ThrowingRunnable task) { + private static Thread runAfterParkedAsync(ThrowingRunnable task, boolean writing) { Thread target = Thread.currentThread(); if (!target.isVirtual()) throw new WrongThreadException(); return Thread.ofPlatform().daemon().start(() -> { try { - Thread.State state = target.getState(); - while (state != Thread.State.WAITING - && state != Thread.State.TIMED_WAITING) { + // wait for target thread to park + while (!isWaiting(target)) { Thread.sleep(20); - state = target.getState(); } - Thread.sleep(20); // give a bit more time to release carrier + + // if the target thread is parked in write then we nudge it a few times + // to avoid wakeup with some bytes written + if (writing) { + for (int i = 0; i < 3; i++) { + LockSupport.unpark(target); + while (!isWaiting(target)) { + Thread.sleep(20); + } + } + } + task.run(); + } catch (Exception e) { e.printStackTrace(); } }); } + + private static Thread runAfterParkedAsync(ThrowingRunnable task) { + return runAfterParkedAsync(task, false); + } + + /** + * Return true if the given Thread is parked. + */ + private static boolean isWaiting(Thread target) { + Thread.State state = target.getState(); + assertNotEquals(Thread.State.TERMINATED, state); + return (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING); + } }