8367067: Improve exception handling in HttpRequest.BodyPublishers

Reviewed-by: jpai, dfuchs
This commit is contained in:
Volkan Yazici 2025-09-19 12:07:27 +00:00
parent 802d9c23dc
commit 87d50425fc
14 changed files with 1665 additions and 67 deletions

View File

@ -0,0 +1,50 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.internal.net.http;
import java.util.Iterator;
/**
* An {@link Iterable} clone supporting checked exceptions.
*
* @param <E> the type of elements returned by the produced iterators
*/
@FunctionalInterface
interface CheckedIterable<E> {
/**
* {@return an {@linkplain CheckedIterator iterator} over elements of type {@code E}}
*/
CheckedIterator<E> iterator() throws Exception;
static <E> CheckedIterable<E> fromIterable(Iterable<E> iterable) {
return () -> {
Iterator<E> iterator = iterable.iterator();
return CheckedIterator.fromIterator(iterator);
};
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.internal.net.http;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* An {@link Iterator} clone supporting checked exceptions.
*
* @param <E> the type of elements returned by this iterator
*/
interface CheckedIterator<E> {
/**
* {@return {@code true} if the iteration has more elements}
* @throws Exception if operation fails
*/
boolean hasNext() throws Exception;
/**
* {@return the next element in the iteration}
*
* @throws NoSuchElementException if the iteration has no more elements
* @throws Exception if operation fails
*/
E next() throws Exception;
static <E> CheckedIterator<E> fromIterator(Iterator<E> iterator) {
return new CheckedIterator<>() {
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public E next() {
return iterator.next();
}
};
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -25,46 +25,55 @@
package jdk.internal.net.http;
import java.util.Iterator;
import java.util.concurrent.Flow;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.SequentialScheduler;
/**
* A Publisher that publishes items obtained from the given Iterable. Each new
* subscription gets a new Iterator.
* A {@linkplain Flow.Publisher publisher} that publishes items obtained from the given {@link CheckedIterable}.
* Each new subscription gets a new {@link CheckedIterator}.
*/
class PullPublisher<T> implements Flow.Publisher<T> {
// Only one of `iterable` and `throwable` can be non-null. throwable is
// Only one of `iterable` or `throwable` should be null, and the other non-null. throwable is
// non-null when an error has been encountered, by the creator of
// PullPublisher, while subscribing the subscriber, but before subscribe has
// completed.
private final Iterable<T> iterable;
private final CheckedIterable<T> iterable;
private final Throwable throwable;
PullPublisher(Iterable<T> iterable, Throwable throwable) {
PullPublisher(CheckedIterable<T> iterable, Throwable throwable) {
if ((iterable == null) == (throwable == null)) {
String message = String.format(
"only one of `iterable` or `throwable` should be null, and the other non-null, but %s are null",
throwable == null ? "both" : "none");
throw new IllegalArgumentException(message);
}
this.iterable = iterable;
this.throwable = throwable;
}
PullPublisher(Iterable<T> iterable) {
PullPublisher(CheckedIterable<T> iterable) {
this(iterable, null);
}
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
Subscription sub;
if (throwable != null) {
assert iterable == null : "non-null iterable: " + iterable;
sub = new Subscription(subscriber, null, throwable);
} else {
assert throwable == null : "non-null exception: " + throwable;
sub = new Subscription(subscriber, iterable.iterator(), null);
Throwable failure = throwable;
CheckedIterator<T> iterator = null;
if (failure == null) {
try {
iterator = iterable.iterator();
} catch (Exception exception) {
failure = exception;
}
}
Subscription sub = failure != null
? new Subscription(subscriber, null, failure)
: new Subscription(subscriber, iterator, null);
subscriber.onSubscribe(sub);
if (throwable != null) {
if (failure != null) {
sub.pullScheduler.runOrSchedule();
}
}
@ -72,7 +81,7 @@ class PullPublisher<T> implements Flow.Publisher<T> {
private class Subscription implements Flow.Subscription {
private final Flow.Subscriber<? super T> subscriber;
private final Iterator<T> iter;
private final CheckedIterator<T> iter;
private volatile boolean completed;
private volatile boolean cancelled;
private volatile Throwable error;
@ -80,7 +89,7 @@ class PullPublisher<T> implements Flow.Publisher<T> {
private final Demand demand = new Demand();
Subscription(Flow.Subscriber<? super T> subscriber,
Iterator<T> iter,
CheckedIterator<T> iter,
Throwable throwable) {
this.subscriber = subscriber;
this.iter = iter;
@ -117,7 +126,18 @@ class PullPublisher<T> implements Flow.Publisher<T> {
}
subscriber.onNext(next);
}
if (!iter.hasNext() && !cancelled) {
boolean hasNext;
try {
hasNext = iter.hasNext();
} catch (Exception e) {
completed = true;
pullScheduler.stop();
subscriber.onError(e);
return;
}
if (!hasNext && !cancelled) {
completed = true;
pullScheduler.stop();
subscriber.onComplete();

View File

@ -73,8 +73,11 @@ public final class RequestPublishers {
this(content, offset, length, Utils.BUFSIZE);
}
/* bufSize exposed for testing purposes */
ByteArrayPublisher(byte[] content, int offset, int length, int bufSize) {
private ByteArrayPublisher(byte[] content, int offset, int length, int bufSize) {
Objects.checkFromIndexSize(offset, length, content.length); // Implicit null check on `content`
if (bufSize <= 0) {
throw new IllegalArgumentException("Invalid buffer size: " + bufSize);
}
this.content = content;
this.offset = offset;
this.length = length;
@ -99,7 +102,7 @@ public final class RequestPublishers {
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
List<ByteBuffer> copy = copy(content, offset, length);
var delegate = new PullPublisher<>(copy);
var delegate = new PullPublisher<>(CheckedIterable.fromIterable(copy));
delegate.subscribe(subscriber);
}
@ -121,7 +124,7 @@ public final class RequestPublishers {
// The ByteBufferIterator will iterate over the byte[] arrays in
// the content one at the time.
//
class ByteBufferIterator implements Iterator<ByteBuffer> {
private final class ByteBufferIterator implements CheckedIterator<ByteBuffer> {
final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
final Iterator<byte[]> iterator = content.iterator();
@Override
@ -166,13 +169,9 @@ public final class RequestPublishers {
}
}
public Iterator<ByteBuffer> iterator() {
return new ByteBufferIterator();
}
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
Iterable<ByteBuffer> iterable = this::iterator;
CheckedIterable<ByteBuffer> iterable = () -> new ByteBufferIterator();
var delegate = new PullPublisher<>(iterable);
delegate.subscribe(subscriber);
}
@ -202,13 +201,13 @@ public final class RequestPublishers {
public static class StringPublisher extends ByteArrayPublisher {
public StringPublisher(String content, Charset charset) {
super(content.getBytes(charset));
super(content.getBytes(Objects.requireNonNull(charset))); // Implicit null check on `content`
}
}
public static class EmptyPublisher implements BodyPublisher {
private final Flow.Publisher<ByteBuffer> delegate =
new PullPublisher<ByteBuffer>(Collections.emptyList(), null);
new PullPublisher<>(CheckedIterable.fromIterable(Collections.emptyList()), null);
@Override
public long contentLength() {
@ -290,7 +289,7 @@ public final class RequestPublishers {
/**
* Reads one buffer ahead all the time, blocking in hasNext()
*/
public static class StreamIterator implements Iterator<ByteBuffer> {
private static final class StreamIterator implements CheckedIterator<ByteBuffer> {
final InputStream is;
final Supplier<? extends ByteBuffer> bufSupplier;
private volatile boolean eof;
@ -331,20 +330,8 @@ public final class RequestPublishers {
return n;
}
/**
* Close stream in this instance.
* UncheckedIOException may be thrown if IOE happens at InputStream::close.
*/
private void closeStream() {
try {
is.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public boolean hasNext() {
public boolean hasNext() throws IOException {
stateLock.lock();
try {
return hasNext0();
@ -353,7 +340,7 @@ public final class RequestPublishers {
}
}
private boolean hasNext0() {
private boolean hasNext0() throws IOException {
if (need2Read) {
try {
haveNext = read() != -1;
@ -363,10 +350,10 @@ public final class RequestPublishers {
} catch (IOException e) {
haveNext = false;
need2Read = false;
throw new UncheckedIOException(e);
throw e;
} finally {
if (!haveNext) {
closeStream();
is.close();
}
}
}
@ -374,7 +361,7 @@ public final class RequestPublishers {
}
@Override
public ByteBuffer next() {
public ByteBuffer next() throws IOException {
stateLock.lock();
try {
if (!hasNext()) {
@ -398,18 +385,23 @@ public final class RequestPublishers {
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
PullPublisher<ByteBuffer> publisher;
InputStream is = streamSupplier.get();
if (is == null) {
Throwable t = new IOException("streamSupplier returned null");
publisher = new PullPublisher<>(null, t);
} else {
publisher = new PullPublisher<>(iterableOf(is), null);
InputStream is = null;
Exception exception = null;
try {
is = streamSupplier.get();
if (is == null) {
exception = new IOException("Stream supplier returned null");
}
} catch (Exception cause) {
exception = new IOException("Stream supplier has failed", cause);
}
PullPublisher<ByteBuffer> publisher = exception != null
? new PullPublisher<>(null, exception)
: new PullPublisher<>(iterableOf(is), null);
publisher.subscribe(subscriber);
}
protected Iterable<ByteBuffer> iterableOf(InputStream is) {
private CheckedIterable<ByteBuffer> iterableOf(InputStream is) {
return () -> new StreamIterator(is);
}
@ -442,13 +434,13 @@ public final class RequestPublishers {
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
Iterable<ByteBuffer> iterable = () -> new FileChannelIterator(channel, position, limit);
CheckedIterable<ByteBuffer> iterable = () -> new FileChannelIterator(channel, position, limit);
new PullPublisher<>(iterable).subscribe(subscriber);
}
}
private static final class FileChannelIterator implements Iterator<ByteBuffer> {
private static final class FileChannelIterator implements CheckedIterator<ByteBuffer> {
private final FileChannel channel;
@ -470,7 +462,7 @@ public final class RequestPublishers {
}
@Override
public ByteBuffer next() {
public ByteBuffer next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
@ -487,7 +479,7 @@ public final class RequestPublishers {
}
} catch (IOException ioe) {
terminated = true;
throw new UncheckedIOException(ioe);
throw ioe;
}
return buffer.flip();
}

View File

@ -531,9 +531,8 @@ class FileChannelPublisherTest {
// Verifying the client failure
LOGGER.log("Verifying the client failure");
Exception requestFailure0 = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get());
Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause());
assertInstanceOf(ClosedChannelException.class, requestFailure1.getCause());
Exception requestFailure = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get());
assertInstanceOf(ClosedChannelException.class, requestFailure.getCause());
verifyServerIncompleteRead(pair, fileLength);
@ -578,9 +577,8 @@ class FileChannelPublisherTest {
// Verifying the client failure
LOGGER.log("Verifying the client failure");
Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get);
Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause());
Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause());
String requestFailure2Message = requestFailure2.getMessage();
Exception requestFailure1 = assertInstanceOf(IOException.class, requestFailure0.getCause());
String requestFailure2Message = requestFailure1.getMessage();
assertTrue(
requestFailure2Message.contains("Unexpected EOF"),
"unexpected message: " + requestFailure2Message);

View File

@ -0,0 +1,91 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public final class ByteBufferUtils {
private ByteBufferUtils() {}
public static void assertEquals(ByteBuffer expectedBuffer, ByteBuffer actualBuffer, String message) {
assertEquals(bytes(expectedBuffer), bytes(actualBuffer), message);
}
public static void assertEquals(byte[] expectedBytes, ByteBuffer actualBuffer, String message) {
assertEquals(expectedBytes, bytes(actualBuffer), message);
}
public static void assertEquals(byte[] expectedBytes, byte[] actualBytes, String message) {
Objects.requireNonNull(expectedBytes);
Objects.requireNonNull(actualBytes);
int mismatchIndex = Arrays.mismatch(expectedBytes, actualBytes);
if (mismatchIndex >= 0) {
Byte expectedByte = mismatchIndex >= expectedBytes.length ? null : expectedBytes[mismatchIndex];
Byte actualByte = mismatchIndex >= actualBytes.length ? null : actualBytes[mismatchIndex];
String extendedMessage = String.format(
"%s" +
"array contents differ at index [%s], expected: <%s> but was: <%s>%n" +
"expected: %s%n" +
"actual: %s%n",
message == null ? "" : (message + ": "),
mismatchIndex, expectedByte, actualByte,
prettyPrintBytes(expectedBytes),
prettyPrintBytes(actualBytes));
throw new AssertionError(extendedMessage);
}
}
private static byte[] bytes(ByteBuffer buffer) {
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
return bytes;
}
private static String prettyPrintBytes(byte[] bytes) {
return IntStream.range(0, bytes.length)
.mapToObj(i -> "" + bytes[i])
.collect(Collectors.joining(", ", "[", "]"));
}
public static int findLengthExceedingMaxMemory() {
long memoryLength = Runtime.getRuntime().maxMemory();
double length = Math.ceil(1.5D * memoryLength);
if (length < 1 || length > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Bogus or excessive memory: " + memoryLength);
}
return (int) length;
}
public static byte[] byteArrayOfLength(int length) {
byte[] bytes = new byte[length];
for (int i = 0; i < length; i++) {
bytes[i] = (byte) i;
}
return bytes;
}
}

View File

@ -0,0 +1,100 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.net.http.HttpRequest;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/*
* @test
* @bug 8364733
* @summary Verify all specified `HttpRequest.BodyPublishers::fromPublisher` behavior
* @build RecordingSubscriber
* @run junit FromPublisherTest
*/
class FromPublisherTest {
@Test
void testNullPublisher() {
assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.fromPublisher(null));
assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.fromPublisher(null, 1));
}
@ParameterizedTest
@ValueSource(longs = {0L, -1L, Long.MIN_VALUE})
void testInvalidContentLength(long contentLength) {
IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> HttpRequest.BodyPublishers.fromPublisher(null, contentLength));
String exceptionMessage = exception.getMessage();
assertTrue(
exceptionMessage.contains("non-positive contentLength"),
"Unexpected exception message: " + exceptionMessage);
}
@ParameterizedTest
@ValueSource(longs = {1, 2, 3, 4})
void testValidContentLength(long contentLength) {
HttpRequest.BodyPublisher publisher =
HttpRequest.BodyPublishers.fromPublisher(HttpRequest.BodyPublishers.noBody(), contentLength);
assertEquals(contentLength, publisher.contentLength());
}
@Test
void testNoContentLength() {
HttpRequest.BodyPublisher publisher =
HttpRequest.BodyPublishers.fromPublisher(HttpRequest.BodyPublishers.noBody());
assertEquals(-1, publisher.contentLength());
}
@Test
void testNullSubscriber() {
HttpRequest.BodyPublisher publisher =
HttpRequest.BodyPublishers.fromPublisher(HttpRequest.BodyPublishers.noBody());
assertThrows(NullPointerException.class, () -> publisher.subscribe(null));
}
@Test
void testDelegation() throws InterruptedException {
BlockingQueue<Object> publisherInvocations = new LinkedBlockingQueue<>();
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.fromPublisher(subscriber -> {
publisherInvocations.add("subscribe");
publisherInvocations.add(subscriber);
});
RecordingSubscriber subscriber = new RecordingSubscriber();
publisher.subscribe(subscriber);
assertEquals("subscribe", publisherInvocations.take());
assertEquals(subscriber, publisherInvocations.take());
assertTrue(subscriber.invocations.isEmpty());
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import org.junit.jupiter.api.Test;
import java.net.http.HttpRequest;
import java.util.concurrent.Flow;
import static org.junit.jupiter.api.Assertions.assertEquals;
/*
* @test
* @bug 8364733
* @summary Verify all specified `HttpRequest.BodyPublishers::noBody` behavior
* @build RecordingSubscriber
* @run junit NoBodyTest
*/
class NoBodyTest {
@Test
void test() throws InterruptedException {
// Create the publisher
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.noBody();
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, 0);
// Verify the state after `request()`
subscription.request(Long.MAX_VALUE);
assertEquals("onComplete", subscriber.invocations.take());
}
}

View File

@ -0,0 +1,134 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Flow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
/*
* @test
* @bug 8364733
* @summary Verify all specified `HttpRequest.BodyPublishers::ofByteArray` behavior
* @build RecordingSubscriber
* @run junit OfByteArrayTest
*
* @comment Using `main/othervm` to initiate tests that depend on a custom-configured JVM
* @run main/othervm -Djdk.httpclient.bufsize=-1 OfByteArrayTest testInvalidBufferSize
* @run main/othervm -Djdk.httpclient.bufsize=0 OfByteArrayTest testInvalidBufferSize
* @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking "" 0 0 ""
* @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking a 0 0 ""
* @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking a 1 0 ""
* @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking a 0 1 a
* @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking ab 0 1 a
* @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking ab 1 1 b
* @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking ab 0 2 ab
* @run main/othervm -Djdk.httpclient.bufsize=1 OfByteArrayTest testChunking abc 0 3 a:b:c
* @run main/othervm -Djdk.httpclient.bufsize=2 OfByteArrayTest testChunking abc 0 3 ab:c
* @run main/othervm -Djdk.httpclient.bufsize=2 OfByteArrayTest testChunking abcdef 2 4 cd:ef
*/
public class OfByteArrayTest {
private static final Charset CHARSET = StandardCharsets.US_ASCII;
@Test
void testNullContent() {
assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofByteArray(null));
assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofByteArray(null, 1, 2));
}
@ParameterizedTest
@CsvSource({
"abc,-1,1", // Negative offset
"abc,1,-1", // Negative length
"'',1,1", // Offset overflow on empty string
"a,2,1", // Offset overflow
"'',0,1", // Length overflow on empty string
"a,0,2", // Length overflow
})
void testInvalidOffsetOrLength(String contentText, int offset, int length) {
byte[] content = contentText.getBytes(CHARSET);
assertThrows(
IndexOutOfBoundsException.class,
() -> HttpRequest.BodyPublishers.ofByteArray(content, offset, length));
}
/**
* Initiates tests that depend on a custom-configured JVM.
*/
public static void main(String[] args) throws InterruptedException {
switch (args[0]) {
case "testInvalidBufferSize" -> testInvalidBufferSize();
case "testChunking" -> testChunking(
parseStringArg(args[1]),
Integer.parseInt(args[2]),
Integer.parseInt(args[3]),
parseStringArg(args[4]));
default -> throw new IllegalArgumentException("Unexpected arguments: " + List.of(args));
}
}
private static String parseStringArg(String arg) {
return arg == null || arg.trim().equals("\"\"") ? "" : arg;
}
private static void testInvalidBufferSize() {
assertThrows(IllegalArgumentException.class, () -> HttpRequest.BodyPublishers.ofByteArray(new byte[1]));
}
private static void testChunking(
String contentText, int offset, int length, String expectedBuffersText)
throws InterruptedException {
// Create the publisher
byte[] content = contentText.getBytes(CHARSET);
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArray(content, offset, length);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, length);
// Verify the state after `request()`
String[] expectedBuffers = expectedBuffersText.isEmpty() ? new String[0] : expectedBuffersText.split(":");
subscription.request(Long.MAX_VALUE);
for (int bufferIndex = 0; bufferIndex < expectedBuffers.length; bufferIndex++) {
assertEquals("onNext", subscriber.invocations.take());
String actualBuffer = CHARSET.decode((ByteBuffer) subscriber.invocations.take()).toString();
String expectedBuffer = expectedBuffers[bufferIndex];
assertEquals(expectedBuffer, actualBuffer, "buffer mismatch at index " + bufferIndex);
}
assertEquals("onComplete", subscriber.invocations.take());
}
}

View File

@ -0,0 +1,317 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
/*
* @test
* @bug 8364733
* @summary Verify all specified `HttpRequest.BodyPublishers::ofByteArrays` behavior
* @build ByteBufferUtils
* RecordingSubscriber
* @run junit OfByteArraysTest
*
* @comment Using `main/othervm` to initiate tests that depend on a custom-configured JVM
* @run main/othervm -Xmx64m OfByteArraysTest testOOM
*/
public class OfByteArraysTest {
@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3})
void testIteratorOfLength(int length) throws InterruptedException {
// Create the publisher
List<byte[]> buffers = IntStream
.range(0, length)
.mapToObj(i -> new byte[]{(byte) i})
.toList();
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(buffers::iterator);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1);
// Verify the state after `request()`
subscription.request(Long.MAX_VALUE);
for (int bufferIndex = 0; bufferIndex < length; bufferIndex++) {
assertEquals("onNext", subscriber.invocations.take());
byte[] expectedBuffer = buffers.get(bufferIndex);
ByteBuffer actualBuffer = (ByteBuffer) subscriber.invocations.take();
ByteBufferUtils.assertEquals(expectedBuffer, actualBuffer, "buffer mismatch at index " + bufferIndex);
}
assertEquals("onComplete", subscriber.invocations.take());
}
@Test
void testDifferentIterators() throws InterruptedException {
// Create a publisher using an iterable that returns a different iterator at each invocation
byte[] buffer1 = ByteBufferUtils.byteArrayOfLength(9);
byte[] buffer2 = ByteBufferUtils.byteArrayOfLength(9);
int[] iteratorRequestCount = {0};
Iterable<byte[]> iterable = () -> switch (++iteratorRequestCount[0]) {
case 1 -> List.of(buffer1).iterator();
case 2 -> List.of(buffer2).iterator();
default -> throw new AssertionError();
};
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(iterable);
// Subscribe twice (to force two `Iterable::iterator` invocations)
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription1 = subscriber.verifyAndSubscribe(publisher, -1);
Flow.Subscription subscription2 = subscriber.verifyAndSubscribe(publisher, -1);
// Drain emissions until completion, and verify the content
byte[] actualBuffer1 = subscriber.drainToByteArray(subscription1, Long.MAX_VALUE);
byte[] actualBuffer2 = subscriber.drainToByteArray(subscription2, Long.MAX_VALUE);
ByteBufferUtils.assertEquals(buffer1, actualBuffer1, null);
ByteBufferUtils.assertEquals(buffer2, actualBuffer2, null);
}
@Test
void testNullIterable() {
assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofByteArrays(null));
}
@Test
void testNullIterator() throws InterruptedException {
// Create the publisher
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(() -> null);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1);
// Verify the NPE
subscription.request(1);
assertEquals("onError", subscriber.invocations.take());
assertInstanceOf(NullPointerException.class, subscriber.invocations.take());
}
@Test
void testNullArray() throws InterruptedException {
// Create the publisher
List<byte[]> iterable = new ArrayList<>();
iterable.add(null);
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(iterable);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1);
// Verify the NPE
subscription.request(1);
assertEquals("onError", subscriber.invocations.take());
assertInstanceOf(NullPointerException.class, subscriber.invocations.take());
}
@Test
void testThrowingIterable() throws InterruptedException {
// Create the publisher
RuntimeException exception = new RuntimeException("failure for `testIteratorCreationException`");
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(() -> {
throw exception;
});
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1);
// Verify the failure
subscription.request(1);
assertEquals("onError", subscriber.invocations.take());
Exception actualException = (Exception) subscriber.invocations.take();
assertSame(exception, actualException);
}
static Stream<Arguments> testThrowingIteratorArgs() {
RuntimeException hasNextException = new RuntimeException("failure for `hasNext`");
RuntimeException nextException = new RuntimeException("failure for `next`");
return Stream.of(
Arguments.of(0, hasNextException, null, hasNextException),
Arguments.of(0, hasNextException, nextException, hasNextException),
Arguments.of(1, hasNextException, null, hasNextException),
Arguments.of(1, hasNextException, nextException, hasNextException),
Arguments.of(1, null, nextException, nextException));
}
@ParameterizedTest
@MethodSource("testThrowingIteratorArgs")
void testThrowingIterator(
int exceptionIndex, RuntimeException hasNextException, RuntimeException nextException, Exception expectedException)
throws InterruptedException {
// Create the publisher
IteratorThrowingAtEnd iterator =
new IteratorThrowingAtEnd(exceptionIndex, hasNextException, nextException);
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(() -> iterator);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1);
// Drain successful emissions
subscription.request(Long.MAX_VALUE);
for (int itemIndex = 0; itemIndex < exceptionIndex; itemIndex++) {
assertEquals("onNext", subscriber.invocations.take());
ByteBuffer actualBuffer = (ByteBuffer) subscriber.invocations.take();
ByteBuffer expectedBuffer = ByteBuffer.wrap(iterator.content, itemIndex, 1);
ByteBufferUtils.assertEquals(expectedBuffer, actualBuffer, null);
}
// Verify the result
if (expectedException == null) {
assertEquals("onComplete", subscriber.invocations.take());
} else {
assertEquals("onError", subscriber.invocations.take());
Exception actualException = (Exception) subscriber.invocations.take();
assertSame(expectedException, actualException);
}
}
private static final class IteratorThrowingAtEnd implements Iterator<byte[]> {
private final byte[] content;
private final RuntimeException hasNextException;
private final RuntimeException nextException;
private int position;
private IteratorThrowingAtEnd(
int length,
RuntimeException hasNextException,
RuntimeException nextException) {
this.content = ByteBufferUtils.byteArrayOfLength(length);
this.hasNextException = hasNextException;
this.nextException = nextException;
}
@Override
public synchronized boolean hasNext() {
if (position >= content.length && hasNextException != null) {
throw hasNextException;
}
// We always instruct to proceed, so `next()` can throw
return true;
}
@Override
public synchronized byte[] next() {
if (position < content.length) {
return new byte[]{content[position++]};
}
assertNotNull(nextException);
throw nextException;
}
}
/**
* Initiates tests that depend on a custom-configured JVM.
*/
public static void main(String[] args) throws Exception {
if ("testOOM".equals(args[0])) {
testOOM();
} else {
throw new IllegalArgumentException("Unknown arguments: " + List.of(args));
}
}
private static void testOOM() throws Exception {
// Create the publisher
int length = ByteBufferUtils.findLengthExceedingMaxMemory();
Iterable<byte[]> iterable = createIterableOfLength(length);
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(iterable);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1);
// Drain emissions until completion, and verify the received content length
final int[] readLength = {0};
subscriber.drainToAccumulator(subscription, 1, buffer -> readLength[0] += buffer.limit());
assertEquals(length, readLength[0]);
}
private static Iterable<byte[]> createIterableOfLength(int length) {
return () -> new Iterator<>() {
// Instead of emitting `length` at once, doing it gradually using a buffer to avoid OOM.
private final byte[] buffer = new byte[8192];
private volatile int remainingLength = length;
@Override
public boolean hasNext() {
return remainingLength > 0;
}
@Override
public synchronized byte[] next() {
if (remainingLength >= buffer.length) {
remainingLength -= buffer.length;
return buffer;
} else {
byte[] remainingBuffer = new byte[remainingLength];
remainingLength = 0;
return remainingBuffer;
}
}
};
}
}

View File

@ -0,0 +1,290 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/*
* @test
* @bug 8364733
* @summary Verify all specified `HttpRequest.BodyPublishers::ofFile` behavior
* @build ByteBufferUtils
* RecordingSubscriber
* @run junit OfFileTest
*
* @comment Using `main/othervm` to initiate tests that depend on a custom-configured JVM
* @run main/othervm -Xmx64m OfFileTest testOOM
*/
public class OfFileTest {
private static final Path DEFAULT_FS_DIR = Path.of(System.getProperty("user.dir", "."));
private static final FileSystem ZIP_FS = zipFs();
private static final Path ZIP_FS_DIR = ZIP_FS.getRootDirectories().iterator().next();
private static final List<Path> PARENT_DIRS = List.of(DEFAULT_FS_DIR, ZIP_FS_DIR);
private static FileSystem zipFs() {
try {
Path zipFile = DEFAULT_FS_DIR.resolve("file.zip");
return FileSystems.newFileSystem(zipFile, Map.of("create", "true"));
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
}
@AfterAll
static void closeZipFs() throws IOException {
ZIP_FS.close();
}
static List<Path> parentDirs() {
return PARENT_DIRS;
}
@Test
void testNullPath() {
assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofFile(null));
}
@ParameterizedTest
@MethodSource("parentDirs")
void testNonExistentPath(Path parentDir) {
Path nonExistentPath = createFilePath(parentDir, "testNonExistentPath");
assertThrows(FileNotFoundException.class, () -> HttpRequest.BodyPublishers.ofFile(nonExistentPath));
}
@ParameterizedTest
@MethodSource("parentDirs")
void testNonExistentPathAtSubscribe(Path parentDir) throws Exception {
// Create the publisher
byte[] fileBytes = ByteBufferUtils.byteArrayOfLength(3);
Path filePath = createFile(parentDir, "testNonExistentPathAtSubscribe", fileBytes);
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofFile(filePath);
// Delete the file
Files.delete(filePath);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, fileBytes.length);
// Verify the state after `request()`
subscription.request(1);
assertEquals("onError", subscriber.invocations.take());
FileNotFoundException actualException = (FileNotFoundException) subscriber.invocations.take();
String actualExceptionMessage = actualException.getMessage();
assertTrue(
actualExceptionMessage.contains("Not a regular file"),
"Unexpected message: " + actualExceptionMessage);
}
@ParameterizedTest
@MethodSource("parentDirs")
void testIrregularFile(Path parentDir) throws Exception {
// Create the publisher
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofFile(parentDir);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, Files.size(parentDir));
// Verify the state after `request()`
subscription.request(1);
assertEquals("onError", subscriber.invocations.take());
FileNotFoundException actualException = (FileNotFoundException) subscriber.invocations.take();
String actualExceptionMessage = actualException.getMessage();
assertTrue(
actualExceptionMessage.contains("Not a regular file"),
"Unexpected message: " + actualExceptionMessage);
}
/**
* A <em>big enough</em> file length to observe the effects of file
* modification whilst the file is getting read.
*/
private static final int BIG_FILE_LENGTH = 8 * 1024 * 1024; // 8 MiB
@ParameterizedTest
@MethodSource("parentDirs")
void testFileModificationWhileReading(Path parentDir) throws Exception {
// ZIP file system (sadly?) consumes the entire content at open.
// Hence, we cannot observe the effect of file modification while reading.
if (parentDir == ZIP_FS_DIR) {
return;
}
// Create the publisher
byte[] fileBytes = ByteBufferUtils.byteArrayOfLength(BIG_FILE_LENGTH);
Path filePath = createFile(parentDir, "testFileModificationWhileReading", fileBytes);
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofFile(filePath);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, fileBytes.length);
// Verify the state after the 1st `request()`
subscription.request(1);
assertEquals("onNext", subscriber.invocations.take());
ByteBuffer buffer1 = (ByteBuffer) subscriber.invocations.take();
assertTrue(buffer1.limit() > 0, "unexpected empty buffer");
List<ByteBuffer> buffers = new ArrayList<>();
buffers.add(buffer1);
// Truncate the file
Files.write(filePath, new byte[0]);
// Drain emissions until completion, and verify the content
byte[] readBytes = subscriber.drainToByteArray(subscription, Long.MAX_VALUE, buffers);
assertTrue(
readBytes.length < fileBytes.length,
"was expecting less than the total amount (%s bytes), found: %s".formatted(
fileBytes.length, readBytes.length));
ByteBuffer expectedReadBytes = ByteBuffer.wrap(fileBytes, 0, readBytes.length);
ByteBufferUtils.assertEquals(expectedReadBytes, ByteBuffer.wrap(readBytes), null);
}
static Stream<Arguments> testFileOfLengthParams() {
return PARENT_DIRS
.stream()
.flatMap(parentDir -> Stream
.of(0, 1, 2, 3, BIG_FILE_LENGTH)
.map(fileLength -> Arguments.of(parentDir, fileLength)));
}
@ParameterizedTest
@MethodSource("testFileOfLengthParams")
void testFileOfLength(Path parentDir, int fileLength) throws Exception {
// Create the publisher
byte[] fileBytes = ByteBufferUtils.byteArrayOfLength(fileLength);
Path filePath = createFile(parentDir, "testFileOfLength", fileBytes);
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofFile(filePath);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, fileBytes.length);
// Drain emissions until completion, and verify the received content
byte[] readBytes = subscriber.drainToByteArray(subscription, Long.MAX_VALUE);
ByteBufferUtils.assertEquals(fileBytes, readBytes, null);
}
/**
* Initiates tests that depend on a custom-configured JVM.
*/
public static void main(String[] args) throws Exception {
if ("testOOM".equals(args[0])) {
testOOM();
} else {
throw new IllegalArgumentException("Unknown arguments: " + List.of(args));
}
}
private static void testOOM() {
for (Path parentDir : PARENT_DIRS) {
try {
testOOM(parentDir);
} catch (Exception exception) {
throw new AssertionError("failed for parent directory: " + parentDir, exception);
}
}
}
private static void testOOM(Path parentDir) throws Exception {
// Create the publisher
int fileLength = ByteBufferUtils.findLengthExceedingMaxMemory();
Path filePath = createFileOfLength(parentDir, "testOOM", fileLength);
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofFile(filePath);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, fileLength);
// Drain emissions until completion, and verify the received content length
final int[] readLength = {0};
subscriber.drainToAccumulator(subscription, 1, buffer -> readLength[0] += buffer.limit());
assertEquals(fileLength, readLength[0]);
}
private static Path createFileOfLength(Path parentDir, String identifier, int fileLength) throws IOException {
Path filePath = createFilePath(parentDir, identifier);
try (OutputStream fileStream = Files.newOutputStream(filePath)) {
byte[] buffer = ByteBufferUtils.byteArrayOfLength(8192);
for (int writtenLength = 0; writtenLength < fileLength; writtenLength += buffer.length) {
int remainingLength = fileLength - writtenLength;
byte[] effectiveBuffer = remainingLength < buffer.length
? ByteBufferUtils.byteArrayOfLength(remainingLength)
: buffer;
fileStream.write(effectiveBuffer);
}
}
return filePath;
}
private static Path createFile(Path parentDir, String identifier, byte[] fileBytes) throws IOException {
Path filePath = createFilePath(parentDir, identifier);
Files.write(filePath, fileBytes);
return filePath;
}
private static Path createFilePath(Path parentDir, String identifier) {
String fileName = identifier.replaceAll("\\W*", "");
return parentDir.resolve(fileName);
}
}

View File

@ -0,0 +1,226 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
/*
* @test
* @bug 8364733
* @summary Verify all specified `HttpRequest.BodyPublishers::ofInputStream` behavior
* @build ByteBufferUtils
* RecordingSubscriber
* @run junit OfInputStreamTest
*
* @comment Using `main/othervm` to initiate tests that depend on a custom-configured JVM
* @run main/othervm -Xmx64m OfInputStreamTest testOOM
*/
public class OfInputStreamTest {
@Test
void testNullInputStreamSupplier() {
assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofInputStream(null));
}
@Test
void testThrowingInputStreamSupplier() throws InterruptedException {
// Create the publisher
RuntimeException exception = new RuntimeException();
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofInputStream(() -> { throw exception; });
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1);
// Verify the state after `request()`
subscription.request(1);
assertEquals("onError", subscriber.invocations.take());
IOException actualException = (IOException) subscriber.invocations.take();
assertEquals("Stream supplier has failed", actualException.getMessage());
assertSame(exception, actualException.getCause());
}
@Test
void testNullInputStream() throws InterruptedException {
// Create the publisher
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofInputStream(() -> null);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1);
// Verify the state after `request()`
subscription.request(1);
assertEquals("onError", subscriber.invocations.take());
IOException actualException = (IOException) subscriber.invocations.take();
assertEquals("Stream supplier returned null", actualException.getMessage());
}
@Test
void testInputStreamSupplierInvocations() throws InterruptedException {
// Create a publisher from an `InputStream` supplier returning a different instance at each invocation
byte[] buffer1 = ByteBufferUtils.byteArrayOfLength(10);
byte[] buffer2 = ByteBufferUtils.byteArrayOfLength(10);
int[] inputStreamSupplierInvocationCount = {0};
Supplier<InputStream> inputStreamSupplier = () ->
switch (++inputStreamSupplierInvocationCount[0]) {
case 1 -> new ByteArrayInputStream(buffer1);
case 2 -> new ByteArrayInputStream(buffer2);
default -> throw new AssertionError();
};
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofInputStream(inputStreamSupplier);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription1 = subscriber.verifyAndSubscribe(publisher, -1);
Flow.Subscription subscription2 = subscriber.verifyAndSubscribe(publisher, -1);
// Drain each subscription and verify the received content
byte[] actualBuffer1 = subscriber.drainToByteArray(subscription1, Long.MAX_VALUE);
ByteBufferUtils.assertEquals(buffer1, actualBuffer1, null);
byte[] actualBuffer2 = subscriber.drainToByteArray(subscription2, Long.MAX_VALUE);
ByteBufferUtils.assertEquals(buffer2, actualBuffer2, null);
}
@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3})
void testInputStreamOfLength(int length) throws InterruptedException {
// Create the publisher
byte[] content = ByteBufferUtils.byteArrayOfLength(length);
InputStream inputStream = new ByteArrayInputStream(content);
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofInputStream(() -> inputStream);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1);
// Drain emissions until completion, and verify the received content
byte[] actualContent = subscriber.drainToByteArray(subscription, Long.MAX_VALUE);
ByteBufferUtils.assertEquals(content, actualContent, null);
}
@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3})
void testThrowingInputStream(int exceptionIndex) throws InterruptedException {
// Create the publisher
RuntimeException exception = new RuntimeException("failure for `read`");
InputStream inputStream = new InputStreamThrowingOnCompletion(exceptionIndex, exception);
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofInputStream(() -> inputStream);
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1);
// Verify the failure
subscription.request(1);
assertEquals("onError", subscriber.invocations.take());
Exception actualException = (Exception) subscriber.invocations.take();
assertSame(exception, actualException);
}
private static final class InputStreamThrowingOnCompletion extends InputStream {
private final int length;
private final RuntimeException exception;
private int position;
private InputStreamThrowingOnCompletion(int length, RuntimeException exception) {
this.length = length;
this.exception = exception;
}
@Override
public synchronized int read() {
if (position < length) {
return position++ & 0xFF;
}
throw exception;
}
}
/**
* Initiates tests that depend on a custom-configured JVM.
*/
public static void main(String[] args) throws Exception {
if ("testOOM".equals(args[0])) {
testOOM();
} else {
throw new IllegalArgumentException("Unknown arguments: " + List.of(args));
}
}
private static void testOOM() throws InterruptedException {
// Create the publisher using an `InputStream` that emits content exceeding the maximum memory
int length = ByteBufferUtils.findLengthExceedingMaxMemory();
HttpRequest.BodyPublisher publisher =
HttpRequest.BodyPublishers.ofInputStream(() -> new InputStream() {
private int position;
@Override
public synchronized int read() {
return position < length ? (position++ & 0xFF) : -1;
}
});
// Subscribe
RecordingSubscriber subscriber = new RecordingSubscriber();
Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1);
// Drain emissions until completion, and verify the received content length
final int[] readLength = {0};
subscriber.drainToAccumulator(subscription, 1, buffer -> readLength[0] += buffer.limit());
assertEquals(length, readLength[0]);
}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Flow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
/*
* @test
* @bug 8364733
* @summary Verify all specified `HttpRequest.BodyPublishers::ofString` behavior
* @build ByteBufferUtils
* RecordingSubscriber
* @run junit OfStringTest
*/
class OfStringTest {
private static final Charset CHARSET = StandardCharsets.US_ASCII;
@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3})
void testContentOfLength(int length) throws InterruptedException {
// Create the publisher
char[] contentChars = new char[length];
for (int i = 0; i < length; i++) {
contentChars[i] = (char) ('a' + i);
}
String content = new String(contentChars);
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofString(content, CHARSET);
// Subscribe
assertEquals(length, publisher.contentLength());
RecordingSubscriber subscriber = new RecordingSubscriber();
publisher.subscribe(subscriber);
assertEquals("onSubscribe", subscriber.invocations.take());
Flow.Subscription subscription = (Flow.Subscription) subscriber.invocations.take();
// Verify the state after `request()`
subscription.request(Long.MAX_VALUE);
if (length > 0) {
assertEquals("onNext", subscriber.invocations.take());
String actualContent = CHARSET.decode((ByteBuffer) subscriber.invocations.take()).toString();
assertEquals(content, actualContent);
}
assertEquals("onComplete", subscriber.invocations.take());
}
@ParameterizedTest
@CsvSource({
"a,UTF-8",
"b,UTF-16",
"ı,ISO-8859-9"
})
void testCharset(String content, Charset charset) throws InterruptedException {
// Create the publisher
HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofString(content, charset);
// Subscribe
ByteBuffer expectedBuffer = charset.encode(content);
assertEquals(expectedBuffer.limit(), publisher.contentLength());
RecordingSubscriber subscriber = new RecordingSubscriber();
publisher.subscribe(subscriber);
assertEquals("onSubscribe", subscriber.invocations.take());
Flow.Subscription subscription = (Flow.Subscription) subscriber.invocations.take();
// Verify the state after `request()`
subscription.request(Long.MAX_VALUE);
assertEquals("onNext", subscriber.invocations.take());
ByteBuffer actualBuffer = (ByteBuffer) subscriber.invocations.take();
ByteBufferUtils.assertEquals(expectedBuffer, actualBuffer, null);
assertEquals("onComplete", subscriber.invocations.take());
}
@Test
void testNullContent() {
assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofString(null, CHARSET));
}
@Test
void testNullCharset() {
assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofString("foo", null));
}
}

