Review feedback

This commit is contained in:
Daniel Fuchs 2026-01-29 11:18:42 +00:00
parent 51dd06ebaa
commit 4fa0b27af1

View File

@ -74,7 +74,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -94,11 +93,8 @@ import static org.testng.Assert.assertTrue;
public class CancelRequestTest implements HttpServerAdapters {
private static final Random random = RandomFactory.getRandom();
private static final ConcurrentHashMap<String, Semaphore> semaphores
private static final ConcurrentHashMap<String, CountDownLatch> latches
= new ConcurrentHashMap<>();
// A number big enough to make sure nothing else could
// be blocked trying to acquire a released semaphore.
private static final int RELEASE_ALL = 1000;
private static final SSLContext sslContext = SimpleSSLContext.findSSLContext();
HttpTestServer httpTestServer; // HTTP/1.1 [ 4 servers ]
@ -333,22 +329,22 @@ public class CancelRequestTest implements HttpServerAdapters {
assertEquals(resp.statusCode(), 200);
}
private static void releaseSemaphores() {
// release left over semaphores
for (var sem : semaphores.values()) {
sem.release(RELEASE_ALL);
private static void releaseLatches() {
// release left over latches
for (var latch : latches.values()) {
latch.countDown();
}
semaphores.clear();
latches.clear();
}
private static Semaphore addSemaphoreFor(HttpRequest req) {
// release left over semaphores
releaseSemaphores();
private static CountDownLatch addLatchFor(HttpRequest req) {
// release left over latches
releaseLatches();
String key = Objects.requireNonNull(req.uri().getRawQuery(), "query");
var sem = new Semaphore(0);
semaphores.put(key, sem);
out.println(now() + "Semaphore " + sem + " added for " + req.uri());
return sem;
var latch = new CountDownLatch(1);
latches.put(key, latch);
out.println(now() + "CountDownLatch " + latch + " added for " + req.uri());
return latch;
}
@Test(dataProvider = "asyncurls")
@ -370,7 +366,7 @@ public class CancelRequestTest implements HttpServerAdapters {
.GET()
.setOption(H3_DISCOVERY, config)
.build();
var sem = addSemaphoreFor(req);
var requestLatch = addLatchFor(req);
BodyHandler<String> handler = BodyHandlers.ofString();
CountDownLatch latch = new CountDownLatch(1);
@ -389,13 +385,13 @@ public class CancelRequestTest implements HttpServerAdapters {
out.println("cf2 after cancel: " + cf2);
try {
String body = cf2.get().body();
sem.release(RELEASE_ALL);
assertEquals(body, String.join("", BODY.split("\\|")));
throw new AssertionError("Expected CancellationException not received");
} catch (ExecutionException x) {
sem.release(RELEASE_ALL);
out.println(now() + "Got expected exception: " + x);
assertTrue(isCancelled(x));
} finally {
requestLatch.countDown();
}
// Cancelling the request may cause an IOException instead...
@ -484,8 +480,7 @@ public class CancelRequestTest implements HttpServerAdapters {
Iterable<byte[]> iterable = new Iterable<>() {
@Override
public Iterator<byte[]> iterator() {
// this is dangerous - so we use a virtual thread
// to avoid starving the executor
// this is dangerous
out.println(now() + "waiting for completion on: " + cancelFuture);
boolean async = random.nextBoolean();
Runnable cancel = () -> {
@ -494,7 +489,7 @@ public class CancelRequestTest implements HttpServerAdapters {
cf1.cancel(mayInterruptIfRunning);
out.println(now() + "cancelled " + cf1);
};
if (async) Thread.ofVirtual().start(cancel);
if (async) executor.execute(cancel);
else cancel.run();
return List.of(BODY.getBytes(UTF_8)).iterator();
}
@ -507,7 +502,7 @@ public class CancelRequestTest implements HttpServerAdapters {
.POST(HttpRequest.BodyPublishers.ofByteArrays(iterable))
.setOption(H3_DISCOVERY, config)
.build();
var sem = addSemaphoreFor(req);
var requestLatch = addLatchFor(req);
BodyHandler<String> handler = BodyHandlers.ofString();
CountDownLatch latch = new CountDownLatch(1);
@ -525,13 +520,13 @@ public class CancelRequestTest implements HttpServerAdapters {
out.println("cf2 after cancel: " + cf2);
try {
String body = cf2.get().body();
sem.release(RELEASE_ALL);
assertEquals(body, String.join("", BODY.split("\\|")));
throw new AssertionError("Expected CancellationException not received");
} catch (ExecutionException x) {
sem.release(RELEASE_ALL);
out.println(now() + "Got expected exception: " + x);
assertTrue(isCancelled(x));
} finally {
requestLatch.countDown();
}
// Cancelling the request may cause an IOException instead...
@ -620,7 +615,7 @@ public class CancelRequestTest implements HttpServerAdapters {
};
Iterable<byte[]> iterable = () -> {
var async = random.nextBoolean();
if (async) Thread.ofVirtual().start(interrupt);
if (async) executor.execute(interrupt);
else interrupt.run();
return List.of(BODY.getBytes(UTF_8)).iterator();
};
@ -632,7 +627,7 @@ public class CancelRequestTest implements HttpServerAdapters {
.POST(HttpRequest.BodyPublishers.ofByteArrays(iterable))
.setOption(H3_DISCOVERY, config)
.build();
var sem = addSemaphoreFor(req);
var requestLatch = addLatchFor(req);
String body = null;
Exception failed = null;
@ -642,7 +637,7 @@ public class CancelRequestTest implements HttpServerAdapters {
} catch (Exception x) {
failed = x;
}
sem.release(RELEASE_ALL);
requestLatch.countDown();
out.println(now() + req.uri() + ": got result or exception");
if (failed instanceof InterruptedException) {
out.println(now() + req.uri() + ": Got expected exception: " + failed);
@ -763,7 +758,7 @@ public class CancelRequestTest implements HttpServerAdapters {
try {
out.println(now() + "HTTPSlowHandler received request to " + t.getRequestURI());
System.err.println(now() + "HTTPSlowHandler received request to " + t.getRequestURI());
var sem = semaphores.get(t.getRequestURI().getRawQuery());
var requestLatch = latches.get(t.getRequestURI().getRawQuery());
boolean isThreadInterrupt = isThreadInterrupt(t);
byte[] req;
try (InputStream is = t.getRequestBody()) {
@ -787,15 +782,15 @@ public class CancelRequestTest implements HttpServerAdapters {
}
out.printf(now() + "Server wrote %d bytes%n", req.length);
}
if (sem != null) {
out.printf(now() + "Server waiting to acquire %s for %s%n",
sem, t.getRequestURI());
if (requestLatch != null) {
out.printf(now() + "Server awaiting latch %s for %s%n",
requestLatch, t.getRequestURI());
try {
sem.acquire();
requestLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
out.printf(now() + " ...sem acquired%n");
out.printf(now() + " ...latch released%n");
}
}
} catch (Throwable e) {