From 8093563bce03b2a7dcea175f1e71cfd67de1c235 Mon Sep 17 00:00:00 2001 From: Daniel Fuchs Date: Mon, 2 Oct 2023 13:06:43 +0000 Subject: [PATCH] 8317295: ResponseSubscribers.SubscriberAdapter should call the finisher function asynchronously Reviewed-by: djelinski --- .../net/http/ResponseSubscribers.java | 2 +- .../httpclient/FlowAdapterSubscriberTest.java | 83 ++++++++++++++++--- 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java b/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java index fe10ef58d09..ed95f815913 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java @@ -780,7 +780,7 @@ public class ResponseSubscribers { subscriber.onComplete(); } finally { try { - cf.complete(finisher.apply(subscriber)); + cf.completeAsync(() -> finisher.apply(subscriber)); } catch (Throwable throwable) { cf.completeExceptionally(throwable); } diff --git a/test/jdk/java/net/httpclient/FlowAdapterSubscriberTest.java b/test/jdk/java/net/httpclient/FlowAdapterSubscriberTest.java index 9d4efa2a351..8319204f5c2 100644 --- a/test/jdk/java/net/httpclient/FlowAdapterSubscriberTest.java +++ b/test/jdk/java/net/httpclient/FlowAdapterSubscriberTest.java @@ -26,15 +26,21 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.UncheckedIOException; +import java.lang.StackWalker.StackFrame; import java.net.URI; import java.net.http.HttpClient.Builder; import java.net.http.HttpClient.Version; import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import java.net.http.HttpClient; import java.net.http.HttpRequest; @@ -57,6 +63,7 @@ import static org.testng.Assert.assertTrue; /* * @test + * @bug 8193365 8317295 * @summary Basic tests for Flow adapter Subscribers * @library /test/lib /test/jdk/java/net/httpclient/lib * @build jdk.httpclient.test.lib.common.HttpServerAdapters @@ -76,6 +83,9 @@ public class FlowAdapterSubscriberTest implements HttpServerAdapters { String http2URI; String https2URI; + static final StackWalker WALKER = + StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE); + static final long start = System.nanoTime(); public static String now() { long now = System.nanoTime() - start; @@ -264,8 +274,8 @@ public class FlowAdapterSubscriberTest implements HttpServerAdapters { } @Test(dataProvider = "uris") - void testCollectionWithoutFinisheBlocking(String uri) throws Exception { - System.out.printf(now() + "testCollectionWithoutFinisheBlocking(%s) starting%n", uri); + void testCollectionWithoutFinisherBlocking(String uri) throws Exception { + System.out.printf(now() + "testCollectionWithoutFinisherBlocking(%s) starting%n", uri); try (HttpClient client = newHttpClient(uri)) { HttpRequest request = newRequestBuilder(uri) .POST(BodyPublishers.ofString("What's the craic?")).build(); @@ -455,18 +465,69 @@ public class FlowAdapterSubscriberTest implements HttpServerAdapters { HttpRequest request = newRequestBuilder(uri) .POST(BodyPublishers.ofString("May the wind always be at your back.")).build(); - client.sendAsync(request, BodyHandlers.fromSubscriber(BodySubscribers.ofInputStream(), - ins -> { - InputStream is = ins.getBody().toCompletableFuture().join(); - return new String(uncheckedReadAllBytes(is), UTF_8); - })) - .thenApply(FlowAdapterSubscriberTest::assert200ResponseCode) - .thenApply(HttpResponse::body) - .thenAccept(body -> assertEquals(body, "May the wind always be at your back.")) - .join(); + var adaptee = BodySubscribers.ofInputStream(); + var exec = Executors.newSingleThreadExecutor(); + + // Use an executor to pull on the InputStream in order to reach the + // point where the Subscriber gets completed and the finisher function + // is called. If we didn't use an executor here, the finisher function + // may never get called. + var futureResult = exec.submit(() -> uncheckedReadAllBytes( + adaptee.getBody().toCompletableFuture().join())); + Supplier bytes = () -> { + try { + return futureResult.get(); + } catch (InterruptedException e) { + throw new CompletionException(e); + } catch (ExecutionException e) { + throw new CompletionException(e.getCause()); + } + }; + + AtomicReference failed = new AtomicReference<>(); + Function>, String> finisher = (s) -> { + failed.set(checkThreadAndStack()); + return new String(bytes.get(), UTF_8); + }; + + try { + var cf = client.sendAsync(request, BodyHandlers.fromSubscriber(adaptee, + finisher)) + .thenApply(FlowAdapterSubscriberTest::assert200ResponseCode) + .thenApply(HttpResponse::body) + .thenAccept(body -> assertEquals(body, "May the wind always be at your back.")) + .join(); + var error = failed.get(); + if (error != null) throw error; + } finally { + exec.close(); + } } } + static final Predicate DAT = sfe -> + sfe.getClassName().startsWith("FlowAdapterSubscriberTest"); + static final Predicate JUC = sfe -> + sfe.getClassName().startsWith("java.util.concurrent"); + static final Predicate JLT = sfe -> + sfe.getClassName().startsWith("java.lang.Thread"); + static final Predicate RSP = sfe -> + sfe.getClassName().startsWith("jdk.internal.net.http.ResponseSubscribers"); + static final Predicate NotDATorJUCorJLT = Predicate.not(DAT.or(JUC).or(JLT).or(RSP)); + + + AssertionError checkThreadAndStack() { + System.out.println("Check stack trace"); + List otherFrames = WALKER.walk(s -> s.filter(NotDATorJUCorJLT).toList()); + if (!otherFrames.isEmpty()) { + System.out.println("Found unexpected trace: "); + otherFrames.forEach(f -> System.out.printf("\t%s%n", f)); + return new AssertionError("Dependant action has unexpected frame in " + + Thread.currentThread() + ": " + otherFrames.get(0)); + } + return null; + } + /** An abstract Subscriber that converts all received data into a String. */ static abstract class AbstractSubscriber implements Supplier { protected volatile Flow.Subscription subscription;