mirror of
https://github.com/openjdk/jdk.git
synced 2026-04-26 06:41:24 +00:00
8153768: Miscellaneous changes imported from jsr166 CVS 2016-05
Reviewed-by: martin, psandoz, chegar, shade
This commit is contained in:
parent
fea34ed634
commit
27a77176a6
@ -1242,7 +1242,8 @@ public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
|
||||
*/
|
||||
public KeySetView<K,V> keySet() {
|
||||
KeySetView<K,V> ks;
|
||||
return (ks = keySet) != null ? ks : (keySet = new KeySetView<K,V>(this, null));
|
||||
if ((ks = keySet) != null) return ks;
|
||||
return keySet = new KeySetView<K,V>(this, null);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1265,7 +1266,8 @@ public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
|
||||
*/
|
||||
public Collection<V> values() {
|
||||
ValuesView<K,V> vs;
|
||||
return (vs = values) != null ? vs : (values = new ValuesView<K,V>(this));
|
||||
if ((vs = values) != null) return vs;
|
||||
return values = new ValuesView<K,V>(this);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1287,7 +1289,8 @@ public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
|
||||
*/
|
||||
public Set<Map.Entry<K,V>> entrySet() {
|
||||
EntrySetView<K,V> es;
|
||||
return (es = entrySet) != null ? es : (entrySet = new EntrySetView<K,V>(this));
|
||||
if ((es = entrySet) != null) return es;
|
||||
return entrySet = new EntrySetView<K,V>(this);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -376,12 +376,12 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
|
||||
|
||||
/** Lazily initialized key set */
|
||||
private transient KeySet<K,V> keySet;
|
||||
/** Lazily initialized entry set */
|
||||
private transient EntrySet<K,V> entrySet;
|
||||
/** Lazily initialized values collection */
|
||||
private transient Values<K,V> values;
|
||||
/** Lazily initialized entry set */
|
||||
private transient EntrySet<K,V> entrySet;
|
||||
/** Lazily initialized descending key set */
|
||||
private transient ConcurrentNavigableMap<K,V> descendingMap;
|
||||
private transient SubMap<K,V> descendingMap;
|
||||
|
||||
/**
|
||||
* Initializes or resets state. Needed by constructors, clone,
|
||||
@ -1827,13 +1827,15 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
|
||||
* @return a navigable set view of the keys in this map
|
||||
*/
|
||||
public NavigableSet<K> keySet() {
|
||||
KeySet<K,V> ks = keySet;
|
||||
return (ks != null) ? ks : (keySet = new KeySet<>(this));
|
||||
KeySet<K,V> ks;
|
||||
if ((ks = keySet) != null) return ks;
|
||||
return keySet = new KeySet<>(this);
|
||||
}
|
||||
|
||||
public NavigableSet<K> navigableKeySet() {
|
||||
KeySet<K,V> ks = keySet;
|
||||
return (ks != null) ? ks : (keySet = new KeySet<>(this));
|
||||
KeySet<K,V> ks;
|
||||
if ((ks = keySet) != null) return ks;
|
||||
return keySet = new KeySet<>(this);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1856,8 +1858,9 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
|
||||
* <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
|
||||
*/
|
||||
public Collection<V> values() {
|
||||
Values<K,V> vs = values;
|
||||
return (vs != null) ? vs : (values = new Values<>(this));
|
||||
Values<K,V> vs;
|
||||
if ((vs = values) != null) return vs;
|
||||
return values = new Values<>(this);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1888,14 +1891,16 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
|
||||
* sorted in ascending key order
|
||||
*/
|
||||
public Set<Map.Entry<K,V>> entrySet() {
|
||||
EntrySet<K,V> es = entrySet;
|
||||
return (es != null) ? es : (entrySet = new EntrySet<K,V>(this));
|
||||
EntrySet<K,V> es;
|
||||
if ((es = entrySet) != null) return es;
|
||||
return entrySet = new EntrySet<K,V>(this);
|
||||
}
|
||||
|
||||
public ConcurrentNavigableMap<K,V> descendingMap() {
|
||||
ConcurrentNavigableMap<K,V> dm = descendingMap;
|
||||
return (dm != null) ? dm : (descendingMap = new SubMap<K,V>
|
||||
(this, null, false, null, false, true));
|
||||
ConcurrentNavigableMap<K,V> dm;
|
||||
if ((dm = descendingMap) != null) return dm;
|
||||
return descendingMap =
|
||||
new SubMap<K,V>(this, null, false, null, false, true);
|
||||
}
|
||||
|
||||
public NavigableSet<K> descendingKeySet() {
|
||||
@ -2564,7 +2569,7 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
|
||||
* @serial include
|
||||
*/
|
||||
static final class SubMap<K,V> extends AbstractMap<K,V>
|
||||
implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {
|
||||
implements ConcurrentNavigableMap<K,V>, Serializable {
|
||||
private static final long serialVersionUID = -7647078645895051609L;
|
||||
|
||||
/** Underlying map */
|
||||
@ -2582,8 +2587,8 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
|
||||
|
||||
// Lazily initialized view holders
|
||||
private transient KeySet<K,V> keySetView;
|
||||
private transient Set<Map.Entry<K,V>> entrySetView;
|
||||
private transient Collection<V> valuesView;
|
||||
private transient Values<K,V> valuesView;
|
||||
private transient EntrySet<K,V> entrySetView;
|
||||
|
||||
/**
|
||||
* Creates a new submap, initializing all fields.
|
||||
@ -3049,23 +3054,27 @@ public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
|
||||
/* ---------------- Submap Views -------------- */
|
||||
|
||||
public NavigableSet<K> keySet() {
|
||||
KeySet<K,V> ks = keySetView;
|
||||
return (ks != null) ? ks : (keySetView = new KeySet<>(this));
|
||||
KeySet<K,V> ks;
|
||||
if ((ks = keySetView) != null) return ks;
|
||||
return keySetView = new KeySet<>(this);
|
||||
}
|
||||
|
||||
public NavigableSet<K> navigableKeySet() {
|
||||
KeySet<K,V> ks = keySetView;
|
||||
return (ks != null) ? ks : (keySetView = new KeySet<>(this));
|
||||
KeySet<K,V> ks;
|
||||
if ((ks = keySetView) != null) return ks;
|
||||
return keySetView = new KeySet<>(this);
|
||||
}
|
||||
|
||||
public Collection<V> values() {
|
||||
Collection<V> vs = valuesView;
|
||||
return (vs != null) ? vs : (valuesView = new Values<>(this));
|
||||
Values<K,V> vs;
|
||||
if ((vs = valuesView) != null) return vs;
|
||||
return valuesView = new Values<>(this);
|
||||
}
|
||||
|
||||
public Set<Map.Entry<K,V>> entrySet() {
|
||||
Set<Map.Entry<K,V>> es = entrySetView;
|
||||
return (es != null) ? es : (entrySetView = new EntrySet<K,V>(this));
|
||||
EntrySet<K,V> es;
|
||||
if ((es = entrySetView) != null) return es;
|
||||
return entrySetView = new EntrySet<K,V>(this);
|
||||
}
|
||||
|
||||
public NavigableSet<K> descendingKeySet() {
|
||||
|
||||
@ -596,7 +596,7 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
|
||||
* not, be invoked for each completer in a computation.
|
||||
*/
|
||||
public final void propagateCompletion() {
|
||||
CountedCompleter<?> a = this, s = a;
|
||||
CountedCompleter<?> a = this, s;
|
||||
for (int c;;) {
|
||||
if ((c = a.pending) == 0) {
|
||||
if ((a = (s = a).completer) == null) {
|
||||
|
||||
@ -56,13 +56,11 @@ package java.util.concurrent;
|
||||
* void solve(Executor e,
|
||||
* Collection<Callable<Result>> solvers)
|
||||
* throws InterruptedException, ExecutionException {
|
||||
* CompletionService<Result> ecs
|
||||
* = new ExecutorCompletionService<Result>(e);
|
||||
* for (Callable<Result> s : solvers)
|
||||
* ecs.submit(s);
|
||||
* int n = solvers.size();
|
||||
* for (int i = 0; i < n; ++i) {
|
||||
* Result r = ecs.take().get();
|
||||
* CompletionService<Result> cs
|
||||
* = new ExecutorCompletionService<>(e);
|
||||
* solvers.forEach(cs::submit);
|
||||
* for (int i = solvers.size(); i > 0; i--) {
|
||||
* Result r = cs.take().get();
|
||||
* if (r != null)
|
||||
* use(r);
|
||||
* }
|
||||
@ -76,27 +74,24 @@ package java.util.concurrent;
|
||||
* void solve(Executor e,
|
||||
* Collection<Callable<Result>> solvers)
|
||||
* throws InterruptedException {
|
||||
* CompletionService<Result> ecs
|
||||
* = new ExecutorCompletionService<Result>(e);
|
||||
* CompletionService<Result> cs
|
||||
* = new ExecutorCompletionService<>(e);
|
||||
* int n = solvers.size();
|
||||
* List<Future<Result>> futures = new ArrayList<>(n);
|
||||
* Result result = null;
|
||||
* try {
|
||||
* for (Callable<Result> s : solvers)
|
||||
* futures.add(ecs.submit(s));
|
||||
* for (int i = 0; i < n; ++i) {
|
||||
* solvers.forEach((solver) -> futures.add(cs.submit(solver)));
|
||||
* for (int i = n; i > 0; i--) {
|
||||
* try {
|
||||
* Result r = ecs.take().get();
|
||||
* Result r = cs.take().get();
|
||||
* if (r != null) {
|
||||
* result = r;
|
||||
* break;
|
||||
* }
|
||||
* } catch (ExecutionException ignore) {}
|
||||
* }
|
||||
* }
|
||||
* finally {
|
||||
* for (Future<Result> f : futures)
|
||||
* f.cancel(true);
|
||||
* } finally {
|
||||
* futures.forEach((future) -> future.cancel(true));
|
||||
* }
|
||||
*
|
||||
* if (result != null)
|
||||
|
||||
@ -348,10 +348,6 @@ public class Phaser {
|
||||
private final AtomicReference<QNode> evenQ;
|
||||
private final AtomicReference<QNode> oddQ;
|
||||
|
||||
private AtomicReference<QNode> queueFor(int phase) {
|
||||
return ((phase & 1) == 0) ? evenQ : oddQ;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns message string for bounds exceptions on arrival.
|
||||
*/
|
||||
|
||||
@ -25,13 +25,10 @@
|
||||
* @test
|
||||
* @bug 6503247 6574123
|
||||
* @summary Test resilience to tryAcquire methods that throw
|
||||
* @library /lib/testlibrary/
|
||||
* @author Martin Buchholz
|
||||
*/
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
@ -39,7 +36,6 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import jdk.testlibrary.Utils;
|
||||
|
||||
/**
|
||||
* This uses a variant of the standard Mutex demo, except with a
|
||||
@ -48,22 +44,10 @@ import jdk.testlibrary.Utils;
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class FlakyMutex implements Lock {
|
||||
static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000);
|
||||
static class MyError extends Error {}
|
||||
static class MyException extends Exception {}
|
||||
static class MyRuntimeException extends RuntimeException {}
|
||||
|
||||
static final Random rnd = new Random();
|
||||
|
||||
static void maybeThrow() {
|
||||
switch (rnd.nextInt(10)) {
|
||||
case 0: throw new MyError();
|
||||
case 1: throw new MyRuntimeException();
|
||||
case 2: FlakyMutex.<RuntimeException>uncheckedThrow(new MyException());
|
||||
default: /* Do nothing */ break;
|
||||
}
|
||||
}
|
||||
|
||||
static void checkThrowable(Throwable t) {
|
||||
check((t instanceof MyError) ||
|
||||
(t instanceof MyException) ||
|
||||
@ -72,31 +56,35 @@ public class FlakyMutex implements Lock {
|
||||
|
||||
static void realMain(String[] args) throws Throwable {
|
||||
final int nThreads = 3;
|
||||
final CyclicBarrier barrier = new CyclicBarrier(nThreads + 1);
|
||||
final FlakyMutex m = new FlakyMutex();
|
||||
final int iterations = 10_000;
|
||||
final CyclicBarrier startingGate = new CyclicBarrier(nThreads);
|
||||
final FlakyMutex mutex = new FlakyMutex();
|
||||
final ExecutorService es = Executors.newFixedThreadPool(nThreads);
|
||||
for (int i = 0; i < nThreads; i++) {
|
||||
es.submit(new Runnable() { public void run() {
|
||||
try {
|
||||
barrier.await();
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
for (;;) {
|
||||
try { m.lock(); break; }
|
||||
catch (Throwable t) { checkThrowable(t); }
|
||||
}
|
||||
|
||||
try { check(! m.tryLock()); }
|
||||
final Runnable task = () -> {
|
||||
try {
|
||||
startingGate.await();
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
for (;;) {
|
||||
try { mutex.lock(); break; }
|
||||
catch (Throwable t) { checkThrowable(t); }
|
||||
|
||||
try { check(! m.tryLock(1, TimeUnit.MICROSECONDS)); }
|
||||
catch (Throwable t) { checkThrowable(t); }
|
||||
|
||||
m.unlock();
|
||||
}
|
||||
} catch (Throwable t) { unexpected(t); }}});}
|
||||
barrier.await();
|
||||
|
||||
try { check(! mutex.tryLock()); }
|
||||
catch (Throwable t) { checkThrowable(t); }
|
||||
|
||||
try { check(! mutex.tryLock(1, TimeUnit.MICROSECONDS)); }
|
||||
catch (Throwable t) { checkThrowable(t); }
|
||||
|
||||
mutex.unlock();
|
||||
}
|
||||
} catch (Throwable t) { unexpected(t); }
|
||||
};
|
||||
|
||||
for (int i = 0; i < nThreads; i++)
|
||||
es.submit(task);
|
||||
es.shutdown();
|
||||
check(es.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
|
||||
// Let test harness handle timeout
|
||||
check(es.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS));
|
||||
}
|
||||
|
||||
private static class FlakySync extends AbstractQueuedLongSynchronizer {
|
||||
@ -116,8 +104,12 @@ public class FlakyMutex implements Lock {
|
||||
do {} while (hasQueuedPredecessors() != hasQueuedThreads());
|
||||
}
|
||||
|
||||
maybeThrow();
|
||||
return compareAndSetState(0, 1);
|
||||
switch (ThreadLocalRandom.current().nextInt(10)) {
|
||||
case 0: throw new MyError();
|
||||
case 1: throw new MyRuntimeException();
|
||||
case 2: FlakyMutex.<RuntimeException>uncheckedThrow(new MyException());
|
||||
default: return compareAndSetState(0, 1);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean tryRelease(long releases) {
|
||||
|
||||
@ -35,16 +35,10 @@
|
||||
* @test
|
||||
* @bug 8074773
|
||||
* @summary Stress test looks for lost unparks
|
||||
* @library /lib/testlibrary/
|
||||
* @modules java.management
|
||||
* @run main/timeout=1200 ParkLoops
|
||||
*/
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.ThreadInfo;
|
||||
import java.util.SplittableRandom;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
@ -52,11 +46,8 @@ import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import jdk.testlibrary.Utils;
|
||||
|
||||
public final class ParkLoops {
|
||||
static final long TEST_TIMEOUT_SECONDS = Utils.adjustTimeout(1000);
|
||||
static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000);
|
||||
static final int THREADS = 4;
|
||||
static final int ITERS = 30_000;
|
||||
|
||||
@ -126,28 +117,13 @@ public final class ParkLoops {
|
||||
final AtomicReferenceArray<Thread> threads
|
||||
= new AtomicReferenceArray<>(THREADS);
|
||||
final CountDownLatch done = new CountDownLatch(THREADS);
|
||||
final Runnable parker = new Parker(threads, done, rnd.split());
|
||||
final Runnable unparker = new Unparker(threads, done, rnd.split());
|
||||
for (int i = 0; i < THREADS; i++) {
|
||||
pool.submit(parker);
|
||||
pool.submit(unparker);
|
||||
}
|
||||
try {
|
||||
if (!done.await(TEST_TIMEOUT_SECONDS, SECONDS)) {
|
||||
dumpAllStacks();
|
||||
throw new AssertionError("lost unpark");
|
||||
}
|
||||
} finally {
|
||||
pool.shutdown();
|
||||
pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
static void dumpAllStacks() {
|
||||
ThreadInfo[] threadInfos =
|
||||
ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
|
||||
for (ThreadInfo threadInfo : threadInfos) {
|
||||
System.err.print(threadInfo);
|
||||
pool.submit(new Parker(threads, done, rnd.split()));
|
||||
pool.submit(new Unparker(threads, done, rnd.split()));
|
||||
}
|
||||
// Let test harness handle timeout
|
||||
done.await();
|
||||
pool.shutdown();
|
||||
pool.awaitTermination(Long.MAX_VALUE, SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
@ -3322,7 +3322,7 @@ public class CompletableFutureTest extends JSR166TestCase {
|
||||
() -> f.obtrudeException(null),
|
||||
|
||||
() -> CompletableFuture.delayedExecutor(1L, SECONDS, null),
|
||||
() -> CompletableFuture.delayedExecutor(1L, null, new ThreadExecutor()),
|
||||
() -> CompletableFuture.delayedExecutor(1L, null, exec),
|
||||
() -> CompletableFuture.delayedExecutor(1L, null),
|
||||
|
||||
() -> f.orTimeout(1L, null),
|
||||
@ -3552,7 +3552,7 @@ public class CompletableFutureTest extends JSR166TestCase {
|
||||
long timeoutMillis = timeoutMillis();
|
||||
CompletableFuture<Integer> f = new CompletableFuture<>();
|
||||
long startTime = System.nanoTime();
|
||||
f.orTimeout(timeoutMillis, MILLISECONDS);
|
||||
assertSame(f, f.orTimeout(timeoutMillis, MILLISECONDS));
|
||||
checkCompletedWithTimeoutException(f);
|
||||
assertTrue(millisElapsedSince(startTime) >= timeoutMillis);
|
||||
}
|
||||
@ -3567,8 +3567,8 @@ public class CompletableFutureTest extends JSR166TestCase {
|
||||
CompletableFuture<Integer> g = new CompletableFuture<>();
|
||||
long startTime = System.nanoTime();
|
||||
f.complete(v1);
|
||||
f.orTimeout(LONG_DELAY_MS, MILLISECONDS);
|
||||
g.orTimeout(LONG_DELAY_MS, MILLISECONDS);
|
||||
assertSame(f, f.orTimeout(LONG_DELAY_MS, MILLISECONDS));
|
||||
assertSame(g, g.orTimeout(LONG_DELAY_MS, MILLISECONDS));
|
||||
g.complete(v1);
|
||||
checkCompletedNormally(f, v1);
|
||||
checkCompletedNormally(g, v1);
|
||||
@ -3583,11 +3583,14 @@ public class CompletableFutureTest extends JSR166TestCase {
|
||||
() -> testCompleteOnTimeout_timesOut(null));
|
||||
}
|
||||
|
||||
/**
|
||||
* completeOnTimeout completes with given value if not complete
|
||||
*/
|
||||
public void testCompleteOnTimeout_timesOut(Integer v) {
|
||||
long timeoutMillis = timeoutMillis();
|
||||
CompletableFuture<Integer> f = new CompletableFuture<>();
|
||||
long startTime = System.nanoTime();
|
||||
f.completeOnTimeout(v, timeoutMillis, MILLISECONDS);
|
||||
assertSame(f, f.completeOnTimeout(v, timeoutMillis, MILLISECONDS));
|
||||
assertSame(v, f.join());
|
||||
assertTrue(millisElapsedSince(startTime) >= timeoutMillis);
|
||||
f.complete(99); // should have no effect
|
||||
@ -3604,8 +3607,8 @@ public class CompletableFutureTest extends JSR166TestCase {
|
||||
CompletableFuture<Integer> g = new CompletableFuture<>();
|
||||
long startTime = System.nanoTime();
|
||||
f.complete(v1);
|
||||
f.completeOnTimeout(-1, LONG_DELAY_MS, MILLISECONDS);
|
||||
g.completeOnTimeout(-1, LONG_DELAY_MS, MILLISECONDS);
|
||||
assertSame(f, f.completeOnTimeout(-1, LONG_DELAY_MS, MILLISECONDS));
|
||||
assertSame(g, g.completeOnTimeout(-1, LONG_DELAY_MS, MILLISECONDS));
|
||||
g.complete(v1);
|
||||
checkCompletedNormally(f, v1);
|
||||
checkCompletedNormally(g, v1);
|
||||
|
||||
@ -0,0 +1,137 @@
|
||||
/*
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is available under and governed by the GNU General Public
|
||||
* License version 2 only, as published by the Free Software Foundation.
|
||||
* However, the following notice accompanied the original version of this
|
||||
* file:
|
||||
*
|
||||
* Written by Doug Lea and Martin Buchholz with assistance from
|
||||
* members of JCP JSR-166 Expert Group and released to the public
|
||||
* domain, as explained at
|
||||
* http://creativecommons.org/publicdomain/zero/1.0/
|
||||
*/
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import junit.framework.Test;
|
||||
import junit.framework.TestSuite;
|
||||
|
||||
public class ExecutorCompletionService9Test extends JSR166TestCase {
|
||||
public static void main(String[] args) {
|
||||
main(suite(), args);
|
||||
}
|
||||
public static Test suite() {
|
||||
return new TestSuite(ExecutorCompletionService9Test.class);
|
||||
}
|
||||
|
||||
void solveAll(Executor e,
|
||||
Collection<Callable<Integer>> solvers)
|
||||
throws InterruptedException, ExecutionException {
|
||||
CompletionService<Integer> cs
|
||||
= new ExecutorCompletionService<>(e);
|
||||
solvers.forEach(cs::submit);
|
||||
for (int i = solvers.size(); i > 0; i--) {
|
||||
Integer r = cs.take().get();
|
||||
if (r != null)
|
||||
use(r);
|
||||
}
|
||||
}
|
||||
|
||||
void solveAny(Executor e,
|
||||
Collection<Callable<Integer>> solvers)
|
||||
throws InterruptedException {
|
||||
CompletionService<Integer> cs
|
||||
= new ExecutorCompletionService<>(e);
|
||||
int n = solvers.size();
|
||||
List<Future<Integer>> futures = new ArrayList<>(n);
|
||||
Integer result = null;
|
||||
try {
|
||||
solvers.forEach((solver) -> futures.add(cs.submit(solver)));
|
||||
for (int i = n; i > 0; i--) {
|
||||
try {
|
||||
Integer r = cs.take().get();
|
||||
if (r != null) {
|
||||
result = r;
|
||||
break;
|
||||
}
|
||||
} catch (ExecutionException ignore) {}
|
||||
}
|
||||
} finally {
|
||||
futures.forEach((future) -> future.cancel(true));
|
||||
}
|
||||
|
||||
if (result != null)
|
||||
use(result);
|
||||
}
|
||||
|
||||
HashSet<Integer> results;
|
||||
|
||||
void use(Integer x) {
|
||||
if (results == null) results = new HashSet<Integer>();
|
||||
results.add(x);
|
||||
}
|
||||
|
||||
/**
|
||||
* The first "solvers" sample code in the class javadoc works.
|
||||
*/
|
||||
public void testSolveAll()
|
||||
throws InterruptedException, ExecutionException {
|
||||
Set<Callable<Integer>> solvers = Set.of(
|
||||
() -> null,
|
||||
() -> 1,
|
||||
() -> 2,
|
||||
() -> 3,
|
||||
() -> null);
|
||||
solveAll(cachedThreadPool, solvers);
|
||||
assertEquals(Set.of(1, 2, 3), results);
|
||||
}
|
||||
|
||||
/**
|
||||
* The second "solvers" sample code in the class javadoc works.
|
||||
*/
|
||||
public void testSolveAny()
|
||||
throws InterruptedException {
|
||||
Set<Callable<Integer>> solvers = Set.of(
|
||||
() -> { throw new ArithmeticException(); },
|
||||
() -> null,
|
||||
() -> 1,
|
||||
() -> 2);
|
||||
solveAny(cachedThreadPool, solvers);
|
||||
assertEquals(1, results.size());
|
||||
Integer elt = results.iterator().next();
|
||||
assertTrue(elt.equals(1) || elt.equals(2));
|
||||
}
|
||||
|
||||
}
|
||||
@ -37,8 +37,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.FutureTask;
|
||||
@ -59,7 +62,7 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creating a new ECS with null Executor throw NPE
|
||||
* new ExecutorCompletionService(null) throws NullPointerException
|
||||
*/
|
||||
public void testConstructorNPE() {
|
||||
try {
|
||||
@ -69,111 +72,147 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creating a new ECS with null queue throw NPE
|
||||
* new ExecutorCompletionService(e, null) throws NullPointerException
|
||||
*/
|
||||
public void testConstructorNPE2() {
|
||||
try {
|
||||
ExecutorService e = Executors.newCachedThreadPool();
|
||||
new ExecutorCompletionService(e, null);
|
||||
new ExecutorCompletionService(cachedThreadPool, null);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
|
||||
/**
|
||||
* Submitting a null callable throws NPE
|
||||
* ecs.submit(null) throws NullPointerException
|
||||
*/
|
||||
public void testSubmitNPE() {
|
||||
final ExecutorService e = Executors.newCachedThreadPool();
|
||||
final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
Callable c = null;
|
||||
try {
|
||||
ecs.submit(c);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
public void testSubmitNullCallable() {
|
||||
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
|
||||
try {
|
||||
cs.submit((Callable) null);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
|
||||
/**
|
||||
* Submitting a null runnable throws NPE
|
||||
* ecs.submit(null, val) throws NullPointerException
|
||||
*/
|
||||
public void testSubmitNPE2() {
|
||||
final ExecutorService e = Executors.newCachedThreadPool();
|
||||
final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
Runnable r = null;
|
||||
try {
|
||||
ecs.submit(r, Boolean.TRUE);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
public void testSubmitNullRunnable() {
|
||||
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
|
||||
try {
|
||||
cs.submit((Runnable) null, Boolean.TRUE);
|
||||
shouldThrow();
|
||||
} catch (NullPointerException success) {}
|
||||
}
|
||||
|
||||
/**
|
||||
* A taken submitted task is completed
|
||||
*/
|
||||
public void testTake() throws InterruptedException {
|
||||
final ExecutorService e = Executors.newCachedThreadPool();
|
||||
final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
Callable c = new StringTask();
|
||||
ecs.submit(c);
|
||||
Future f = ecs.take();
|
||||
assertTrue(f.isDone());
|
||||
}
|
||||
public void testTake()
|
||||
throws InterruptedException, ExecutionException {
|
||||
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
|
||||
cs.submit(new StringTask());
|
||||
Future f = cs.take();
|
||||
assertTrue(f.isDone());
|
||||
assertSame(TEST_STRING, f.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* Take returns the same future object returned by submit
|
||||
*/
|
||||
public void testTake2() throws InterruptedException {
|
||||
final ExecutorService e = Executors.newCachedThreadPool();
|
||||
final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
Callable c = new StringTask();
|
||||
Future f1 = ecs.submit(c);
|
||||
Future f2 = ecs.take();
|
||||
assertSame(f1, f2);
|
||||
}
|
||||
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
|
||||
Future f1 = cs.submit(new StringTask());
|
||||
Future f2 = cs.take();
|
||||
assertSame(f1, f2);
|
||||
}
|
||||
|
||||
/**
|
||||
* If poll returns non-null, the returned task is completed
|
||||
* poll returns non-null when the returned task is completed
|
||||
*/
|
||||
public void testPoll1() throws Exception {
|
||||
final ExecutorService e = Executors.newCachedThreadPool();
|
||||
final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
assertNull(ecs.poll());
|
||||
Callable c = new StringTask();
|
||||
ecs.submit(c);
|
||||
public void testPoll1()
|
||||
throws InterruptedException, ExecutionException {
|
||||
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
|
||||
assertNull(cs.poll());
|
||||
cs.submit(new StringTask());
|
||||
|
||||
long startTime = System.nanoTime();
|
||||
Future f;
|
||||
while ((f = ecs.poll()) == null) {
|
||||
if (millisElapsedSince(startTime) > LONG_DELAY_MS)
|
||||
fail("timed out");
|
||||
Thread.yield();
|
||||
long startTime = System.nanoTime();
|
||||
Future f;
|
||||
while ((f = cs.poll()) == null) {
|
||||
if (millisElapsedSince(startTime) > LONG_DELAY_MS)
|
||||
fail("timed out");
|
||||
Thread.yield();
|
||||
}
|
||||
assertTrue(f.isDone());
|
||||
assertSame(TEST_STRING, f.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* timed poll returns non-null when the returned task is completed
|
||||
*/
|
||||
public void testPoll2()
|
||||
throws InterruptedException, ExecutionException {
|
||||
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
|
||||
assertNull(cs.poll());
|
||||
cs.submit(new StringTask());
|
||||
|
||||
long startTime = System.nanoTime();
|
||||
Future f;
|
||||
while ((f = cs.poll(SHORT_DELAY_MS, MILLISECONDS)) == null) {
|
||||
if (millisElapsedSince(startTime) > LONG_DELAY_MS)
|
||||
fail("timed out");
|
||||
Thread.yield();
|
||||
}
|
||||
assertTrue(f.isDone());
|
||||
assertSame(TEST_STRING, f.get());
|
||||
}
|
||||
|
||||
/**
|
||||
* poll returns null before the returned task is completed
|
||||
*/
|
||||
public void testPollReturnsNull()
|
||||
throws InterruptedException, ExecutionException {
|
||||
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
|
||||
final CountDownLatch proceed = new CountDownLatch(1);
|
||||
cs.submit(new Callable() { public String call() throws Exception {
|
||||
proceed.await();
|
||||
return TEST_STRING;
|
||||
}});
|
||||
assertNull(cs.poll());
|
||||
assertNull(cs.poll(0L, MILLISECONDS));
|
||||
assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS));
|
||||
long startTime = System.nanoTime();
|
||||
assertNull(cs.poll(timeoutMillis(), MILLISECONDS));
|
||||
assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
|
||||
proceed.countDown();
|
||||
assertSame(TEST_STRING, cs.take().get());
|
||||
}
|
||||
|
||||
/**
|
||||
* successful and failed tasks are both returned
|
||||
*/
|
||||
public void testTaskAssortment()
|
||||
throws InterruptedException, ExecutionException {
|
||||
CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
|
||||
ArithmeticException ex = new ArithmeticException();
|
||||
for (int i = 0; i < 2; i++) {
|
||||
cs.submit(new StringTask());
|
||||
cs.submit(callableThrowing(ex));
|
||||
cs.submit(runnableThrowing(ex), null);
|
||||
}
|
||||
int normalCompletions = 0;
|
||||
int exceptionalCompletions = 0;
|
||||
for (int i = 0; i < 3 * 2; i++) {
|
||||
try {
|
||||
if (cs.take().get() == TEST_STRING)
|
||||
normalCompletions++;
|
||||
}
|
||||
catch (ExecutionException expected) {
|
||||
assertTrue(expected.getCause() instanceof ArithmeticException);
|
||||
exceptionalCompletions++;
|
||||
}
|
||||
assertTrue(f.isDone());
|
||||
assertSame(TEST_STRING, f.get());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If timed poll returns non-null, the returned task is completed
|
||||
*/
|
||||
public void testPoll2() throws InterruptedException {
|
||||
final ExecutorService e = Executors.newCachedThreadPool();
|
||||
final ExecutorCompletionService ecs = new ExecutorCompletionService(e);
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
assertNull(ecs.poll());
|
||||
Callable c = new StringTask();
|
||||
ecs.submit(c);
|
||||
Future f = ecs.poll(SHORT_DELAY_MS, MILLISECONDS);
|
||||
if (f != null)
|
||||
assertTrue(f.isDone());
|
||||
}
|
||||
assertEquals(2 * 1, normalCompletions);
|
||||
assertEquals(2 * 2, exceptionalCompletions);
|
||||
assertNull(cs.poll());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -184,7 +223,7 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase {
|
||||
final AtomicBoolean done = new AtomicBoolean(false);
|
||||
class MyCallableFuture<V> extends FutureTask<V> {
|
||||
MyCallableFuture(Callable<V> c) { super(c); }
|
||||
protected void done() { done.set(true); }
|
||||
@Override protected void done() { done.set(true); }
|
||||
}
|
||||
final ExecutorService e =
|
||||
new ThreadPoolExecutor(1, 1,
|
||||
@ -193,15 +232,14 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase {
|
||||
protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
|
||||
return new MyCallableFuture<T>(c);
|
||||
}};
|
||||
ExecutorCompletionService<String> ecs =
|
||||
new ExecutorCompletionService<String>(e);
|
||||
CompletionService<String> cs = new ExecutorCompletionService<>(e);
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
assertNull(ecs.poll());
|
||||
assertNull(cs.poll());
|
||||
Callable<String> c = new StringTask();
|
||||
Future f1 = ecs.submit(c);
|
||||
Future f1 = cs.submit(c);
|
||||
assertTrue("submit must return MyCallableFuture",
|
||||
f1 instanceof MyCallableFuture);
|
||||
Future f2 = ecs.take();
|
||||
Future f2 = cs.take();
|
||||
assertSame("submit and take must return same objects", f1, f2);
|
||||
assertTrue("completed task must have set done", done.get());
|
||||
}
|
||||
@ -215,7 +253,7 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase {
|
||||
final AtomicBoolean done = new AtomicBoolean(false);
|
||||
class MyRunnableFuture<V> extends FutureTask<V> {
|
||||
MyRunnableFuture(Runnable t, V r) { super(t, r); }
|
||||
protected void done() { done.set(true); }
|
||||
@Override protected void done() { done.set(true); }
|
||||
}
|
||||
final ExecutorService e =
|
||||
new ThreadPoolExecutor(1, 1,
|
||||
@ -224,15 +262,14 @@ public class ExecutorCompletionServiceTest extends JSR166TestCase {
|
||||
protected <T> RunnableFuture<T> newTaskFor(Runnable t, T r) {
|
||||
return new MyRunnableFuture<T>(t, r);
|
||||
}};
|
||||
final ExecutorCompletionService<String> ecs =
|
||||
new ExecutorCompletionService<String>(e);
|
||||
CompletionService<String> cs = new ExecutorCompletionService<>(e);
|
||||
try (PoolCleaner cleaner = cleaner(e)) {
|
||||
assertNull(ecs.poll());
|
||||
assertNull(cs.poll());
|
||||
Runnable r = new NoOpRunnable();
|
||||
Future f1 = ecs.submit(r, null);
|
||||
Future f1 = cs.submit(r, null);
|
||||
assertTrue("submit must return MyRunnableFuture",
|
||||
f1 instanceof MyRunnableFuture);
|
||||
Future f2 = ecs.take();
|
||||
Future f2 = cs.take();
|
||||
assertSame("submit and take must return same objects", f1, f2);
|
||||
assertTrue("completed task must have set done", done.get());
|
||||
}
|
||||
|
||||
@ -39,6 +39,7 @@
|
||||
* @modules java.management
|
||||
* @build *
|
||||
* @run junit/othervm/timeout=1000 -Djsr166.testImplementationDetails=true JSR166TestCase
|
||||
* @run junit/othervm/timeout=1000 -Djava.util.concurrent.ForkJoinPool.common.parallelism=0 -Djsr166.testImplementationDetails=true JSR166TestCase
|
||||
*/
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
@ -85,6 +86,7 @@ import java.util.concurrent.RecursiveAction;
|
||||
import java.util.concurrent.RecursiveTask;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
@ -546,7 +548,7 @@ public class JSR166TestCase extends TestCase {
|
||||
// Java9+ test classes
|
||||
if (atLeastJava9()) {
|
||||
String[] java9TestClassNames = {
|
||||
// Currently empty, but expecting varhandle tests
|
||||
"ExecutorCompletionService9Test",
|
||||
};
|
||||
addNamedTestClasses(suite, java9TestClassNames);
|
||||
}
|
||||
@ -1860,4 +1862,19 @@ public class JSR166TestCase extends TestCase {
|
||||
} catch (NoSuchElementException success) {}
|
||||
assertFalse(it.hasNext());
|
||||
}
|
||||
|
||||
public <T> Callable<T> callableThrowing(final Exception ex) {
|
||||
return new Callable<T>() { public T call() throws Exception { throw ex; }};
|
||||
}
|
||||
|
||||
public Runnable runnableThrowing(final RuntimeException ex) {
|
||||
return new Runnable() { public void run() { throw ex; }};
|
||||
}
|
||||
|
||||
/** A reusable thread pool to be shared by tests. */
|
||||
static final ExecutorService cachedThreadPool =
|
||||
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
|
||||
1000L, MILLISECONDS,
|
||||
new SynchronousQueue<Runnable>());
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user