8157523: Various improvements to ForkJoin/SubmissionPublisher code

Reviewed-by: martin, psandoz, rriggs, plevart, dfuchs
This commit is contained in:
Doug Lea 2016-07-15 13:55:51 -07:00
parent aa81d50820
commit 7fa43fa58f
5 changed files with 1453 additions and 1829 deletions

View File

@ -36,6 +36,8 @@
package java.util.concurrent;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.lang.reflect.Constructor;
@ -92,7 +94,7 @@ import java.util.concurrent.locks.ReentrantLock;
* encountering the exception; minimally only the latter.
*
* <p>It is possible to define and use ForkJoinTasks that may block,
* but doing do requires three further considerations: (1) Completion
* but doing so requires three further considerations: (1) Completion
* of few if any <em>other</em> tasks should be dependent on a task
* that blocks on external synchronization or I/O. Event-style async
* tasks that are never joined (for example, those subclassing {@link
@ -259,7 +261,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
for (int s;;) {
if ((s = status) < 0)
return s;
if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
if (STATUS.compareAndSet(this, s, s | completion)) {
if ((s >>> 16) != 0)
synchronized (this) { notifyAll(); }
return completion;
@ -297,7 +299,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
final void internalWait(long timeout) {
int s;
if ((s = status) >= 0 && // force completer to issue notify
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
STATUS.compareAndSet(this, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
try { wait(timeout); } catch (InterruptedException ie) { }
@ -319,7 +321,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
if (STATUS.compareAndSet(this, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
@ -353,7 +355,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
0)) >= 0) {
while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
if (STATUS.compareAndSet(this, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
wait(0L);
@ -400,22 +402,24 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
// Exception table support
/**
* Table of exceptions thrown by tasks, to enable reporting by
* callers. Because exceptions are rare, we don't directly keep
* Hash table of exceptions thrown by tasks, to enable reporting
* by callers. Because exceptions are rare, we don't directly keep
* them with task objects, but instead use a weak ref table. Note
* that cancellation exceptions don't appear in the table, but are
* instead recorded as status values.
*
* Note: These statics are initialized below in static block.
* The exception table has a fixed capacity.
*/
private static final ExceptionNode[] exceptionTable;
private static final ReentrantLock exceptionTableLock;
private static final ReferenceQueue<Object> exceptionTableRefQueue;
private static final ExceptionNode[] exceptionTable
= new ExceptionNode[32];
/**
* Fixed capacity for exceptionTable.
*/
private static final int EXCEPTION_MAP_CAPACITY = 32;
/** Lock protecting access to exceptionTable. */
private static final ReentrantLock exceptionTableLock
= new ReentrantLock();
/** Reference queue of stale exceptionally completed tasks. */
private static final ReferenceQueue<ForkJoinTask<?>> exceptionTableRefQueue
= new ReferenceQueue<ForkJoinTask<?>>();
/**
* Key-value nodes for exception table. The chained hash table
@ -435,7 +439,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
final long thrower; // use id not ref to avoid weak cycles
final int hashCode; // store task hashCode before weak ref disappears
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next,
ReferenceQueue<Object> exceptionTableRefQueue) {
ReferenceQueue<ForkJoinTask<?>> exceptionTableRefQueue) {
super(task, exceptionTableRefQueue);
this.ex = ex;
this.next = next;
@ -599,9 +603,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
private static void expungeStaleExceptions() {
for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
if (x instanceof ExceptionNode) {
int hashCode = ((ExceptionNode)x).hashCode;
ExceptionNode[] t = exceptionTable;
int i = hashCode & (t.length - 1);
int i = ((ExceptionNode)x).hashCode & (t.length - 1);
ExceptionNode e = t[i];
ExceptionNode pred = null;
while (e != null) {
@ -1031,7 +1034,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
while ((s = status) >= 0 &&
(ns = deadline - System.nanoTime()) > 0L) {
if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
STATUS.compareAndSet(this, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
wait(ms); // OK to throw InterruptedException
@ -1324,8 +1327,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
public final short setForkJoinTaskTag(short newValue) {
for (int s;;) {
if (U.compareAndSwapInt(this, STATUS, s = status,
(s & ~SMASK) | (newValue & SMASK)))
if (STATUS.compareAndSet(this, s = status,
(s & ~SMASK) | (newValue & SMASK)))
return (short)s;
}
}
@ -1348,8 +1351,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
for (int s;;) {
if ((short)(s = status) != expect)
return false;
if (U.compareAndSwapInt(this, STATUS, s,
(s & ~SMASK) | (update & SMASK)))
if (STATUS.compareAndSet(this, s,
(s & ~SMASK) | (update & SMASK)))
return true;
}
}
@ -1510,17 +1513,12 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
setExceptionalCompletion((Throwable)ex);
}
// Unsafe mechanics
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long STATUS;
// VarHandle mechanics
private static final VarHandle STATUS;
static {
exceptionTableLock = new ReentrantLock();
exceptionTableRefQueue = new ReferenceQueue<Object>();
exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];
try {
STATUS = U.objectFieldOffset
(ForkJoinTask.class.getDeclaredField("status"));
MethodHandles.Lookup l = MethodHandles.lookup();
STATUS = l.findVarHandle(ForkJoinTask.class, "status", int.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}

View File

@ -66,8 +66,9 @@ public class ForkJoinWorkerThread extends Thread {
* owning thread.
*
* Support for (non-public) subclass InnocuousForkJoinWorkerThread
* requires that we break quite a lot of encapsulation (via Unsafe)
* both here and in the subclass to access and set Thread fields.
* requires that we break quite a lot of encapsulation (via helper
* methods in ThreadLocalRandom) both here and in the subclass to
* access and set Thread fields.
*/
final ForkJoinPool pool; // the pool this thread works in
@ -92,8 +93,8 @@ public class ForkJoinWorkerThread extends Thread {
ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
AccessControlContext acc) {
super(threadGroup, null, "aForkJoinWorkerThread");
U.putObjectRelease(this, INHERITEDACCESSCONTROLCONTEXT, acc);
eraseThreadLocals(); // clear before registering
ThreadLocalRandom.setInheritedAccessControlContext(this, acc);
ThreadLocalRandom.eraseThreadLocals(this); // clear before registering
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
@ -170,38 +171,12 @@ public class ForkJoinWorkerThread extends Thread {
}
}
/**
* Erases ThreadLocals by nulling out Thread maps.
*/
final void eraseThreadLocals() {
U.putObject(this, THREADLOCALS, null);
U.putObject(this, INHERITABLETHREADLOCALS, null);
}
/**
* Non-public hook method for InnocuousForkJoinWorkerThread.
*/
void afterTopLevelExec() {
}
// Set up to allow setting thread fields in constructor
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long THREADLOCALS;
private static final long INHERITABLETHREADLOCALS;
private static final long INHERITEDACCESSCONTROLCONTEXT;
static {
try {
THREADLOCALS = U.objectFieldOffset
(Thread.class.getDeclaredField("threadLocals"));
INHERITABLETHREADLOCALS = U.objectFieldOffset
(Thread.class.getDeclaredField("inheritableThreadLocals"));
INHERITEDACCESSCONTROLCONTEXT = U.objectFieldOffset
(Thread.class.getDeclaredField("inheritedAccessControlContext"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
/**
* A worker thread that has no permissions, is not a member of any
* user-defined ThreadGroup, and erases all ThreadLocals after
@ -210,7 +185,7 @@ public class ForkJoinWorkerThread extends Thread {
static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
/** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
private static final ThreadGroup innocuousThreadGroup =
createThreadGroup();
ThreadLocalRandom.createThreadGroup("InnocuousForkJoinWorkerThreadGroup");
/** An AccessControlContext supporting no privileges */
private static final AccessControlContext INNOCUOUS_ACC =
@ -225,7 +200,7 @@ public class ForkJoinWorkerThread extends Thread {
@Override // to erase ThreadLocals
void afterTopLevelExec() {
eraseThreadLocals();
ThreadLocalRandom.eraseThreadLocals(this);
}
@Override // to always report system loader
@ -241,33 +216,5 @@ public class ForkJoinWorkerThread extends Thread {
throw new SecurityException("setContextClassLoader");
}
/**
* Returns a new group with the system ThreadGroup (the
* topmost, parent-less group) as parent. Uses Unsafe to
* traverse Thread.group and ThreadGroup.parent fields.
*/
private static ThreadGroup createThreadGroup() {
try {
jdk.internal.misc.Unsafe u = jdk.internal.misc.Unsafe.getUnsafe();
long tg = u.objectFieldOffset
(Thread.class.getDeclaredField("group"));
long gp = u.objectFieldOffset
(ThreadGroup.class.getDeclaredField("parent"));
ThreadGroup group = (ThreadGroup)
u.getObject(Thread.currentThread(), tg);
while (group != null) {
ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
if (parent == null)
return new ThreadGroup(group,
"InnocuousForkJoinWorkerThreadGroup");
group = parent;
}
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
// fall through if null as cannot-happen safeguard
throw new Error("Cannot create ThreadGroup");
}
}
}

View File

@ -35,6 +35,8 @@
package java.util.concurrent;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
@ -866,7 +868,7 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
/** Subscriber for method consume */
private static final class ConsumerSubscriber<T>
implements Flow.Subscriber<T> {
implements Flow.Subscriber<T> {
final CompletableFuture<Void> status;
final Consumer<? super T> consumer;
Flow.Subscription subscription;
@ -906,7 +908,7 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
*/
@SuppressWarnings("serial")
static final class ConsumerTask<T> extends ForkJoinTask<Void>
implements Runnable {
implements Runnable, CompletableFuture.AsynchronousCompletionTask {
final BufferedSubscription<T> consumer;
ConsumerTask(BufferedSubscription<T> consumer) {
this.consumer = consumer;
@ -959,11 +961,9 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
* Blocking control relies on the "waiter" field. Producers set
* the field before trying to block, but must then recheck (via
* offer) before parking. Signalling then just unparks and clears
* waiter field. If the producer and consumer are both in the same
* ForkJoinPool, or consumers are running in commonPool, the
* producer attempts to help run consumer tasks that it forked
* before blocking. To avoid potential cycles, only one level of
* helping is currently supported.
* waiter field. If the producer and/or consumer are using a
* ForkJoinPool, the producer attempts to help run consumer tasks
* via ForkJoinPool.helpAsyncBlocker before blocking.
*
* This class uses @Contended and heuristic field declaration
* ordering to reduce false-sharing-based memory contention among
@ -983,7 +983,6 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
volatile long demand; // # unfilled requests
int maxCapacity; // reduced on OOME
int putStat; // offer result for ManagedBlocker
int helpDepth; // nested helping depth (at most 1)
volatile int ctl; // atomic run state flags
volatile int head; // next position to take
int tail; // next position to put
@ -1077,7 +1076,7 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
alloc = true;
}
else {
U.fullFence(); // recheck
VarHandle.fullFence(); // recheck
int h = head, t = tail, size = t + 1 - h;
if (cap >= size) {
a[(cap - 1) & t] = item;
@ -1116,10 +1115,10 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
if (a != null && cap > 0) {
int mask = cap - 1;
for (int j = head; j != t; ++j) {
long k = ((long)(j & mask) << ASHIFT) + ABASE;
Object x = U.getObjectVolatile(a, k);
int k = j & mask;
Object x = QA.getAcquire(a, k);
if (x != null && // races with consumer
U.compareAndSwapObject(a, k, x, null))
QA.compareAndSet(a, k, x, null))
newArray[j & newMask] = x;
}
}
@ -1136,100 +1135,43 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
* initial offer return 0.
*/
final int submit(T item) {
int stat; Executor e; ForkJoinWorkerThread w;
if ((stat = offer(item)) == 0 && helpDepth == 0 &&
((e = executor) instanceof ForkJoinPool)) {
helpDepth = 1;
Thread thread = Thread.currentThread();
if ((thread instanceof ForkJoinWorkerThread) &&
((w = (ForkJoinWorkerThread)thread)).getPool() == e)
stat = internalHelpConsume(w.workQueue, item);
else if (e == ForkJoinPool.commonPool())
stat = externalHelpConsume
(ForkJoinPool.commonSubmitterQueue(), item);
helpDepth = 0;
}
if (stat == 0 && (stat = offer(item)) == 0) {
int stat;
if ((stat = offer(item)) == 0) {
putItem = item;
timeout = 0L;
try {
ForkJoinPool.managedBlock(this);
} catch (InterruptedException ie) {
timeout = INTERRUPTED;
putStat = 0;
ForkJoinPool.helpAsyncBlocker(executor, this);
if ((stat = putStat) == 0) {
try {
ForkJoinPool.managedBlock(this);
} catch (InterruptedException ie) {
timeout = INTERRUPTED;
}
stat = putStat;
}
stat = putStat;
if (timeout < 0L)
Thread.currentThread().interrupt();
}
return stat;
}
/**
* Tries helping for FJ submitter.
*/
private int internalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
int stat = 0;
if (w != null) {
ForkJoinTask<?> t;
while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
if ((stat = offer(item)) != 0 || !w.tryUnpush(t))
break;
((ConsumerTask<?>)t).consumer.consume();
}
}
return stat;
}
/**
* Tries helping for non-FJ submitter.
*/
private int externalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
int stat = 0;
if (w != null) {
ForkJoinTask<?> t;
while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
if ((stat = offer(item)) != 0 || !w.trySharedUnpush(t))
break;
((ConsumerTask<?>)t).consumer.consume();
}
}
return stat;
}
/**
* Timeout version; similar to submit.
*/
final int timedOffer(T item, long nanos) {
int stat; Executor e;
if ((stat = offer(item)) == 0 && helpDepth == 0 &&
((e = executor) instanceof ForkJoinPool)) {
Thread thread = Thread.currentThread();
if (((thread instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread)thread).getPool() == e) ||
e == ForkJoinPool.commonPool()) {
helpDepth = 1;
ForkJoinTask<?> t;
long deadline = System.nanoTime() + nanos;
while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
(t instanceof ConsumerTask)) {
if ((stat = offer(item)) != 0 ||
(nanos = deadline - System.nanoTime()) <= 0L ||
!t.tryUnfork())
break;
((ConsumerTask<?>)t).consumer.consume();
}
helpDepth = 0;
}
}
if (stat == 0 && (stat = offer(item)) == 0 &&
(timeout = nanos) > 0L) {
int stat;
if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
putItem = item;
try {
ForkJoinPool.managedBlock(this);
} catch (InterruptedException ie) {
timeout = INTERRUPTED;
putStat = 0;
ForkJoinPool.helpAsyncBlocker(executor, this);
if ((stat = putStat) == 0) {
try {
ForkJoinPool.managedBlock(this);
} catch (InterruptedException ie) {
timeout = INTERRUPTED;
}
stat = putStat;
}
stat = putStat;
if (timeout < 0L)
Thread.currentThread().interrupt();
}
@ -1249,22 +1191,20 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
}
else if ((c & ACTIVE) != 0) { // ensure keep-alive
if ((c & CONSUME) != 0 ||
U.compareAndSwapInt(this, CTL, c,
c | CONSUME))
CTL.compareAndSet(this, c, c | CONSUME))
break;
}
else if (demand == 0L || tail == head)
break;
else if (U.compareAndSwapInt(this, CTL, c,
c | (ACTIVE | CONSUME))) {
else if (CTL.compareAndSet(this, c, c | (ACTIVE | CONSUME))) {
try {
e.execute(new ConsumerTask<T>(this));
break;
} catch (RuntimeException | Error ex) { // back out
do {} while (((c = ctl) & DISABLED) == 0 &&
(c & ACTIVE) != 0 &&
!U.compareAndSwapInt(this, CTL, c,
c & ~ACTIVE));
!CTL.weakCompareAndSetVolatile
(this, c, c & ~ACTIVE));
throw ex;
}
}
@ -1300,10 +1240,10 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
break;
else if ((c & ACTIVE) != 0) {
pendingError = ex;
if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
if (CTL.compareAndSet(this, c, c | ERROR))
break; // cause consumer task to exit
}
else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
else if (CTL.compareAndSet(this, c, DISABLED)) {
Flow.Subscriber<? super T> s = subscriber;
if (s != null && ex != null) {
try {
@ -1330,7 +1270,7 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
for (int c;;) {
if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
break;
if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE)) {
if (CTL.compareAndSet(this, c, c & ~ACTIVE)) {
onError(ex);
break;
}
@ -1343,8 +1283,8 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
for (int c;;) {
if ((c = ctl) == DISABLED)
break;
if (U.compareAndSwapInt(this, CTL, c,
c | (ACTIVE | CONSUME | COMPLETE))) {
if (CTL.compareAndSet(this, c,
c | (ACTIVE | CONSUME | COMPLETE))) {
if ((c & ACTIVE) == 0)
startOrDisable();
break;
@ -1356,8 +1296,8 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
for (int c;;) {
if ((c = ctl) == DISABLED)
break;
if (U.compareAndSwapInt(this, CTL, c,
c | (ACTIVE | CONSUME | SUBSCRIBE))) {
if (CTL.compareAndSet(this, c,
c | (ACTIVE | CONSUME | SUBSCRIBE))) {
if ((c & ACTIVE) == 0)
startOrDisable();
break;
@ -1375,11 +1315,11 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
if ((c = ctl) == DISABLED)
break;
else if ((c & ACTIVE) != 0) {
if (U.compareAndSwapInt(this, CTL, c,
c | (CONSUME | ERROR)))
if (CTL.compareAndSet(this, c,
c | (CONSUME | ERROR)))
break;
}
else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
else if (CTL.compareAndSet(this, c, DISABLED)) {
detach();
break;
}
@ -1395,19 +1335,18 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
long prev = demand, d;
if ((d = prev + n) < prev) // saturate
d = Long.MAX_VALUE;
if (U.compareAndSwapLong(this, DEMAND, prev, d)) {
if (DEMAND.compareAndSet(this, prev, d)) {
for (int c, h;;) {
if ((c = ctl) == DISABLED)
break;
else if ((c & ACTIVE) != 0) {
if ((c & CONSUME) != 0 ||
U.compareAndSwapInt(this, CTL, c,
c | CONSUME))
CTL.compareAndSet(this, c, c | CONSUME))
break;
}
else if ((h = head) != tail) {
if (U.compareAndSwapInt(this, CTL, c,
c | (ACTIVE|CONSUME))) {
if (CTL.compareAndSet(this, c,
c | (ACTIVE|CONSUME))) {
startOrDisable();
break;
}
@ -1476,16 +1415,14 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
if ((s = subscriber) != null) { // else disabled
for (;;) {
long d = demand;
int c; Object[] a; int n; long i; Object x; Thread w;
int c; Object[] a; int n, i; Object x; Thread w;
if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
if (!checkControl(s, c))
break;
}
else if ((a = array) == null || h == tail ||
(n = a.length) == 0 ||
(x = U.getObjectVolatile
(a, (i = ((long)((n - 1) & h) << ASHIFT) + ABASE)))
== null) {
(x = QA.getAcquire(a, i = (n - 1) & h)) == null) {
if (!checkEmpty(s, c))
break;
}
@ -1494,10 +1431,10 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
break;
}
else if (((c & CONSUME) != 0 ||
U.compareAndSwapInt(this, CTL, c, c | CONSUME)) &&
U.compareAndSwapObject(a, i, x, null)) {
U.putIntRelease(this, HEAD, ++h);
U.getAndAddLong(this, DEMAND, -1L);
CTL.compareAndSet(this, c, c | CONSUME)) &&
QA.compareAndSet(a, i, x, null)) {
HEAD.setRelease(this, ++h);
DEMAND.getAndAdd(this, -1L);
if ((w = waiter) != null)
signalWaiter(w);
try {
@ -1528,7 +1465,7 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
}
}
else if ((c & SUBSCRIBE) != 0) {
if (U.compareAndSwapInt(this, CTL, c, c & ~SUBSCRIBE)) {
if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) {
try {
if (s != null)
s.onSubscribe(this);
@ -1551,9 +1488,9 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
boolean stat = true;
if (head == tail) {
if ((c & CONSUME) != 0)
U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
CTL.compareAndSet(this, c, c & ~CONSUME);
else if ((c & COMPLETE) != 0) {
if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
if (CTL.compareAndSet(this, c, DISABLED)) {
try {
if (s != null)
s.onComplete();
@ -1561,7 +1498,7 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
}
}
}
else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
stat = false;
}
return stat;
@ -1574,8 +1511,8 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
boolean stat = true;
if (demand == 0L) {
if ((c & CONSUME) != 0)
U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
else if (U.compareAndSwapInt(this, CTL, c, c & ~ACTIVE))
CTL.compareAndSet(this, c, c & ~CONSUME);
else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
stat = false;
}
return stat;
@ -1595,31 +1532,25 @@ public class SubmissionPublisher<T> implements Flow.Publisher<T>,
onError(ex);
}
// Unsafe mechanics
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long CTL;
private static final long TAIL;
private static final long HEAD;
private static final long DEMAND;
private static final int ABASE;
private static final int ASHIFT;
// VarHandle mechanics
private static final VarHandle CTL;
private static final VarHandle TAIL;
private static final VarHandle HEAD;
private static final VarHandle DEMAND;
private static final VarHandle QA;
static {
try {
CTL = U.objectFieldOffset
(BufferedSubscription.class.getDeclaredField("ctl"));
TAIL = U.objectFieldOffset
(BufferedSubscription.class.getDeclaredField("tail"));
HEAD = U.objectFieldOffset
(BufferedSubscription.class.getDeclaredField("head"));
DEMAND = U.objectFieldOffset
(BufferedSubscription.class.getDeclaredField("demand"));
ABASE = U.arrayBaseOffset(Object[].class);
int scale = U.arrayIndexScale(Object[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
MethodHandles.Lookup l = MethodHandles.lookup();
CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
int.class);
TAIL = l.findVarHandle(BufferedSubscription.class, "tail",
int.class);
HEAD = l.findVarHandle(BufferedSubscription.class, "head",
int.class);
DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
long.class);
QA = MethodHandles.arrayElementVarHandle(Object[].class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}

View File

@ -36,6 +36,7 @@
package java.util.concurrent;
import java.io.ObjectStreamField;
import java.security.AccessControlContext;
import java.util.Random;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
@ -47,6 +48,7 @@ import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.StreamSupport;
import jdk.internal.misc.Unsafe;
/**
* A random number generator isolated to the current thread. Like the
@ -95,7 +97,9 @@ public class ThreadLocalRandom extends Random {
* ThreadLocalRandom sequence. The dual use is a marriage of
* convenience, but is a simple and efficient way of reducing
* application-level overhead and footprint of most concurrent
* programs.
* programs. Even more opportunistically, we also define here
* other package-private utilities that access Thread class
* fields.
*
* Even though this class subclasses java.util.Random, it uses the
* same basic algorithm as java.util.SplittableRandom. (See its
@ -958,6 +962,49 @@ public class ThreadLocalRandom extends Random {
return r;
}
// Support for other package-private ThreadLocal access
/**
* Erases ThreadLocals by nulling out Thread maps.
*/
static final void eraseThreadLocals(Thread thread) {
U.putObject(thread, THREADLOCALS, null);
U.putObject(thread, INHERITABLETHREADLOCALS, null);
}
static final void setInheritedAccessControlContext(Thread thread,
AccessControlContext acc) {
U.putObjectRelease(thread, INHERITEDACCESSCONTROLCONTEXT, acc);
}
/**
* Returns a new group with the system ThreadGroup (the
* topmost, parent-less group) as parent. Uses Unsafe to
* traverse Thread.group and ThreadGroup.parent fields.
*/
static final ThreadGroup createThreadGroup(String name) {
if (name == null)
throw new NullPointerException();
try {
long tg = U.objectFieldOffset
(Thread.class.getDeclaredField("group"));
long gp = U.objectFieldOffset
(ThreadGroup.class.getDeclaredField("parent"));
ThreadGroup group = (ThreadGroup)
U.getObject(Thread.currentThread(), tg);
while (group != null) {
ThreadGroup parent = (ThreadGroup)U.getObject(group, gp);
if (parent == null)
return new ThreadGroup(group, name);
group = parent;
}
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
// fall through if null as cannot-happen safeguard
throw new Error("Cannot create ThreadGroup");
}
// Serialization support
private static final long serialVersionUID = -5851777807851030925L;
@ -1022,10 +1069,13 @@ public class ThreadLocalRandom extends Random {
static final String BAD_SIZE = "size must be non-negative";
// Unsafe mechanics
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final Unsafe U = Unsafe.getUnsafe();
private static final long SEED;
private static final long PROBE;
private static final long SECONDARY;
private static final long THREADLOCALS;
private static final long INHERITABLETHREADLOCALS;
private static final long INHERITEDACCESSCONTROLCONTEXT;
static {
try {
SEED = U.objectFieldOffset
@ -1034,6 +1084,12 @@ public class ThreadLocalRandom extends Random {
(Thread.class.getDeclaredField("threadLocalRandomProbe"));
SECONDARY = U.objectFieldOffset
(Thread.class.getDeclaredField("threadLocalRandomSecondarySeed"));
THREADLOCALS = U.objectFieldOffset
(Thread.class.getDeclaredField("threadLocals"));
INHERITABLETHREADLOCALS = U.objectFieldOffset
(Thread.class.getDeclaredField("inheritableThreadLocals"));
INHERITEDACCESSCONTROLCONTEXT = U.objectFieldOffset
(Thread.class.getDeclaredField("inheritedAccessControlContext"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}