mirror of
https://github.com/openjdk/jdk.git
synced 2026-03-01 11:40:33 +00:00
Merge
This commit is contained in:
commit
883dbfb63d
@ -249,6 +249,11 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
|
||||
// If the last intermediate operation is stateful then
|
||||
// evaluate directly to avoid an extra collection step
|
||||
if (isParallel() && previousStage != null && opIsStateful()) {
|
||||
// Set the depth of this, last, pipeline stage to zero to slice the
|
||||
// pipeline such that this operation will not be included in the
|
||||
// upstream slice and upstream operations will not be included
|
||||
// in this slice
|
||||
depth = 0;
|
||||
return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
|
||||
}
|
||||
else {
|
||||
@ -402,47 +407,19 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
|
||||
throw new IllegalStateException(MSG_CONSUMED);
|
||||
}
|
||||
|
||||
boolean hasTerminalFlags = terminalFlags != 0;
|
||||
if (isParallel() && sourceStage.sourceAnyStateful) {
|
||||
// Adjust pipeline stages if there are stateful ops,
|
||||
// and find the last short circuiting op, if any, that
|
||||
// defines the head stage for back-propagation of terminal flags
|
||||
@SuppressWarnings("rawtypes")
|
||||
AbstractPipeline backPropagationHead = sourceStage;
|
||||
int depth = 1;
|
||||
for (@SuppressWarnings("rawtypes") AbstractPipeline p = sourceStage.nextStage;
|
||||
p != null;
|
||||
p = p.nextStage) {
|
||||
if (p.opIsStateful()) {
|
||||
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(p.sourceOrOpFlags)) {
|
||||
// If the stateful operation is a short-circuit operation
|
||||
// then move the back propagation head forwards
|
||||
// NOTE: there are no size-injecting ops
|
||||
backPropagationHead = p;
|
||||
}
|
||||
|
||||
depth = 0;
|
||||
}
|
||||
p.depth = depth++;
|
||||
}
|
||||
|
||||
// Adapt the source spliterator, evaluating each stateful op
|
||||
// in the pipeline up to and including this pipeline stage
|
||||
// Flags for each pipeline stage are adjusted accordingly
|
||||
boolean backPropagate = false;
|
||||
int upstreamTerminalFlags = terminalFlags & StreamOpFlag.UPSTREAM_TERMINAL_OP_MASK;
|
||||
// in the pipeline up to and including this pipeline stage.
|
||||
// The depth and flags of each pipeline stage are adjusted accordingly.
|
||||
int depth = 1;
|
||||
for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
|
||||
u != e;
|
||||
u = p, p = p.nextStage) {
|
||||
|
||||
if (hasTerminalFlags &&
|
||||
(backPropagate || (backPropagate = (u == backPropagationHead)))) {
|
||||
// Back-propagate flags from the terminal operation
|
||||
u.combinedFlags = StreamOpFlag.combineOpFlags(upstreamTerminalFlags, u.combinedFlags);
|
||||
}
|
||||
|
||||
int thisOpFlags = p.sourceOrOpFlags;
|
||||
if (p.opIsStateful()) {
|
||||
depth = 0;
|
||||
|
||||
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
|
||||
// Clear the short circuit flag for next pipeline stage
|
||||
// This stage encapsulates short-circuiting, the next
|
||||
@ -460,11 +437,12 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
|
||||
? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
|
||||
: (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
|
||||
}
|
||||
p.depth = depth++;
|
||||
p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
|
||||
}
|
||||
}
|
||||
|
||||
if (hasTerminalFlags) {
|
||||
if (terminalFlags != 0) {
|
||||
// Apply flags from the terminal operation to last pipeline stage
|
||||
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
|
||||
}
|
||||
@ -472,7 +450,6 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
|
||||
return spliterator;
|
||||
}
|
||||
|
||||
|
||||
// PipelineHelper
|
||||
|
||||
@Override
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -22,7 +22,10 @@
|
||||
*/
|
||||
package java.util.stream;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.PrimitiveIterator;
|
||||
import java.util.Set;
|
||||
import java.util.Spliterator;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.DoubleConsumer;
|
||||
@ -159,12 +162,50 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
|
||||
for (double t : pipe2.toArray())
|
||||
b.accept(t);
|
||||
}
|
||||
},;
|
||||
},
|
||||
|
||||
// Wrap as parallel stream + forEach synchronizing
|
||||
PAR_STREAM_FOR_EACH(true, false) {
|
||||
<T, S_IN extends BaseStream<T, S_IN>>
|
||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||
m.apply(data.parallelStream()).forEach(e -> {
|
||||
synchronized (data) {
|
||||
b.accept(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
},
|
||||
|
||||
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
|
||||
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
|
||||
<T, S_IN extends BaseStream<T, S_IN>>
|
||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
|
||||
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
||||
m.apply(pipe1).forEach(e -> {
|
||||
synchronized (data) {
|
||||
b.accept(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
},
|
||||
;
|
||||
|
||||
// The set of scenarios that clean the SIZED flag
|
||||
public static final Set<DoubleStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
|
||||
EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
|
||||
|
||||
private boolean isParallel;
|
||||
|
||||
private final boolean isOrdered;
|
||||
|
||||
DoubleStreamTestScenario(boolean isParallel) {
|
||||
this(isParallel, true);
|
||||
}
|
||||
|
||||
DoubleStreamTestScenario(boolean isParallel, boolean isOrdered) {
|
||||
this.isParallel = isParallel;
|
||||
this.isOrdered = isOrdered;
|
||||
}
|
||||
|
||||
public StreamShape getShape() {
|
||||
@ -175,6 +216,10 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
|
||||
return isParallel;
|
||||
}
|
||||
|
||||
public boolean isOrdered() {
|
||||
return isOrdered;
|
||||
}
|
||||
|
||||
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
|
||||
_run(data, (DoubleConsumer) b, (Function<S_IN, DoubleStream>) m);
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -22,7 +22,10 @@
|
||||
*/
|
||||
package java.util.stream;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.PrimitiveIterator;
|
||||
import java.util.Set;
|
||||
import java.util.Spliterator;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
@ -160,12 +163,50 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
||||
for (int t : pipe2.toArray())
|
||||
b.accept(t);
|
||||
}
|
||||
},;
|
||||
},
|
||||
|
||||
private boolean isParallel;
|
||||
// Wrap as parallel stream + forEach synchronizing
|
||||
PAR_STREAM_FOR_EACH(true, false) {
|
||||
<T, S_IN extends BaseStream<T, S_IN>>
|
||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||
m.apply(data.parallelStream()).forEach(e -> {
|
||||
synchronized (data) {
|
||||
b.accept(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
},
|
||||
|
||||
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
|
||||
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
|
||||
<T, S_IN extends BaseStream<T, S_IN>>
|
||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
|
||||
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
||||
m.apply(pipe1).forEach(e -> {
|
||||
synchronized (data) {
|
||||
b.accept(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
},
|
||||
;
|
||||
|
||||
// The set of scenarios that clean the SIZED flag
|
||||
public static final Set<IntStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
|
||||
EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
|
||||
|
||||
private final boolean isParallel;
|
||||
|
||||
private final boolean isOrdered;
|
||||
|
||||
IntStreamTestScenario(boolean isParallel) {
|
||||
this(isParallel, true);
|
||||
}
|
||||
|
||||
IntStreamTestScenario(boolean isParallel, boolean isOrdered) {
|
||||
this.isParallel = isParallel;
|
||||
this.isOrdered = isOrdered;
|
||||
}
|
||||
|
||||
public StreamShape getShape() {
|
||||
@ -176,6 +217,10 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
||||
return isParallel;
|
||||
}
|
||||
|
||||
public boolean isOrdered() {
|
||||
return isOrdered;
|
||||
}
|
||||
|
||||
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
|
||||
_run(data, (IntConsumer) b, (Function<S_IN, IntStream>) m);
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -22,7 +22,10 @@
|
||||
*/
|
||||
package java.util.stream;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.PrimitiveIterator;
|
||||
import java.util.Set;
|
||||
import java.util.Spliterator;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
@ -159,12 +162,50 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
|
||||
for (long t : pipe2.toArray())
|
||||
b.accept(t);
|
||||
}
|
||||
},;
|
||||
},
|
||||
|
||||
// Wrap as parallel stream + forEach synchronizing
|
||||
PAR_STREAM_FOR_EACH(true, false) {
|
||||
<T, S_IN extends BaseStream<T, S_IN>>
|
||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||
m.apply(data.parallelStream()).forEach(e -> {
|
||||
synchronized (data) {
|
||||
b.accept(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
},
|
||||
|
||||
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
|
||||
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
|
||||
<T, S_IN extends BaseStream<T, S_IN>>
|
||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
|
||||
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
||||
m.apply(pipe1).forEach(e -> {
|
||||
synchronized (data) {
|
||||
b.accept(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
},
|
||||
;
|
||||
|
||||
// The set of scenarios that clean the SIZED flag
|
||||
public static final Set<LongStreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
|
||||
EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
|
||||
|
||||
private boolean isParallel;
|
||||
|
||||
private final boolean isOrdered;
|
||||
|
||||
LongStreamTestScenario(boolean isParallel) {
|
||||
this(isParallel, true);
|
||||
}
|
||||
|
||||
LongStreamTestScenario(boolean isParallel, boolean isOrdered) {
|
||||
this.isParallel = isParallel;
|
||||
this.isOrdered = isOrdered;
|
||||
}
|
||||
|
||||
public StreamShape getShape() {
|
||||
@ -175,6 +216,10 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
|
||||
return isParallel;
|
||||
}
|
||||
|
||||
public boolean isOrdered() {
|
||||
return isOrdered;
|
||||
}
|
||||
|
||||
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
|
||||
_run(data, (LongConsumer) b, (Function<S_IN, LongStream>) m);
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -30,6 +30,7 @@ import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumMap;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -91,11 +92,13 @@ public abstract class OpTestCase extends LoggingTestCase {
|
||||
|
||||
boolean isParallel();
|
||||
|
||||
abstract <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||
boolean isOrdered();
|
||||
|
||||
<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m);
|
||||
}
|
||||
|
||||
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||
protected <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||
Collection<U> exerciseOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) {
|
||||
return withData(data).stream(m).exercise();
|
||||
}
|
||||
@ -103,7 +106,7 @@ public abstract class OpTestCase extends LoggingTestCase {
|
||||
// Run multiple versions of exercise(), returning the result of the first, and asserting that others return the same result
|
||||
// If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
|
||||
@SafeVarargs
|
||||
public final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||
protected final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||
Collection<U> exerciseOpsMulti(TestData<T, S_IN> data,
|
||||
Function<S_IN, S_OUT>... ms) {
|
||||
Collection<U> result = null;
|
||||
@ -121,7 +124,7 @@ public abstract class OpTestCase extends LoggingTestCase {
|
||||
// Run multiple versions of exercise() for an Integer stream, returning the result of the first, and asserting that others return the same result
|
||||
// Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
|
||||
// lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
|
||||
public final
|
||||
protected final
|
||||
Collection<Integer> exerciseOpsInt(TestData.OfRef<Integer> data,
|
||||
Function<Stream<Integer>, Stream<Integer>> mRef,
|
||||
Function<IntStream, IntStream> mInt,
|
||||
@ -136,30 +139,73 @@ public abstract class OpTestCase extends LoggingTestCase {
|
||||
return exerciseOpsMulti(data, ms);
|
||||
}
|
||||
|
||||
public <T, U, S_OUT extends BaseStream<U, S_OUT>>
|
||||
// Run multiple versions of exercise() with multiple terminal operations for all kinds of stream, , and asserting against the expected result
|
||||
// If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
|
||||
protected final<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||
void exerciseTerminalOpsMulti(TestData<T, S_IN> data,
|
||||
R expected,
|
||||
Map<String, Function<S_IN, S_OUT>> streams,
|
||||
Map<String, Function<S_OUT, R>> terminals) {
|
||||
for (Map.Entry<String, Function<S_IN, S_OUT>> se : streams.entrySet()) {
|
||||
setContext("Intermediate stream", se.getKey());
|
||||
for (Map.Entry<String, Function<S_OUT, R>> te : terminals.entrySet()) {
|
||||
setContext("Terminal stream", te.getKey());
|
||||
withData(data)
|
||||
.terminal(se.getValue(), te.getValue())
|
||||
.expectedResult(expected)
|
||||
.exercise();
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run multiple versions of exercise() with multiple terminal operation for all kinds of stream, and asserting against the expected result
|
||||
// Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
|
||||
// lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
|
||||
protected final
|
||||
void exerciseTerminalOpsInt(TestData<Integer, Stream<Integer>> data,
|
||||
Collection<Integer> expected,
|
||||
String desc,
|
||||
Function<Stream<Integer>, Stream<Integer>> mRef,
|
||||
Function<IntStream, IntStream> mInt,
|
||||
Function<LongStream, LongStream> mLong,
|
||||
Function<DoubleStream, DoubleStream> mDouble,
|
||||
Map<String, Function<Stream<Integer>, Collection<Integer>>> terminals) {
|
||||
|
||||
Map<String, Function<Stream<Integer>, Stream<Integer>>> m = new HashMap<>();
|
||||
m.put("Ref " + desc, mRef);
|
||||
m.put("Int " + desc, s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e));
|
||||
m.put("Long " + desc, s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e));
|
||||
m.put("Double " + desc, s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e));
|
||||
|
||||
exerciseTerminalOpsMulti(data, expected, m, terminals);
|
||||
}
|
||||
|
||||
|
||||
protected <T, U, S_OUT extends BaseStream<U, S_OUT>>
|
||||
Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m) {
|
||||
TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
|
||||
return withData(data1).stream(m).exercise();
|
||||
}
|
||||
|
||||
public <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>>
|
||||
protected <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>>
|
||||
Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m, I expected) {
|
||||
TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
|
||||
return withData(data1).stream(m).expectedResult(expected).exercise();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <U, S_OUT extends BaseStream<U, S_OUT>>
|
||||
protected <U, S_OUT extends BaseStream<U, S_OUT>>
|
||||
Collection<U> exerciseOps(int[] data, Function<IntStream, S_OUT> m) {
|
||||
return withData(TestData.Factory.ofArray("int array", data)).stream(m).exercise();
|
||||
}
|
||||
|
||||
public Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) {
|
||||
protected Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) {
|
||||
TestData.OfInt data1 = TestData.Factory.ofArray("int array", data);
|
||||
return withData(data1).stream(m).expectedResult(expected).exercise();
|
||||
}
|
||||
|
||||
public <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) {
|
||||
protected <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) {
|
||||
Objects.requireNonNull(data);
|
||||
return new DataStreamBuilder<>(data);
|
||||
}
|
||||
@ -325,19 +371,19 @@ public abstract class OpTestCase extends LoggingTestCase {
|
||||
// Build method
|
||||
|
||||
public Collection<U> exercise() {
|
||||
final boolean isOrdered;
|
||||
final boolean isStreamOrdered;
|
||||
if (refResult == null) {
|
||||
// Induce the reference result
|
||||
before.accept(data);
|
||||
S_OUT sOut = m.apply(data.stream());
|
||||
isOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
|
||||
isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
|
||||
Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]);
|
||||
refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator());
|
||||
after.accept(data);
|
||||
}
|
||||
else {
|
||||
S_OUT sOut = m.apply(data.stream());
|
||||
isOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
|
||||
isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
|
||||
}
|
||||
|
||||
List<Error> errors = new ArrayList<>();
|
||||
@ -348,7 +394,7 @@ public abstract class OpTestCase extends LoggingTestCase {
|
||||
List<U> result = new ArrayList<>();
|
||||
test.run(data, LambdaTestHelpers.<U>toBoxingConsumer(result::add), m);
|
||||
|
||||
Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isOrdered, test.isParallel());
|
||||
Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isStreamOrdered && test.isOrdered(), test.isParallel());
|
||||
|
||||
if (refResult.size() > 1000) {
|
||||
LambdaTestHelpers.launderAssertion(
|
||||
@ -406,7 +452,7 @@ public abstract class OpTestCase extends LoggingTestCase {
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
static enum TerminalTestScenario implements BaseTerminalTestScenario {
|
||||
enum TerminalTestScenario implements BaseTerminalTestScenario {
|
||||
SINGLE_SEQUENTIAL(true, false),
|
||||
|
||||
SINGLE_SEQUENTIAL_SHORT_CIRCUIT(true, false) {
|
||||
@ -546,19 +592,19 @@ public abstract class OpTestCase extends LoggingTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
|
||||
protected <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
|
||||
TestData.OfRef<T> data1
|
||||
= TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
|
||||
return withData(data1).terminal(m).expectedResult(expected).exercise();
|
||||
}
|
||||
|
||||
public <T, R, S_IN extends BaseStream<T, S_IN>> R
|
||||
protected <T, R, S_IN extends BaseStream<T, S_IN>> R
|
||||
exerciseTerminalOps(TestData<T, S_IN> data,
|
||||
Function<S_IN, R> terminalF) {
|
||||
return withData(data).terminal(terminalF).exercise();
|
||||
}
|
||||
|
||||
public <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R
|
||||
protected <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R
|
||||
exerciseTerminalOps(TestData<T, S_IN> data,
|
||||
Function<S_IN, S_OUT> streamF,
|
||||
Function<S_OUT, R> terminalF) {
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -22,7 +22,10 @@
|
||||
*/
|
||||
package java.util.stream;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
import java.util.Spliterator;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
@ -173,8 +176,8 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
||||
}
|
||||
},
|
||||
|
||||
// Wrap as parallel + collect
|
||||
PAR_STREAM_COLLECT(true) {
|
||||
// Wrap as parallel + collect to list
|
||||
PAR_STREAM_COLLECT_TO_LIST(true) {
|
||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||
for (U u : m.apply(data.parallelStream()).collect(Collectors.toList()))
|
||||
@ -182,8 +185,8 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
||||
}
|
||||
},
|
||||
|
||||
// Wrap sequential as parallel, + collect
|
||||
STREAM_TO_PAR_STREAM_COLLECT(true) {
|
||||
// Wrap sequential as parallel, + collect to list
|
||||
STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) {
|
||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||
for (U u : m.apply(data.stream().parallel()).collect(Collectors.toList()))
|
||||
@ -192,19 +195,56 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
||||
},
|
||||
|
||||
// Wrap parallel as sequential,, + collect
|
||||
PAR_STREAM_TO_STREAM_COLLECT(true) {
|
||||
PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) {
|
||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||
for (U u : m.apply(data.parallelStream().sequential()).collect(Collectors.toList()))
|
||||
b.accept(u);
|
||||
}
|
||||
},
|
||||
|
||||
// Wrap as parallel stream + forEach synchronizing
|
||||
PAR_STREAM_FOR_EACH(true, false) {
|
||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||
m.apply(data.parallelStream()).forEach(e -> {
|
||||
synchronized (data) {
|
||||
b.accept(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
},
|
||||
|
||||
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
|
||||
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
|
||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
|
||||
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
||||
m.apply(pipe1).forEach(e -> {
|
||||
synchronized (data) {
|
||||
b.accept(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
},
|
||||
;
|
||||
|
||||
private boolean isParallel;
|
||||
// The set of scenarios that clean the SIZED flag
|
||||
public static final Set<StreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
|
||||
EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
|
||||
|
||||
private final boolean isParallel;
|
||||
|
||||
private final boolean isOrdered;
|
||||
|
||||
StreamTestScenario(boolean isParallel) {
|
||||
this(isParallel, true);
|
||||
}
|
||||
|
||||
StreamTestScenario(boolean isParallel, boolean isOrdered) {
|
||||
this.isParallel = isParallel;
|
||||
this.isOrdered = isOrdered;
|
||||
}
|
||||
|
||||
public StreamShape getShape() {
|
||||
@ -215,6 +255,10 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
||||
return isParallel;
|
||||
}
|
||||
|
||||
public boolean isOrdered() {
|
||||
return isOrdered;
|
||||
}
|
||||
|
||||
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
|
||||
_run(data, b, (Function<S_IN, Stream<U>>) m);
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -112,7 +112,7 @@ public class FlagOpTest extends OpTestCase {
|
||||
FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]);
|
||||
|
||||
withData(data).ops(opsArray).
|
||||
without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@ -152,7 +152,7 @@ public class FlagOpTest extends OpTestCase {
|
||||
|
||||
|
||||
withData(data).ops(opsArray).
|
||||
without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@ -185,7 +185,7 @@ public class FlagOpTest extends OpTestCase {
|
||||
IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
|
||||
|
||||
withData(data).ops(opsArray).
|
||||
without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@ -221,7 +221,7 @@ public class FlagOpTest extends OpTestCase {
|
||||
IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
|
||||
|
||||
withData(data).ops(opsArray).
|
||||
without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
|
||||
exercise();
|
||||
}
|
||||
|
||||
|
||||
@ -1,265 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
package java.util.stream;
|
||||
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.UnaryOperator;
|
||||
|
||||
@Test
|
||||
public class UnorderedTest extends OpTestCase {
|
||||
|
||||
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
|
||||
public void testTerminalOps(String name, TestData<Integer, Stream<Integer>> data) {
|
||||
testTerminal(data, s -> { s.forEach(x -> { }); return 0; });
|
||||
|
||||
testTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
|
||||
|
||||
testTerminal(data, s -> s.anyMatch(e -> true));
|
||||
}
|
||||
|
||||
|
||||
private <T, R> void testTerminal(TestData<T, Stream<T>> data, Function<Stream<T>, R> terminalF) {
|
||||
testTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
|
||||
}
|
||||
|
||||
static class WrappingUnaryOperator<S> implements UnaryOperator<S> {
|
||||
|
||||
final boolean isLimit;
|
||||
final UnaryOperator<S> uo;
|
||||
|
||||
WrappingUnaryOperator(UnaryOperator<S> uo) {
|
||||
this(uo, false);
|
||||
}
|
||||
|
||||
WrappingUnaryOperator(UnaryOperator<S> uo, boolean isLimit) {
|
||||
this.uo = uo;
|
||||
this.isLimit = isLimit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public S apply(S s) {
|
||||
return uo.apply(s);
|
||||
}
|
||||
}
|
||||
|
||||
static <S> WrappingUnaryOperator<S> wrap(UnaryOperator<S> uo) {
|
||||
return new WrappingUnaryOperator<>(uo);
|
||||
}
|
||||
|
||||
static <S> WrappingUnaryOperator<S> wrap(UnaryOperator<S> uo, boolean isLimit) {
|
||||
return new WrappingUnaryOperator<>(uo, isLimit);
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private List permutationOfFunctions =
|
||||
LambdaTestHelpers.perm(Arrays.<WrappingUnaryOperator<Stream<Object>>>asList(
|
||||
wrap(s -> s.sorted()),
|
||||
wrap(s -> s.distinct()),
|
||||
wrap(s -> s.limit(5), true)
|
||||
));
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T, R> void testTerminal(TestData<T, Stream<T>> data,
|
||||
Function<Stream<T>, R> terminalF,
|
||||
BiConsumer<R, R> equalityAsserter) {
|
||||
testTerminal(data, terminalF, equalityAsserter, permutationOfFunctions, StreamShape.REFERENCE);
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
@Test(dataProvider = "IntStreamTestData", dataProviderClass = IntStreamTestDataProvider.class)
|
||||
public void testIntTerminalOps(String name, TestData.OfInt data) {
|
||||
testIntTerminal(data, s -> { s.forEach(x -> { }); return 0; });
|
||||
testIntTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
|
||||
testIntTerminal(data, s -> s.anyMatch(e -> true));
|
||||
}
|
||||
|
||||
|
||||
private <T, R> void testIntTerminal(TestData.OfInt data, Function<IntStream, R> terminalF) {
|
||||
testIntTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
|
||||
}
|
||||
|
||||
private List<List<WrappingUnaryOperator<IntStream>>> intPermutationOfFunctions =
|
||||
LambdaTestHelpers.perm(Arrays.asList(
|
||||
wrap(s -> s.sorted()),
|
||||
wrap(s -> s.distinct()),
|
||||
wrap(s -> s.limit(5), true)
|
||||
));
|
||||
|
||||
private <R> void testIntTerminal(TestData.OfInt data,
|
||||
Function<IntStream, R> terminalF,
|
||||
BiConsumer<R, R> equalityAsserter) {
|
||||
testTerminal(data, terminalF, equalityAsserter, intPermutationOfFunctions, StreamShape.INT_VALUE);
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
@Test(dataProvider = "LongStreamTestData", dataProviderClass = LongStreamTestDataProvider.class)
|
||||
public void testLongTerminalOps(String name, TestData.OfLong data) {
|
||||
testLongTerminal(data, s -> { s.forEach(x -> { }); return 0; });
|
||||
testLongTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
|
||||
testLongTerminal(data, s -> s.anyMatch(e -> true));
|
||||
}
|
||||
|
||||
|
||||
private <T, R> void testLongTerminal(TestData.OfLong data, Function<LongStream, R> terminalF) {
|
||||
testLongTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
|
||||
}
|
||||
|
||||
private List<List<WrappingUnaryOperator<LongStream>>> longPermutationOfFunctions =
|
||||
LambdaTestHelpers.perm(Arrays.asList(
|
||||
wrap(s -> s.sorted()),
|
||||
wrap(s -> s.distinct()),
|
||||
wrap(s -> s.limit(5), true)
|
||||
));
|
||||
|
||||
private <R> void testLongTerminal(TestData.OfLong data,
|
||||
Function<LongStream, R> terminalF,
|
||||
BiConsumer<R, R> equalityAsserter) {
|
||||
testTerminal(data, terminalF, equalityAsserter, longPermutationOfFunctions, StreamShape.LONG_VALUE);
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
@Test(dataProvider = "DoubleStreamTestData", dataProviderClass = DoubleStreamTestDataProvider.class)
|
||||
public void testDoubleTerminalOps(String name, TestData.OfDouble data) {
|
||||
testDoubleTerminal(data, s -> { s.forEach(x -> { }); return 0; });
|
||||
testDoubleTerminal(data, s -> s.findAny(), (a, b) -> assertEquals(a.isPresent(), b.isPresent()));
|
||||
testDoubleTerminal(data, s -> s.anyMatch(e -> true));
|
||||
}
|
||||
|
||||
|
||||
private <T, R> void testDoubleTerminal(TestData.OfDouble data, Function<DoubleStream, R> terminalF) {
|
||||
testDoubleTerminal(data, terminalF, LambdaTestHelpers::assertContentsEqual);
|
||||
}
|
||||
|
||||
private List<List<WrappingUnaryOperator<DoubleStream>>> doublePermutationOfFunctions =
|
||||
LambdaTestHelpers.perm(Arrays.asList(
|
||||
wrap(s -> s.sorted()),
|
||||
wrap(s -> s.distinct()),
|
||||
wrap(s -> s.limit(5), true)
|
||||
));
|
||||
|
||||
private <R> void testDoubleTerminal(TestData.OfDouble data,
|
||||
Function<DoubleStream, R> terminalF,
|
||||
BiConsumer<R, R> equalityAsserter) {
|
||||
testTerminal(data, terminalF, equalityAsserter, doublePermutationOfFunctions, StreamShape.DOUBLE_VALUE);
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
private <T, S extends BaseStream<T, S>, R> void testTerminal(TestData<T, S> data,
|
||||
Function<S, R> terminalF,
|
||||
BiConsumer<R, R> equalityAsserter,
|
||||
List<List<WrappingUnaryOperator<S>>> pFunctions,
|
||||
StreamShape shape) {
|
||||
CheckClearOrderedOp<T> checkClearOrderedOp = new CheckClearOrderedOp<>(shape);
|
||||
for (List<WrappingUnaryOperator<S>> f : pFunctions) {
|
||||
@SuppressWarnings("unchecked")
|
||||
UnaryOperator<S> fi = interpose(f, (S s) -> (S) chain(s, checkClearOrderedOp));
|
||||
withData(data).
|
||||
terminal(fi, terminalF).
|
||||
equalator(equalityAsserter).
|
||||
exercise();
|
||||
}
|
||||
|
||||
CheckSetOrderedOp<T> checkSetOrderedOp = new CheckSetOrderedOp<>(shape);
|
||||
for (List<WrappingUnaryOperator<S>> f : pFunctions) {
|
||||
@SuppressWarnings("unchecked")
|
||||
UnaryOperator<S> fi = interpose(f, (S s) -> (S) chain(s, checkSetOrderedOp));
|
||||
withData(data).
|
||||
terminal(fi, s -> terminalF.apply(s.sequential())).
|
||||
equalator(equalityAsserter).
|
||||
exercise();
|
||||
}
|
||||
}
|
||||
|
||||
static class CheckClearOrderedOp<T> implements StatelessTestOp<T, T> {
|
||||
private final StreamShape shape;
|
||||
|
||||
CheckClearOrderedOp(StreamShape shape) {
|
||||
this.shape = shape;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamShape outputShape() {
|
||||
return shape;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamShape inputShape() {
|
||||
return shape;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
|
||||
if (parallel) {
|
||||
assertTrue(StreamOpFlag.ORDERED.isCleared(flags));
|
||||
}
|
||||
|
||||
return sink;
|
||||
}
|
||||
}
|
||||
|
||||
static class CheckSetOrderedOp<T> extends CheckClearOrderedOp<T> {
|
||||
|
||||
CheckSetOrderedOp(StreamShape shape) {
|
||||
super(shape);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
|
||||
assertTrue(StreamOpFlag.ORDERED.isKnown(flags) || StreamOpFlag.ORDERED.isPreserved(flags));
|
||||
|
||||
return sink;
|
||||
}
|
||||
}
|
||||
|
||||
private <T, S extends BaseStream<T, S>>
|
||||
UnaryOperator<S> interpose(List<WrappingUnaryOperator<S>> fs, UnaryOperator<S> fi) {
|
||||
int l = -1;
|
||||
for (int i = 0; i < fs.size(); i++) {
|
||||
if (fs.get(i).isLimit) {
|
||||
l = i;
|
||||
}
|
||||
}
|
||||
|
||||
final int lastLimitIndex = l;
|
||||
return s -> {
|
||||
if (lastLimitIndex == -1 && fs.size() > 0)
|
||||
s = fi.apply(s);
|
||||
for (int i = 0; i < fs.size(); i++) {
|
||||
s = fs.get(i).apply(s);
|
||||
if (i >= lastLimitIndex) {
|
||||
s = fi.apply(s);
|
||||
}
|
||||
}
|
||||
return s;
|
||||
};
|
||||
}
|
||||
}
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -192,7 +192,7 @@ public class SplittableRandomTest extends OpTestCase {
|
||||
public void testInts(TestData.OfInt data, ResultAsserter<Iterable<Integer>> ra) {
|
||||
withData(data).
|
||||
stream(s -> s).
|
||||
without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
without(IntStreamTestScenario.CLEAR_SIZED_SCENARIOS).
|
||||
resultAsserter(ra).
|
||||
exercise();
|
||||
}
|
||||
@ -276,7 +276,7 @@ public class SplittableRandomTest extends OpTestCase {
|
||||
public void testLongs(TestData.OfLong data, ResultAsserter<Iterable<Long>> ra) {
|
||||
withData(data).
|
||||
stream(s -> s).
|
||||
without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
without(LongStreamTestScenario.CLEAR_SIZED_SCENARIOS).
|
||||
resultAsserter(ra).
|
||||
exercise();
|
||||
}
|
||||
@ -360,7 +360,7 @@ public class SplittableRandomTest extends OpTestCase {
|
||||
public void testDoubles(TestData.OfDouble data, ResultAsserter<Iterable<Double>> ra) {
|
||||
withData(data).
|
||||
stream(s -> s).
|
||||
without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
without(DoubleStreamTestScenario.CLEAR_SIZED_SCENARIOS).
|
||||
resultAsserter(ra).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@ -32,7 +32,16 @@ import java.util.Optional;
|
||||
import java.util.Spliterator;
|
||||
import java.util.Spliterators;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.stream.*;
|
||||
import java.util.stream.CollectorOps;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.DoubleStream;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.LongStream;
|
||||
import java.util.stream.OpTestCase;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
import java.util.stream.StreamTestDataProvider;
|
||||
import java.util.stream.TestData;
|
||||
|
||||
import static java.util.stream.LambdaTestHelpers.*;
|
||||
|
||||
@ -67,7 +76,12 @@ public class DistinctOpTest extends OpTestCase {
|
||||
|
||||
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
|
||||
public void testOp(String name, TestData.OfRef<Integer> data) {
|
||||
Collection<Integer> result = exerciseOpsInt(data, Stream::distinct, IntStream::distinct, LongStream::distinct, DoubleStream::distinct);
|
||||
Collection<Integer> result = exerciseOpsInt(
|
||||
data,
|
||||
Stream::distinct,
|
||||
IntStream::distinct,
|
||||
LongStream::distinct,
|
||||
DoubleStream::distinct);
|
||||
|
||||
assertUnique(result);
|
||||
assertTrue((data.size() > 0) ? result.size() > 0 : result.size() == 0);
|
||||
@ -127,9 +141,13 @@ public class DistinctOpTest extends OpTestCase {
|
||||
|
||||
@Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
|
||||
public void testDistinctDistinct(String name, TestData.OfRef<Integer> data) {
|
||||
Collection<Integer> result = withData(data)
|
||||
.stream(s -> s.distinct().distinct())
|
||||
.exercise();
|
||||
Collection<Integer> result = exerciseOpsInt(
|
||||
data,
|
||||
s -> s.distinct().distinct(),
|
||||
s -> s.distinct().distinct(),
|
||||
s -> s.distinct().distinct(),
|
||||
s -> s.distinct().distinct());
|
||||
|
||||
assertUnique(result);
|
||||
}
|
||||
|
||||
@ -152,4 +170,31 @@ public class DistinctOpTest extends OpTestCase {
|
||||
assertUnique(result);
|
||||
assertSorted(result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStable() {
|
||||
// Create N instances of Integer all with the same value
|
||||
List<Integer> input = IntStream.rangeClosed(0, 1000)
|
||||
.mapToObj(i -> new Integer(1000)) // explicit construction
|
||||
.collect(Collectors.toList());
|
||||
Integer expectedElement = input.get(0);
|
||||
TestData<Integer, Stream<Integer>> data = TestData.Factory.ofCollection(
|
||||
"1000 instances of Integer with the same value", input);
|
||||
|
||||
withData(data)
|
||||
.stream(Stream::distinct)
|
||||
.resultAsserter((actual, expected, isOrdered, isParallel) -> {
|
||||
List<Integer> l = new ArrayList<>();
|
||||
actual.forEach(l::add);
|
||||
|
||||
// Assert stability
|
||||
// The single result element should be equal in identity to
|
||||
// the first input element
|
||||
assertEquals(l.size(), 1);
|
||||
assertEquals(System.identityHashCode(l.get(0)),
|
||||
System.identityHashCode(expectedElement));
|
||||
|
||||
})
|
||||
.exercise();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -181,7 +181,7 @@ public class InfiniteStreamWithLimitOpTest extends OpTestCase {
|
||||
// slice implementations
|
||||
withData(refLongs()).
|
||||
stream(s -> fs.apply(s)).
|
||||
without(StreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@ -192,7 +192,7 @@ public class InfiniteStreamWithLimitOpTest extends OpTestCase {
|
||||
// slice implementations
|
||||
withData(ints()).
|
||||
stream(s -> fs.apply(s)).
|
||||
without(IntStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
without(IntStreamTestScenario.CLEAR_SIZED_SCENARIOS).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@ -203,7 +203,7 @@ public class InfiniteStreamWithLimitOpTest extends OpTestCase {
|
||||
// slice implementations
|
||||
withData(longs()).
|
||||
stream(s -> fs.apply(s)).
|
||||
without(LongStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
without(LongStreamTestScenario.CLEAR_SIZED_SCENARIOS).
|
||||
exercise();
|
||||
}
|
||||
|
||||
@ -214,7 +214,7 @@ public class InfiniteStreamWithLimitOpTest extends OpTestCase {
|
||||
// slice implementations
|
||||
withData(doubles()).
|
||||
stream(s -> fs.apply(s)).
|
||||
without(DoubleStreamTestScenario.PAR_STREAM_TO_ARRAY_CLEAR_SIZED).
|
||||
without(DoubleStreamTestScenario.CLEAR_SIZED_SCENARIOS).
|
||||
exercise();
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user