Core Java. Lecture #9

Streams API, Optionals

Ivan Ponomarev, Synthesized.io/MIPT

Streams

  • Appeared in Java8, along with lambdas & method references.

  • Process finite and potentially infinite data sets.

  • Declarative approach to data processing: we describe what we want to get, not how we will get it.

  • Transparent parallelism.

Convert stream to stream

blockStream

map

squashedStream

List<Block> blocks = ...;

Stream<Block> blocksStream = blocks.stream();

Stream<SquashedBlock> squashedStream =
  blocksStream.map(Block::squash);

(The author of the animations is Tagir Valeev, see moving pictures here)

Filtering

squashedStream

filter

filteredStream

Stream<SquashedBlock> filteredStream =
  squashedStream.filter(block >
         block.getColor() != YELLOW);

Display in the console (terminal operation)

filteredStream

display
filteredStream
  .forEach(System.out::println);

All together in one line

fuse
blocks.stream()
      .map(Block::squash)
      .filter(block >
         block.getColor() != YELLOW)
      .forEach(System.out::println);

Does it resemble something?

"Merge two files, convert lines to lowercase, sort, display the last three lines in alphabetical order"

cat file1 file2 | tr "[A-Z]" "[a-z]" | sort | tail -3

Why?

Java 7

Java 8

Map<Currency, List<Transaction>>
 transactionsByCurrencies = new HashMap<>();

for (Transaction transaction : transactions) {
 Currency currency =transaction.getCurrency();
 List<Transaction> transactionsForCurrency =
   transactionsByCurrencies.get(currency);
 if (transactionsForCurrency == null) {
  transactionsForCurrency = new ArrayList<>();
    transactionsByCurrencies.put(currency,
           transactionsForCurrency);
 }
 transactionsForCurrency.add(transaction);
}
Map<Currency,
 List<Transaction>>
 transactionsByCurr =
   transactions
   .stream()
   .collect(
     Collectors
     .groupingBy(
   Transaction
   ::getCurrency));

Three categories of Stream API methods

//Create a stream
List<String> names = menu.stream()

