mirror of
https://github.com/openjdk/jdk.git
synced 2026-02-26 02:00:12 +00:00
8161474: Extract interface from java.net.http.RawChannel
Reviewed-by: michaelm
This commit is contained in:
parent
7112e19bd1
commit
9118632bbd
@ -178,7 +178,7 @@ class HttpResponseImpl extends HttpResponse {
|
||||
*/
|
||||
RawChannel rawChannel() throws IOException {
|
||||
if (rawchan == null) {
|
||||
rawchan = new RawChannel(request.client(), connection);
|
||||
rawchan = new RawChannelImpl(request.client(), connection);
|
||||
}
|
||||
return rawchan;
|
||||
}
|
||||
|
||||
@ -25,146 +25,36 @@ package java.net.http;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ByteChannel;
|
||||
import java.nio.channels.GatheringByteChannel;
|
||||
import java.nio.channels.SelectableChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
//
|
||||
// Used to implement WebSocket. Each RawChannel corresponds to a TCP connection
|
||||
// (SocketChannel) but is connected to a Selector and an ExecutorService for
|
||||
// invoking the send and receive callbacks. Also includes SSL processing.
|
||||
//
|
||||
final class RawChannel implements ByteChannel, GatheringByteChannel {
|
||||
/*
|
||||
* I/O abstraction used to implement WebSocket.
|
||||
*/
|
||||
public interface RawChannel {
|
||||
|
||||
private final HttpClientImpl client;
|
||||
private final HttpConnection connection;
|
||||
interface RawEvent {
|
||||
|
||||
private interface RawEvent {
|
||||
|
||||
/**
|
||||
* must return the selector interest op flags OR'd.
|
||||
/*
|
||||
* Must return the selector interest op flags.
|
||||
*/
|
||||
int interestOps();
|
||||
|
||||
/**
|
||||
* called when event occurs.
|
||||
/*
|
||||
* Called when event occurs.
|
||||
*/
|
||||
void handle();
|
||||
}
|
||||
|
||||
interface NonBlockingEvent extends RawEvent {
|
||||
}
|
||||
|
||||
RawChannel(HttpClientImpl client, HttpConnection connection)
|
||||
throws IOException {
|
||||
this.client = client;
|
||||
this.connection = connection;
|
||||
SocketChannel chan = connection.channel();
|
||||
client.cancelRegistration(chan);
|
||||
chan.configureBlocking(false);
|
||||
}
|
||||
|
||||
SocketChannel socketChannel() {
|
||||
return connection.channel();
|
||||
}
|
||||
|
||||
ByteBuffer getRemaining() {
|
||||
return connection.getRemaining();
|
||||
}
|
||||
|
||||
private class RawAsyncEvent extends AsyncEvent {
|
||||
|
||||
private final RawEvent re;
|
||||
|
||||
RawAsyncEvent(RawEvent re) {
|
||||
super(AsyncEvent.BLOCKING); // BLOCKING & !REPEATING
|
||||
this.re = re;
|
||||
}
|
||||
|
||||
RawAsyncEvent(RawEvent re, int flags) {
|
||||
super(flags);
|
||||
this.re = re;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SelectableChannel channel() {
|
||||
return connection.channel();
|
||||
}
|
||||
|
||||
// must return the selector interest op flags OR'd
|
||||
@Override
|
||||
public int interestOps() {
|
||||
return re.interestOps();
|
||||
}
|
||||
|
||||
// called when event occurs
|
||||
@Override
|
||||
public void handle() {
|
||||
re.handle();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort() { }
|
||||
}
|
||||
|
||||
private class NonBlockingRawAsyncEvent extends RawAsyncEvent {
|
||||
|
||||
NonBlockingRawAsyncEvent(RawEvent re) {
|
||||
super(re, 0); // !BLOCKING & !REPEATING
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Register given event whose callback will be called once only.
|
||||
* (i.e. register new event for each callback)
|
||||
*/
|
||||
public void registerEvent(RawEvent event) throws IOException {
|
||||
if (!(event instanceof NonBlockingEvent)) {
|
||||
throw new InternalError();
|
||||
}
|
||||
if ((event.interestOps() & SelectionKey.OP_READ) != 0
|
||||
&& connection.buffer.hasRemaining()) {
|
||||
// FIXME: a hack to deal with leftovers from previous reads into an
|
||||
// internal buffer (works in conjunction with change in
|
||||
// java.net.http.PlainHttpConnection.readImpl(java.nio.ByteBuffer)
|
||||
connection.channel().configureBlocking(false);
|
||||
event.handle();
|
||||
} else {
|
||||
client.registerEvent(new NonBlockingRawAsyncEvent(event));
|
||||
}
|
||||
}
|
||||
void registerEvent(RawEvent event) throws IOException;
|
||||
|
||||
@Override
|
||||
public int read(ByteBuffer dst) throws IOException {
|
||||
assert !connection.channel().isBlocking();
|
||||
return connection.read(dst);
|
||||
}
|
||||
int read(ByteBuffer dst) throws IOException;
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return connection.isOpen();
|
||||
}
|
||||
long write(ByteBuffer[] src, int offset, int len) throws IOException;
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
connection.close();
|
||||
}
|
||||
boolean isOpen();
|
||||
|
||||
@Override
|
||||
public long write(ByteBuffer[] src) throws IOException {
|
||||
return connection.write(src, 0, src.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long write(ByteBuffer[] src, int offset, int len)
|
||||
throws IOException {
|
||||
return connection.write(src, offset, len);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int write(ByteBuffer src) throws IOException {
|
||||
return (int) connection.write(src);
|
||||
}
|
||||
void close() throws IOException;
|
||||
}
|
||||
|
||||
@ -0,0 +1,113 @@
|
||||
/*
|
||||
* Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Oracle designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Oracle in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
*/
|
||||
package java.net.http;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectableChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
/*
|
||||
* Each RawChannel corresponds to a TCP connection (SocketChannel) but is
|
||||
* connected to a Selector and an ExecutorService for invoking the send and
|
||||
* receive callbacks. Also includes SSL processing.
|
||||
*/
|
||||
final class RawChannelImpl implements RawChannel {
|
||||
|
||||
private final HttpClientImpl client;
|
||||
private final HttpConnection connection;
|
||||
|
||||
RawChannelImpl(HttpClientImpl client, HttpConnection connection)
|
||||
throws IOException {
|
||||
this.client = client;
|
||||
this.connection = connection;
|
||||
SocketChannel chan = connection.channel();
|
||||
client.cancelRegistration(chan);
|
||||
chan.configureBlocking(false);
|
||||
}
|
||||
|
||||
private class NonBlockingRawAsyncEvent extends AsyncEvent {
|
||||
|
||||
private final RawEvent re;
|
||||
|
||||
NonBlockingRawAsyncEvent(RawEvent re) {
|
||||
super(0); // !BLOCKING & !REPEATING
|
||||
this.re = re;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SelectableChannel channel() {
|
||||
return connection.channel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int interestOps() {
|
||||
return re.interestOps();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle() {
|
||||
re.handle();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort() { }
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerEvent(RawEvent event) throws IOException {
|
||||
if ((event.interestOps() & SelectionKey.OP_READ) != 0
|
||||
&& connection.buffer.hasRemaining()) {
|
||||
// FIXME: a hack to deal with leftovers from previous reads into an
|
||||
// internal buffer (works in conjunction with change in
|
||||
// java.net.http.PlainHttpConnection.readImpl(java.nio.ByteBuffer)
|
||||
connection.channel().configureBlocking(false);
|
||||
event.handle();
|
||||
} else {
|
||||
client.registerEvent(new NonBlockingRawAsyncEvent(event));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(ByteBuffer dst) throws IOException {
|
||||
assert !connection.channel().isBlocking();
|
||||
return connection.read(dst);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long write(ByteBuffer[] src, int offset, int len) throws IOException {
|
||||
return connection.write(src, offset, len);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return connection.isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
@ -53,7 +53,7 @@ final class WSReceiver {
|
||||
private final Supplier<WSShared<ByteBuffer>> buffersSupplier =
|
||||
new WSSharedPool<>(() -> ByteBuffer.allocateDirect(32768), 2);
|
||||
private final RawChannel channel;
|
||||
private final RawChannel.NonBlockingEvent channelEvent;
|
||||
private final RawChannel.RawEvent channelEvent;
|
||||
private final WSSignalHandler handler;
|
||||
private final AtomicLong demand = new AtomicLong();
|
||||
private final AtomicBoolean readable = new AtomicBoolean();
|
||||
@ -251,8 +251,8 @@ final class WSReceiver {
|
||||
assert newDemand >= 0 : newDemand;
|
||||
}
|
||||
|
||||
private RawChannel.NonBlockingEvent createChannelEvent() {
|
||||
return new RawChannel.NonBlockingEvent() {
|
||||
private RawChannel.RawEvent createChannelEvent() {
|
||||
return new RawChannel.RawEvent() {
|
||||
|
||||
@Override
|
||||
public int interestOps() {
|
||||
|
||||
@ -60,7 +60,7 @@ import static java.util.Objects.requireNonNull;
|
||||
final class WSWriter {
|
||||
|
||||
private final RawChannel channel;
|
||||
private final RawChannel.NonBlockingEvent writeReadinessHandler;
|
||||
private final RawChannel.RawEvent writeReadinessHandler;
|
||||
private final Consumer<Throwable> completionCallback;
|
||||
private ByteBuffer[] buffers;
|
||||
private int offset;
|
||||
@ -110,8 +110,8 @@ final class WSWriter {
|
||||
return -1;
|
||||
}
|
||||
|
||||
private RawChannel.NonBlockingEvent createHandler() {
|
||||
return new RawChannel.NonBlockingEvent() {
|
||||
private RawChannel.RawEvent createHandler() {
|
||||
return new RawChannel.RawEvent() {
|
||||
|
||||
@Override
|
||||
public int interestOps() {
|
||||
|
||||
@ -78,7 +78,7 @@ public class SelectorTest {
|
||||
|
||||
final RawChannel chan = getARawChannel(port);
|
||||
|
||||
chan.registerEvent(new RawChannel.NonBlockingEvent() {
|
||||
chan.registerEvent(new RawChannel.RawEvent() {
|
||||
@Override
|
||||
public int interestOps() {
|
||||
return SelectionKey.OP_READ;
|
||||
@ -95,7 +95,7 @@ public class SelectorTest {
|
||||
}
|
||||
});
|
||||
|
||||
chan.registerEvent(new RawChannel.NonBlockingEvent() {
|
||||
chan.registerEvent(new RawChannel.RawEvent() {
|
||||
@Override
|
||||
public int interestOps() {
|
||||
return SelectionKey.OP_WRITE;
|
||||
@ -111,7 +111,7 @@ public class SelectorTest {
|
||||
ByteBuffer bb = ByteBuffer.wrap(TestServer.INPUT);
|
||||
counter.incrementAndGet();
|
||||
try {
|
||||
chan.write(bb);
|
||||
chan.write(new ByteBuffer[]{bb}, 0, 1);
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user