mirror of
https://github.com/openjdk/jdk.git
synced 2026-02-19 14:55:17 +00:00
8056248: Improve ForkJoin thread throttling
Reviewed-by: psandoz, martin
This commit is contained in:
parent
1300729887
commit
7eb2dc516c
File diff suppressed because it is too large
Load Diff
@ -297,15 +297,22 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to set SIGNAL status unless already completed. Used by
|
||||
* ForkJoinPool. Other variants are directly incorporated into
|
||||
* externalAwaitDone etc.
|
||||
* If not done, sets SIGNAL status and performs Object.wait(timeout).
|
||||
* This task may or may not be done on exit. Ignores interrupts.
|
||||
*
|
||||
* @return true if successful
|
||||
* @param timeout using Object.wait conventions.
|
||||
*/
|
||||
final boolean trySetSignal() {
|
||||
int s = status;
|
||||
return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL);
|
||||
final void internalWait(long timeout) {
|
||||
int s;
|
||||
if ((s = status) >= 0 && // force completer to issue notify
|
||||
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
|
||||
synchronized (this) {
|
||||
if (status >= 0)
|
||||
try { wait(timeout); } catch (InterruptedException ie) { }
|
||||
else
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -313,35 +320,29 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
||||
* @return status upon completion
|
||||
*/
|
||||
private int externalAwaitDone() {
|
||||
int s;
|
||||
ForkJoinPool cp = ForkJoinPool.common;
|
||||
if ((s = status) >= 0) {
|
||||
if (cp != null) {
|
||||
if (this instanceof CountedCompleter)
|
||||
s = cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
|
||||
else if (cp.tryExternalUnpush(this))
|
||||
s = doExec();
|
||||
}
|
||||
if (s >= 0 && (s = status) >= 0) {
|
||||
boolean interrupted = false;
|
||||
do {
|
||||
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
|
||||
synchronized (this) {
|
||||
if (status >= 0) {
|
||||
try {
|
||||
wait();
|
||||
} catch (InterruptedException ie) {
|
||||
interrupted = true;
|
||||
}
|
||||
int s = ((this instanceof CountedCompleter) ? // try helping
|
||||
ForkJoinPool.common.externalHelpComplete(
|
||||
(CountedCompleter<?>)this, 0) :
|
||||
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
|
||||
if (s >= 0 && (s = status) >= 0) {
|
||||
boolean interrupted = false;
|
||||
do {
|
||||
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
|
||||
synchronized (this) {
|
||||
if (status >= 0) {
|
||||
try {
|
||||
wait(0L);
|
||||
} catch (InterruptedException ie) {
|
||||
interrupted = true;
|
||||
}
|
||||
else
|
||||
notifyAll();
|
||||
}
|
||||
else
|
||||
notifyAll();
|
||||
}
|
||||
} while ((s = status) >= 0);
|
||||
if (interrupted)
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
} while ((s = status) >= 0);
|
||||
if (interrupted)
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
return s;
|
||||
}
|
||||
@ -351,22 +352,22 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
||||
*/
|
||||
private int externalInterruptibleAwaitDone() throws InterruptedException {
|
||||
int s;
|
||||
ForkJoinPool cp = ForkJoinPool.common;
|
||||
if (Thread.interrupted())
|
||||
throw new InterruptedException();
|
||||
if ((s = status) >= 0 && cp != null) {
|
||||
if (this instanceof CountedCompleter)
|
||||
cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
|
||||
else if (cp.tryExternalUnpush(this))
|
||||
doExec();
|
||||
}
|
||||
while ((s = status) >= 0) {
|
||||
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
|
||||
synchronized (this) {
|
||||
if (status >= 0)
|
||||
wait();
|
||||
else
|
||||
notifyAll();
|
||||
if ((s = status) >= 0 &&
|
||||
(s = ((this instanceof CountedCompleter) ?
|
||||
ForkJoinPool.common.externalHelpComplete(
|
||||
(CountedCompleter<?>)this, 0) :
|
||||
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
|
||||
0)) >= 0) {
|
||||
while ((s = status) >= 0) {
|
||||
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
|
||||
synchronized (this) {
|
||||
if (status >= 0)
|
||||
wait(0L);
|
||||
else
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -386,7 +387,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
||||
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
|
||||
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
|
||||
tryUnpush(this) && (s = doExec()) < 0 ? s :
|
||||
wt.pool.awaitJoin(w, this) :
|
||||
wt.pool.awaitJoin(w, this, 0L) :
|
||||
externalAwaitDone();
|
||||
}
|
||||
|
||||
@ -399,7 +400,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
||||
int s; Thread t; ForkJoinWorkerThread wt;
|
||||
return (s = doExec()) < 0 ? s :
|
||||
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
|
||||
(wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
|
||||
(wt = (ForkJoinWorkerThread)t).pool.
|
||||
awaitJoin(wt.workQueue, this, 0L) :
|
||||
externalAwaitDone();
|
||||
}
|
||||
|
||||
@ -577,7 +579,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
||||
Throwable ex;
|
||||
if (e == null || (ex = e.ex) == null)
|
||||
return null;
|
||||
if (false && e.thrower != Thread.currentThread().getId()) {
|
||||
if (e.thrower != Thread.currentThread().getId()) {
|
||||
Class<? extends Throwable> ec = ex.getClass();
|
||||
try {
|
||||
Constructor<?> noArgCtor = null;
|
||||
@ -587,13 +589,17 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
||||
Class<?>[] ps = c.getParameterTypes();
|
||||
if (ps.length == 0)
|
||||
noArgCtor = c;
|
||||
else if (ps.length == 1 && ps[0] == Throwable.class)
|
||||
return (Throwable)(c.newInstance(ex));
|
||||
else if (ps.length == 1 && ps[0] == Throwable.class) {
|
||||
Throwable wx = (Throwable)c.newInstance(ex);
|
||||
return (wx == null) ? ex : wx;
|
||||
}
|
||||
}
|
||||
if (noArgCtor != null) {
|
||||
Throwable wx = (Throwable)(noArgCtor.newInstance());
|
||||
wx.initCause(ex);
|
||||
return wx;
|
||||
if (wx != null) {
|
||||
wx.initCause(ex);
|
||||
return wx;
|
||||
}
|
||||
}
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
@ -1017,67 +1023,40 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
||||
*/
|
||||
public final V get(long timeout, TimeUnit unit)
|
||||
throws InterruptedException, ExecutionException, TimeoutException {
|
||||
int s;
|
||||
long nanos = unit.toNanos(timeout);
|
||||
if (Thread.interrupted())
|
||||
throw new InterruptedException();
|
||||
// Messy in part because we measure in nanosecs, but wait in millisecs
|
||||
int s; long ms;
|
||||
long ns = unit.toNanos(timeout);
|
||||
ForkJoinPool cp;
|
||||
if ((s = status) >= 0 && ns > 0L) {
|
||||
long deadline = System.nanoTime() + ns;
|
||||
ForkJoinPool p = null;
|
||||
ForkJoinPool.WorkQueue w = null;
|
||||
if ((s = status) >= 0 && nanos > 0L) {
|
||||
long d = System.nanoTime() + nanos;
|
||||
long deadline = (d == 0L) ? 1L : d; // avoid 0
|
||||
Thread t = Thread.currentThread();
|
||||
if (t instanceof ForkJoinWorkerThread) {
|
||||
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
|
||||
p = wt.pool;
|
||||
w = wt.workQueue;
|
||||
p.helpJoinOnce(w, this); // no retries on failure
|
||||
s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
|
||||
}
|
||||
else if ((cp = ForkJoinPool.common) != null) {
|
||||
if (this instanceof CountedCompleter)
|
||||
cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
|
||||
else if (cp.tryExternalUnpush(this))
|
||||
doExec();
|
||||
}
|
||||
boolean canBlock = false;
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
while ((s = status) >= 0) {
|
||||
if (w != null && w.qlock < 0)
|
||||
cancelIgnoringExceptions(this);
|
||||
else if (!canBlock) {
|
||||
if (p == null || p.tryCompensate(p.ctl))
|
||||
canBlock = true;
|
||||
}
|
||||
else {
|
||||
if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
|
||||
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
|
||||
synchronized (this) {
|
||||
if (status >= 0) {
|
||||
try {
|
||||
wait(ms);
|
||||
} catch (InterruptedException ie) {
|
||||
if (p == null)
|
||||
interrupted = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
notifyAll();
|
||||
}
|
||||
else if ((s = ((this instanceof CountedCompleter) ?
|
||||
ForkJoinPool.common.externalHelpComplete(
|
||||
(CountedCompleter<?>)this, 0) :
|
||||
ForkJoinPool.common.tryExternalUnpush(this) ?
|
||||
doExec() : 0)) >= 0) {
|
||||
long ns, ms; // measure in nanosecs, but wait in millisecs
|
||||
while ((s = status) >= 0 &&
|
||||
(ns = deadline - System.nanoTime()) > 0L) {
|
||||
if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
|
||||
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
|
||||
synchronized (this) {
|
||||
if (status >= 0)
|
||||
wait(ms); // OK to throw InterruptedException
|
||||
else
|
||||
notifyAll();
|
||||
}
|
||||
if ((s = status) < 0 || interrupted ||
|
||||
(ns = deadline - System.nanoTime()) <= 0L)
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (p != null && canBlock)
|
||||
p.incrementActiveCount();
|
||||
}
|
||||
if (interrupted)
|
||||
throw new InterruptedException();
|
||||
}
|
||||
if (s >= 0)
|
||||
s = status;
|
||||
if ((s &= DONE_MASK) != NORMAL) {
|
||||
Throwable ex;
|
||||
if (s == CANCELLED)
|
||||
|
||||
@ -66,7 +66,7 @@ public class ForkJoinWorkerThread extends Thread {
|
||||
* owning thread.
|
||||
*
|
||||
* Support for (non-public) subclass InnocuousForkJoinWorkerThread
|
||||
* requires that we break quite a lot of encapulation (via Unsafe)
|
||||
* requires that we break quite a lot of encapsulation (via Unsafe)
|
||||
* both here and in the subclass to access and set Thread fields.
|
||||
*/
|
||||
|
||||
@ -118,7 +118,7 @@ public class ForkJoinWorkerThread extends Thread {
|
||||
* @return the index number
|
||||
*/
|
||||
public int getPoolIndex() {
|
||||
return workQueue.poolIndex >>> 1; // ignore odd/even tag bit
|
||||
return workQueue.getPoolIndex();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -171,7 +171,7 @@ public class ForkJoinWorkerThread extends Thread {
|
||||
}
|
||||
|
||||
/**
|
||||
* Erases ThreadLocals by nulling out Thread maps
|
||||
* Erases ThreadLocals by nulling out Thread maps.
|
||||
*/
|
||||
final void eraseThreadLocals() {
|
||||
U.putObject(this, THREADLOCALS, null);
|
||||
@ -246,8 +246,8 @@ public class ForkJoinWorkerThread extends Thread {
|
||||
|
||||
/**
|
||||
* Returns a new group with the system ThreadGroup (the
|
||||
* topmost, parentless group) as parent. Uses Unsafe to
|
||||
* traverse Thread group and ThreadGroup parent fields.
|
||||
* topmost, parent-less group) as parent. Uses Unsafe to
|
||||
* traverse Thread.group and ThreadGroup.parent fields.
|
||||
*/
|
||||
private static ThreadGroup createThreadGroup() {
|
||||
try {
|
||||
@ -274,4 +274,3 @@ public class ForkJoinWorkerThread extends Thread {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user