Java 8 Parallel Streams

30 September 2014
By Gonçalo Marques
 
In this article we will cover Java 8 Parallel Streams.

Introduction

As we have seen previously in Java 8 Streams, Java 8 introduced streams as a way to perform functional operations over collections of elements. In this article we will see how to take advantage of parallel streams in order to process those elements in parallel.

The well-known Collections Framework provides wrappers that add synchronization to Java collections (by default, Java collections are not thread-safe). The problem with synchronized collections is that they introduce thread contention: threads compete for access to the same collection, and each one may have to wait for the others before it is able to proceed.

Parallel streams provide the capability of parallel processing over collections that are not thread-safe. It is, however, required that the collection is not modified while the parallel processing is taking place.

Note: There are some scenarios where parallel processing does not necessarily mean faster processing. Distributing the work across multiple threads and the final step of combining the partial results both introduce a certain degree of overhead.
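
To get a rough feeling for this overhead, one may time the same computation in both modes. The following is only a minimal sketch: the class and method names are made up for illustration, and naive System.nanoTime() timing ignores JIT warm-up, so the numbers are indicative at best. On many machines the parallel version tends to lose for the small input, but this is not guaranteed.

```java
import java.util.stream.LongStream;

public class ParallelOverheadSketch {

    // Sums the squares of 1..n on the calling thread.
    public static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).map(x -> x * x).sum();
    }

    // Same computation, but the range may be split across several threads.
    public static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().map(x -> x * x).sum();
    }

    public static void main(String[] args) {
        // One tiny input and one larger input (hypothetical sizes).
        for (long n : new long[] { 100L, 1_000_000L }) {
            long t0 = System.nanoTime();
            long seq = sequentialSum(n);
            long t1 = System.nanoTime();
            long par = parallelSum(n);
            long t2 = System.nanoTime();
            // Timings vary between runs and machines; both results must match.
            System.out.printf("n=%d sequential=%d ns parallel=%d ns sameResult=%b%n",
                    n, t1 - t0, t2 - t1, seq == par);
        }
    }
}
```

For reliable measurements a proper benchmarking harness would be needed; the sketch only illustrates that both variants compute the same value while their cost may differ.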

Parallel Streams

In order to create a parallel processing stream one may invoke the method Collection.parallelStream() against a collection:

Parallel Stream

List<Integer> integerList = Arrays.asList(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 });
int sum = integerList
  .parallelStream()
  .filter(i -> i % 2 == 0)
  .mapToInt(i -> i)
  .sum();

// Will print 20
System.out.println(sum);

Keep in mind that it is the Java runtime, and not the calling code, that partitions the stream into multiple sub-streams and picks the order in which the elements will be processed. One should not expect the parallel stream to process the elements in the order they are defined in the original collection (although one may force the processing to be ordered, as we will see later in this article).
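
One way to observe this partitioning is to record which thread handles each element. The sketch below uses hypothetical names (WorkerThreadSketch, threadPerElement). In the standard implementation the work is typically shared between the calling thread and the common fork/join pool, so the thread names and their assignment to elements will differ between runs and machines.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WorkerThreadSketch {

    // Records, for every element, the name of the thread that processed it.
    // A ConcurrentHashMap is used because several threads write to it at once.
    public static Map<Integer, String> threadPerElement(List<Integer> values) {
        Map<Integer, String> threadByElement = new ConcurrentHashMap<>();
        values.parallelStream()
              .forEach(i -> threadByElement.put(i, Thread.currentThread().getName()));
        return threadByElement;
    }

    public static void main(String[] args) {
        List<Integer> integerList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        // Output is not deterministic: it depends on how the work was split.
        threadPerElement(integerList)
            .forEach((element, thread) -> System.out.println(element + " -> " + thread));
    }
}
```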

Reduction

Let's perform a reduction in order to convert a Collection into a Map with two entries: one containing a list of the even numbers and the other containing a list of the odd numbers:

Reduction

List<Integer> integerList = Arrays.asList(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 });
Map<Integer, List<Integer>> evenOddMap = integerList
    .stream().collect(Collectors.groupingBy(i -> i % 2 == 0 ? 0 : 1));

// Will print [2, 4, 6, 8]
System.out.println(evenOddMap.get(0));

// Will print [1, 3, 5, 7, 9]
System.out.println(evenOddMap.get(1));

The following reduction is the equivalent parallel reduction:

Parallel Reduction

ConcurrentMap<Integer, List<Integer>> concurrentEvenOddMap = integerList
    .parallelStream().collect(Collectors.groupingByConcurrent(i -> i % 2 == 0 ? 0 : 1));
System.out.println(concurrentEvenOddMap.get(0));
System.out.println(concurrentEvenOddMap.get(1));

The results produced by the parallel processing will be unordered. One may be assured that each list in the map will contain the expected even and odd numbers respectively, but the order of the elements within each list will not be the same for every execution.

Subsequent executions of the above Parallel Reduction

[4, 2, 8, 6]
[3, 1, 9, 7, 5]

[6, 8, 4, 2]
[5, 9, 7, 3, 1]

[4, 2, 8, 6]
[3, 1, 9, 7, 5]

As we have said earlier, it is the JVM that will split the stream into multiple sub-streams and pick the element processing order.

Note that we have used the concurrent counterparts in the reduction. ConcurrentMap is used as the result type, since multiple threads will access the same map in order to put the elements as they are processed. Collectors.groupingByConcurrent() is used to perform the grouping concurrently. In parallel processing it will usually perform better than Collectors.groupingBy(), which has to merge the partial maps built by each thread, although the gain depends on the data and the number of available threads.
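
If the order of the elements inside each group matters, one option is to keep Collectors.groupingBy() even on a parallel stream. The sketch below (with a hypothetical class name) relies on the fact that the non-concurrent collector merges the partial results in encounter order, trading some parallel performance for a deterministic result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OrderedParallelGroupingSketch {

    // Groups the values under key 0 (even) and key 1 (odd).
    // The stream is parallel, but the collector is the non-concurrent one.
    public static Map<Integer, List<Integer>> groupEvenOdd(List<Integer> values) {
        return values.parallelStream()
                     .collect(Collectors.groupingBy(i -> i % 2 == 0 ? 0 : 1));
    }

    public static void main(String[] args) {
        List<Integer> integerList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        Map<Integer, List<Integer>> evenOddMap = groupEvenOdd(integerList);

        // Will print [2, 4, 6, 8]
        System.out.println(evenOddMap.get(0));

        // Will print [1, 3, 5, 7, 9]
        System.out.println(evenOddMap.get(1));
    }
}
```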

Ordering

As we have seen earlier, the order of processing in parallel streams is undefined. This is because the runtime splits the stream into multiple sub-streams that are processed at the same time, so the elements are handled in whatever order the worker threads reach them.

Parallel stream processing order

List<Integer> integerList = Arrays.asList(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 });
integerList.parallelStream().forEach(i -> System.out.print(i + " "));

Subsequent executions of the above parallel processing

5 6 8 9 7 1 2 3 4

3 4 5 6 1 2 8 9 7

5 6 3 4 1 2 8 7 9

One may force the parallel processing to be ordered by invoking forEachOrdered, but the benefits inherent to parallel processing may be lost:

Force parallel stream processing to be ordered

List<Integer> integerList = Arrays.asList(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 });

// Will always print 1 2 3 4 5 6 7 8 9
integerList.parallelStream().forEachOrdered(i -> System.out.print(i + " "));

Additional considerations

Keep in mind that the processing will only take place when the last (or terminal) operation of the pipeline is invoked. Let's see an example:

int sum = integerList
  .parallelStream()
  .filter(i -> i % 2 == 0)
  .mapToInt(i -> i)
  .sum();

The processing will only start when the terminal operation sum() is invoked. Only then will the runtime decide in which order the elements are passed through the intermediate operations in order to compute the final result. This is a very important concept, as we will see in the next example:

Parallel processing using a stateful lambda expression

List<Integer> integerList = Arrays.asList(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 });
List<Integer> resultingIntegerList = Collections.synchronizedList(new ArrayList<>());
integerList.parallelStream().map(i -> {
  resultingIntegerList.add(i);
  return i;
}).forEachOrdered(i -> System.out.print(i + " "));



The forEachOrdered operation guarantees that its action is applied to the elements in order, but we may not assume that the elements are passed to the map operation in that same order. This is because the map operation will be executed simultaneously by multiple threads.

The above execution always produces the following output:


1 2 3 4 5 6 7 8 9

But the order in which the elements are stored in resultingIntegerList will not be the same across multiple executions. This is because the map operation will be executed by multiple threads concurrently, even if the processing of the elements is started in a given order. It is not guaranteed that the initial processing order will be preserved throughout the intermediate operations of the pipeline.

If we inspect the contents of resultingIntegerList after multiple independent invocations we will confirm that the order of elements will change across executions:


[5, 6, 8, 9, 7, 1, 3, 4, 2]

[3, 5, 6, 4, 1, 8, 9, 7, 2]

[5, 3, 6, 4, 1, 8, 9, 7, 2]
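
When the goal is simply to gather the mapped elements into a list, one alternative to a stateful lambda is to let the stream build the list itself. The following sketch (the class name, method name and the multiplication by 10 are illustrative assumptions) uses Collectors.toList(), which for an ordered source such as a List keeps the encounter order even when the stream is parallel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SideEffectFreeSketch {

    // Maps every element and collects the results without touching shared state.
    public static List<Integer> timesTen(List<Integer> values) {
        return values.parallelStream()
                     .map(i -> i * 10)              // stateless lambda
                     .collect(Collectors.toList()); // result follows encounter order
    }

    public static void main(String[] args) {
        List<Integer> integerList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

        // Will print [10, 20, 30, 40, 50, 60, 70, 80, 90]
        System.out.println(timesTen(integerList));
    }
}
```

The map operation may still be executed by several threads in any order; only the assembly of the resulting list respects the order of the source.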

Reference

Parallelism (The Java Tutorials - Collections - Aggregate Operations)

About the author
Gonçalo Marques is a Software Engineer with several years of experience in software development and architecture definition. During this period his main focus was delivering software solutions in banking, telecommunications and governmental areas. He created the Bytes Lounge website with one ultimate goal: share his knowledge with the software development community. His main area of expertise is Java and open source.

GitHub profile: https://github.com/gonmarques

He is also the author of the WiFi File Browser Android application.