Stream Optimization in Java 8


Java 8 introduced streams so that large amounts of data can be processed efficiently. Streams are lazy: they do not process any elements until a terminal operation is invoked. Once it is, each element is passed through the pipeline individually. If the pipeline ends with a short-circuiting operation, stream processing stops as soon as that operation's condition is satisfied.
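As a minimal sketch of this laziness, the following builds a pipeline and checks a counter before and after the terminal operation. The class name LazyStreamDemo and the mapCalls counter are illustrative and not part of the original example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LazyStreamDemo {

    // Counts how many elements have passed through the map stage (illustrative helper).
    static final AtomicInteger mapCalls = new AtomicInteger();

    // Builds the pipeline only; no terminal operation is invoked here.
    static Stream<Integer> buildPipeline(List<Integer> source) {
        return source.stream()
                .map(n -> {
                    mapCalls.incrementAndGet();
                    return n * 2;
                });
    }

    public static void main(String[] args) {
        Stream<Integer> pipeline = buildPipeline(Arrays.asList(1, 2, 3));
        // Nothing has run yet, so the counter is still zero.
        System.out.println("map calls before terminal operation: " + mapCalls.get());

        // collect is the terminal operation that finally pulls elements through map.
        List<Integer> doubled = pipeline.collect(Collectors.toList());
        System.out.println("map calls after terminal operation: " + mapCalls.get());
        System.out.println(doubled);
    }
}
```

The counter only moves once collect is called, which is the behavior the rest of this post relies on.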

The java.util.stream.Stream interface, new in Java 8, provides several static factory methods for creating streams: Stream.of, Stream.iterate, and Stream.generate. Stream.of takes a variable argument list of elements, while Stream.iterate and Stream.generate produce infinite streams that are usually bounded with limit. Arrays, collections, and numeric ranges can also be turned into streams.

Here are the methods to create streams:

Stream.of(T... values)

Arrays.stream(T[] array), with overloads for int[], double[], and long[]

Stream.iterate(T seed, UnaryOperator<T> f)

Stream.generate(Supplier<T> s)

Collection.stream()

• Using range and rangeClosed:

IntStream.range(int startInclusive, int endExclusive)

IntStream.rangeClosed(int startInclusive, int endInclusive)

LongStream.range(long startInclusive, long endExclusive)

LongStream.rangeClosed(long startInclusive, long endInclusive)
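The factory methods listed above can be tried out with a short sketch like the one below. The class and method names (StreamCreationDemo, fromOf, and so on) are made up for illustration; only the stream calls themselves come from the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;

final class StreamCreationDemo {

    // Stream.of with a variable argument list.
    static List<String> fromOf() {
        return Stream.of("a", "b", "c").collect(Collectors.toList());
    }

    // Arrays.stream using the int[] overload, which yields an IntStream.
    static int fromArray() {
        return Arrays.stream(new int[] {1, 2, 3}).sum();
    }

    // Stream.iterate is infinite, so limit bounds it: 1, 2, 4, 8, 16.
    static List<Integer> fromIterate() {
        return Stream.iterate(1, n -> n * 2).limit(5).collect(Collectors.toList());
    }

    // Stream.generate is also infinite and needs limit.
    static List<String> fromGenerate() {
        return Stream.generate(() -> "x").limit(3).collect(Collectors.toList());
    }

    // Collection.stream on an existing list.
    static List<Integer> fromCollection() {
        return Arrays.asList(3, 1, 2).stream().sorted().collect(Collectors.toList());
    }

    // range excludes the end value: 1, 2, 3, 4.
    static int[] fromRange() {
        return IntStream.range(1, 5).toArray();
    }

    // rangeClosed includes the end value: 1, 2, 3, 4, 5.
    static long[] fromRangeClosed() {
        return LongStream.rangeClosed(1, 5).toArray();
    }

    public static void main(String[] args) {
        System.out.println(fromOf());
        System.out.println(fromArray());
        System.out.println(fromIterate());
        System.out.println(fromGenerate());
        System.out.println(fromCollection());
        System.out.println(Arrays.toString(fromRange()));
        System.out.println(Arrays.toString(fromRangeClosed()));
    }
}
```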

For example, consider taking the range of numbers from 100 up to (but not including) 200, doubling each of them, and then finding the first value that is evenly divisible by three:

Example. First doubled integer between 200 and 400 divisible by 3


OptionalInt firstEvenIntegerDivBy3 = IntStream.range(100, 200)
        .map(n -> n * 2)
        .filter(n -> n % 3 == 0)
        .findFirst();
System.out.println(firstEvenIntegerDivBy3);

Output: OptionalInt[204]


If the stream processed its data one stage at a time, the way a series of for loops would, a lot of operations would be wasted:
• The range of numbers from 100 to 199 is created (100 operations)
• Each number is doubled (100 operations)
• Each number is checked for divisibility (100 operations)
• The first element of the resulting stream is returned (1 operation)
Since the first value that satisfies the stream requirements is 204, why process all the other numbers?
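To make that tally concrete, here is a rough sketch of stage-by-stage processing written with plain loops. The class EagerPipelineDemo and its operations counter are hypothetical helpers introduced only to count the work. This is not how streams actually behave.

```java
final class EagerPipelineDemo {

    // Number of element-level operations performed by the last run (illustrative).
    static int operations;

    static int firstDoubledDivisibleBy3() {
        operations = 0;

        // Stage 1: create the numbers 100..199 (100 operations).
        int[] numbers = new int[100];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = 100 + i;
            operations++;
        }

        // Stage 2: double every number (100 operations).
        int[] doubled = new int[numbers.length];
        for (int i = 0; i < numbers.length; i++) {
            doubled[i] = numbers[i] * 2;
            operations++;
        }

        // Stage 3: test every doubled number for divisibility by 3 (100 operations).
        boolean[] divisibleBy3 = new boolean[doubled.length];
        for (int i = 0; i < doubled.length; i++) {
            divisibleBy3[i] = doubled[i] % 3 == 0;
            operations++;
        }

        // Stage 4: return the first value that passed the test (1 operation).
        for (int i = 0; i < doubled.length; i++) {
            if (divisibleBy3[i]) {
                operations++;
                return doubled[i];
            }
        }
        throw new IllegalStateException("no value divisible by 3");
    }

    public static void main(String[] args) {
        System.out.println("Result: " + firstDoubledDivisibleBy3());
        System.out.println("Operations: " + operations);
    }
}
```

Counted this way, the eager version performs 301 operations to find 204, even though the answer is the third element.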

Fortunately, stream processing doesn’t work that way. Streams are lazy: no work is done until the terminal operation is reached, and then each element is processed through the pipeline individually. To demonstrate this, here is the same code, refactored to show each element as it passes through the pipeline.

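import java.util.OptionalInt;
import java.util.stream.IntStream;

public int multByTwo(int n) {
    System.out.printf("Inside multByTwo with arg %d%n", n);
    return n * 2;
}

public boolean divByThree(int n) {
    System.out.printf("Inside divByThree with arg %d%n", n);
    return n % 3 == 0;
}

// Inside an instance method of the same class:
OptionalInt firstEvenDoubleDivBy3 = IntStream.range(100, 200)
        .map(this::multByTwo)
        .filter(this::divByThree)
        .findFirst();
System.out.println("First even divisible by 3 is " + firstEvenDoubleDivBy3);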

The output this time is:
Inside multByTwo with arg 100
Inside divByThree with arg 200
Inside multByTwo with arg 101
Inside divByThree with arg 202
Inside multByTwo with arg 102
Inside divByThree with arg 204
First even divisible by 3 is OptionalInt[204]


The value 100 goes through the map to produce 200, but does not pass the filter, so the stream moves to the value 101. That is mapped to 202, which also doesn’t pass the filter. Then the next value, 102, is mapped to 204, which is divisible by 3, so it passes. Because findFirst is a short-circuiting operation, the stream processing terminates after processing only three values, using six operations.
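The six operations can also be counted programmatically rather than read off the log. The sketch below uses a hypothetical class, LazyOperationCount, with an illustrative counter. It contrasts the short-circuiting findFirst with count, which has to visit every element.

```java
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

final class LazyOperationCount {

    // Counts every map and filter invocation (illustrative helper).
    static final AtomicInteger operations = new AtomicInteger();

    // Resets the counter and builds the same map/filter pipeline as the example above.
    private static IntStream countedPipeline() {
        operations.set(0);
        return IntStream.range(100, 200)
                .map(n -> {
                    operations.incrementAndGet();
                    return n * 2;
                })
                .filter(n -> {
                    operations.incrementAndGet();
                    return n % 3 == 0;
                });
    }

    // Short-circuiting terminal operation: stops at the first match.
    static OptionalInt firstMatch() {
        return countedPipeline().findFirst();
    }

    // Non-short-circuiting terminal operation: every element is mapped and filtered.
    static long allMatches() {
        return countedPipeline().count();
    }

    public static void main(String[] args) {
        System.out.println(firstMatch() + " after " + operations.get() + " operations");
        System.out.println(allMatches() + " matches after " + operations.get() + " operations");
    }
}
```

With findFirst the counter stops at 6. With count, all 100 values are mapped and filtered, for 200 operations.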

Concurrency and Parallelism

• Concurrency is when multiple tasks can run in overlapping time periods
• Parallelism is when multiple tasks run at literally the same time
Concurrency is the ability to decompose your problem into independent operations that can run simultaneously, even if they aren’t doing so at the moment. A concurrent application is composed of independently executing processes. If you have multiple processing units, you can then run the concurrent tasks in parallel, which may or may not improve performance. Parallelization in Java by default splits the work into multiple sections, assigns each to the common fork-join pool, executes them, and joins the results together.

Parallel Stream

Moving to parallel streams is an optimization. By default, Java 8 parallel streams use a common fork-join pool to distribute the work. The size of that pool is tied to the number of processors, which you can determine via Runtime.getRuntime().availableProcessors(). By default its parallelism is one less than that number, because the thread that starts the stream also takes part in the work. Managing the fork-join pool requires overhead, both in dividing the work into individual segments and in combining the individual results back into a final answer.
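To see these numbers on your own machine, a small sketch like the following can help. The class name CommonPoolInfo is illustrative. The values printed depend on the hardware and on whether the java.util.concurrent.ForkJoinPool.common.parallelism system property has been set, so no particular output is assumed here.

```java
import java.util.concurrent.ForkJoinPool;

final class CommonPoolInfo {

    // Number of processors the JVM reports as available.
    static int processors() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the common pool used by parallel streams.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors: " + processors());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```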

List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5, 9);

List<Integer> nums = numbers.parallelStream()
        .map(n -> n * 2)
        .peek(n -> System.out.printf("%s processing %d%n", Thread.currentThread().getName(), n))
        .sequential()
        .sorted()
        .collect(Collectors.toList());

The output is:
main processing 6
main processing 2
main processing 8
main processing 2
main processing 10
main processing 18


The idea here is that you want to double all the numbers and then sort them. Since the doubling function is stateless and associative, there’s no reason not to do it in parallel. Sorting, however, is inherently sequential. The peek method is used to show the name of the thread doing the processing, and in the example peek is invoked after the call to parallelStream. Yet the output shows that the main thread did all the processing. In other words, the stream is sequential, despite the call to parallelStream. Why is that? Remember that with streams, no processing is done until the terminal operation is reached, so it is at that moment that the state of the stream is evaluated. The last parallel or sequential call wins, so the sequential call makes the entire pipeline run sequentially.
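The "last call wins" behavior can be checked directly with isParallel, which reports how the pipeline would run if a terminal operation were executed. The following is a small sketch with hypothetical class and method names (ParallelFlagDemo and its helpers).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class ParallelFlagDemo {

    // parallelStream() followed by sequential(): the pipeline ends up sequential.
    static boolean parallelThenSequential(List<Integer> numbers) {
        return numbers.parallelStream()
                .map(n -> n * 2)
                .sequential()
                .isParallel();
    }

    // stream() followed by parallel(): the pipeline ends up parallel.
    static boolean sequentialThenParallel(List<Integer> numbers) {
        return numbers.stream()
                .map(n -> n * 2)
                .parallel()
                .isParallel();
    }

    // Same pipeline as the example above, without the peek.
    static List<Integer> doubledAndSorted(List<Integer> numbers) {
        return numbers.parallelStream()
                .map(n -> n * 2)
                .sequential()
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(3, 1, 4, 1, 5, 9);
        System.out.println("parallel then sequential: " + parallelThenSequential(numbers));
        System.out.println("sequential then parallel: " + sequentialThenParallel(numbers));
        System.out.println(doubledAndSorted(numbers));
    }
}
```

The flag is a property of the whole pipeline rather than of individual stages, which is why the map stage in the example does not run in parallel even though it appears before the sequential call.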

The ForkJoinPool class has a constructor that takes an integer representing the degree of parallelism. You can therefore create your own pool, separate from the common pool, and submit your jobs to that pool instead. The following example uses this mechanism to create its own pool.

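import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

ForkJoinPool pool = new ForkJoinPool(15);
ForkJoinTask<Long> task = pool.submit(
        () -> LongStream.rangeClosed(1, 3_000_000)
                .parallel()
                .sum());
long total = 0;
try {
    total = task.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    pool.shutdown();
}
int poolSize = pool.getPoolSize();
System.out.println("Pool size: " + poolSize);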

• Instantiate a ForkJoinPool of size 15
• Submit a Callable as the job
• Execute the job and wait for a reply
• Prints Pool size: 15
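The same idea can be wrapped in a reusable method, sketched below. The class CustomPoolSum and the method sumInOwnPool are illustrative names. The sketch relies on the observed JDK behavior that a parallel stream started from inside a fork-join task runs in that task's pool, which the Stream API documentation does not promise, so treat it as an assumption.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

final class CustomPoolSum {

    // Sums 1..upTo with a parallel stream started from a dedicated pool.
    static long sumInOwnPool(int parallelism, long upTo) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            ForkJoinTask<Long> task = pool.submit(
                    () -> LongStream.rangeClosed(1, upTo)
                            .parallel()
                            .sum());
            // join() waits for the result like get(), but throws only unchecked exceptions.
            return task.join();
        } finally {
            // Always release the worker threads, even if the task failed.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long n = 3_000_000;
        System.out.println("Stream sum:     " + sumInOwnPool(15, n));
        System.out.println("Closed formula: " + n * (n + 1) / 2);
    }
}
```

Comparing the result against n(n + 1) / 2 is a quick sanity check that moving the work to a separate pool changes where the computation runs, not what it computes.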

Written by Sakshi

Hi, I am Sakshi. I have a B.Tech in Computer Science. I like to share my knowledge through these blogs and try hard to explain every concept with ease.
