diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterable.java b/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterable.java new file mode 100644 index 00000000000..bb74ac3d6e9 --- /dev/null +++ b/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterable.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.internal.net.http; + +import java.util.Iterator; + +/** + * An {@link Iterable} clone supporting checked exceptions. + * + * @param the type of elements returned by the produced iterators + */ +@FunctionalInterface +interface CheckedIterable { + + /** + * {@return an {@linkplain CheckedIterator iterator} over elements of type {@code E}} + */ + CheckedIterator iterator() throws Exception; + + static CheckedIterable fromIterable(Iterable iterable) { + return () -> { + Iterator iterator = iterable.iterator(); + return CheckedIterator.fromIterator(iterator); + }; + } + +} diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterator.java b/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterator.java new file mode 100644 index 00000000000..42e8e2b6c87 --- /dev/null +++ b/src/java.net.http/share/classes/jdk/internal/net/http/CheckedIterator.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.internal.net.http; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * An {@link Iterator} clone supporting checked exceptions. + * + * @param the type of elements returned by this iterator + */ +interface CheckedIterator { + + /** + * {@return {@code true} if the iteration has more elements} + * @throws Exception if operation fails + */ + boolean hasNext() throws Exception; + + /** + * {@return the next element in the iteration} + * + * @throws NoSuchElementException if the iteration has no more elements + * @throws Exception if operation fails + */ + E next() throws Exception; + + static CheckedIterator fromIterator(Iterator iterator) { + return new CheckedIterator<>() { + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public E next() { + return iterator.next(); + } + + }; + } + +} diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java b/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java index 0556214648e..d1019c05629 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,46 +25,55 @@ package jdk.internal.net.http; -import java.util.Iterator; import java.util.concurrent.Flow; + import jdk.internal.net.http.common.Demand; import jdk.internal.net.http.common.SequentialScheduler; /** - * A Publisher that publishes items obtained from the given Iterable. Each new - * subscription gets a new Iterator. + * A {@linkplain Flow.Publisher publisher} that publishes items obtained from the given {@link CheckedIterable}. + * Each new subscription gets a new {@link CheckedIterator}. */ class PullPublisher implements Flow.Publisher { - // Only one of `iterable` and `throwable` can be non-null. throwable is + // Only one of `iterable` or `throwable` should be null, and the other non-null. throwable is // non-null when an error has been encountered, by the creator of // PullPublisher, while subscribing the subscriber, but before subscribe has // completed. - private final Iterable iterable; + private final CheckedIterable iterable; private final Throwable throwable; - PullPublisher(Iterable iterable, Throwable throwable) { + PullPublisher(CheckedIterable iterable, Throwable throwable) { + if ((iterable == null) == (throwable == null)) { + String message = String.format( + "only one of `iterable` or `throwable` should be null, and the other non-null, but %s are null", + throwable == null ? "both" : "none"); + throw new IllegalArgumentException(message); + } this.iterable = iterable; this.throwable = throwable; } - PullPublisher(Iterable iterable) { + PullPublisher(CheckedIterable iterable) { this(iterable, null); } @Override public void subscribe(Flow.Subscriber subscriber) { - Subscription sub; - if (throwable != null) { - assert iterable == null : "non-null iterable: " + iterable; - sub = new Subscription(subscriber, null, throwable); - } else { - assert throwable == null : "non-null exception: " + throwable; - sub = new Subscription(subscriber, iterable.iterator(), null); + Throwable failure = throwable; + CheckedIterator iterator = null; + if (failure == null) { + try { + iterator = iterable.iterator(); + } catch (Exception exception) { + failure = exception; + } } + Subscription sub = failure != null + ? new Subscription(subscriber, null, failure) + : new Subscription(subscriber, iterator, null); subscriber.onSubscribe(sub); - - if (throwable != null) { + if (failure != null) { sub.pullScheduler.runOrSchedule(); } } @@ -72,7 +81,7 @@ class PullPublisher implements Flow.Publisher { private class Subscription implements Flow.Subscription { private final Flow.Subscriber subscriber; - private final Iterator iter; + private final CheckedIterator iter; private volatile boolean completed; private volatile boolean cancelled; private volatile Throwable error; @@ -80,7 +89,7 @@ class PullPublisher implements Flow.Publisher { private final Demand demand = new Demand(); Subscription(Flow.Subscriber subscriber, - Iterator iter, + CheckedIterator iter, Throwable throwable) { this.subscriber = subscriber; this.iter = iter; @@ -117,7 +126,18 @@ class PullPublisher implements Flow.Publisher { } subscriber.onNext(next); } - if (!iter.hasNext() && !cancelled) { + + boolean hasNext; + try { + hasNext = iter.hasNext(); + } catch (Exception e) { + completed = true; + pullScheduler.stop(); + subscriber.onError(e); + return; + } + + if (!hasNext && !cancelled) { completed = true; pullScheduler.stop(); subscriber.onComplete(); diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java index 88cabe15419..5aea5648e19 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java @@ -73,8 +73,11 @@ public final class RequestPublishers { this(content, offset, length, Utils.BUFSIZE); } - /* bufSize exposed for testing purposes */ - ByteArrayPublisher(byte[] content, int offset, int length, int bufSize) { + private ByteArrayPublisher(byte[] content, int offset, int length, int bufSize) { + Objects.checkFromIndexSize(offset, length, content.length); // Implicit null check on `content` + if (bufSize <= 0) { + throw new IllegalArgumentException("Invalid buffer size: " + bufSize); + } this.content = content; this.offset = offset; this.length = length; @@ -99,7 +102,7 @@ public final class RequestPublishers { @Override public void subscribe(Flow.Subscriber subscriber) { List copy = copy(content, offset, length); - var delegate = new PullPublisher<>(copy); + var delegate = new PullPublisher<>(CheckedIterable.fromIterable(copy)); delegate.subscribe(subscriber); } @@ -121,7 +124,7 @@ public final class RequestPublishers { // The ByteBufferIterator will iterate over the byte[] arrays in // the content one at the time. // - class ByteBufferIterator implements Iterator { + private final class ByteBufferIterator implements CheckedIterator { final ConcurrentLinkedQueue buffers = new ConcurrentLinkedQueue<>(); final Iterator iterator = content.iterator(); @Override @@ -166,13 +169,9 @@ public final class RequestPublishers { } } - public Iterator iterator() { - return new ByteBufferIterator(); - } - @Override public void subscribe(Flow.Subscriber subscriber) { - Iterable iterable = this::iterator; + CheckedIterable iterable = () -> new ByteBufferIterator(); var delegate = new PullPublisher<>(iterable); delegate.subscribe(subscriber); } @@ -202,13 +201,13 @@ public final class RequestPublishers { public static class StringPublisher extends ByteArrayPublisher { public StringPublisher(String content, Charset charset) { - super(content.getBytes(charset)); + super(content.getBytes(Objects.requireNonNull(charset))); // Implicit null check on `content` } } public static class EmptyPublisher implements BodyPublisher { private final Flow.Publisher delegate = - new PullPublisher(Collections.emptyList(), null); + new PullPublisher<>(CheckedIterable.fromIterable(Collections.emptyList()), null); @Override public long contentLength() { @@ -290,7 +289,7 @@ public final class RequestPublishers { /** * Reads one buffer ahead all the time, blocking in hasNext() */ - public static class StreamIterator implements Iterator { + private static final class StreamIterator implements CheckedIterator { final InputStream is; final Supplier bufSupplier; private volatile boolean eof; @@ -331,20 +330,8 @@ public final class RequestPublishers { return n; } - /** - * Close stream in this instance. - * UncheckedIOException may be thrown if IOE happens at InputStream::close. - */ - private void closeStream() { - try { - is.close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - @Override - public boolean hasNext() { + public boolean hasNext() throws IOException { stateLock.lock(); try { return hasNext0(); @@ -353,7 +340,7 @@ public final class RequestPublishers { } } - private boolean hasNext0() { + private boolean hasNext0() throws IOException { if (need2Read) { try { haveNext = read() != -1; @@ -363,10 +350,10 @@ public final class RequestPublishers { } catch (IOException e) { haveNext = false; need2Read = false; - throw new UncheckedIOException(e); + throw e; } finally { if (!haveNext) { - closeStream(); + is.close(); } } } @@ -374,7 +361,7 @@ public final class RequestPublishers { } @Override - public ByteBuffer next() { + public ByteBuffer next() throws IOException { stateLock.lock(); try { if (!hasNext()) { @@ -398,18 +385,23 @@ public final class RequestPublishers { @Override public void subscribe(Flow.Subscriber subscriber) { - PullPublisher publisher; - InputStream is = streamSupplier.get(); - if (is == null) { - Throwable t = new IOException("streamSupplier returned null"); - publisher = new PullPublisher<>(null, t); - } else { - publisher = new PullPublisher<>(iterableOf(is), null); + InputStream is = null; + Exception exception = null; + try { + is = streamSupplier.get(); + if (is == null) { + exception = new IOException("Stream supplier returned null"); + } + } catch (Exception cause) { + exception = new IOException("Stream supplier has failed", cause); } + PullPublisher publisher = exception != null + ? new PullPublisher<>(null, exception) + : new PullPublisher<>(iterableOf(is), null); publisher.subscribe(subscriber); } - protected Iterable iterableOf(InputStream is) { + private CheckedIterable iterableOf(InputStream is) { return () -> new StreamIterator(is); } @@ -442,13 +434,13 @@ public final class RequestPublishers { @Override public void subscribe(Flow.Subscriber subscriber) { - Iterable iterable = () -> new FileChannelIterator(channel, position, limit); + CheckedIterable iterable = () -> new FileChannelIterator(channel, position, limit); new PullPublisher<>(iterable).subscribe(subscriber); } } - private static final class FileChannelIterator implements Iterator { + private static final class FileChannelIterator implements CheckedIterator { private final FileChannel channel; @@ -470,7 +462,7 @@ public final class RequestPublishers { } @Override - public ByteBuffer next() { + public ByteBuffer next() throws IOException { if (!hasNext()) { throw new NoSuchElementException(); } @@ -487,7 +479,7 @@ public final class RequestPublishers { } } catch (IOException ioe) { terminated = true; - throw new UncheckedIOException(ioe); + throw ioe; } return buffer.flip(); } diff --git a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java index 5b064efa078..4bc3c3ae315 100644 --- a/test/jdk/java/net/httpclient/FileChannelPublisherTest.java +++ b/test/jdk/java/net/httpclient/FileChannelPublisherTest.java @@ -531,9 +531,8 @@ class FileChannelPublisherTest { // Verifying the client failure LOGGER.log("Verifying the client failure"); - Exception requestFailure0 = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get()); - Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause()); - assertInstanceOf(ClosedChannelException.class, requestFailure1.getCause()); + Exception requestFailure = assertThrows(ExecutionException.class, () -> responseFutureRef.get().get()); + assertInstanceOf(ClosedChannelException.class, requestFailure.getCause()); verifyServerIncompleteRead(pair, fileLength); @@ -578,9 +577,8 @@ class FileChannelPublisherTest { // Verifying the client failure LOGGER.log("Verifying the client failure"); Exception requestFailure0 = assertThrows(ExecutionException.class, responseFuture::get); - Exception requestFailure1 = assertInstanceOf(UncheckedIOException.class, requestFailure0.getCause()); - Exception requestFailure2 = assertInstanceOf(IOException.class, requestFailure1.getCause()); - String requestFailure2Message = requestFailure2.getMessage(); + Exception requestFailure1 = assertInstanceOf(IOException.class, requestFailure0.getCause()); + String requestFailure2Message = requestFailure1.getMessage(); assertTrue( requestFailure2Message.contains("Unexpected EOF"), "unexpected message: " + requestFailure2Message); diff --git a/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/ByteBufferUtils.java b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/ByteBufferUtils.java new file mode 100644 index 00000000000..29276dc7b60 --- /dev/null +++ b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/ByteBufferUtils.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public final class ByteBufferUtils { + + private ByteBufferUtils() {} + + public static void assertEquals(ByteBuffer expectedBuffer, ByteBuffer actualBuffer, String message) { + assertEquals(bytes(expectedBuffer), bytes(actualBuffer), message); + } + + public static void assertEquals(byte[] expectedBytes, ByteBuffer actualBuffer, String message) { + assertEquals(expectedBytes, bytes(actualBuffer), message); + } + + public static void assertEquals(byte[] expectedBytes, byte[] actualBytes, String message) { + Objects.requireNonNull(expectedBytes); + Objects.requireNonNull(actualBytes); + int mismatchIndex = Arrays.mismatch(expectedBytes, actualBytes); + if (mismatchIndex >= 0) { + Byte expectedByte = mismatchIndex >= expectedBytes.length ? null : expectedBytes[mismatchIndex]; + Byte actualByte = mismatchIndex >= actualBytes.length ? null : actualBytes[mismatchIndex]; + String extendedMessage = String.format( + "%s" + + "array contents differ at index [%s], expected: <%s> but was: <%s>%n" + + "expected: %s%n" + + "actual: %s%n", + message == null ? "" : (message + ": "), + mismatchIndex, expectedByte, actualByte, + prettyPrintBytes(expectedBytes), + prettyPrintBytes(actualBytes)); + throw new AssertionError(extendedMessage); + } + } + + private static byte[] bytes(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.limit()]; + buffer.get(bytes); + return bytes; + } + + private static String prettyPrintBytes(byte[] bytes) { + return IntStream.range(0, bytes.length) + .mapToObj(i -> "" + bytes[i]) + .collect(Collectors.joining(", ", "[", "]")); + } + + public static int findLengthExceedingMaxMemory() { + long memoryLength = Runtime.getRuntime().maxMemory(); + double length = Math.ceil(1.5D * memoryLength); + if (length < 1 || length > Integer.MAX_VALUE) { + throw new IllegalArgumentException("Bogus or excessive memory: " + memoryLength); + } + return (int) length; + } + + public static byte[] byteArrayOfLength(int length) { + byte[] bytes = new byte[length]; + for (int i = 0; i < length; i++) { + bytes[i] = (byte) i; + } + return bytes; + } + +} diff --git a/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/FromPublisherTest.java b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/FromPublisherTest.java new file mode 100644 index 00000000000..f05eab0a2e5 --- /dev/null +++ b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/FromPublisherTest.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.net.http.HttpRequest; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/* + * @test + * @bug 8364733 + * @summary Verify all specified `HttpRequest.BodyPublishers::fromPublisher` behavior + * @build RecordingSubscriber + * @run junit FromPublisherTest + */ + +class FromPublisherTest { + + @Test + void testNullPublisher() { + assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.fromPublisher(null)); + assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.fromPublisher(null, 1)); + } + + @ParameterizedTest + @ValueSource(longs = {0L, -1L, Long.MIN_VALUE}) + void testInvalidContentLength(long contentLength) { + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + () -> HttpRequest.BodyPublishers.fromPublisher(null, contentLength)); + String exceptionMessage = exception.getMessage(); + assertTrue( + exceptionMessage.contains("non-positive contentLength"), + "Unexpected exception message: " + exceptionMessage); + } + + @ParameterizedTest + @ValueSource(longs = {1, 2, 3, 4}) + void testValidContentLength(long contentLength) { + HttpRequest.BodyPublisher publisher = + HttpRequest.BodyPublishers.fromPublisher(HttpRequest.BodyPublishers.noBody(), contentLength); + assertEquals(contentLength, publisher.contentLength()); + } + + @Test + void testNoContentLength() { + HttpRequest.BodyPublisher publisher = + HttpRequest.BodyPublishers.fromPublisher(HttpRequest.BodyPublishers.noBody()); + assertEquals(-1, publisher.contentLength()); + } + + @Test + void testNullSubscriber() { + HttpRequest.BodyPublisher publisher = + HttpRequest.BodyPublishers.fromPublisher(HttpRequest.BodyPublishers.noBody()); + assertThrows(NullPointerException.class, () -> publisher.subscribe(null)); + } + + @Test + void testDelegation() throws InterruptedException { + BlockingQueue publisherInvocations = new LinkedBlockingQueue<>(); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.fromPublisher(subscriber -> { + publisherInvocations.add("subscribe"); + publisherInvocations.add(subscriber); + }); + RecordingSubscriber subscriber = new RecordingSubscriber(); + publisher.subscribe(subscriber); + assertEquals("subscribe", publisherInvocations.take()); + assertEquals(subscriber, publisherInvocations.take()); + assertTrue(subscriber.invocations.isEmpty()); + } + +} diff --git a/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/NoBodyTest.java b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/NoBodyTest.java new file mode 100644 index 00000000000..f58e9505c9a --- /dev/null +++ b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/NoBodyTest.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import org.junit.jupiter.api.Test; + +import java.net.http.HttpRequest; +import java.util.concurrent.Flow; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/* + * @test + * @bug 8364733 + * @summary Verify all specified `HttpRequest.BodyPublishers::noBody` behavior + * @build RecordingSubscriber + * @run junit NoBodyTest + */ + +class NoBodyTest { + + @Test + void test() throws InterruptedException { + + // Create the publisher + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.noBody(); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, 0); + + // Verify the state after `request()` + subscription.request(Long.MAX_VALUE); + assertEquals("onComplete", subscriber.invocations.take()); + + } + +} diff --git a/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfByteArrayTest.java b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfByteArrayTest.java new file mode 100644 index 00000000000..9973272b435 --- /dev/null +++ b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfByteArrayTest.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.net.http.HttpRequest; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.Flow; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/* + * @test + * @bug 8364733 + * @summary Verify all specified `HttpRequest.BodyPublishers::ofByteArray` behavior + * @build RecordingSubscriber + * @run junit OfByteArrayTest + * + * @comment Using `main/othervm` to initiate tests that depend on a custom-configured JVM + * @run main/othervm -Djdk.httpclient.bufsize=-1 OfByteArrayTest testInvalidBufferSize + * @run main/othervm -Djdk.httpclient.bufsize=0 OfByteArrayTest testInvalidBufferSize + * @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking "" 0 0 "" + * @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking a 0 0 "" + * @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking a 1 0 "" + * @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking a 0 1 a + * @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking ab 0 1 a + * @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking ab 1 1 b + * @run main/othervm -Djdk.httpclient.bufsize=3 OfByteArrayTest testChunking ab 0 2 ab + * @run main/othervm -Djdk.httpclient.bufsize=1 OfByteArrayTest testChunking abc 0 3 a:b:c + * @run main/othervm -Djdk.httpclient.bufsize=2 OfByteArrayTest testChunking abc 0 3 ab:c + * @run main/othervm -Djdk.httpclient.bufsize=2 OfByteArrayTest testChunking abcdef 2 4 cd:ef + */ + +public class OfByteArrayTest { + + private static final Charset CHARSET = StandardCharsets.US_ASCII; + + @Test + void testNullContent() { + assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofByteArray(null)); + assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofByteArray(null, 1, 2)); + } + + @ParameterizedTest + @CsvSource({ + "abc,-1,1", // Negative offset + "abc,1,-1", // Negative length + "'',1,1", // Offset overflow on empty string + "a,2,1", // Offset overflow + "'',0,1", // Length overflow on empty string + "a,0,2", // Length overflow + }) + void testInvalidOffsetOrLength(String contentText, int offset, int length) { + byte[] content = contentText.getBytes(CHARSET); + assertThrows( + IndexOutOfBoundsException.class, + () -> HttpRequest.BodyPublishers.ofByteArray(content, offset, length)); + } + + /** + * Initiates tests that depend on a custom-configured JVM. + */ + public static void main(String[] args) throws InterruptedException { + switch (args[0]) { + case "testInvalidBufferSize" -> testInvalidBufferSize(); + case "testChunking" -> testChunking( + parseStringArg(args[1]), + Integer.parseInt(args[2]), + Integer.parseInt(args[3]), + parseStringArg(args[4])); + default -> throw new IllegalArgumentException("Unexpected arguments: " + List.of(args)); + } + } + + private static String parseStringArg(String arg) { + return arg == null || arg.trim().equals("\"\"") ? "" : arg; + } + + private static void testInvalidBufferSize() { + assertThrows(IllegalArgumentException.class, () -> HttpRequest.BodyPublishers.ofByteArray(new byte[1])); + } + + private static void testChunking( + String contentText, int offset, int length, String expectedBuffersText) + throws InterruptedException { + + // Create the publisher + byte[] content = contentText.getBytes(CHARSET); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArray(content, offset, length); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, length); + + // Verify the state after `request()` + String[] expectedBuffers = expectedBuffersText.isEmpty() ? new String[0] : expectedBuffersText.split(":"); + subscription.request(Long.MAX_VALUE); + for (int bufferIndex = 0; bufferIndex < expectedBuffers.length; bufferIndex++) { + assertEquals("onNext", subscriber.invocations.take()); + String actualBuffer = CHARSET.decode((ByteBuffer) subscriber.invocations.take()).toString(); + String expectedBuffer = expectedBuffers[bufferIndex]; + assertEquals(expectedBuffer, actualBuffer, "buffer mismatch at index " + bufferIndex); + } + assertEquals("onComplete", subscriber.invocations.take()); + + } + +} diff --git a/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfByteArraysTest.java b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfByteArraysTest.java new file mode 100644 index 00000000000..6b852407907 --- /dev/null +++ b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfByteArraysTest.java @@ -0,0 +1,317 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.net.http.HttpRequest; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Flow; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/* + * @test + * @bug 8364733 + * @summary Verify all specified `HttpRequest.BodyPublishers::ofByteArrays` behavior + * @build ByteBufferUtils + * RecordingSubscriber + * @run junit OfByteArraysTest + * + * @comment Using `main/othervm` to initiate tests that depend on a custom-configured JVM + * @run main/othervm -Xmx64m OfByteArraysTest testOOM + */ + +public class OfByteArraysTest { + + @ParameterizedTest + @ValueSource(ints = {0, 1, 2, 3}) + void testIteratorOfLength(int length) throws InterruptedException { + + // Create the publisher + List buffers = IntStream + .range(0, length) + .mapToObj(i -> new byte[]{(byte) i}) + .toList(); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(buffers::iterator); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1); + + // Verify the state after `request()` + subscription.request(Long.MAX_VALUE); + for (int bufferIndex = 0; bufferIndex < length; bufferIndex++) { + assertEquals("onNext", subscriber.invocations.take()); + byte[] expectedBuffer = buffers.get(bufferIndex); + ByteBuffer actualBuffer = (ByteBuffer) subscriber.invocations.take(); + ByteBufferUtils.assertEquals(expectedBuffer, actualBuffer, "buffer mismatch at index " + bufferIndex); + } + assertEquals("onComplete", subscriber.invocations.take()); + + } + + @Test + void testDifferentIterators() throws InterruptedException { + + // Create a publisher using an iterable that returns a different iterator at each invocation + byte[] buffer1 = ByteBufferUtils.byteArrayOfLength(9); + byte[] buffer2 = ByteBufferUtils.byteArrayOfLength(9); + int[] iteratorRequestCount = {0}; + Iterable iterable = () -> switch (++iteratorRequestCount[0]) { + case 1 -> List.of(buffer1).iterator(); + case 2 -> List.of(buffer2).iterator(); + default -> throw new AssertionError(); + }; + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(iterable); + + // Subscribe twice (to force two `Iterable::iterator` invocations) + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription1 = subscriber.verifyAndSubscribe(publisher, -1); + Flow.Subscription subscription2 = subscriber.verifyAndSubscribe(publisher, -1); + + // Drain emissions until completion, and verify the content + byte[] actualBuffer1 = subscriber.drainToByteArray(subscription1, Long.MAX_VALUE); + byte[] actualBuffer2 = subscriber.drainToByteArray(subscription2, Long.MAX_VALUE); + ByteBufferUtils.assertEquals(buffer1, actualBuffer1, null); + ByteBufferUtils.assertEquals(buffer2, actualBuffer2, null); + + } + + @Test + void testNullIterable() { + assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofByteArrays(null)); + } + + @Test + void testNullIterator() throws InterruptedException { + + // Create the publisher + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(() -> null); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1); + + // Verify the NPE + subscription.request(1); + assertEquals("onError", subscriber.invocations.take()); + assertInstanceOf(NullPointerException.class, subscriber.invocations.take()); + + } + + @Test + void testNullArray() throws InterruptedException { + + // Create the publisher + List iterable = new ArrayList<>(); + iterable.add(null); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(iterable); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1); + + // Verify the NPE + subscription.request(1); + assertEquals("onError", subscriber.invocations.take()); + assertInstanceOf(NullPointerException.class, subscriber.invocations.take()); + + } + + @Test + void testThrowingIterable() throws InterruptedException { + + // Create the publisher + RuntimeException exception = new RuntimeException("failure for `testIteratorCreationException`"); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(() -> { + throw exception; + }); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1); + + // Verify the failure + subscription.request(1); + assertEquals("onError", subscriber.invocations.take()); + Exception actualException = (Exception) subscriber.invocations.take(); + assertSame(exception, actualException); + + } + + static Stream testThrowingIteratorArgs() { + RuntimeException hasNextException = new RuntimeException("failure for `hasNext`"); + RuntimeException nextException = new RuntimeException("failure for `next`"); + return Stream.of( + Arguments.of(0, hasNextException, null, hasNextException), + Arguments.of(0, hasNextException, nextException, hasNextException), + Arguments.of(1, hasNextException, null, hasNextException), + Arguments.of(1, hasNextException, nextException, hasNextException), + Arguments.of(1, null, nextException, nextException)); + } + + @ParameterizedTest + @MethodSource("testThrowingIteratorArgs") + void testThrowingIterator( + int exceptionIndex, RuntimeException hasNextException, RuntimeException nextException, Exception expectedException) + throws InterruptedException { + + // Create the publisher + IteratorThrowingAtEnd iterator = + new IteratorThrowingAtEnd(exceptionIndex, hasNextException, nextException); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(() -> iterator); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1); + + // Drain successful emissions + subscription.request(Long.MAX_VALUE); + for (int itemIndex = 0; itemIndex < exceptionIndex; itemIndex++) { + assertEquals("onNext", subscriber.invocations.take()); + ByteBuffer actualBuffer = (ByteBuffer) subscriber.invocations.take(); + ByteBuffer expectedBuffer = ByteBuffer.wrap(iterator.content, itemIndex, 1); + ByteBufferUtils.assertEquals(expectedBuffer, actualBuffer, null); + } + + // Verify the result + if (expectedException == null) { + assertEquals("onComplete", subscriber.invocations.take()); + } else { + assertEquals("onError", subscriber.invocations.take()); + Exception actualException = (Exception) subscriber.invocations.take(); + assertSame(expectedException, actualException); + } + + } + + private static final class IteratorThrowingAtEnd implements Iterator { + + private final byte[] content; + + private final RuntimeException hasNextException; + + private final RuntimeException nextException; + + private int position; + + private IteratorThrowingAtEnd( + int length, + RuntimeException hasNextException, + RuntimeException nextException) { + this.content = ByteBufferUtils.byteArrayOfLength(length); + this.hasNextException = hasNextException; + this.nextException = nextException; + } + + @Override + public synchronized boolean hasNext() { + if (position >= content.length && hasNextException != null) { + throw hasNextException; + } + // We always instruct to proceed, so `next()` can throw + return true; + } + + @Override + public synchronized byte[] next() { + if (position < content.length) { + return new byte[]{content[position++]}; + } + assertNotNull(nextException); + throw nextException; + } + + } + + /** + * Initiates tests that depend on a custom-configured JVM. + */ + public static void main(String[] args) throws Exception { + if ("testOOM".equals(args[0])) { + testOOM(); + } else { + throw new IllegalArgumentException("Unknown arguments: " + List.of(args)); + } + } + + private static void testOOM() throws Exception { + + // Create the publisher + int length = ByteBufferUtils.findLengthExceedingMaxMemory(); + Iterable iterable = createIterableOfLength(length); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofByteArrays(iterable); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1); + + // Drain emissions until completion, and verify the received content length + final int[] readLength = {0}; + subscriber.drainToAccumulator(subscription, 1, buffer -> readLength[0] += buffer.limit()); + assertEquals(length, readLength[0]); + + } + + private static Iterable createIterableOfLength(int length) { + return () -> new Iterator<>() { + + // Instead of emitting `length` at once, doing it gradually using a buffer to avoid OOM. + private final byte[] buffer = new byte[8192]; + + private volatile int remainingLength = length; + + @Override + public boolean hasNext() { + return remainingLength > 0; + } + + @Override + public synchronized byte[] next() { + if (remainingLength >= buffer.length) { + remainingLength -= buffer.length; + return buffer; + } else { + byte[] remainingBuffer = new byte[remainingLength]; + remainingLength = 0; + return remainingBuffer; + } + } + + }; + } + +} diff --git a/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfFileTest.java b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfFileTest.java new file mode 100644 index 00000000000..7705faef109 --- /dev/null +++ b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfFileTest.java @@ -0,0 +1,290 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.http.HttpRequest; +import java.nio.ByteBuffer; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Flow; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/* + * @test + * @bug 8364733 + * @summary Verify all specified `HttpRequest.BodyPublishers::ofFile` behavior + * @build ByteBufferUtils + * RecordingSubscriber + * @run junit OfFileTest + * + * @comment Using `main/othervm` to initiate tests that depend on a custom-configured JVM + * @run main/othervm -Xmx64m OfFileTest testOOM + */ + +public class OfFileTest { + + private static final Path DEFAULT_FS_DIR = Path.of(System.getProperty("user.dir", ".")); + + private static final FileSystem ZIP_FS = zipFs(); + + private static final Path ZIP_FS_DIR = ZIP_FS.getRootDirectories().iterator().next(); + + private static final List PARENT_DIRS = List.of(DEFAULT_FS_DIR, ZIP_FS_DIR); + + private static FileSystem zipFs() { + try { + Path zipFile = DEFAULT_FS_DIR.resolve("file.zip"); + return FileSystems.newFileSystem(zipFile, Map.of("create", "true")); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } + } + + @AfterAll + static void closeZipFs() throws IOException { + ZIP_FS.close(); + } + + static List parentDirs() { + return PARENT_DIRS; + } + + @Test + void testNullPath() { + assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofFile(null)); + } + + @ParameterizedTest + @MethodSource("parentDirs") + void testNonExistentPath(Path parentDir) { + Path nonExistentPath = createFilePath(parentDir, "testNonExistentPath"); + assertThrows(FileNotFoundException.class, () -> HttpRequest.BodyPublishers.ofFile(nonExistentPath)); + } + + @ParameterizedTest + @MethodSource("parentDirs") + void testNonExistentPathAtSubscribe(Path parentDir) throws Exception { + + // Create the publisher + byte[] fileBytes = ByteBufferUtils.byteArrayOfLength(3); + Path filePath = createFile(parentDir, "testNonExistentPathAtSubscribe", fileBytes); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofFile(filePath); + + // Delete the file + Files.delete(filePath); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, fileBytes.length); + + // Verify the state after `request()` + subscription.request(1); + assertEquals("onError", subscriber.invocations.take()); + FileNotFoundException actualException = (FileNotFoundException) subscriber.invocations.take(); + String actualExceptionMessage = actualException.getMessage(); + assertTrue( + actualExceptionMessage.contains("Not a regular file"), + "Unexpected message: " + actualExceptionMessage); + + } + + @ParameterizedTest + @MethodSource("parentDirs") + void testIrregularFile(Path parentDir) throws Exception { + + // Create the publisher + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofFile(parentDir); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, Files.size(parentDir)); + + // Verify the state after `request()` + subscription.request(1); + assertEquals("onError", subscriber.invocations.take()); + FileNotFoundException actualException = (FileNotFoundException) subscriber.invocations.take(); + String actualExceptionMessage = actualException.getMessage(); + assertTrue( + actualExceptionMessage.contains("Not a regular file"), + "Unexpected message: " + actualExceptionMessage); + + } + + /** + * A big enough file length to observe the effects of file + * modification whilst the file is getting read. + */ + private static final int BIG_FILE_LENGTH = 8 * 1024 * 1024; // 8 MiB + + @ParameterizedTest + @MethodSource("parentDirs") + void testFileModificationWhileReading(Path parentDir) throws Exception { + + // ZIP file system (sadly?) consumes the entire content at open. + // Hence, we cannot observe the effect of file modification while reading. + if (parentDir == ZIP_FS_DIR) { + return; + } + + // Create the publisher + byte[] fileBytes = ByteBufferUtils.byteArrayOfLength(BIG_FILE_LENGTH); + Path filePath = createFile(parentDir, "testFileModificationWhileReading", fileBytes); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofFile(filePath); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, fileBytes.length); + + // Verify the state after the 1st `request()` + subscription.request(1); + assertEquals("onNext", subscriber.invocations.take()); + ByteBuffer buffer1 = (ByteBuffer) subscriber.invocations.take(); + assertTrue(buffer1.limit() > 0, "unexpected empty buffer"); + List buffers = new ArrayList<>(); + buffers.add(buffer1); + + // Truncate the file + Files.write(filePath, new byte[0]); + + // Drain emissions until completion, and verify the content + byte[] readBytes = subscriber.drainToByteArray(subscription, Long.MAX_VALUE, buffers); + assertTrue( + readBytes.length < fileBytes.length, + "was expecting less than the total amount (%s bytes), found: %s".formatted( + fileBytes.length, readBytes.length)); + ByteBuffer expectedReadBytes = ByteBuffer.wrap(fileBytes, 0, readBytes.length); + ByteBufferUtils.assertEquals(expectedReadBytes, ByteBuffer.wrap(readBytes), null); + + } + + static Stream testFileOfLengthParams() { + return PARENT_DIRS + .stream() + .flatMap(parentDir -> Stream + .of(0, 1, 2, 3, BIG_FILE_LENGTH) + .map(fileLength -> Arguments.of(parentDir, fileLength))); + } + + @ParameterizedTest + @MethodSource("testFileOfLengthParams") + void testFileOfLength(Path parentDir, int fileLength) throws Exception { + + // Create the publisher + byte[] fileBytes = ByteBufferUtils.byteArrayOfLength(fileLength); + Path filePath = createFile(parentDir, "testFileOfLength", fileBytes); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofFile(filePath); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, fileBytes.length); + + // Drain emissions until completion, and verify the received content + byte[] readBytes = subscriber.drainToByteArray(subscription, Long.MAX_VALUE); + ByteBufferUtils.assertEquals(fileBytes, readBytes, null); + + } + + /** + * Initiates tests that depend on a custom-configured JVM. + */ + public static void main(String[] args) throws Exception { + if ("testOOM".equals(args[0])) { + testOOM(); + } else { + throw new IllegalArgumentException("Unknown arguments: " + List.of(args)); + } + } + + private static void testOOM() { + for (Path parentDir : PARENT_DIRS) { + try { + testOOM(parentDir); + } catch (Exception exception) { + throw new AssertionError("failed for parent directory: " + parentDir, exception); + } + } + } + + private static void testOOM(Path parentDir) throws Exception { + + // Create the publisher + int fileLength = ByteBufferUtils.findLengthExceedingMaxMemory(); + Path filePath = createFileOfLength(parentDir, "testOOM", fileLength); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofFile(filePath); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, fileLength); + + // Drain emissions until completion, and verify the received content length + final int[] readLength = {0}; + subscriber.drainToAccumulator(subscription, 1, buffer -> readLength[0] += buffer.limit()); + assertEquals(fileLength, readLength[0]); + + } + + private static Path createFileOfLength(Path parentDir, String identifier, int fileLength) throws IOException { + Path filePath = createFilePath(parentDir, identifier); + try (OutputStream fileStream = Files.newOutputStream(filePath)) { + byte[] buffer = ByteBufferUtils.byteArrayOfLength(8192); + for (int writtenLength = 0; writtenLength < fileLength; writtenLength += buffer.length) { + int remainingLength = fileLength - writtenLength; + byte[] effectiveBuffer = remainingLength < buffer.length + ? ByteBufferUtils.byteArrayOfLength(remainingLength) + : buffer; + fileStream.write(effectiveBuffer); + } + } + return filePath; + } + + private static Path createFile(Path parentDir, String identifier, byte[] fileBytes) throws IOException { + Path filePath = createFilePath(parentDir, identifier); + Files.write(filePath, fileBytes); + return filePath; + } + + private static Path createFilePath(Path parentDir, String identifier) { + String fileName = identifier.replaceAll("\\W*", ""); + return parentDir.resolve(fileName); + } + +} diff --git a/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfInputStreamTest.java b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfInputStreamTest.java new file mode 100644 index 00000000000..7688c1674ee --- /dev/null +++ b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfInputStreamTest.java @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.http.HttpRequest; +import java.util.List; +import java.util.concurrent.Flow; +import java.util.function.Supplier; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/* + * @test + * @bug 8364733 + * @summary Verify all specified `HttpRequest.BodyPublishers::ofInputStream` behavior + * @build ByteBufferUtils + * RecordingSubscriber + * @run junit OfInputStreamTest + * + * @comment Using `main/othervm` to initiate tests that depend on a custom-configured JVM + * @run main/othervm -Xmx64m OfInputStreamTest testOOM + */ + +public class OfInputStreamTest { + + @Test + void testNullInputStreamSupplier() { + assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofInputStream(null)); + } + + @Test + void testThrowingInputStreamSupplier() throws InterruptedException { + + // Create the publisher + RuntimeException exception = new RuntimeException(); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofInputStream(() -> { throw exception; }); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1); + + // Verify the state after `request()` + subscription.request(1); + assertEquals("onError", subscriber.invocations.take()); + IOException actualException = (IOException) subscriber.invocations.take(); + assertEquals("Stream supplier has failed", actualException.getMessage()); + assertSame(exception, actualException.getCause()); + + } + + @Test + void testNullInputStream() throws InterruptedException { + + // Create the publisher + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofInputStream(() -> null); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1); + + // Verify the state after `request()` + subscription.request(1); + assertEquals("onError", subscriber.invocations.take()); + IOException actualException = (IOException) subscriber.invocations.take(); + assertEquals("Stream supplier returned null", actualException.getMessage()); + + } + + @Test + void testInputStreamSupplierInvocations() throws InterruptedException { + + // Create a publisher from an `InputStream` supplier returning a different instance at each invocation + byte[] buffer1 = ByteBufferUtils.byteArrayOfLength(10); + byte[] buffer2 = ByteBufferUtils.byteArrayOfLength(10); + int[] inputStreamSupplierInvocationCount = {0}; + Supplier inputStreamSupplier = () -> + switch (++inputStreamSupplierInvocationCount[0]) { + case 1 -> new ByteArrayInputStream(buffer1); + case 2 -> new ByteArrayInputStream(buffer2); + default -> throw new AssertionError(); + }; + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofInputStream(inputStreamSupplier); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription1 = subscriber.verifyAndSubscribe(publisher, -1); + Flow.Subscription subscription2 = subscriber.verifyAndSubscribe(publisher, -1); + + // Drain each subscription and verify the received content + byte[] actualBuffer1 = subscriber.drainToByteArray(subscription1, Long.MAX_VALUE); + ByteBufferUtils.assertEquals(buffer1, actualBuffer1, null); + byte[] actualBuffer2 = subscriber.drainToByteArray(subscription2, Long.MAX_VALUE); + ByteBufferUtils.assertEquals(buffer2, actualBuffer2, null); + + } + + @ParameterizedTest + @ValueSource(ints = {0, 1, 2, 3}) + void testInputStreamOfLength(int length) throws InterruptedException { + + // Create the publisher + byte[] content = ByteBufferUtils.byteArrayOfLength(length); + InputStream inputStream = new ByteArrayInputStream(content); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofInputStream(() -> inputStream); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1); + + // Drain emissions until completion, and verify the received content + byte[] actualContent = subscriber.drainToByteArray(subscription, Long.MAX_VALUE); + ByteBufferUtils.assertEquals(content, actualContent, null); + + } + + @ParameterizedTest + @ValueSource(ints = {0, 1, 2, 3}) + void testThrowingInputStream(int exceptionIndex) throws InterruptedException { + + // Create the publisher + RuntimeException exception = new RuntimeException("failure for `read`"); + InputStream inputStream = new InputStreamThrowingOnCompletion(exceptionIndex, exception); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofInputStream(() -> inputStream); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1); + + // Verify the failure + subscription.request(1); + assertEquals("onError", subscriber.invocations.take()); + Exception actualException = (Exception) subscriber.invocations.take(); + assertSame(exception, actualException); + + } + + private static final class InputStreamThrowingOnCompletion extends InputStream { + + private final int length; + + private final RuntimeException exception; + + private int position; + + private InputStreamThrowingOnCompletion(int length, RuntimeException exception) { + this.length = length; + this.exception = exception; + } + + @Override + public synchronized int read() { + if (position < length) { + return position++ & 0xFF; + } + throw exception; + } + + } + + /** + * Initiates tests that depend on a custom-configured JVM. + */ + public static void main(String[] args) throws Exception { + if ("testOOM".equals(args[0])) { + testOOM(); + } else { + throw new IllegalArgumentException("Unknown arguments: " + List.of(args)); + } + } + + private static void testOOM() throws InterruptedException { + + // Create the publisher using an `InputStream` that emits content exceeding the maximum memory + int length = ByteBufferUtils.findLengthExceedingMaxMemory(); + HttpRequest.BodyPublisher publisher = + HttpRequest.BodyPublishers.ofInputStream(() -> new InputStream() { + + private int position; + + @Override + public synchronized int read() { + return position < length ? (position++ & 0xFF) : -1; + } + + }); + + // Subscribe + RecordingSubscriber subscriber = new RecordingSubscriber(); + Flow.Subscription subscription = subscriber.verifyAndSubscribe(publisher, -1); + + // Drain emissions until completion, and verify the received content length + final int[] readLength = {0}; + subscriber.drainToAccumulator(subscription, 1, buffer -> readLength[0] += buffer.limit()); + assertEquals(length, readLength[0]); + + } + +} diff --git a/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfStringTest.java b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfStringTest.java new file mode 100644 index 00000000000..143b5ce17da --- /dev/null +++ b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/OfStringTest.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.net.http.HttpRequest; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Flow; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/* + * @test + * @bug 8364733 + * @summary Verify all specified `HttpRequest.BodyPublishers::ofString` behavior + * @build ByteBufferUtils + * RecordingSubscriber + * @run junit OfStringTest + */ + +class OfStringTest { + + private static final Charset CHARSET = StandardCharsets.US_ASCII; + + @ParameterizedTest + @ValueSource(ints = {0, 1, 2, 3}) + void testContentOfLength(int length) throws InterruptedException { + + // Create the publisher + char[] contentChars = new char[length]; + for (int i = 0; i < length; i++) { + contentChars[i] = (char) ('a' + i); + } + String content = new String(contentChars); + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofString(content, CHARSET); + + // Subscribe + assertEquals(length, publisher.contentLength()); + RecordingSubscriber subscriber = new RecordingSubscriber(); + publisher.subscribe(subscriber); + assertEquals("onSubscribe", subscriber.invocations.take()); + Flow.Subscription subscription = (Flow.Subscription) subscriber.invocations.take(); + + // Verify the state after `request()` + subscription.request(Long.MAX_VALUE); + if (length > 0) { + assertEquals("onNext", subscriber.invocations.take()); + String actualContent = CHARSET.decode((ByteBuffer) subscriber.invocations.take()).toString(); + assertEquals(content, actualContent); + } + assertEquals("onComplete", subscriber.invocations.take()); + + } + + @ParameterizedTest + @CsvSource({ + "a,UTF-8", + "b,UTF-16", + "ı,ISO-8859-9" + }) + void testCharset(String content, Charset charset) throws InterruptedException { + + // Create the publisher + HttpRequest.BodyPublisher publisher = HttpRequest.BodyPublishers.ofString(content, charset); + + // Subscribe + ByteBuffer expectedBuffer = charset.encode(content); + assertEquals(expectedBuffer.limit(), publisher.contentLength()); + RecordingSubscriber subscriber = new RecordingSubscriber(); + publisher.subscribe(subscriber); + assertEquals("onSubscribe", subscriber.invocations.take()); + Flow.Subscription subscription = (Flow.Subscription) subscriber.invocations.take(); + + // Verify the state after `request()` + subscription.request(Long.MAX_VALUE); + assertEquals("onNext", subscriber.invocations.take()); + ByteBuffer actualBuffer = (ByteBuffer) subscriber.invocations.take(); + ByteBufferUtils.assertEquals(expectedBuffer, actualBuffer, null); + assertEquals("onComplete", subscriber.invocations.take()); + + } + + @Test + void testNullContent() { + assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofString(null, CHARSET)); + } + + @Test + void testNullCharset() { + assertThrows(NullPointerException.class, () -> HttpRequest.BodyPublishers.ofString("foo", null)); + } + +} diff --git a/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/RecordingSubscriber.java b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/RecordingSubscriber.java new file mode 100644 index 00000000000..0cd43c635ae --- /dev/null +++ b/test/jdk/java/net/httpclient/HttpRequestBodyPublishers/RecordingSubscriber.java @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import java.net.http.HttpRequest; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Flow; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Test subscriber recording received invocations. + */ +public final class RecordingSubscriber implements Flow.Subscriber { + + public final BlockingQueue invocations = new LinkedBlockingQueue<>(); + + @Override + public void onSubscribe(Flow.Subscription subscription) { + invocations.add("onSubscribe"); + invocations.add(subscription); + } + + @Override + public void onNext(ByteBuffer item) { + invocations.add("onNext"); + invocations.add(item); + } + + @Override + public synchronized void onError(Throwable throwable) { + invocations.add("onError"); + invocations.add(throwable); + } + + @Override + public synchronized void onComplete() { + invocations.add("onComplete"); + } + + /** + * Verifies the content length of the given publisher and subscribes to it. + */ + public Flow.Subscription verifyAndSubscribe(HttpRequest.BodyPublisher publisher, long contentLength) + throws InterruptedException { + assertEquals(contentLength, publisher.contentLength()); + publisher.subscribe(this); + assertEquals("onSubscribe", invocations.take()); + return (Flow.Subscription) invocations.take(); + } + + /** + * {@return the byte sequence collected by draining all emissions until completion} + * + * @param subscription a subscription to drain from + * @param itemCount the number of items to request per iteration + */ + public byte[] drainToByteArray(Flow.Subscription subscription, long itemCount) throws InterruptedException { + return drainToByteArray(subscription, itemCount, new ArrayList<>()); + } + + /** + * {@return the byte sequence collected by draining all emissions until completion} + * + * @param subscription a subscription to drain from + * @param itemCount the number of items to request per iteration + * @param buffers a list to accumulate the received content in + */ + public byte[] drainToByteArray(Flow.Subscription subscription, long itemCount, List buffers) + throws InterruptedException { + drainToAccumulator(subscription, itemCount, buffers::add); + return flattenBuffers(buffers); + } + + /** + * Drains all emissions until completion to the given {@code accumulator}. + * + * @param subscription a subscription to drain from + * @param itemCount the number of items to request per iteration + * @param accumulator an accumulator to pass the received content to + */ + public void drainToAccumulator( + Flow.Subscription subscription, long itemCount, Consumer accumulator) + throws InterruptedException { + boolean completed = false; + while (!completed) { + subscription.request(itemCount); + String op = (String) invocations.take(); + if ("onNext".equals(op)) { + ByteBuffer buffer = (ByteBuffer) invocations.take(); + accumulator.accept(buffer); + } else if ("onComplete".equals(op)) { + completed = true; + } else { + throw new AssertionError("Unexpected invocation: " + op); + } + } + } + + private static byte[] flattenBuffers(List buffers) { + int arrayLength = buffers.stream().mapToInt(ByteBuffer::limit).sum(); + byte[] array = new byte[arrayLength]; + for (int bufferIndex = 0, arrayOffset = 0; bufferIndex < buffers.size(); bufferIndex++) { + ByteBuffer buffer = buffers.get(bufferIndex); + int bufferLimit = buffer.limit(); + buffer.get(array, arrayOffset, bufferLimit); + arrayOffset += bufferLimit; + } + return array; + } + +}