mirror of
https://github.com/openjdk/jdk.git
synced 2026-01-28 12:09:14 +00:00
1970 lines
79 KiB
Java
1970 lines
79 KiB
Java
/*
|
|
* Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved.
|
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
|
*
|
|
* This code is free software; you can redistribute it and/or modify it
|
|
* under the terms of the GNU General Public License version 2 only, as
|
|
* published by the Free Software Foundation. Oracle designates this
|
|
* particular file as subject to the "Classpath" exception as provided
|
|
* by Oracle in the LICENSE file that accompanied this code.
|
|
*
|
|
* This code is distributed in the hope that it will be useful, but WITHOUT
|
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
* version 2 for more details (a copy is included in the LICENSE file that
|
|
* accompanied this code).
|
|
*
|
|
* You should have received a copy of the GNU General Public License version
|
|
* 2 along with this work; if not, write to the Free Software Foundation,
|
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
*
|
|
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
|
* or visit www.oracle.com if you need additional information or have any
|
|
* questions.
|
|
*/
|
|
|
|
package jdk.internal.net.http;
|
|
|
|
import java.io.EOFException;
|
|
import java.io.IOException;
|
|
import java.io.UncheckedIOException;
|
|
import java.lang.invoke.MethodHandles;
|
|
import java.lang.invoke.VarHandle;
|
|
import java.net.ProtocolException;
|
|
import java.net.URI;
|
|
import java.net.http.HttpResponse.BodyHandler;
|
|
import java.net.http.HttpResponse.ResponseInfo;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
import java.util.List;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.ConcurrentLinkedDeque;
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.concurrent.Flow;
|
|
import java.util.concurrent.Flow.Subscription;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
import java.util.concurrent.locks.Lock;
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
import java.util.function.BiPredicate;
|
|
import java.net.http.HttpClient;
|
|
import java.net.http.HttpHeaders;
|
|
import java.net.http.HttpRequest;
|
|
import java.net.http.HttpResponse;
|
|
import java.net.http.HttpResponse.BodySubscriber;
|
|
|
|
import jdk.internal.net.http.common.*;
|
|
import jdk.internal.net.http.frame.*;
|
|
import jdk.internal.net.http.hpack.DecodingCallback;
|
|
|
|
import static jdk.internal.net.http.Exchange.MAX_NON_FINAL_RESPONSES;
|
|
|
|
/**
|
|
* Http/2 Stream handling.
|
|
*
|
|
* REQUESTS
|
|
*
|
|
* sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
|
|
*
|
|
* sendRequest() -- sendHeadersOnly() + sendBody()
|
|
*
|
|
* sendBodyAsync() -- calls sendBody() in an executor thread.
|
|
*
|
|
* sendHeadersAsync() -- calls sendHeadersOnly() which does not block
|
|
*
|
|
* sendRequestAsync() -- calls sendRequest() in an executor thread
|
|
*
|
|
* RESPONSES
|
|
*
|
|
* Multiple responses can be received per request. Responses are queued up on
|
|
* a LinkedList of CF<HttpResponse> and the first one on the list is completed
|
|
* with the next response
|
|
*
|
|
* getResponseAsync() -- queries list of response CFs and returns first one
|
|
* if one exists. Otherwise, creates one and adds it to list
|
|
* and returns it. Completion is achieved through the
|
|
* incoming() upcall from connection reader thread.
|
|
*
|
|
* getResponse() -- calls getResponseAsync() and waits for CF to complete
|
|
*
|
|
* responseBodyAsync() -- calls responseBody() in an executor thread.
|
|
*
|
|
* incoming() -- entry point called from connection reader thread. Frames are
|
|
* either handled immediately without blocking or for data frames
|
|
* placed on the stream's inputQ which is consumed by the stream's
|
|
* reader thread.
|
|
*
|
|
* PushedStream sub class
|
|
* ======================
|
|
* Sending side methods are not used because the request comes from a PUSH_PROMISE
|
|
* frame sent by the server. When a PUSH_PROMISE is received the PushedStream
|
|
* is created. PushedStream does not use responseCF list as there can be only
|
|
* one response. The CF is created when the object created and when the response
|
|
* HEADERS frame is received the object is completed.
|
|
*/
|
|
class Stream<T> extends ExchangeImpl<T> {
|
|
|
|
private static final String COOKIE_HEADER = "Cookie";
|
|
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
|
|
|
|
final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
|
|
final SequentialScheduler sched =
|
|
SequentialScheduler.lockingScheduler(this::schedule);
|
|
final SubscriptionBase userSubscription =
|
|
new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);
|
|
|
|
/**
|
|
* This stream's identifier. Assigned lazily by the HTTP2Connection before
|
|
* the stream's first frame is sent.
|
|
*/
|
|
protected volatile int streamid;
|
|
|
|
long requestContentLen;
|
|
|
|
final Http2Connection connection;
|
|
final HttpRequestImpl request;
|
|
final HeadersConsumer rspHeadersConsumer;
|
|
final HttpHeadersBuilder responseHeadersBuilder;
|
|
final HttpHeaders requestPseudoHeaders;
|
|
volatile HttpResponse.BodySubscriber<T> responseSubscriber;
|
|
final HttpRequest.BodyPublisher requestPublisher;
|
|
volatile RequestSubscriber requestSubscriber;
|
|
volatile int responseCode;
|
|
volatile Response response;
|
|
// The exception with which this stream was canceled.
|
|
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
|
|
final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
|
|
volatile CompletableFuture<T> responseBodyCF;
|
|
volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber;
|
|
volatile boolean stopRequested;
|
|
|
|
/** True if END_STREAM has been seen in a frame received on this stream. */
|
|
private volatile boolean remotelyClosed;
|
|
private volatile boolean closed;
|
|
private volatile boolean endStreamSent;
|
|
private volatile boolean finalResponseCodeReceived;
|
|
private volatile boolean trailerReceived;
|
|
private AtomicInteger nonFinalResponseCount = new AtomicInteger();
|
|
|
|
// Indicates the first reason that was invoked when sending a ResetFrame
|
|
// to the server. A streamState of 0 indicates that no reset was sent.
|
|
// (see markStream(int code)
|
|
private volatile int streamState; // assigned using STREAM_STATE varhandle.
|
|
private volatile boolean deRegistered; // assigned using DEREGISTERED varhandle.
|
|
|
|
// state flags
|
|
private boolean requestSent, responseReceived;
|
|
|
|
// send lock: prevent sending DataFrames after reset occurred.
|
|
private final Lock sendLock = new ReentrantLock();
|
|
private final Lock stateLock = new ReentrantLock();
|
|
// inputQ lock: methods that take from the inputQ
|
|
// must not run concurrently.
|
|
private final Lock inputQLock = new ReentrantLock();
|
|
|
|
/**
|
|
* A reference to this Stream's connection Send Window controller. The
|
|
* stream MUST acquire the appropriate amount of Send Window before
|
|
* sending any data. Will be null for PushStreams, as they cannot send data.
|
|
*/
|
|
private final WindowController windowController;
|
|
private final WindowUpdateSender streamWindowUpdater;
|
|
|
|
// Only accessed in all method calls from incoming(), no need for volatile
|
|
private boolean endStreamSeen;
|
|
|
|
@Override
|
|
HttpConnection connection() {
|
|
return connection.connection;
|
|
}
|
|
|
|
/**
|
|
* Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
|
|
* of after user subscription window has re-opened, from SubscriptionBase.request()
|
|
*/
|
|
private void schedule() {
|
|
boolean onCompleteCalled = false;
|
|
HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
|
|
// prevents drainInputQueue() from running concurrently
|
|
inputQLock.lock();
|
|
try {
|
|
if (subscriber == null) {
|
|
// pendingResponseSubscriber will be null until response headers have been received and
|
|
// readBodyAsync is called.
|
|
subscriber = responseSubscriber = pendingResponseSubscriber;
|
|
if (subscriber == null) {
|
|
// can't process anything yet
|
|
return;
|
|
}
|
|
if (debug.on()) debug.log("subscribing user subscriber");
|
|
subscriber.onSubscribe(userSubscription);
|
|
}
|
|
while (!inputQ.isEmpty() && errorRef.get() == null) {
|
|
Http2Frame frame = inputQ.peek();
|
|
if (frame instanceof ResetFrame rf) {
|
|
inputQ.remove();
|
|
if (endStreamReceived() && rf.getErrorCode() == ResetFrame.NO_ERROR) {
|
|
// If END_STREAM is already received, complete the requestBodyCF successfully
|
|
// and stop sending any request data.
|
|
requestBodyCF.complete(null);
|
|
} else {
|
|
handleReset(rf, subscriber);
|
|
}
|
|
return;
|
|
}
|
|
DataFrame df = (DataFrame) frame;
|
|
boolean finished = df.getFlag(DataFrame.END_STREAM);
|
|
|
|
List<ByteBuffer> buffers = df.getData();
|
|
List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
|
|
int size = Utils.remaining(dsts, Integer.MAX_VALUE);
|
|
if (size == 0 && finished) {
|
|
inputQ.remove();
|
|
// consumed will not be called
|
|
connection.releaseUnconsumed(df); // must update connection window
|
|
Log.logTrace("responseSubscriber.onComplete");
|
|
if (debug.on()) debug.log("incoming: onComplete");
|
|
connection.decrementStreamsCount(streamid);
|
|
subscriber.onComplete();
|
|
onCompleteCalled = true;
|
|
setEndStreamReceived();
|
|
return;
|
|
} else if (userSubscription.tryDecrement()) {
|
|
inputQ.remove();
|
|
Log.logTrace("responseSubscriber.onNext {0}", size);
|
|
if (debug.on()) debug.log("incoming: onNext(%d)", size);
|
|
try {
|
|
subscriber.onNext(dsts);
|
|
} catch (Throwable t) {
|
|
// Data frames that have been added to the inputQ
|
|
// must be released using releaseUnconsumed() to
|
|
// account for the amount of unprocessed bytes
|
|
// tracked by the connection.windowUpdater.
|
|
connection.releaseUnconsumed(df);
|
|
throw t;
|
|
}
|
|
if (consumed(df)) {
|
|
Log.logTrace("responseSubscriber.onComplete");
|
|
if (debug.on()) debug.log("incoming: onComplete");
|
|
connection.decrementStreamsCount(streamid);
|
|
subscriber.onComplete();
|
|
onCompleteCalled = true;
|
|
setEndStreamReceived();
|
|
return;
|
|
}
|
|
} else {
|
|
if (stopRequested) break;
|
|
return;
|
|
}
|
|
}
|
|
} catch (Throwable throwable) {
|
|
errorRef.compareAndSet(null, throwable);
|
|
} finally {
|
|
inputQLock.unlock();
|
|
if (sched.isStopped()) drainInputQueue();
|
|
}
|
|
|
|
Throwable t = errorRef.get();
|
|
if (t != null) {
|
|
sched.stop();
|
|
try {
|
|
if (!onCompleteCalled) {
|
|
if (debug.on())
|
|
debug.log("calling subscriber.onError: %s", (Object) t);
|
|
subscriber.onError(t);
|
|
} else {
|
|
if (debug.on())
|
|
debug.log("already completed: dropping error %s", (Object) t);
|
|
}
|
|
} catch (Throwable x) {
|
|
Log.logError("Subscriber::onError threw exception: {0}", t);
|
|
} finally {
|
|
// cancelImpl will eventually call drainInputQueue();
|
|
cancelImpl(t);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Called from the scheduler schedule() loop,
|
|
// or after resetting the stream.
|
|
// Ensures that all received data frames are accounted for
|
|
// in the connection window flow control if the scheduler
|
|
// is stopped before all the data is consumed.
|
|
// The inputQLock is used to prevent concurrently taking
|
|
// from the queue.
|
|
private void drainInputQueue() {
|
|
Http2Frame frame;
|
|
// will wait until schedule() has finished taking
|
|
// from the queue, if needed.
|
|
inputQLock.lock();
|
|
try {
|
|
while ((frame = inputQ.poll()) != null) {
|
|
if (frame instanceof DataFrame df) {
|
|
// Data frames that have been added to the inputQ
|
|
// must be released using releaseUnconsumed() to
|
|
// account for the amount of unprocessed bytes
|
|
// tracked by the connection.windowUpdater.
|
|
connection.releaseUnconsumed(df);
|
|
}
|
|
}
|
|
} finally {
|
|
inputQLock.unlock();
|
|
}
|
|
}
|
|
|
|
@Override
|
|
void nullBody(HttpResponse<T> resp, Throwable t) {
|
|
if (debug.on()) debug.log("nullBody: streamid=%d", streamid);
|
|
// We should have an END_STREAM data frame waiting in the inputQ.
|
|
// We need a subscriber to force the scheduler to process it.
|
|
assert pendingResponseSubscriber == null;
|
|
pendingResponseSubscriber = HttpResponse.BodySubscribers.replacing(null);
|
|
sched.runOrSchedule();
|
|
}
|
|
|
|
// Callback invoked after the Response BodySubscriber has consumed the
|
|
// buffers contained in a DataFrame.
|
|
// Returns true if END_STREAM is reached, false otherwise.
|
|
private boolean consumed(DataFrame df) {
|
|
// RFC 7540 6.1:
|
|
// The entire DATA frame payload is included in flow control,
|
|
// including the Pad Length and Padding fields if present
|
|
int len = df.payloadLength();
|
|
boolean endStream = df.getFlag(DataFrame.END_STREAM);
|
|
if (len == 0) return endStream;
|
|
|
|
connection.windowUpdater.processed(len);
|
|
if (!endStream) {
|
|
streamWindowUpdater.processed(len);
|
|
} else {
|
|
// Don't send window update on a stream which is
|
|
// closed or half closed.
|
|
streamWindowUpdater.released(len);
|
|
}
|
|
|
|
// true: end of stream; false: more data coming
|
|
return endStream;
|
|
}
|
|
|
|
@Override
|
|
void expectContinueFailed(int rcode) {
|
|
// Have to mark request as sent, due to no request body being sent in the
|
|
// event of a 417 Expectation Failed or some other non 100 response code
|
|
requestSent();
|
|
}
|
|
|
|
// This method is called by Http2Connection::decrementStreamCount in order
|
|
// to make sure that the stream count is decremented only once for
|
|
// a given stream.
|
|
boolean deRegister() {
|
|
return DEREGISTERED.compareAndSet(this, false, true);
|
|
}
|
|
|
|
@Override
|
|
CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
|
|
boolean returnConnectionToPool,
|
|
Executor executor)
|
|
{
|
|
try {
|
|
Log.logTrace("Reading body on stream {0}", streamid);
|
|
debug.log("Getting BodySubscriber for: " + response);
|
|
Http2StreamResponseSubscriber<T> bodySubscriber =
|
|
createResponseSubscriber(handler, new ResponseInfoImpl(response));
|
|
CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
|
|
|
|
PushGroup<?> pg = exchange.getPushGroup();
|
|
if (pg != null) {
|
|
// if an error occurs make sure it is recorded in the PushGroup
|
|
cf = cf.whenComplete((t, e) -> pg.pushError(e));
|
|
}
|
|
return cf;
|
|
} catch (Throwable t) {
|
|
// may be thrown by handler.apply
|
|
cancelImpl(t);
|
|
return MinimalFuture.failedFuture(t);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
Http2StreamResponseSubscriber<T> createResponseSubscriber(BodyHandler<T> handler, ResponseInfo response) {
|
|
Http2StreamResponseSubscriber<T> subscriber =
|
|
new Http2StreamResponseSubscriber<>(handler.apply(response));
|
|
return subscriber;
|
|
}
|
|
|
|
// The Http2StreamResponseSubscriber is registered with the HttpClient
|
|
// to ensure that it gets completed if the SelectorManager aborts due
|
|
// to unexpected exceptions.
|
|
private boolean registerResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
|
|
return client().registerSubscriber(subscriber);
|
|
}
|
|
|
|
private boolean unregisterResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
|
|
return client().unregisterSubscriber(subscriber);
|
|
}
|
|
|
|
@Override
|
|
public String toString() {
|
|
return "streamid: " + streamid;
|
|
}
|
|
|
|
private void receiveDataFrame(DataFrame df) {
|
|
try {
|
|
int len = df.payloadLength();
|
|
if (len > 0) {
|
|
// we return from here if the connection is being closed.
|
|
if (!connection.windowUpdater.canBufferUnprocessedBytes(len)) return;
|
|
// we return from here if the stream is being closed.
|
|
if (closed || !streamWindowUpdater.canBufferUnprocessedBytes(len)) {
|
|
connection.releaseUnconsumed(df);
|
|
return;
|
|
}
|
|
}
|
|
pushDataFrame(len, df);
|
|
} finally {
|
|
sched.runOrSchedule();
|
|
}
|
|
}
|
|
|
|
// Ensures that no data frame is pushed on the inputQ
|
|
// after the stream is closed.
|
|
// Changes to the `closed` boolean are guarded by the
|
|
// stateLock. Contention should be low as only one
|
|
// thread at a time adds to the inputQ, and
|
|
// we can only contend when closing the stream.
|
|
// Note that this method can run concurrently with
|
|
// methods holding the inputQLock: that is OK.
|
|
// The inputQLock is there to ensure that methods
|
|
// taking from the queue are not running concurrently
|
|
// with each others, but concurrently adding at the
|
|
// end of the queue while peeking/polling at the head
|
|
// is OK.
|
|
private void pushDataFrame(int len, DataFrame df) {
|
|
boolean closed = false;
|
|
stateLock.lock();
|
|
try {
|
|
if (!(closed = this.closed)) {
|
|
inputQ.add(df);
|
|
}
|
|
} finally {
|
|
stateLock.unlock();
|
|
}
|
|
if (closed && len > 0) connection.releaseUnconsumed(df);
|
|
}
|
|
|
|
/** Handles a RESET frame. RESET is always handled inline in the queue. */
|
|
private void receiveResetFrame(ResetFrame frame) {
|
|
inputQ.add(frame);
|
|
sched.runOrSchedule();
|
|
}
|
|
|
|
/**
|
|
* Records the first reason which was invoked when sending a ResetFrame
|
|
* to the server in the streamState, and return the previous value
|
|
* of the streamState. This is an atomic operation.
|
|
* A possible use of this method would be to send a ResetFrame only
|
|
* if no previous reset frame has been sent.
|
|
* For instance: <pre>{@code
|
|
* if (markStream(ResetFrame.CANCEL) == 0) {
|
|
* connection.sendResetFrame(streamId, ResetFrame.CANCEL);
|
|
* }
|
|
* }</pre>
|
|
* @param code the reason code as per HTTP/2 protocol
|
|
* @return the previous value of the stream state.
|
|
*/
|
|
int markStream(int code) {
|
|
if (code == 0) return streamState;
|
|
sendLock.lock();
|
|
try {
|
|
return (int) STREAM_STATE.compareAndExchange(this, 0, code);
|
|
} finally {
|
|
sendLock.unlock();
|
|
}
|
|
}
|
|
|
|
private void sendDataFrame(DataFrame frame) {
|
|
sendLock.lock();
|
|
try {
|
|
// must not send DataFrame after reset.
|
|
if (streamState == 0) {
|
|
connection.sendDataFrame(frame);
|
|
}
|
|
} finally {
|
|
sendLock.unlock();
|
|
}
|
|
}
|
|
|
|
// pushes entire response body into response subscriber
|
|
// blocking when required by local or remote flow control
|
|
CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
|
|
// ensure that the body subscriber will be subscribed and onError() is
|
|
// invoked
|
|
pendingResponseSubscriber = bodySubscriber;
|
|
|
|
// We want to allow the subscriber's getBody() method to block so it
|
|
// can work with InputStreams. So, we offload execution.
|
|
responseBodyCF = ResponseSubscribers.getBodyAsync(executor, bodySubscriber,
|
|
new MinimalFuture<>(), this::cancelImpl);
|
|
|
|
if (isCanceled()) {
|
|
Throwable t = getCancelCause();
|
|
responseBodyCF.completeExceptionally(t);
|
|
}
|
|
|
|
sched.runOrSchedule(); // in case data waiting already to be processed, or error
|
|
|
|
return responseBodyCF;
|
|
}
|
|
|
|
@Override
|
|
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
|
|
return sendBodyImpl().thenApply( v -> this);
|
|
}
|
|
|
|
Stream(Http2Connection connection,
|
|
Exchange<T> e,
|
|
WindowController windowController)
|
|
{
|
|
super(e);
|
|
this.connection = connection;
|
|
this.windowController = windowController;
|
|
this.request = e.request();
|
|
this.requestPublisher = request.requestPublisher; // may be null
|
|
this.responseHeadersBuilder = new HttpHeadersBuilder();
|
|
this.rspHeadersConsumer = new HeadersConsumer();
|
|
this.requestPseudoHeaders = createPseudoHeaders(request);
|
|
this.streamWindowUpdater = new StreamWindowUpdateSender(connection);
|
|
}
|
|
|
|
private boolean checkRequestCancelled() {
|
|
if (exchange.multi.requestCancelled()) {
|
|
if (errorRef.get() == null) cancel();
|
|
else sendResetStreamFrame(ResetFrame.CANCEL);
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* Entry point from Http2Connection reader thread.
|
|
*
|
|
* Data frames will be removed by response body thread.
|
|
*/
|
|
void incoming(Http2Frame frame) throws IOException {
|
|
if (debug.on()) debug.log("incoming: %s", frame);
|
|
var cancelled = checkRequestCancelled() || closed;
|
|
if ((frame instanceof HeaderFrame hf)) {
|
|
if (hf.endHeaders()) {
|
|
Log.logTrace("handling response (streamid={0})", streamid);
|
|
handleResponse(hf);
|
|
}
|
|
if (hf.getFlag(HeaderFrame.END_STREAM)) {
|
|
endStreamSeen = true;
|
|
if (debug.on()) debug.log("handling END_STREAM: %d", streamid);
|
|
receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
|
|
}
|
|
} else if (frame instanceof DataFrame df) {
|
|
if (df.getFlag(DataFrame.END_STREAM)) endStreamSeen = true;
|
|
if (cancelled) {
|
|
if (debug.on()) {
|
|
debug.log("request cancelled or stream closed: dropping data frame");
|
|
}
|
|
// Data frames that have not been added to the inputQ
|
|
// can be released using dropDataFrame
|
|
connection.dropDataFrame(df);
|
|
} else {
|
|
receiveDataFrame(df);
|
|
}
|
|
} else {
|
|
if (!cancelled) otherFrame(frame);
|
|
}
|
|
}
|
|
|
|
void otherFrame(Http2Frame frame) throws IOException {
|
|
switch (frame.type()) {
|
|
case WindowUpdateFrame.TYPE -> incoming_windowUpdate((WindowUpdateFrame) frame);
|
|
case ResetFrame.TYPE -> incoming_reset((ResetFrame) frame);
|
|
case PriorityFrame.TYPE -> incoming_priority((PriorityFrame) frame);
|
|
|
|
default -> throw new IOException("Unexpected frame: " + frame);
|
|
}
|
|
}
|
|
|
|
// The Hpack decoder decodes into one of these consumers of name,value pairs
|
|
|
|
DecodingCallback rspHeadersConsumer() {
|
|
return rspHeadersConsumer;
|
|
}
|
|
|
|
String checkInterimResponseCountExceeded() {
|
|
// this is also checked by Exchange - but tracking it here too provides
|
|
// a more informative message.
|
|
int count = nonFinalResponseCount.incrementAndGet();
|
|
if (MAX_NON_FINAL_RESPONSES > 0 && (count < 0 || count > MAX_NON_FINAL_RESPONSES)) {
|
|
return String.format(
|
|
"Stream %s PROTOCOL_ERROR: too many interim responses received: %s > %s",
|
|
streamid, count, MAX_NON_FINAL_RESPONSES);
|
|
}
|
|
return null;
|
|
}
|
|
|
|
protected void handleResponse(HeaderFrame hf) throws IOException {
|
|
HttpHeaders responseHeaders = responseHeadersBuilder.build();
|
|
|
|
if (!finalResponseCodeReceived) {
|
|
try {
|
|
responseCode = (int) responseHeaders
|
|
.firstValueAsLong(":status")
|
|
.orElseThrow(() -> new ProtocolException(String.format(
|
|
"Stream %s PROTOCOL_ERROR: no status code in response",
|
|
streamid)));
|
|
} catch (ProtocolException cause) {
|
|
cancelImpl(cause, ResetFrame.PROTOCOL_ERROR);
|
|
rspHeadersConsumer.reset();
|
|
return;
|
|
}
|
|
|
|
String protocolErrorMsg = null;
|
|
// If informational code, response is partially complete
|
|
if (responseCode < 100 || responseCode > 199) {
|
|
this.finalResponseCodeReceived = true;
|
|
} else if (hf.getFlag(HeaderFrame.END_STREAM)) {
|
|
// see RFC 9113 section 8.1:
|
|
// A HEADERS frame with the END_STREAM flag set that carries an
|
|
// informational status code is malformed
|
|
protocolErrorMsg = String.format(
|
|
"Stream %s PROTOCOL_ERROR: " +
|
|
"HEADERS frame with status %s has END_STREAM flag set",
|
|
streamid, responseCode);
|
|
} else {
|
|
protocolErrorMsg = checkInterimResponseCountExceeded();
|
|
}
|
|
|
|
if (protocolErrorMsg != null) {
|
|
if (debug.on()) {
|
|
debug.log(protocolErrorMsg);
|
|
}
|
|
cancelImpl(new ProtocolException(protocolErrorMsg), ResetFrame.PROTOCOL_ERROR);
|
|
rspHeadersConsumer.reset();
|
|
return;
|
|
}
|
|
|
|
response = new Response(
|
|
request, exchange, responseHeaders, connection(),
|
|
responseCode, HttpClient.Version.HTTP_2);
|
|
|
|
/* TODO: review if needs to be removed
|
|
the value is not used, but in case `content-length` doesn't parse as
|
|
long, there will be NumberFormatException. If left as is, make sure
|
|
code up the stack handles NFE correctly. */
|
|
responseHeaders.firstValueAsLong("content-length");
|
|
|
|
if (Log.headers()) {
|
|
StringBuilder sb = new StringBuilder("RESPONSE HEADERS (streamid=%s):\n".formatted(streamid));
|
|
sb.append(" %s %s %s\n".formatted(request.method(), request.uri(), responseCode));
|
|
Log.dumpHeaders(sb, " ", responseHeaders);
|
|
Log.logHeaders(sb.toString());
|
|
}
|
|
|
|
// this will clear the response headers
|
|
rspHeadersConsumer.reset();
|
|
|
|
completeResponse(response);
|
|
} else {
|
|
if (Log.headers()) {
|
|
StringBuilder sb = new StringBuilder("TRAILING HEADERS (streamid=%s):\n".formatted(streamid));
|
|
Log.dumpHeaders(sb, " ", responseHeaders);
|
|
Log.logHeaders(sb.toString());
|
|
}
|
|
if (trailerReceived) {
|
|
String protocolErrorMsg = String.format(
|
|
"Stream %s PROTOCOL_ERROR: trailers already received", streamid);
|
|
if (debug.on()) {
|
|
debug.log(protocolErrorMsg);
|
|
}
|
|
cancelImpl(new ProtocolException(protocolErrorMsg), ResetFrame.PROTOCOL_ERROR);
|
|
}
|
|
trailerReceived = true;
|
|
rspHeadersConsumer.reset();
|
|
}
|
|
|
|
}
|
|
|
|
void incoming_reset(ResetFrame frame) {
|
|
Log.logTrace("Received RST_STREAM on stream {0}", streamid);
|
|
// responseSubscriber will be null if readBodyAsync has not yet been called
|
|
Flow.Subscriber<?> subscriber = responseSubscriber;
|
|
if (subscriber == null) subscriber = pendingResponseSubscriber;
|
|
// See RFC 9113 sec 5.1 Figure 2, life-cycle of a stream
|
|
if (endStreamReceived() && requestBodyCF.isDone()) {
|
|
// Stream is in a half closed or fully closed state, the RST_STREAM is ignored and logged.
|
|
Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
|
|
} else if (closed) {
|
|
// Stream is in a fully closed state, the RST_STREAM is ignored and logged.
|
|
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
|
|
} else if (subscriber == null && !endStreamSeen) {
|
|
// subscriber is null and the reader has not seen an END_STREAM flag, handle reset immediately
|
|
handleReset(frame, null);
|
|
} else if (!requestBodyCF.isDone()) {
|
|
// Not done sending the body, complete exceptionally or normally based on RST_STREAM error code
|
|
incompleteRequestBodyReset(frame, subscriber);
|
|
} else if (response == null || !finalResponseCodeReceived) {
|
|
// Complete response has not been received, handle reset immediately
|
|
handleReset(frame, null);
|
|
} else {
|
|
// Put ResetFrame into inputQ. Any frames already in the queue will be processed before the ResetFrame.
|
|
receiveResetFrame(frame);
|
|
Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
|
|
}
|
|
}
|
|
|
|
void incompleteRequestBodyReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
|
|
if (frame.getErrorCode() != ResetFrame.NO_ERROR) {
|
|
if (debug.on()) {
|
|
debug.log("completing requestBodyCF exceptionally due to received" +
|
|
" RESET(%s) (stream=%s)", frame.getErrorCode(), streamid);
|
|
}
|
|
var exception = new IOException("RST_STREAM received " +
|
|
ResetFrame.stringForCode(frame.getErrorCode()));
|
|
requestBodyCF.completeExceptionally(exception);
|
|
cancelImpl(exception, frame.getErrorCode());
|
|
} else {
|
|
if (debug.on()) {
|
|
debug.log("completing requestBodyCF normally due to received" +
|
|
" RESET(NO_ERROR) (stream=%s)", streamid);
|
|
}
|
|
if (!endStreamSeen || !finalResponseCodeReceived) {
|
|
// If no END_STREAM flag seen or the final response code has not been received, any RST_STREAM
|
|
// should be handled here immediately
|
|
handleReset(frame, subscriber);
|
|
} else {
|
|
requestBodyCF.complete(null);
|
|
}
|
|
}
|
|
}
|
|
|
|
void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
|
|
Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
|
|
if (!closed) {
|
|
stateLock.lock();
|
|
try {
|
|
if (closed) {
|
|
if (debug.on()) debug.log("Stream already closed: ignoring RESET");
|
|
return;
|
|
}
|
|
closed = true;
|
|
} finally {
|
|
stateLock.unlock();
|
|
}
|
|
try {
|
|
final int error = frame.getErrorCode();
|
|
// A REFUSED_STREAM error code implies that the stream wasn't processed by the
|
|
// peer and the client is free to retry the request afresh.
|
|
if (error == ErrorFrame.REFUSED_STREAM) {
|
|
// Here we arrange for the request to be retried. Note that we don't call
|
|
// closeAsUnprocessed() method here because the "closed" state is already set
|
|
// to true a few lines above and calling close() from within
|
|
// closeAsUnprocessed() will end up being a no-op. We instead do the additional
|
|
// bookkeeping here.
|
|
markUnprocessedByPeer();
|
|
errorRef.compareAndSet(null, new IOException("request not processed by peer"));
|
|
if (debug.on()) {
|
|
debug.log("request unprocessed by peer (REFUSED_STREAM) " + this.request);
|
|
}
|
|
} else {
|
|
final String reason = ErrorFrame.stringForCode(error);
|
|
final IOException failureCause = new IOException("Received RST_STREAM: " + reason);
|
|
if (debug.on()) {
|
|
debug.log(streamid + " received RST_STREAM with code: " + reason);
|
|
}
|
|
if (errorRef.compareAndSet(null, failureCause)) {
|
|
if (subscriber != null) {
|
|
subscriber.onError(failureCause);
|
|
}
|
|
}
|
|
}
|
|
final Throwable failureCause = errorRef.get();
|
|
completeResponseExceptionally(failureCause);
|
|
if (!requestBodyCF.isDone()) {
|
|
requestBodyCF.completeExceptionally(failureCause); // we may be sending the body..
|
|
}
|
|
if (responseBodyCF != null) {
|
|
responseBodyCF.completeExceptionally(failureCause);
|
|
}
|
|
} finally {
|
|
connection.decrementStreamsCount(streamid);
|
|
connection.closeStream(streamid);
|
|
}
|
|
} else {
|
|
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
|
|
}
|
|
}
|
|
|
|
void incoming_priority(PriorityFrame frame) {
|
|
// TODO: implement priority
|
|
throw new UnsupportedOperationException("Not implemented");
|
|
}
|
|
|
|
private void incoming_windowUpdate(WindowUpdateFrame frame)
|
|
throws IOException
|
|
{
|
|
int amount = frame.getUpdate();
|
|
if (amount <= 0) {
|
|
Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
|
|
streamid, amount);
|
|
connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
|
|
} else {
|
|
assert streamid != 0;
|
|
boolean success = windowController.increaseStreamWindow(amount, streamid);
|
|
if (!success) { // overflow
|
|
connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
|
|
}
|
|
}
|
|
}
|
|
|
|
void incoming_pushPromise(HttpRequestImpl pushRequest,
|
|
PushedStream<T> pushStream)
|
|
throws IOException
|
|
{
|
|
if (Log.requests()) {
|
|
Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
|
|
}
|
|
PushGroup<T> pushGroup = exchange.getPushGroup();
|
|
if (pushGroup == null || exchange.multi.requestCancelled()) {
|
|
Log.logTrace("Rejecting push promise stream " + streamid);
|
|
connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
|
|
pushStream.close();
|
|
return;
|
|
}
|
|
|
|
PushGroup.Acceptor<T> acceptor = null;
|
|
boolean accepted = false;
|
|
try {
|
|
acceptor = pushGroup.acceptPushRequest(pushRequest);
|
|
accepted = acceptor.accepted();
|
|
} catch (Throwable t) {
|
|
if (debug.on())
|
|
debug.log("PushPromiseHandler::applyPushPromise threw exception %s",
|
|
(Object)t);
|
|
}
|
|
if (!accepted) {
|
|
// cancel / reject
|
|
IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
|
|
if (Log.trace()) {
|
|
Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
|
|
ex.getMessage());
|
|
}
|
|
pushStream.cancelImpl(ex);
|
|
return;
|
|
}
|
|
|
|
assert accepted && acceptor != null;
|
|
CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
|
|
HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
|
|
assert pushHandler != null;
|
|
|
|
pushStream.requestSent();
|
|
pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ?
|
|
// setup housekeeping for when the push is received
|
|
// TODO: deal with ignoring of CF anti-pattern
|
|
CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
|
|
cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
|
|
t = Utils.getCompletionCause(t);
|
|
if (Log.trace()) {
|
|
Log.logTrace("Push completed on stream {0} for {1}{2}",
|
|
pushStream.streamid, resp,
|
|
((t==null) ? "": " with exception " + t));
|
|
}
|
|
if (t != null) {
|
|
pushGroup.pushError(t);
|
|
pushResponseCF.completeExceptionally(t);
|
|
} else {
|
|
pushResponseCF.complete(resp);
|
|
}
|
|
pushGroup.pushCompleted();
|
|
});
|
|
|
|
}
|
|
|
|
private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
|
|
HttpHeadersBuilder h = request.getSystemHeadersBuilder();
|
|
if (contentLength > 0) {
|
|
h.setHeader("content-length", Long.toString(contentLength));
|
|
}
|
|
HttpHeaders sysh = filterHeaders(h.build());
|
|
HttpHeaders userh = filterHeaders(request.getUserHeaders());
|
|
// Filter context restricted from userHeaders
|
|
userh = HttpHeaders.of(userh.map(), Utils.ACCEPT_ALL);
|
|
Utils.setUserAuthFlags(request, userh);
|
|
|
|
// Don't override Cookie values that have been set by the CookieHandler.
|
|
final HttpHeaders uh = userh;
|
|
BiPredicate<String, String> overrides =
|
|
(k, v) -> COOKIE_HEADER.equalsIgnoreCase(k)
|
|
|| uh.firstValue(k).isEmpty();
|
|
|
|
// Filter any headers from systemHeaders that are set in userHeaders
|
|
// except for "Cookie:" - user cookies will be appended to system
|
|
// cookies
|
|
sysh = HttpHeaders.of(sysh.map(), overrides);
|
|
|
|
OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);
|
|
if (contentLength == 0) {
|
|
f.setFlag(HeadersFrame.END_STREAM);
|
|
endStreamSent = true;
|
|
}
|
|
return f;
|
|
}
|
|
|
|
private boolean hasProxyAuthorization(HttpHeaders headers) {
|
|
return headers.firstValue("proxy-authorization")
|
|
.isPresent();
|
|
}
|
|
|
|
// Determines whether we need to build a new HttpHeader object.
|
|
//
|
|
// Ideally we should pass the filter to OutgoingHeaders refactor the
|
|
// code that creates the HeaderFrame to honor the filter.
|
|
// We're not there yet - so depending on the filter we need to
|
|
// apply and the content of the header we will try to determine
|
|
// whether anything might need to be filtered.
|
|
// If nothing needs filtering then we can just use the
|
|
// original headers.
|
|
private boolean needsFiltering(HttpHeaders headers,
|
|
BiPredicate<String, String> filter) {
|
|
if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {
|
|
// we're either connecting or proxying
|
|
// slight optimization: we only need to filter out
|
|
// disabled schemes, so if there are none just
|
|
// pass through.
|
|
return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)
|
|
&& hasProxyAuthorization(headers);
|
|
} else {
|
|
// we're talking to a server, either directly or through
|
|
// a tunnel.
|
|
// Slight optimization: we only need to filter out
|
|
// proxy authorization headers, so if there are none just
|
|
// pass through.
|
|
return hasProxyAuthorization(headers);
|
|
}
|
|
}
|
|
|
|
private HttpHeaders filterHeaders(HttpHeaders headers) {
|
|
HttpConnection conn = connection();
|
|
BiPredicate<String, String> filter = conn.headerFilter(request);
|
|
if (needsFiltering(headers, filter)) {
|
|
return HttpHeaders.of(headers.map(), filter);
|
|
}
|
|
return headers;
|
|
}
|
|
|
|
private static HttpHeaders createPseudoHeaders(HttpRequest request) {
|
|
HttpHeadersBuilder hdrs = new HttpHeadersBuilder();
|
|
String method = request.method();
|
|
hdrs.setHeader(":method", method);
|
|
URI uri = request.uri();
|
|
hdrs.setHeader(":scheme", uri.getScheme());
|
|
String host = uri.getHost();
|
|
int port = uri.getPort();
|
|
assert host != null;
|
|
if (port != -1) {
|
|
hdrs.setHeader(":authority", host + ":" + port);
|
|
} else {
|
|
hdrs.setHeader(":authority", host);
|
|
}
|
|
String query = uri.getRawQuery();
|
|
String path = uri.getRawPath();
|
|
if (path == null || path.isEmpty()) {
|
|
if (method.equalsIgnoreCase("OPTIONS")) {
|
|
path = "*";
|
|
} else {
|
|
path = "/";
|
|
}
|
|
}
|
|
if (query != null) {
|
|
path += "?" + query;
|
|
}
|
|
hdrs.setHeader(":path", Utils.encode(path));
|
|
return hdrs.build();
|
|
}
|
|
|
|
HttpHeaders getRequestPseudoHeaders() {
|
|
return requestPseudoHeaders;
|
|
}
|
|
|
|
/** Sets endStreamReceived. Should be called only once. */
|
|
void setEndStreamReceived() {
|
|
if (debug.on()) debug.log("setEndStreamReceived: streamid=%d", streamid);
|
|
assert remotelyClosed == false: "Unexpected endStream already set";
|
|
remotelyClosed = true;
|
|
responseReceived();
|
|
}
|
|
|
|
/** Tells whether, or not, the END_STREAM Flag has been seen in any frame
|
|
* received on this stream. */
|
|
private boolean endStreamReceived() {
|
|
return remotelyClosed;
|
|
}
|
|
|
|
@Override
|
|
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
|
|
if (debug.on()) debug.log("sendHeadersOnly()");
|
|
if (Log.requests() && request != null) {
|
|
Log.logRequest(request.toString());
|
|
}
|
|
if (requestPublisher != null) {
|
|
requestContentLen = requestPublisher.contentLength();
|
|
} else {
|
|
requestContentLen = 0;
|
|
}
|
|
|
|
// At this point the stream doesn't have a streamid yet.
|
|
// It will be allocated if we send the request headers.
|
|
Throwable t = errorRef.get();
|
|
if (t != null) {
|
|
if (debug.on()) debug.log("stream already cancelled, headers not sent: %s", (Object)t);
|
|
return MinimalFuture.failedFuture(t);
|
|
}
|
|
|
|
// sending the headers will cause the allocation of the stream id
|
|
OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
|
|
connection.sendFrame(f);
|
|
CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
|
|
cf.complete(this); // #### good enough for now
|
|
return cf;
|
|
}
|
|
|
|
@Override
|
|
void released() {
|
|
if (streamid > 0) {
|
|
if (debug.on()) debug.log("Released stream %d", streamid);
|
|
// remove this stream from the Http2Connection map.
|
|
connection.decrementStreamsCount(streamid);
|
|
connection.closeStream(streamid);
|
|
} else {
|
|
if (debug.on()) debug.log("Can't release stream %d", streamid);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
void completed() {
|
|
// There should be nothing to do here: the stream should have
|
|
// been already closed (or will be closed shortly after).
|
|
}
|
|
|
|
boolean registerStream(int id, boolean registerIfCancelled) {
|
|
boolean cancelled = closed || exchange.multi.requestCancelled();
|
|
if (!cancelled || registerIfCancelled) {
|
|
this.streamid = id;
|
|
connection.putStream(this, streamid);
|
|
if (debug.on()) {
|
|
debug.log("Stream %d registered (cancelled: %b, registerIfCancelled: %b)",
|
|
streamid, cancelled, registerIfCancelled);
|
|
}
|
|
}
|
|
return !cancelled;
|
|
}
|
|
|
|
void signalWindowUpdate() {
|
|
RequestSubscriber subscriber = requestSubscriber;
|
|
assert subscriber != null;
|
|
if (debug.on()) debug.log("Signalling window update");
|
|
subscriber.sendScheduler.runOrSchedule();
|
|
}
|
|
|
|
static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
|
|
class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
|
|
// can be < 0 if the actual length is not known.
|
|
private final long contentLength;
|
|
private volatile long remainingContentLength;
|
|
private volatile Subscription subscription;
|
|
|
|
// Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
|
|
// 1) The data that was published by the request body Publisher, and
|
|
// 2) the COMPLETED sentinel, since onComplete can be invoked without demand.
|
|
final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
|
|
|
|
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
|
|
// A scheduler used to honor window updates. Writing must be paused
|
|
// when the window is exhausted, and resumed when the window acquires
|
|
// some space. The sendScheduler makes it possible to implement this
|
|
// behaviour in an asynchronous non-blocking way.
|
|
// See RequestSubscriber::trySend below.
|
|
final SequentialScheduler sendScheduler;
|
|
|
|
RequestSubscriber(long contentLen) {
|
|
this.contentLength = contentLen;
|
|
this.remainingContentLength = contentLen;
|
|
this.sendScheduler =
|
|
SequentialScheduler.lockingScheduler(this::trySend);
|
|
}
|
|
|
|
@Override
|
|
public void onSubscribe(Flow.Subscription subscription) {
|
|
if (this.subscription != null) {
|
|
throw new IllegalStateException("already subscribed");
|
|
}
|
|
this.subscription = subscription;
|
|
if (debug.on())
|
|
debug.log("RequestSubscriber: onSubscribe, request 1");
|
|
subscription.request(1);
|
|
}
|
|
|
|
@Override
|
|
public void onNext(ByteBuffer item) {
|
|
if (debug.on())
|
|
debug.log("RequestSubscriber: onNext(%d)", item.remaining());
|
|
int size = outgoing.size();
|
|
assert size == 0 : "non-zero size: " + size;
|
|
onNextImpl(item);
|
|
}
|
|
|
|
private void onNextImpl(ByteBuffer item) {
|
|
// Got some more request body bytes to send.
|
|
if (requestBodyCF.isDone()) {
|
|
if (debug.on()) {
|
|
debug.log("RequestSubscriber: requestBodyCf is done: " +
|
|
"cancelling subscription");
|
|
}
|
|
// stream already cancelled, probably in timeout
|
|
sendScheduler.stop();
|
|
subscription.cancel();
|
|
return;
|
|
}
|
|
outgoing.add(item);
|
|
sendScheduler.runOrSchedule();
|
|
}
|
|
|
|
@Override
|
|
public void onError(Throwable throwable) {
|
|
if (debug.on())
|
|
debug.log(() -> "RequestSubscriber: onError: " + throwable);
|
|
// ensure that errors are handled within the flow.
|
|
if (errorRef.compareAndSet(null, throwable)) {
|
|
sendScheduler.runOrSchedule();
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public void onComplete() {
|
|
if (debug.on()) debug.log("RequestSubscriber: onComplete");
|
|
int size = outgoing.size();
|
|
assert size == 0 || size == 1 : "non-zero or one size: " + size;
|
|
// last byte of request body has been obtained.
|
|
// ensure that everything is completed within the flow.
|
|
onNextImpl(COMPLETED);
|
|
}
|
|
|
|
// Attempts to send the data, if any.
|
|
// Handles errors and completion state.
|
|
// Pause writing if the send window is exhausted, resume it if the
|
|
// send window has some bytes that can be acquired.
|
|
void trySend() {
|
|
try {
|
|
// handle errors raised by onError;
|
|
Throwable t = errorRef.get();
|
|
if (t != null) {
|
|
sendScheduler.stop();
|
|
if (requestBodyCF.isDone()) return;
|
|
subscription.cancel();
|
|
requestBodyCF.completeExceptionally(t);
|
|
cancelImpl(t);
|
|
return;
|
|
}
|
|
int state = streamState;
|
|
|
|
do {
|
|
// handle COMPLETED;
|
|
ByteBuffer item = outgoing.peekFirst();
|
|
if (item == null) return;
|
|
else if (item == COMPLETED) {
|
|
sendScheduler.stop();
|
|
complete();
|
|
return;
|
|
}
|
|
|
|
// handle bytes to send downstream
|
|
while (item.hasRemaining() && state == 0) {
|
|
if (debug.on()) debug.log("trySend: %d", item.remaining());
|
|
DataFrame df = getDataFrame(item);
|
|
if (df == null) {
|
|
if (debug.on())
|
|
debug.log("trySend: can't send yet: %d", item.remaining());
|
|
return; // the send window is exhausted: come back later
|
|
}
|
|
|
|
if (contentLength > 0) {
|
|
remainingContentLength -= df.getDataLength();
|
|
if (remainingContentLength < 0) {
|
|
String msg = connection().getConnectionFlow()
|
|
+ " stream=" + streamid + " "
|
|
+ "[" + Thread.currentThread().getName() + "] "
|
|
+ "Too many bytes in request body. Expected: "
|
|
+ contentLength + ", got: "
|
|
+ (contentLength - remainingContentLength);
|
|
assert streamid > 0;
|
|
connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
|
|
throw new IOException(msg);
|
|
} else if (remainingContentLength == 0) {
|
|
assert !endStreamSent : "internal error, send data after END_STREAM flag";
|
|
df.setFlag(DataFrame.END_STREAM);
|
|
endStreamSent = true;
|
|
}
|
|
} else {
|
|
assert !endStreamSent : "internal error, send data after END_STREAM flag";
|
|
}
|
|
if ((state = streamState) != 0) {
|
|
if (debug.on()) debug.log("trySend: cancelled: %s", String.valueOf(t));
|
|
break;
|
|
}
|
|
if (debug.on())
|
|
debug.log("trySend: sending: %d", df.getDataLength());
|
|
sendDataFrame(df);
|
|
}
|
|
if (state != 0) break;
|
|
assert !item.hasRemaining();
|
|
ByteBuffer b = outgoing.removeFirst();
|
|
assert b == item;
|
|
} while (outgoing.peekFirst() != null);
|
|
|
|
if (state != 0) {
|
|
t = errorRef.get();
|
|
if (t == null) t = new IOException(ResetFrame.stringForCode(streamState));
|
|
throw t;
|
|
}
|
|
|
|
if (debug.on()) debug.log("trySend: request 1");
|
|
subscription.request(1);
|
|
} catch (Throwable ex) {
|
|
if (debug.on()) debug.log("trySend: ", ex);
|
|
sendScheduler.stop();
|
|
subscription.cancel();
|
|
requestBodyCF.completeExceptionally(ex);
|
|
// need to cancel the stream to 1. tell the server
|
|
// we don't want to receive any more data and
|
|
// 2. ensure that the operation ref count will be
|
|
// decremented on the HttpClient.
|
|
cancelImpl(ex);
|
|
}
|
|
}
|
|
|
|
private void complete() throws IOException {
|
|
long remaining = remainingContentLength;
|
|
long written = contentLength - remaining;
|
|
if (remaining > 0) {
|
|
connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
|
|
// let trySend() handle the exception
|
|
throw new IOException(connection().getConnectionFlow()
|
|
+ " stream=" + streamid + " "
|
|
+ "[" + Thread.currentThread().getName() +"] "
|
|
+ "Too few bytes returned by the publisher ("
|
|
+ written + "/"
|
|
+ contentLength + ")");
|
|
}
|
|
if (!endStreamSent) {
|
|
endStreamSent = true;
|
|
connection.sendDataFrame(getEmptyEndStreamDataFrame());
|
|
}
|
|
requestBodyCF.complete(null);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Send a RESET frame to tell server to stop sending data on this stream
|
|
*/
|
|
@Override
|
|
public CompletableFuture<Void> ignoreBody() {
|
|
try {
|
|
connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
|
|
return MinimalFuture.completedFuture(null);
|
|
} catch (Throwable e) {
|
|
Log.logTrace("Error resetting stream {0}", e.toString());
|
|
return MinimalFuture.failedFuture(e);
|
|
}
|
|
}
|
|
|
|
DataFrame getDataFrame(ByteBuffer buffer) {
|
|
int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
|
|
// blocks waiting for stream send window, if exhausted
|
|
int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
|
|
if (actualAmount <= 0) return null;
|
|
ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer, actualAmount);
|
|
DataFrame df = new DataFrame(streamid, 0 , outBuf);
|
|
return df;
|
|
}
|
|
|
|
private DataFrame getEmptyEndStreamDataFrame() {
|
|
return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
|
|
}
|
|
|
|
/**
|
|
* A List of responses relating to this stream. Normally there is only
|
|
* one response, but interim responses like 100 are allowed
|
|
* and must be passed up to higher level before continuing. Deals with races
|
|
* such as if responses are returned before the CFs get created by
|
|
* getResponseAsync()
|
|
*/
|
|
|
|
final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
|
|
final Lock response_cfs_lock = new ReentrantLock();
|
|
|
|
@Override
|
|
CompletableFuture<Response> getResponseAsync(Executor executor) {
|
|
CompletableFuture<Response> cf;
|
|
// The code below deals with race condition that can be caused when
|
|
// completeResponse() is being called before getResponseAsync()
|
|
response_cfs_lock.lock();
|
|
try {
|
|
if (!response_cfs.isEmpty()) {
|
|
// This CompletableFuture was created by completeResponse().
|
|
// it will be already completed, unless the expect continue
|
|
// timeout fired
|
|
cf = response_cfs.get(0);
|
|
if (cf.isDone()) {
|
|
cf = response_cfs.remove(0);
|
|
}
|
|
|
|
// if we find a cf here it should be already completed.
|
|
// finding a non completed cf should not happen. just assert it.
|
|
assert cf.isDone() || request.expectContinue && expectTimeoutRaised()
|
|
: "Removing uncompleted response: could cause code to hang!";
|
|
} else {
|
|
// getResponseAsync() is called first. Create a CompletableFuture
|
|
// that will be completed by completeResponse() when
|
|
// completeResponse() is called.
|
|
cf = new MinimalFuture<>();
|
|
response_cfs.add(cf);
|
|
}
|
|
} finally {
|
|
response_cfs_lock.unlock();
|
|
}
|
|
if (executor != null && !cf.isDone()) {
|
|
// protect from executing later chain of CompletableFuture operations from SelectorManager thread
|
|
cf = cf.thenApplyAsync(r -> r, executor);
|
|
}
|
|
Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
|
|
PushGroup<?> pg = exchange.getPushGroup();
|
|
if (pg != null) {
|
|
// if an error occurs make sure it is recorded in the PushGroup
|
|
cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
|
|
}
|
|
return cf;
|
|
}
|
|
|
|
/**
|
|
* Completes the first uncompleted CF on list, and removes it. If there is no
|
|
* uncompleted CF then creates one (completes it) and adds to list
|
|
*/
|
|
void completeResponse(Response resp) {
|
|
response_cfs_lock.lock();
|
|
try {
|
|
CompletableFuture<Response> cf;
|
|
int cfs_len = response_cfs.size();
|
|
for (int i=0; i<cfs_len; i++) {
|
|
cf = response_cfs.get(i);
|
|
if (!cf.isDone() && !expectTimeoutRaised()) {
|
|
Log.logTrace("Completing response (streamid={0}): {1}",
|
|
streamid, cf);
|
|
if (debug.on())
|
|
debug.log("Completing responseCF(%d) with response headers", i);
|
|
response_cfs.remove(cf);
|
|
cf.complete(resp);
|
|
return;
|
|
} else if (expectTimeoutRaised()) {
|
|
Log.logTrace("Completing response (streamid={0}): {1}",
|
|
streamid, cf);
|
|
if (debug.on())
|
|
debug.log("Completing responseCF(%d) with response headers", i);
|
|
// The Request will be removed in getResponseAsync()
|
|
cf.complete(resp);
|
|
return;
|
|
} // else we found the previous response: just leave it alone.
|
|
}
|
|
cf = MinimalFuture.completedFuture(resp);
|
|
Log.logTrace("Created completed future (streamid={0}): {1}",
|
|
streamid, cf);
|
|
if (debug.on())
|
|
debug.log("Adding completed responseCF(0) with response headers");
|
|
response_cfs.add(cf);
|
|
} finally {
|
|
response_cfs_lock.unlock();
|
|
}
|
|
}
|
|
|
|
// methods to update state and remove stream when finished
|
|
|
|
void requestSent() {
|
|
stateLock.lock();
|
|
try {
|
|
requestSent = true;
|
|
if (responseReceived) {
|
|
if (debug.on()) debug.log("requestSent: streamid=%d", streamid);
|
|
close();
|
|
} else {
|
|
if (debug.on()) {
|
|
debug.log("requestSent: streamid=%d but response not received", streamid);
|
|
}
|
|
}
|
|
} finally {
|
|
stateLock.unlock();
|
|
}
|
|
}
|
|
|
|
void responseReceived() {
|
|
stateLock.lock();
|
|
try {
|
|
responseReceived = true;
|
|
if (requestSent) {
|
|
if (debug.on()) debug.log("responseReceived: streamid=%d", streamid);
|
|
close();
|
|
} else {
|
|
if (debug.on()) {
|
|
debug.log("responseReceived: streamid=%d but request not sent", streamid);
|
|
}
|
|
}
|
|
} finally {
|
|
stateLock.unlock();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* same as above but for errors
|
|
*/
|
|
void completeResponseExceptionally(Throwable t) {
|
|
response_cfs_lock.lock();
|
|
try {
|
|
// use index to avoid ConcurrentModificationException
|
|
// caused by removing the CF from within the loop.
|
|
for (int i = 0; i < response_cfs.size(); i++) {
|
|
CompletableFuture<Response> cf = response_cfs.get(i);
|
|
if (!cf.isDone()) {
|
|
response_cfs.remove(i);
|
|
cf.completeExceptionally(t);
|
|
return;
|
|
}
|
|
}
|
|
response_cfs.add(MinimalFuture.failedFuture(t));
|
|
} finally {
|
|
response_cfs_lock.unlock();
|
|
}
|
|
}
|
|
|
|
CompletableFuture<Void> sendBodyImpl() {
|
|
requestBodyCF.whenComplete((v, t) -> requestSent());
|
|
try {
|
|
if (requestPublisher != null) {
|
|
final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
|
|
requestPublisher.subscribe(requestSubscriber = subscriber);
|
|
} else {
|
|
// there is no request body, therefore the request is complete,
|
|
// END_STREAM has already sent with outgoing headers
|
|
requestBodyCF.complete(null);
|
|
}
|
|
} catch (Throwable t) {
|
|
cancelImpl(t);
|
|
requestBodyCF.completeExceptionally(t);
|
|
}
|
|
return requestBodyCF;
|
|
}
|
|
|
|
@Override
|
|
void cancel() {
|
|
if ((streamid == 0)) {
|
|
cancel(new IOException("Stream cancelled before streamid assigned"));
|
|
} else {
|
|
cancel(new IOException("Stream " + streamid + " cancelled"));
|
|
}
|
|
}
|
|
|
|
void onSubscriptionError(Throwable t) {
|
|
errorRef.compareAndSet(null, t);
|
|
if (debug.on()) debug.log("Got subscription error: %s", (Object)t);
|
|
// This is the special case where the subscriber
|
|
// has requested an illegal number of items.
|
|
// In this case, the error doesn't come from
|
|
// upstream, but from downstream, and we need to
|
|
// handle the error without waiting for the inputQ
|
|
// to be exhausted.
|
|
stopRequested = true;
|
|
sched.runOrSchedule();
|
|
}
|
|
|
|
@Override
|
|
void cancel(IOException cause) {
|
|
cancelImpl(cause);
|
|
}
|
|
|
|
@Override
|
|
void onProtocolError(final IOException cause) {
|
|
onProtocolError(cause, ResetFrame.PROTOCOL_ERROR);
|
|
}
|
|
|
|
void onProtocolError(final IOException cause, int code) {
|
|
if (debug.on()) {
|
|
debug.log("cancelling exchange on stream %d due to protocol error [%s]: %s",
|
|
streamid, ErrorFrame.stringForCode(code),
|
|
cause.getMessage());
|
|
}
|
|
Log.logError("cancelling exchange on stream {0} due to protocol error: {1}\n", streamid, cause);
|
|
// send a RESET frame and close the stream
|
|
cancelImpl(cause, code);
|
|
}
|
|
|
|
void connectionClosing(Throwable cause) {
|
|
Flow.Subscriber<?> subscriber =
|
|
responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
|
|
errorRef.compareAndSet(null, cause);
|
|
if (subscriber != null && !sched.isStopped() && !inputQ.isEmpty()) {
|
|
sched.runOrSchedule();
|
|
} else cancelImpl(cause);
|
|
}
|
|
|
|
// This method sends a RST_STREAM frame
|
|
void cancelImpl(Throwable e) {
|
|
cancelImpl(e, ResetFrame.CANCEL);
|
|
}
|
|
|
|
void cancelImpl(final Throwable e, final int resetFrameErrCode) {
|
|
errorRef.compareAndSet(null, e);
|
|
if (debug.on()) {
|
|
if (streamid == 0) debug.log("cancelling stream: %s", (Object)e);
|
|
else debug.log("cancelling stream %d: %s", streamid, e);
|
|
}
|
|
if (Log.trace()) {
|
|
if (streamid == 0) Log.logTrace("cancelling stream: {0}\n", e);
|
|
else Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
|
|
}
|
|
boolean closing;
|
|
if (closing = !closed) { // assigning closing to !closed
|
|
stateLock.lock();
|
|
try {
|
|
if (closing = !closed) { // assigning closing to !closed
|
|
closed=true;
|
|
}
|
|
} finally {
|
|
stateLock.unlock();
|
|
}
|
|
}
|
|
|
|
if (closing) { // true if the stream has not been closed yet
|
|
var subscriber = this.responseSubscriber;
|
|
if (subscriber == null) subscriber = this.pendingResponseSubscriber;
|
|
if (subscriber != null) {
|
|
if (debug.on())
|
|
debug.log("stream %s closing due to %s", streamid, (Object)errorRef.get());
|
|
sched.runOrSchedule();
|
|
if (subscriber instanceof Http2StreamResponseSubscriber<?> rs) {
|
|
// make sure the subscriber is stopped.
|
|
if (debug.on()) debug.log("closing response subscriber stream %s", streamid);
|
|
rs.complete(errorRef.get());
|
|
}
|
|
} else {
|
|
if (debug.on())
|
|
debug.log("stream %s closing due to %s before subscriber registered",
|
|
streamid, (Object)errorRef.get());
|
|
}
|
|
} else {
|
|
if (debug.on()) {
|
|
debug.log("stream %s already closed due to %s",
|
|
streamid, (Object)errorRef.get());
|
|
}
|
|
}
|
|
|
|
completeResponseExceptionally(e);
|
|
if (!requestBodyCF.isDone()) {
|
|
requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
|
|
}
|
|
if (responseBodyCF != null) {
|
|
responseBodyCF.completeExceptionally(errorRef.get());
|
|
}
|
|
try {
|
|
// will send a RST_STREAM frame
|
|
if (streamid != 0 && streamState == 0) {
|
|
final Throwable cause = Utils.getCompletionCause(e);
|
|
if (cause instanceof EOFException) {
|
|
// read EOF: no need to try & send reset
|
|
connection.decrementStreamsCount(streamid);
|
|
connection.closeStream(streamid);
|
|
} else {
|
|
// no use to send CANCEL if already closed.
|
|
sendResetStreamFrame(resetFrameErrCode);
|
|
}
|
|
}
|
|
} catch (Throwable ex) {
|
|
Log.logError(ex);
|
|
} finally {
|
|
drainInputQueue();
|
|
}
|
|
}
|
|
|
|
void sendResetStreamFrame(final int resetFrameErrCode) {
|
|
// do not reset a stream until it has a streamid.
|
|
if (streamid > 0 && markStream(resetFrameErrCode) == 0) {
|
|
connection.resetStream(streamid, resetFrameErrCode);
|
|
}
|
|
close();
|
|
}
|
|
|
|
// This method doesn't send any frame
|
|
void close() {
|
|
if (closed) return;
|
|
stateLock.lock();
|
|
try {
|
|
if (closed) return;
|
|
closed = true;
|
|
} finally {
|
|
stateLock.unlock();
|
|
}
|
|
if (debug.on()) debug.log("close stream %d", streamid);
|
|
Log.logTrace("Closing stream {0}", streamid);
|
|
connection.closeStream(streamid);
|
|
var s = responseSubscriber == null
|
|
? pendingResponseSubscriber
|
|
: responseSubscriber;
|
|
if (debug.on()) debug.log("subscriber is %s", s);
|
|
if (s instanceof Http2StreamResponseSubscriber<?> sw) {
|
|
if (debug.on()) debug.log("closing response subscriber stream %s", streamid);
|
|
// if the subscriber has already completed,
|
|
// there is nothing to do...
|
|
if (!sw.completed()) {
|
|
// otherwise make sure it will be completed
|
|
var cause = errorRef.get();
|
|
sw.complete(cause == null ? new IOException("stream closed") : cause);
|
|
}
|
|
}
|
|
Log.logTrace("Stream {0} closed", streamid);
|
|
}
|
|
|
|
static class PushedStream<T> extends Stream<T> {
|
|
final Stream<T> parent;
|
|
final PushGroup<T> pushGroup;
|
|
// push streams need the response CF allocated up front as it is
|
|
// given directly to user via the multi handler callback function.
|
|
final CompletableFuture<Response> pushCF;
|
|
CompletableFuture<HttpResponse<T>> responseCF;
|
|
final HttpRequestImpl pushReq;
|
|
volatile HttpResponse.BodyHandler<T> pushHandler;
|
|
private volatile boolean finalPushResponseCodeReceived;
|
|
|
|
PushedStream(Stream<T> parent,
|
|
PushGroup<T> pushGroup,
|
|
Http2Connection connection,
|
|
Exchange<T> pushReq) {
|
|
// ## no request body possible, null window controller
|
|
super(connection, pushReq, null);
|
|
this.parent = parent;
|
|
this.pushGroup = pushGroup;
|
|
this.pushReq = pushReq.request();
|
|
this.pushCF = new MinimalFuture<>();
|
|
this.responseCF = new MinimalFuture<>();
|
|
}
|
|
|
|
CompletableFuture<HttpResponse<T>> responseCF() {
|
|
return responseCF;
|
|
}
|
|
|
|
void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
|
|
this.pushHandler = pushHandler;
|
|
}
|
|
|
|
HttpResponse.BodyHandler<T> getPushHandler() {
|
|
// ignored parameters to function can be used as BodyHandler
|
|
return this.pushHandler;
|
|
}
|
|
|
|
// Following methods call the super class but in case of
|
|
// error record it in the PushGroup. The error method is called
|
|
// with a null value when no error occurred (is a no-op)
|
|
@Override
|
|
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
|
|
return super.sendBodyAsync()
|
|
.whenComplete((ExchangeImpl<T> v, Throwable t)
|
|
-> pushGroup.pushError(Utils.getCompletionCause(t)));
|
|
}
|
|
|
|
@Override
|
|
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
|
|
return super.sendHeadersAsync()
|
|
.whenComplete((ExchangeImpl<T> ex, Throwable t)
|
|
-> pushGroup.pushError(Utils.getCompletionCause(t)));
|
|
}
|
|
|
|
@Override
|
|
CompletableFuture<Response> getResponseAsync(Executor executor) {
|
|
CompletableFuture<Response> cf = pushCF.whenComplete(
|
|
(v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
|
|
if(executor!=null && !cf.isDone()) {
|
|
cf = cf.thenApplyAsync( r -> r, executor);
|
|
}
|
|
return cf;
|
|
}
|
|
|
|
@Override
|
|
CompletableFuture<T> readBodyAsync(
|
|
HttpResponse.BodyHandler<T> handler,
|
|
boolean returnConnectionToPool,
|
|
Executor executor)
|
|
{
|
|
return super.readBodyAsync(handler, returnConnectionToPool, executor)
|
|
.whenComplete((v, t) -> pushGroup.pushError(t));
|
|
}
|
|
|
|
@Override
|
|
void completeResponse(Response r) {
|
|
Log.logResponse(r::toString);
|
|
pushCF.complete(r); // not strictly required for push API
|
|
// start reading the body using the obtained BodySubscriber
|
|
CompletableFuture<Void> start = new MinimalFuture<>();
|
|
start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
|
|
.whenComplete((T body, Throwable t) -> {
|
|
if (t != null) {
|
|
responseCF.completeExceptionally(t);
|
|
} else {
|
|
HttpResponseImpl<T> resp =
|
|
new HttpResponseImpl<>(r.request, r, null, body, getExchange());
|
|
responseCF.complete(resp);
|
|
}
|
|
});
|
|
start.completeAsync(() -> null, getExchange().executor());
|
|
}
|
|
|
|
@Override
|
|
void completeResponseExceptionally(Throwable t) {
|
|
pushCF.completeExceptionally(t);
|
|
}
|
|
|
|
// create and return the PushResponseImpl
|
|
@Override
|
|
protected void handleResponse(HeaderFrame hf) {
|
|
HttpHeaders responseHeaders = responseHeadersBuilder.build();
|
|
|
|
if (!finalPushResponseCodeReceived) {
|
|
responseCode = (int)responseHeaders
|
|
.firstValueAsLong(":status")
|
|
.orElse(-1);
|
|
|
|
if (responseCode == -1) {
|
|
cancelImpl(new ProtocolException("No status code"), ResetFrame.PROTOCOL_ERROR);
|
|
rspHeadersConsumer.reset();
|
|
return;
|
|
} else if (responseCode >= 100 && responseCode < 200) {
|
|
String protocolErrorMsg = checkInterimResponseCountExceeded();
|
|
if (protocolErrorMsg != null) {
|
|
cancelImpl(new ProtocolException(protocolErrorMsg), ResetFrame.PROTOCOL_ERROR);
|
|
rspHeadersConsumer.reset();
|
|
return;
|
|
}
|
|
}
|
|
|
|
this.finalPushResponseCodeReceived = true;
|
|
|
|
this.response = new Response(
|
|
pushReq, exchange, responseHeaders, connection(),
|
|
responseCode, HttpClient.Version.HTTP_2);
|
|
|
|
/* TODO: review if needs to be removed
|
|
the value is not used, but in case `content-length` doesn't parse
|
|
as long, there will be NumberFormatException. If left as is, make
|
|
sure code up the stack handles NFE correctly. */
|
|
responseHeaders.firstValueAsLong("content-length");
|
|
|
|
if (Log.headers()) {
|
|
StringBuilder sb = new StringBuilder("RESPONSE HEADERS (streamid=%s):\n".formatted(streamid));
|
|
sb.append(" %s %s %s\n".formatted(request.method(), request.uri(), responseCode));
|
|
Log.dumpHeaders(sb, " ", responseHeaders);
|
|
Log.logHeaders(sb.toString());
|
|
}
|
|
|
|
rspHeadersConsumer.reset();
|
|
|
|
// different implementations for normal streams and pushed streams
|
|
completeResponse(response);
|
|
} else {
|
|
if (Log.headers()) {
|
|
StringBuilder sb = new StringBuilder("TRAILING HEADERS (streamid=%s):\n".formatted(streamid));
|
|
sb.append(" %s %s %s\n".formatted(request.method(), request.uri(), responseCode));
|
|
Log.dumpHeaders(sb, " ", responseHeaders);
|
|
Log.logHeaders(sb.toString());
|
|
}
|
|
rspHeadersConsumer.reset();
|
|
}
|
|
}
|
|
}
|
|
|
|
final class StreamWindowUpdateSender extends WindowUpdateSender {
|
|
|
|
StreamWindowUpdateSender(Http2Connection connection) {
|
|
super(connection);
|
|
}
|
|
|
|
@Override
|
|
int getStreamId() {
|
|
return streamid;
|
|
}
|
|
|
|
@Override
|
|
String dbgString() {
|
|
String dbg = dbgString;
|
|
if (dbg != null) return dbg;
|
|
if (streamid == 0) {
|
|
return connection.dbgString() + ":WindowUpdateSender(stream: ?)";
|
|
} else {
|
|
dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamid + ")";
|
|
return dbgString = dbg;
|
|
}
|
|
}
|
|
|
|
@Override
|
|
protected boolean windowSizeExceeded(long received) {
|
|
onProtocolError(new ProtocolException("stream %s flow control window exceeded"
|
|
.formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR);
|
|
return true;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Returns true if this exchange was canceled.
|
|
* @return true if this exchange was canceled.
|
|
*/
|
|
boolean isCanceled() {
|
|
return errorRef.get() != null;
|
|
}
|
|
|
|
/**
|
|
* Returns the cause for which this exchange was canceled, if available.
|
|
* @return the cause for which this exchange was canceled, if available.
|
|
*/
|
|
Throwable getCancelCause() {
|
|
return errorRef.get();
|
|
}
|
|
|
|
final String dbgString() {
|
|
final int id = streamid;
|
|
final String sid = id == 0 ? "?" : String.valueOf(id);
|
|
return connection.dbgString() + "/Stream(" + sid + ")";
|
|
}
|
|
|
|
/**
|
|
* An unprocessed exchange is one that hasn't been processed by a peer. The local end of the
|
|
* connection would be notified about such exchanges when it receives a GOAWAY frame with
|
|
* a stream id that tells which exchanges have been unprocessed.
|
|
* This method is called on such unprocessed exchanges and the implementation of this method
|
|
* will arrange for the request, corresponding to this exchange, to be retried afresh on a
|
|
* new connection.
|
|
*/
|
|
void closeAsUnprocessed() {
|
|
try {
|
|
// We arrange for the request to be retried on a new connection as allowed by the RFC-9113
|
|
markUnprocessedByPeer();
|
|
this.errorRef.compareAndSet(null, new IOException("request not processed by peer"));
|
|
if (debug.on()) {
|
|
debug.log("closing " + this.request + " as unprocessed by peer");
|
|
}
|
|
// close the exchange and complete the response CF exceptionally
|
|
close();
|
|
completeResponseExceptionally(this.errorRef.get());
|
|
} finally {
|
|
// decrementStreamsCount isn't really needed but we do it to make sure
|
|
// the log messages, where these counts/states get reported, show the accurate state.
|
|
connection.decrementStreamsCount(streamid);
|
|
}
|
|
}
|
|
|
|
private final class HeadersConsumer extends ValidatingHeadersConsumer
|
|
implements DecodingCallback {
|
|
|
|
private HeadersConsumer() {
|
|
super(Context.RESPONSE);
|
|
}
|
|
|
|
boolean maxHeaderListSizeReached;
|
|
|
|
@Override
|
|
public void reset() {
|
|
super.reset();
|
|
responseHeadersBuilder.clear();
|
|
debug.log("Response builder cleared, ready to receive new headers.");
|
|
}
|
|
|
|
@Override
|
|
public void onDecoded(CharSequence name, CharSequence value)
|
|
throws UncheckedIOException
|
|
{
|
|
if (maxHeaderListSizeReached) {
|
|
return;
|
|
}
|
|
try {
|
|
String n = name.toString();
|
|
String v = value.toString();
|
|
super.onDecoded(n, v);
|
|
responseHeadersBuilder.addHeader(n, v);
|
|
if (Log.headers() && Log.trace()) {
|
|
Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
|
|
streamid, n, v);
|
|
}
|
|
} catch (UncheckedIOException uio) {
|
|
// reset stream: From RFC 9113, section 8.1
|
|
// Malformed requests or responses that are detected MUST be
|
|
// treated as a stream error (Section 5.4.2) of type
|
|
// PROTOCOL_ERROR.
|
|
onProtocolError(uio.getCause());
|
|
}
|
|
}
|
|
|
|
@Override
|
|
protected String formatMessage(String message, String header) {
|
|
return "malformed response: " + super.formatMessage(message, header);
|
|
}
|
|
|
|
@Override
|
|
public void onMaxHeaderListSizeReached(long size, int maxHeaderListSize) throws ProtocolException {
|
|
if (maxHeaderListSizeReached) return;
|
|
try {
|
|
DecodingCallback.super.onMaxHeaderListSizeReached(size, maxHeaderListSize);
|
|
} catch (ProtocolException cause) {
|
|
maxHeaderListSizeReached = true;
|
|
// If this is a push stream: cancel the parent.
|
|
if (Stream.this instanceof Stream.PushedStream<?> ps) {
|
|
ps.parent.onProtocolError(cause);
|
|
}
|
|
// cancel the stream, continue processing
|
|
onProtocolError(cause);
|
|
reset();
|
|
}
|
|
}
|
|
}
|
|
|
|
final class Http2StreamResponseSubscriber<U> extends HttpBodySubscriberWrapper<U> {
|
|
Http2StreamResponseSubscriber(BodySubscriber<U> subscriber) {
|
|
super(subscriber);
|
|
}
|
|
|
|
@Override
|
|
protected void register() {
|
|
registerResponseSubscriber(this);
|
|
}
|
|
|
|
@Override
|
|
protected void unregister() {
|
|
unregisterResponseSubscriber(this);
|
|
}
|
|
|
|
}
|
|
|
|
private static final VarHandle STREAM_STATE;
|
|
private static final VarHandle DEREGISTERED;
|
|
static {
|
|
try {
|
|
MethodHandles.Lookup lookup = MethodHandles.lookup();
|
|
STREAM_STATE = lookup
|
|
.findVarHandle(Stream.class, "streamState", int.class);
|
|
DEREGISTERED = lookup
|
|
.findVarHandle(Stream.class, "deRegistered", boolean.class);
|
|
} catch (Exception x) {
|
|
throw new ExceptionInInitializerError(x);
|
|
}
|
|
}
|
|
}
|