Kafka Streams API

Шаг за рамки Hello World

Иван Пономарёв, КУРС/МФТИ

Зачем нам Kafka?

  • Web-scraping в реальном времени

  • 500 запросов/сек (да, это очень мало!)

  • Удобные абстракции «из коробки»:

    • Персистентный, но «подрезаемый» лог

    • Microbatching

Зачем нам Streams API?

  • Real-time stream processing

  • Stream-like API (map / reduce)

  • Под капотом:

    • Автоматический offset commit

    • Ребалансировка

    • Внутреннее состояние обработчиков

    • Легкое масштабирование

Disclaimer

Наш Kafka Streams ещё не в production!

koshelev

Вопросы про production?

Григорий Кошелев:

«Когда всё пошло по Кафке»

Kafka за 30 секунд

kafka cluster
Источник: Kafka. The Definitive Guide

Kafka Message

1024px Aiga mail.svg
  • Номер партиции

  • Ключ

  • Значение

Compacted topics

log compaction
Источник: Kafka Documentation

Попробуем?

gamov

Наш план

  1. Общая структура приложения. Простые (stateless) трансформации

  2. Трансформации с использованием локального состояния

  3. Дуализм «поток—таблица» и табличные join-ы

  4. Время и оконные операции

Kafka Streams API: общая структура KStreams-приложения

StreamsConfig config = ...;
//Здесь устанавливаем всякие опции

Topology topology = new StreamsBuilder()
//Здесь строим топологию
....build();

Kafka Streams API: общая структура KStreams-приложения

Топология — конвейер обработчиков:

topology sample.png

Kafka Streams API: общая структура KStreams-приложения

StreamsConfig config = ...;
//Здесь устанавливаем всякие опции

Topology topology = new StreamsBuilder()
//Здесь строим топологию
....build();


//Это за нас делает SPRING-KAFKA
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
...
streams.close();

В Спринге достаточно определить две вещи

  • @Bean KafkaStreamsConfiguration

  • @Bean Topology

borisov

И не забудьте про @EnableKafkaStreams

Легенда

soccerball
  • Идут футбольные матчи (меняется счёт)

  • Делаются ставки: H, D, A.

  • Поток ставок, значение:

class Bet {
  String bettor;   //John Doe
  String match;    //Cyprus-Belgium
  Outcome outcome; //A (or H or D)
  long amount;     //100
  double odds;     //1.7
  long timestamp;  //1554215083998
}
  • Поток ставок, ключ: Cyprus-Belgium:A

@Bean KafkaConfiguration

