Address review comments

This commit is contained in:
Doug Lea 2025-12-13 10:24:55 -05:00
parent 094d7d758d
commit 496cef3815

View File

@ -1428,53 +1428,58 @@ public class ForkJoinPool extends AbstractExecutorService
/**
* Runs the given task, as well as remaining local tasks, and
* those from the given queue that can be polled without interference.
*
* @param task the top-level task
* @param q the WorkQueue from which task was taken
* @param fifo nonzero if fifo mode
* @param qbase the next base index of q to take;
* returning if no such task or already taken
* @return number of top-level tasks stolen from q
*/
final int topLevelExec(ForkJoinTask<?> task, WorkQueue q,
int fifo, int qbase) {
if (task == null || q == null)
return 0; // currently impossible
int stolen = 1;
if (task != null && q != null) {
outer: for (;;) {
task.doExec();
task = null;
int p = top, cap; ForkJoinTask<?>[] a;
if ((a = array) == null || (cap = a.length) <= 0)
break;
if (fifo == 0) { // specialized localPop
int s = p - 1; long k;
if (U.getReference(
a, k = slotOffset((cap - 1) & s)) != null &&
(task = (ForkJoinTask<?>)
U.getAndSetReference(a, k, null)) != null) {
top = s;
continue;
}
} else { // specialized localPoll
for (int b = base; p - b > 0; ) {
int nb = b + 1;
if ((task = (ForkJoinTask<?>)U.getAndSetReference(
a, slotOffset((cap - 1) & b), null)) != null) {
base = nb;
continue outer;
}
if (nb == p)
break;
while (b == (b = U.getIntAcquire(this, BASE)))
Thread.onSpinWait();
}
outer: for (;;) {
task.doExec();
task = null;
int p = top, cap; ForkJoinTask<?>[] a;
if ((a = array) == null || (cap = a.length) <= 0)
break; // currently impossible
if (fifo == 0) { // specialized localPop
int s = p - 1; long k;
if (U.getReference(
a, k = slotOffset((cap - 1) & s)) != null &&
(task = (ForkJoinTask<?>)
U.getAndSetReference(a, k, null)) != null) {
top = s;
continue;
}
} else { // specialized localPoll
for (int b = base; p - b > 0; ) {
int nb = b + 1;
if ((task = (ForkJoinTask<?>)U.getAndSetReference(
a, slotOffset((cap - 1) & b), null)) != null) {
base = nb;
continue outer;
}
if (nb == p)
break;
while (b == (b = U.getIntAcquire(this, BASE)))
Thread.onSpinWait();
}
// one-shot steal attempt
ForkJoinTask<?> t; int qcap; long qk;
ForkJoinTask<?>[] qa = q.array;
if (q.base != qbase || qa == null || (qcap = qa.length) <= 0 ||
(t = (ForkJoinTask<?>)U.getReferenceAcquire(
qa, qk = slotOffset((qcap - 1) & qbase))) == null ||
q.base != qbase ||
!U.compareAndSetReference(qa, qk, t, null))
break;
q.base = ++qbase;
++stolen;
task = t;
}
// try (once) to steal at q's next base index
ForkJoinTask<?>[] qa = q.array; int qcap; long qk;
if (q.base != qbase || qa == null || (qcap = qa.length) <= 0 ||
(task = (ForkJoinTask<?>)U.getReferenceAcquire(
qa, qk = slotOffset((qcap - 1) & qbase))) == null ||
q.base != qbase || // ensure valid read
!U.compareAndSetReference(qa, qk, task, null))
break;
q.base = ++qbase;
++stolen;
}
return stolen;
}
@ -1896,6 +1901,9 @@ public class ForkJoinPool extends AbstractExecutorService
/**
* Releases an idle worker, or creates one if not enough exist,
* giving up q is nonull and signalled slot already taken.
*
* @param q, if nonnull, the WorkQueue containing signalled task
* @param qbase q's base index for the task
*/
final void signalWork(WorkQueue q, int qbase) {
int pc = parallelism;
@ -1920,7 +1928,7 @@ public class ForkJoinPool extends AbstractExecutorService
else
nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
if (q != null && q.base - qbase > 0)
break;
break;
if (c == (c = ctl) && c == (c = compareAndExchangeCtl(c, nc))) {
if (v == null)
createWorker();