8347373: HTTP/2 flow control checks may count unprocessed data twice

Reviewed-by: jpai
This commit is contained in:
Daniel Fuchs 2025-01-13 12:05:44 +00:00
parent 450636ae28
commit 06126361db
5 changed files with 112 additions and 67 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -117,18 +117,24 @@ abstract class WindowUpdateSender {
* the caller wants to buffer.
*/
boolean canBufferUnprocessedBytes(int len) {
return !checkWindowSizeExceeded(unprocessed.addAndGet(len));
long buffered, processed;
// get received before unprocessed in order to avoid counting
// unprocessed bytes that might get unbuffered asynchronously
// twice.
processed = received.get();
buffered = unprocessed.addAndGet(len);
return !checkWindowSizeExceeded(processed, buffered);
}
// adds the provided amount to the amount of already
// received and processed bytes and checks whether the
// processed and processed bytes and checks whether the
// flow control window is exceeded. If so, take
// corrective actions and return true.
private boolean checkWindowSizeExceeded(long len) {
private boolean checkWindowSizeExceeded(long processed, long len) {
// because windowSize is bound by Integer.MAX_VALUE
// we will never reach the point where received.get() + len
// could overflow
long rcv = received.get() + len;
long rcv = processed + len;
return rcv > windowSize && windowSizeExceeded(rcv);
}
@ -143,6 +149,7 @@ abstract class WindowUpdateSender {
* @param delta the amount of processed bytes to release
*/
void processed(int delta) {
assert delta >= 0 : delta;
long rest = unprocessed.addAndGet(-delta);
assert rest >= 0;
update(delta);
@ -166,6 +173,7 @@ abstract class WindowUpdateSender {
* @return the amount of remaining unprocessed bytes
*/
long released(int delta) {
assert delta >= 0 : delta;
long rest = unprocessed.addAndGet(-delta);
assert rest >= 0;
return rest;
@ -195,7 +203,7 @@ abstract class WindowUpdateSender {
try {
int tosend = (int)Math.min(received.get(), Integer.MAX_VALUE);
if (tosend > limit) {
received.getAndAdd(-tosend);
received.addAndGet(-tosend);
sendWindowUpdate(tosend);
}
} finally {

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -34,6 +34,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ProtocolException;
import java.net.URI;
@ -335,7 +336,9 @@ public class StreamFlowControlTest {
// to wait for the connection window
fct.conn.obtainConnectionWindow(resp.length);
} catch (InterruptedException ie) {
// ignore and continue...
var ioe = new InterruptedIOException(ie.toString());
ioe.initCause(ie);
throw ioe;
}
}
try {
@ -344,6 +347,7 @@ public class StreamFlowControlTest {
if (t instanceof FCHttp2TestExchange fct) {
fct.conn.updateConnectionWindow(resp.length);
}
throw x;
}
}
} finally {

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -239,7 +239,7 @@ public interface HttpServerAdapters {
public abstract OutputStream getResponseBody();
public abstract HttpTestRequestHeaders getRequestHeaders();
public abstract HttpTestResponseHeaders getResponseHeaders();
public abstract void sendResponseHeaders(int code, int contentLength) throws IOException;
public abstract void sendResponseHeaders(int code, long contentLength) throws IOException;
public abstract URI getRequestURI();
public abstract String getRequestMethod();
public abstract void close();
@ -292,7 +292,7 @@ public interface HttpServerAdapters {
return HttpTestResponseHeaders.of(exchange.getResponseHeaders());
}
@Override
public void sendResponseHeaders(int code, int contentLength) throws IOException {
public void sendResponseHeaders(int code, long contentLength) throws IOException {
if (contentLength == 0) contentLength = -1;
else if (contentLength < 0) contentLength = 0;
exchange.sendResponseHeaders(code, contentLength);
@ -355,7 +355,7 @@ public interface HttpServerAdapters {
return HttpTestResponseHeaders.of(exchange.getResponseHeaders());
}
@Override
public void sendResponseHeaders(int code, int contentLength) throws IOException {
public void sendResponseHeaders(int code, long contentLength) throws IOException {
if (contentLength == 0) contentLength = -1;
else if (contentLength < 0) contentLength = 0;
exchange.sendResponseHeaders(code, contentLength);

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -63,22 +63,28 @@ public class BodyOutputStream extends OutputStream {
}
void waitForWindow(int demand) throws InterruptedException {
// first wait for the connection window
conn.obtainConnectionWindow(demand);
// now wait for the stream window
// first wait for the stream window
waitForStreamWindow(demand);
// now wait for the connection window
conn.obtainConnectionWindow(demand);
}
public void waitForStreamWindow(int demand) throws InterruptedException {
synchronized (this) {
while (demand > 0) {
int n = Math.min(demand, window);
demand -= n;
window -= n;
if (demand > 0) {
wait();
public void waitForStreamWindow(int amount) throws InterruptedException {
int demand = amount;
try {
synchronized (this) {
while (amount > 0) {
int n = Math.min(amount, window);
amount -= n;
window -= n;
if (amount > 0) {
wait();
}
}
}
} catch (Throwable t) {
window += (demand - amount);
throw t;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -171,7 +171,7 @@ public class Http2TestServerConnection {
Properties properties)
throws IOException
{
System.err.println("TestServer: New connection from " + socket);
System.err.println(server.name + ": New connection from " + socket);
if (socket instanceof SSLSocket) {
SSLSocket sslSocket = (SSLSocket)socket;
@ -218,10 +218,10 @@ public class Http2TestServerConnection {
String prop = properties.getProperty(propPrefix + key);
if (prop != null) {
try {
System.err.println("TestServer: setting " + key + " property to: " +
System.err.println(server.name + ": setting " + key + " property to: " +
prop);
int num = Integer.parseInt(numS);
System.err.println("TestServer: num = " + num);
System.err.println(server.name + ": num = " + num);
s.setParameter(num, Integer.parseInt(prop));
} catch (NumberFormatException e) {/* ignore errors */}
}
@ -269,7 +269,7 @@ public class Http2TestServerConnection {
}
final GoAwayFrame frame = new GoAwayFrame(maxProcessedStreamId, error);
outputQ.put(frame);
System.err.println("Sending GOAWAY frame " + frame + " from server connection " + this);
System.err.println(server.name + ": Sending GOAWAY frame " + frame + " from server connection " + this);
}
/**
@ -285,7 +285,7 @@ public class Http2TestServerConnection {
*/
void handlePing(PingFrame ping) throws IOException {
if (ping.streamid() != 0) {
System.err.println("Invalid ping received");
System.err.println(server.name + ": Invalid ping received");
close(ErrorFrame.PROTOCOL_ERROR);
return;
}
@ -293,7 +293,7 @@ public class Http2TestServerConnection {
// did we send a Ping?
PingRequest request = getNextRequest();
if (request == null) {
System.err.println("Invalid ping ACK received");
System.err.println(server.name + ": Invalid ping ACK received");
close(ErrorFrame.PROTOCOL_ERROR);
return;
} else if (!Arrays.equals(request.pingData, ping.getData())) {
@ -356,7 +356,7 @@ public class Http2TestServerConnection {
if (stopping)
return;
stopping = true;
System.err.printf("Server connection to %s stopping. %d streams\n",
System.err.printf(server.name + ": Server connection to %s stopping. %d streams\n",
socket.getRemoteSocketAddress().toString(), streams.size());
streams.forEach((i, q) -> {
q.orderlyClose();
@ -376,16 +376,20 @@ public class Http2TestServerConnection {
byte[] bytes = new byte[len];
int n = is.readNBytes(bytes, 0, len);
if (Arrays.compare(clientPreface, bytes) != 0) {
System.err.printf("Invalid preface: read %d/%d bytes%n", n, len);
throw new IOException("Invalid preface: " +
new String(bytes, 0, len, ISO_8859_1));
String msg = String.format("Invalid preface: read %s/%s bytes", n, len);
System.err.println(server.name + ": " + msg);
throw new IOException(msg +": \"" +
new String(bytes, 0, n, ISO_8859_1)
.replace("\r", "\\r")
.replace("\n", "\\n")
+ "\"");
}
}
Http1InitialRequest doUpgrade(Http1InitialRequest upgrade) throws IOException {
String h2c = getHeader(upgrade.headers, "Upgrade");
if (h2c == null || !h2c.equals("h2c")) {
System.err.println("Server:HEADERS: " + upgrade);
System.err.println(server.name + ":HEADERS: " + upgrade);
throw new IOException("Bad upgrade 1 " + h2c);
}
@ -457,7 +461,7 @@ public class Http2TestServerConnection {
socket.close();
return;
} else {
System.err.println("Server:HEADERS: " + upgrade);
System.err.println(server.name + ":HEADERS: " + upgrade);
throw new IOException("Bad upgrade 1 " + h2c);
}
}
@ -556,7 +560,7 @@ public class Http2TestServerConnection {
outputQ.put(frame);
return;
} else if (f instanceof GoAwayFrame) {
System.err.println("Closing: "+ f.toString());
System.err.println(server.name + ": Closing connection: "+ f.toString());
close(ErrorFrame.NO_ERROR);
} else if (f instanceof PingFrame) {
handlePing((PingFrame)f);
@ -649,7 +653,7 @@ public class Http2TestServerConnection {
// skip processing the request if configured to do so
final String connKey = connectionKey();
if (!shouldProcessNewHTTPRequest(connKey)) {
System.err.println("Rejecting primordial stream 1 and sending GOAWAY" +
System.err.println(server.name + ": Rejecting primordial stream 1 and sending GOAWAY" +
" on server connection " + connKey + ", for request: " + path);
sendGoAway(ErrorFrame.NO_ERROR);
return;
@ -726,7 +730,7 @@ public class Http2TestServerConnection {
final String connKey = connectionKey();
final String path = headers.firstValue(":path").orElse("");
if (!shouldProcessNewHTTPRequest(connKey)) {
System.err.println("Rejecting stream " + streamid
System.err.println(server.name + ": Rejecting stream " + streamid
+ " and sending GOAWAY on server connection "
+ connKey + ", for request: " + path);
sendGoAway(ErrorFrame.NO_ERROR);
@ -764,17 +768,17 @@ public class Http2TestServerConnection {
//System.out.println("scheme = " + scheme);
String authority = headers.firstValue(":authority").orElse("");
//System.out.println("authority = " + authority);
System.err.printf("TestServer: %s %s\n", method, path);
System.err.printf(server.name + ": %s %s\n", method, path);
int winsize = clientSettings.getParameter(
SettingsFrame.INITIAL_WINDOW_SIZE);
//System.err.println ("Stream window size = " + winsize);
final InputStream bis;
if (endStreamReceived && queue.size() == 0) {
System.err.println("Server: got END_STREAM for stream " + streamid);
System.err.println(server.name + ": got END_STREAM for stream " + streamid);
bis = NullInputStream.INSTANCE;
} else {
System.err.println("Server: creating input stream for stream " + streamid);
System.err.println(server.name + ": creating input stream for stream " + streamid);
bis = new BodyInputStream(queue, streamid, this);
}
try (bis;
@ -802,7 +806,7 @@ public class Http2TestServerConnection {
if (bos.closed) {
Queue q = streams.get(streamid);
if (q != null && (q.isClosed() || q.isClosing())) {
System.err.println("TestServer: Stream " + streamid + " closed: " + closed);
System.err.println(server.name + ": Stream " + streamid + " closed: " + closed);
return;
}
}
@ -812,7 +816,7 @@ public class Http2TestServerConnection {
// everything happens in the exchange from here. Hopefully will
// return though.
} catch (Throwable e) {
System.err.println("TestServer: handleRequest exception: " + e);
System.err.println(server.name + ": handleRequest exception: " + e);
e.printStackTrace();
close(-1);
}
@ -844,7 +848,7 @@ public class Http2TestServerConnection {
while (!stopping) {
Http2Frame frame = readFrameImpl();
if (frame == null) {
System.err.println("EOF reached on connection " + connectionKey()
System.err.println(server.name + ": EOF reached on connection " + connectionKey()
+ ", will no longer accept incoming frames");
closeIncoming();
return;
@ -865,7 +869,7 @@ public class Http2TestServerConnection {
Queue q = streams.get(stream);
if (frame.type() == HeadersFrame.TYPE) {
if (q != null) {
System.err.println("HEADERS frame for existing stream! Error.");
System.err.println(server.name + ": HEADERS frame for existing stream! Error.");
// TODO: close connection
continue;
} else {
@ -874,7 +878,8 @@ public class Http2TestServerConnection {
// if we already sent a goaway, then don't create new streams with
// higher stream ids.
if (finalProcessedStreamId != -1 && streamId > finalProcessedStreamId) {
System.err.println(connectionKey() + " resetting stream " + streamId
System.err.println(server.name + ": " + connectionKey()
+ " resetting stream " + streamId
+ " as REFUSED_STREAM");
final ResetFrame rst = new ResetFrame(streamId, REFUSED_STREAM);
outputQ.put(rst);
@ -884,7 +889,7 @@ public class Http2TestServerConnection {
}
} else {
if (q == null && !pushStreams.contains(stream)) {
System.err.printf("Non Headers frame received with"+
System.err.printf(server.name + ": Non Headers frame received with"+
" non existing stream (%d) ", frame.streamid());
System.err.println(frame);
continue;
@ -914,21 +919,21 @@ public class Http2TestServerConnection {
} else if (isClientStreamId(stream) && stream < next) {
// We may receive a reset on a client stream that has already
// been closed. Just ignore it.
System.err.println("TestServer: received ResetFrame on closed stream: " + stream);
System.err.println(server.name + ": received ResetFrame on closed stream: " + stream);
System.err.println(frame);
} else if (isServerStreamId(stream) && stream < nextPush) {
// We may receive a reset on a push stream that has already
// been closed. Just ignore it.
System.err.println("TestServer: received ResetFrame on closed push stream: " + stream);
System.err.println(server.name + ": received ResetFrame on closed push stream: " + stream);
System.err.println(frame);
} else {
System.err.println("TestServer: Unexpected frame on: " + stream);
System.err.println(server.name + ": Unexpected frame on: " + stream);
System.err.println(frame);
throw new IOException("Unexpected frame");
}
} else {
if (!q.putIfOpen(frame)) {
System.err.printf("Stream %s is closed: dropping %s%n",
System.err.printf(server.name + ": Stream %s is closed: dropping %s%n",
stream, frame);
}
}
@ -937,7 +942,7 @@ public class Http2TestServerConnection {
}
} catch (Throwable e) {
if (!stopping) {
System.err.println("Http server reader thread shutdown");
System.err.println(server.name + ": Http server reader thread shutdown");
e.printStackTrace();
}
close(ErrorFrame.PROTOCOL_ERROR);
@ -1078,7 +1083,7 @@ public class Http2TestServerConnection {
: new ContinuationFrame(rh.streamid(), flags, list);
if (Log.headers()) {
// avoid too much chatter: log only if Log.headers() is enabled
System.err.println("TestServer writing " + hf);
System.err.println(server.name + ": writing " + hf);
}
writeFrame(hf);
cont++;
@ -1088,7 +1093,7 @@ public class Http2TestServerConnection {
} else
writeFrame(frame);
}
System.err.println("TestServer: Connection writer stopping");
System.err.println(server.name + ": Connection writer stopping " + connectionKey());
} catch (Throwable e) {
e.printStackTrace();
/*close();
@ -1140,7 +1145,7 @@ public class Http2TestServerConnection {
ii.transferTo(oo);
} catch (Throwable ex) {
System.err.printf("TestServer: pushing response error: %s\n",
System.err.printf(server.name + ": pushing response error: %s\n",
ex.toString());
} finally {
closeIgnore(ii);
@ -1303,7 +1308,7 @@ public class Http2TestServerConnection {
}
return new Http1InitialRequest(headers, buf);
} catch (IOException e) {
System.err.println("TestServer: headers read: [ " + headers + " ]");
System.err.println(server.name + ": headers read: [ " + headers + " ]");
throw e;
}
}
@ -1335,7 +1340,7 @@ public class Http2TestServerConnection {
}
private void unexpectedFrame(Http2Frame frame) {
System.err.println("OOPS. Unexpected");
System.err.println(server.name + ": OOPS. Unexpected");
assert false;
}
@ -1375,19 +1380,41 @@ public class Http2TestServerConnection {
* @param amount
*/
public synchronized void obtainConnectionWindow(int amount) throws InterruptedException {
while (amount > 0) {
int n = Math.min(amount, sendWindow);
amount -= n;
sendWindow -= n;
if (amount > 0)
wait();
int demand = amount;
try {
int waited = 0;
while (amount > 0) {
int n = Math.min(amount, sendWindow);
amount -= n;
sendWindow -= n;
if (amount > 0) {
// Do not include this print line on a version that does not have
// JDK-8337395
System.err.printf("%s: blocked waiting for %s connection window, obtained %s%n",
server.name, amount, demand - amount);
waited++;
wait();
}
}
if (waited > 0) {
// Do not backport this print line on a version that does not have
// JDK-8337395
System.err.printf("%s: obtained %s connection window, remaining %s%n",
server.name, demand, sendWindow);
}
assert amount == 0;
} catch (Throwable t) {
sendWindow += (demand - amount);
throw t;
}
}
public void updateConnectionWindow(int amount) {
System.out.printf("sendWindow (window=%s, amount=%s) is now: %s%n",
sendWindow, amount, sendWindow + amount);
synchronized (this) {
// Do not backport this print line on a version that does not have
// JDK-8337395
System.err.printf(server.name + ": update sendWindow (window=%s, amount=%s) is now: %s%n",
sendWindow, amount, sendWindow + amount);
sendWindow += amount;
notifyAll();
}