//ВАЖНО!
@Bean(name =
    KafkaStreamsDefaultConfiguration
                .DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration getStreamsConfig() {
    Map<String, Object> props = new HashMap<>();
    //ВАЖНО!
    props.put(StreamsConfig.APPLICATION_ID_CONFIG,
        "stateless-demo-app");
    //ВАЖНО!
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    ...
    KafkaStreamsConfiguration streamsConfig =
            new KafkaStreamsConfiguration(props);
    return streamsConfig;
}

@Bean NewTopic

@Bean
NewTopic getFilteredTopic() {
    Map<String, String> props = new HashMap<>();
    props.put(
      TopicConfig.CLEANUP_POLICY_CONFIG,
      TopicConfig.CLEANUP_POLICY_COMPACT);
    return new NewTopic("mytopic", 10, (short) 1).configs(props);
}

@Bean Topology

yelling topology.png
@Bean
public Topology createTopology(StreamsBuilder streamsBuilder) {
    KStream<String, Bet> input = streamsBuilder.stream(...);
    KStream<String, Long> gain
            = input.mapValues(v -> Math.round(v.getAmount() * v.getOdds()));
    gain.foreach(....);
    return streamsBuilder.build();
}

TopologyTestDriver: создание

KafkaStreamsConfiguration config = new KafkaConfiguration()
                                        .getStreamsConfig();
StreamsBuilder sb = new StreamsBuilder();
Topology topology = new TopologyConfiguration().createTopology(sb);
TopologyTestDriver topologyTestDriver =
        new TopologyTestDriver(topology,
                               config.asProperties());

TopologyTestDriver: использование

Bet bet = Bet.builder()
            .bettor("John Doe")
            .match("Germany-Belgium")
            .outcome(Outcome.H)
            .amount(100)
            .odds(1.7).build();

topologyTestDriver.pipeInput(
    betFactory.create(BET_TOPIC, bet.key(), bet));

assertEquals(Arrays.asList("Germany-Belgium:H 170"), output);

Простое ветвление стримов

Java-стримы так не могут:

KStream<..> foo = ...
KStream<..> bar = foo.mapValues().map... to...
Kstream<..> baz = foo.filter().map... forEach...
simplebranch.png

Ветвление стримов по условию

Не используйте KStream.branch, используйте KafkaStreamsBrancher!

new KafkaStreamsBrancher<String, String>()
   .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
   .defaultBranch(ks -> ks.to("C"))
   .onTopOf(builder.stream("source"))
   .map(...)
switchbranch.png

Простое слияние

KStream<String, Integer> foo = ...
KStream<String, Integer> bar = ...
KStream<String, Integer> merge = foo.merge(bar);
merge.png

Наш план

  1. Общая структура приложения. Простые (stateless) трансформации

  2. Трансформации с использованием локального состояния

  3. Дуализм «поток—таблица» и табличные join-ы

  4. Время и оконные операции

Локальное состояние

Facebook’s RocksDB — что это и зачем?

rocksdb
  • Embedded key/value storage

  • LSM Tree (Log-Structured Merge-Tree)

  • High-performant (data locality)

  • Persistent, optimized for SSD

RocksDB похож на TreeMap<K,V>

  • Сохранение K,V в бинарном формате

  • Лексикографическая сортировка

  • Iterator (snapshot view)

  • Удаление диапазона (deleteRange)

Пишем “Bet Totalling App”

Какова сумма выплат по сделанным ставкам, если сыграет исход?

counting topology.png

@Bean Topology

KStream<String, Bet> input = streamsBuilder.
    stream(BET_TOPIC, Consumed.with(Serdes.String(),
                      new JsonSerde<>(Bet.class)));

KStream<String, Long> counted =
    new TotallingTransformer()
        .transformStream(streamsBuilder, input);

Суммирование ставок

@Override
public KeyValue<String, Long> transform(String key, Bet value,
                    KeyValueStore<String, Long> stateStore) {
    long current = Optional
        .ofNullable(stateStore.get(key))
        .orElse(0L);
    current += value.getAmount();
    stateStore.put(key, current);
    return KeyValue.pair(key, current);
}

StateStore доступен в тестах

@Test
void testTopology() {
    topologyTestDriver.pipeInput(...);
    topologyTestDriver.pipeInput(...);

    KeyValueStore<String, Long> store =
        topologyTestDriver
        .getKeyValueStore(TotallingTransformer.STORE_NAME);

    assertEquals(..., store.get(...));
    assertEquals(..., store.get(...));
}

Демо: Ребалансировка / репликация

  • Ребалансировка / репликация партиций state при запуске / выключении обработчиков.

Сохранение локального состояния в топик

$kafka-topics --zookeeper localhost --describe

Topic:bet-totalling-demo-app-totalling-store-changelog
PartitionCount:10
ReplicationFactor:1
Configs:cleanup.policy=compact
counting topology changelog.png

Партиционирование и local state

local partitioning oneworker.png

Партиционирование и local state

local partitioning.png

Репартиционирование

through.png
  • Явное при помощи
    through(String topic, Produced<K, V> produced)

  • Неявное при map()/selectKey() и stateful-операциях

Наш план

  1. Общая структура приложения. Простые (stateless) трансформации

  2. Трансформации с использованием локального состояния

  3. Дуализм «поток—таблица» и табличные join-ы

  4. Время и оконные операции

Таблицы vs стримы

Местонахождение пользователя

stream table animation latestLocation
Michael G. Noll. Of Streams and Tables in Kafka and Stream Processing

Таблицы vs стримы

Количество посещенных мест

stream table animation numVisitedLocations
Michael G. Noll. Of Streams and Tables in Kafka and Stream Processing

Таблицы vs стримы

Производная и интеграл

derivative and integral
Martin Kleppmann, “Designing Data Intensive Applications”

Join

derivative
join storages.png

Переписываем totalling app при помощи KTable

KTable<String, Long> totals = input.groupByKey().aggregate(
    () -> 0L,
    (k, v, a) -> a + Math.round(v.getAmount() * v.getOdds()),
    Materialized.with(Serdes.String(), Serdes.Long())
);
$kafka-topics --zookeeper localhost --describe

Topic:
table2-demo-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
PartitionCount:10
ReplicationFactor:1
Configs:cleanup.policy=compact

Получаем таблицу счетов матчей

KStream<String, Score> scores =
    eventScores.flatMap((k, v) ->
        Stream.of(Outcome.H, Outcome.A).map(o ->
            KeyValue.pair(String.format("%s:%s", k, o), v))
            .collect(Collectors.toList()))
    .mapValues(EventScore::getScore);

KTable<String, Score> tableScores =
    scores.groupByKey(Grouped.with(...).reduce((a, b) -> b);
$kafka-topics --zookeeper localhost --list

table2-demo-KSTREAM-REDUCE-STATE-STORE-0000000006-repartition
table2-demo-KSTREAM-REDUCE-STATE-STORE-0000000006-changelog

Демо: Объединяем сумму ставок с текущим счётом

KTable<String, String> joined =
    totals.join(tableScores,
            (total, eventScore) ->
                String.format("(%s)\t%d", eventScore, total));

Копартиционирование

Join работает

copart norm.png

Несовпадение количества партиций

Join не работает (Runtime Exception)

copart diff.png

Несовпадение алгоритма партицирования

Join не работает молча!

copart diff algorithm.png

GlobalKTable

Реплицируется всюду целиком

GlobalKTable<...> global = streamsBuilder.globalTable("global", ...);
globalktable.png

Операции между стримами и таблицами: сводка

streams stateful operations

Наш план

  1. Общая структура приложения. Простые (stateless) трансформации

  2. Трансформации с использованием локального состояния

  3. Дуализм «поток—таблица» и табличные join-ы

  4. Время и оконные операции

Сохранение Timestamped-значений в RocksDB

WindowKeySchema.java

static Bytes toStoreKeyBinary(byte[] serializedKey,
                              long timestamp,
                              int seqnum) {
    ByteBuffer buf = ByteBuffer.allocate(
                                serializedKey.length
                                + TIMESTAMP_SIZE
                                + SEQNUM_SIZE);
    buf.put(serializedKey);
    buf.putLong(timestamp);
    buf.putInt(seqnum);
    return Bytes.wrap(buf.array());
}

Быстрое извлечение значений по ключу из диапазона времени

timestamped record.png

Демо: Windowed Joins

  • «Послегольщик» — игрок, пытающийся протолкнуть правильную ставку в момент смены счёта в матче

  • Штамп времени ставки и события смены счёта должны «почти совпадать».

Время, вперёд!

KStream<String, Bet> bets = streamsBuilder.stream(BET_TOPIC,
    Consumed.with(
            Serdes...)
            .withTimestampExtractor(

                (record, previousTimestamp) ->
                    ((Bet) record.value()).getTimestamp()

            ));

(Ещё время можно извлечь из WallClock и RecordMetadata.)

Демо: Windowed Joins

По событию смены счёта понимаем, какая ставка будет «правильной»:

Score current = Optional.ofNullable(stateStore.get(key))
                .orElse(new Score());
stateStore.put(key, value.getScore());

Outcome currenOutcome =
    value.getScore().getHome() > current.getHome()
    ?
    Outcome.H : Outcome.A;

Демо: Windowed Joins

KStream<String, String> join = bets.join(outcomes,
    (bet, sureBet) ->

    String.format("%s %dms before goal",
                bet.getBettor(),
                sureBet.getTimestamp() - bet.getTimestamp()),
                JoinWindows.of(Duration.ofSeconds(1)).before(Duration.ZERO),
                Joined.with(Serdes....
    ));

Tumbling window

TimeWindowedKStream<..., ...> windowed =
    stream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(20)));