//Intermediate operations
  .filter(d -> d.getCalories() > 300
  .map(Dish::getName)
  .limit(3)

//Terminal operation
  .collect(Collectors.toList());

Creating a stream

//Empty stream
Stream<Foo> stream0 = Stream.empty();

//Enumerating elements
Stream<String> stream1 =
         Stream.of("gently", "down", "the", "stream");
//From an array
Stream<String> stream2 =
         Arrays.stream("gently down the stream".split(" "));
//From a collection
List<String> strings = ...
Stream<String> stream3 = strings.stream();

//From API
Path path = Paths.get(...);
Stream<Path> stream4 = Files.list(path);

Concatenation of streams

Stream<Foo> s1 = ...;
Stream<Foo> s2 = ...;
Stream<Foo> s = Stream.concat(s1, s2);

Generating streams

//Using a generator
Stream<Double> randoms =
    Stream.generate(Math::random);
          //ALTHOUGH IT IS BETTER
          DoubleStream doubles =
            ThreadLocalRandom.current().doubles()

Producing stream elements with iterate

//Iteratively
Stream<Integer> integers =
    Stream.iterate(0, x -> x + 1);
          //ALTHOUGH IT IS BETTER
          IntStream range = IntStream.range(0, 1000);

What is the contents of this stream?

    Stream.iterate(new int[]{0, 1},
                   t -> new int[]{t[1], t[0] + t[1]})
    .mapToInt(t -> t[0]);

Spliterator: the most common way to create a stream

public interface Spliterator<T> {
  boolean tryAdvance(Consumer <? super T> action);
  Spliterator<T> trySplit();
  long estimateSize();
  int characteristics();
}
StreamSupport.stream(Spliterator<T> spliterator, boolean parallel)

Spliterator branching

split1.png

Spliterator branching

split2.png

Spliterator properties

ORDERED

Elements have a defined order (for example, a List), so the Spliterator enforces this order when traversing and partitioning them.

DISTINCT

For each pair of elements x and y, x.equals(y) returns false.

SORTED

The traversed elements follow a predefined sort order.

SIZED

This Spliterator has been created from a source with a known size, so the value returned by estimatedSize() is precise.

NONNULL

It’s guaranteed that the traversed elements won’t be null.

IMMUTABLE

The source of this Spliterator can’t be modified. This implies that no elements can be added, removed, or modified during their traversal.

CONCURRENT

The source of this Spliterator may be safely concurrently modified by other threads without any synchronization.

SUBSIZED

Both this Spliterator and all further Spliterators resulting from its split are SIZED.

Intermediate conclusions

  • There are many standard ways to generate streams, for simple cases you do not need to "reinvent the wheel"

  • Most likely, you will not need to implement Spliterator yourself.

  • For the sake of performance, there are "primitive streams" (three types):

    • IntStream (more efficient than`<Integer>Stream`)

    • LongStream (more efficient than`<Long>Stream`)

    • DoubleStream (more efficient than`<Double>Stream`)

Convert streams to streams: getting "head" and "tail"

//Stream interface methods

//head
Stream<T> limit(long maxSize)
Stream<T> takeWhile(Predicate<? super T> predicate)

//tail
Stream<T> skip(long n)
Stream<T> dropWhile(Predicate<? super T> predicate)

filter

filter
Stream<T> filter(
   Predicate<? super T> predicate);

map

map
<R> Stream<R> map(Function<? super T,
                           ? extends R> mapper);
IntStream mapToInt(
             ToIntFunction<? super T> mapper);
LongStream mapToLong(
            ToLongFunction<? super T> mapper);
DoubleStream mapToDouble(
          ToDoubleFunction<? super T> mapper);

flatMap

Path path = ...
Pattern separator = Pattern.compile("\\s");
try(Stream<String> lines = Files.lines(path, StandardCharsets.UTF_8)) {
    //NOT what we need!
    //Stream<Stream<String>> streamStream =
    //                  lines.map(separator::splitAsStream);

    //Just what we need!
    Stream<String> words = lines.flatMap(separator::splitAsStream);
    words.forEach(System.out::println);
}

/*And also:
  flatMapToDouble
  flatMapToInt
  flatMapToLong
  */

distinct

//Internal Set

Stream.of(1, 5, 8, 7, 8, 5, 9, 9)
    .distinct()
    .forEach(System.out::println);

//Outputs 1, 5, 8, 7, 9

What happens when you execute this code?

ThreadLocalRandom.current().ints(1, 10)
  .distinct()
  .forEach(System.out::println);

The program will hang when all values from 1 to 10 are exhausted. You can fix it, for example, by specifying limit(9) after distinct.

sorted

//Internal sorted list

Stream.of(1, 5, 8, 7, 8, 5, 9, 9)
    .sorted()
    .forEach(System.out::println);

//Outputs 1, 5, 5, 7, 8, 8, 9, 9

//Doesn't make sense for infinite streams

peek — debug method

  • Does not change the original stream.

  • Designed to "peek" at the intermediate state of elements.

  • System.out::println is often used as an argument.

  • In parallel computations, it can be called in any order on in an arbitrary thread.

Stream<T> peek(Consumer<? super T> action);

Terminal operations

//Present the first element
Optional<T> findFirst();
//Present any element
Optional<T> findAny();

//Check to see if the condition is satisfied by...
//...at least one element
boolean anyMatch(Predicate<? super T> predicate);
//...all the elements
boolean allMatch(Predicate<? super T> predicate);
//...no element
boolean noneMatch(Predicate<? super T> predicate);

//SHORT CIRCUITING!

forEach

void forEach(Consumer<? super T> action);
  • In the case of parallel execution, there are no guarantees of consistency

  • There is no guarantee as to which thread the lambda will be executed on

reduce (with identity)

reduce1.png
  • Associative function + "identity value"

  • Intermediate results are immutable values

  • Question: What are some examples of associative operations in mathematics?

T reduce(T identity,
  BinaryOperator<T> accumulator);

Parallel reduce

reduce par.png

reduce without identity

reduce2.png
  • identity is not required, but the result may be empty (if the stream itself is empty)

Optional<T> reduce(
  BinaryOperator<T> accumulator);

Ready-made 'reduce' operations

  • Available in all streams:

    • count — generally it requires all elements to be computed!

    • max(Comparator), min(Comparator)

  • Available in streams of primitives:

    • sum

    • average

    • summaryStatistics — count, sum, min and max in a single method.

collect: the most flexible method of assembling the results

  • Type parameters:

    • T — type of stream elements

    • A is the type of accumulator, the intermediate data structure in which everything is collected

    • R — result type

  • Unlike reduce, which works with immutable objects, it mutates the accumulator.

<R, A> R collect(Collector<? super T, A, R> collector);

Collector<T, A, R> interface

  • T — type of stream elements

  • A is the type of accumulator, the intermediate data structure in which everything is collected

  • R — result type

public interface Collector<T, A, R> {
  Supplier<A> supplier();
  BiConsumer<A, T> accumulator();
  Function<A, R> finisher();
  BinaryOperator<A> combiner();
  Set<Characteristics> characteristics();
}

//For example:
class ToListCollector<T> implements
   Collector<T, List<T>, List<T>>

Collector properties

Characteristic

Meaning

CONCURRENT

Indicates that this collector is_concurrent_, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.

UNORDERED

Indicates that the collection operation does not commit to preserving the encounter order of input elements. (This might be true if the result container has no intrinsic order, such as a Set.)

IDENTITY_FINISH

Indicates that the finisher function is the identity function and can be elided. If set, it must be the case that an unchecked cast from A to R will succeed.

Intermediate conclusions

  • Making your own collector is not easy,

  • But good news is that there are many ready-made ones and they can be combined!

Most used collectors

//java.util.stream.Collectors
Collector<T, ?, List<T>> toList()
Collector<T, ?, Set<T>> toSet()
Collector<T, ?, C extends Collection<T>>
  toCollection(Supplier<C> collectionFactory)

//Usage example
Stream<Foo> myStream = ...
List<Foo> list = myStream.collect(Collectors.toList());

Collecting to maps

Collector<T, ?, Map<K,U>> toMap(
  Function<? super T, ? extends K> keyMapper,
  Function<? super T, ? extends U> valueMapper)

//Usage example
Stream<Person> people = ...
Map<Integer, Person> idToPerson = people.collect(
  Collectors.toMap(Person::getId, p->p);
  • You can also specify mergeFunction and mapSupplier.

Variations on the theme of the collecting

  • toUnmodifiable(List| Set| Map) yields an immutable collection

  • toConcurrentMap yields a thread-safe map.

Delimited string

static Collector<CharSequence, ?, String> joining()

//Usage example
menuStream.map(Dish::getName).collect(Collectors.joining(", ");

//Do not forget that this can be done without streams:
static String join(CharSequence delimiter,
            Iterable<? extends CharSequence> elements)

Groupings

groupby.png
Map<Dish.Type, List<Dish>> dishesByType =
  menu.stream().collect(Collectors.groupingBy(Dish::getType));

Downstream Collectors

groupbyby.png
Map<Dish.Type, Map<Dish.CaloricLevel, List<Dish>>>
  dishesByTypeAndCaloricLevel =
    menu.stream().collect(Collectors
      .groupingBy(Dish::getType,
         Collectors.groupingBy(Dish::getCaloricLevel)));

But we can perform other operations as well!

groupbyandcount.png
Map<Dish.Type, Long> typesCount =
  menu.stream().collect(Collectors
    .groupingBy(Dish::getType, Collectors.counting()));

Downstream collectors set

/*All this is already present in the Stream API, but we need to apply it
to the "branched" stream.*/


/*"Terminal"*/
counting()
summingInt(ToIntFunction<? super T> mapper)
summingLong(ToLongFunction<? super T> mapper)
summingDouble(ToDoubleFunction<? super T> mapper)
maxBy(Comparator<? super T> comparator)
minBy(Comparator<? super T> comparator)

Downstream collectors continued

/*Giving an opportunity to continue the chain*/
filtering(Predicate<? super T> predicate, downstream)
mapping(Function<? super T, ? extends U> mapper, downstream)
flatMapping(Function<? super T,
  ? extends Stream<? extends U>> mapper, downstream)

/*Adaptor*/
collectingAndThen(Collector<T,A,R> downstream,
  Function<R,RR> finisher)

When might collectingAndThen be needed?

Map<Dish.Type, Dish> mostCaloricByType =
  menu.stream()
     .collect(Collectors.groupingBy(Dish::getType,
       collectingAndThen( //<-----ADAPTOR
         Collectors.maxBy(Comparator
                   .comparingInt(Dish::getCalories),
         //maxBy returns Optional
       Optional::get)));

Parallel streams

  • The .parallel() method enables parallelization of processing.

  • parallel can be called anywhere in the call chain.

  • It should be used with caution, understanding the limitations and applicability.

Optional: The container that contains an object. Or it doesn’t.

  • Quote from 'Java 8 in Action':

  • "Tony Hoare, one of the giants of computer science, said in a presentation at QCon London 2009: I call it my billion-dollar mistake. It was the invention of the null reference in 1965…​. I couldn’t resist the temptation to put in a null reference, simply because it was so easy to implement."

Optional<T> is like a stream of 0 or 1 element

//Creating
Optional.empty(); //empty optional
Optional.of(x); //NPE if x == null
Optional.ofNullable(x); //empty or containing x

//Unsheathing
o.get();
o.orElse(other);
o.orElseGet(()->calcOther());
o.orElseThrow(()->new IllegalStateException());

Optional.map

Optional<Insurance> optInsurance = Optional.ofNullable(insurance);
Optional<String> name = optInsurance.map(Insurance::getName);

Optional.flatMap

Optional<Person> person = ...

//person.map(Person::getCar) returns Optional<Optional<Car>>!!

String insuranceName = person.flatMap(Person::getCar)
                             .flatMap(Car::getInsurance)
                             .map(Insurance::getName)
                             .orElse("Unknown");

Optional.filter

String insuranceName = person.filter(p -> p.getAge() >= minAge)
                             .flatMap(Person::getCar)
                             .flatMap(Car::getInsurance)
                             .map(Insurance::getName)
                             .orElse("Unknown");

Rules of using Optional

Dumb usage of Optional is worse than null:

WRONG

OK

if (o.isPresent())
  o.get().someMethod();
if (o != null)
  o.someMethod();

Rules of using Optional

  • A variable of Optional type should never be null.

  • Fields with the Optional type are useless: checking for "not empty" of this field is no better than checking for null, the price is an additional object.

  • Never put Optional in a collection.

  • In general, Optional is for return values, not method arguments.

When streams should not be used

What’s wrong?

collection.stream().forEach(...)

The Collection class already has a forEach method, you do not need to create a stream.

What’s wrong?

collection.stream().collect(Collectors.toList())
collection.stream().collect(Collectors.toSet())
/*more efficient*/
new ArrayList<>(collection)
new HashSet<>(collection)

What’s wrong?

collection.stream().max(Comparator.naturalOrder()).get()
/*Same with less garbage*/
Collections.max(collection)

What did the author want to say?

stream.sorted(comparator).findFirst()
stream.min(comparator)

How to improve?

stream.collect(Collectors.counting())
/*counting(), maxBy(), minBy(), reducing(), mapping(), etc collectors
are intended to be used in cascading groupingBy operations only!*/
stream.count()

What’s wrong?!

collection.stream().count()
/*Without counting the elements one after another!*/
collection.size()

How to improve?

listOfLists.stream().flatMap(List::stream).count()
/*The number of elements in each sublist is known!*/
listOfLists.stream().mapToInt(List::size).sum()

How to improve?

stream.filter(condition).findFirst().isPresent()
/*Why getting the element itself if we just need to check its presence?*/
stream.anyMatch(condition)

How to improve?

stream.filter(condition).anyMatch(x -> true)
stream.map(condition).anyMatch(b -> b)
stream.map(condition).anyMatch(Boolean::booleanValue)
stream.map(condition).anyMatch(Boolean.TRUE::equals)
/*Intricate variations on a trivial theme*/
stream.anyMatch(condition)

How to improve?!

if(stream.filter(condition).count() > 0)
/*This example is even worse than the previous ones,
because it traverses the whole stream! And the solution is the same:*/
if(stream.anyMatch(condition))

How to improve?!

if(stream.count() > 2)
/*We don't care how many there are if there are more than two*/
stream.limit(3).count()

Conclusions

  • The "declarative" approach, "describing what, not as" does not devalue the understanding of how the calculation occurs.

  • Streams should be used wisely, and there are many cases where they should not be used.