This post continues my earlier posts on Streams. In this post we will discuss aggregate operations on Streams.
Aggregate operations on Streams
You can perform intermediate and terminal operations on Streams. Intermediate operations return a new stream and are lazily evaluated: they only run when a terminal operation is called.
persons.stream().filter(p -> p.getGender() == Gender.MALE).forEach(System.out::println);
In the snippet above, filter() doesn't start filtering immediately; it creates a new stream. Filtering only starts when a terminal operation is called, which in the above case is forEach().
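To make the laziness visible, here is a minimal sketch that counts how often the predicate runs. The class name, the method name and the counter are assumptions made for illustration (the counter is itself a side effect, used here only as a probe), and plain strings stand in for Person.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyEvaluationDemo {

    // Counts predicate invocations. This counter is a probe for the demo only.
    static final AtomicInteger predicateCalls = new AtomicInteger();

    // Builds the pipeline without running it: there is no terminal operation here.
    static Stream<String> namesStartingWithR(List<String> names) {
        return names.stream().filter(name -> {
            predicateCalls.incrementAndGet();
            return name.startsWith("R");
        });
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Rami", "Jiya", "Rajesh");

        Stream<String> pipeline = namesStartingWithR(names);
        System.out.println("Predicate calls before terminal operation: " + predicateCalls.get());

        // collect() is the terminal operation that actually pulls elements through filter().
        List<String> result = pipeline.collect(Collectors.toList());
        System.out.println("Predicate calls after terminal operation: " + predicateCalls.get());
        System.out.println(result);
    }
}
```

The first count printed is 0 and the second is 3, because the predicate is only evaluated once collect() starts the pipeline.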
Intermediate operations
There are many intermediate operations that you can perform on Streams. Some of them are filter(), distinct(), sorted(), limit(), parallel(), sequential(), map() and flatMap().
filter() operation
This takes a Predicate functional interface as its argument, and the output stream of this operation will contain only those elements which pass the Predicate's conditional check. You can find a nice explanation of Predicates here.
// all the males
List<Person> allMales = persons.stream().filter(p -> p.getGender() == Gender.MALE).collect(Collectors.toList());
System.out.println(allMales);
map() operation
It is a mapper operation. It expects a Function functional interface as its argument. The purpose of a Function is to transform a value from one type to another (the other type could be the same).
// first names of all the persons
List<String> firstNames = persons.stream().map(Person::getFirstName).collect(Collectors.toList());
System.out.println(firstNames);
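flatMap() is listed among the intermediate operations but has no snippet of its own. As a rough sketch, it maps each element to a stream and then concatenates those streams into one. The class, method and sample data below are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapDemo {

    // Flattens a list of lists into a single list, keeping encounter order.
    static List<String> flatten(List<List<String>> groups) {
        return groups.stream()
                .flatMap(List::stream) // each inner list becomes a stream; all are concatenated
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> teams = Arrays.asList(
                Arrays.asList("Gaurav", "Sandeep"),
                Arrays.asList("Rami", "Jiya"));
        System.out.println(flatten(teams)); // [Gaurav, Sandeep, Rami, Jiya]
    }
}
```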
distinct()
It returns the unique elements and uses equals() under the hood to remove duplicates.
List<String> uniqueFirstNames = persons.stream().map(Person::getFirstName).distinct().collect(Collectors.toList());
System.out.println(uniqueFirstNames);
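Because distinct() relies on equals(), your own classes only de-duplicate if they override it (and hashCode() consistently with it, as usual). A small sketch, using a hypothetical Name class rather than the Person class from the examples:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class DistinctDemo {

    // Hypothetical value class; two names are equal when both parts are equal.
    static final class Name {
        final String first;
        final String last;

        Name(String first, String last) {
            this.first = first;
            this.last = last;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Name)) return false;
            Name other = (Name) o;
            return first.equals(other.first) && last.equals(other.last);
        }

        @Override
        public int hashCode() {
            return Objects.hash(first, last);
        }
    }

    // Counts the elements that remain after distinct().
    static long countDistinct(List<?> items) {
        return items.stream().distinct().count();
    }

    public static void main(String[] args) {
        List<Name> names = Arrays.asList(
                new Name("Gaurav", "Mazra"),
                new Name("Gaurav", "Mazra"),
                new Name("Rami", "Aggarwal"));
        System.out.println(countDistinct(names)); // 2

        // Plain Objects use identity-based equals(), so nothing is removed.
        System.out.println(countDistinct(Arrays.asList(new Object(), new Object()))); // 2
    }
}
```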
sorted()
Sorts the stream elements. It is a stateful operation.
List<Person> sortedByAge = persons.stream().sorted(Comparator.comparingInt(Person::getAge)).collect(Collectors.toList());
System.out.println(sortedByAge);
limit()
It truncates the stream to at most the given number of elements. It is helpful to end infinite streams in a finite manner.
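As an illustration of cutting an infinite stream short, the sketch below uses Stream.iterate(), which never ends on its own, and stops it with limit(). The class and method names are assumptions for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LimitDemo {

    // Stream.iterate produces 0, 2, 4, ... forever; limit(n) keeps only the first n.
    static List<Integer> firstEvenNumbers(int n) {
        return Stream.iterate(0, i -> i + 2)
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstEvenNumbers(5)); // [0, 2, 4, 6, 8]
    }
}
```

Without the limit() call, collect() would never return, because the source never runs out of elements.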
Intermediate operations can be divided into two groups: stateless and stateful. Most stream intermediate operations are stateless, e.g. map and filter, but some of them are stateful, e.g. distinct, sorted and limit, because they have to keep state about the elements they have already seen.
Terminal/ Reduction operations
There are many terminal operations such as forEach(), reduce(), max(), min(), average(), collect(), findAny(), findFirst(), allMatch() and noneMatch().
forEach()
This takes a Consumer functional interface as its parameter and passes each element to it for consumption.
persons.stream().forEach(System.out::println);
max(), min(), average() operations
On an IntStream, average() returns OptionalDouble whereas max() and min() return OptionalInt.
// average age of all persons
persons.stream().mapToInt(Person::getAge).average().ifPresent(System.out::println);
// max age from all persons
persons.stream().mapToInt(Person::getAge).max().ifPresent(System.out::println);
// min age from all persons
persons.stream().mapToInt(Person::getAge).min().ifPresent(System.out::println);
noneMatch(), allMatch(), anyMatch()
These check whether a given condition is satisfied by none, all or any of the elements of the stream, respectively.
// age of all females in the group is less than 22
persons.stream().filter(p -> p.getGender() == Gender.FEMALE).allMatch(p -> p.getAge() < 22);
// not a single male's age is greater than 30
persons.stream().filter(p -> p.getGender() == Gender.MALE).noneMatch(p -> p.getAge() > 30);
persons.stream().anyMatch(p -> p.getAge() > 45);
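One detail worth keeping in mind when a filter() comes before a match: on an empty stream, allMatch() and noneMatch() return true and anyMatch() returns false. A small sketch with plain integer ages (the class and method names are assumptions):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class MatchDemo {

    static boolean allUnder(List<Integer> ages, int limit) {
        return ages.stream().allMatch(age -> age < limit);
    }

    static boolean noneOver(List<Integer> ages, int limit) {
        return ages.stream().noneMatch(age -> age > limit);
    }

    static boolean anyOver(List<Integer> ages, int limit) {
        return ages.stream().anyMatch(age -> age > limit);
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(20, 21, 25);
        System.out.println(allUnder(ages, 22)); // false, because of 25
        System.out.println(noneOver(ages, 30)); // true
        System.out.println(anyOver(ages, 45));  // false

        // With no elements there is nothing to contradict allMatch/noneMatch.
        List<Integer> empty = Collections.<Integer>emptyList();
        System.out.println(allUnder(empty, 22)); // true
        System.out.println(noneOver(empty, 30)); // true
        System.out.println(anyOver(empty, 45));  // false
    }
}
```

So if the group contained no females at all, the allMatch() check in the snippet above would still report true.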
Reduction operations
Reduction operations are those which produce a single value as their result. We have already seen some of them in the previous snippets, e.g. max(), min() and average(); sum() is another. Apart from these, Java 8 provides two more general-purpose operations: reduce() and collect().
reduce()
// rangeClosed includes the upper bound, so this sums 1 to 10
int sumOfFirst10 = IntStream.rangeClosed(1, 10).reduce(0, Integer::sum);
System.out.println(sumOfFirst10);
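reduce() comes in more than one form. The sketch below shows two others on Stream: the one-argument form, which has no identity value and therefore returns an Optional, and the three-argument form, which lets the result type differ from the element type. The class and method names are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class ReduceDemo {

    // No identity value: the result is empty when the stream has no elements.
    static Optional<String> longest(List<String> words) {
        return words.stream().reduce((a, b) -> a.length() >= b.length() ? a : b);
    }

    // Identity, accumulator and combiner: reduces strings to an int total.
    // The combiner merges partial sums and is needed for parallel execution.
    static int totalLength(List<String> words) {
        return words.stream().reduce(0, (sum, word) -> sum + word.length(), Integer::sum);
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Gaurav", "Sandeep", "Rami");
        System.out.println(longest(names));     // Optional[Sandeep]
        System.out.println(totalLength(names)); // 17
    }
}
```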
collect()
It is a mutable reduction. Collectors has many useful factory methods like toList(), groupingBy(), averagingInt(), counting(), summarizingInt() and joining().
Collection<Person> persons = StreamSamples.getPersons();
List<String> firstNameOfPersons = persons.stream().map(Person::getFirstName).collect(Collectors.toList());
System.out.println(firstNameOfPersons);
Map<Integer, List<Person>> personByAge = persons.stream().collect(Collectors.groupingBy(Person::getAge));
System.out.println(personByAge);
Double averageAge = persons.stream().collect(Collectors.averagingInt(Person::getAge));
System.out.println(averageAge);
Long totalPersons = persons.stream().collect(Collectors.counting());
System.out.println(totalPersons);
IntSummaryStatistics personsAgeSummary = persons.stream().collect(Collectors.summarizingInt(Person::getAge));
System.out.println(personsAgeSummary);
String allPersonsFirstName = persons.stream().collect(Collectors.mapping(Person::getFirstName, Collectors.joining("#")));
System.out.println(allPersonsFirstName);
The result would look like this.
[Gaurav, Gaurav, Sandeep, Rami, Jiya, Rajesh, Rampal, Nisha, Neha, Ramesh, Parul, Sunil, Prekha, Neeraj]
{32=[Person [firstName=Rami, lastName=Aggarwal, gender=FEMALE, age=32, salary=12000]], 35=[Person [firstName=Rampal, lastName=Yadav, gender=MALE, age=35, salary=12000]], 20=[Person [firstName=Prekha, lastName=Verma, gender=FEMALE, age=20, salary=3600]], 21=[Person [firstName=Neha, lastName=Kapoor, gender=FEMALE, age=21, salary=5500]], 22=[Person [firstName=Jiya, lastName=Khan, gender=FEMALE, age=22, salary=4500], Person [firstName=Ramesh, lastName=Chander, gender=MALE, age=22, salary=2500]], 24=[Person [firstName=Sandeep, lastName=Shukla, gender=MALE, age=24, salary=5000]], 25=[Person [firstName=Parul, lastName=Mehta, gender=FEMALE, age=25, salary=8500], Person [firstName=Neeraj, lastName=Shah, gender=MALE, age=25, salary=33000]], 26=[Person [firstName=Nisha, lastName=Sharma, gender=FEMALE, age=26, salary=10000]], 27=[Person [firstName=Sunil, lastName=Kumar, gender=MALE, age=27, salary=6875]], 28=[Person [firstName=Gaurav, lastName=Mazra, gender=MALE, age=28, salary=10000], Person [firstName=Gaurav, lastName=Mazra, gender=MALE, age=28, salary=10000]], 45=[Person [firstName=Rajesh, lastName=Kumar, gender=MALE, age=45, salary=55000]]}
27.142857142857142
14
IntSummaryStatistics{count=14, sum=380, min=20, average=27.142857, max=45}
Gaurav#Gaurav#Sandeep#Rami#Jiya#Rajesh#Rampal#Nisha#Neha#Ramesh#Parul#Sunil#Prekha#Neeraj
You can't consume the same Stream twice
Once a terminal operation has completed on a stream, the stream is considered consumed and you can't use it again. You will get an exception if you try to start new operations on an already consumed stream.
Stream<String> stream = lines.stream();
stream.reduce((a, b) -> a.length() > b.length() ? a : b).ifPresent(System.out::println);
// below line will throw the exception
stream.forEach(System.out::println);
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.sourceStageSpliterator(AbstractPipeline.java:279)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at com.gauravbytes.java8.stream.StreamExceptionExample.main(StreamExceptionExample.java:18)
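One common way around this, sketched here as a suggestion rather than as something from the example repository, is to hold a Supplier that creates a fresh stream for every terminal operation. The class and method names are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamReuseDemo {

    // Each call to source.get() yields a brand-new stream over the same data.
    static String longest(Supplier<Stream<String>> source) {
        return source.get()
                .reduce((a, b) -> a.length() > b.length() ? a : b)
                .orElse("");
    }

    static long count(Supplier<Stream<String>> source) {
        return source.get().count();
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "bbb", "cc");
        Supplier<Stream<String>> source = lines::stream;

        System.out.println(longest(source)); // bbb
        System.out.println(count(source));   // 3, no IllegalStateException
    }
}
```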
Parallelism
Streams provide a convenient way to execute operations in parallel. They use ForkJoinPool under the hood to run stream operations in parallel. You can use parallelStream(), or parallel() on an already created stream, to perform a task in parallel. One thing to note: parallelism is not automatically faster than running the task serially; it only pays off when you have enough data and processor cores.
persons.parallelStream().filter(p -> p.getAge() > 30).collect(Collectors.toList());
You can set the java.util.concurrent.ForkJoinPool.common.parallelism property at JVM startup to increase parallelism in the common fork-join pool.
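To check what the common pool is actually using, a small sketch like the following can help. The class name and the value 4 in the launch command are arbitrary choices for illustration, and the number printed depends on your machine and JVM flags.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolDemo {

    // Reports the target parallelism of the common fork-join pool.
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        // Run with, for example:
        //   java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 CommonPoolDemo
        // The property is null here when it was not passed on the command line.
        System.out.println("Property: "
                + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

The property has to be in place before the common pool is first used, which is why it is normally passed as a -D flag at startup.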
Concurrent reductions
// a concurrent reduction needs a parallel stream to take effect
ConcurrentMap<Integer, List<Person>> personByAgeConcurrent = persons.parallelStream().collect(Collectors.groupingByConcurrent(Person::getAge));
System.out.println(personByAgeConcurrent);
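As far as I understand the Collectors documentation, a concurrent reduction is only performed when the stream is parallel; on a sequential stream groupingByConcurrent() still works but gains nothing. Here is a self-contained sketch that groups strings by length (names and data are assumptions). The order of the values inside each group is not guaranteed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

class ConcurrentGroupingDemo {

    // All worker threads add into one shared ConcurrentMap instead of merging partial maps.
    static ConcurrentMap<Integer, List<String>> byLength(List<String> words) {
        return words.parallelStream()
                .collect(Collectors.groupingByConcurrent(String::length));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Rami", "Jiya", "Neha", "Sunil", "Parul");
        // Keys are 4 and 5; the order of names within each list may vary between runs.
        System.out.println(byLength(names));
    }
}
```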
Side effects
If a function does more than consume and/or return a value, for example if it modifies state, it is said to have side effects. Common examples of side effects are forEach() and mutable reduction using collect(). Java handles side effects in collect() in a thread-safe manner.
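To contrast the two styles, the sketch below computes squares once by mutating an outside list from forEach() and once with collect(). Names are made up for illustration. The first version needs a synchronized list to be safe in parallel and still loses the element order; the second needs neither.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class SideEffectDemo {

    // Discouraged: the lambda mutates a list declared outside the pipeline.
    // Synchronization keeps it safe, but the resulting order is not guaranteed.
    static List<Integer> squaresWithSideEffect(List<Integer> numbers) {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        numbers.parallelStream().forEach(n -> out.add(n * n));
        return out;
    }

    // Preferred: collect() does the accumulation and keeps encounter order.
    static List<Integer> squares(List<Integer> numbers) {
        return numbers.parallelStream()
                .map(n -> n * n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
        System.out.println(squaresWithSideEffect(numbers)); // same values, order may vary
        System.out.println(squares(numbers));               // [1, 4, 9, 16]
    }
}
```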
Interference
You should avoid interference in your lambdas/functions. It occurs when you modify the stream's source collection while pipeline operations are running.
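A minimal sketch of interference, assuming an ArrayList as the source: the lambda passed to peek() adds to the very list that is being streamed, and the pipeline fails with a ConcurrentModificationException. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.stream.Collectors;

class InterferenceDemo {

    // Returns true if modifying the source during the pipeline was detected.
    static boolean modifyingSourceThrows() {
        List<String> words = new ArrayList<>(Arrays.asList("one", "two"));
        try {
            words.stream()
                    .peek(w -> words.add("three")) // interferes with the stream's source
                    .collect(Collectors.joining(" "));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(modifyingSourceThrows()); // true
    }
}
```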
Stateful Lambda expressions
A lambda expression is stateful if its result depends on any state which can change during execution. Avoid using stateful lambda expressions. You can read more here.
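For instance, removing duplicates with a filter that remembers what it has seen is stateful, while distinct() expresses the same thing without any state in your own lambda. This is only a sketch with made-up names: the stateful version happens to work on a sequential stream, but its HashSet is not safe to share across the threads of a parallel stream.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class StatefulLambdaDemo {

    // Stateful: the predicate's answer depends on (and changes) the 'seen' set.
    static List<String> dedupeStateful(List<String> names) {
        Set<String> seen = new HashSet<>();
        return names.stream()
                .filter(seen::add) // add() returns false for a name already seen
                .collect(Collectors.toList());
    }

    // Stateless from the caller's side: the library manages whatever state it needs.
    static List<String> dedupeStateless(List<String> names) {
        return names.stream()
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Gaurav", "Gaurav", "Sandeep");
        System.out.println(dedupeStateful(names));  // [Gaurav, Sandeep]
        System.out.println(dedupeStateless(names)); // [Gaurav, Sandeep]
    }
}
```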
I hope you find this post informative and helpful. You can find the example code for reduction, aggregate operation and stream creation on Github.
What is the difference between a collection.forEach and a collection.stream.forEach? I think in your example collection.stream.forEach is a bit redundant; you could directly use collection.forEach instead.