From 9e65963f4fa11c83101509ca14e11751cd2151f8 Mon Sep 17 00:00:00 2001 From: "Tagir F. Valeev" Date: Mon, 8 Feb 2016 10:37:37 +0100 Subject: [PATCH] 8148250: Stream.limit() parallel tasks with ordered non-SUBSIZED source should short-circuit Reviewed-by: psandoz --- .../classes/java/util/stream/SliceOps.java | 15 +++++++++++---- .../tests/java/util/stream/SliceOpTest.java | 17 ++++++++++++++++- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java b/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java index 68a84f8512b..e771b1f00f6 100644 --- a/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java +++ b/jdk/src/java.base/share/classes/java/util/stream/SliceOps.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -139,7 +139,7 @@ final class SliceOps { } else { // @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n) - // regardless of the value of n + // when n * parallelismLevel is sufficiently large. // Need to adjust the target size of splitting for the // SliceTask from say (size / k) to say min(size / k, 1 << 14) // This will limit the size of the buffers created at the leaf nodes @@ -604,8 +604,15 @@ final class SliceOps { return nb.build(); } else { - Node node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator), - spliterator).build(); + final Node.Builder nb = op.makeNodeBuilder(-1, generator); + if (targetOffset == 0) { // limit only + Sink opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb); + helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator); + } + else { + helper.wrapAndCopyInto(nb, spliterator); + } + Node node = nb.build(); thisNodeSize = node.count(); completed = true; spliterator = null; diff --git a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/SliceOpTest.java b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/SliceOpTest.java index 7fe0006dc1b..c43b3deed68 100644 --- a/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/SliceOpTest.java +++ b/jdk/test/java/util/stream/test/org/openjdk/tests/java/util/stream/SliceOpTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -20,6 +20,12 @@ * or visit www.oracle.com if you need additional information or have any * questions. */ + +/* + * @test + * @bug 8148250 + */ + package org.openjdk.tests.java.util.stream; import org.testng.annotations.Test; @@ -341,4 +347,13 @@ public class SliceOpTest extends OpTestCase { return Arrays.asList(0, 1, size / 2, size - 1, size, size + 1, 2 * size); } } + + public void testLimitParallelHugeInput() { + for (int n : new int[] {10, 100, 1000, 10000}) { + long[] actual = LongStream.range(0, Long.MAX_VALUE) + .parallel().filter(x -> true) // remove SIZED + .limit(n).toArray(); + assertEquals(LongStream.range(0, n).toArray(), actual); + } + } }