mirror of
https://github.com/openjdk/jdk.git
synced 2026-02-21 07:45:11 +00:00
8180155: WebSocket secure connection get stuck after onOpen
8156518: WebSocket.Builder.connectTimeout(long timeout, TimeUnit unit) implicitly affect websocket connection timeout Reviewed-by: dfuchs
This commit is contained in:
parent
b2870f2b5f
commit
cc296e46f5
@ -316,13 +316,14 @@ class MultiExchange<U,T> {
|
||||
})
|
||||
// 5. Handle errors and cancel any timer set
|
||||
.handle((response, ex) -> {
|
||||
if (response != null) {
|
||||
cancelTimer();
|
||||
if (ex == null) {
|
||||
assert response != null;
|
||||
return MinimalFuture.completedFuture(response);
|
||||
}
|
||||
// all exceptions thrown are handled here
|
||||
CompletableFuture<Response> error = getExceptionalCF(ex);
|
||||
if (error == null) {
|
||||
cancelTimer();
|
||||
return responseAsyncImpl();
|
||||
} else {
|
||||
return error;
|
||||
|
||||
@ -274,9 +274,7 @@ class SSLDelegate {
|
||||
int x;
|
||||
do {
|
||||
if (needData) {
|
||||
do {
|
||||
x = chan.read (unwrap_src);
|
||||
} while (x == 0);
|
||||
x = chan.read (unwrap_src);
|
||||
if (x == -1) {
|
||||
throw new IOException ("connection closed for reading");
|
||||
}
|
||||
|
||||
@ -28,7 +28,6 @@ package jdk.incubator.http.internal.websocket;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/*
|
||||
@ -58,23 +57,24 @@ final class Receiver {
|
||||
private final Frame.Reader reader = new Frame.Reader();
|
||||
private final RawChannel.RawEvent event = createHandler();
|
||||
private final AtomicLong demand = new AtomicLong();
|
||||
private final CooperativeHandler handler =
|
||||
new CooperativeHandler(this::pushContinuously);
|
||||
/*
|
||||
* Used to ensure registering the channel event at most once (i.e. to avoid
|
||||
* multiple registrations).
|
||||
*/
|
||||
private final AtomicBoolean readable = new AtomicBoolean();
|
||||
private final CooperativeHandler handler;
|
||||
|
||||
private ByteBuffer data;
|
||||
private volatile int state;
|
||||
|
||||
private static final int UNREGISTERED = 0;
|
||||
private static final int AVAILABLE = 1;
|
||||
private static final int WAITING = 2;
|
||||
|
||||
Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) {
|
||||
this.messageConsumer = messageConsumer;
|
||||
this.channel = channel;
|
||||
this.data = channel.initialByteBuffer();
|
||||
this.frameConsumer = new FrameConsumer(this.messageConsumer);
|
||||
// To ensure the initial non-final `data` will be read correctly
|
||||
// (happens-before) by reader after executing readable.get()
|
||||
readable.set(true);
|
||||
this.data = channel.initialByteBuffer();
|
||||
// To ensure the initial non-final `data` will be visible
|
||||
// (happens-before) when `handler` invokes `pushContinuously`
|
||||
// the following assignment is done last:
|
||||
handler = new CooperativeHandler(this::pushContinuously);
|
||||
}
|
||||
|
||||
private RawChannel.RawEvent createHandler() {
|
||||
@ -87,7 +87,7 @@ final class Receiver {
|
||||
|
||||
@Override
|
||||
public void handle() {
|
||||
readable.set(true);
|
||||
state = AVAILABLE;
|
||||
handler.handle();
|
||||
}
|
||||
};
|
||||
@ -110,54 +110,63 @@ final class Receiver {
|
||||
|
||||
/*
|
||||
* Stops the machinery from reading and delivering messages permanently,
|
||||
* regardless of the current demand.
|
||||
* regardless of the current demand and data availability.
|
||||
*/
|
||||
void close() {
|
||||
handler.stop();
|
||||
}
|
||||
|
||||
private void pushContinuously() {
|
||||
while (readable.get() && demand.get() > 0 && !handler.isStopped()) {
|
||||
pushOnce();
|
||||
}
|
||||
}
|
||||
|
||||
private void pushOnce() {
|
||||
if (data == null && !readData()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
reader.readFrame(data, frameConsumer); // Pushing frame parts to the consumer
|
||||
} catch (FailWebSocketException e) {
|
||||
messageConsumer.onError(e);
|
||||
return;
|
||||
}
|
||||
if (!data.hasRemaining()) {
|
||||
data = null;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean readData() {
|
||||
try {
|
||||
data = channel.read();
|
||||
} catch (IOException e) {
|
||||
messageConsumer.onError(e);
|
||||
return false;
|
||||
}
|
||||
if (data == null) { // EOF
|
||||
messageConsumer.onComplete();
|
||||
return false;
|
||||
} else if (!data.hasRemaining()) { // No data in the socket at the moment
|
||||
data = null;
|
||||
readable.set(false);
|
||||
try {
|
||||
channel.registerEvent(event);
|
||||
} catch (IOException e) {
|
||||
messageConsumer.onError(e);
|
||||
while (!handler.isStopped()) {
|
||||
if (data.hasRemaining()) {
|
||||
if (demand.get() > 0) {
|
||||
try {
|
||||
int oldPos = data.position();
|
||||
reader.readFrame(data, frameConsumer);
|
||||
int newPos = data.position();
|
||||
assert oldPos != newPos : data; // reader always consumes bytes
|
||||
} catch (FailWebSocketException e) {
|
||||
handler.stop();
|
||||
messageConsumer.onError(e);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
switch (state) {
|
||||
case WAITING:
|
||||
return;
|
||||
case UNREGISTERED:
|
||||
try {
|
||||
state = WAITING;
|
||||
channel.registerEvent(event);
|
||||
} catch (IOException e) {
|
||||
handler.stop();
|
||||
messageConsumer.onError(e);
|
||||
}
|
||||
return;
|
||||
case AVAILABLE:
|
||||
try {
|
||||
data = channel.read();
|
||||
} catch (IOException e) {
|
||||
handler.stop();
|
||||
messageConsumer.onError(e);
|
||||
return;
|
||||
}
|
||||
if (data == null) { // EOF
|
||||
handler.stop();
|
||||
messageConsumer.onComplete();
|
||||
return;
|
||||
} else if (!data.hasRemaining()) { // No data at the moment
|
||||
// Pretty much a "goto", reusing the existing code path
|
||||
// for registration
|
||||
state = UNREGISTERED;
|
||||
}
|
||||
continue;
|
||||
default:
|
||||
throw new InternalError(String.valueOf(state));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
assert data.hasRemaining();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user