Kafka Streams API

Шаг за рамки Hello World

Иван Пономарёв, КУРС/МФТИ

me
  • Tech Lead at KURS

  • ERP systems & Java background

  • Speaker at JPoint, Devoops, Heisenbug, JUG.MSK, PermDevDay, DevopsForum, Стачка etc.

  • Текущий проект: Real-time Webscraping

celesta duke
bass duke

Celesta & 2bass — хабрапосты:

migration

Celesta 7.x

  • Только Java (отказались от Jython)

  • Maven Plugin

  • JUnit5 extension

  • Spring Boot Starter

Всё, что я показываю, есть на гитхабе

octocat

Зачем нам Kafka?

kafka

Зачем нам Kafka?

  • Web-scraping в реальном времени

  • 500 запросов/сек (да, это очень мало!)

  • Удобные штуки «из коробки»:

    • Персистентный, но «подрезаемый» лог

    • Microbatching

Зачем нам Streams API?

  • Real-time stream processing

  • Stream-like API (map / reduce)

  • Под капотом:

    • Ребалансировка

    • Внутреннее состояние обработчиков (репликация)

    • Легкое масштабирование

Disclaimer #1: предполагается базовое понимание Кафки

Disclaimer #2: доклад не о жизни в production!

Доклады о жизни в production:

Kafka за 30 секунд

kafka cluster
Источник: Kafka. The Definitive Guide

Kafka Message

message
  • Номер партиции

  • Ключ

  • Значение

Compacted topics

log compaction
Источник: Kafka Documentation

Наш план

kafka
  1. Конфигурация приложения. Простые (stateless) трансформации

  2. Трансформации с использованием локального состояния

  3. Дуализм «поток—таблица» и табличные join-ы

  4. Время и оконные операции

kafka

Kafka Streams API: общая структура KStreams-приложения

StreamsConfig config = ...;
//Здесь устанавливаем всякие опции

Topology topology = new StreamsBuilder()
//Здесь строим топологию
....build();

Kafka Streams API: общая структура KStreams-приложения

Топология — конвейер обработчиков:

topology sample

Kafka Streams API: общая структура KStreams-приложения

StreamsConfig config = ...;
//Здесь устанавливаем всякие опции

Topology topology = new StreamsBuilder()
//Здесь строим топологию
....build();


//Это за нас делает SPRING-KAFKA
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
...
streams.close();

В Спринге достаточно определить две вещи

  • @Bean KafkaStreamsConfiguration

  • @Bean Topology

borisov

И не забудьте про @EnableKafkaStreams

Легенда

betting
  • Идут футбольные матчи (меняется счёт)

  • Делаются ставки: H, D, A.

  • Поток ставок, ключ: Cyprus-Belgium:A

  • Поток ставок, значение:

class Bet {
  String bettor;   //John Doe
  String match;    //Cyprus-Belgium
  Outcome outcome; //A (or H or D)
  long amount;     //100
  double odds;     //1.7
  long timestamp;  //1554215083998
}

@Bean KafkaConfiguration