tumbling window
Источник: Kafka Streams in Action

Tumbling window

TimeWindowedKStream<..., ...> windowed =
    stream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(20)));

KTable<Windowed<...>, Long> count = windowed.count();

/*
* Windowed<K> interface:
* - K key()
* - Window window()
* -- Instant startTime()
* -- Instant endTime()
*/

Hopping Window

TimeWindowedKStream<..., ...> windowed =
    stream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(20))
                        .advanceBy(Duration.ofSeconds(10)));
hopping window
Источник: Kafka Streams in Action

Session Window

SessionWindowedKStream<..., ...> windowed =
    stream.groupByKey()
        .windowedBy(SessionWindows.with(Duration.ofMinutes(5)));
streams session windows 02

Наш план

  1. Общая структура приложения. Простые (stateless) трансформации

  2. Трансформации с использованием локального состояния

  3. Дуализм «поток—таблица» и табличные join-ы

  4. Время и оконные операции

Пора закругляться!

Kafka Streams in Action

KSIA
  • William Bejeck,
    “Kafka Streams in Action”, November 2018

  • Примеры кода для Kafka 1.0

Kafka: The Definitive Guide

kafka the definitive guide
  • Gwen Shapira, Neha Narkhede, Todd Palino

  • September 2017

Другие источники

Выводы

  • Kafka StreamsAPI — это удобная абстракция над «сырой» Кафкой

  • Чтобы начать пользоваться, надо настроить мышление под потоковую обработку

  • Технология переживает бурное развитие

    • + живой community, есть шанс повлиять на процесс самому

    • - публичные интерфейсы изменяются очень быстро

На этом всё!

Спасибо!