8177935: java/net/httpclient/http2/FixedThreadPoolTest.java fails frequently

Fixes a race condition in AsyncWriteQueue

Reviewed-by: chegar
This commit is contained in:
Daniel Fuchs 2017-08-17 16:48:45 +01:00
parent d7cd632a4a
commit ffe7f7d107
4 changed files with 28 additions and 12 deletions

View File

@ -196,7 +196,7 @@ class AsyncSSLDelegate implements ExceptionallyCloseable, AsyncConnection {
* This same method is called to try and resume output after a blocking
* handshaking operation has completed.
*/
private void upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
private boolean upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
// currently delayCallback is not used. Use it when it's needed to execute handshake in another thread.
try {
ByteBuffer[] buffers = ByteBufferReference.toBuffers(refs);
@ -230,6 +230,9 @@ class AsyncSSLDelegate implements ExceptionallyCloseable, AsyncConnection {
closeExceptionally(t);
errorHandler.accept(t);
}
// We always return true: either all the data was sent, or
// an exception happened and we have closed the queue.
return true;
}
// Connecting at this level means the initial handshake has completed.

View File

@ -231,7 +231,7 @@ class PlainHttpConnection extends HttpConnection implements AsyncConnection {
assert false;
}
void asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
boolean asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
try {
ByteBuffer[] bufs = ByteBufferReference.toBuffers(refs);
while (Utils.remaining(bufs) > 0) {
@ -239,13 +239,14 @@ class PlainHttpConnection extends HttpConnection implements AsyncConnection {
if (n == 0) {
delayCallback.setDelayed(refs);
client.registerEvent(new WriteEvent());
return;
return false;
}
}
ByteBufferReference.clear(refs);
} catch (IOException e) {
shutdown();
}
return true;
}
@Override

View File

@ -27,17 +27,31 @@ package jdk.incubator.http.internal.common;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
public class AsyncWriteQueue implements Closeable {
@FunctionalInterface
public static interface AsyncConsumer {
/**
* Takes an array of buffer reference and attempt to send the data
* downstream. If not all the data can be sent, then push back
* to the source queue by calling {@code source.setDelayed(buffers)}
* and return false. If all the data was successfully sent downstream
* then returns true.
* @param buffers An array of ButeBufferReference containing data
* to send downstream.
* @param source This AsyncWriteQueue.
* @return true if all the data could be sent downstream, false otherwise.
*/
boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source);
}
private static final int IDLE = 0; // nobody is flushing from the queue
private static final int FLUSHING = 1; // there is the only thread flushing from the queue
private static final int REFLUSHING = 2; // while one thread was flushing from the queue
@ -51,7 +65,7 @@ public class AsyncWriteQueue implements Closeable {
private final AtomicInteger state = new AtomicInteger(IDLE);
private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
private final BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction;
private final AsyncConsumer consumeAction;
// Queue may be processed in two modes:
// 1. if(!doFullDrain) - invoke callback on each chunk
@ -60,11 +74,11 @@ public class AsyncWriteQueue implements Closeable {
private ByteBufferReference[] delayedElement = null;
public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction) {
public AsyncWriteQueue(AsyncConsumer consumeAction) {
this(consumeAction, true);
}
public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction, boolean doFullDrain) {
public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) {
this.consumeAction = consumeAction;
this.doFullDrain = doFullDrain;
}
@ -156,8 +170,7 @@ public class AsyncWriteQueue implements Closeable {
}
while(true) {
while (element != null) {
consumeAction.accept(element, this);
if (state.get() == DELAYED) {
if (!consumeAction.trySend(element, this)) {
return;
}
element = drain();

View File

@ -23,8 +23,7 @@
/*
* @test
* @bug 8087112
* @key intermittent
* @bug 8087112 8177935
* @library /lib/testlibrary server
* @build jdk.testlibrary.SimpleSSLContext
* @modules jdk.incubator.httpclient/jdk.incubator.http.internal.common