8134852: Integrate fork/join with API enhancements

Reviewed-by: martin, psandoz, chegar
This commit is contained in:
Doug Lea 2015-10-13 16:15:06 -07:00
parent cf43156eed
commit 2d8e9b323c
5 changed files with 1248 additions and 1156 deletions

View File

@ -36,21 +36,13 @@
package java.util.concurrent;
import java.io.Serializable;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.lang.reflect.Constructor;
import java.util.Collection;
import java.util.List;
import java.util.RandomAccess;
import java.lang.ref.WeakReference;
import java.lang.ref.ReferenceQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.lang.reflect.Constructor;
/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
@ -442,7 +434,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
ExceptionNode next;
final long thrower; // use id not ref to avoid weak cycles
final int hashCode; // store task hashCode before weak ref disappears
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next,
ReferenceQueue<Object> exceptionTableRefQueue) {
super(task, exceptionTableRefQueue);
this.ex = ex;
this.next = next;
@ -468,7 +461,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
int i = h & (t.length - 1);
for (ExceptionNode e = t[i]; ; e = e.next) {
if (e == null) {
t[i] = new ExceptionNode(this, ex, t[i]);
t[i] = new ExceptionNode(this, ex, t[i],
exceptionTableRefQueue);
break;
}
if (e.get() == this) // already present
@ -561,8 +555,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return the exception, or null if none
*/
private Throwable getThrowableException() {
if ((status & DONE_MASK) != EXCEPTIONAL)
return null;
int h = System.identityHashCode(this);
ExceptionNode e;
final ReentrantLock lock = exceptionTableLock;
@ -608,7 +600,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
* Poll stale refs and remove them. Call only while holding lock.
* Polls stale refs and removes them. Call only while holding lock.
*/
private static void expungeStaleExceptions() {
for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
@ -635,7 +627,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
* If lock is available, poll stale refs and remove them.
* If lock is available, polls stale refs and removes them.
* Called from ForkJoinPool when pools become quiescent.
*/
static final void helpExpungeStaleExceptions() {
@ -650,21 +642,23 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
* A version of "sneaky throw" to relay exceptions
* A version of "sneaky throw" to relay exceptions.
*/
static void rethrow(Throwable ex) {
if (ex != null)
ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
}
/**
* The sneaky part of sneaky throw, relying on generics
* limitations to evade compiler complaints about rethrowing
* unchecked exceptions
* unchecked exceptions.
*/
@SuppressWarnings("unchecked") static <T extends Throwable>
void uncheckedThrow(Throwable t) throws T {
throw (T)t; // rely on vacuous cast
void uncheckedThrow(Throwable t) throws T {
if (t != null)
throw (T)t; // rely on vacuous cast
else
throw new Error("Unknown Exception");
}
/**
@ -999,11 +993,10 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone();
Throwable ex;
if ((s &= DONE_MASK) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex);
if (s == EXCEPTIONAL)
throw new ExecutionException(getThrowableException());
return getRawResult();
}
@ -1058,13 +1051,11 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if (s >= 0)
s = status;
if ((s &= DONE_MASK) != NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
if (s != EXCEPTIONAL)
throw new TimeoutException();
if ((ex = getThrowableException()) != null)
throw new ExecutionException(ex);
throw new ExecutionException(getThrowableException());
}
return getRawResult();
}
@ -1090,10 +1081,10 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/**
* Possibly executes tasks until the pool hosting the current task
* {@link ForkJoinPool#isQuiescent is quiescent}. This method may
* be of use in designs in which many tasks are forked, but none
* are explicitly joined, instead executing them until all are
* processed.
* {@linkplain ForkJoinPool#isQuiescent is quiescent}. This
* method may be of use in designs in which many tasks are forked,
* but none are explicitly joined, instead executing them until
* all are processed.
*/
public static void helpQuiesce() {
Thread t;
@ -1129,10 +1120,12 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
* Returns the pool hosting the current task execution, or null
* if this task is executing outside of any ForkJoinPool.
* Returns the pool hosting the current thread, or {@code null}
* if the current thread is executing outside of any ForkJoinPool.
*
* <p>This method returns {@code null} if and only if {@link
* #inForkJoinPool} returns {@code false}.
*
* @see #inForkJoinPool
* @return the pool, or {@code null} if none
*/
public static ForkJoinPool getPool() {
@ -1299,6 +1292,23 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
null;
}
/**
* If the current thread is operating in a ForkJoinPool,
* unschedules and returns, without executing, a task externally
* submitted to the pool, if one is available. Availability may be
* transient, so a {@code null} result does not necessarily imply
* quiescence of the pool. This method is designed primarily to
* support extensions, and is unlikely to be useful otherwise.
*
* @return a task, or {@code null} if none are available
* @since 1.9
*/
protected static ForkJoinTask<?> pollSubmission() {
Thread t;
return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread)t).pool.pollSubmission() : null;
}
// tag operations
/**
@ -1312,16 +1322,16 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
* Atomically sets the tag value for this task.
* Atomically sets the tag value for this task and returns the old value.
*
* @param tag the tag value
* @param newValue the new tag value
* @return the previous value of the tag
* @since 1.8
*/
public final short setForkJoinTaskTag(short tag) {
public final short setForkJoinTaskTag(short newValue) {
for (int s;;) {
if (U.compareAndSwapInt(this, STATUS, s = status,
(s & ~SMASK) | (tag & SMASK)))
(s & ~SMASK) | (newValue & SMASK)))
return (short)s;
}
}
@ -1334,24 +1344,24 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* before processing, otherwise exiting because the node has
* already been visited.
*
* @param e the expected tag value
* @param tag the new tag value
* @param expect the expected tag value
* @param update the new tag value
* @return {@code true} if successful; i.e., the current value was
* equal to e and is now tag.
* equal to {@code expect} and was changed to {@code update}.
* @since 1.8
*/
public final boolean compareAndSetForkJoinTaskTag(short e, short tag) {
public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {
for (int s;;) {
if ((short)(s = status) != e)
if ((short)(s = status) != expect)
return false;
if (U.compareAndSwapInt(this, STATUS, s,
(s & ~SMASK) | (tag & SMASK)))
(s & ~SMASK) | (update & SMASK)))
return true;
}
}
/**
* Adaptor for Runnables. This implements RunnableFuture
* Adapter for Runnables. This implements RunnableFuture
* to be compliant with AbstractExecutorService constraints
* when used in ForkJoinPool.
*/
@ -1372,7 +1382,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
* Adaptor for Runnables without results
* Adapter for Runnables without results.
*/
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
implements RunnableFuture<Void> {
@ -1389,7 +1399,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
* Adaptor for Runnables in which failure forces worker exception
* Adapter for Runnables in which failure forces worker exception.
*/
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
final Runnable runnable;
@ -1407,7 +1417,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
* Adaptor for Callables
* Adapter for Callables.
*/
static final class AdaptedCallable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
@ -1423,8 +1433,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
try {
result = callable.call();
return true;
} catch (Error err) {
throw err;
} catch (RuntimeException rex) {
throw rex;
} catch (Exception ex) {
@ -1509,7 +1517,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long STATUS;
static {
@ -1517,11 +1525,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
exceptionTableRefQueue = new ReferenceQueue<Object>();
exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ForkJoinTask.class;
STATUS = U.objectFieldOffset
(k.getDeclaredField("status"));
} catch (Exception e) {
(ForkJoinTask.class.getDeclaredField("status"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}

View File

@ -87,7 +87,7 @@ public class ForkJoinWorkerThread extends Thread {
}
/**
* Version for InnocuousForkJoinWorkerThread
* Version for InnocuousForkJoinWorkerThread.
*/
ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
AccessControlContext acc) {
@ -179,28 +179,25 @@ public class ForkJoinWorkerThread extends Thread {
}
/**
* Non-public hook method for InnocuousForkJoinWorkerThread
* Non-public hook method for InnocuousForkJoinWorkerThread.
*/
void afterTopLevelExec() {
}
// Set up to allow setting thread fields in constructor
private static final sun.misc.Unsafe U;
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long THREADLOCALS;
private static final long INHERITABLETHREADLOCALS;
private static final long INHERITEDACCESSCONTROLCONTEXT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
THREADLOCALS = U.objectFieldOffset
(tk.getDeclaredField("threadLocals"));
(Thread.class.getDeclaredField("threadLocals"));
INHERITABLETHREADLOCALS = U.objectFieldOffset
(tk.getDeclaredField("inheritableThreadLocals"));
(Thread.class.getDeclaredField("inheritableThreadLocals"));
INHERITEDACCESSCONTROLCONTEXT = U.objectFieldOffset
(tk.getDeclaredField("inheritedAccessControlContext"));
} catch (Exception e) {
(Thread.class.getDeclaredField("inheritedAccessControlContext"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
@ -252,10 +249,10 @@ public class ForkJoinWorkerThread extends Thread {
private static ThreadGroup createThreadGroup() {
try {
sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
Class<?> gk = ThreadGroup.class;
long tg = u.objectFieldOffset(tk.getDeclaredField("group"));
long gp = u.objectFieldOffset(gk.getDeclaredField("parent"));
long tg = u.objectFieldOffset
(Thread.class.getDeclaredField("group"));
long gp = u.objectFieldOffset
(ThreadGroup.class.getDeclaredField("parent"));
ThreadGroup group = (ThreadGroup)
u.getObject(Thread.currentThread(), tg);
while (group != null) {
@ -265,7 +262,7 @@ public class ForkJoinWorkerThread extends Thread {
"InnocuousForkJoinWorkerThreadGroup");
group = parent;
}
} catch (Exception e) {
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
// fall through if null as cannot-happen safeguard

View File

@ -36,11 +36,12 @@
* @author Doug Lea
* @bug 8004138
* @summary Check if ForkJoinPool table leaks thrown exceptions.
* @run main/othervm -Xmx32m FJExceptionTableLeak
* @run main/othervm/timeout=1200 -Xmx32m FJExceptionTableLeak
*/
import java.util.concurrent.*;
public class FJExceptionTableLeak {
// TODO: make this test use less time!
// Run with TASKS_PER_STEP * 40 < Xmx < STEPS * TASKS_PER_STEP * 40
// These work for Xmx32m:

View File

@ -59,14 +59,15 @@ public final class Integrate {
static final int DYNAMIC = 0;
static final int FORK = 1;
// the function to integrate
static double computeFunction(double x) {
/** the function to integrate */
static double computeFunction(double x) {
return (x * x + 1.0) * x;
}
static final double start = 0.0;
static final double end = 1536.0;
/*
/**
* The number of recursive calls for
* integrate from start to end.
* (Empirically determined)
@ -127,7 +128,6 @@ public final class Integrate {
g.shutdown();
}
// Sequential version
static final class SQuad extends RecursiveAction {
static double computeArea(ForkJoinPool pool, double l, double r) {