diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java index 717b41679d3..f4cfabacd8f 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java @@ -1008,6 +1008,7 @@ class Http2Connection { // This method is called when the HTTP/2 client is being // stopped. Do not call it from anywhere else. void closeAllStreams() { + if (debug.on()) debug.log("Close all streams"); for (var streamId : streams.keySet()) { // safe to call without locking - see Stream::deRegister decrementStreamsCount(streamId); diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java index 09df38c7178..c787959b069 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java @@ -273,6 +273,7 @@ class Stream extends ExchangeImpl { if (debug.on()) debug.log("nullBody: streamid=%d", streamid); // We should have an END_STREAM data frame waiting in the inputQ. // We need a subscriber to force the scheduler to process it. + assert pendingResponseSubscriber == null; pendingResponseSubscriber = HttpResponse.BodySubscribers.replacing(null); sched.runOrSchedule(); } @@ -472,8 +473,14 @@ class Stream extends ExchangeImpl { receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of())); } } else if (frame instanceof DataFrame) { - if (cancelled) connection.dropDataFrame((DataFrame) frame); - else receiveDataFrame((DataFrame) frame); + if (cancelled) { + if (debug.on()) { + debug.log("request cancelled or stream closed: dropping data frame"); + } + connection.dropDataFrame((DataFrame) frame); + } else { + receiveDataFrame((DataFrame) frame); + } } else { if (!cancelled) otherFrame(frame); } @@ -1283,10 +1290,24 @@ class Stream extends ExchangeImpl { } } } + if (closing) { // true if the stream has not been closed yet - if (responseSubscriber != null || pendingResponseSubscriber != null) + if (responseSubscriber != null || pendingResponseSubscriber != null) { + if (debug.on()) + debug.log("stream %s closing due to %s", streamid, (Object)errorRef.get()); sched.runOrSchedule(); + } else { + if (debug.on()) + debug.log("stream %s closing due to %s before subscriber registered", + streamid, (Object)errorRef.get()); + } + } else { + if (debug.on()) { + debug.log("stream %s already closed due to %s", + streamid, (Object)errorRef.get()); + } } + completeResponseExceptionally(e); if (!requestBodyCF.isDone()) { requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body.. @@ -1330,6 +1351,20 @@ class Stream extends ExchangeImpl { if (debug.on()) debug.log("close stream %d", streamid); Log.logTrace("Closing stream {0}", streamid); connection.closeStream(streamid); + var s = responseSubscriber == null + ? pendingResponseSubscriber + : responseSubscriber; + if (debug.on()) debug.log("subscriber is %s", s); + if (s instanceof Http2StreamResponseSubscriber sw) { + if (debug.on()) debug.log("closing response subscriber stream %s", streamid); + // if the subscriber has already completed, + // there is nothing to do... + if (!sw.completed()) { + // otherwise make sure it will be completed + var cause = errorRef.get(); + sw.complete(cause == null ? new IOException("stream closed") : cause); + } + } Log.logTrace("Stream {0} closed", streamid); } @@ -1554,10 +1589,12 @@ class Stream extends ExchangeImpl { super.complete(t); } } + @Override protected void onCancel() { unregisterResponseSubscriber(this); } + } private static final VarHandle STREAM_STATE; diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java b/src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java index 78185bcbcfc..a896a8eb084 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java @@ -30,7 +30,6 @@ import java.nio.ByteBuffer; import java.util.Comparator; import java.util.List; import java.util.Objects; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscription; @@ -176,6 +175,14 @@ public class HttpBodySubscriberWrapper implements TrustedSubscriber { } } + /** + * {@return true if this subscriber has already completed, either normally + * or abnormally} + */ + public boolean completed() { + return completed.get(); + } + @Override public CompletionStage getBody() { return userSubscriber.getBody(); diff --git a/test/jdk/java/net/httpclient/CancelRequestTest.java b/test/jdk/java/net/httpclient/CancelRequestTest.java index 0768ddb7aab..c134f3cd99d 100644 --- a/test/jdk/java/net/httpclient/CancelRequestTest.java +++ b/test/jdk/java/net/httpclient/CancelRequestTest.java @@ -23,7 +23,7 @@ /* * @test - * @bug 8245462 8229822 8254786 8297075 8297149 + * @bug 8245462 8229822 8254786 8297075 8297149 8298340 * @summary Tests cancelling the request. * @library /test/lib http2/server * @key randomness