From ad90fb6da38da066dfc7a5439196887bbcda766f Mon Sep 17 00:00:00 2001 From: Conor Cleary Date: Mon, 8 May 2023 14:49:43 +0000 Subject: [PATCH] 8293786: HttpClient will not send more than 64 kb of data from the 2nd request in http2 Reviewed-by: dfuchs, djelinski --- .../classes/jdk/internal/net/http/Stream.java | 27 ++- .../net/httpclient/http2/PostPutTest.java | 171 ++++++++++++++++++ .../test/lib/http2/BodyInputStream.java | 14 +- .../test/lib/http2/BodyOutputStream.java | 19 +- .../test/lib/http2/Http2TestExchangeImpl.java | 12 +- .../lib/http2/Http2TestServerConnection.java | 7 +- 6 files changed, 233 insertions(+), 17 deletions(-) create mode 100644 test/jdk/java/net/httpclient/http2/PostPutTest.java diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java index d24b87f2d04..28955b40bee 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java @@ -182,9 +182,9 @@ class Stream extends ExchangeImpl { } while (!inputQ.isEmpty()) { Http2Frame frame = inputQ.peek(); - if (frame instanceof ResetFrame) { + if (frame instanceof ResetFrame rf) { inputQ.remove(); - handleReset((ResetFrame)frame, subscriber); + handleReset(rf, subscriber); return; } DataFrame df = (DataFrame)frame; @@ -463,24 +463,23 @@ class Stream extends ExchangeImpl { void incoming(Http2Frame frame) throws IOException { if (debug.on()) debug.log("incoming: %s", frame); var cancelled = checkRequestCancelled() || closed; - if ((frame instanceof HeaderFrame)) { - HeaderFrame hframe = (HeaderFrame) frame; - if (hframe.endHeaders()) { + if ((frame instanceof HeaderFrame hf)) { + if (hf.endHeaders()) { Log.logTrace("handling response (streamid={0})", streamid); handleResponse(); } - if (hframe.getFlag(HeaderFrame.END_STREAM)) { + if (hf.getFlag(HeaderFrame.END_STREAM)) { if (debug.on()) debug.log("handling END_STREAM: %d", streamid); receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of())); } - } else if (frame instanceof DataFrame) { + } else if (frame instanceof DataFrame df) { if (cancelled) { if (debug.on()) { debug.log("request cancelled or stream closed: dropping data frame"); } - connection.dropDataFrame((DataFrame) frame); + connection.dropDataFrame(df); } else { - receiveDataFrame((DataFrame) frame); + receiveDataFrame(df); } } else { if (!cancelled) otherFrame(frame); @@ -554,6 +553,16 @@ class Stream extends ExchangeImpl { } else { Flow.Subscriber subscriber = responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber; + if (!requestBodyCF.isDone()) { + // If a RST_STREAM is received, complete the requestBody. This will allow the + // response to be read before the Reset is handled in the case where the client's + // input stream is partially consumed or not consumed at all by the server. + if (frame.getErrorCode() != ResetFrame.NO_ERROR) { + requestBodyCF.completeExceptionally(new IOException("RST_STREAM received")); + } else { + requestBodyCF.complete(null); + } + } if (response == null && subscriber == null) { // we haven't received the headers yet, and won't receive any! // handle reset now. diff --git a/test/jdk/java/net/httpclient/http2/PostPutTest.java b/test/jdk/java/net/httpclient/http2/PostPutTest.java new file mode 100644 index 00000000000..89b192f6171 --- /dev/null +++ b/test/jdk/java/net/httpclient/http2/PostPutTest.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * @test + * @bug 8293786 + * @summary Checks to see if the HttpClient can process a request to cancel a transmission from a remote if the server + * does not process any data. The client should read all data from the server and close the connection. + * @library /test/jdk/java/net/httpclient/lib + * @build jdk.httpclient.test.lib.http2.Http2TestServer + * @run testng/othervm/timeout=50 -Djdk.httpclient.HttpClient.log=all + * PostPutTest + */ + +import jdk.httpclient.test.lib.http2.Http2Handler; +import jdk.httpclient.test.lib.http2.Http2TestExchange; +import jdk.httpclient.test.lib.http2.Http2TestServer; + +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.PrintStream; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; + +import static java.net.http.HttpClient.Version.HTTP_2; +import static java.net.http.HttpRequest.BodyPublishers.ofByteArray; + +public class PostPutTest { + + Http2TestServer http2TestServer; + URI warmupURI, testHandlerBasicURI, testHandlerCloseBosURI, testHandleNegativeContentLengthURI; + static PrintStream testLog = System.err; + + // As per jdk.internal.net.http.WindowController.DEFAULT_INITIAL_WINDOW_SIZE + final int DEFAULT_INITIAL_WINDOW_SIZE = (64 * 1024) - 1; + // Add on a small amount of arbitrary bytes to see if client hangs when receiving RST_STREAM + byte[] data = new byte[DEFAULT_INITIAL_WINDOW_SIZE + 10]; + + @BeforeTest + public void setup() throws Exception { + http2TestServer = new Http2TestServer(false, 0); + http2TestServer.addHandler(new WarmupHandler(), "/Warmup"); + http2TestServer.addHandler(new TestHandlerBasic(), "/TestHandlerBasic"); + http2TestServer.addHandler(new TestHandlerCloseBos(), "/TestHandlerCloseBos"); + http2TestServer.addHandler(new TestHandleNegativeContentLength(), "/TestHandleNegativeContentLength"); + http2TestServer.start(); + testLog.println("PostPutTest.setup(): Starting server"); + warmupURI = new URI("http://" + http2TestServer.serverAuthority() + "/Warmup"); + testHandlerBasicURI = new URI("http://" + http2TestServer.serverAuthority() + "/TestHandlerBasic"); + testHandlerCloseBosURI = new URI("http://" + http2TestServer.serverAuthority() + "/TestHandlerCloseBos"); + testHandleNegativeContentLengthURI = new URI("http://" + http2TestServer.serverAuthority() + "/TestHandleNegativeContentLength"); + testLog.println("PostPutTest.setup(): warmupURI: " + warmupURI); + testLog.println("PostPutTest.setup(): testHandlerBasicURI: " + testHandlerBasicURI); + testLog.println("PostPutTest.setup(): testHandlerCloseBosURI: " + testHandlerCloseBosURI); + testLog.println("PostPutTest.setup(): testHandleNegativeContentLengthURI: " + testHandleNegativeContentLengthURI); + } + + @AfterTest + public void teardown() { + testLog.println("PostPutTest.teardown(): Stopping server"); + http2TestServer.stop(); + data = null; + } + + @DataProvider(name = "variants") + public Object[][] variants() { + HttpRequest over64kPost, over64kPut, over64kPostCloseBos, over64kPutCloseBos, over64kPostNegativeContentLength, over64kPutNegativeContentLength; + over64kPost = HttpRequest.newBuilder().version(HTTP_2).POST(ofByteArray(data)).uri(testHandlerBasicURI).build(); + over64kPut = HttpRequest.newBuilder().version(HTTP_2).PUT(ofByteArray(data)).uri(testHandlerBasicURI).build(); + + over64kPostCloseBos = HttpRequest.newBuilder().version(HTTP_2).POST(ofByteArray(data)).uri(testHandlerCloseBosURI).build(); + over64kPutCloseBos = HttpRequest.newBuilder().version(HTTP_2).PUT(ofByteArray(data)).uri(testHandlerCloseBosURI).build(); + + over64kPostNegativeContentLength = HttpRequest.newBuilder().version(HTTP_2).POST(ofByteArray(data)).uri(testHandleNegativeContentLengthURI).build(); + over64kPutNegativeContentLength = HttpRequest.newBuilder().version(HTTP_2).PUT(ofByteArray(data)).uri(testHandleNegativeContentLengthURI).build(); + + return new Object[][] { + { over64kPost, "POST data over 64k bytes" }, + { over64kPut, "PUT data over 64k bytes" }, + { over64kPostCloseBos, "POST data over 64k bytes with close bos" }, + { over64kPutCloseBos, "PUT data over 64k bytes with close bos" }, + { over64kPostNegativeContentLength, "POST data over 64k bytes with negative content length" }, + { over64kPutNegativeContentLength, "PUT data over 64k bytes with negative content length" } + }; + } + + public HttpRequest getWarmupReq() { + return HttpRequest.newBuilder() + .GET() + .uri(warmupURI) + .build(); + } + + @Test(dataProvider = "variants") + public void testOver64kPUT(HttpRequest req, String testMessage) { + testLog.println("PostPutTest: Performing test: " + testMessage); + HttpClient hc = HttpClient.newBuilder().version(HTTP_2).build(); + hc.sendAsync(getWarmupReq(), HttpResponse.BodyHandlers.ofString()).join(); + hc.sendAsync(req, HttpResponse.BodyHandlers.ofString()).join(); + /* + If this test fails in timeout, it is likely due to one of two reasons: + - The responseSubscriber is null, so no incoming frames are being processed by the client + (See Stream::schedule) + - The test server is for some reason not sending a RST_STREAM with the NO_ERROR flag set after + sending an empty DATA frame with the END_STREAM flag set. + */ + } + + private static class TestHandlerBasic implements Http2Handler { + + @Override + public void handle(Http2TestExchange exchange) throws IOException { + // The input stream is not read in this bug as this will trigger window updates for the server. This bug + // concerns the case where no updates are sent and the server instead tells the client to abort the transmission. + exchange.sendResponseHeaders(200, 0); + } + } + + private static class TestHandlerCloseBos implements Http2Handler { + + @Override + public void handle(Http2TestExchange exchange) throws IOException { + // This case does actually cause the test to hang due to the body input stream being closed before it can send + // the RST_STREAM frame. + exchange.sendResponseHeaders(200, 0); + exchange.getResponseBody().close(); + } + } + + private static class TestHandleNegativeContentLength implements Http2Handler { + + @Override + public void handle(Http2TestExchange exchange) throws IOException { + exchange.sendResponseHeaders(200, -1); + } + } + + private static class WarmupHandler implements Http2Handler { + + @Override + public void handle(Http2TestExchange exchange) throws IOException { + exchange.sendResponseHeaders(200, 0); + } + } +} \ No newline at end of file diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyInputStream.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyInputStream.java index 71372d5e95f..f9ba87557e7 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyInputStream.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyInputStream.java @@ -41,8 +41,8 @@ class BodyInputStream extends InputStream { final Queue q; final int streamid; - boolean closed; - boolean eof; + volatile boolean closed; + volatile boolean eof; final Http2TestServerConnection conn; @SuppressWarnings({"rawtypes","unchecked"}) @@ -100,6 +100,11 @@ class BodyInputStream extends InputStream { return buffer; } + + public boolean isEof() { + return eof; + } + @Override public int read(byte[] buf, int offset, int length) throws IOException { if (closed) { @@ -128,9 +133,12 @@ class BodyInputStream extends InputStream { return one[0] & 0xFF; } + public boolean unconsumed() { + return (!isEof() || q.size() > 0); + } + @Override public void close() { - // TODO reset this stream closed = true; } } diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyOutputStream.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyOutputStream.java index 81b1d973239..c091b7ecf9b 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyOutputStream.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyOutputStream.java @@ -27,6 +27,7 @@ import java.io.*; import java.nio.ByteBuffer; import jdk.internal.net.http.frame.DataFrame; +import jdk.internal.net.http.frame.ResetFrame; /** * OutputStream. Incoming window updates handled by the main connection @@ -39,6 +40,8 @@ public class BodyOutputStream extends OutputStream { final int streamid; int window; volatile boolean closed; + volatile BodyInputStream bis; + volatile int resetErrorCode; boolean goodToGo = false; // not allowed to send until headers sent final Http2TestServerConnection conn; final Queue outputQ; @@ -131,13 +134,25 @@ public class BodyOutputStream extends OutputStream { } try { sendEndStream(); + if (bis!= null && bis.unconsumed()) { + // Send a reset if there is still unconsumed data in the input stream + sendReset(EMPTY_BARRAY, 0, 0, ResetFrame.NO_ERROR); + } } catch (IOException ex) { - System.err.println("TestServer: OutputStream.close exception: " + ex); ex.printStackTrace(); } } - protected void sendEndStream() throws IOException { + public void sendEndStream() throws IOException { send(EMPTY_BARRAY, 0, 0, DataFrame.END_STREAM); } + + public void sendReset(byte[] buf, int offset, int len, int flags) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(len); + buffer.put(buf, offset, len); + buffer.flip(); + assert streamid != 0; + ResetFrame rf = new ResetFrame(streamid, flags); + outputQ.put(rf); + } } diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestExchangeImpl.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestExchangeImpl.java index 8f7c154e656..be52e4856ed 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestExchangeImpl.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestExchangeImpl.java @@ -26,6 +26,7 @@ package jdk.httpclient.test.lib.http2; import jdk.internal.net.http.common.HttpHeadersBuilder; import jdk.internal.net.http.frame.HeaderFrame; import jdk.internal.net.http.frame.HeadersFrame; +import jdk.internal.net.http.frame.ResetFrame; import javax.net.ssl.SSLSession; import java.io.IOException; @@ -45,7 +46,7 @@ public class Http2TestExchangeImpl implements Http2TestExchange { protected final HttpHeadersBuilder rspheadersBuilder; final URI uri; final String method; - final InputStream is; + protected final InputStream is; protected final BodyOutputStream os; final SSLSession sslSession; protected final int streamid; @@ -149,9 +150,16 @@ public class Http2TestExchangeImpl implements Http2TestExchange { if (responseLength < 0 || rCode == 204) { response.setFlag(HeadersFrame.END_STREAM); + conn.outputQ.put(response); + // Put a reset frame on the outputQ if there is still unconsumed data in the input stream and output stream + // is going to be marked closed. + if (is instanceof BodyInputStream bis && bis.unconsumed()) { + conn.outputQ.put(new ResetFrame(streamid, ResetFrame.NO_ERROR)); + } os.markClosed(); + } else { + conn.outputQ.put(response); } - conn.outputQ.put(response); os.goodToGo(); System.err.println("Sent response headers " + rCode); } diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java index 17bdeee7e67..5bedc8dff9a 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java @@ -718,6 +718,11 @@ public class Http2TestServerConnection { // give to user Http2Handler handler = server.getHandlerFor(uri.getPath()); + + // Need to pass the BodyInputStream reference to the BodyOutputStream, so it can determine if the stream + // must be reset due to the BodyInputStream not being consumed by the handler when invoked. + if (bis instanceof BodyInputStream bodyInputStream) bos.bis = bodyInputStream; + try { handler.handle(exchange); } catch (IOException closed) { @@ -966,7 +971,7 @@ public class Http2TestServerConnection { SettingsFrame.INITIAL_WINDOW_SIZE), this) { @Override - protected void sendEndStream() throws IOException { + public void sendEndStream() throws IOException { if (properties.getProperty("sendTrailingHeadersAfterPushPromise", "0").equals("1")) { conn.outputQ.put(getTrailingHeadersFrame(promisedStreamid, List.of())); } else {