@inponomarev
Иван Пономарёв, КУРС/МФТИ
|
Только Java (отказались от Jython)
Maven Plugin
JUnit5 extension
Spring Boot Starter
Web-scraping в реальном времени
500 запросов/сек (да, это очень мало!)
Удобные штуки «из коробки»:
Персистентный, но «подрезаемый» лог
Microbatching
Real-time stream processing
Stream-like API (map / reduce)
Под капотом:
Ребалансировка
Внутреннее состояние обработчиков (репликация)
Легкое масштабирование
Доклады о жизни в production:
Номер партиции
Ключ
Значение
|
StreamsConfig config = ...;
//Здесь устанавливаем всякие опции
Topology topology = new StreamsBuilder()
//Здесь строим топологию
....build();
Топология — конвейер обработчиков:
StreamsConfig config = ...;
//Здесь устанавливаем всякие опции
Topology topology = new StreamsBuilder()
//Здесь строим топологию
....build();
//Это за нас делает SPRING-KAFKA
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
...
streams.close();
@Bean KafkaStreamsConfiguration
@Bean Topology
И не забудьте про |
|
//ВАЖНО!
@Bean(name =
KafkaStreamsDefaultConfiguration
.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration getStreamsConfig() {
Map<String, Object> props = new HashMap<>();
//ВАЖНО!
props.put(StreamsConfig.APPLICATION_ID_CONFIG,
"stateless-demo-app");
//ВАЖНО!
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
...
KafkaStreamsConfiguration streamsConfig =
new KafkaStreamsConfiguration(props);
return streamsConfig;
}
@Bean
NewTopic getFilteredTopic() {
Map<String, String> props = new HashMap<>();
props.put(
TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT);
return new NewTopic("mytopic", 10, (short) 1).configs(props);
}
@Bean
public Topology createTopology(StreamsBuilder streamsBuilder) {
KStream<String, Bet> input = streamsBuilder.stream(...);
KStream<String, Long> gain
= input.mapValues(v -> Math.round(v.getAmount() * v.getOdds()));
gain.to(GAIN_TOPIC, Produced.with(Serdes.String(),
new JsonSerde<>(Long.class)));
return streamsBuilder.build();
}
KafkaStreamsConfiguration config = new KafkaConfiguration()
.getStreamsConfig();
StreamsBuilder sb = new StreamsBuilder();
Topology topology = new TopologyConfiguration().createTopology(sb);
TopologyTestDriver topologyTestDriver =
new TopologyTestDriver(topology,
config.asProperties());
TestInputTopic<String, Bet> inputTopic =
topologyTestDriver.createInputTopic(BET_TOPIC,
Serdes.String().serializer(),
new JsonSerde<>(Bet.class).serializer());
TestOutputTopic<String, Long> outputTopic =
topologyTestDriver.createOutputTopic(GAIN_TOPIC,
Serdes.String().deserializer(),
new JsonSerde<>(Long.class).deserializer());
Bet bet = Bet.builder()
.bettor("John Doe")
.match("Germany-Belgium")
.outcome(Outcome.H)
.amount(100)
.odds(1.7).build();
inputTopic.pipeInput(bet.key(), bet);
TestRecord<String, Long> record = outputTopic.readRecord();
assertEquals(bet.key(), record.key());
assertEquals(170L, record.value().longValue());
default.deserialization.exception.handler
— не смогли десериализовать
default.production.exception.handler
— брокер отверг сообщение (например, оно слишком велико)
streams.setUncaughtExceptionHandler(
(Thread thread, Throwable throwable) -> {
. . .
});
В Спринге всё сложнее (см. код)
Java-стримы так не могут:
KStream<..> foo = ...
KStream<..> bar = foo.mapValues(…).map... to...
Kstream<..> baz = foo.filter(…).map... forEach...
Не используйте KStream.branch
, используйте KafkaStreamsBrancher
!
new KafkaStreamsBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"))
.map(...)
KStream<String, Integer> foo = ...
KStream<String, Integer> bar = ...
KStream<String, Integer> merge = foo.merge(bar);
|
Facebook’s RocksDB — что это и зачем?
|
TreeMap<K,V>
Сохранение K,V в бинарном формате
Лексикографическая сортировка
Iterator (snapshot view)
Удаление диапазона (deleteRange)
Какова сумма выплат по сделанным ставкам, если сыграет исход?
KStream<String, Bet> input = streamsBuilder.
stream(BET_TOPIC, Consumed.with(Serdes.String(),
new JsonSerde<>(Bet.class)));
KStream<String, Long> counted =
new TotallingTransformer()
.transformStream(streamsBuilder, input);
@Override
public KeyValue<String, Long> transform(String key, Bet value,
KeyValueStore<String, Long> stateStore) {
long current = Optional
.ofNullable(stateStore.get(key))
.orElse(0L);
current += value.getAmount();
stateStore.put(key, current);
return KeyValue.pair(key, current);
}
@Test
void testTopology() {
topologyTestDriver.pipeInput(...);
topologyTestDriver.pipeInput(...);
KeyValueStore<String, Long> store =
topologyTestDriver
.getKeyValueStore(TotallingTransformer.STORE_NAME);
assertEquals(..., store.get(...));
assertEquals(..., store.get(...));
}
Ребалансировка / репликация партиций state при запуске / выключении обработчиков.
$kafka-topics --zookeeper localhost --describe
Topic:bet-totalling-demo-app-totalling-store-changelog
PartitionCount:10
ReplicationFactor:1
Configs:cleanup.policy=compact
Явное при помощи
through(String topic, Produced<K, V> produced)
Неявное при операциях, меняющих ключ + stateful-операциях
KStream source = builder.stream("topic1");
KStream mapped = source.map(...);
KTable counts = mapped.groupByKey().aggregate(...);
KStream sink = mapped.leftJoin(counts, ...);
KStream source = builder.stream("topic1");
KStream shuffled = source.map(...).through("topic2",..);
KTable counts = shuffled.groupByKey().aggregate(...);
KStream sink = shuffled.leftJoin(counts, ...);
Key only: selectKey
Key and Value | Value Only |
|
|
|
|
|
|
|
|
|
Местонахождение пользователя
Количество посещенных мест
Производная и интеграл
Martin Kleppmann, “Designing Data Intensive Applications”
KTable<String, Long> totals = input.groupByKey().aggregate(
() -> 0L,
(k, v, a) -> a + Math.round(v.getAmount() * v.getOdds()),
Materialized.with(Serdes.String(), Serdes.Long())
);
$kafka-topics --zookeeper localhost --describe
Topic:
table2-demo-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
PartitionCount:10
ReplicationFactor:1
Configs:cleanup.policy=compact
KStream<String, Score> scores =
eventScores.flatMap((k, v) ->
Stream.of(Outcome.H, Outcome.A).map(o ->
KeyValue.pair(String.format("%s:%s", k, o), v))
.collect(Collectors.toList()))
.mapValues(EventScore::getScore);
KTable<String, Score> tableScores =
scores.groupByKey(Grouped.with(...).reduce((a, b) -> b);
$kafka-topics --zookeeper localhost --list
table2-demo-KSTREAM-REDUCE-STATE-STORE-0000000006-repartition
table2-demo-KSTREAM-REDUCE-STATE-STORE-0000000006-changelog
KTable<String, String> joined =
totals.join(tableScores,
(total, eventScore) ->
String.format("(%s)\t%d", eventScore, total));
Join работает
Join не работает (Runtime Exception)
Join не работает молча!
Реплицируется всюду целиком
GlobalKTable<...> global = streamsBuilder.globalTable("global", ...);
|
WindowKeySchema.java
static Bytes toStoreKeyBinary(byte[] serializedKey,
long timestamp,
int seqnum) {
ByteBuffer buf = ByteBuffer.allocate(
serializedKey.length
+ TIMESTAMP_SIZE
+ SEQNUM_SIZE);
buf.put(serializedKey);
buf.putLong(timestamp);
buf.putInt(seqnum);
return Bytes.wrap(buf.array());
}
«Послегольщик» — игрок, пытающийся протолкнуть правильную ставку в момент смены счёта в матче
Штамп времени ставки и события смены счёта должны «почти совпадать».
KStream<String, Bet> bets = streamsBuilder.stream(BET_TOPIC,
Consumed.with(
Serdes...)
.withTimestampExtractor(
(record, previousTimestamp) ->
((Bet) record.value()).getTimestamp()
));
(Ещё время можно извлечь из WallClock и RecordMetadata.)
По событию смены счёта понимаем, какая ставка будет «правильной»:
Score current = Optional.ofNullable(stateStore.get(key))
.orElse(new Score());
stateStore.put(key, value.getScore());
Outcome currenOutcome =
value.getScore().getHome() > current.getHome()
?
Outcome.H : Outcome.A;
KStream<String, String> join = bets.join(outcomes,
(bet, sureBet) ->
String.format("%s %dms before goal",
bet.getBettor(),
sureBet.getTimestamp() - bet.getTimestamp()),
JoinWindows.of(Duration.ofSeconds(1)).before(Duration.ZERO),
StreamJoined.with(Serdes....
));
TimeWindowedKStream<..., ...> windowed =
stream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(20)));
TimeWindowedKStream<..., ...> windowed =
stream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(20)));
KTable<Windowed<...>, Long> count = windowed.count();
/*
* Windowed<K> interface:
* - K key()
* - Window window()
* -- Instant startTime()
* -- Instant endTime()
*/
TimeWindowedKStream<..., ...> windowed =
stream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(20))
.advanceBy(Duration.ofSeconds(10)));
SessionWindowedKStream<..., ...> windowed =
stream.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofMinutes(5)));
|
|
Пора закругляться!
|
|
Телеграм: Грефневая Кафка
Kafka Summit Conference
Kafka StreamsAPI — это удобная абстракция над «сырой» Кафкой
Чтобы начать пользоваться, надо настроить мышление под потоковую обработку
Технология переживает бурное развитие
+ живой community, есть шанс повлиять на процесс самому
- публичные интерфейсы изменяются очень быстро
Спасибо!