mirror of
https://github.com/openjdk/jdk.git
synced 2026-01-28 12:09:14 +00:00
8371718: (sc) Channels.new{Input,Output}Stream can allocate unbounded memory for a socket channel
Reviewed-by: alanb
This commit is contained in:
parent
52aa7fe1c9
commit
74dca863c2
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2001, 2024, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2001, 2025, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -109,6 +109,7 @@ class ChannelInputStream extends InputStream {
|
||||
if (len == 0)
|
||||
return 0;
|
||||
|
||||
len = Math.min(len, Streams.MAX_BUFFER_SIZE);
|
||||
ByteBuffer bb = ((this.bs == bs)
|
||||
? this.bb
|
||||
: ByteBuffer.wrap(bs));
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2021, 2025, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -64,10 +64,15 @@ class ChannelOutputStream extends OutputStream {
|
||||
* If the channel is selectable then it must be configured blocking.
|
||||
*/
|
||||
private void writeFully(ByteBuffer bb) throws IOException {
|
||||
while (bb.remaining() > 0) {
|
||||
int pos = bb.position();
|
||||
int rem = bb.limit() - pos;
|
||||
while (rem > 0) {
|
||||
bb.limit(pos + Math.min(Streams.MAX_BUFFER_SIZE, rem));
|
||||
int n = ch.write(bb);
|
||||
if (n <= 0)
|
||||
throw new RuntimeException("no bytes written");
|
||||
throw new IOException("Write failed");
|
||||
pos += n;
|
||||
rem -= n;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -78,10 +78,6 @@ import static jdk.internal.util.Exceptions.formatMsg;
|
||||
public final class NioSocketImpl extends SocketImpl implements PlatformSocketImpl {
|
||||
private static final NativeDispatcher nd = new SocketDispatcher();
|
||||
|
||||
// The maximum number of bytes to read/write per syscall to avoid needing
|
||||
// a huge buffer from the temporary buffer cache
|
||||
private static final int MAX_BUFFER_SIZE = 128 * 1024;
|
||||
|
||||
// true if this is a SocketImpl for a ServerSocket
|
||||
private final boolean server;
|
||||
|
||||
@ -355,8 +351,8 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
|
||||
// emulate legacy behavior to return -1, even if socket is closed
|
||||
if (readEOF)
|
||||
return -1;
|
||||
// read up to MAX_BUFFER_SIZE bytes
|
||||
int size = Math.min(len, MAX_BUFFER_SIZE);
|
||||
// read up to Streams.MAX_BUFFER_SIZE bytes
|
||||
int size = Math.min(len, Streams.MAX_BUFFER_SIZE);
|
||||
int n = implRead(b, off, size, remainingNanos);
|
||||
if (n == -1)
|
||||
readEOF = true;
|
||||
@ -453,8 +449,8 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
|
||||
int pos = off;
|
||||
int end = off + len;
|
||||
while (pos < end) {
|
||||
// write up to MAX_BUFFER_SIZE bytes
|
||||
int size = Math.min((end - pos), MAX_BUFFER_SIZE);
|
||||
// write up to Streams.MAX_BUFFER_SIZE bytes
|
||||
int size = Math.min((end - pos), Streams.MAX_BUFFER_SIZE);
|
||||
int n = implWrite(b, pos, size);
|
||||
pos += n;
|
||||
}
|
||||
|
||||
@ -1373,6 +1373,7 @@ class SocketChannelImpl
|
||||
// nothing to do
|
||||
return 0;
|
||||
}
|
||||
len = Math.min(len, Streams.MAX_BUFFER_SIZE);
|
||||
|
||||
readLock.lock();
|
||||
try {
|
||||
@ -1469,7 +1470,7 @@ class SocketChannelImpl
|
||||
beginWrite(true);
|
||||
configureSocketNonBlockingIfVirtualThread();
|
||||
while (pos < end && isOpen()) {
|
||||
int size = end - pos;
|
||||
int size = Math.min(end - pos, Streams.MAX_BUFFER_SIZE);
|
||||
int n = tryWrite(b, pos, size);
|
||||
while (IOStatus.okayToRetry(n) && isOpen()) {
|
||||
park(Net.POLLOUT);
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2021, 2025, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -33,6 +33,10 @@ import java.nio.channels.WritableByteChannel;
|
||||
* Factory methods for input/output streams based on channels.
|
||||
*/
|
||||
public class Streams {
|
||||
// The maximum number of bytes to read/write per syscall to avoid needing
|
||||
// a huge buffer from the temporary buffer cache
|
||||
static final int MAX_BUFFER_SIZE = 128 * 1024;
|
||||
|
||||
private Streams() { }
|
||||
|
||||
/**
|
||||
|
||||
@ -22,10 +22,10 @@
|
||||
*/
|
||||
|
||||
/* @test
|
||||
* @bug 8279339
|
||||
* @run testng SocketChannelStreams
|
||||
* @bug 8279339 8371718
|
||||
* @summary Exercise InputStream/OutputStream returned by Channels.newXXXStream
|
||||
* when channel is a SocketChannel
|
||||
* @run testng SocketChannelStreams
|
||||
*/
|
||||
|
||||
import java.io.Closeable;
|
||||
@ -35,6 +35,8 @@ import java.io.OutputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ByteChannel;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.IllegalBlockingModeException;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
@ -51,6 +53,9 @@ import static org.testng.Assert.*;
|
||||
|
||||
@Test
|
||||
public class SocketChannelStreams {
|
||||
// Maximum size of internal temporary buffer
|
||||
private static final int MAX_BUFFER_SIZE = 128*1024;
|
||||
|
||||
private ScheduledExecutorService executor;
|
||||
|
||||
@BeforeClass()
|
||||
@ -379,6 +384,25 @@ public class SocketChannelStreams {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that internal buffers have at most MAX_BUFFER_SIZE bytes remaining.
|
||||
*/
|
||||
public void testReadLimit() throws IOException {
|
||||
InputStream in = Channels.newInputStream(new TestChannel());
|
||||
byte[] b = new byte[3*MAX_BUFFER_SIZE];
|
||||
int n = in.read(b, 0, b.length);
|
||||
assertEquals(n, MAX_BUFFER_SIZE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that internal buffers have at most MAX_BUFFER_SIZE bytes remaining.
|
||||
*/
|
||||
public void testWriteLimit() throws IOException {
|
||||
OutputStream out = Channels.newOutputStream(new TestChannel());
|
||||
byte[] b = new byte[3*MAX_BUFFER_SIZE];
|
||||
out.write(b, 0, b.length);
|
||||
}
|
||||
|
||||
// -- test infrastructure --
|
||||
|
||||
private interface ThrowingTask {
|
||||
@ -477,4 +501,40 @@ public class SocketChannelStreams {
|
||||
private Future<?> schedule(Runnable task, long delay) {
|
||||
return executor.schedule(task, delay, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* ByteChannel that throws if more than 128k bytes remain
|
||||
* in the buffer supplied for reading or writing.
|
||||
*/
|
||||
private static class TestChannel implements ByteChannel {
|
||||
@Override
|
||||
public int read(ByteBuffer bb) throws IOException {
|
||||
int rem = bb.remaining();
|
||||
if (rem > MAX_BUFFER_SIZE) {
|
||||
throw new IOException("too big");
|
||||
}
|
||||
bb.position(bb.limit());
|
||||
return rem;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int write(ByteBuffer bb) throws IOException {
|
||||
int rem = bb.remaining();
|
||||
if (rem > MAX_BUFFER_SIZE) {
|
||||
throw new IOException("too big");
|
||||
}
|
||||
bb.position(bb.limit());
|
||||
return rem;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user