mirror of
https://github.com/openjdk/jdk.git
synced 2026-04-08 05:58:38 +00:00
8029452: Fork/Join task ForEachOps.ForEachOrderedTask clarifications and minor improvements
Reviewed-by: mduigou, briangoetz
This commit is contained in:
parent
fa4d687935
commit
72803ca57e
@ -317,12 +317,55 @@ final class ForEachOps {
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
|
||||
/*
|
||||
* Our goal is to ensure that the elements associated with a task are
|
||||
* processed according to an in-order traversal of the computation tree.
|
||||
* We use completion counts for representing these dependencies, so that
|
||||
* a task does not complete until all the tasks preceding it in this
|
||||
* order complete. We use the "completion map" to associate the next
|
||||
* task in this order for any left child. We increase the pending count
|
||||
* of any node on the right side of such a mapping by one to indicate
|
||||
* its dependency, and when a node on the left side of such a mapping
|
||||
* completes, it decrements the pending count of its corresponding right
|
||||
* side. As the computation tree is expanded by splitting, we must
|
||||
* atomically update the mappings to maintain the invariant that the
|
||||
* completion map maps left children to the next node in the in-order
|
||||
* traversal.
|
||||
*
|
||||
* Take, for example, the following computation tree of tasks:
|
||||
*
|
||||
* a
|
||||
* / \
|
||||
* b c
|
||||
* / \ / \
|
||||
* d e f g
|
||||
*
|
||||
* The complete map will contain (not necessarily all at the same time)
|
||||
* the following associations:
|
||||
*
|
||||
* d -> e
|
||||
* b -> f
|
||||
* f -> g
|
||||
*
|
||||
* Tasks e, f, g will have their pending counts increased by 1.
|
||||
*
|
||||
* The following relationships hold:
|
||||
*
|
||||
* - completion of d "happens-before" e;
|
||||
* - completion of d and e "happens-before b;
|
||||
* - completion of b "happens-before" f; and
|
||||
* - completion of f "happens-before" g
|
||||
*
|
||||
* Thus overall the "happens-before" relationship holds for the
|
||||
* reporting of elements, covered by tasks d, e, f and g, as specified
|
||||
* by the forEachOrdered operation.
|
||||
*/
|
||||
|
||||
private final PipelineHelper<T> helper;
|
||||
private Spliterator<S> spliterator;
|
||||
private final long targetSize;
|
||||
private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
|
||||
private final Sink<T> action;
|
||||
private final Object lock;
|
||||
private final ForEachOrderedTask<S, T> leftPredecessor;
|
||||
private Node<T> node;
|
||||
|
||||
@ -333,9 +376,9 @@ final class ForEachOps {
|
||||
this.helper = helper;
|
||||
this.spliterator = spliterator;
|
||||
this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
|
||||
this.completionMap = new ConcurrentHashMap<>();
|
||||
// Size map to avoid concurrent re-sizes
|
||||
this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.LEAF_TARGET << 1));
|
||||
this.action = action;
|
||||
this.lock = new Object();
|
||||
this.leftPredecessor = null;
|
||||
}
|
||||
|
||||
@ -348,7 +391,6 @@ final class ForEachOps {
|
||||
this.targetSize = parent.targetSize;
|
||||
this.completionMap = parent.completionMap;
|
||||
this.action = parent.action;
|
||||
this.lock = parent.lock;
|
||||
this.leftPredecessor = leftPredecessor;
|
||||
}
|
||||
|
||||
@ -367,16 +409,42 @@ final class ForEachOps {
|
||||
new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
|
||||
ForEachOrderedTask<S, T> rightChild =
|
||||
new ForEachOrderedTask<>(task, rightSplit, leftChild);
|
||||
|
||||
// Fork the parent task
|
||||
// Completion of the left and right children "happens-before"
|
||||
// completion of the parent
|
||||
task.addToPendingCount(1);
|
||||
// Completion of the left child "happens-before" completion of
|
||||
// the right child
|
||||
rightChild.addToPendingCount(1);
|
||||
task.completionMap.put(leftChild, rightChild);
|
||||
task.addToPendingCount(1); // forking
|
||||
rightChild.addToPendingCount(1); // right pending on left child
|
||||
|
||||
// If task is not on the left spine
|
||||
if (task.leftPredecessor != null) {
|
||||
leftChild.addToPendingCount(1); // left pending on previous subtree, except left spine
|
||||
if (task.completionMap.replace(task.leftPredecessor, task, leftChild))
|
||||
task.addToPendingCount(-1); // transfer my "right child" count to my left child
|
||||
else
|
||||
leftChild.addToPendingCount(-1); // left child is ready to go when ready
|
||||
/*
|
||||
* Completion of left-predecessor, or left subtree,
|
||||
* "happens-before" completion of left-most leaf node of
|
||||
* right subtree.
|
||||
* The left child's pending count needs to be updated before
|
||||
* it is associated in the completion map, otherwise the
|
||||
* left child can complete prematurely and violate the
|
||||
* "happens-before" constraint.
|
||||
*/
|
||||
leftChild.addToPendingCount(1);
|
||||
// Update association of left-predecessor to left-most
|
||||
// leaf node of right subtree
|
||||
if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) {
|
||||
// If replaced, adjust the pending count of the parent
|
||||
// to complete when its children complete
|
||||
task.addToPendingCount(-1);
|
||||
} else {
|
||||
// Left-predecessor has already completed, parent's
|
||||
// pending count is adjusted by left-predecessor;
|
||||
// left child is ready to complete
|
||||
leftChild.addToPendingCount(-1);
|
||||
}
|
||||
}
|
||||
|
||||
ForEachOrderedTask<S, T> taskToFork;
|
||||
if (forkRight) {
|
||||
forkRight = false;
|
||||
@ -391,31 +459,47 @@ final class ForEachOps {
|
||||
}
|
||||
taskToFork.fork();
|
||||
}
|
||||
if (task.getPendingCount() == 0) {
|
||||
task.helper.wrapAndCopyInto(task.action, rightSplit);
|
||||
}
|
||||
else {
|
||||
|
||||
/*
|
||||
* Task's pending count is either 0 or 1. If 1 then the completion
|
||||
* map will contain a value that is task, and two calls to
|
||||
* tryComplete are required for completion, one below and one
|
||||
* triggered by the completion of task's left-predecessor in
|
||||
* onCompletion. Therefore there is no data race within the if
|
||||
* block.
|
||||
*/
|
||||
if (task.getPendingCount() > 0) {
|
||||
// Cannot complete just yet so buffer elements into a Node
|
||||
// for use when completion occurs
|
||||
Node.Builder<T> nb = task.helper.makeNodeBuilder(
|
||||
task.helper.exactOutputSizeIfKnown(rightSplit),
|
||||
size -> (T[]) new Object[size]);
|
||||
task.helper.exactOutputSizeIfKnown(rightSplit),
|
||||
size -> (T[]) new Object[size]);
|
||||
task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
|
||||
task.spliterator = null;
|
||||
}
|
||||
task.tryComplete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompletion(CountedCompleter<?> caller) {
|
||||
spliterator = null;
|
||||
if (node != null) {
|
||||
// Dump any data from this leaf into the sink
|
||||
synchronized (lock) {
|
||||
node.forEach(action);
|
||||
}
|
||||
// Dump buffered elements from this leaf into the sink
|
||||
node.forEach(action);
|
||||
node = null;
|
||||
}
|
||||
ForEachOrderedTask<S, T> victim = completionMap.remove(this);
|
||||
if (victim != null)
|
||||
victim.tryComplete();
|
||||
else if (spliterator != null) {
|
||||
// Dump elements output from this leaf's pipeline into the sink
|
||||
helper.wrapAndCopyInto(action, spliterator);
|
||||
spliterator = null;
|
||||
}
|
||||
|
||||
// The completion of this task *and* the dumping of elements
|
||||
// "happens-before" completion of the associated left-most leaf task
|
||||
// of right subtree (if any, which can be this task's right sibling)
|
||||
//
|
||||
ForEachOrderedTask<S, T> leftDescendant = completionMap.remove(this);
|
||||
if (leftDescendant != null)
|
||||
leftDescendant.tryComplete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user