//ВАЖНО!
@Bean(name =
    KafkaStreamsDefaultConfiguration
                .DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration getStreamsConfig() {
    Map<String, Object> props = new HashMap<>();
    //ВАЖНО!
    props.put(StreamsConfig.APPLICATION_ID_CONFIG,
        "stateless-demo-app");
    //ВАЖНО!
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    ...
    KafkaStreamsConfiguration streamsConfig =
            new KafkaStreamsConfiguration(props);
    return streamsConfig;
}

@Bean NewTopic

@Bean
NewTopic getFilteredTopic() {
    Map<String, String> props = new HashMap<>();
    props.put(
      TopicConfig.CLEANUP_POLICY_CONFIG,
      TopicConfig.CLEANUP_POLICY_COMPACT);
    return new NewTopic("mytopic", 10, (short) 1).configs(props);
}

@Bean Topology

yelling topology
@Bean
public Topology createTopology(StreamsBuilder streamsBuilder) {
    KStream<String, Bet> input = streamsBuilder.stream(...);
    KStream<String, Long> gain
            = input.mapValues(v -> Math.round(v.getAmount() * v.getOdds()));
    gain.to(GAIN_TOPIC, Produced.with(Serdes.String(),
                new JsonSerde<>(Long.class)));
    return streamsBuilder.build();
}

TopologyTestDriver: создание

KafkaStreamsConfiguration config = new KafkaConfiguration()
                                        .getStreamsConfig();
StreamsBuilder sb = new StreamsBuilder();
Topology topology = new TopologyConfiguration().createTopology(sb);
TopologyTestDriver topologyTestDriver =
        new TopologyTestDriver(topology,
                               config.asProperties());

TestInput/OutputTopic: создание

TestInputTopic<String, Bet> inputTopic =
        topologyTestDriver.createInputTopic(BET_TOPIC,
            Serdes.String().serializer(),
            new JsonSerde<>(Bet.class).serializer());
TestOutputTopic<String, Long> outputTopic =
        topologyTestDriver.createOutputTopic(GAIN_TOPIC,
            Serdes.String().deserializer(),
            new JsonSerde<>(Long.class).deserializer());

TopologyTestDriver: использование

Bet bet = Bet.builder()
            .bettor("John Doe")
            .match("Germany-Belgium")
            .outcome(Outcome.H)
            .amount(100)
            .odds(1.7).build();

inputTopic.pipeInput(bet.key(), bet);

TopologyTestDriver: использование

TestRecord<String, Long> record = outputTopic.readRecord();

assertEquals(bet.key(), record.key());
assertEquals(170L, record.value().longValue());

Если что-то пошло не так…​

  • default.deserialization.exception.handler — не смогли десериализовать

  • default.production.exception.handler — брокер отверг сообщение (например, оно слишком велико)

failure

Если всё совсем развалилось

streams.setUncaughtExceptionHandler(
  (Thread thread, Throwable throwable) -> {
    . . .
   });
uncaughtexception

В Спринге всё сложнее (см. код)

Состояния приложения KafkaStreams

kstreamsstates

Что ещё нужно знать про stateless-трансформации?

Простое ветвление стримов

Java-стримы так не могут:

KStream<..> foo = ...
KStream<..> bar = foo.mapValues().map... to...
Kstream<..> baz = foo.filter().map... forEach...
simplebranch

Ветвление стримов по условию

Не используйте KStream.branch, используйте KafkaStreamsBrancher!

new KafkaStreamsBrancher<String, String>()
   .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
   .defaultBranch(ks -> ks.to("C"))
   .onTopOf(builder.stream("source"))
   .map(...)
switchbranch

Простое слияние

KStream<String, Integer> foo = ...
KStream<String, Integer> bar = ...
KStream<String, Integer> merge = foo.merge(bar);
merge

Наш план

kafka
  1. Конфигурация приложения. Простые (stateless) трансформации

  2. Трансформации с использованием локального состояния

  3. Дуализм «поток—таблица» и табличные join-ы

  4. Время и оконные операции

kafka

Локальное состояние

Facebook’s RocksDB — что это и зачем?

rocksdb
  • Embedded key/value storage

  • LSM Tree (Log-Structured Merge-Tree)

  • High-performant (data locality)

  • Persistent, optimized for SSD

RocksDB похож на TreeMap<K,V>

  • Сохранение K,V в бинарном формате

  • Лексикографическая сортировка

  • Iterator (snapshot view)

  • Удаление диапазона (deleteRange)

Пишем “Bet Totalling App”

Какова сумма выплат по сделанным ставкам, если сыграет исход?

counting topology

@Bean Topology

KStream<String, Bet> input = streamsBuilder.
    stream(BET_TOPIC, Consumed.with(Serdes.String(),
                      new JsonSerde<>(Bet.class)));

KStream<String, Long> counted =
    new TotallingTransformer()
        .transformStream(streamsBuilder, input);

Суммирование ставок

@Override
public KeyValue<String, Long> transform(String key, Bet value,
                    KeyValueStore<String, Long> stateStore) {
    long current = Optional
        .ofNullable(stateStore.get(key))
        .orElse(0L);
    current += value.getAmount();
    stateStore.put(key, current);
    return KeyValue.pair(key, current);
}

StateStore доступен в тестах

@Test
void testTopology() {
    topologyTestDriver.pipeInput(...);
    topologyTestDriver.pipeInput(...);

    KeyValueStore<String, Long> store =
        topologyTestDriver
        .getKeyValueStore(TotallingTransformer.STORE_NAME);

    assertEquals(..., store.get(...));
    assertEquals(..., store.get(...));
}

Демо: Ребалансировка / репликация

  • Ребалансировка / репликация партиций state при запуске / выключении обработчиков.

Подробнее о ребалансировке

matthias

Сохранение локального состояния в топик

$kafka-topics --zookeeper localhost --describe

Topic:bet-totalling-demo-app-totalling-store-changelog
PartitionCount:10
ReplicationFactor:1
Configs:cleanup.policy=compact
counting topology changelog

Партиционирование и local state

local partitioning oneworker

Партиционирование и local state

local partitioning

Репартиционирование

through
  • Явное при помощи
    through(String topic, Produced<K, V> produced)

  • Неявное при операциях, меняющих ключ + stateful-операциях

Дублирующееся неявное репартиционирование

KStream source = builder.stream("topic1");
KStream mapped = source.map(...);
KTable counts = mapped.groupByKey().aggregate(...);
KStream sink = mapped.leftJoin(counts, ...);
doublethrough

Избавляемся от дублирующегося репартиционирования

KStream source = builder.stream("topic1");
KStream shuffled = source.map(...).through("topic2",..);
KTable counts = shuffled.groupByKey().aggregate(...);
KStream sink = shuffled.leftJoin(counts, ...);
implicitthrough

Ключ лучше лишний раз не трогать

Key only: selectKey

Key and Value

Value Only

map

mapValues

flatMap

flatMapValues

transform

transformValues

flatTransform

flatTransformValues

Подробнее про «лишнее» репартиционирование

guozhang

Наш план

kafka
  1. Конфигурация приложения. Простые (stateless) трансформации

  2. Трансформации с использованием локального состояния

  3. Дуализм «поток—таблица» и табличные join-ы

  4. Время и оконные операции

kafka

Таблицы vs стримы

Местонахождение пользователя

stream table animation latestLocation
Michael G. Noll. Of Streams and Tables in Kafka and Stream Processing

Таблицы vs стримы

Количество посещенных мест

stream table animation numVisitedLocations
Michael G. Noll. Of Streams and Tables in Kafka and Stream Processing

Таблицы vs стримы

Производная и интеграл

\[\huge state(now) = \int\limits_{t=0}^{now} stream(t)\, \mathrm{d}t \quad\quad stream(t) = \frac{\mathrm{d}state(t)}{\mathrm{d}t}\]

Martin Kleppmann, “Designing Data Intensive Applications”

Join

\[\huge (uv)'= u'v + uv'\]
join storages

Переписываем totalling app при помощи KTable

KTable<String, Long> totals = input.groupByKey().aggregate(
    () -> 0L,
    (k, v, a) -> a + Math.round(v.getAmount() * v.getOdds()),
    Materialized.with(Serdes.String(), Serdes.Long())
);
$kafka-topics --zookeeper localhost --describe

Topic:
table2-demo-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
PartitionCount:10
ReplicationFactor:1
Configs:cleanup.policy=compact

Получаем таблицу счетов матчей

KStream<String, Score> scores =
    eventScores.flatMap((k, v) ->
        Stream.of(Outcome.H, Outcome.A).map(o ->
            KeyValue.pair(String.format("%s:%s", k, o), v))
            .collect(Collectors.toList()))
    .mapValues(EventScore::getScore);

KTable<String, Score> tableScores =
    scores.groupByKey(Grouped.with(...).reduce((a, b) -> b);
$kafka-topics --zookeeper localhost --list

table2-demo-KSTREAM-REDUCE-STATE-STORE-0000000006-repartition
table2-demo-KSTREAM-REDUCE-STATE-STORE-0000000006-changelog

Демо: Объединяем сумму ставок с текущим счётом

KTable<String, String> joined =
    totals.join(tableScores,
            (total, eventScore) ->
                String.format("(%s)\t%d", eventScore, total));

Копартиционирование

Join работает

copart norm

Несовпадение количества партиций

Join не работает (Runtime Exception)

copart diff

Несовпадение алгоритма партицирования

Join не работает молча!

copart diff algorithm

GlobalKTable

Реплицируется всюду целиком

GlobalKTable<...> global = streamsBuilder.globalTable("global", ...);
globalktable

Операции между стримами и таблицами: сводка

streams stateful operations

Виды Join-ов: Table-Table

table table

Виды Join-ов: Table-Table

table table1

Виды Join-ов: Table-Table

table table2

Виды Join-ов: Stream-Table

stream table

Виды Join-ов: Stream-Stream

stream stream

Наш план

kafka
  1. Конфигурация приложения. Простые (stateless) трансформации

  2. Трансформации с использованием локального состояния

  3. Дуализм «поток—таблица» и табличные join-ы

  4. Время и оконные операции

kafka

Сохранение Timestamped-значений в RocksDB

WindowKeySchema.java

static Bytes toStoreKeyBinary(byte[] serializedKey,
                              long timestamp,
                              int seqnum) {
    ByteBuffer buf = ByteBuffer.allocate(
                                serializedKey.length
                                + TIMESTAMP_SIZE
                                + SEQNUM_SIZE);
    buf.put(serializedKey);
    buf.putLong(timestamp);
    buf.putInt(seqnum);
    return Bytes.wrap(buf.array());
}

Быстрое извлечение значений по ключу из диапазона времени

timestamped record

Демо: Windowed Joins

  • «Послегольщик» — игрок, пытающийся протолкнуть правильную ставку в момент смены счёта в матче

  • Штамп времени ставки и события смены счёта должны «почти совпадать».

livebet

Время, вперёд!

KStream<String, Bet> bets = streamsBuilder.stream(BET_TOPIC,
    Consumed.with(
            Serdes...)
            .withTimestampExtractor(

                (record, previousTimestamp) ->
                    ((Bet) record.value()).getTimestamp()

            ));

(Ещё время можно извлечь из WallClock и RecordMetadata.)

Демо: Windowed Joins

По событию смены счёта понимаем, какая ставка будет «правильной»:

Score current = Optional.ofNullable(stateStore.get(key))
                .orElse(new Score());
stateStore.put(key, value.getScore());

Outcome currenOutcome =
    value.getScore().getHome() > current.getHome()
    ?
    Outcome.H : Outcome.A;

Демо: Windowed Joins

KStream<String, String> join = bets.join(outcomes,
    (bet, sureBet) ->

    String.format("%s %dms before goal",
                bet.getBettor(),
                sureBet.getTimestamp() - bet.getTimestamp()),
                JoinWindows.of(Duration.ofSeconds(1)).before(Duration.ZERO),
                StreamJoined.with(Serdes....
    ));

Tumbling window

TimeWindowedKStream<..., ...> windowed =
    stream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(20)));
tumbling window
Источник: Kafka Streams in Action

Tumbling window

TimeWindowedKStream<..., ...> windowed =
    stream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(20)));

