8148250: Stream.limit() parallel tasks with ordered non-SUBSIZED source should short-circuit

Reviewed-by: psandoz
This commit is contained in:
Tagir F. Valeev 2016-02-08 10:37:37 +01:00
parent fead4851e1
commit 9e65963f4f
2 changed files with 27 additions and 5 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -139,7 +139,7 @@ final class SliceOps {
}
else {
// @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
// regardless of the value of n
// when n * parallelismLevel is sufficiently large.
// Need to adjust the target size of splitting for the
// SliceTask from say (size / k) to say min(size / k, 1 << 14)
// This will limit the size of the buffers created at the leaf nodes
@ -604,8 +604,15 @@ final class SliceOps {
return nb.build();
}
else {
Node<P_OUT> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator),
spliterator).build();
final Node.Builder<P_OUT> nb = op.makeNodeBuilder(-1, generator);
if (targetOffset == 0) { // limit only
Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
}
else {
helper.wrapAndCopyInto(nb, spliterator);
}
Node<P_OUT> node = nb.build();
thisNodeSize = node.count();
completed = true;
spliterator = null;

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -20,6 +20,12 @@
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @bug 8148250
*/
package org.openjdk.tests.java.util.stream;
import org.testng.annotations.Test;
@ -341,4 +347,13 @@ public class SliceOpTest extends OpTestCase {
return Arrays.asList(0, 1, size / 2, size - 1, size, size + 1, 2 * size);
}
}
public void testLimitParallelHugeInput() {
for (int n : new int[] {10, 100, 1000, 10000}) {
long[] actual = LongStream.range(0, Long.MAX_VALUE)
.parallel().filter(x -> true) // remove SIZED
.limit(n).toArray();
assertEquals(LongStream.range(0, n).toArray(), actual);
}
}
}