Relax orderings in push

This commit is contained in:
Doug Lea 2025-12-16 14:06:14 -05:00
parent 61adb1dc2a
commit 7cc2dd1a0d

View File

@ -1254,55 +1254,52 @@ public class ForkJoinPool extends AbstractExecutorService
* @throws RejectedExecutionException if array could not be resized
*/
final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
if ((a = array) != null && (cap = a.length) > 0) { // else disabled
if ((room = (m = cap - 1) - (s - b)) >= 0) {
top = s + 1;
long pos = slotOffset(m & s);
if (!internal)
U.putReference(a, pos, task); // inside lock
else
U.getAndSetReference(a, pos, task); // fully fenced
if (room == 0)
growArray(a, cap, s);
}
int s = top++; // back out on failure
ForkJoinTask<?>[] a = array;
int size = s - base + 1, m;
if (((a != null && a.length > size) || (a = growArray(a, s)) != null) &&
(m = a.length - 1) >= 0) {
a[m & s] = task;
if (!internal)
unlockPhase();
if (room < 0)
throw new RejectedExecutionException("Queue capacity exceeded");
if (pool != null &&
(room == 0 ||
U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null))
pool.signalWork(this, s); // may have appeared empty
if (U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null &&
pool != null)
pool.signalWork(this, s); // may have appeared empty
}
}
/**
* Resizes the queue array unless out of memory.
* @param a old array
* @param cap old array capacity
* @param s current top
* @return new array (or throws on OOME)
*/
private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
ForkJoinTask<?>[] newArray = null;
if (a != null && a.length == cap && cap > 0 && newCap > 0) {
private ForkJoinTask<?>[] growArray(ForkJoinTask<?>[] a, int s) {
int cap, newCap;
if (a != null && (cap = a.length) > 0 &&
(newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2) > 0) {
ForkJoinTask<?>[] newArray = null;
try {
newArray = new ForkJoinTask<?>[newCap];
} catch (OutOfMemoryError ex) {
}
if (newArray != null) { // else throw on next push
if (newArray != null) {
int mask = cap - 1, newMask = newCap - 1;
for (int k = s, j = cap; j > 0; --j, --k) {
for (int k = s - 1, j = cap; j > 0; --j, --k) {
ForkJoinTask<?> u; // poll old, push to new
if ((u = (ForkJoinTask<?>)U.getAndSetReference(
a, slotOffset(k & mask), null)) == null)
break; // lost to pollers
newArray[k & newMask] = u;
}
updateArray(newArray); // fully fenced
updateArray(newArray); // fully fenced
return newArray;
}
}
top = s; // back out
if ((phase & 1) == 0) // unlock if external
unlockPhase();
throw new RejectedExecutionException("Queue capacity exceeded");
}
/**