mirror of
https://github.com/openjdk/jdk.git
synced 2026-02-19 14:55:17 +00:00
8012987: Optimizations for Stream.limit/substream
Co-authored-by: Brian Goetz <brian.goetz@oracle.com> Reviewed-by: mduigou
This commit is contained in:
parent
411df5ec90
commit
27da77ce6b
@ -375,6 +375,12 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
|
||||
// NOTE: there are no size-injecting ops
|
||||
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
|
||||
backPropagationHead = p;
|
||||
// Clear the short circuit flag for next pipeline stage
|
||||
// This stage encapsulates short-circuiting, the next
|
||||
// stage may not have any short-circuit operations, and
|
||||
// if so spliterator.forEachRemaining should be be used
|
||||
// for traversal
|
||||
thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
|
||||
}
|
||||
|
||||
depth = 0;
|
||||
@ -447,6 +453,15 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
|
||||
|
||||
// PipelineHelper
|
||||
|
||||
@Override
|
||||
final StreamShape getSourceShape() {
|
||||
AbstractPipeline p = AbstractPipeline.this;
|
||||
while (p.depth > 0) {
|
||||
p = p.previousStage;
|
||||
}
|
||||
return p.getOutputShape();
|
||||
}
|
||||
|
||||
@Override
|
||||
final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
|
||||
return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
|
||||
@ -502,6 +517,16 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
|
||||
return (Sink<P_IN>) sink;
|
||||
}
|
||||
|
||||
@Override
|
||||
final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {
|
||||
if (depth == 0) {
|
||||
return (Spliterator<E_OUT>) sourceSpliterator;
|
||||
}
|
||||
else {
|
||||
return wrap(this, () -> sourceSpliterator, isParallel());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
|
||||
|
||||
@ -316,6 +316,7 @@ abstract class AbstractTask<P_IN, P_OUT, R,
|
||||
else {
|
||||
K l = task.leftChild = task.makeChild(split);
|
||||
K r = task.rightChild = task.makeChild(task.spliterator);
|
||||
task.spliterator = null;
|
||||
task.setPendingCount(1);
|
||||
l.fork();
|
||||
task = r;
|
||||
|
||||
@ -743,14 +743,7 @@ public interface DoubleStream extends BaseStream<Double, DoubleStream> {
|
||||
*/
|
||||
public static DoubleStream generate(DoubleSupplier s) {
|
||||
Objects.requireNonNull(s);
|
||||
return StreamSupport.doubleStream(Spliterators.spliteratorUnknownSize(
|
||||
new PrimitiveIterator.OfDouble() {
|
||||
@Override
|
||||
public boolean hasNext() { return true; }
|
||||
|
||||
@Override
|
||||
public double nextDouble() { return s.getAsDouble(); }
|
||||
},
|
||||
Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.NONNULL));
|
||||
return StreamSupport.doubleStream(
|
||||
new StreamSpliterators.InfiniteSupplyingSpliterator.OfDouble(Long.MAX_VALUE, s));
|
||||
}
|
||||
}
|
||||
|
||||
@ -342,7 +342,7 @@ final class ForEachOps {
|
||||
doCompute(this);
|
||||
}
|
||||
|
||||
private static<S, T> void doCompute(ForEachOrderedTask<S, T> task) {
|
||||
private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) {
|
||||
while (true) {
|
||||
Spliterator<S> split;
|
||||
if (!AbstractTask.suggestSplit(task.spliterator, task.targetSize)
|
||||
|
||||
@ -745,15 +745,8 @@ public interface IntStream extends BaseStream<Integer, IntStream> {
|
||||
*/
|
||||
public static IntStream generate(IntSupplier s) {
|
||||
Objects.requireNonNull(s);
|
||||
return StreamSupport.intStream(Spliterators.spliteratorUnknownSize(
|
||||
new PrimitiveIterator.OfInt() {
|
||||
@Override
|
||||
public boolean hasNext() { return true; }
|
||||
|
||||
@Override
|
||||
public int nextInt() { return s.getAsInt(); }
|
||||
},
|
||||
Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.NONNULL));
|
||||
return StreamSupport.intStream(
|
||||
new StreamSpliterators.InfiniteSupplyingSpliterator.OfInt(Long.MAX_VALUE, s));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -736,15 +736,8 @@ public interface LongStream extends BaseStream<Long, LongStream> {
|
||||
*/
|
||||
public static LongStream generate(LongSupplier s) {
|
||||
Objects.requireNonNull(s);
|
||||
return StreamSupport.longStream(Spliterators.spliteratorUnknownSize(
|
||||
new PrimitiveIterator.OfLong() {
|
||||
@Override
|
||||
public boolean hasNext() { return true; }
|
||||
|
||||
@Override
|
||||
public long nextLong() { return s.getAsLong(); }
|
||||
},
|
||||
Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.NONNULL));
|
||||
return StreamSupport.longStream(
|
||||
new StreamSpliterators.InfiniteSupplyingSpliterator.OfLong(Long.MAX_VALUE, s));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -44,7 +44,7 @@ import java.util.function.IntFunction;
|
||||
* and {@link AbstractPipeline#opEvaluateParallel(PipelineHelper, java.util.Spliterator,
|
||||
* java.util.function.IntFunction)}, methods, which can use the
|
||||
* {@code PipelineHelper} to access information about the pipeline such as
|
||||
* input shape, output shape, stream flags, and size, and use the helper methods
|
||||
* head shape, stream flags, and size, and use the helper methods
|
||||
* such as {@link #wrapAndCopyInto(Sink, Spliterator)},
|
||||
* {@link #copyInto(Sink, Spliterator)}, and {@link #wrapSink(Sink)} to execute
|
||||
* pipeline operations.
|
||||
@ -54,6 +54,13 @@ import java.util.function.IntFunction;
|
||||
*/
|
||||
abstract class PipelineHelper<P_OUT> {
|
||||
|
||||
/**
|
||||
* Gets the stream shape for the source of the pipeline segment.
|
||||
*
|
||||
* @return the stream shape for the source of the pipeline segment.
|
||||
*/
|
||||
abstract StreamShape getSourceShape();
|
||||
|
||||
/**
|
||||
* Gets the combined stream and operation flags for the output of the described
|
||||
* pipeline. This will incorporate stream flags from the stream source, all
|
||||
@ -145,6 +152,14 @@ abstract class PipelineHelper<P_OUT> {
|
||||
*/
|
||||
abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param spliterator
|
||||
* @param <P_IN>
|
||||
* @return
|
||||
*/
|
||||
abstract<P_IN> Spliterator<P_OUT> wrapSpliterator(Spliterator<P_IN> spliterator);
|
||||
|
||||
/**
|
||||
* Constructs a @{link Node.Builder} compatible with the output shape of
|
||||
* this {@code PipelineHelper}.
|
||||
|
||||
@ -24,14 +24,9 @@
|
||||
*/
|
||||
package java.util.stream;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Spliterator;
|
||||
import java.util.concurrent.CountedCompleter;
|
||||
import java.util.function.DoubleConsumer;
|
||||
import java.util.function.IntConsumer;
|
||||
import java.util.function.IntFunction;
|
||||
import java.util.function.LongConsumer;
|
||||
|
||||
/**
|
||||
* Factory for instances of a short-circuiting stateful intermediate operations
|
||||
@ -44,6 +39,63 @@ final class SliceOps {
|
||||
// No instances
|
||||
private SliceOps() { }
|
||||
|
||||
/**
|
||||
* Calculates the sliced size given the current size, number of elements
|
||||
* skip, and the number of elements to limit.
|
||||
*
|
||||
* @param size the current size
|
||||
* @param skip the number of elements to skip, assumed to be >= 0
|
||||
* @param limit the number of elements to limit, assumed to be >= 0, with
|
||||
* a value of {@code Long.MAX_VALUE} if there is no limit
|
||||
* @return the sliced size
|
||||
*/
|
||||
private static long calcSize(long size, long skip, long limit) {
|
||||
return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the slice fence, which is one past the index of the slice
|
||||
* range
|
||||
* @param skip the number of elements to skip, assumed to be >= 0
|
||||
* @param limit the number of elements to limit, assumed to be >= 0, with
|
||||
* a value of {@code Long.MAX_VALUE} if there is no limit
|
||||
* @return the slice fence.
|
||||
*/
|
||||
private static long calcSliceFence(long skip, long limit) {
|
||||
long sliceFence = limit >= 0 ? skip + limit : Long.MAX_VALUE;
|
||||
// Check for overflow
|
||||
return (sliceFence >= 0) ? sliceFence : Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a slice spliterator given a stream shape governing the
|
||||
* spliterator type. Requires that the underlying Spliterator
|
||||
* be SUBSIZED.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape,
|
||||
Spliterator<P_IN> s,
|
||||
long skip, long limit) {
|
||||
assert s.hasCharacteristics(Spliterator.SUBSIZED);
|
||||
long sliceFence = calcSliceFence(skip, limit);
|
||||
switch (shape) {
|
||||
case REFERENCE:
|
||||
return new StreamSpliterators
|
||||
.SliceSpliterator.OfRef<>(s, skip, sliceFence);
|
||||
case INT_VALUE:
|
||||
return (Spliterator<P_IN>) new StreamSpliterators
|
||||
.SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
|
||||
case LONG_VALUE:
|
||||
return (Spliterator<P_IN>) new StreamSpliterators
|
||||
.SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
|
||||
case DOUBLE_VALUE:
|
||||
return (Spliterator<P_IN>) new StreamSpliterators
|
||||
.SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
|
||||
default:
|
||||
throw new IllegalStateException("Unknown shape " + shape);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Appends a "slice" operation to the provided stream. The slice operation
|
||||
* may be may be skip-only, limit-only, or skip-and-limit.
|
||||
@ -61,11 +113,71 @@ final class SliceOps {
|
||||
|
||||
return new ReferencePipeline.StatefulOp<T,T>(upstream, StreamShape.REFERENCE,
|
||||
flags(limit)) {
|
||||
Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
|
||||
long skip, long limit, long sizeIfKnown) {
|
||||
if (skip <= sizeIfKnown) {
|
||||
// Use just the limit if the number of elements
|
||||
// to skip is <= the known pipeline size
|
||||
limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
|
||||
skip = 0;
|
||||
}
|
||||
return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
<P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
|
||||
long size = helper.exactOutputSizeIfKnown(spliterator);
|
||||
if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
|
||||
return new StreamSpliterators.SliceSpliterator.OfRef<>(
|
||||
helper.wrapSpliterator(spliterator),
|
||||
skip,
|
||||
calcSliceFence(skip, limit));
|
||||
} else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
|
||||
return unorderedSkipLimitSpliterator(
|
||||
helper.wrapSpliterator(spliterator),
|
||||
skip, limit, size);
|
||||
}
|
||||
else {
|
||||
// @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n)
|
||||
// regardless of the value of n
|
||||
// Need to adjust the target size of splitting for the
|
||||
// SliceTask from say (size / k) to say min(size / k, 1 << 14)
|
||||
// This will limit the size of the buffers created at the leaf nodes
|
||||
// cancellation will be more aggressive cancelling later tasks
|
||||
// if the target slice size has been reached from a given task,
|
||||
// cancellation should also clear local results if any
|
||||
return new SliceTask<>(this, helper, spliterator, i -> (T[]) new Object[i], skip, limit).
|
||||
invoke().spliterator();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
<P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
|
||||
Spliterator<P_IN> spliterator,
|
||||
IntFunction<T[]> generator) {
|
||||
return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke();
|
||||
long size = helper.exactOutputSizeIfKnown(spliterator);
|
||||
if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
|
||||
// Because the pipeline is SIZED the slice spliterator
|
||||
// can be created from the source, this requires matching
|
||||
// to shape of the source, and is potentially more efficient
|
||||
// than creating the slice spliterator from the pipeline
|
||||
// wrapping spliterator
|
||||
Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
|
||||
return Nodes.collect(helper, s, true, generator);
|
||||
} else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
|
||||
Spliterator<T> s = unorderedSkipLimitSpliterator(
|
||||
helper.wrapSpliterator(spliterator),
|
||||
skip, limit, size);
|
||||
// Collect using this pipeline, which is empty and therefore
|
||||
// can be used with the pipeline wrapping spliterator
|
||||
// Note that we cannot create a slice spliterator from
|
||||
// the source spliterator if the pipeline is not SIZED
|
||||
return Nodes.collect(this, s, true, generator);
|
||||
}
|
||||
else {
|
||||
return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
|
||||
invoke();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -74,6 +186,11 @@ final class SliceOps {
|
||||
long n = skip;
|
||||
long m = limit >= 0 ? limit : Long.MAX_VALUE;
|
||||
|
||||
@Override
|
||||
public void begin(long size) {
|
||||
downstream.begin(calcSize(size, skip, m));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(T t) {
|
||||
if (n == 0) {
|
||||
@ -112,11 +229,64 @@ final class SliceOps {
|
||||
|
||||
return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE,
|
||||
flags(limit)) {
|
||||
Spliterator.OfInt unorderedSkipLimitSpliterator(
|
||||
Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) {
|
||||
if (skip <= sizeIfKnown) {
|
||||
// Use just the limit if the number of elements
|
||||
// to skip is <= the known pipeline size
|
||||
limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
|
||||
skip = 0;
|
||||
}
|
||||
return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
<P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
|
||||
Spliterator<P_IN> spliterator) {
|
||||
long size = helper.exactOutputSizeIfKnown(spliterator);
|
||||
if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
|
||||
return new StreamSpliterators.SliceSpliterator.OfInt(
|
||||
(Spliterator.OfInt) helper.wrapSpliterator(spliterator),
|
||||
skip,
|
||||
calcSliceFence(skip, limit));
|
||||
} else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
|
||||
return unorderedSkipLimitSpliterator(
|
||||
(Spliterator.OfInt) helper.wrapSpliterator(spliterator),
|
||||
skip, limit, size);
|
||||
}
|
||||
else {
|
||||
return new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit).
|
||||
invoke().spliterator();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
<P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
|
||||
Spliterator<P_IN> spliterator,
|
||||
IntFunction<Integer[]> generator) {
|
||||
return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke();
|
||||
long size = helper.exactOutputSizeIfKnown(spliterator);
|
||||
if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
|
||||
// Because the pipeline is SIZED the slice spliterator
|
||||
// can be created from the source, this requires matching
|
||||
// to shape of the source, and is potentially more efficient
|
||||
// than creating the slice spliterator from the pipeline
|
||||
// wrapping spliterator
|
||||
Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
|
||||
return Nodes.collectInt(helper, s, true);
|
||||
} else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
|
||||
Spliterator.OfInt s = unorderedSkipLimitSpliterator(
|
||||
(Spliterator.OfInt) helper.wrapSpliterator(spliterator),
|
||||
skip, limit, size);
|
||||
// Collect using this pipeline, which is empty and therefore
|
||||
// can be used with the pipeline wrapping spliterator
|
||||
// Note that we cannot create a slice spliterator from
|
||||
// the source spliterator if the pipeline is not SIZED
|
||||
return Nodes.collectInt(this, s, true);
|
||||
}
|
||||
else {
|
||||
return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
|
||||
invoke();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -125,6 +295,11 @@ final class SliceOps {
|
||||
long n = skip;
|
||||
long m = limit >= 0 ? limit : Long.MAX_VALUE;
|
||||
|
||||
@Override
|
||||
public void begin(long size) {
|
||||
downstream.begin(calcSize(size, skip, m));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(int t) {
|
||||
if (n == 0) {
|
||||
@ -163,11 +338,64 @@ final class SliceOps {
|
||||
|
||||
return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE,
|
||||
flags(limit)) {
|
||||
Spliterator.OfLong unorderedSkipLimitSpliterator(
|
||||
Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) {
|
||||
if (skip <= sizeIfKnown) {
|
||||
// Use just the limit if the number of elements
|
||||
// to skip is <= the known pipeline size
|
||||
limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
|
||||
skip = 0;
|
||||
}
|
||||
return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
<P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
|
||||
Spliterator<P_IN> spliterator) {
|
||||
long size = helper.exactOutputSizeIfKnown(spliterator);
|
||||
if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
|
||||
return new StreamSpliterators.SliceSpliterator.OfLong(
|
||||
(Spliterator.OfLong) helper.wrapSpliterator(spliterator),
|
||||
skip,
|
||||
calcSliceFence(skip, limit));
|
||||
} else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
|
||||
return unorderedSkipLimitSpliterator(
|
||||
(Spliterator.OfLong) helper.wrapSpliterator(spliterator),
|
||||
skip, limit, size);
|
||||
}
|
||||
else {
|
||||
return new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit).
|
||||
invoke().spliterator();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
<P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
|
||||
Spliterator<P_IN> spliterator,
|
||||
IntFunction<Long[]> generator) {
|
||||
return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke();
|
||||
long size = helper.exactOutputSizeIfKnown(spliterator);
|
||||
if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
|
||||
// Because the pipeline is SIZED the slice spliterator
|
||||
// can be created from the source, this requires matching
|
||||
// to shape of the source, and is potentially more efficient
|
||||
// than creating the slice spliterator from the pipeline
|
||||
// wrapping spliterator
|
||||
Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
|
||||
return Nodes.collectLong(helper, s, true);
|
||||
} else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
|
||||
Spliterator.OfLong s = unorderedSkipLimitSpliterator(
|
||||
(Spliterator.OfLong) helper.wrapSpliterator(spliterator),
|
||||
skip, limit, size);
|
||||
// Collect using this pipeline, which is empty and therefore
|
||||
// can be used with the pipeline wrapping spliterator
|
||||
// Note that we cannot create a slice spliterator from
|
||||
// the source spliterator if the pipeline is not SIZED
|
||||
return Nodes.collectLong(this, s, true);
|
||||
}
|
||||
else {
|
||||
return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
|
||||
invoke();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -176,6 +404,11 @@ final class SliceOps {
|
||||
long n = skip;
|
||||
long m = limit >= 0 ? limit : Long.MAX_VALUE;
|
||||
|
||||
@Override
|
||||
public void begin(long size) {
|
||||
downstream.begin(calcSize(size, skip, m));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(long t) {
|
||||
if (n == 0) {
|
||||
@ -214,11 +447,64 @@ final class SliceOps {
|
||||
|
||||
return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE,
|
||||
flags(limit)) {
|
||||
Spliterator.OfDouble unorderedSkipLimitSpliterator(
|
||||
Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) {
|
||||
if (skip <= sizeIfKnown) {
|
||||
// Use just the limit if the number of elements
|
||||
// to skip is <= the known pipeline size
|
||||
limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
|
||||
skip = 0;
|
||||
}
|
||||
return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
<P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
|
||||
Spliterator<P_IN> spliterator) {
|
||||
long size = helper.exactOutputSizeIfKnown(spliterator);
|
||||
if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
|
||||
return new StreamSpliterators.SliceSpliterator.OfDouble(
|
||||
(Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
|
||||
skip,
|
||||
calcSliceFence(skip, limit));
|
||||
} else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
|
||||
return unorderedSkipLimitSpliterator(
|
||||
(Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
|
||||
skip, limit, size);
|
||||
}
|
||||
else {
|
||||
return new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit).
|
||||
invoke().spliterator();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
<P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
|
||||
Spliterator<P_IN> spliterator,
|
||||
IntFunction<Double[]> generator) {
|
||||
return new SliceTask<>(this, helper, spliterator, generator, skip, limit).invoke();
|
||||
long size = helper.exactOutputSizeIfKnown(spliterator);
|
||||
if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
|
||||
// Because the pipeline is SIZED the slice spliterator
|
||||
// can be created from the source, this requires matching
|
||||
// to shape of the source, and is potentially more efficient
|
||||
// than creating the slice spliterator from the pipeline
|
||||
// wrapping spliterator
|
||||
Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
|
||||
return Nodes.collectDouble(helper, s, true);
|
||||
} else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
|
||||
Spliterator.OfDouble s = unorderedSkipLimitSpliterator(
|
||||
(Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
|
||||
skip, limit, size);
|
||||
// Collect using this pipeline, which is empty and therefore
|
||||
// can be used with the pipeline wrapping spliterator
|
||||
// Note that we cannot create a slice spliterator from
|
||||
// the source spliterator if the pipeline is not SIZED
|
||||
return Nodes.collectDouble(this, s, true);
|
||||
}
|
||||
else {
|
||||
return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
|
||||
invoke();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -227,6 +513,11 @@ final class SliceOps {
|
||||
long n = skip;
|
||||
long m = limit >= 0 ? limit : Long.MAX_VALUE;
|
||||
|
||||
@Override
|
||||
public void begin(long size) {
|
||||
downstream.begin(calcSize(size, skip, m));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(double t) {
|
||||
if (n == 0) {
|
||||
@ -253,20 +544,6 @@ final class SliceOps {
|
||||
return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
|
||||
}
|
||||
|
||||
// Parallel strategy -- two cases
|
||||
// IF we have full size information
|
||||
// - decompose, keeping track of each leaf's (offset, size)
|
||||
// - calculate leaf only if intersection between (offset, size) and desired slice
|
||||
// - Construct a Node containing the appropriate sections of the appropriate leaves
|
||||
// IF we don't
|
||||
// - decompose, and calculate size of each leaf
|
||||
// - on complete of any node, compute completed initial size from the root, and if big enough, cancel later nodes
|
||||
// - @@@ this can be significantly improved
|
||||
|
||||
// @@@ Currently we don't do the sized version at all
|
||||
|
||||
// @@@ Should take into account ORDERED flag; if not ORDERED, we can limit in temporal order instead
|
||||
|
||||
/**
|
||||
* {@code ForkJoinTask} implementing slice computation.
|
||||
*
|
||||
@ -319,19 +596,18 @@ final class SliceOps {
|
||||
? op.exactOutputSizeIfKnown(spliterator)
|
||||
: -1;
|
||||
final Node.Builder<P_OUT> nb = op.makeNodeBuilder(sizeIfKnown, generator);
|
||||
Sink<P_OUT> opSink = op.opWrapSink(op.sourceOrOpFlags, nb);
|
||||
|
||||
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(op.sourceOrOpFlags))
|
||||
helper.wrapAndCopyInto(opSink, spliterator);
|
||||
else
|
||||
helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
|
||||
return nb.build();
|
||||
Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
|
||||
helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
|
||||
// It is necessary to truncate here since the result at the root
|
||||
// can only be set once
|
||||
return doTruncate(nb.build());
|
||||
}
|
||||
else {
|
||||
Node<P_OUT> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator),
|
||||
spliterator).build();
|
||||
spliterator).build();
|
||||
thisNodeSize = node.count();
|
||||
completed = true;
|
||||
spliterator = null;
|
||||
return node;
|
||||
}
|
||||
}
|
||||
@ -339,198 +615,95 @@ final class SliceOps {
|
||||
@Override
|
||||
public final void onCompletion(CountedCompleter<?> caller) {
|
||||
if (!isLeaf()) {
|
||||
Node<P_OUT> result;
|
||||
thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
|
||||
completed = true;
|
||||
|
||||
if (isRoot()) {
|
||||
// Only collect nodes once absolute size information is known
|
||||
|
||||
ArrayList<Node<P_OUT>> nodes = new ArrayList<>();
|
||||
visit(nodes, 0);
|
||||
Node<P_OUT> result;
|
||||
if (nodes.size() == 0)
|
||||
result = Nodes.emptyNode(op.getOutputShape());
|
||||
else if (nodes.size() == 1)
|
||||
result = nodes.get(0);
|
||||
else
|
||||
// This will create a tree of depth 1 and will not be a sub-tree
|
||||
// for leaf nodes within the require range
|
||||
result = conc(op.getOutputShape(), nodes);
|
||||
setLocalResult(result);
|
||||
if (canceled) {
|
||||
thisNodeSize = 0;
|
||||
result = getEmptyResult();
|
||||
}
|
||||
else if (thisNodeSize == 0)
|
||||
result = getEmptyResult();
|
||||
else if (leftChild.thisNodeSize == 0)
|
||||
result = rightChild.getLocalResult();
|
||||
else {
|
||||
result = Nodes.conc(op.getOutputShape(),
|
||||
leftChild.getLocalResult(), rightChild.getLocalResult());
|
||||
}
|
||||
setLocalResult(isRoot() ? doTruncate(result) : result);
|
||||
completed = true;
|
||||
}
|
||||
if (targetSize >= 0) {
|
||||
if (((SliceTask<P_IN, P_OUT>) getRoot()).leftSize() >= targetOffset + targetSize)
|
||||
if (targetSize >= 0
|
||||
&& !isRoot()
|
||||
&& isLeftCompleted(targetOffset + targetSize))
|
||||
cancelLaterNodes();
|
||||
}
|
||||
// Don't call super.onCompletion(), we don't look at the child nodes until farther up the tree
|
||||
|
||||
super.onCompletion(caller);
|
||||
}
|
||||
|
||||
/** Compute the cumulative size of the longest leading prefix of completed children */
|
||||
private long leftSize() {
|
||||
@Override
|
||||
protected void cancel() {
|
||||
super.cancel();
|
||||
if (completed)
|
||||
setLocalResult(getEmptyResult());
|
||||
}
|
||||
|
||||
private Node<P_OUT> doTruncate(Node<P_OUT> input) {
|
||||
long to = targetSize >= 0 ? Math.min(input.count(), targetOffset + targetSize) : thisNodeSize;
|
||||
return input.truncate(targetOffset, to, generator);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if the number of completed elements in this node and nodes
|
||||
* to the left of this node is greater than or equal to the target size.
|
||||
*
|
||||
* @param target the target size
|
||||
* @return true if the number of elements is greater than or equal to
|
||||
* the target size, otherwise false.
|
||||
*/
|
||||
private boolean isLeftCompleted(long target) {
|
||||
long size = completed ? thisNodeSize : completedSize(target);
|
||||
if (size >= target)
|
||||
return true;
|
||||
for (SliceTask<P_IN, P_OUT> parent = getParent(), node = this;
|
||||
parent != null;
|
||||
node = parent, parent = parent.getParent()) {
|
||||
if (node == parent.rightChild) {
|
||||
SliceTask<P_IN, P_OUT> left = parent.leftChild;
|
||||
if (left != null) {
|
||||
size += left.completedSize(target);
|
||||
if (size >= target)
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return size >= target;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the number of completed elements in this node.
|
||||
* <p>
|
||||
* Computation terminates if all nodes have been processed or the
|
||||
* number of completed elements is greater than or equal to the target
|
||||
* size.
|
||||
*
|
||||
* @param target the target size
|
||||
* @return return the number of completed elements
|
||||
*/
|
||||
private long completedSize(long target) {
|
||||
if (completed)
|
||||
return thisNodeSize;
|
||||
else if (isLeaf())
|
||||
return 0;
|
||||
else {
|
||||
long leftSize = 0;
|
||||
for (SliceTask<P_IN, P_OUT> child = leftChild, p = null; child != p;
|
||||
p = child, child = rightChild) {
|
||||
if (child.completed)
|
||||
leftSize += child.thisNodeSize;
|
||||
else {
|
||||
leftSize += child.leftSize();
|
||||
break;
|
||||
}
|
||||
}
|
||||
return leftSize;
|
||||
}
|
||||
}
|
||||
|
||||
private void visit(List<Node<P_OUT>> results, int offset) {
|
||||
if (!isLeaf()) {
|
||||
for (SliceTask<P_IN, P_OUT> child = leftChild, p = null; child != p;
|
||||
p = child, child = rightChild) {
|
||||
child.visit(results, offset);
|
||||
offset += child.thisNodeSize;
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (results.size() == 0) {
|
||||
if (offset + thisNodeSize >= targetOffset)
|
||||
results.add(truncateNode(getLocalResult(),
|
||||
Math.max(0, targetOffset - offset),
|
||||
targetSize >= 0 ? Math.max(0, offset + thisNodeSize - (targetOffset + targetSize)) : 0));
|
||||
SliceTask<P_IN, P_OUT> left = leftChild;
|
||||
SliceTask<P_IN, P_OUT> right = rightChild;
|
||||
if (left == null || right == null) {
|
||||
// must be completed
|
||||
return thisNodeSize;
|
||||
}
|
||||
else {
|
||||
if (targetSize == -1 || offset < targetOffset + targetSize) {
|
||||
results.add(truncateNode(getLocalResult(),
|
||||
0,
|
||||
targetSize >= 0 ? Math.max(0, offset + thisNodeSize - (targetOffset + targetSize)) : 0));
|
||||
}
|
||||
long leftSize = left.completedSize(target);
|
||||
return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a new node describing the result of truncating an existing Node
|
||||
* at the left and/or right.
|
||||
*/
|
||||
private Node<P_OUT> truncateNode(Node<P_OUT> input,
|
||||
long skipLeft, long skipRight) {
|
||||
if (skipLeft == 0 && skipRight == 0)
|
||||
return input;
|
||||
else {
|
||||
return truncateNode(input, skipLeft, thisNodeSize - skipRight, generator);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Truncate a {@link Node}, returning a node describing a subsequence of
|
||||
* the contents of the input node.
|
||||
*
|
||||
* @param <T> the type of elements of the input node and truncated node
|
||||
* @param input the input node
|
||||
* @param from the starting offset to include in the truncated node (inclusive)
|
||||
* @param to the ending offset ot include in the truncated node (exclusive)
|
||||
* @param generator the array factory (only used for reference nodes)
|
||||
* @return the truncated node
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> Node<T> truncateNode(Node<T> input, long from, long to, IntFunction<T[]> generator) {
|
||||
StreamShape shape = input.getShape();
|
||||
long size = truncatedSize(input.count(), from, to);
|
||||
if (size == 0)
|
||||
return Nodes.emptyNode(shape);
|
||||
else if (from == 0 && to >= input.count())
|
||||
return input;
|
||||
|
||||
switch (shape) {
|
||||
case REFERENCE: {
|
||||
Spliterator<T> spliterator = input.spliterator();
|
||||
Node.Builder<T> nodeBuilder = Nodes.builder(size, generator);
|
||||
nodeBuilder.begin(size);
|
||||
for (int i = 0; i < from && spliterator.tryAdvance(e -> { }); i++) { }
|
||||
for (int i = 0; (i < size) && spliterator.tryAdvance(nodeBuilder); i++) { }
|
||||
nodeBuilder.end();
|
||||
return nodeBuilder.build();
|
||||
}
|
||||
case INT_VALUE: {
|
||||
Spliterator.OfInt spliterator = ((Node.OfInt) input).spliterator();
|
||||
Node.Builder.OfInt nodeBuilder = Nodes.intBuilder(size);
|
||||
nodeBuilder.begin(size);
|
||||
for (int i = 0; i < from && spliterator.tryAdvance((IntConsumer) e -> { }); i++) { }
|
||||
for (int i = 0; (i < size) && spliterator.tryAdvance((IntConsumer) nodeBuilder); i++) { }
|
||||
nodeBuilder.end();
|
||||
return (Node<T>) nodeBuilder.build();
|
||||
}
|
||||
case LONG_VALUE: {
|
||||
Spliterator.OfLong spliterator = ((Node.OfLong) input).spliterator();
|
||||
Node.Builder.OfLong nodeBuilder = Nodes.longBuilder(size);
|
||||
nodeBuilder.begin(size);
|
||||
for (int i = 0; i < from && spliterator.tryAdvance((LongConsumer) e -> { }); i++) { }
|
||||
for (int i = 0; (i < size) && spliterator.tryAdvance((LongConsumer) nodeBuilder); i++) { }
|
||||
nodeBuilder.end();
|
||||
return (Node<T>) nodeBuilder.build();
|
||||
}
|
||||
case DOUBLE_VALUE: {
|
||||
Spliterator.OfDouble spliterator = ((Node.OfDouble) input).spliterator();
|
||||
Node.Builder.OfDouble nodeBuilder = Nodes.doubleBuilder(size);
|
||||
nodeBuilder.begin(size);
|
||||
for (int i = 0; i < from && spliterator.tryAdvance((DoubleConsumer) e -> { }); i++) { }
|
||||
for (int i = 0; (i < size) && spliterator.tryAdvance((DoubleConsumer) nodeBuilder); i++) { }
|
||||
nodeBuilder.end();
|
||||
return (Node<T>) nodeBuilder.build();
|
||||
}
|
||||
default:
|
||||
throw new IllegalStateException("Unknown shape " + shape);
|
||||
}
|
||||
}
|
||||
|
||||
private static long truncatedSize(long size, long from, long to) {
|
||||
if (from >= 0)
|
||||
size = Math.max(0, size - from);
|
||||
long limit = to - from;
|
||||
if (limit >= 0)
|
||||
size = Math.min(size, limit);
|
||||
return size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Produces a concatenated {@link Node} that has two or more children.
|
||||
* <p>The count of the concatenated node is equal to the sum of the count
|
||||
* of each child. Traversal of the concatenated node traverses the content
|
||||
* of each child in encounter order of the list of children. Splitting a
|
||||
* spliterator obtained from the concatenated node preserves the encounter
|
||||
* order of the list of children.
|
||||
*
|
||||
* <p>The result may be a concatenated node, the input sole node if the size
|
||||
* of the list is 1, or an empty node.
|
||||
*
|
||||
* @param <T> the type of elements of the concatenated node
|
||||
* @param shape the shape of the concatenated node to be created
|
||||
* @param nodes the input nodes
|
||||
* @return a {@code Node} covering the elements of the input nodes
|
||||
* @throws IllegalStateException if all {@link Node} elements of the list
|
||||
* are an not instance of type supported by this factory.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> Node<T> conc(StreamShape shape, List<? extends Node<T>> nodes) {
|
||||
int size = nodes.size();
|
||||
if (size == 0)
|
||||
return Nodes.emptyNode(shape);
|
||||
else if (size == 1)
|
||||
return nodes.get(0);
|
||||
else {
|
||||
// Create a right-balanced tree when there are more that 2 nodes
|
||||
List<Node<T>> refNodes = (List<Node<T>>) nodes;
|
||||
Node<T> c = Nodes.conc(shape, refNodes.get(size - 2), refNodes.get(size - 1));
|
||||
for (int i = size - 3; i >= 0; i--) {
|
||||
c = Nodes.conc(shape, refNodes.get(i), c);
|
||||
}
|
||||
return c;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -880,14 +880,7 @@ public interface Stream<T> extends BaseStream<T, Stream<T>> {
|
||||
*/
|
||||
public static<T> Stream<T> generate(Supplier<T> s) {
|
||||
Objects.requireNonNull(s);
|
||||
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
|
||||
new Iterator<T>() {
|
||||
@Override
|
||||
public boolean hasNext() { return true; }
|
||||
|
||||
@Override
|
||||
public T next() { return s.get(); }
|
||||
},
|
||||
Spliterator.ORDERED | Spliterator.IMMUTABLE));
|
||||
return StreamSupport.stream(
|
||||
new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s));
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -79,11 +79,11 @@ public abstract class OpTestCase extends LoggingTestCase {
|
||||
* test.
|
||||
*
|
||||
* @param actual the actual result
|
||||
* @param excepted the expected result
|
||||
* @param expected the expected result
|
||||
* @param isOrdered true if the pipeline is ordered
|
||||
* @param isParallel true if the pipeline is parallel
|
||||
*/
|
||||
void assertResult(R actual, R excepted, boolean isOrdered, boolean isParallel);
|
||||
void assertResult(R actual, R expected, boolean isOrdered, boolean isParallel);
|
||||
}
|
||||
|
||||
// Exercise stream operations
|
||||
|
||||
@ -42,11 +42,33 @@ import static org.testng.Assert.fail;
|
||||
*/
|
||||
public class SpliteratorTestHelper {
|
||||
|
||||
public interface ContentAsserter<T> {
|
||||
void assertContents(Collection<T> actual, Collection<T> expected, boolean isOrdered);
|
||||
}
|
||||
|
||||
private static ContentAsserter<Object> DEFAULT_CONTENT_ASSERTER
|
||||
= SpliteratorTestHelper::assertContents;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> ContentAsserter<T> defaultContentAsserter() {
|
||||
return (ContentAsserter<T>) DEFAULT_CONTENT_ASSERTER;
|
||||
}
|
||||
|
||||
public static void testSpliterator(Supplier<Spliterator<Integer>> supplier) {
|
||||
testSpliterator(supplier, (Consumer<Integer> b) -> b);
|
||||
testSpliterator(supplier, defaultContentAsserter());
|
||||
}
|
||||
|
||||
public static void testSpliterator(Supplier<Spliterator<Integer>> supplier,
|
||||
ContentAsserter<Integer> asserter) {
|
||||
testSpliterator(supplier, (Consumer<Integer> b) -> b, asserter);
|
||||
}
|
||||
|
||||
public static void testIntSpliterator(Supplier<Spliterator.OfInt> supplier) {
|
||||
testIntSpliterator(supplier, defaultContentAsserter());
|
||||
}
|
||||
|
||||
public static void testIntSpliterator(Supplier<Spliterator.OfInt> supplier,
|
||||
ContentAsserter<Integer> asserter) {
|
||||
class BoxingAdapter implements Consumer<Integer>, IntConsumer {
|
||||
private final Consumer<Integer> b;
|
||||
|
||||
@ -65,10 +87,15 @@ public class SpliteratorTestHelper {
|
||||
}
|
||||
}
|
||||
|
||||
testSpliterator(supplier, BoxingAdapter::new);
|
||||
testSpliterator(supplier, BoxingAdapter::new, asserter);
|
||||
}
|
||||
|
||||
public static void testLongSpliterator(Supplier<Spliterator.OfLong> supplier) {
|
||||
testLongSpliterator(supplier, defaultContentAsserter());
|
||||
}
|
||||
|
||||
public static void testLongSpliterator(Supplier<Spliterator.OfLong> supplier,
|
||||
ContentAsserter<Long> asserter) {
|
||||
class BoxingAdapter implements Consumer<Long>, LongConsumer {
|
||||
private final Consumer<Long> b;
|
||||
|
||||
@ -87,10 +114,15 @@ public class SpliteratorTestHelper {
|
||||
}
|
||||
}
|
||||
|
||||
testSpliterator(supplier, BoxingAdapter::new);
|
||||
testSpliterator(supplier, BoxingAdapter::new, asserter);
|
||||
}
|
||||
|
||||
public static void testDoubleSpliterator(Supplier<Spliterator.OfDouble> supplier) {
|
||||
testDoubleSpliterator(supplier, defaultContentAsserter());
|
||||
}
|
||||
|
||||
public static void testDoubleSpliterator(Supplier<Spliterator.OfDouble> supplier,
|
||||
ContentAsserter<Double> asserter) {
|
||||
class BoxingAdapter implements Consumer<Double>, DoubleConsumer {
|
||||
private final Consumer<Double> b;
|
||||
|
||||
@ -109,11 +141,12 @@ public class SpliteratorTestHelper {
|
||||
}
|
||||
}
|
||||
|
||||
testSpliterator(supplier, BoxingAdapter::new);
|
||||
testSpliterator(supplier, BoxingAdapter::new, asserter);
|
||||
}
|
||||
|
||||
static <T, S extends Spliterator<T>> void testSpliterator(Supplier<S> supplier,
|
||||
UnaryOperator<Consumer<T>> boxingAdapter) {
|
||||
UnaryOperator<Consumer<T>> boxingAdapter,
|
||||
ContentAsserter<T> asserter) {
|
||||
ArrayList<T> fromForEach = new ArrayList<>();
|
||||
Spliterator<T> spliterator = supplier.get();
|
||||
Consumer<T> addToFromForEach = boxingAdapter.apply(fromForEach::add);
|
||||
@ -121,14 +154,14 @@ public class SpliteratorTestHelper {
|
||||
|
||||
Collection<T> exp = Collections.unmodifiableList(fromForEach);
|
||||
|
||||
testForEach(exp, supplier, boxingAdapter);
|
||||
testTryAdvance(exp, supplier, boxingAdapter);
|
||||
testMixedTryAdvanceForEach(exp, supplier, boxingAdapter);
|
||||
testMixedTraverseAndSplit(exp, supplier, boxingAdapter);
|
||||
testForEach(exp, supplier, boxingAdapter, asserter);
|
||||
testTryAdvance(exp, supplier, boxingAdapter, asserter);
|
||||
testMixedTryAdvanceForEach(exp, supplier, boxingAdapter, asserter);
|
||||
testMixedTraverseAndSplit(exp, supplier, boxingAdapter, asserter);
|
||||
testSplitAfterFullTraversal(supplier, boxingAdapter);
|
||||
testSplitOnce(exp, supplier, boxingAdapter);
|
||||
testSplitSixDeep(exp, supplier, boxingAdapter);
|
||||
testSplitUntilNull(exp, supplier, boxingAdapter);
|
||||
testSplitOnce(exp, supplier, boxingAdapter, asserter);
|
||||
testSplitSixDeep(exp, supplier, boxingAdapter, asserter);
|
||||
testSplitUntilNull(exp, supplier, boxingAdapter, asserter);
|
||||
}
|
||||
|
||||
//
|
||||
@ -136,7 +169,8 @@ public class SpliteratorTestHelper {
|
||||
private static <T, S extends Spliterator<T>> void testForEach(
|
||||
Collection<T> exp,
|
||||
Supplier<S> supplier,
|
||||
UnaryOperator<Consumer<T>> boxingAdapter) {
|
||||
UnaryOperator<Consumer<T>> boxingAdapter,
|
||||
ContentAsserter<T> asserter) {
|
||||
S spliterator = supplier.get();
|
||||
long sizeIfKnown = spliterator.getExactSizeIfKnown();
|
||||
boolean isOrdered = spliterator.hasCharacteristics(Spliterator.ORDERED);
|
||||
@ -159,13 +193,14 @@ public class SpliteratorTestHelper {
|
||||
}
|
||||
assertEquals(fromForEach.size(), exp.size());
|
||||
|
||||
assertContents(fromForEach, exp, isOrdered);
|
||||
asserter.assertContents(fromForEach, exp, isOrdered);
|
||||
}
|
||||
|
||||
private static <T, S extends Spliterator<T>> void testTryAdvance(
|
||||
Collection<T> exp,
|
||||
Supplier<S> supplier,
|
||||
UnaryOperator<Consumer<T>> boxingAdapter) {
|
||||
UnaryOperator<Consumer<T>> boxingAdapter,
|
||||
ContentAsserter<T> asserter) {
|
||||
S spliterator = supplier.get();
|
||||
long sizeIfKnown = spliterator.getExactSizeIfKnown();
|
||||
boolean isOrdered = spliterator.hasCharacteristics(Spliterator.ORDERED);
|
||||
@ -188,13 +223,14 @@ public class SpliteratorTestHelper {
|
||||
}
|
||||
assertEquals(fromTryAdvance.size(), exp.size());
|
||||
|
||||
assertContents(fromTryAdvance, exp, isOrdered);
|
||||
asserter.assertContents(fromTryAdvance, exp, isOrdered);
|
||||
}
|
||||
|
||||
private static <T, S extends Spliterator<T>> void testMixedTryAdvanceForEach(
|
||||
Collection<T> exp,
|
||||
Supplier<S> supplier,
|
||||
UnaryOperator<Consumer<T>> boxingAdapter) {
|
||||
UnaryOperator<Consumer<T>> boxingAdapter,
|
||||
ContentAsserter<T> asserter) {
|
||||
S spliterator = supplier.get();
|
||||
long sizeIfKnown = spliterator.getExactSizeIfKnown();
|
||||
boolean isOrdered = spliterator.hasCharacteristics(Spliterator.ORDERED);
|
||||
@ -218,18 +254,14 @@ public class SpliteratorTestHelper {
|
||||
}
|
||||
assertEquals(dest.size(), exp.size());
|
||||
|
||||
if (isOrdered) {
|
||||
assertEquals(dest, exp);
|
||||
}
|
||||
else {
|
||||
assertContentsUnordered(dest, exp);
|
||||
}
|
||||
asserter.assertContents(dest, exp, isOrdered);
|
||||
}
|
||||
|
||||
private static <T, S extends Spliterator<T>> void testMixedTraverseAndSplit(
|
||||
Collection<T> exp,
|
||||
Supplier<S> supplier,
|
||||
UnaryOperator<Consumer<T>> boxingAdapter) {
|
||||
UnaryOperator<Consumer<T>> boxingAdapter,
|
||||
ContentAsserter<T> asserter) {
|
||||
S spliterator = supplier.get();
|
||||
long sizeIfKnown = spliterator.getExactSizeIfKnown();
|
||||
boolean isOrdered = spliterator.hasCharacteristics(Spliterator.ORDERED);
|
||||
@ -266,12 +298,7 @@ public class SpliteratorTestHelper {
|
||||
}
|
||||
assertEquals(dest.size(), exp.size());
|
||||
|
||||
if (isOrdered) {
|
||||
assertEquals(dest, exp);
|
||||
}
|
||||
else {
|
||||
assertContentsUnordered(dest, exp);
|
||||
}
|
||||
asserter.assertContents(dest, exp, isOrdered);
|
||||
}
|
||||
|
||||
private static <T, S extends Spliterator<T>> void testSplitAfterFullTraversal(
|
||||
@ -285,16 +312,14 @@ public class SpliteratorTestHelper {
|
||||
|
||||
// Full traversal using forEach
|
||||
spliterator = supplier.get();
|
||||
spliterator.forEachRemaining(boxingAdapter.apply(e -> {
|
||||
}));
|
||||
spliterator.forEachRemaining(boxingAdapter.apply(e -> { }));
|
||||
split = spliterator.trySplit();
|
||||
assertNull(split);
|
||||
|
||||
// Full traversal using tryAdvance then forEach
|
||||
spliterator = supplier.get();
|
||||
spliterator.tryAdvance(boxingAdapter.apply(e -> { }));
|
||||
spliterator.forEachRemaining(boxingAdapter.apply(e -> {
|
||||
}));
|
||||
spliterator.forEachRemaining(boxingAdapter.apply(e -> { }));
|
||||
split = spliterator.trySplit();
|
||||
assertNull(split);
|
||||
}
|
||||
@ -302,7 +327,8 @@ public class SpliteratorTestHelper {
|
||||
private static <T, S extends Spliterator<T>> void testSplitOnce(
|
||||
Collection<T> exp,
|
||||
Supplier<S> supplier,
|
||||
UnaryOperator<Consumer<T>> boxingAdapter) {
|
||||
UnaryOperator<Consumer<T>> boxingAdapter,
|
||||
ContentAsserter<T> asserter) {
|
||||
S spliterator = supplier.get();
|
||||
long sizeIfKnown = spliterator.getExactSizeIfKnown();
|
||||
boolean isOrdered = spliterator.hasCharacteristics(Spliterator.ORDERED);
|
||||
@ -322,13 +348,15 @@ public class SpliteratorTestHelper {
|
||||
if (s1Size >= 0 && s2Size >= 0)
|
||||
assertEquals(sizeIfKnown, s1Size + s2Size);
|
||||
}
|
||||
assertContents(fromSplit, exp, isOrdered);
|
||||
|
||||
asserter.assertContents(fromSplit, exp, isOrdered);
|
||||
}
|
||||
|
||||
private static <T, S extends Spliterator<T>> void testSplitSixDeep(
|
||||
Collection<T> exp,
|
||||
Supplier<S> supplier,
|
||||
UnaryOperator<Consumer<T>> boxingAdapter) {
|
||||
UnaryOperator<Consumer<T>> boxingAdapter,
|
||||
ContentAsserter<T> asserter) {
|
||||
S spliterator = supplier.get();
|
||||
boolean isOrdered = spliterator.hasCharacteristics(Spliterator.ORDERED);
|
||||
|
||||
@ -340,13 +368,13 @@ public class SpliteratorTestHelper {
|
||||
|
||||
// verify splitting with forEach
|
||||
splitSixDeepVisitor(depth, 0, dest, spliterator, boxingAdapter, spliterator.characteristics(), false);
|
||||
assertContents(dest, exp, isOrdered);
|
||||
asserter.assertContents(dest, exp, isOrdered);
|
||||
|
||||
// verify splitting with tryAdvance
|
||||
dest.clear();
|
||||
spliterator = supplier.get();
|
||||
splitSixDeepVisitor(depth, 0, dest, spliterator, boxingAdapter, spliterator.characteristics(), true);
|
||||
assertContents(dest, exp, isOrdered);
|
||||
asserter.assertContents(dest, exp, isOrdered);
|
||||
}
|
||||
}
|
||||
|
||||
@ -411,7 +439,8 @@ public class SpliteratorTestHelper {
|
||||
private static <T, S extends Spliterator<T>> void testSplitUntilNull(
|
||||
Collection<T> exp,
|
||||
Supplier<S> supplier,
|
||||
UnaryOperator<Consumer<T>> boxingAdapter) {
|
||||
UnaryOperator<Consumer<T>> boxingAdapter,
|
||||
ContentAsserter<T> asserter) {
|
||||
Spliterator<T> s = supplier.get();
|
||||
boolean isOrdered = s.hasCharacteristics(Spliterator.ORDERED);
|
||||
assertSpliterator(s);
|
||||
@ -420,7 +449,7 @@ public class SpliteratorTestHelper {
|
||||
Consumer<T> c = boxingAdapter.apply(splits::add);
|
||||
|
||||
testSplitUntilNull(new SplitNode<T>(c, s));
|
||||
assertContents(splits, exp, isOrdered);
|
||||
asserter.assertContents(splits, exp, isOrdered);
|
||||
}
|
||||
|
||||
private static class SplitNode<T> {
|
||||
@ -540,23 +569,10 @@ public class SpliteratorTestHelper {
|
||||
assertEquals(actual, expected);
|
||||
}
|
||||
else {
|
||||
assertContentsUnordered(actual, expected);
|
||||
LambdaTestHelpers.assertContentsUnordered(actual, expected);
|
||||
}
|
||||
}
|
||||
|
||||
private static<T> void assertContentsUnordered(Iterable<T> actual, Iterable<T> expected) {
|
||||
assertEquals(toBoxedMultiset(actual), toBoxedMultiset(expected));
|
||||
}
|
||||
|
||||
private static <T> Map<T, Integer> toBoxedMultiset(Iterable<T> c) {
|
||||
Map<T, Integer> result = new HashMap<>();
|
||||
c.forEach(e -> {
|
||||
if (result.containsKey(e)) result.put(e, result.get(e) + 1);
|
||||
else result.put(e, 1);
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
static<U> void mixedTraverseAndSplit(Consumer<U> b, Spliterator<U> splTop) {
|
||||
Spliterator<U> spl1, spl2, spl3;
|
||||
splTop.tryAdvance(b);
|
||||
|
||||
@ -0,0 +1,201 @@
|
||||
/*
|
||||
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
package java.util.stream;
|
||||
|
||||
import org.testng.annotations.DataProvider;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Spliterator;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* @bug 8012987
|
||||
*/
|
||||
@Test
|
||||
public class SliceSpliteratorTest extends LoggingTestCase {
|
||||
|
||||
static class UnorderedContentAsserter<T> implements SpliteratorTestHelper.ContentAsserter<T> {
|
||||
Collection<T> source;
|
||||
|
||||
UnorderedContentAsserter(Collection<T> source) {
|
||||
this.source = source;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assertContents(Collection<T> actual, Collection<T> expected, boolean isOrdered) {
|
||||
if (isOrdered) {
|
||||
assertEquals(actual, expected);
|
||||
}
|
||||
else {
|
||||
assertEquals(actual.size(), expected.size());
|
||||
assertTrue(source.containsAll(actual));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface SliceTester {
|
||||
void test(int size, int skip, int limit);
|
||||
}
|
||||
|
||||
@DataProvider(name = "sliceSpliteratorDataProvider")
|
||||
public static Object[][] sliceSpliteratorDataProvider() {
|
||||
List<Object[]> data = new ArrayList<>();
|
||||
|
||||
// SIZED/SUBSIZED slice spliterator
|
||||
|
||||
{
|
||||
SliceTester r = (size, skip, limit) -> {
|
||||
final Collection<Integer> source = IntStream.range(0, size).boxed().collect(toList());
|
||||
|
||||
SpliteratorTestHelper.testSpliterator(() -> {
|
||||
Spliterator<Integer> s = Arrays.spliterator(source.stream().toArray(Integer[]::new));
|
||||
|
||||
return new StreamSpliterators.SliceSpliterator.OfRef<>(s, skip, limit);
|
||||
});
|
||||
};
|
||||
data.add(new Object[]{"StreamSpliterators.SliceSpliterator.OfRef", r});
|
||||
}
|
||||
|
||||
{
|
||||
SliceTester r = (size, skip, limit) -> {
|
||||
final Collection<Integer> source = IntStream.range(0, size).boxed().collect(toList());
|
||||
|
||||
SpliteratorTestHelper.testIntSpliterator(() -> {
|
||||
Spliterator.OfInt s = Arrays.spliterator(source.stream().mapToInt(i->i).toArray());
|
||||
|
||||
return new StreamSpliterators.SliceSpliterator.OfInt(s, skip, limit);
|
||||
});
|
||||
};
|
||||
data.add(new Object[]{"StreamSpliterators.SliceSpliterator.OfInt", r});
|
||||
}
|
||||
|
||||
{
|
||||
SliceTester r = (size, skip, limit) -> {
|
||||
final Collection<Long> source = LongStream.range(0, size).boxed().collect(toList());
|
||||
|
||||
SpliteratorTestHelper.testLongSpliterator(() -> {
|
||||
Spliterator.OfLong s = Arrays.spliterator(source.stream().mapToLong(i->i).toArray());
|
||||
|
||||
return new StreamSpliterators.SliceSpliterator.OfLong(s, skip, limit);
|
||||
});
|
||||
};
|
||||
data.add(new Object[]{"StreamSpliterators.SliceSpliterator.OfLong", r});
|
||||
}
|
||||
|
||||
{
|
||||
SliceTester r = (size, skip, limit) -> {
|
||||
final Collection<Double> source = LongStream.range(0, size).asDoubleStream().boxed().collect(toList());
|
||||
|
||||
SpliteratorTestHelper.testDoubleSpliterator(() -> {
|
||||
Spliterator.OfDouble s = Arrays.spliterator(source.stream().mapToDouble(i->i).toArray());
|
||||
|
||||
return new StreamSpliterators.SliceSpliterator.OfDouble(s, skip, limit);
|
||||
});
|
||||
};
|
||||
data.add(new Object[]{"StreamSpliterators.SliceSpliterator.OfLong", r});
|
||||
}
|
||||
|
||||
|
||||
// Unordered slice spliterator
|
||||
|
||||
{
|
||||
SliceTester r = (size, skip, limit) -> {
|
||||
final Collection<Integer> source = IntStream.range(0, size).boxed().collect(toList());
|
||||
final UnorderedContentAsserter<Integer> uca = new UnorderedContentAsserter<>(source);
|
||||
|
||||
SpliteratorTestHelper.testSpliterator(() -> {
|
||||
Spliterator<Integer> s = Arrays.spliterator(source.stream().toArray(Integer[]::new));
|
||||
|
||||
return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
|
||||
}, uca);
|
||||
};
|
||||
data.add(new Object[]{"StreamSpliterators.UnorderedSliceSpliterator.OfRef", r});
|
||||
}
|
||||
|
||||
{
|
||||
SliceTester r = (size, skip, limit) -> {
|
||||
final Collection<Integer> source = IntStream.range(0, size).boxed().collect(toList());
|
||||
final UnorderedContentAsserter<Integer> uca = new UnorderedContentAsserter<>(source);
|
||||
|
||||
SpliteratorTestHelper.testIntSpliterator(() -> {
|
||||
Spliterator.OfInt s = Arrays.spliterator(source.stream().mapToInt(i->i).toArray());
|
||||
|
||||
return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit);
|
||||
}, uca);
|
||||
};
|
||||
data.add(new Object[]{"StreamSpliterators.UnorderedSliceSpliterator.OfInt", r});
|
||||
}
|
||||
|
||||
{
|
||||
SliceTester r = (size, skip, limit) -> {
|
||||
final Collection<Long> source = LongStream.range(0, size).boxed().collect(toList());
|
||||
final UnorderedContentAsserter<Long> uca = new UnorderedContentAsserter<>(source);
|
||||
|
||||
SpliteratorTestHelper.testLongSpliterator(() -> {
|
||||
Spliterator.OfLong s = Arrays.spliterator(source.stream().mapToLong(i->i).toArray());
|
||||
|
||||
return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit);
|
||||
}, uca);
|
||||
};
|
||||
data.add(new Object[]{"StreamSpliterators.UnorderedSliceSpliterator.OfLong", r});
|
||||
}
|
||||
|
||||
{
|
||||
SliceTester r = (size, skip, limit) -> {
|
||||
final Collection<Double> source = LongStream.range(0, size).asDoubleStream().boxed().collect(toList());
|
||||
final UnorderedContentAsserter<Double> uca = new UnorderedContentAsserter<>(source);
|
||||
|
||||
SpliteratorTestHelper.testDoubleSpliterator(() -> {
|
||||
Spliterator.OfDouble s = Arrays.spliterator(LongStream.range(0, SIZE).asDoubleStream().toArray());
|
||||
|
||||
return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit);
|
||||
}, uca);
|
||||
};
|
||||
data.add(new Object[]{"StreamSpliterators.UnorderedSliceSpliterator.OfLong", r});
|
||||
}
|
||||
|
||||
return data.toArray(new Object[0][]);
|
||||
}
|
||||
|
||||
static final int SIZE = 256;
|
||||
|
||||
static final int STEP = 32;
|
||||
|
||||
@Test(dataProvider = "sliceSpliteratorDataProvider")
|
||||
public void testSliceSpliterator(String description, SliceTester r) {
|
||||
setContext("size", SIZE);
|
||||
for (int skip = 0; skip < SIZE; skip += STEP) {
|
||||
setContext("skip", skip);
|
||||
for (int limit = 0; limit < SIZE; limit += STEP) {
|
||||
setContext("limit", skip);
|
||||
r.test(SIZE, skip, limit);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -80,8 +80,8 @@ public class StreamFlagsTest {
|
||||
EnumSet.of(ORDERED, DISTINCT, SIZED),
|
||||
EnumSet.of(SORTED, SHORT_CIRCUIT));
|
||||
assertFlags(OpTestCase.getStreamFlags(repeat),
|
||||
EnumSet.of(ORDERED),
|
||||
EnumSet.of(SIZED, DISTINCT, SORTED, SHORT_CIRCUIT));
|
||||
EnumSet.noneOf(StreamOpFlag.class),
|
||||
EnumSet.of(DISTINCT, SORTED, SHORT_CIRCUIT));
|
||||
}
|
||||
|
||||
public void testFilter() {
|
||||
|
||||
@ -22,45 +22,440 @@
|
||||
*/
|
||||
package org.openjdk.tests.java.util.stream;
|
||||
|
||||
import java.util.stream.OpTestCase;
|
||||
import org.testng.annotations.DataProvider;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.Spliterator;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.UnaryOperator;
|
||||
import java.util.stream.DoubleStream;
|
||||
import java.util.stream.DoubleStreamTestScenario;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.IntStreamTestScenario;
|
||||
import java.util.stream.LambdaTestHelpers;
|
||||
import java.util.stream.LongStream;
|
||||
import java.util.stream.LongStreamTestScenario;
|
||||
import java.util.stream.OpTestCase;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
import java.util.stream.StreamTestScenario;
|
||||
import java.util.stream.TestData;
|
||||
|
||||
import static java.util.stream.LambdaTestHelpers.assertContents;
|
||||
import static java.util.stream.LambdaTestHelpers.assertUnique;
|
||||
|
||||
|
||||
@Test
|
||||
public class InfiniteStreamWithLimitOpTest extends OpTestCase {
|
||||
|
||||
private static final List<String> tenAs = Arrays.asList("A", "A", "A", "A", "A", "A", "A", "A", "A", "A");
|
||||
private static final long SKIP_LIMIT_SIZE = 1 << 16;
|
||||
|
||||
public void testRepeatLimit() {
|
||||
assertContents(Stream.generate(() -> "A").limit(10).iterator(), tenAs.iterator());
|
||||
@DataProvider(name = "Stream.limit")
|
||||
@SuppressWarnings("rawtypes")
|
||||
public static Object[][] sliceFunctionsDataProvider() {
|
||||
Function<String, String> f = s -> String.format(s, SKIP_LIMIT_SIZE);
|
||||
|
||||
List<Object[]> data = new ArrayList<>();
|
||||
|
||||
data.add(new Object[]{f.apply("Stream.limit(%d)"),
|
||||
(UnaryOperator<Stream>) s -> s.limit(SKIP_LIMIT_SIZE)});
|
||||
data.add(new Object[]{f.apply("Stream.substream(%d)"),
|
||||
(UnaryOperator<Stream>) s -> s.substream(SKIP_LIMIT_SIZE, SKIP_LIMIT_SIZE * 2)});
|
||||
data.add(new Object[]{f.apply("Stream.substream(%1$d).limit(%1$d)"),
|
||||
(UnaryOperator<Stream>) s -> s.substream(SKIP_LIMIT_SIZE).limit(SKIP_LIMIT_SIZE)});
|
||||
|
||||
return data.toArray(new Object[0][]);
|
||||
}
|
||||
|
||||
public void testIterateLimit() {
|
||||
assertContents(Stream.iterate("A", s -> s).limit(10).iterator(), tenAs.iterator());
|
||||
@DataProvider(name = "IntStream.limit")
|
||||
public static Object[][] intSliceFunctionsDataProvider() {
|
||||
Function<String, String> f = s -> String.format(s, SKIP_LIMIT_SIZE);
|
||||
|
||||
List<Object[]> data = new ArrayList<>();
|
||||
|
||||
data.add(new Object[]{f.apply("IntStream.limit(%d)"),
|
||||
(UnaryOperator<IntStream>) s -> s.limit(SKIP_LIMIT_SIZE)});
|
||||
data.add(new Object[]{f.apply("IntStream.substream(%d)"),
|
||||
(UnaryOperator<IntStream>) s -> s.substream(SKIP_LIMIT_SIZE, SKIP_LIMIT_SIZE * 2)});
|
||||
data.add(new Object[]{f.apply("IntStream.substream(%1$d).limit(%1$d)"),
|
||||
(UnaryOperator<IntStream>) s -> s.substream(SKIP_LIMIT_SIZE).limit(SKIP_LIMIT_SIZE)});
|
||||
|
||||
return data.toArray(new Object[0][]);
|
||||
}
|
||||
|
||||
public void testIterateFibLimit() {
|
||||
Stream<Integer> fib = Stream.iterate(new int[] {0, 1}, pair -> new int[] {pair[1], pair[0] + pair[1]})
|
||||
.map(pair -> pair[0]);
|
||||
@DataProvider(name = "LongStream.limit")
|
||||
public static Object[][] longSliceFunctionsDataProvider() {
|
||||
Function<String, String> f = s -> String.format(s, SKIP_LIMIT_SIZE);
|
||||
|
||||
assertContents(
|
||||
fib.limit(10).iterator(),
|
||||
Arrays.asList(0, 1, 1, 2, 3, 5, 8, 13, 21, 34).iterator());
|
||||
List<Object[]> data = new ArrayList<>();
|
||||
|
||||
data.add(new Object[]{f.apply("LongStream.limit(%d)"),
|
||||
(UnaryOperator<LongStream>) s -> s.limit(SKIP_LIMIT_SIZE)});
|
||||
data.add(new Object[]{f.apply("LongStream.substream(%d)"),
|
||||
(UnaryOperator<LongStream>) s -> s.substream(SKIP_LIMIT_SIZE, SKIP_LIMIT_SIZE * 2)});
|
||||
data.add(new Object[]{f.apply("LongStream.substream(%1$d).limit(%1$d)"),
|
||||
(UnaryOperator<LongStream>) s -> s.substream(SKIP_LIMIT_SIZE).limit(SKIP_LIMIT_SIZE)});
|
||||
|
||||
return data.toArray(new Object[0][]);
|
||||
}
|
||||
|
||||
public void testInfiniteWithLimitToShortCircuitTerminal() {
|
||||
Object[] array = Stream.generate(() -> 1).limit(4).toArray();
|
||||
assertEquals(4, array.length);
|
||||
array = Stream.generate(() -> 1).limit(4).filter(i -> true).toArray();
|
||||
assertEquals(4, array.length);
|
||||
List<Integer> result = Stream.generate(() -> 1).limit(4).collect(Collectors.toList());
|
||||
assertEquals(result, Arrays.asList(1, 1, 1, 1));
|
||||
@DataProvider(name = "DoubleStream.limit")
|
||||
public static Object[][] doubleSliceFunctionsDataProvider() {
|
||||
Function<String, String> f = s -> String.format(s, SKIP_LIMIT_SIZE);
|
||||
|
||||
List<Object[]> data = new ArrayList<>();
|
||||
|
||||
data.add(new Object[]{f.apply("DoubleStream.limit(%d)"),
|
||||
(UnaryOperator<DoubleStream>) s -> s.limit(SKIP_LIMIT_SIZE)});
|
||||
data.add(new Object[]{f.apply("DoubleStream.substream(%d)"),
|
||||
(UnaryOperator<DoubleStream>) s -> s.substream(SKIP_LIMIT_SIZE, SKIP_LIMIT_SIZE * 2)});
|
||||
data.add(new Object[]{f.apply("DoubleStream.substream(%1$d).limit(%1$d)"),
|
||||
(UnaryOperator<DoubleStream>) s -> s.substream(SKIP_LIMIT_SIZE).limit(SKIP_LIMIT_SIZE)});
|
||||
|
||||
return data.toArray(new Object[0][]);
|
||||
}
|
||||
|
||||
private <T> ResultAsserter<Iterable<T>> unorderedAsserter() {
|
||||
return (act, exp, ord, par) -> {
|
||||
if (par & !ord) {
|
||||
// Can only assert that all elements of the actual result
|
||||
// are distinct and that the count is the limit size
|
||||
// any element within the range [0, Long.MAX_VALUE) may be
|
||||
// present
|
||||
assertUnique(act);
|
||||
long count = 0;
|
||||
for (T l : act) {
|
||||
count++;
|
||||
}
|
||||
assertEquals(count, SKIP_LIMIT_SIZE, "size not equal");
|
||||
}
|
||||
else {
|
||||
LambdaTestHelpers.assertContents(act, exp);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private TestData.OfRef<Long> refLongs() {
|
||||
return refLongRange(0, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
private TestData.OfRef<Long> refLongRange(long l, long u) {
|
||||
return TestData.Factory.ofSupplier(
|
||||
String.format("[%d, %d)", l, u),
|
||||
() -> LongStream.range(l, u).boxed());
|
||||
}
|
||||
|
||||
private TestData.OfInt ints() {
|
||||
return intRange(0, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
private TestData.OfInt intRange(int l, int u) {
|
||||
return TestData.Factory.ofIntSupplier(
|
||||
String.format("[%d, %d)", l, u),
|
||||
() -> IntStream.range(l, u));
|
||||
}
|
||||
|
||||
private TestData.OfLong longs() {
|
||||
return longRange(0, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
private TestData.OfLong longRange(long l, long u) {
|
||||
return TestData.Factory.ofLongSupplier(
|
||||
String.format("[%d, %d)", l, u),
|
||||
() -> LongStream.range(l, u));
|
||||
}
|
||||
|
||||
private TestData.OfDouble doubles() {
|
||||
return doubleRange(0, 1L << 53);
|
||||
}
|
||||
|
||||
private TestData.OfDouble doubleRange(long l, long u) {
|
||||
return TestData.Factory.ofDoubleSupplier(
|
||||
String.format("[%d, %d)", l, u),
|
||||
() -> LongStream.range(l, u).mapToDouble(i -> (double) i));
|
||||
}
|
||||
|
||||
|
||||
// Sized/subsized range
|
||||
|
||||
@Test(dataProvider = "Stream.limit")
|
||||
public void testSubsizedWithRange(String description, UnaryOperator<Stream<Long>> fs) {
|
||||
// Range is [0, Long.MAX_VALUE), splits are SUBSIZED
|
||||
// Such a size will induce out of memory errors for incorrect
|
||||
// slice implementations
|
||||
withData(refLongs()).
|
||||
stream(s -> fs.apply(s)).
|
||||
without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "IntStream.limit")
|
||||
public void testIntSubsizedWithRange(String description, UnaryOperator<IntStream> fs) {
|
||||
// Range is [0, Integer.MAX_VALUE), splits are SUBSIZED
|
||||
// Such a size will induce out of memory errors for incorrect
|
||||
// slice implementations
|
||||
withData(ints()).
|
||||
stream(s -> fs.apply(s)).
|
||||
without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "LongStream.limit")
|
||||
public void testLongSubsizedWithRange(String description, UnaryOperator<LongStream> fs) {
|
||||
// Range is [0, Long.MAX_VALUE), splits are SUBSIZED
|
||||
// Such a size will induce out of memory errors for incorrect
|
||||
// slice implementations
|
||||
withData(longs()).
|
||||
stream(s -> fs.apply(s)).
|
||||
without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "DoubleStream.limit")
|
||||
public void testDoubleSubsizedWithRange(String description, UnaryOperator<DoubleStream> fs) {
|
||||
// Range is [0, 2^53), splits are SUBSIZED
|
||||
// Such a size will induce out of memory errors for incorrect
|
||||
// slice implementations
|
||||
withData(doubles()).
|
||||
stream(s -> fs.apply(s)).
|
||||
without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
exercise();
|
||||
}
|
||||
|
||||
|
||||
// Unordered finite not SIZED/SUBSIZED
|
||||
|
||||
@Test(dataProvider = "Stream.limit")
|
||||
public void testUnorderedFinite(String description, UnaryOperator<Stream<Long>> fs) {
|
||||
// Range is [0, Long.MAX_VALUE), splits are SUBSIZED
|
||||
// Such a size will induce out of memory errors for incorrect
|
||||
// slice implementations
|
||||
withData(longs()).
|
||||
stream(s -> fs.apply(s.filter(i -> true).unordered().boxed())).
|
||||
resultAsserter(unorderedAsserter()).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "IntStream.limit")
|
||||
public void testIntUnorderedFinite(String description, UnaryOperator<IntStream> fs) {
|
||||
// Range is [0, Integer.MAX_VALUE), splits are SUBSIZED
|
||||
// Such a size will induce out of memory errors for incorrect
|
||||
// slice implementations
|
||||
withData(ints()).
|
||||
stream(s -> fs.apply(s.filter(i -> true).unordered())).
|
||||
resultAsserter(unorderedAsserter()).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "LongStream.limit")
|
||||
public void testLongUnorderedFinite(String description, UnaryOperator<LongStream> fs) {
|
||||
// Range is [0, Long.MAX_VALUE), splits are SUBSIZED
|
||||
// Such a size will induce out of memory errors for incorrect
|
||||
// slice implementations
|
||||
withData(longs()).
|
||||
stream(s -> fs.apply(s.filter(i -> true).unordered())).
|
||||
resultAsserter(unorderedAsserter()).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "DoubleStream.limit")
|
||||
public void testDoubleUnorderedFinite(String description, UnaryOperator<DoubleStream> fs) {
|
||||
// Range is [0, 1L << 53), splits are SUBSIZED
|
||||
// Such a size will induce out of memory errors for incorrect
|
||||
// slice implementations
|
||||
// Upper bound ensures values mapped to doubles will be unique
|
||||
withData(doubles()).
|
||||
stream(s -> fs.apply(s.filter(i -> true).unordered())).
|
||||
resultAsserter(unorderedAsserter()).
|
||||
exercise();
|
||||
}
|
||||
|
||||
|
||||
// Unordered finite not SUBSIZED
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
private Spliterator.OfLong proxyNotSubsized(Spliterator.OfLong s) {
|
||||
InvocationHandler ih = new InvocationHandler() {
|
||||
@Override
|
||||
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
|
||||
switch (method.getName()) {
|
||||
case "characteristics": {
|
||||
int c = (Integer) method.invoke(s, args);
|
||||
return c & ~Spliterator.SUBSIZED;
|
||||
}
|
||||
case "hasCharacteristics": {
|
||||
int c = (Integer) args[0];
|
||||
boolean b = (Boolean) method.invoke(s, args);
|
||||
return b & ((c & Spliterator.SUBSIZED) == 0);
|
||||
}
|
||||
default:
|
||||
return method.invoke(s, args);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
return (Spliterator.OfLong) Proxy.newProxyInstance(this.getClass().getClassLoader(),
|
||||
new Class[]{Spliterator.OfLong.class},
|
||||
ih);
|
||||
}
|
||||
|
||||
private TestData.OfLong proxiedLongRange(long l, long u) {
|
||||
return TestData.Factory.ofLongSupplier(
|
||||
String.format("[%d, %d)", l, u),
|
||||
() -> StreamSupport.longStream(proxyNotSubsized(LongStream.range(l, u).spliterator())));
|
||||
}
|
||||
|
||||
@Test(dataProvider = "Stream.limit")
|
||||
public void testUnorderedSizedNotSubsizedFinite(String description, UnaryOperator<Stream<Long>> fs) {
|
||||
// Range is [0, Long.MAX_VALUE), splits are not SUBSIZED (proxy clears
|
||||
// the SUBSIZED characteristic)
|
||||
// Such a size will induce out of memory errors for incorrect
|
||||
// slice implementations
|
||||
withData(proxiedLongRange(0, Long.MAX_VALUE)).
|
||||
stream(s -> fs.apply(s.unordered().boxed())).
|
||||
resultAsserter(unorderedAsserter()).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "IntStream.limit")
|
||||
public void testIntUnorderedSizedNotSubsizedFinite(String description, UnaryOperator<IntStream> fs) {
|
||||
// Range is [0, Integer.MAX_VALUE), splits are not SUBSIZED (proxy clears
|
||||
// the SUBSIZED characteristic)
|
||||
// Such a size will induce out of memory errors for incorrect
|
||||
// slice implementations
|
||||
withData(proxiedLongRange(0, Integer.MAX_VALUE)).
|
||||
stream(s -> fs.apply(s.unordered().mapToInt(i -> (int) i))).
|
||||
resultAsserter(unorderedAsserter()).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "LongStream.limit")
|
||||
public void testLongUnorderedSizedNotSubsizedFinite(String description, UnaryOperator<LongStream> fs) {
|
||||
// Range is [0, Long.MAX_VALUE), splits are not SUBSIZED (proxy clears
|
||||
// the SUBSIZED characteristic)
|
||||
// Such a size will induce out of memory errors for incorrect
|
||||
// slice implementations
|
||||
withData(proxiedLongRange(0, Long.MAX_VALUE)).
|
||||
stream(s -> fs.apply(s.unordered())).
|
||||
resultAsserter(unorderedAsserter()).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "DoubleStream.limit")
|
||||
public void testDoubleUnorderedSizedNotSubsizedFinite(String description, UnaryOperator<DoubleStream> fs) {
|
||||
// Range is [0, Double.MAX_VALUE), splits are not SUBSIZED (proxy clears
|
||||
// the SUBSIZED characteristic)
|
||||
// Such a size will induce out of memory errors for incorrect
|
||||
// slice implementations
|
||||
withData(proxiedLongRange(0, 1L << 53)).
|
||||
stream(s -> fs.apply(s.unordered().mapToDouble(i -> (double) i))).
|
||||
resultAsserter(unorderedAsserter()).
|
||||
exercise();
|
||||
}
|
||||
|
||||
|
||||
// Unordered generation
|
||||
|
||||
@Test(dataProvider = "Stream.limit")
|
||||
public void testUnorderedGenerator(String description, UnaryOperator<Stream<Long>> fs) {
|
||||
// Source is spliterator of infinite size
|
||||
TestData.OfRef<Long> generator = TestData.Factory.ofSupplier(
|
||||
"[1L, 1L, ...]", () -> Stream.generate(() -> 1L));
|
||||
|
||||
withData(generator).
|
||||
stream(s -> fs.apply(s.filter(i -> true).unordered())).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "IntStream.limit")
|
||||
public void testIntUnorderedGenerator(String description, UnaryOperator<IntStream> fs) {
|
||||
// Source is spliterator of infinite size
|
||||
TestData.OfInt generator = TestData.Factory.ofIntSupplier(
|
||||
"[1, 1, ...]", () -> IntStream.generate(() -> 1));
|
||||
|
||||
withData(generator).
|
||||
stream(s -> fs.apply(s.filter(i -> true).unordered())).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "LongStream.limit")
|
||||
public void testLongUnorderedGenerator(String description, UnaryOperator<LongStream> fs) {
|
||||
// Source is spliterator of infinite size
|
||||
TestData.OfLong generator = TestData.Factory.ofLongSupplier(
|
||||
"[1L, 1L, ...]", () -> LongStream.generate(() -> 1));
|
||||
|
||||
withData(generator).
|
||||
stream(s -> fs.apply(s.filter(i -> true).unordered())).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "DoubleStream.limit")
|
||||
public void testDoubleUnorderedGenerator(String description, UnaryOperator<DoubleStream> fs) {
|
||||
// Source is spliterator of infinite size
|
||||
TestData.OfDouble generator = TestData.Factory.ofDoubleSupplier(
|
||||
"[1.0, 1.0, ...]", () -> DoubleStream.generate(() -> 1.0));
|
||||
|
||||
withData(generator).
|
||||
stream(s -> fs.apply(s.filter(i -> true).unordered())).
|
||||
exercise();
|
||||
}
|
||||
|
||||
|
||||
// Unordered iteration
|
||||
|
||||
@Test(dataProvider = "Stream.limit")
|
||||
public void testUnorderedIteration(String description, UnaryOperator<Stream<Long>> fs) {
|
||||
// Source is a right-balanced tree of infinite size
|
||||
TestData.OfRef<Long> iterator = TestData.Factory.ofSupplier(
|
||||
"[1L, 2L, 3L, ...]", () -> Stream.iterate(1L, i -> i + 1L));
|
||||
|
||||
// Ref
|
||||
withData(iterator).
|
||||
stream(s -> fs.apply(s.unordered())).
|
||||
resultAsserter(unorderedAsserter()).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "IntStream.limit")
|
||||
public void testIntUnorderedIteration(String description, UnaryOperator<IntStream> fs) {
|
||||
// Source is a right-balanced tree of infinite size
|
||||
TestData.OfInt iterator = TestData.Factory.ofIntSupplier(
|
||||
"[1, 2, 3, ...]", () -> IntStream.iterate(1, i -> i + 1));
|
||||
|
||||
// Ref
|
||||
withData(iterator).
|
||||
stream(s -> fs.apply(s.unordered())).
|
||||
resultAsserter(unorderedAsserter()).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "LongStream.limit")
|
||||
public void testLongUnorderedIteration(String description, UnaryOperator<LongStream> fs) {
|
||||
// Source is a right-balanced tree of infinite size
|
||||
TestData.OfLong iterator = TestData.Factory.ofLongSupplier(
|
||||
"[1L, 2L, 3L, ...]", () -> LongStream.iterate(1, i -> i + 1));
|
||||
|
||||
// Ref
|
||||
withData(iterator).
|
||||
stream(s -> fs.apply(s.unordered())).
|
||||
resultAsserter(unorderedAsserter()).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@Test(dataProvider = "DoubleStream.limit")
|
||||
public void testDoubleUnorderedIteration(String description, UnaryOperator<DoubleStream> fs) {
|
||||
// Source is a right-balanced tree of infinite size
|
||||
TestData.OfDouble iterator = TestData.Factory.ofDoubleSupplier(
|
||||
"[1.0, 2.0, 3.0, ...]", () -> DoubleStream.iterate(1, i -> i + 1));
|
||||
|
||||
// Ref
|
||||
withData(iterator).
|
||||
stream(s -> fs.apply(s.unordered())).
|
||||
resultAsserter(unorderedAsserter()).
|
||||
exercise();
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user