8372958: SocketInputStream.read throws SocketException instead of returning -1 when input shutdown

Reviewed-by: djelinski, michaelm
This commit is contained in:
Alan Bateman 2025-12-03 13:03:51 +00:00
parent abb75ba656
commit afb6a0c2fe
4 changed files with 171 additions and 233 deletions

View File

@ -289,6 +289,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
*/
private int implRead(byte[] b, int off, int len, long remainingNanos) throws IOException {
int n = 0;
SocketException ex = null;
FileDescriptor fd = beginRead();
try {
if (connectionReset)
@ -307,18 +308,24 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
n = tryRead(fd, b, off, len);
}
}
return n;
} catch (InterruptedIOException e) {
throw e;
} catch (ConnectionResetException e) {
connectionReset = true;
throw new SocketException("Connection reset");
} catch (IOException ioe) {
// throw SocketException to maintain compatibility
throw asSocketException(ioe);
// translate to SocketException to maintain compatibility
ex = asSocketException(ioe);
} finally {
endRead(n > 0);
}
if (n <= 0 && isInputClosed) {
return -1;
}
if (ex != null) {
throw ex;
}
return n;
}
/**
@ -411,6 +418,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
*/
private int implWrite(byte[] b, int off, int len) throws IOException {
int n = 0;
SocketException ex = null;
FileDescriptor fd = beginWrite();
try {
configureNonBlockingIfNeeded(fd, false);
@ -419,15 +427,18 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
park(fd, Net.POLLOUT);
n = tryWrite(fd, b, off, len);
}
return n;
} catch (InterruptedIOException e) {
throw e;
} catch (IOException ioe) {
// throw SocketException to maintain compatibility
throw asSocketException(ioe);
// translate to SocketException to maintain compatibility
ex = asSocketException(ioe);
} finally {
endWrite(n > 0);
}
if (ex != null) {
throw ex;
}
return n;
}
/**

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -24,11 +24,13 @@
/*
* @test
* @requires (os.family == "linux" | os.family == "mac")
* @run testng AsyncShutdown
* @summary Test shutdownInput/shutdownOutput with threads blocked in read/write
* @run junit AsyncShutdown
*/
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@ -38,54 +40,56 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.*;
@Test
public class AsyncShutdown {
class AsyncShutdown {
public void testShutdownInput1() throws IOException {
@ParameterizedTest
@ValueSource(booleans = { false, true })
void testShutdownInput(boolean timed) throws IOException {
withConnection((s1, s2) -> {
InputStream in = s1.getInputStream();
scheduleShutdownInput(s1, 2000);
int n = s1.getInputStream().read();
assertTrue(n == -1);
if (timed) {
s1.setSoTimeout(30*1000);
}
assertEquals(-1, in.read());
assertEquals(0, in.available());
});
}
public void testShutdownInput2() throws IOException {
withConnection((s1, s2) -> {
scheduleShutdownInput(s1, 2000);
s1.setSoTimeout(30*1000);
int n = s1.getInputStream().read();
assertTrue(n == -1);
});
}
public void testShutdownOutput1() throws IOException {
@Test
void testShutdownOutput1() throws IOException {
withConnection((s1, s2) -> {
OutputStream out = s1.getOutputStream();
scheduleShutdownOutput(s1, 2000);
byte[] data = new byte[128*1024];
try {
while (true) {
s1.getOutputStream().write(data);
out.write(data);
}
} catch (IOException expected) { }
});
}
public void testShutdownOutput2() throws IOException {
@Test
void testShutdownOutput2() throws IOException {
withConnection((s1, s2) -> {
s1.setSoTimeout(100);
try {
s1.getInputStream().read();
assertTrue(false);
fail();
} catch (SocketTimeoutException e) { }
OutputStream out = s1.getOutputStream();
scheduleShutdownOutput(s1, 2000);
byte[] data = new byte[128*1024];
try {
while (true) {
s1.getOutputStream().write(data);
out.write(data);
}
} catch (IOException expected) { }
});

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -21,15 +21,15 @@
* questions.
*/
/**
/*
* @test id=default
* @bug 8284161
* @bug 8284161 8372958
* @summary Test virtual threads doing blocking I/O on java.net Sockets
* @library /test/lib
* @run junit BlockingSocketOps
*/
/**
/*
* @test id=poller-modes
* @requires (os.family == "linux") | (os.family == "mac")
* @library /test/lib
@ -37,7 +37,7 @@
* @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps
*/
/**
/*
* @test id=no-vmcontinuations
* @requires vm.continuations
* @library /test/lib
@ -60,6 +60,8 @@ import java.net.SocketTimeoutException;
import jdk.test.lib.thread.VThreadRunner;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.*;
class BlockingSocketOps {
@ -90,19 +92,8 @@ class BlockingSocketOps {
/**
* Virtual thread blocks in read.
*/
@Test
void testSocketRead1() throws Exception {
testSocketRead(0);
}
/**
* Virtual thread blocks in timed read.
*/
@Test
void testSocketRead2() throws Exception {
testSocketRead(60_000);
}
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testSocketRead(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
@ -202,19 +193,8 @@ class BlockingSocketOps {
/**
* Socket close while virtual thread blocked in read.
*/
@Test
void testSocketReadAsyncClose1() throws Exception {
testSocketReadAsyncClose(0);
}
/**
* Socket close while virtual thread blocked in timed read.
*/
@Test
void testSocketReadAsyncClose2() throws Exception {
testSocketReadAsyncClose(0);
}
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testSocketReadAsyncClose(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
@ -230,7 +210,34 @@ class BlockingSocketOps {
try {
int n = s.getInputStream().read();
fail("read " + n);
} catch (SocketException expected) { }
} catch (SocketException expected) {
log(expected);
}
}
});
}
/**
* Socket shutdownInput while virtual thread blocked in read.
*/
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testSocketReadAsyncShutdownInput(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
// delayed shutdown of input stream
InputStream in = s.getInputStream();
runAfterParkedAsync(s::shutdownInput);
// read should return -1
if (timeout > 0) {
s.setSoTimeout(timeout);
}
assertEquals(-1, in.read());
assertEquals(0, in.available());
assertFalse(s.isClosed());
}
});
}
@ -238,19 +245,8 @@ class BlockingSocketOps {
/**
* Virtual thread interrupted while blocked in Socket read.
*/
@Test
void testSocketReadInterrupt1() throws Exception {
testSocketReadInterrupt(0);
}
/**
* Virtual thread interrupted while blocked in Socket read with timeout
*/
@Test
void testSocketReadInterrupt2() throws Exception {
testSocketReadInterrupt(60_000);
}
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testSocketReadInterrupt(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
@ -269,6 +265,7 @@ class BlockingSocketOps {
int n = s.getInputStream().read();
fail("read " + n);
} catch (SocketException expected) {
log(expected);
assertTrue(Thread.interrupted());
assertTrue(s.isClosed());
}
@ -285,7 +282,7 @@ class BlockingSocketOps {
try (var connection = new Connection()) {
Socket s = connection.socket1();
// delayedclose of s
// delayed close of s
runAfterParkedAsync(s::close);
// write to s should block, then throw
@ -295,7 +292,36 @@ class BlockingSocketOps {
for (;;) {
out.write(ba);
}
} catch (SocketException expected) { }
} catch (SocketException expected) {
log(expected);
}
}
});
}
/**
* Socket shutdownOutput while virtual thread blocked in write.
*/
@Test
void testSocketWriteAsyncShutdownOutput() throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
Socket s = connection.socket1();
// delayed shutdown of output stream
OutputStream out = s.getOutputStream();
runAfterParkedAsync(s::shutdownOutput);
// write to s should block, then throw
try {
byte[] ba = new byte[100*1024];
for (;;) {
out.write(ba);
}
} catch (SocketException expected) {
log(expected);
}
assertFalse(s.isClosed());
}
});
}
@ -321,6 +347,7 @@ class BlockingSocketOps {
out.write(ba);
}
} catch (SocketException expected) {
log(expected);
assertTrue(Thread.interrupted());
assertTrue(s.isClosed());
}
@ -355,7 +382,9 @@ class BlockingSocketOps {
try {
s1.getInputStream().read(ba);
fail();
} catch (SocketTimeoutException expected) { }
} catch (SocketTimeoutException expected) {
log(expected);
}
}
});
}
@ -384,19 +413,8 @@ class BlockingSocketOps {
/**
* Virtual thread blocks in accept.
*/
@Test
void testServerSocketAccept2() throws Exception {
testServerSocketAccept(0);
}
/**
* Virtual thread blocks in timed accept.
*/
@Test
void testServerSocketAccept3() throws Exception {
testServerSocketAccept(60_000);
}
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testServerSocketAccept(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket()) {
@ -422,19 +440,8 @@ class BlockingSocketOps {
/**
* ServerSocket close while virtual thread blocked in accept.
*/
@Test
void testServerSocketAcceptAsyncClose1() throws Exception {
testServerSocketAcceptAsyncClose(0);
}
/**
* ServerSocket close while virtual thread blocked in timed accept.
*/
@Test
void testServerSocketAcceptAsyncClose2() throws Exception {
testServerSocketAcceptAsyncClose(60_000);
}
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testServerSocketAcceptAsyncClose(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket()) {
@ -451,7 +458,9 @@ class BlockingSocketOps {
try {
listener.accept().close();
fail("connection accepted???");
} catch (SocketException expected) { }
} catch (SocketException expected) {
log(expected);
}
}
});
}
@ -459,19 +468,8 @@ class BlockingSocketOps {
/**
* Virtual thread interrupted while blocked in ServerSocket accept.
*/
@Test
void testServerSocketAcceptInterrupt1() throws Exception {
testServerSocketAcceptInterrupt(0);
}
/**
* Virtual thread interrupted while blocked in ServerSocket accept with timeout.
*/
@Test
void testServerSocketAcceptInterrupt2() throws Exception {
testServerSocketAcceptInterrupt(60_000);
}
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testServerSocketAcceptInterrupt(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var listener = new ServerSocket()) {
@ -490,6 +488,7 @@ class BlockingSocketOps {
listener.accept().close();
fail("connection accepted???");
} catch (SocketException expected) {
log(expected);
assertTrue(Thread.interrupted());
assertTrue(listener.isClosed());
}
@ -529,20 +528,9 @@ class BlockingSocketOps {
/**
* Virtual thread blocks in DatagramSocket receive.
*/
@Test
void testDatagramSocketSendReceive2() throws Exception {
testDatagramSocketSendReceive(0);
}
/**
* Virtual thread blocks in DatagramSocket receive with timeout.
*/
@Test
void testDatagramSocketSendReceive3() throws Exception {
testDatagramSocketSendReceive(60_000);
}
private void testDatagramSocketSendReceive(int timeout) throws Exception {
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testDatagramSocketSendReceive(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (DatagramSocket s1 = new DatagramSocket(null);
DatagramSocket s2 = new DatagramSocket(null)) {
@ -585,7 +573,9 @@ class BlockingSocketOps {
try {
s.receive(p);
fail();
} catch (SocketTimeoutException expected) { }
} catch (SocketTimeoutException expected) {
log(expected);
}
}
});
}
@ -593,20 +583,9 @@ class BlockingSocketOps {
/**
* DatagramSocket close while virtual thread blocked in receive.
*/
@Test
void testDatagramSocketReceiveAsyncClose1() throws Exception {
testDatagramSocketReceiveAsyncClose(0);
}
/**
* DatagramSocket close while virtual thread blocked with timeout.
*/
@Test
void testDatagramSocketReceiveAsyncClose2() throws Exception {
testDatagramSocketReceiveAsyncClose(60_000);
}
private void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testDatagramSocketReceiveAsyncClose(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (DatagramSocket s = new DatagramSocket(null)) {
InetAddress lh = InetAddress.getLoopbackAddress();
@ -624,7 +603,9 @@ class BlockingSocketOps {
DatagramPacket p = new DatagramPacket(ba, ba.length);
s.receive(p);
fail();
} catch (SocketException expected) { }
} catch (SocketException expected) {
log(expected);
}
}
});
}
@ -632,20 +613,9 @@ class BlockingSocketOps {
/**
* Virtual thread interrupted while blocked in DatagramSocket receive.
*/
@Test
void testDatagramSocketReceiveInterrupt1() throws Exception {
testDatagramSocketReceiveInterrupt(0);
}
/**
* Virtual thread interrupted while blocked in DatagramSocket receive with timeout.
*/
@Test
void testDatagramSocketReceiveInterrupt2() throws Exception {
testDatagramSocketReceiveInterrupt(60_000);
}
private void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testDatagramSocketReceiveInterrupt(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (DatagramSocket s = new DatagramSocket(null)) {
InetAddress lh = InetAddress.getLoopbackAddress();
@ -665,6 +635,7 @@ class BlockingSocketOps {
s.receive(p);
fail();
} catch (SocketException expected) {
log(expected);
assertTrue(Thread.interrupted());
assertTrue(s.isClosed());
}
@ -737,4 +708,11 @@ class BlockingSocketOps {
}
});
}
/**
* Log to System.err to inline with the JUnit messages.
*/
static void log(Throwable e) {
System.err.println(e);
}
}

