> futures = new ArrayList<>(tasks.size());
boolean done = false;
try {
@@ -2670,7 +2760,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public int getParallelism() {
int par;
- return ((par = parallelism) > 0) ? par : 1;
+ return ((par = config & SMASK) > 0) ? par : 1;
}
/**
@@ -2692,7 +2782,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the number of worker threads
*/
public int getPoolSize() {
- return parallelism + (short)(ctl >>> TC_SHIFT);
+ return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
}
/**
@@ -2702,7 +2792,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if this pool uses async mode
*/
public boolean getAsyncMode() {
- return mode == FIFO_QUEUE;
+ return (config & FIFO_QUEUE) != 0;
}
/**
@@ -2733,7 +2823,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the number of active threads
*/
public int getActiveThreadCount() {
- int r = parallelism + (int)(ctl >> AC_SHIFT);
+ int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
return (r <= 0) ? 0 : r; // suppress momentarily negative values
}
@@ -2749,7 +2839,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
- return parallelism + (int)(ctl >> AC_SHIFT) <= 0;
+ return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
}
/**
@@ -2764,7 +2854,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the number of steals
*/
public long getStealCount() {
- long count = stealCount;
+ AtomicLong sc = stealCounter;
+ long count = (sc == null) ? 0L : sc.get();
WorkQueue[] ws; WorkQueue w;
if ((ws = workQueues) != null) {
for (int i = 1; i < ws.length; i += 2) {
@@ -2894,7 +2985,8 @@ public class ForkJoinPool extends AbstractExecutorService {
public String toString() {
// Use a single pass through workQueues to collect counts
long qt = 0L, qs = 0L; int rc = 0;
- long st = stealCount;
+ AtomicLong sc = stealCounter;
+ long st = (sc == null) ? 0L : sc.get();
long c = ctl;
WorkQueue[] ws; WorkQueue w;
if ((ws = workQueues) != null) {
@@ -2912,16 +3004,16 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
}
- int pc = parallelism;
+ int pc = (config & SMASK);
int tc = pc + (short)(c >>> TC_SHIFT);
int ac = pc + (int)(c >> AC_SHIFT);
if (ac < 0) // ignore transient negative
ac = 0;
- String level;
- if ((c & STOP_BIT) != 0)
- level = (tc == 0) ? "Terminated" : "Terminating";
- else
- level = plock < 0 ? "Shutting down" : "Running";
+ int rs = runState;
+ String level = ((rs & TERMINATED) != 0 ? "Terminated" :
+ (rs & STOP) != 0 ? "Terminating" :
+ (rs & SHUTDOWN) != 0 ? "Shutting down" :
+ "Running");
return super.toString() +
"[" + level +
", parallelism = " + pc +
@@ -2983,9 +3075,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if all tasks have completed following shut down
*/
public boolean isTerminated() {
- long c = ctl;
- return ((c & STOP_BIT) != 0L &&
- (short)(c >>> TC_SHIFT) + parallelism <= 0);
+ return (runState & TERMINATED) != 0;
}
/**
@@ -3002,9 +3092,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if terminating but not yet terminated
*/
public boolean isTerminating() {
- long c = ctl;
- return ((c & STOP_BIT) != 0L &&
- (short)(c >>> TC_SHIFT) + parallelism > 0);
+ int rs = runState;
+ return (rs & STOP) != 0 && (rs & TERMINATED) == 0;
}
/**
@@ -3013,7 +3102,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if this pool has been shut down
*/
public boolean isShutdown() {
- return plock < 0;
+ return (runState & SHUTDOWN) != 0;
}
/**
@@ -3090,8 +3179,9 @@ public class ForkJoinPool extends AbstractExecutorService {
}
found = false;
for (int j = (m + 1) << 2; j >= 0; --j) {
- ForkJoinTask> t; WorkQueue q; int b;
- if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
+ ForkJoinTask> t; WorkQueue q; int b, k;
+ if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null &&
+ (b = q.base) - q.top < 0) {
found = true;
if ((t = q.pollAt(b)) != null)
t.doExec();
@@ -3115,8 +3205,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* in {@link ForkJoinPool}s.
*
* A {@code ManagedBlocker} provides two methods. Method
- * {@code isReleasable} must return {@code true} if blocking is
- * not necessary. Method {@code block} blocks the current thread
+ * {@link #isReleasable} must return {@code true} if blocking is
+ * not necessary. Method {@link #block} blocks the current thread
* if necessary (perhaps internally invoking {@code isReleasable}
* before actually blocking). These actions are performed by any
* thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
@@ -3185,37 +3275,46 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * Blocks in accord with the given blocker. If the current thread
- * is a {@link ForkJoinWorkerThread}, this method possibly
- * arranges for a spare thread to be activated if necessary to
- * ensure sufficient parallelism while the current thread is blocked.
+ * Runs the given possibly blocking task. When {@linkplain
+ * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
+ * method possibly arranges for a spare thread to be activated if
+ * necessary to ensure sufficient parallelism while the current
+ * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
*
- *
If the caller is not a {@link ForkJoinTask}, this method is
+ *
This method repeatedly calls {@code blocker.isReleasable()} and
+ * {@code blocker.block()} until either method returns {@code true}.
+ * Every call to {@code blocker.block()} is preceded by a call to
+ * {@code blocker.isReleasable()} that returned {@code false}.
+ *
+ *
If not running in a ForkJoinPool, this method is
* behaviorally equivalent to
*
{@code
* while (!blocker.isReleasable())
* if (blocker.block())
- * return;
- * }
+ * break;}
*
- * If the caller is a {@code ForkJoinTask}, then the pool may
- * first be expanded to ensure parallelism, and later adjusted.
+ * If running in a ForkJoinPool, the pool may first be expanded to
+ * ensure sufficient parallelism available during the call to
+ * {@code blocker.block()}.
*
- * @param blocker the blocker
- * @throws InterruptedException if blocker.block did so
+ * @param blocker the blocker task
+ * @throws InterruptedException if {@code blocker.block()} did so
*/
public static void managedBlock(ManagedBlocker blocker)
throws InterruptedException {
+ ForkJoinPool p;
+ ForkJoinWorkerThread wt;
Thread t = Thread.currentThread();
- if (t instanceof ForkJoinWorkerThread) {
- ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
+ if ((t instanceof ForkJoinWorkerThread) &&
+ (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
+ WorkQueue w = wt.workQueue;
while (!blocker.isReleasable()) {
- if (p.tryCompensate(p.ctl)) {
+ if (p.tryCompensate(w)) {
try {
do {} while (!blocker.isReleasable() &&
!blocker.block());
} finally {
- p.incrementActiveCount();
+ U.getAndAddLong(p, CTL, AC_UNIT);
}
break;
}
@@ -3241,15 +3340,18 @@ public class ForkJoinPool extends AbstractExecutorService {
// Unsafe mechanics
private static final sun.misc.Unsafe U;
+ private static final int ABASE;
+ private static final int ASHIFT;
private static final long CTL;
+ private static final long RUNSTATE;
+ private static final long STEALCOUNTER;
private static final long PARKBLOCKER;
- private static final int ABASE;
- private static final int ASHIFT;
- private static final long STEALCOUNT;
- private static final long PLOCK;
- private static final long INDEXSEED;
- private static final long QBASE;
+ private static final long QTOP;
private static final long QLOCK;
+ private static final long QSCANSTATE;
+ private static final long QPARKER;
+ private static final long QCURRENTSTEAL;
+ private static final long QCURRENTJOIN;
static {
// initialize field offsets for CAS etc
@@ -3258,20 +3360,26 @@ public class ForkJoinPool extends AbstractExecutorService {
Class> k = ForkJoinPool.class;
CTL = U.objectFieldOffset
(k.getDeclaredField("ctl"));
- STEALCOUNT = U.objectFieldOffset
- (k.getDeclaredField("stealCount"));
- PLOCK = U.objectFieldOffset
- (k.getDeclaredField("plock"));
- INDEXSEED = U.objectFieldOffset
- (k.getDeclaredField("indexSeed"));
+ RUNSTATE = U.objectFieldOffset
+ (k.getDeclaredField("runState"));
+ STEALCOUNTER = U.objectFieldOffset
+ (k.getDeclaredField("stealCounter"));
Class> tk = Thread.class;
PARKBLOCKER = U.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
Class> wk = WorkQueue.class;
- QBASE = U.objectFieldOffset
- (wk.getDeclaredField("base"));
+ QTOP = U.objectFieldOffset
+ (wk.getDeclaredField("top"));
QLOCK = U.objectFieldOffset
(wk.getDeclaredField("qlock"));
+ QSCANSTATE = U.objectFieldOffset
+ (wk.getDeclaredField("scanState"));
+ QPARKER = U.objectFieldOffset
+ (wk.getDeclaredField("parker"));
+ QCURRENTSTEAL = U.objectFieldOffset
+ (wk.getDeclaredField("currentSteal"));
+ QCURRENTJOIN = U.objectFieldOffset
+ (wk.getDeclaredField("currentJoin"));
Class> ak = ForkJoinTask[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
@@ -3282,6 +3390,7 @@ public class ForkJoinPool extends AbstractExecutorService {
throw new Error(e);
}
+ commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
@@ -3289,7 +3398,7 @@ public class ForkJoinPool extends AbstractExecutorService {
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction() {
public ForkJoinPool run() { return makeCommonPool(); }});
- int par = common.parallelism; // report 1 even if threads disabled
+ int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
@@ -3308,6 +3417,8 @@ public class ForkJoinPool extends AbstractExecutorService {
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
+ String mp = System.getProperty
+ ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
@@ -3316,6 +3427,8 @@ public class ForkJoinPool extends AbstractExecutorService {
if (hp != null)
handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
+ if (mp != null)
+ commonMaxSpares = Integer.parseInt(mp);
} catch (Exception ignore) {
}
if (factory == null) {
diff --git a/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java b/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
index 936bfc23a33..4439c2407da 100644
--- a/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java
@@ -297,15 +297,22 @@ public abstract class ForkJoinTask implements Future, Serializable {
}
/**
- * Tries to set SIGNAL status unless already completed. Used by
- * ForkJoinPool. Other variants are directly incorporated into
- * externalAwaitDone etc.
+ * If not done, sets SIGNAL status and performs Object.wait(timeout).
+ * This task may or may not be done on exit. Ignores interrupts.
*
- * @return true if successful
+ * @param timeout using Object.wait conventions.
*/
- final boolean trySetSignal() {
- int s = status;
- return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL);
+ final void internalWait(long timeout) {
+ int s;
+ if ((s = status) >= 0 && // force completer to issue notify
+ U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0)
+ try { wait(timeout); } catch (InterruptedException ie) { }
+ else
+ notifyAll();
+ }
+ }
}
/**
@@ -313,35 +320,29 @@ public abstract class ForkJoinTask implements Future, Serializable {
* @return status upon completion
*/
private int externalAwaitDone() {
- int s;
- ForkJoinPool cp = ForkJoinPool.common;
- if ((s = status) >= 0) {
- if (cp != null) {
- if (this instanceof CountedCompleter)
- s = cp.externalHelpComplete((CountedCompleter>)this, Integer.MAX_VALUE);
- else if (cp.tryExternalUnpush(this))
- s = doExec();
- }
- if (s >= 0 && (s = status) >= 0) {
- boolean interrupted = false;
- do {
- if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
- synchronized (this) {
- if (status >= 0) {
- try {
- wait();
- } catch (InterruptedException ie) {
- interrupted = true;
- }
+ int s = ((this instanceof CountedCompleter) ? // try helping
+ ForkJoinPool.common.externalHelpComplete(
+ (CountedCompleter>)this, 0) :
+ ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
+ if (s >= 0 && (s = status) >= 0) {
+ boolean interrupted = false;
+ do {
+ if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0) {
+ try {
+ wait(0L);
+ } catch (InterruptedException ie) {
+ interrupted = true;
}
- else
- notifyAll();
}
+ else
+ notifyAll();
}
- } while ((s = status) >= 0);
- if (interrupted)
- Thread.currentThread().interrupt();
- }
+ }
+ } while ((s = status) >= 0);
+ if (interrupted)
+ Thread.currentThread().interrupt();
}
return s;
}
@@ -351,22 +352,22 @@ public abstract class ForkJoinTask implements Future, Serializable {
*/
private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
- ForkJoinPool cp = ForkJoinPool.common;
if (Thread.interrupted())
throw new InterruptedException();
- if ((s = status) >= 0 && cp != null) {
- if (this instanceof CountedCompleter)
- cp.externalHelpComplete((CountedCompleter>)this, Integer.MAX_VALUE);
- else if (cp.tryExternalUnpush(this))
- doExec();
- }
- while ((s = status) >= 0) {
- if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
- synchronized (this) {
- if (status >= 0)
- wait();
- else
- notifyAll();
+ if ((s = status) >= 0 &&
+ (s = ((this instanceof CountedCompleter) ?
+ ForkJoinPool.common.externalHelpComplete(
+ (CountedCompleter>)this, 0) :
+ ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
+ 0)) >= 0) {
+ while ((s = status) >= 0) {
+ if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0)
+ wait(0L);
+ else
+ notifyAll();
+ }
}
}
}
@@ -386,7 +387,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
- wt.pool.awaitJoin(w, this) :
+ wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
@@ -399,7 +400,8 @@ public abstract class ForkJoinTask implements Future, Serializable {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
- (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
+ (wt = (ForkJoinWorkerThread)t).pool.
+ awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
@@ -577,7 +579,7 @@ public abstract class ForkJoinTask implements Future, Serializable {
Throwable ex;
if (e == null || (ex = e.ex) == null)
return null;
- if (false && e.thrower != Thread.currentThread().getId()) {
+ if (e.thrower != Thread.currentThread().getId()) {
Class extends Throwable> ec = ex.getClass();
try {
Constructor> noArgCtor = null;
@@ -587,13 +589,17 @@ public abstract class ForkJoinTask implements Future, Serializable {
Class>[] ps = c.getParameterTypes();
if (ps.length == 0)
noArgCtor = c;
- else if (ps.length == 1 && ps[0] == Throwable.class)
- return (Throwable)(c.newInstance(ex));
+ else if (ps.length == 1 && ps[0] == Throwable.class) {
+ Throwable wx = (Throwable)c.newInstance(ex);
+ return (wx == null) ? ex : wx;
+ }
}
if (noArgCtor != null) {
Throwable wx = (Throwable)(noArgCtor.newInstance());
- wx.initCause(ex);
- return wx;
+ if (wx != null) {
+ wx.initCause(ex);
+ return wx;
+ }
}
} catch (Exception ignore) {
}
@@ -1017,67 +1023,40 @@ public abstract class ForkJoinTask implements Future, Serializable {
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
+ int s;
+ long nanos = unit.toNanos(timeout);
if (Thread.interrupted())
throw new InterruptedException();
- // Messy in part because we measure in nanosecs, but wait in millisecs
- int s; long ms;
- long ns = unit.toNanos(timeout);
- ForkJoinPool cp;
- if ((s = status) >= 0 && ns > 0L) {
- long deadline = System.nanoTime() + ns;
- ForkJoinPool p = null;
- ForkJoinPool.WorkQueue w = null;
+ if ((s = status) >= 0 && nanos > 0L) {
+ long d = System.nanoTime() + nanos;
+ long deadline = (d == 0L) ? 1L : d; // avoid 0
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
- p = wt.pool;
- w = wt.workQueue;
- p.helpJoinOnce(w, this); // no retries on failure
+ s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
}
- else if ((cp = ForkJoinPool.common) != null) {
- if (this instanceof CountedCompleter)
- cp.externalHelpComplete((CountedCompleter>)this, Integer.MAX_VALUE);
- else if (cp.tryExternalUnpush(this))
- doExec();
- }
- boolean canBlock = false;
- boolean interrupted = false;
- try {
- while ((s = status) >= 0) {
- if (w != null && w.qlock < 0)
- cancelIgnoringExceptions(this);
- else if (!canBlock) {
- if (p == null || p.tryCompensate(p.ctl))
- canBlock = true;
- }
- else {
- if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
- U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
- synchronized (this) {
- if (status >= 0) {
- try {
- wait(ms);
- } catch (InterruptedException ie) {
- if (p == null)
- interrupted = true;
- }
- }
- else
- notifyAll();
- }
+ else if ((s = ((this instanceof CountedCompleter) ?
+ ForkJoinPool.common.externalHelpComplete(
+ (CountedCompleter>)this, 0) :
+ ForkJoinPool.common.tryExternalUnpush(this) ?
+ doExec() : 0)) >= 0) {
+ long ns, ms; // measure in nanosecs, but wait in millisecs
+ while ((s = status) >= 0 &&
+ (ns = deadline - System.nanoTime()) > 0L) {
+ if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
+ U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0)
+ wait(ms); // OK to throw InterruptedException
+ else
+ notifyAll();
}
- if ((s = status) < 0 || interrupted ||
- (ns = deadline - System.nanoTime()) <= 0L)
- break;
}
}
- } finally {
- if (p != null && canBlock)
- p.incrementActiveCount();
}
- if (interrupted)
- throw new InterruptedException();
}
+ if (s >= 0)
+ s = status;
if ((s &= DONE_MASK) != NORMAL) {
Throwable ex;
if (s == CANCELLED)
diff --git a/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java b/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
index 404c47cc01a..8723f0aac69 100644
--- a/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/ForkJoinWorkerThread.java
@@ -66,7 +66,7 @@ public class ForkJoinWorkerThread extends Thread {
* owning thread.
*
* Support for (non-public) subclass InnocuousForkJoinWorkerThread
- * requires that we break quite a lot of encapulation (via Unsafe)
+ * requires that we break quite a lot of encapsulation (via Unsafe)
* both here and in the subclass to access and set Thread fields.
*/
@@ -118,7 +118,7 @@ public class ForkJoinWorkerThread extends Thread {
* @return the index number
*/
public int getPoolIndex() {
- return workQueue.poolIndex >>> 1; // ignore odd/even tag bit
+ return workQueue.getPoolIndex();
}
/**
@@ -171,7 +171,7 @@ public class ForkJoinWorkerThread extends Thread {
}
/**
- * Erases ThreadLocals by nulling out Thread maps
+ * Erases ThreadLocals by nulling out Thread maps.
*/
final void eraseThreadLocals() {
U.putObject(this, THREADLOCALS, null);
@@ -246,8 +246,8 @@ public class ForkJoinWorkerThread extends Thread {
/**
* Returns a new group with the system ThreadGroup (the
- * topmost, parentless group) as parent. Uses Unsafe to
- * traverse Thread group and ThreadGroup parent fields.
+ * topmost, parent-less group) as parent. Uses Unsafe to
+ * traverse Thread.group and ThreadGroup.parent fields.
*/
private static ThreadGroup createThreadGroup() {
try {
@@ -274,4 +274,3 @@ public class ForkJoinWorkerThread extends Thread {
}
}
-