8370887: DelayScheduler.replace method may break the 4-ary heap in certain scenarios

Co-authored-by: Doug Lea <dl@openjdk.org>
Reviewed-by: vklang
This commit is contained in:
Alan Bateman 2025-11-25 07:05:46 +00:00
parent dea95e65a2
commit cc5b35bf69
2 changed files with 191 additions and 18 deletions

View File

@ -360,31 +360,36 @@ final class DelayScheduler extends Thread {
u.heapIndex = -1;
}
}
if (t != null) { // sift down
for (int cs; (cs = (k << 2) + 1) < n; ) {
ScheduledForkJoinTask<?> leastChild = null, c;
if (t != null) {
while (k > 0) { // sift up if replaced with smaller value
ScheduledForkJoinTask<?> parent; int pk;
if ((parent = h[pk = (k - 1) >>> 2]) == null ||
parent.when <= d)
break;
parent.heapIndex = k;
h[k] = parent;
k = pk;
}
for (int cs; (cs = (k << 2) + 1) < n; ) { // sift down
ScheduledForkJoinTask<?> leastChild = null;
int leastIndex = 0;
long leastValue = Long.MAX_VALUE;
for (int ck = cs, j = 4;;) { // at most 4 children
if ((c = h[ck]) == null)
break;
long cd = c.when;
if (c.status < 0 && alsoReplace < 0) {
alsoReplace = ck; // at most once per pass
c.heapIndex = -1;
}
else if (leastChild == null || cd < leastValue) {
long leastValue = d; // at most 4 children
for (int ck, j = 0; j < 4 && (ck = j + cs) < n; ++j) {
ScheduledForkJoinTask<?> c; long cd;
if ((c = h[ck]) != null && (cd = c.when) < leastValue) {
leastValue = cd;
leastIndex = ck;
leastChild = c;
}
if (--j == 0 || ++ck >= n)
break;
}
if (leastChild == null || d <= leastValue)
if (leastChild == null) // already ordered
break;
leastChild.heapIndex = k;
h[k] = leastChild;
if ((h[k] = leastChild).status >= 0 || alsoReplace >= 0)
leastChild.heapIndex = k;
else {
leastChild.heapIndex = -1;
alsoReplace = k;
}
k = leastIndex;
}
t.heapIndex = k;
@ -393,6 +398,7 @@ final class DelayScheduler extends Thread {
k = alsoReplace;
}
}
assert checkHeap(h, n);
return n;
}
@ -451,6 +457,33 @@ final class DelayScheduler extends Thread {
}
}
/**
* Invariant checks
*/
private static boolean checkHeap(ScheduledForkJoinTask<?>[] h, int n) {
for (int i = 0; i < h.length; ++i) {
ScheduledForkJoinTask<?> t = h[i];
if (t == null) { // unused slots all null
if (i < n)
return false;
}
else {
long v = t.when;
int x = t.heapIndex;
if (x != i && x >= 0) // valid index unless removing
return false;
if (i > 0 && h[(i - 1) >>> 2].when > v) // ordered wrt parent
return false;
int cs = (i << 2) + 1; // ordered wrt children
for (int ck, j = 0; j < 4 && (ck = cs + j) < n; ++j) {
if (h[ck].when < v)
return false;
}
}
}
return true;
}
/**
* Task class for DelayScheduler operations
*/

View File

@ -0,0 +1,140 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @bug 8370887
* @summary Test that cancelling a delayed task doesn't impact the ordering that other
* delayed tasks execute
*/
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedTransferQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class AscendingOrderAfterReplace {
private static final int[] DELAYS_IN_MS = { 3000, 3400, 3900, 3800, 3700, 3600, 3430, 3420, 3310, 3500, 3200 };
public static void main(String[] args) throws Exception {
for (int i = 1; i < DELAYS_IN_MS.length; i++) {
System.out.println("=== Test " + i + " ===");
while (!testCancel(DELAYS_IN_MS, i)) { }
}
}
/**
* Schedule the delayed tasks, cancel one of them, and check that the remaining tasks
* execute in the ascending order of delay.
* @return true if the test passed, false if a retry is needed
* @throws RuntimeException if the test fails
*/
private static boolean testCancel(int[] delays, int indexToCancel) throws Exception {
log("Delayed tasks: " + toString(delays));
// delayed tasks add to this queue when they execute
var queue = new LinkedTransferQueue<Integer>();
// pool with one thread to ensure that delayed tasks don't execute concurrently
try (var pool = new ForkJoinPool(1)) {
long startNanos = System.nanoTime();
Future<?>[] futures = Arrays.stream(delays)
.mapToObj(d -> pool.schedule(() -> {
log("Triggered " + d);
queue.add(d);
}, d, MILLISECONDS))
.toArray(Future[]::new);
long endNanos = System.nanoTime();
log("Delayed tasks submitted");
// check submit took < min diffs between two delays
long submitTime = Duration.ofNanos(endNanos - startNanos).toMillis();
long minDiff = minDifference(delays);
if (submitTime >= minDiff) {
log("Submit took >= " + minDiff + " ms, need to retry");
pool.shutdownNow();
return false;
}
// give a bit of time for -delayScheduler thread to process pending tasks
Thread.sleep(minValue(delays) / 2);
log("Cancel " + delays[indexToCancel]);
futures[indexToCancel].cancel(true);
}
// delayed tasks should have executed in ascending order of their delay
int[] executed = queue.stream().mapToInt(Integer::intValue).toArray();
log("Executed: " + toString(executed));
if (!isAscendingOrder(executed)) {
throw new RuntimeException("Not in ascending order!");
}
return true;
}
/**
* Return the minimum element.
*/
private static int minValue(int[] array) {
return IntStream.of(array).min().orElseThrow();
}
/**
* Return the minimum difference between any two elements.
*/
private static int minDifference(int[] array) {
int[] sorted = array.clone();
Arrays.sort(sorted);
return IntStream.range(1, sorted.length)
.map(i -> sorted[i] - sorted[i - 1])
.min()
.orElse(0);
}
/**
* Return true if the array is in ascending order.
*/
private static boolean isAscendingOrder(int[] array) {
return IntStream.range(1, array.length)
.allMatch(i -> array[i - 1] <= array[i]);
}
/**
* Returns a String containing the elements of an array in index order.
*/
private static String toString(int[] array) {
return IntStream.of(array)
.mapToObj(Integer::toString)
.collect(Collectors.joining(", ", "[", "]"));
}
private static void log(String message) {
System.out.println(Instant.now() + " " + message);
}
}