From cf43156eed9159beaf8ffda3997bb91bae32d34c Mon Sep 17 00:00:00 2001
From: Doug Lea
Date: Tue, 13 Oct 2015 16:04:56 -0700
Subject: [PATCH] 8134851: Integrate CompletableFuture with API enhancements
8039378: CompletableFuture: Avoid StackOverflowError for long linear chains
Reviewed-by: martin, psandoz, chegar
---
.../util/concurrent/CompletableFuture.java | 757 ++++++++++++++----
.../java/util/concurrent/CompletionStage.java | 37 +-
.../concurrent/CompletableFuture/Basic.java | 46 +-
.../ThenComposeAsyncTest.java | 1 -
.../ThenComposeExceptionTest.java | 1 -
5 files changed, 649 insertions(+), 193 deletions(-)
diff --git a/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java b/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java
index 955daabedc0..ee553e8f261 100644
--- a/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java
@@ -34,23 +34,13 @@
*/
package java.util.concurrent;
-import java.util.function.Supplier;
-import java.util.function.Consumer;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import java.util.function.BiFunction;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.CompletionStage;
+
import java.util.concurrent.locks.LockSupport;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
/**
* A {@link Future} that may be explicitly completed (setting its
@@ -71,19 +61,32 @@ import java.util.concurrent.locks.LockSupport;
*
Actions supplied for dependent completions of
* non-async methods may be performed by the thread that
* completes the current CompletableFuture, or by any other caller of
- * a completion method.
+ * a completion method.
*
* All async methods without an explicit Executor
* argument are performed using the {@link ForkJoinPool#commonPool()}
* (unless it does not support a parallelism level of at least two, in
- * which case, a new Thread is created to run each task). To simplify
- * monitoring, debugging, and tracking, all generated asynchronous
- * tasks are instances of the marker interface {@link
- * AsynchronousCompletionTask}.
+ * which case, a new Thread is created to run each task). This may be
+ * overridden for non-static methods in subclasses by defining method
+ * {@link #defaultExecutor()}. To simplify monitoring, debugging,
+ * and tracking, all generated asynchronous tasks are instances of the
+ * marker interface {@link AsynchronousCompletionTask}. Operations
+ * with time-delays can use adapter methods defined in this class, for
+ * example: {@code supplyAsync(supplier, delayedExecutor(timeout,
+ * timeUnit))}. To support methods with delays and timeouts, this
+ * class maintains at most one daemon thread for triggering and
+ * cancelling actions, not for running them.
*
* All CompletionStage methods are implemented independently of
* other public methods, so the behavior of one method is not impacted
- * by overrides of others in subclasses.
+ * by overrides of others in subclasses.
+ *
+ * All CompletionStage methods return CompletableFutures. To
+ * restrict usages to only those methods defined in interface
+ * CompletionStage, use method {@link #minimalCompletionStage}. Or to
+ * ensure only that clients do not themselves modify a future, use
+ * method {@link #copy}.
+ *
*
* CompletableFuture also implements {@link Future} with the following
* policies:
@@ -94,7 +97,7 @@ import java.util.concurrent.locks.LockSupport;
* completion. Method {@link #cancel cancel} has the same effect as
* {@code completeExceptionally(new CancellationException())}. Method
* {@link #isCompletedExceptionally} can be used to determine if a
- * CompletableFuture completed in any exceptional fashion.
+ * CompletableFuture completed in any exceptional fashion.
*
* In case of exceptional completion with a CompletionException,
* methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
@@ -102,10 +105,38 @@ import java.util.concurrent.locks.LockSupport;
* corresponding CompletionException. To simplify usage in most
* contexts, this class also defines methods {@link #join()} and
* {@link #getNow} that instead throw the CompletionException directly
- * in these cases.
+ * in these cases.
+ *
+ *
+ * Arguments used to pass a completion result (that is, for
+ * parameters of type {@code T}) for methods accepting them may be
+ * null, but passing a null value for any other parameter will result
+ * in a {@link NullPointerException} being thrown.
+ *
+ *
Subclasses of this class should normally override the "virtual
+ * constructor" method {@link #newIncompleteFuture}, which establishes
+ * the concrete type returned by CompletionStage methods. For example,
+ * here is a class that substitutes a different default Executor and
+ * disables the {@code obtrude} methods:
+ *
+ *
{@code
+ * class MyCompletableFuture extends CompletableFuture {
+ * static final Executor myExecutor = ...;
+ * public MyCompletableFuture() { }
+ * public CompletableFuture newIncompleteFuture() {
+ * return new MyCompletableFuture(); }
+ * public Executor defaultExecutor() {
+ * return myExecutor; }
+ * public void obtrudeValue(T value) {
+ * throw new UnsupportedOperationException(); }
+ * public void obtrudeException(Throwable ex) {
+ * throw new UnsupportedOperationException(); }
+ * }}
*
* @author Doug Lea
* @since 1.8
+ * @param The result type returned by this future's {@code join}
+ * and {@code get} methods
*/
public class CompletableFuture implements Future, CompletionStage {
@@ -150,9 +181,7 @@ public class CompletableFuture implements Future, CompletionStage {
* fields for source(s), actions, and dependent. They are
* boringly similar, differing from others only with respect to
* underlying functional forms. We do this so that users don't
- * encounter layers of adaptors in common usages. We also
- * include "Relay" classes/methods that don't correspond to user
- * methods; they copy results from one stage to another.
+ * encounter layers of adapters in common usages.
*
* * Boolean CompletableFuture method x(...) (for example
* uniApply) takes all of the arguments needed to check that an
@@ -219,18 +248,18 @@ public class CompletableFuture implements Future, CompletionStage {
volatile Completion stack; // Top of Treiber stack of dependent actions
final boolean internalComplete(Object r) { // CAS from null to r
- return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
+ return U.compareAndSwapObject(this, RESULT, null, r);
}
final boolean casStack(Completion cmp, Completion val) {
- return UNSAFE.compareAndSwapObject(this, STACK, cmp, val);
+ return U.compareAndSwapObject(this, STACK, cmp, val);
}
/** Returns true if successfully pushed c onto stack. */
final boolean tryPushStack(Completion c) {
Completion h = stack;
lazySetNext(c, h);
- return UNSAFE.compareAndSwapObject(this, STACK, h, c);
+ return U.compareAndSwapObject(this, STACK, h, c);
}
/** Unconditionally pushes c onto stack, retrying if necessary. */
@@ -250,8 +279,8 @@ public class CompletableFuture implements Future, CompletionStage {
/** Completes with the null value, unless already completed. */
final boolean completeNull() {
- return UNSAFE.compareAndSwapObject(this, RESULT, null,
- NIL);
+ return U.compareAndSwapObject(this, RESULT, null,
+ NIL);
}
/** Returns the encoding of the given non-exceptional value. */
@@ -261,8 +290,8 @@ public class CompletableFuture implements Future, CompletionStage {
/** Completes with a non-exceptional result, unless already completed. */
final boolean completeValue(T t) {
- return UNSAFE.compareAndSwapObject(this, RESULT, null,
- (t == null) ? NIL : t);
+ return U.compareAndSwapObject(this, RESULT, null,
+ (t == null) ? NIL : t);
}
/**
@@ -276,8 +305,8 @@ public class CompletableFuture implements Future, CompletionStage {
/** Completes with an exceptional result, unless already completed. */
final boolean completeThrowable(Throwable x) {
- return UNSAFE.compareAndSwapObject(this, RESULT, null,
- encodeThrowable(x));
+ return U.compareAndSwapObject(this, RESULT, null,
+ encodeThrowable(x));
}
/**
@@ -304,8 +333,8 @@ public class CompletableFuture implements Future, CompletionStage {
* existing CompletionException.
*/
final boolean completeThrowable(Throwable x, Object r) {
- return UNSAFE.compareAndSwapObject(this, RESULT, null,
- encodeThrowable(x, r));
+ return U.compareAndSwapObject(this, RESULT, null,
+ encodeThrowable(x, r));
}
/**
@@ -334,8 +363,8 @@ public class CompletableFuture implements Future, CompletionStage {
* If exceptional, r is first coerced to a CompletionException.
*/
final boolean completeRelay(Object r) {
- return UNSAFE.compareAndSwapObject(this, RESULT, null,
- encodeRelay(r));
+ return U.compareAndSwapObject(this, RESULT, null,
+ encodeRelay(r));
}
/**
@@ -390,14 +419,14 @@ public class CompletableFuture implements Future, CompletionStage {
public static interface AsynchronousCompletionTask {
}
- private static final boolean useCommonPool =
+ private static final boolean USE_COMMON_POOL =
(ForkJoinPool.getCommonPoolParallelism() > 1);
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
- private static final Executor asyncPool = useCommonPool ?
+ private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
@@ -407,11 +436,11 @@ public class CompletableFuture implements Future, CompletionStage {
/**
* Null-checks user executor argument, and translates uses of
- * commonPool to asyncPool in case parallelism disabled.
+ * commonPool to ASYNC_POOL in case parallelism disabled.
*/
static Executor screenExecutor(Executor e) {
- if (!useCommonPool && e == ForkJoinPool.commonPool())
- return asyncPool;
+ if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool())
+ return ASYNC_POOL;
if (e == null) throw new NullPointerException();
return e;
}
@@ -421,6 +450,12 @@ public class CompletableFuture implements Future, CompletionStage {
static final int ASYNC = 1;
static final int NESTED = -1;
+ /**
+ * Spins before blocking in waitingGet
+ */
+ static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
+ 1 << 8 : 0);
+
/* ------------- Base Completion classes and operations -------------- */
@SuppressWarnings("serial")
@@ -440,13 +475,13 @@ public class CompletableFuture implements Future, CompletionStage {
abstract boolean isLive();
public final void run() { tryFire(ASYNC); }
- public final boolean exec() { tryFire(ASYNC); return true; }
+ public final boolean exec() { tryFire(ASYNC); return false; }
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
}
static void lazySetNext(Completion c, Completion next) {
- UNSAFE.putOrderedObject(c, NEXT, next);
+ U.putOrderedObject(c, NEXT, next);
}
/**
@@ -610,7 +645,7 @@ public class CompletableFuture implements Future, CompletionStage {
private CompletableFuture uniApplyStage(
Executor e, Function super T,? extends V> f) {
if (f == null) throw new NullPointerException();
- CompletableFuture d = new CompletableFuture();
+ CompletableFuture d = newIncompleteFuture();
if (e != null || !d.uniApply(this, f, null)) {
UniApply c = new UniApply(e, d, this, f);
push(c);
@@ -665,7 +700,7 @@ public class CompletableFuture implements Future, CompletionStage {
private CompletableFuture uniAcceptStage(Executor e,
Consumer super T> f) {
if (f == null) throw new NullPointerException();
- CompletableFuture d = new CompletableFuture();
+ CompletableFuture d = newIncompleteFuture();
if (e != null || !d.uniAccept(this, f, null)) {
UniAccept c = new UniAccept(e, d, this, f);
push(c);
@@ -713,7 +748,7 @@ public class CompletableFuture implements Future, CompletionStage {
private CompletableFuture uniRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
- CompletableFuture d = new CompletableFuture();
+ CompletableFuture d = newIncompleteFuture();
if (e != null || !d.uniRun(this, f, null)) {
UniRun c = new UniRun(e, d, this, f);
push(c);
@@ -774,7 +809,7 @@ public class CompletableFuture implements Future, CompletionStage {
private CompletableFuture uniWhenCompleteStage(
Executor e, BiConsumer super T, ? super Throwable> f) {
if (f == null) throw new NullPointerException();
- CompletableFuture d = new CompletableFuture();
+ CompletableFuture d = newIncompleteFuture();
if (e != null || !d.uniWhenComplete(this, f, null)) {
UniWhenComplete c = new UniWhenComplete(e, d, this, f);
push(c);
@@ -830,7 +865,7 @@ public class CompletableFuture implements Future, CompletionStage {
private CompletableFuture uniHandleStage(
Executor e, BiFunction super T, Throwable, ? extends V> f) {
if (f == null) throw new NullPointerException();
- CompletableFuture d = new CompletableFuture();
+ CompletableFuture d = newIncompleteFuture();
if (e != null || !d.uniHandle(this, f, null)) {
UniHandle c = new UniHandle(e, d, this, f);
push(c);
@@ -880,7 +915,7 @@ public class CompletableFuture implements Future, CompletionStage {
private CompletableFuture uniExceptionallyStage(
Function f) {
if (f == null) throw new NullPointerException();
- CompletableFuture d = new CompletableFuture();
+ CompletableFuture d = newIncompleteFuture();
if (!d.uniExceptionally(this, f, null)) {
UniExceptionally c = new UniExceptionally(d, this, f);
push(c);
@@ -912,6 +947,30 @@ public class CompletableFuture implements Future, CompletionStage {
return true;
}
+ private CompletableFuture uniCopyStage() {
+ Object r;
+ CompletableFuture d = newIncompleteFuture();
+ if ((r = result) != null)
+ d.completeRelay(r);
+ else {
+ UniRelay c = new UniRelay(d, this);
+ push(c);
+ c.tryFire(SYNC);
+ }
+ return d;
+ }
+
+ private MinimalStage uniAsMinimalStage() {
+ Object r;
+ if ((r = result) != null)
+ return new MinimalStage(encodeRelay(r));
+ MinimalStage d = new MinimalStage();
+ UniRelay c = new UniRelay(d, this);
+ push(c);
+ c.tryFire(SYNC);
+ return d;
+ }
+
@SuppressWarnings("serial")
static final class UniCompose extends UniCompletion {
Function super T, ? extends CompletionStage> fn;
@@ -967,31 +1026,32 @@ public class CompletableFuture implements Future, CompletionStage {
private CompletableFuture uniComposeStage(
Executor e, Function super T, ? extends CompletionStage> f) {
if (f == null) throw new NullPointerException();
- Object r; Throwable x;
+ Object r, s; Throwable x;
+ CompletableFuture d = newIncompleteFuture();
if (e == null && (r = result) != null) {
- // try to return function result directly
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
- return new CompletableFuture(encodeThrowable(x, r));
+ d.result = encodeThrowable(x, r);
+ return d;
}
r = null;
}
try {
@SuppressWarnings("unchecked") T t = (T) r;
CompletableFuture g = f.apply(t).toCompletableFuture();
- Object s = g.result;
- if (s != null)
- return new CompletableFuture(encodeRelay(s));
- CompletableFuture d = new CompletableFuture();
- UniRelay copy = new UniRelay(d, g);
- g.push(copy);
- copy.tryFire(SYNC);
+ if ((s = g.result) != null)
+ d.completeRelay(s);
+ else {
+ UniRelay c = new UniRelay(d, g);
+ g.push(c);
+ c.tryFire(SYNC);
+ }
return d;
} catch (Throwable ex) {
- return new CompletableFuture(encodeThrowable(ex));
+ d.result = encodeThrowable(ex);
+ return d;
}
}
- CompletableFuture d = new CompletableFuture();
UniCompose c = new UniCompose(e, d, this, f);
push(c);
c.tryFire(SYNC);
@@ -1116,7 +1176,7 @@ public class CompletableFuture implements Future, CompletionStage {
CompletableFuture b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
- CompletableFuture d = new CompletableFuture();
+ CompletableFuture d = newIncompleteFuture();
if (e != null || !d.biApply(this, b, f, null)) {
BiApply c = new BiApply(e, d, this, b, f);
bipush(b, c);
@@ -1188,7 +1248,7 @@ public class CompletableFuture implements Future, CompletionStage {
CompletableFuture b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
- CompletableFuture d = new CompletableFuture();
+ CompletableFuture d = newIncompleteFuture();
if (e != null || !d.biAccept(this, b, f, null)) {
BiAccept c = new BiAccept(e, d, this, b, f);
bipush(b, c);
@@ -1247,7 +1307,7 @@ public class CompletableFuture implements Future, CompletionStage {
CompletableFuture> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
- CompletableFuture d = new CompletableFuture();
+ CompletableFuture d = newIncompleteFuture();
if (e != null || !d.biRun(this, b, f, null)) {
BiRun c = new BiRun<>(e, d, this, b, f);
bipush(b, c);
@@ -1302,7 +1362,7 @@ public class CompletableFuture implements Future, CompletionStage {
if ((a = (lo == mid ? cfs[lo] :
andTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
- andTree(cfs, mid+1, hi))) == null)
+ andTree(cfs, mid+1, hi))) == null)
throw new NullPointerException();
if (!d.biRelay(a, b)) {
BiRelay,?> c = new BiRelay<>(d, a, b);
@@ -1388,7 +1448,7 @@ public class CompletableFuture implements Future, CompletionStage {
CompletableFuture b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
- CompletableFuture d = new CompletableFuture();
+ CompletableFuture d = newIncompleteFuture();
if (e != null || !d.orApply(this, b, f, null)) {
OrApply c = new OrApply(e, d, this, b, f);
orpush(b, c);
@@ -1452,7 +1512,7 @@ public class CompletableFuture implements Future, CompletionStage {
CompletableFuture b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
- CompletableFuture d = new CompletableFuture();
+ CompletableFuture d = newIncompleteFuture();
if (e != null || !d.orAccept(this, b, f, null)) {
OrAccept c = new OrAccept(e, d, this, b, f);
orpush(b, c);
@@ -1510,7 +1570,7 @@ public class CompletableFuture implements Future, CompletionStage {
CompletableFuture> b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
- CompletableFuture d = new CompletableFuture();
+ CompletableFuture d = newIncompleteFuture();
if (e != null || !d.orRun(this, b, f, null)) {
OrRun c = new OrRun<>(e, d, this, b, f);
orpush(b, c);
@@ -1556,7 +1616,7 @@ public class CompletableFuture implements Future, CompletionStage {
if ((a = (lo == mid ? cfs[lo] :
orTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
- orTree(cfs, mid+1, hi))) == null)
+ orTree(cfs, mid+1, hi))) == null)
throw new NullPointerException();
if (!d.orRelay(a, b)) {
OrRelay,?> c = new OrRelay<>(d, a, b);
@@ -1571,9 +1631,9 @@ public class CompletableFuture implements Future, CompletionStage {
@SuppressWarnings("serial")
static final class AsyncSupply extends ForkJoinTask
- implements Runnable, AsynchronousCompletionTask {
- CompletableFuture dep; Supplier fn;
- AsyncSupply(CompletableFuture dep, Supplier fn) {
+ implements Runnable, AsynchronousCompletionTask {
+ CompletableFuture dep; Supplier extends T> fn;
+ AsyncSupply(CompletableFuture dep, Supplier extends T> fn) {
this.dep = dep; this.fn = fn;
}
@@ -1582,7 +1642,7 @@ public class CompletableFuture implements Future, CompletionStage {
public final boolean exec() { run(); return true; }
public void run() {
- CompletableFuture d; Supplier f;
+ CompletableFuture d; Supplier extends T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
@@ -1607,7 +1667,7 @@ public class CompletableFuture implements Future, CompletionStage {
@SuppressWarnings("serial")
static final class AsyncRun extends ForkJoinTask
- implements Runnable, AsynchronousCompletionTask {
+ implements Runnable, AsynchronousCompletionTask {
CompletableFuture dep; Runnable fn;
AsyncRun(CompletableFuture dep, Runnable fn) {
this.dep = dep; this.fn = fn;
@@ -1651,14 +1711,15 @@ public class CompletableFuture implements Future, CompletionStage {
@SuppressWarnings("serial")
static final class Signaller extends Completion
implements ForkJoinPool.ManagedBlocker {
- long nanos; // wait time if timed
+ long nanos; // remaining wait time if timed
final long deadline; // non-zero if timed
- volatile int interruptControl; // > 0: interruptible, < 0: interrupted
+ final boolean interruptible;
+ boolean interrupted;
volatile Thread thread;
Signaller(boolean interruptible, long nanos, long deadline) {
this.thread = Thread.currentThread();
- this.interruptControl = interruptible ? 1 : 0;
+ this.interruptible = interruptible;
this.nanos = nanos;
this.deadline = deadline;
}
@@ -1671,29 +1732,22 @@ public class CompletableFuture implements Future, CompletionStage {
return null;
}
public boolean isReleasable() {
- if (thread == null)
- return true;
- if (Thread.interrupted()) {
- int i = interruptControl;
- interruptControl = -1;
- if (i > 0)
- return true;
- }
- if (deadline != 0L &&
- (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
- thread = null;
- return true;
- }
- return false;
+ if (Thread.interrupted())
+ interrupted = true;
+ return ((interrupted && interruptible) ||
+ (deadline != 0L &&
+ (nanos <= 0L ||
+ (nanos = deadline - System.nanoTime()) <= 0L)) ||
+ thread == null);
}
public boolean block() {
- if (isReleasable())
- return true;
- else if (deadline == 0L)
- LockSupport.park(this);
- else if (nanos > 0L)
- LockSupport.parkNanos(this, nanos);
- return isReleasable();
+ while (!isReleasable()) {
+ if (deadline == 0L)
+ LockSupport.park(this);
+ else
+ LockSupport.parkNanos(this, nanos);
+ }
+ return true;
}
final boolean isLive() { return thread != null; }
}
@@ -1705,13 +1759,10 @@ public class CompletableFuture implements Future, CompletionStage {
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
- int spins = -1;
+ int spins = SPINS;
Object r;
while ((r = result) == null) {
- if (spins < 0)
- spins = (Runtime.getRuntime().availableProcessors() > 1) ?
- 1 << 8 : 0; // Use brief spin-wait on multiprocessors
- else if (spins > 0) {
+ if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
@@ -1719,29 +1770,27 @@ public class CompletableFuture implements Future, CompletionStage {
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
- else if (interruptible && q.interruptControl < 0) {
- q.thread = null;
- cleanStack();
- return null;
- }
- else if (q.thread != null && result == null) {
+ else {
try {
ForkJoinPool.managedBlock(q);
- } catch (InterruptedException ie) {
- q.interruptControl = -1;
+ } catch (InterruptedException ie) { // currently cannot happen
+ q.interrupted = true;
}
+ if (q.interrupted && interruptible)
+ break;
}
}
if (q != null) {
q.thread = null;
- if (q.interruptControl < 0) {
+ if (q.interrupted) {
if (interruptible)
- r = null; // report interruption
+ cleanStack();
else
Thread.currentThread().interrupt();
}
}
- postComplete();
+ if (r != null)
+ postComplete();
return r;
}
@@ -1752,37 +1801,39 @@ public class CompletableFuture implements Future, CompletionStage {
private Object timedGet(long nanos) throws TimeoutException {
if (Thread.interrupted())
return null;
- if (nanos <= 0L)
- throw new TimeoutException();
- long d = System.nanoTime() + nanos;
- Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
- boolean queued = false;
- Object r;
- // We intentionally don't spin here (as waitingGet does) because
- // the call to nanoTime() above acts much like a spin.
- while ((r = result) == null) {
- if (!queued)
- queued = tryPushStack(q);
- else if (q.interruptControl < 0 || q.nanos <= 0L) {
- q.thread = null;
- cleanStack();
- if (q.interruptControl < 0)
- return null;
- throw new TimeoutException();
- }
- else if (q.thread != null && result == null) {
- try {
- ForkJoinPool.managedBlock(q);
- } catch (InterruptedException ie) {
- q.interruptControl = -1;
+ if (nanos > 0L) {
+ long d = System.nanoTime() + nanos;
+ long deadline = (d == 0L) ? 1L : d; // avoid 0
+ Signaller q = null;
+ boolean queued = false;
+ Object r;
+ while ((r = result) == null) { // similar to untimed, without spins
+ if (q == null)
+ q = new Signaller(true, nanos, deadline);
+ else if (!queued)
+ queued = tryPushStack(q);
+ else if (q.nanos <= 0L)
+ break;
+ else {
+ try {
+ ForkJoinPool.managedBlock(q);
+ } catch (InterruptedException ie) {
+ q.interrupted = true;
+ }
+ if (q.interrupted)
+ break;
}
}
+ if (q != null)
+ q.thread = null;
+ if (r != null)
+ postComplete();
+ else
+ cleanStack();
+ if (r != null || (q != null && q.interrupted))
+ return r;
}
- if (q.interruptControl < 0)
- r = null;
- q.thread = null;
- postComplete();
- return r;
+ throw new TimeoutException();
}
/* ------------- public methods -------------- */
@@ -1796,7 +1847,7 @@ public class CompletableFuture implements Future, CompletionStage {
/**
* Creates a new complete CompletableFuture with given encoded result.
*/
- private CompletableFuture(Object r) {
+ CompletableFuture(Object r) {
this.result = r;
}
@@ -1811,7 +1862,7 @@ public class CompletableFuture implements Future, CompletionStage {
* @return the new CompletableFuture
*/
public static CompletableFuture supplyAsync(Supplier supplier) {
- return asyncSupplyStage(asyncPool, supplier);
+ return asyncSupplyStage(ASYNC_POOL, supplier);
}
/**
@@ -1840,7 +1891,7 @@ public class CompletableFuture implements Future, CompletionStage {
* @return the new CompletableFuture
*/
public static CompletableFuture runAsync(Runnable runnable) {
- return asyncRunStage(asyncPool, runnable);
+ return asyncRunStage(ASYNC_POOL, runnable);
}
/**
@@ -1985,7 +2036,7 @@ public class CompletableFuture implements Future, CompletionStage {
public CompletableFuture thenApplyAsync(
Function super T,? extends U> fn) {
- return uniApplyStage(asyncPool, fn);
+ return uniApplyStage(defaultExecutor(), fn);
}
public CompletableFuture thenApplyAsync(
@@ -1998,7 +2049,7 @@ public class CompletableFuture implements Future, CompletionStage {
}
public CompletableFuture thenAcceptAsync(Consumer super T> action) {
- return uniAcceptStage(asyncPool, action);
+ return uniAcceptStage(defaultExecutor(), action);
}
public CompletableFuture thenAcceptAsync(Consumer super T> action,
@@ -2011,7 +2062,7 @@ public class CompletableFuture implements Future, CompletionStage {
}
public CompletableFuture