View File

@ -66,6 +66,8 @@ import java.util.concurrent.locks.LockSupport;
import jdk.test.lib.thread.VThreadRunner;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.*;
class BlockingChannelOps {
@ -301,20 +303,9 @@ class BlockingChannelOps {
/**
* Virtual thread blocks in SocketChannel adaptor read.
*/
@Test
void testSocketAdaptorRead1() throws Exception {
testSocketAdaptorRead(0);
}
/**
* Virtual thread blocks in SocketChannel adaptor read with timeout.
*/
@Test
void testSocketAdaptorRead2() throws Exception {
testSocketAdaptorRead(60_000);
}
private void testSocketAdaptorRead(int timeout) throws Exception {
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testSocketAdaptorRead(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var connection = new Connection()) {
SocketChannel sc1 = connection.channel1();
@ -420,20 +411,9 @@ class BlockingChannelOps {
/**
* Virtual thread blocks in ServerSocketChannel adaptor accept.
*/
@Test
void testSocketChannelAdaptorAccept1() throws Exception {
testSocketChannelAdaptorAccept(0);
}
/**
* Virtual thread blocks in ServerSocketChannel adaptor accept with timeout.
*/
@Test
void testSocketChannelAdaptorAccept2() throws Exception {
testSocketChannelAdaptorAccept(60_000);
}
private void testSocketChannelAdaptorAccept(int timeout) throws Exception {
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testSocketChannelAdaptorAccept(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (var ssc = ServerSocketChannel.open()) {
ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
@ -546,20 +526,9 @@ class BlockingChannelOps {
/**
* Virtual thread blocks in DatagramSocket adaptor receive.
*/
@Test
void testDatagramSocketAdaptorReceive1() throws Exception {
testDatagramSocketAdaptorReceive(0);
}
/**
* Virtual thread blocks in DatagramSocket adaptor receive with timeout.
*/
@Test
void testDatagramSocketAdaptorReceive2() throws Exception {
testDatagramSocketAdaptorReceive(60_000);
}
private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testDatagramSocketAdaptorReceive(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (DatagramChannel dc1 = DatagramChannel.open();
DatagramChannel dc2 = DatagramChannel.open()) {
@ -585,21 +554,9 @@ class BlockingChannelOps {
/**
* DatagramChannel close while virtual thread blocked in adaptor receive.
*/
@Test
void testDatagramSocketAdaptorReceiveAsyncClose1() throws Exception {
testDatagramSocketAdaptorReceiveAsyncClose(0);
}
/**
* DatagramChannel close while virtual thread blocked in adaptor receive
* with timeout.
*/
@Test
void testDatagramSocketAdaptorReceiveAsyncClose2() throws Exception {
testDatagramSocketAdaptorReceiveAsyncClose(60_1000);
}
private void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testDatagramSocketAdaptorReceiveAsyncClose(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (DatagramChannel dc = DatagramChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();
@ -621,21 +578,9 @@ class BlockingChannelOps {
/**
* Virtual thread interrupted while blocked in DatagramSocket adaptor receive.
*/
@Test
void testDatagramSocketAdaptorReceiveInterrupt1() throws Exception {
testDatagramSocketAdaptorReceiveInterrupt(0);
}
/**
* Virtual thread interrupted while blocked in DatagramSocket adaptor receive
* with timeout.
*/
@Test
void testDatagramSocketAdaptorReceiveInterrupt2() throws Exception {
testDatagramSocketAdaptorReceiveInterrupt(60_1000);
}
private void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
@ParameterizedTest
@ValueSource(ints = { 0, 60_000 })
void testDatagramSocketAdaptorReceiveInterrupt(int timeout) throws Exception {
VThreadRunner.run(() -> {
try (DatagramChannel dc = DatagramChannel.open()) {
InetAddress lh = InetAddress.getLoopbackAddress();