View File

@ -0,0 +1,136 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Test subscriber recording received invocations.
*/
public final class RecordingSubscriber implements Flow.Subscriber<ByteBuffer> {
public final BlockingQueue<Object> invocations = new LinkedBlockingQueue<>();
@Override
public void onSubscribe(Flow.Subscription subscription) {
invocations.add("onSubscribe");
invocations.add(subscription);
}
@Override
public void onNext(ByteBuffer item) {
invocations.add("onNext");
invocations.add(item);
}
@Override
public synchronized void onError(Throwable throwable) {
invocations.add("onError");
invocations.add(throwable);
}
@Override
public synchronized void onComplete() {
invocations.add("onComplete");
}
/**
* Verifies the content length of the given publisher and subscribes to it.
*/
public Flow.Subscription verifyAndSubscribe(HttpRequest.BodyPublisher publisher, long contentLength)
throws InterruptedException {
assertEquals(contentLength, publisher.contentLength());
publisher.subscribe(this);
assertEquals("onSubscribe", invocations.take());
return (Flow.Subscription) invocations.take();
}
/**
* {@return the byte sequence collected by draining all emissions until completion}
*
* @param subscription a subscription to drain from
* @param itemCount the number of items to request per iteration
*/
public byte[] drainToByteArray(Flow.Subscription subscription, long itemCount) throws InterruptedException {
return drainToByteArray(subscription, itemCount, new ArrayList<>());
}
/**
* {@return the byte sequence collected by draining all emissions until completion}
*
* @param subscription a subscription to drain from
* @param itemCount the number of items to request per iteration
* @param buffers a list to accumulate the received content in
*/
public byte[] drainToByteArray(Flow.Subscription subscription, long itemCount, List<ByteBuffer> buffers)
throws InterruptedException {
drainToAccumulator(subscription, itemCount, buffers::add);
return flattenBuffers(buffers);
}
/**
* Drains all emissions until completion to the given {@code accumulator}.
*
* @param subscription a subscription to drain from
* @param itemCount the number of items to request per iteration
* @param accumulator an accumulator to pass the received content to
*/
public void drainToAccumulator(
Flow.Subscription subscription, long itemCount, Consumer<ByteBuffer> accumulator)
throws InterruptedException {
boolean completed = false;
while (!completed) {
subscription.request(itemCount);
String op = (String) invocations.take();
if ("onNext".equals(op)) {
ByteBuffer buffer = (ByteBuffer) invocations.take();
accumulator.accept(buffer);
} else if ("onComplete".equals(op)) {
completed = true;
} else {
throw new AssertionError("Unexpected invocation: " + op);
}
}
}
private static byte[] flattenBuffers(List<ByteBuffer> buffers) {
int arrayLength = buffers.stream().mapToInt(ByteBuffer::limit).sum();
byte[] array = new byte[arrayLength];
for (int bufferIndex = 0, arrayOffset = 0; bufferIndex < buffers.size(); bufferIndex++) {
ByteBuffer buffer = buffers.get(bufferIndex);
int bufferLimit = buffer.limit();
buffer.get(array, arrayOffset, bufferLimit);
arrayOffset += bufferLimit;
}
return array;
}
}