Core Java. Lecture #9

Streams API, Optionals

Ivan Ponomarev, Synthesized.io/MIPT

Streams

  • Appeared in Java 8, along with lambdas and method references.

  • Process both finite and potentially infinite data sets.

  • Declarative approach to data processing: we describe what we want to get, not how we will get it.

  • Transparent parallelism.

Convert stream to stream

[Animation: blockStream → map → squashedStream]

List<Block> blocks = ...;

Stream<Block> blocksStream = blocks.stream();

Stream<SquashedBlock> squashedStream =
  blocksStream.map(Block::squash);

(The animations are by Tagir Valeev; see the animated versions here.)

Filtering

[Animation: squashedStream → filter → filteredStream]

Stream<SquashedBlock> filteredStream =
  squashedStream.filter(block ->
         block.getColor() != YELLOW);

Display in the console (terminal operation)

[Animation: filteredStream → display]
filteredStream
  .forEach(System.out::println);

All together in one line

blocks.stream()
      .map(Block::squash)
      .filter(block ->
         block.getColor() != YELLOW)
      .forEach(System.out::println);

Does this remind you of anything?

"Merge two files, convert lines to lowercase, sort, display the last three lines in alphabetical order"

cat file1 file2 | tr "[A-Z]" "[a-z]" | sort | tail -3

Why?

Java 7

Map<Currency, List<Transaction>> transactionsByCurrencies =
    new HashMap<>();
for (Transaction transaction : transactions) {
  Currency currency = transaction.getCurrency();
  List<Transaction> transactionsForCurrency =
      transactionsByCurrencies.get(currency);
  if (transactionsForCurrency == null) {
    transactionsForCurrency = new ArrayList<>();
    transactionsByCurrencies.put(currency, transactionsForCurrency);
  }
  transactionsForCurrency.add(transaction);
}

Java 8

Map<Currency, List<Transaction>> transactionsByCurr =
    transactions.stream()
                .collect(Collectors.groupingBy(Transaction::getCurrency));

Three categories of Stream API methods

//Create a stream
List<String> names = menu.stream()

//Intermediate operations
  .filter(d -> d.getCalories() > 300)
  .map(Dish::getName)
  .limit(3)

//Terminal operation
  .collect(Collectors.toList());

Creating a stream

//Empty stream
Stream<Foo> stream0 = Stream.empty();

//Enumerating elements
Stream<String> stream1 =
         Stream.of("gently", "down", "the", "stream");
//From an array
Stream<String> stream2 =
         Arrays.stream("gently down the stream".split(" "));
//From a collection
List<String> strings = ...
Stream<String> stream3 = strings.stream();

//From an API (Files.list throws IOException; the stream should be closed after use)
Path path = Paths.get(...);
Stream<Path> stream4 = Files.list(path);

Concatenation of streams

Stream<Foo> s1 = ...;
Stream<Foo> s2 = ...;
Stream<Foo> s = Stream.concat(s1, s2);

Generating streams

//Using a generator
Stream<Double> randoms =
    Stream.generate(Math::random);
//Although it is better to use a primitive stream:
DoubleStream doubles =
    ThreadLocalRandom.current().doubles();

Producing stream elements with iterate

//Iteratively
Stream<Integer> integers =
    Stream.iterate(0, x -> x + 1);
//Although for a bounded range it is better to use:
IntStream range = IntStream.range(0, 1000);

What are the contents of this stream?

    Stream.iterate(new int[]{0, 1},
                   t -> new int[]{t[1], t[0] + t[1]})
    .mapToInt(t -> t[0]);

Spliterator: the most general way to create a stream

public interface Spliterator<T> {
  boolean tryAdvance(Consumer<? super T> action);
  Spliterator<T> trySplit();
  long estimateSize();
  int characteristics();
}
StreamSupport.stream(Spliterator<T> spliterator, boolean parallel)
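For the rare case when a custom source is really needed, here is a minimal sketch. The countdown sequence is a hypothetical example; extending `Spliterators.AbstractSpliterator` means only `tryAdvance` has to be written, while `trySplit` and `estimateSize` are inherited.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class CountdownDemo {
    // Hypothetical source: emits n, n - 1, ..., 1
    static Stream<Integer> countdown(int n) {
        Spliterator<Integer> spliterator =
            new Spliterators.AbstractSpliterator<Integer>(n, Spliterator.ORDERED) {
                private int current = n;

                @Override
                public boolean tryAdvance(Consumer<? super Integer> action) {
                    if (current <= 0) {
                        return false;          // source exhausted
                    }
                    action.accept(current--);  // hand one element to the pipeline
                    return true;
                }
            };
        // false = create a sequential stream
        return StreamSupport.stream(spliterator, false);
    }

    public static void main(String[] args) {
        List<Integer> values = countdown(5).toList();
        System.out.println(values); // [5, 4, 3, 2, 1]
    }
}
```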

Intermediate conclusions

  • There are many standard ways to generate streams, for simple cases you do not need to "reinvent the wheel"

  • Most likely, you will not need to implement Spliterator yourself.

  • For performance, there are three "primitive streams" that avoid boxing:

    • IntStream (more efficient than `Stream<Integer>`)

    • LongStream (more efficient than `Stream<Long>`)

    • DoubleStream (more efficient than `Stream<Double>`)
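A small comparison using only the JDK: the same sum over a boxed `Stream<Integer>` and over an `IntStream`, plus `mapToInt` to go from the boxed world to the primitive one (`boxed()` goes the other way).

```java
import java.util.stream.IntStream;
import java.util.stream.Stream;

class PrimitiveStreamsDemo {
    // Boxed: every element is an Integer object, reduce unboxes and reboxes
    static int boxedSum(int n) {
        return Stream.iterate(1, x -> x + 1)
                     .limit(n)
                     .reduce(0, Integer::sum);
    }

    // Primitive: works on int values directly and has sum() built in
    static int primitiveSum(int n) {
        return IntStream.rangeClosed(1, n).sum();
    }

    // Moving from a boxed stream to a primitive one to use average()
    static double averageOfBoxed(Stream<Integer> s) {
        return s.mapToInt(Integer::intValue).average().orElse(0.0);
    }

    public static void main(String[] args) {
        System.out.println(boxedSum(100));     // 5050
        System.out.println(primitiveSum(100)); // 5050
        System.out.println(averageOfBoxed(IntStream.rangeClosed(1, 4).boxed())); // 2.5
    }
}
```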

Convert streams to streams: getting "head" and "tail"

//Stream interface methods

//head
Stream<T> limit(long maxSize)
Stream<T> takeWhile(Predicate<? super T> predicate)

//tail
Stream<T> skip(long n)
Stream<T> dropWhile(Predicate<? super T> predicate)
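A quick sketch of the four methods on a small list (takeWhile and dropWhile need Java 9+, toList Java 16+). Note that takeWhile and dropWhile stop at the first element that fails the predicate, unlike filter, which checks every element.

```java
import java.util.List;

class HeadTailDemo {
    static List<Integer> source() {
        return List.of(1, 2, 3, 10, 4, 5);
    }

    public static void main(String[] args) {
        System.out.println(source().stream().limit(3).toList());              // [1, 2, 3]
        System.out.println(source().stream().skip(3).toList());               // [10, 4, 5]
        // stops at 10, the first element that is not < 5
        System.out.println(source().stream().takeWhile(x -> x < 5).toList()); // [1, 2, 3]
        System.out.println(source().stream().dropWhile(x -> x < 5).toList()); // [10, 4, 5]
        // filter looks at every element, so 4 is kept
        System.out.println(source().stream().filter(x -> x < 5).toList());    // [1, 2, 3, 4]
    }
}
```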

filter

Stream<T> filter(
   Predicate<? super T> predicate);

map

<R> Stream<R> map(Function<? super T,
                           ? extends R> mapper);
IntStream mapToInt(
             ToIntFunction<? super T> mapper);
LongStream mapToLong(
            ToLongFunction<? super T> mapper);
DoubleStream mapToDouble(
          ToDoubleFunction<? super T> mapper);

flatMap

Path path = ...
Pattern separator = Pattern.compile("\\s");
try(Stream<String> lines = Files.lines(path, StandardCharsets.UTF_8)) {
    //NOT what we need!
    //Stream<Stream<String>> streamStream =
    //                  lines.map(separator::splitAsStream);

    //Just what we need!
    Stream<String> words = lines.flatMap(separator::splitAsStream);
    words.forEach(System.out::println);
}

/*Also flatMapToDouble, flatMapToInt, flatMapToLong*/
  • filter and map are special cases of flatMap!
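To see why filter and map are special cases of flatMap, here is a sketch that expresses both through flatMap: an element is kept by returning a one-element stream and dropped by returning an empty one, while map always returns exactly one element.

```java
import java.util.List;
import java.util.stream.Stream;

class FlatMapSpecialCases {
    // filter(x -> x % 2 == 0) via flatMap
    static List<Integer> evens(List<Integer> xs) {
        return xs.stream()
                 .flatMap(x -> x % 2 == 0 ? Stream.of(x) : Stream.<Integer>empty())
                 .toList();
    }

    // map(x -> x * 10) via flatMap
    static List<Integer> timesTen(List<Integer> xs) {
        return xs.stream()
                 .flatMap(x -> Stream.of(x * 10))
                 .toList();
    }

    public static void main(String[] args) {
        List<Integer> xs = List.of(1, 2, 3, 4);
        System.out.println(evens(xs));    // [2, 4]
        System.out.println(timesTen(xs)); // [10, 20, 30, 40]
    }
}
```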

distinct

//Uses an internal Set to remember the elements already seen

Stream.of(1, 5, 8, 7, 8, 5, 9, 9)
    .distinct()
    .forEach(System.out::println);

//Outputs 1, 5, 8, 7, 9

What happens when you execute this code?

ThreadLocalRandom.current().ints(1, 10)
  .distinct()
  .forEach(System.out::println);

The program hangs after all the values from 1 to 9 have been printed (the upper bound 10 is exclusive): distinct keeps waiting for a new value that never comes. You can fix it, for example, by adding limit(9) after distinct.

sorted

//Buffers all elements internally, then sorts them

Stream.of(1, 5, 8, 7, 8, 5, 9, 9)
    .sorted()
    .forEach(System.out::println);

//Outputs 1, 5, 5, 7, 8, 8, 9, 9

//Doesn't make sense for infinite streams

peek — debug method

  • Does not change the original stream.

  • Designed to "peek" at the intermediate state of elements.

  • System.out::println is often used as an argument.

  • In parallel computations, it can be called in any order and in an arbitrary thread.

Stream<T> peek(Consumer<? super T> action);
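A sketch of peek used for debugging. Instead of System.out::println, the peeks append to a list so the trace can be inspected; in a sequential stream each element passes through the whole chain before the next one starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class PeekDemo {
    // Records what each peek sees and returns the pipeline result
    static List<Integer> run(List<String> log) {
        return Stream.of(1, 2, 3)
                     .peek(x -> log.add("before map: " + x))
                     .map(x -> x * 10)
                     .peek(x -> log.add("after map: " + x))
                     .toList();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(run(log)); // [10, 20, 30]
        // before map: 1, after map: 10, before map: 2, after map: 20, ...
        log.forEach(System.out::println);
    }
}
```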

Gatherers: Fixed window (Java 24+)

flatMap in reverse

Stream<List<Integer>> fixed =

        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)

                .gather(Gatherers.windowFixed(3));

System.out.println(fixed.toList());
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]

Gatherers: Sliding window

Great for computing a moving average:

Stream<List<Integer>> sliding =

        Stream.of(1, 2, 3, 4, 5, 6)

                .gather(Gatherers.windowSliding(3));

System.out.println(sliding.toList());
[[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]

Gatherers: scan

Computing a running total

Stream<Integer> sum =
    Stream.of(3, 5, -2, 7)
        .gather(Gatherers.scan(
            //initial value supplier
            () -> 0,
            //scanner function to apply to each element
            Integer::sum));
System.out.println(sum.toList());
[3, 8, 6, 13]

Terminal operations

//The first element, if present
Optional<T> findFirst();
//Any element, if present (less constrained in parallel streams)
Optional<T> findAny();

//Check to see if the condition is satisfied by...
//...at least one element
boolean anyMatch(Predicate<? super T> predicate);
//...all the elements
boolean allMatch(Predicate<? super T> predicate);
//...no element
boolean noneMatch(Predicate<? super T> predicate);

//SHORT CIRCUITING!
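Short-circuiting is what makes these operations usable on infinite streams: they stop as soon as the answer is known. A sketch on the infinite stream 1, 2, 3, …

```java
import java.util.Optional;
import java.util.stream.Stream;

class ShortCircuitDemo {
    // Infinite stream 1, 2, 3, ...
    static Stream<Integer> naturals() {
        return Stream.iterate(1, x -> x + 1);
    }

    public static void main(String[] args) {
        // stops at 101, the first element that satisfies the predicate
        System.out.println(naturals().anyMatch(x -> x > 100)); // true
        // stops as soon as one square passes the filter
        Optional<Integer> firstSquareAbove50 = naturals()
            .map(x -> x * x)
            .filter(x -> x > 50)
            .findFirst();
        System.out.println(firstSquareAbove50);                // Optional[64]
        // stops at 10, the first counterexample
        System.out.println(naturals().allMatch(x -> x < 10));  // false
        // Careful: allMatch(x -> x > 0) here would never terminate
    }
}
```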

forEach

void forEach(Consumer<? super T> action);
  • In the case of parallel execution, the order in which elements are processed is not guaranteed (forEachOrdered preserves the encounter order)

  • There is no guarantee as to which thread the lambda will be executed on

toList (Java16+)

List<T> toList()
Stream<Foo> myStream = ...
List<Foo> list = myStream.toList();
  • Materializes the stream into a list.

  • The returned list is unmodifiable.

  • Available since Java 16.

reduce (with identity)

  • An associative function + an "identity value"

  • Intermediate results are immutable values

  • Question: can you name examples of associative operations in mathematics?

T reduce(T identity,
  BinaryOperator<T> accumulator);
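A sketch of reduce with an identity: 0 for addition and the empty string for concatenation (associative but not commutative). For an empty stream the result is simply the identity.

```java
import java.util.List;

class ReduceDemo {
    static int sum(List<Integer> xs) {
        // 0 + x == x, so 0 is the identity for +
        return xs.stream().reduce(0, Integer::sum);
    }

    static String concat(List<String> xs) {
        // "" + s == s, so "" is the identity for concatenation
        return xs.stream().reduce("", String::concat);
    }

    public static void main(String[] args) {
        System.out.println(sum(List.of(1, 2, 3, 4)));       // 10
        System.out.println(concat(List.of("a", "b", "c"))); // abc
        System.out.println(sum(List.of()));                 // 0 (the identity)
    }
}
```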

Parallel reduce


reduce without identity

  • identity is not required, but the result may be empty (if the stream itself is empty)

Optional<T> reduce(
  BinaryOperator<T> accumulator);
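A sketch of reduce without an identity: there is nothing to return for an empty stream, hence the Optional result.

```java
import java.util.Optional;
import java.util.stream.Stream;

class ReduceNoIdentityDemo {
    static Optional<Integer> max(Stream<Integer> s) {
        return s.reduce(Integer::max);
    }

    public static void main(String[] args) {
        System.out.println(max(Stream.of(3, 7, 5))); // Optional[7]
        System.out.println(max(Stream.empty()));     // Optional.empty
    }
}
```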

Ready-made 'reduce' operations

  • Available in all streams:

    • count — generally it requires all elements to be computed!

    • max(Comparator), min(Comparator)

  • Available in streams of primitives:

    • sum

    • average

    • summaryStatistics: count, sum, min, max and average in a single pass.
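A sketch of summaryStatistics on an IntStream: one traversal yields all the aggregates at once.

```java
import java.util.IntSummaryStatistics;
import java.util.stream.IntStream;

class StatsDemo {
    static IntSummaryStatistics stats() {
        return IntStream.of(4, 8, 15, 16, 23, 42).summaryStatistics();
    }

    public static void main(String[] args) {
        IntSummaryStatistics s = stats();
        System.out.println(s.getCount());   // 6
        System.out.println(s.getSum());     // 108
        System.out.println(s.getMin());     // 4
        System.out.println(s.getMax());     // 42
        System.out.println(s.getAverage()); // 18.0
    }
}
```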

collect: the most flexible method of assembling the results

  • Type parameters:

    • T — type of stream elements

    • A is the type of accumulator, the intermediate data structure in which everything is collected

    • R — result type

  • Unlike reduce, which combines immutable values, collect mutates the accumulator.

<R, A> R collect(Collector<? super T, A, R> collector);

Collector<T, A, R> interface

  • T — type of stream elements

  • A is the type of accumulator, the intermediate data structure in which everything is collected

  • R — result type

public interface Collector<T, A, R> {
  Supplier<A> supplier();
  BiConsumer<A, T> accumulator();
  Function<A, R> finisher();
  BinaryOperator<A> combiner();
  Set<Characteristics> characteristics();
}

//For example:
class ToListCollector<T> implements
   Collector<T, List<T>, List<T>>
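One way the ToListCollector above might be completed, as a sketch: each of the five methods maps onto a role described above (create the accumulator, add an element, finish, merge partial results from parallel chunks, declare characteristics).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;

class ToListCollector<T> implements Collector<T, List<T>, List<T>> {
    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;              // a fresh, empty accumulator
    }

    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return List::add;                   // mutate the accumulator in place
    }

    @Override
    public Function<List<T>, List<T>> finisher() {
        return Function.identity();         // the accumulator already is the result
    }

    @Override
    public BinaryOperator<List<T>> combiner() {
        return (left, right) -> {           // merge partial results of a parallel run
            left.addAll(right);
            return left;
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH));
    }

    public static void main(String[] args) {
        List<String> result = Stream.of("gently", "down", "the", "stream")
                                    .collect(new ToListCollector<>());
        System.out.println(result); // [gently, down, the, stream]
    }
}
```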

Collector<T, A, R> interface

  • Writing your own collector (like writing your own spliterator) is not easy,

  • But you rarely need to: there are many ready-made ones, and they can be combined!

Most used collectors

//java.util.stream.Collectors
Collector<T, ?, List<T>> toList() // << use stream.toList() instead!
Collector<T, ?, Set<T>> toSet()
<C extends Collection<T>> Collector<T, ?, C>
  toCollection(Supplier<C> collectionFactory)

//Usage example
Stream<Foo> myStream = ...
Set<Foo> set = myStream.collect(Collectors.toSet());
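toCollection lets you choose the concrete collection. A sketch with a TreeSet, which removes duplicates and keeps the elements sorted:

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CollectToCollectionDemo {
    static Set<String> sortedUnique(Stream<String> words) {
        return words.collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        Set<String> set = sortedUnique(Stream.of("row", "row", "your", "boat", "gently"));
        System.out.println(set); // [boat, gently, row, your]
    }
}
```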

Collecting to maps

Collector<T, ?, Map<K,U>> toMap(
  Function<? super T, ? extends K> keyMapper,
  Function<? super T, ? extends U> valueMapper)

//Usage example
Stream<Person> people = ...
Map<Integer, Person> idToPerson = people.collect(
  Collectors.toMap(Person::getId, p -> p));
  • You can also specify a mergeFunction (without it, duplicate keys cause an IllegalStateException) and a mapFactory.
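A sketch of the four-argument toMap, using a hypothetical word list: words are keyed by their first letter, the merge function joins words that share a key, and TreeMap::new as the map factory keeps the keys sorted.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ToMapDemo {
    static Map<Character, String> byFirstLetter(Stream<String> words) {
        return words.collect(Collectors.toMap(
            w -> w.charAt(0),        // keyMapper
            w -> w,                  // valueMapper
            (a, b) -> a + "," + b,   // mergeFunction: resolves duplicate keys
            TreeMap::new));          // mapFactory: keys kept sorted
    }

    public static void main(String[] args) {
        System.out.println(byFirstLetter(Stream.of("apple", "banana", "avocado", "cherry")));
        // {a=apple,avocado, b=banana, c=cherry}
    }
}
```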

Variations on the theme of the collecting

  • toUnmodifiableList, toUnmodifiableSet and toUnmodifiableMap (Java 10+) yield an unmodifiable collection.

  • toConcurrentMap yields a thread-safe ConcurrentMap.

Delimited string

static Collector<CharSequence, ?, String> joining()

//Usage example
menuStream.map(Dish::getName).collect(Collectors.joining(", "));

//Don't forget that this can be done without streams (String.join):
static String join(CharSequence delimiter,
            Iterable<? extends CharSequence> elements)
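A sketch comparing the two: joining also accepts a prefix and a suffix, while String.join is enough when there is nothing to map or filter.

```java
import java.util.List;
import java.util.stream.Collectors;

class JoiningDemo {
    static String withStream(List<String> names) {
        // delimiter, prefix, suffix
        return names.stream().collect(Collectors.joining(", ", "[", "]"));
    }

    static String withoutStream(List<String> names) {
        return String.join(", ", names);
    }

    public static void main(String[] args) {
        List<String> names = List.of("pizza", "salmon", "rice");
        System.out.println(withStream(names));    // [pizza, salmon, rice]
        System.out.println(withoutStream(names)); // pizza, salmon, rice
    }
}
```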

Groupings

Map<Dish.Type, List<Dish>> dishesByType =
  menu.stream().collect(Collectors.groupingBy(Dish::getType));

Downstream Collectors

Map<Dish.Type, Map<Dish.CaloricLevel, List<Dish>>>
  dishesByTypeAndCaloricLevel =
    menu.stream().collect(Collectors
      .groupingBy(Dish::getType,
         Collectors.groupingBy(Dish::getCaloricLevel)));

But we can perform other operations as well!

Map<Dish.Type, Long> typesCount =
  menu.stream().collect(Collectors
    .groupingBy(Dish::getType, Collectors.counting()));
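A self-contained sketch of the same two groupings on plain strings instead of the Dish menu: words grouped by length, first into lists, then with counting() as the downstream collector. TreeMap::new is passed as the map factory only to make the printed order predictable.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupingDemo {
    static final List<String> WORDS =
        List.of("row", "row", "your", "boat", "gently", "down", "the", "stream");

    // length -> words of that length
    static Map<Integer, List<String>> byLength() {
        return WORDS.stream()
                    .collect(Collectors.groupingBy(String::length, TreeMap::new, Collectors.toList()));
    }

    // length -> number of words of that length
    static Map<Integer, Long> countByLength() {
        return WORDS.stream()
                    .collect(Collectors.groupingBy(String::length, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(byLength());      // {3=[row, row, the], 4=[your, boat, down], 6=[gently, stream]}
        System.out.println(countByLength()); // {3=3, 4=3, 6=2}
    }
}
```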

Downstream collectors set

/*All this is already present in the Stream API, but we need to apply it
to the "branched" stream.*/


/*"Terminal"*/
counting()
summingInt(ToIntFunction<? super T> mapper)
summingLong(ToLongFunction<? super T> mapper)
summingDouble(ToDoubleFunction<? super T> mapper)
maxBy(Comparator<? super T> comparator)
minBy(Comparator<? super T> comparator)

Downstream collectors continued

/*Giving an opportunity to continue the chain*/
filtering(Predicate<? super T> predicate, downstream)
mapping(Function<? super T, ? extends U> mapper, downstream)
flatMapping(Function<? super T,
  ? extends Stream<? extends U>> mapper, downstream)

/*Adaptor*/
collectingAndThen(Collector<T,A,R> downstream,
  Function<R,RR> finisher)
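A sketch of mapping and filtering as downstream collectors (filtering needs Java 9+), on a hypothetical word list. Filtering inside the grouping keeps the key "c" with an empty set, which would disappear if we filtered before grouping.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

class DownstreamDemo {
    static final List<String> WORDS = List.of("apple", "avocado", "banana", "blueberry", "cherry");

    // first letter -> lengths of the words starting with it
    static Map<Character, List<Integer>> lengthsByLetter() {
        return WORDS.stream().collect(Collectors.groupingBy(
            w -> w.charAt(0),
            TreeMap::new,
            Collectors.mapping(String::length, Collectors.toList())));
    }

    // first letter -> only the words longer than 6 characters
    static Map<Character, Set<String>> longWordsByLetter() {
        return WORDS.stream().collect(Collectors.groupingBy(
            w -> w.charAt(0),
            TreeMap::new,
            Collectors.filtering(w -> w.length() > 6, Collectors.toCollection(TreeSet::new))));
    }

    public static void main(String[] args) {
        System.out.println(lengthsByLetter());   // {a=[5, 7], b=[6, 9], c=[6]}
        System.out.println(longWordsByLetter()); // {a=[avocado], b=[blueberry], c=[]}
    }
}
```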

When might collectingAndThen be needed?

Map<Dish.Type, Dish> mostCaloricByType =
  menu.stream()
     .collect(Collectors.groupingBy(Dish::getType,
       Collectors.collectingAndThen( //<-----ADAPTOR
         Collectors.maxBy(Comparator
                   .comparingInt(Dish::getCalories)),
         //maxBy returns Optional
       Optional::get)));
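A runnable variant of the same idea, with words standing in for dishes (a hypothetical list): the longest word per first letter, unwrapped from the Optional that maxBy returns. Optional::get is safe here because groupingBy never creates an empty group.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;

class CollectingAndThenDemo {
    static final List<String> WORDS = List.of("apple", "avocado", "banana", "blueberry", "cherry");

    static Map<Character, String> longestByLetter() {
        return WORDS.stream().collect(Collectors.groupingBy(
            w -> w.charAt(0),
            TreeMap::new,
            Collectors.collectingAndThen(
                Collectors.maxBy(Comparator.comparingInt(String::length)),
                Optional::get)));  // every group is non-empty, so get() cannot fail
    }

    public static void main(String[] args) {
        System.out.println(longestByLetter()); // {a=avocado, b=blueberry, c=cherry}
    }
}
```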

Parallel streams

  • The .parallel() method enables parallel processing.

  • parallel() can be called anywhere in the call chain: it switches the whole pipeline, not just the operations after it.

  • Use it with caution, understanding its limitations and where it actually pays off.
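A sketch of why reduce has to be associative with a true identity to be parallelized: the range is split into chunks, each chunk is reduced starting from the identity, and the partial results are combined. A seed that is not an identity gets added once per chunk.

```java
import java.util.stream.LongStream;

class ParallelDemo {
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).reduce(0L, Long::sum);
    }

    // Long::sum is associative and 0 is a true identity: safe in parallel
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);
    }

    // Pitfall: 10 is NOT an identity for +. It is added once per chunk,
    // so the result depends on how the range was split
    static long brokenParallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(10L, Long::sum);
    }

    public static void main(String[] args) {
        System.out.println(sequentialSum(1_000_000));     // 500000500000
        System.out.println(parallelSum(1_000_000));       // 500000500000
        System.out.println(brokenParallelSum(1_000_000)); // depends on the split
    }
}
```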

Optional: The container that contains an object. Or it doesn’t.

  • Quote from 'Java 8 in Action':

  • "Tony Hoare, one of the giants of computer science, said in a presentation at QCon London 2009: I call it my billion-dollar mistake. It was the invention of the null reference in 1965…. I couldn’t resist the temptation to put in a null reference, simply because it was so easy to implement."


Optional<T> is like a stream of 0 or 1 elements

//Creating
Optional.empty(); //empty optional
Optional.of(x); //NPE if x == null
Optional.ofNullable(x); //empty or containing x

//Extracting the value
o.get(); //NoSuchElementException if empty
o.orElse(other);
o.orElseGet(()->calcOther());
o.orElseThrow(()->new IllegalStateException());
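A sketch of creating and extracting Optional values. findNickname is a hypothetical lookup that may have no result; orElseGet computes its fallback only when the Optional is empty.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

class OptionalBasicsDemo {
    // Hypothetical lookup that may find nothing
    static Optional<String> findNickname(String name) {
        return "Robert".equals(name) ? Optional.of("Bob") : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(findNickname("Robert").orElse("none"));             // Bob
        System.out.println(findNickname("Alice").orElse("none"));              // none
        System.out.println(findNickname("Alice").orElseGet(() -> "computed")); // computed
        System.out.println(Optional.ofNullable(null).isPresent());             // false
        try {
            findNickname("Alice").get(); // get() on an empty Optional throws
        } catch (NoSuchElementException e) {
            System.out.println("get() threw NoSuchElementException");
        }
    }
}
```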

Optional.map

Optional<Insurance> optInsurance = Optional.ofNullable(insurance);
Optional<String> name = optInsurance.map(Insurance::getName);

Optional.flatMap

Optional<Person> person = ...

//person.map(Person::getCar) returns Optional<Optional<Car>>!!

String insuranceName = person.flatMap(Person::getCar)
                             .flatMap(Car::getInsurance)
                             .map(Insurance::getName)
                             .orElse("Unknown");

Optional.filter

String insuranceName = person.filter(p -> p.getAge() >= minAge)
                             .flatMap(Person::getCar)
                             .flatMap(Car::getInsurance)
                             .map(Insurance::getName)
                             .orElse("Unknown");
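A runnable sketch of the chain above. Person, Car and Insurance are hypothetical records (Java 16+) whose fields may be null, while their getters expose Optional, in line with the rule that Optional is for return values.

```java
import java.util.Optional;

class OptionalChainDemo {
    record Insurance(String name) {
        String getName() { return name; }
    }

    record Car(Insurance insurance) {
        Optional<Insurance> getInsurance() { return Optional.ofNullable(insurance); }
    }

    record Person(int age, Car car) {
        int getAge() { return age; }
        Optional<Car> getCar() { return Optional.ofNullable(car); }
    }

    static String insuranceName(Optional<Person> person, int minAge) {
        return person.filter(p -> p.getAge() >= minAge)
                     .flatMap(Person::getCar)     // Optional<Car>, not Optional<Optional<Car>>
                     .flatMap(Car::getInsurance)
                     .map(Insurance::getName)
                     .orElse("Unknown");
    }

    public static void main(String[] args) {
        Person insured = new Person(30, new Car(new Insurance("CambridgeInsurance")));
        Person noCar = new Person(40, null);
        System.out.println(insuranceName(Optional.of(insured), 18)); // CambridgeInsurance
        System.out.println(insuranceName(Optional.of(insured), 35)); // Unknown (too young)
        System.out.println(insuranceName(Optional.of(noCar), 18));   // Unknown (no car)
        System.out.println(insuranceName(Optional.empty(), 18));     // Unknown
    }
}
```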

Rules of using Optional

Careless use of Optional is worse than null:

WRONG

if (o.isPresent())
  o.get().someMethod();

OK

if (o != null)
  o.someMethod();
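Instead of the isPresent()/get() pair, an Optional can be handed the action itself. A sketch with hypothetical greet methods; ifPresentOrElse needs Java 9+.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class IfPresentDemo {
    // The action runs only if a value is present
    static void greet(Optional<String> name, List<String> out) {
        name.ifPresent(n -> out.add("Hello, " + n));
    }

    // With a fallback branch
    static void greetOrDefault(Optional<String> name, List<String> out) {
        name.ifPresentOrElse(
            n -> out.add("Hello, " + n),
            () -> out.add("Hello, stranger"));
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        greet(Optional.of("Ivan"), out);
        greet(Optional.empty(), out);          // does nothing
        greetOrDefault(Optional.empty(), out);
        System.out.println(out); // [Hello, Ivan, Hello, stranger]
    }
}
```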

Rules of using Optional

  • A variable of Optional type should never be null.

  • Fields of Optional type are useless: checking such a field for "not empty" is no better than checking for null, and it costs an additional object.

  • Never put Optional in a collection.

  • In general, Optional is for return values, not method arguments.

When streams should not be used

What’s wrong?

collection.stream().forEach(...)

Collection (via Iterable) already has a forEach method; there is no need to create a stream.

What’s wrong?

collection.stream().collect(Collectors.toList())
collection.stream().collect(Collectors.toSet())
collection.stream().toList()
/*more efficient*/
new ArrayList<>(collection)
new HashSet<>(collection)

What’s wrong?

collection.stream().max(Comparator.naturalOrder()).get()
/*Same with less garbage*/
Collections.max(collection)

What did the author want to say?

stream.sorted(comparator).findFirst()
stream.min(comparator)

How to improve?

stream.collect(Collectors.counting())
/*counting(), maxBy(), minBy(), reducing(), mapping(), etc. collectors
are intended to be used only as downstream collectors, e.g. in cascading groupingBy!*/
stream.count()

What’s wrong?!

collection.stream().count()
/*Without counting the elements one after another!*/
collection.size()

How to improve?

listOfLists.stream().flatMap(List::stream).count()
/*The number of elements in each sublist is known!*/
listOfLists.stream().mapToInt(List::size).sum()

How to improve?

stream.filter(condition).findFirst().isPresent()
/*Why getting the element itself if we just need to check its presence?*/
stream.anyMatch(condition)

How to improve?

stream.filter(condition).anyMatch(x -> true)
stream.map(condition).anyMatch(b -> b)
stream.map(condition).anyMatch(Boolean::booleanValue)
stream.map(condition).anyMatch(Boolean.TRUE::equals)
/*Intricate variations on a trivial theme*/
stream.anyMatch(condition)

How to improve?!

if(stream.filter(condition).count() > 0)
/*This example is even worse than the previous ones,
because it traverses the whole stream! And the solution is the same:*/
if(stream.anyMatch(condition))

How to improve?!

if(stream.count() > 2)
/*We don't care how many there are if there are more than two*/
if(stream.limit(3).count() > 2)

Conclusions

  • The "declarative" approach ("describing what, not how") does not make understanding how the computation actually happens any less important.

  • Streams should be used wisely, and there are many cases where they should not be used.