8293786: HttpClient will not send more than 64 kb of data from the 2nd request in http2

Reviewed-by: dfuchs, djelinski
This commit is contained in:
Conor Cleary 2023-05-08 14:49:43 +00:00
parent 5a259d875e
commit ad90fb6da3
6 changed files with 233 additions and 17 deletions

View File

@ -182,9 +182,9 @@ class Stream<T> extends ExchangeImpl<T> {
}
while (!inputQ.isEmpty()) {
Http2Frame frame = inputQ.peek();
if (frame instanceof ResetFrame) {
if (frame instanceof ResetFrame rf) {
inputQ.remove();
handleReset((ResetFrame)frame, subscriber);
handleReset(rf, subscriber);
return;
}
DataFrame df = (DataFrame)frame;
@ -463,24 +463,23 @@ class Stream<T> extends ExchangeImpl<T> {
void incoming(Http2Frame frame) throws IOException {
if (debug.on()) debug.log("incoming: %s", frame);
var cancelled = checkRequestCancelled() || closed;
if ((frame instanceof HeaderFrame)) {
HeaderFrame hframe = (HeaderFrame) frame;
if (hframe.endHeaders()) {
if ((frame instanceof HeaderFrame hf)) {
if (hf.endHeaders()) {
Log.logTrace("handling response (streamid={0})", streamid);
handleResponse();
}
if (hframe.getFlag(HeaderFrame.END_STREAM)) {
if (hf.getFlag(HeaderFrame.END_STREAM)) {
if (debug.on()) debug.log("handling END_STREAM: %d", streamid);
receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
}
} else if (frame instanceof DataFrame) {
} else if (frame instanceof DataFrame df) {
if (cancelled) {
if (debug.on()) {
debug.log("request cancelled or stream closed: dropping data frame");
}
connection.dropDataFrame((DataFrame) frame);
connection.dropDataFrame(df);
} else {
receiveDataFrame((DataFrame) frame);
receiveDataFrame(df);
}
} else {
if (!cancelled) otherFrame(frame);
@ -554,6 +553,16 @@ class Stream<T> extends ExchangeImpl<T> {
} else {
Flow.Subscriber<?> subscriber =
responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
if (!requestBodyCF.isDone()) {
// If a RST_STREAM is received, complete the requestBody. This will allow the
// response to be read before the Reset is handled in the case where the client's
// input stream is partially consumed or not consumed at all by the server.
if (frame.getErrorCode() != ResetFrame.NO_ERROR) {
requestBodyCF.completeExceptionally(new IOException("RST_STREAM received"));
} else {
requestBodyCF.complete(null);
}
}
if (response == null && subscriber == null) {
// we haven't received the headers yet, and won't receive any!
// handle reset now.

View File

@ -0,0 +1,171 @@
/*
* Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @bug 8293786
* @summary Checks to see if the HttpClient can process a request to cancel a transmission from a remote if the server
* does not process any data. The client should read all data from the server and close the connection.
* @library /test/jdk/java/net/httpclient/lib
* @build jdk.httpclient.test.lib.http2.Http2TestServer
* @run testng/othervm/timeout=50 -Djdk.httpclient.HttpClient.log=all
* PostPutTest
*/
import jdk.httpclient.test.lib.http2.Http2Handler;
import jdk.httpclient.test.lib.http2.Http2TestExchange;
import jdk.httpclient.test.lib.http2.Http2TestServer;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import static java.net.http.HttpClient.Version.HTTP_2;
import static java.net.http.HttpRequest.BodyPublishers.ofByteArray;
public class PostPutTest {
Http2TestServer http2TestServer;
URI warmupURI, testHandlerBasicURI, testHandlerCloseBosURI, testHandleNegativeContentLengthURI;
static PrintStream testLog = System.err;
// As per jdk.internal.net.http.WindowController.DEFAULT_INITIAL_WINDOW_SIZE
final int DEFAULT_INITIAL_WINDOW_SIZE = (64 * 1024) - 1;
// Add on a small amount of arbitrary bytes to see if client hangs when receiving RST_STREAM
byte[] data = new byte[DEFAULT_INITIAL_WINDOW_SIZE + 10];
@BeforeTest
public void setup() throws Exception {
http2TestServer = new Http2TestServer(false, 0);
http2TestServer.addHandler(new WarmupHandler(), "/Warmup");
http2TestServer.addHandler(new TestHandlerBasic(), "/TestHandlerBasic");
http2TestServer.addHandler(new TestHandlerCloseBos(), "/TestHandlerCloseBos");
http2TestServer.addHandler(new TestHandleNegativeContentLength(), "/TestHandleNegativeContentLength");
http2TestServer.start();
testLog.println("PostPutTest.setup(): Starting server");
warmupURI = new URI("http://" + http2TestServer.serverAuthority() + "/Warmup");
testHandlerBasicURI = new URI("http://" + http2TestServer.serverAuthority() + "/TestHandlerBasic");
testHandlerCloseBosURI = new URI("http://" + http2TestServer.serverAuthority() + "/TestHandlerCloseBos");
testHandleNegativeContentLengthURI = new URI("http://" + http2TestServer.serverAuthority() + "/TestHandleNegativeContentLength");
testLog.println("PostPutTest.setup(): warmupURI: " + warmupURI);
testLog.println("PostPutTest.setup(): testHandlerBasicURI: " + testHandlerBasicURI);
testLog.println("PostPutTest.setup(): testHandlerCloseBosURI: " + testHandlerCloseBosURI);
testLog.println("PostPutTest.setup(): testHandleNegativeContentLengthURI: " + testHandleNegativeContentLengthURI);
}
@AfterTest
public void teardown() {
testLog.println("PostPutTest.teardown(): Stopping server");
http2TestServer.stop();
data = null;
}
@DataProvider(name = "variants")
public Object[][] variants() {
HttpRequest over64kPost, over64kPut, over64kPostCloseBos, over64kPutCloseBos, over64kPostNegativeContentLength, over64kPutNegativeContentLength;
over64kPost = HttpRequest.newBuilder().version(HTTP_2).POST(ofByteArray(data)).uri(testHandlerBasicURI).build();
over64kPut = HttpRequest.newBuilder().version(HTTP_2).PUT(ofByteArray(data)).uri(testHandlerBasicURI).build();
over64kPostCloseBos = HttpRequest.newBuilder().version(HTTP_2).POST(ofByteArray(data)).uri(testHandlerCloseBosURI).build();
over64kPutCloseBos = HttpRequest.newBuilder().version(HTTP_2).PUT(ofByteArray(data)).uri(testHandlerCloseBosURI).build();
over64kPostNegativeContentLength = HttpRequest.newBuilder().version(HTTP_2).POST(ofByteArray(data)).uri(testHandleNegativeContentLengthURI).build();
over64kPutNegativeContentLength = HttpRequest.newBuilder().version(HTTP_2).PUT(ofByteArray(data)).uri(testHandleNegativeContentLengthURI).build();
return new Object[][] {
{ over64kPost, "POST data over 64k bytes" },
{ over64kPut, "PUT data over 64k bytes" },
{ over64kPostCloseBos, "POST data over 64k bytes with close bos" },
{ over64kPutCloseBos, "PUT data over 64k bytes with close bos" },
{ over64kPostNegativeContentLength, "POST data over 64k bytes with negative content length" },
{ over64kPutNegativeContentLength, "PUT data over 64k bytes with negative content length" }
};
}
public HttpRequest getWarmupReq() {
return HttpRequest.newBuilder()
.GET()
.uri(warmupURI)
.build();
}
@Test(dataProvider = "variants")
public void testOver64kPUT(HttpRequest req, String testMessage) {
testLog.println("PostPutTest: Performing test: " + testMessage);
HttpClient hc = HttpClient.newBuilder().version(HTTP_2).build();
hc.sendAsync(getWarmupReq(), HttpResponse.BodyHandlers.ofString()).join();
hc.sendAsync(req, HttpResponse.BodyHandlers.ofString()).join();
/*
If this test fails in timeout, it is likely due to one of two reasons:
- The responseSubscriber is null, so no incoming frames are being processed by the client
(See Stream::schedule)
- The test server is for some reason not sending a RST_STREAM with the NO_ERROR flag set after
sending an empty DATA frame with the END_STREAM flag set.
*/
}
private static class TestHandlerBasic implements Http2Handler {
@Override
public void handle(Http2TestExchange exchange) throws IOException {
// The input stream is not read in this bug as this will trigger window updates for the server. This bug
// concerns the case where no updates are sent and the server instead tells the client to abort the transmission.
exchange.sendResponseHeaders(200, 0);
}
}
private static class TestHandlerCloseBos implements Http2Handler {
@Override
public void handle(Http2TestExchange exchange) throws IOException {
// This case does actually cause the test to hang due to the body input stream being closed before it can send
// the RST_STREAM frame.
exchange.sendResponseHeaders(200, 0);
exchange.getResponseBody().close();
}
}
private static class TestHandleNegativeContentLength implements Http2Handler {
@Override
public void handle(Http2TestExchange exchange) throws IOException {
exchange.sendResponseHeaders(200, -1);
}
}
private static class WarmupHandler implements Http2Handler {
@Override
public void handle(Http2TestExchange exchange) throws IOException {
exchange.sendResponseHeaders(200, 0);
}
}
}

View File

@ -41,8 +41,8 @@ class BodyInputStream extends InputStream {
final Queue<Http2Frame> q;
final int streamid;
boolean closed;
boolean eof;
volatile boolean closed;
volatile boolean eof;
final Http2TestServerConnection conn;
@SuppressWarnings({"rawtypes","unchecked"})
@ -100,6 +100,11 @@ class BodyInputStream extends InputStream {
return buffer;
}
public boolean isEof() {
return eof;
}
@Override
public int read(byte[] buf, int offset, int length) throws IOException {
if (closed) {
@ -128,9 +133,12 @@ class BodyInputStream extends InputStream {
return one[0] & 0xFF;
}
public boolean unconsumed() {
return (!isEof() || q.size() > 0);
}
@Override
public void close() {
// TODO reset this stream
closed = true;
}
}

View File

@ -27,6 +27,7 @@ import java.io.*;
import java.nio.ByteBuffer;
import jdk.internal.net.http.frame.DataFrame;
import jdk.internal.net.http.frame.ResetFrame;
/**
* OutputStream. Incoming window updates handled by the main connection
@ -39,6 +40,8 @@ public class BodyOutputStream extends OutputStream {
final int streamid;
int window;
volatile boolean closed;
volatile BodyInputStream bis;
volatile int resetErrorCode;
boolean goodToGo = false; // not allowed to send until headers sent
final Http2TestServerConnection conn;
final Queue outputQ;
@ -131,13 +134,25 @@ public class BodyOutputStream extends OutputStream {
}
try {
sendEndStream();
if (bis!= null && bis.unconsumed()) {
// Send a reset if there is still unconsumed data in the input stream
sendReset(EMPTY_BARRAY, 0, 0, ResetFrame.NO_ERROR);
}
} catch (IOException ex) {
System.err.println("TestServer: OutputStream.close exception: " + ex);
ex.printStackTrace();
}
}
protected void sendEndStream() throws IOException {
public void sendEndStream() throws IOException {
send(EMPTY_BARRAY, 0, 0, DataFrame.END_STREAM);
}
public void sendReset(byte[] buf, int offset, int len, int flags) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(len);
buffer.put(buf, offset, len);
buffer.flip();
assert streamid != 0;
ResetFrame rf = new ResetFrame(streamid, flags);
outputQ.put(rf);
}
}

View File

@ -26,6 +26,7 @@ package jdk.httpclient.test.lib.http2;
import jdk.internal.net.http.common.HttpHeadersBuilder;
import jdk.internal.net.http.frame.HeaderFrame;
import jdk.internal.net.http.frame.HeadersFrame;
import jdk.internal.net.http.frame.ResetFrame;
import javax.net.ssl.SSLSession;
import java.io.IOException;
@ -45,7 +46,7 @@ public class Http2TestExchangeImpl implements Http2TestExchange {
protected final HttpHeadersBuilder rspheadersBuilder;
final URI uri;
final String method;
final InputStream is;
protected final InputStream is;
protected final BodyOutputStream os;
final SSLSession sslSession;
protected final int streamid;
@ -149,9 +150,16 @@ public class Http2TestExchangeImpl implements Http2TestExchange {
if (responseLength < 0 || rCode == 204) {
response.setFlag(HeadersFrame.END_STREAM);
conn.outputQ.put(response);
// Put a reset frame on the outputQ if there is still unconsumed data in the input stream and output stream
// is going to be marked closed.
if (is instanceof BodyInputStream bis && bis.unconsumed()) {
conn.outputQ.put(new ResetFrame(streamid, ResetFrame.NO_ERROR));
}
os.markClosed();
} else {
conn.outputQ.put(response);
}
conn.outputQ.put(response);
os.goodToGo();
System.err.println("Sent response headers " + rCode);
}

View File

@ -718,6 +718,11 @@ public class Http2TestServerConnection {
// give to user
Http2Handler handler = server.getHandlerFor(uri.getPath());
// Need to pass the BodyInputStream reference to the BodyOutputStream, so it can determine if the stream
// must be reset due to the BodyInputStream not being consumed by the handler when invoked.
if (bis instanceof BodyInputStream bodyInputStream) bos.bis = bodyInputStream;
try {
handler.handle(exchange);
} catch (IOException closed) {
@ -966,7 +971,7 @@ public class Http2TestServerConnection {
SettingsFrame.INITIAL_WINDOW_SIZE), this) {
@Override
protected void sendEndStream() throws IOException {
public void sendEndStream() throws IOException {
if (properties.getProperty("sendTrailingHeadersAfterPushPromise", "0").equals("1")) {
conn.outputQ.put(getTrailingHeadersFrame(promisedStreamid, List.of()));
} else {