KTable<Windowed<...>, Long> count = windowed.count();

/*
* Windowed<K> interface:
* - K key()
* - Window window()
* -- Instant startTime()
* -- Instant endTime()
*/

Hopping Window

TimeWindowedKStream<..., ...> windowed =
    stream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(20))
                        .advanceBy(Duration.ofSeconds(10)));
hopping window
Источник: Kafka Streams in Action

Session Window

SessionWindowedKStream<..., ...> windowed =
    stream.groupByKey()
        .windowedBy(SessionWindows.with(Duration.ofMinutes(5)));
streams session windows 02

Window Retention time vs. Grace Time

window retention

Иногда нужны не окна, а Punctuator

metronome
class MyTransformer implements Transformer<...> {
    @Override
    public void init(ProcessorContext context) {

        context.schedule(
            Duration.ofSeconds(10),
            PunctuationType.WALL_CLOCK_TIME,
            timestamp->{. . .});

    }

Наш план

kafka
  1. Конфигурация приложения. Простые (stateless) трансформации

  2. Трансформации с использованием локального состояния

  3. Дуализм «поток—таблица» и табличные join-ы

  4. Время и оконные операции

kafka

Пора закругляться!

Kafka Streams in Action

KSIA
  • William Bejeck,
    “Kafka Streams in Action”, November 2018

  • Примеры кода для Kafka 1.x

Kafka: The Definitive Guide

kafka the definitive guide
  • Gwen Shapira, Neha Narkhede, Todd Palino

  • September 2017

Другие источники

Сообщества, конференции

Некоторые итоги

  • Kafka StreamsAPI — это удобная абстракция над «сырой» Кафкой

  • Чтобы начать пользоваться, надо настроить мышление под потоковую обработку

  • Технология переживает бурное развитие

    • + живой community, есть шанс повлиять на процесс самому

    • - публичные интерфейсы изменяются очень быстро

На этом всё!

Спасибо!