Ivan Ponomarev, John Roesler
Purpose: cover testing methodologies for Kafka Streams
"Unit" Testing: TopologyTestDriver
Integration Testing: KafkaStreams
Start with a motivating example (from Ivan’s production system)
A flawed testing approach: unit tests alone don’t catch the problem in this example
Deep dive into the testing framework
Correctly testing the example with integration tests
Save different source IDs in the database
Too many writes to the database
Let’s deduplicate using Kafka Streams!
TopologyTestDriver
TopologyTestDriver capabilities
What is being sent/received | TestInputTopic methods | TestOutputTopic methods |
A single value | pipeInput(V) | readValue() |
A key/value pair | pipeInput(K, V) | readKeyValue() |
A list of values | pipeValueList(List<V>) | readValuesToList() |
A list of key/value pairs | pipeKeyValueList(List<KeyValue<K, V>>) | readKeyValuesToList(), readKeyValuesToMap() |
A list of Records | pipeRecordList(List<? extends TestRecord<K, V>>) | readRecordsToList() |
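The sketch below shows how the table's methods fit together in a test. It assumes kafka-streams and kafka-streams-test-utils 2.4 or later are on the classpath; TestInputTopic and TestOutputTopic were introduced in 2.4. The topology (upper-casing values from a topic named "input" into "output") and the class and method names are hypothetical, chosen only for illustration.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical example topology, exercised through TopologyTestDriver.
class UppercaseTopology {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> v.toUpperCase())
               .to("output", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ttd-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted
        return props;
    }

    // Row "a key/value pair": pipeInput(K, V) in, readKeyValue() out.
    static KeyValue<String, String> pipeOne(String key, String value) {
        try (TopologyTestDriver driver = new TopologyTestDriver(build(), config())) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "input", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "output", new StringDeserializer(), new StringDeserializer());
            in.pipeInput(key, value);
            return out.readKeyValue();
        }
    }

    // Row "a list of values": pipeValueList(List<V>) in, readValuesToList() out.
    static List<String> pipeValues(List<String> values) {
        try (TopologyTestDriver driver = new TopologyTestDriver(build(), config())) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "input", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "output", new StringDeserializer(), new StringDeserializer());
            in.pipeValueList(values);
            return out.readValuesToList();
        }
    }
}
```

Processing is synchronous in the test thread. By the time a pipe method returns, the results are already readable from the output topic, which is what makes TopologyTestDriver fast and deterministic.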
Spring Boot app
Let’s do some test-driven development and write the test first
Writing a test with TopologyTestDriver
Writing the topology
TopologyTestDriver test is green
Should we run this in production?
Kafka Streams | TopologyTestDriver |
is a big data streaming framework | is a fast, deterministic testing framework |
Caching in Kafka Streams
don’t immediately emit every aggregation result
"soak up" repeated updates to the same key’s aggregation
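configure cache size: cache.max.bytes.buffering (default 10 MB; renamed statestore.cache.max.bytes in newer releases)
configure cache flush interval: commit.interval.ms (default 30 s, or 100 ms with exactly-once processing)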
emit latest result on flush or eviction
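The following toy model shows why caching matters for a deduplication topology. It is JDK-only and is not Kafka Streams code. The deduplication it models is hypothetical and naive: it counts occurrences per value and keeps only updates with count == 1. The cache holds only the latest pending update per key and emits on flush. flushEvery = 1 approximates seeing every update, as in a per-record test. A large flushEvery approximates a cache that is flushed only on commit. The class and parameter names are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Kafka Streams code: a hypothetical "count per value, keep count == 1"
// deduplication whose aggregation updates pass through a record cache.
class CountThenFilterModel {

    // flushEvery = 1 emits every update downstream; a large flushEvery mimics a cache
    // that holds updates until a commit, keeping only the latest one per key.
    static List<String> run(List<String> input, int flushEvery) {
        Map<String, Long> counts = new HashMap<>();        // the aggregation's state
        Map<String, Long> cache = new LinkedHashMap<>();   // pending updates, latest per key
        List<String> emitted = new ArrayList<>();          // what the filter lets through
        int sinceFlush = 0;
        for (String value : input) {
            long count = counts.merge(value, 1L, Long::sum);
            cache.put(value, count);                       // overwrites an older pending update
            if (++sinceFlush == flushEvery) {
                flush(cache, emitted);
                sinceFlush = 0;
            }
        }
        flush(cache, emitted);                             // final commit
        return emitted;
    }

    // Emit cached updates downstream, where filter(count == 1) decides what survives.
    private static void flush(Map<String, Long> cache, List<String> emitted) {
        cache.forEach((value, count) -> {
            if (count == 1) emitted.add(value);
        });
        cache.clear();
    }
}
```

With input A, B, A, per-record emission lets A and B through. If the cache soaks up both updates to A before a flush, A's count is already 2 when it is emitted, so A is silently lost. The result depends on cache size and commit timing, which a test that sees every update cannot reveal.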
TopologyTestDriver vs. Kafka Streams execution loop
What other problems can’t TopologyTestDriver surface?
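TopologyTestDriver: single partition (every topic behaves as if it had one partition)
TopologyTestDriver: "fused" subtopologies (records written to repartition topics are processed immediately, within the same call)
TopologyTestDriver vs. Kafka Streams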
stream-stream joins can behave differently (pipeInput order vs. timestamp order)
logic that depends on stream time (such as suppress) can behave differently
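Below is a simplified, JDK-only model of the ordering difference. TopologyTestDriver processes records in the order of the pipeInput calls. A KafkaStreams task with several input partitions (for example, both sides of a stream-stream join) roughly picks the buffered record with the smallest timestamp next. The model ignores max.task.idle.ms, fetch timing and buffer limits. Rec and the method names are hypothetical. It also tracks stream time (the largest timestamp seen so far) to show how a record can be late in one order but not in the other.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model, not Kafka Streams code.
class ProcessingOrderModel {

    // A buffered input record: a value plus its event timestamp.
    static final class Rec {
        final String value;
        final long timestamp;
        Rec(String value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    // KafkaStreams-like choice: among the heads of the task's partition queues,
    // always process the record with the smallest timestamp next.
    static List<Rec> headTimestampOrder(List<List<Rec>> partitions) {
        List<Deque<Rec>> queues = new ArrayList<>();
        for (List<Rec> p : partitions) queues.add(new ArrayDeque<>(p));
        List<Rec> order = new ArrayList<>();
        while (true) {
            Deque<Rec> next = null;
            for (Deque<Rec> q : queues) {
                if (!q.isEmpty() && (next == null
                        || q.peekFirst().timestamp < next.peekFirst().timestamp)) {
                    next = q;
                }
            }
            if (next == null) return order;                // all queues drained
            order.add(next.pollFirst());
        }
    }

    // Stream time = max timestamp seen so far; count records processed with a
    // timestamp below the current stream time (i.e. "late" records).
    static int lateRecords(List<Rec> processingOrder) {
        long streamTime = Long.MIN_VALUE;
        int late = 0;
        for (Rec r : processingOrder) {
            if (r.timestamp < streamTime) late++;
            streamTime = Math.max(streamTime, r.timestamp);
        }
        return late;
    }
}
```

Suppose a left-side record with timestamp 200 is piped before a right-side record with timestamp 100. In pipe order, the right record arrives after stream time has already reached 200. In timestamp order, it is processed first and nothing is late. Windowed joins and suppress() depend on exactly this distinction.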
EmbeddedKafka
TestContainers
EmbeddedKafka vs. TestContainers
Writing a TestContainers test
The easy part: pushing messages to Kafka
The not-so-easy part: how do we check the output?
Deduplication: the correct implementation
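A common way to deduplicate in Kafka Streams is to remember what has been seen in a state store and forward a record only on its first occurrence. The decision is then made per record against the store and does not depend on caching or commit timing. This sketch is not necessarily the implementation shown in the talk. It assumes the following:
- records are keyed by source ID;
- Kafka 3.3 or later, where KStream#process accepts a new-API ProcessorSupplier and returns a stream;
- an in-memory store with the hypothetical name "seen-keys".

Class names and topic names are illustrative.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

class Dedup {
    static final String STORE = "seen-keys"; // hypothetical store name

    // Forwards a record only the first time its key (the source ID) is seen.
    static class FirstSeenProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;
        private KeyValueStore<String, String> seen;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
            this.seen = context.getStateStore(STORE);
        }

        @Override
        public void process(Record<String, String> record) {
            if (seen.get(record.key()) == null) {      // first occurrence of this key
                seen.put(record.key(), record.value());
                context.forward(record);
            }                                          // later duplicates are dropped
        }
    }

    static Topology topology(String in, String out) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(STORE), Serdes.String(), Serdes.String()));
        builder.stream(in, Consumed.with(Serdes.String(), Serdes.String()))
               .process(FirstSeenProcessor::new, STORE)
               .to(out, Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Runs the topology in TopologyTestDriver and returns what reached the output topic.
    static List<KeyValue<String, String>> run(List<KeyValue<String, String>> input) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        try (TopologyTestDriver driver = new TopologyTestDriver(topology("in", "out"), props)) {
            driver.createInputTopic("in", new StringSerializer(), new StringSerializer())
                  .pipeKeyValueList(input);
            return driver.createOutputTopic("out", new StringDeserializer(), new StringDeserializer())
                         .readKeyValuesToList();
        }
    }
}
```

As written, the store grows without bound. A production version would need some way to expire old keys, for example a windowed store or periodic cleanup.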
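Now the test is green, but it takes 5 seconds!
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import static org.junit.jupiter.api.Assertions.assertEquals;
// `consumer` is a KafkaConsumer<String, String> subscribed to the output topic.
// The loop ends only when a 5-second poll returns nothing, so even a green run waits 5 extra seconds.
List<String> actual = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records =
            KafkaTestUtils.getRecords(consumer, 5000 /* timeout in ms */);
    if (records.isEmpty()) break;
    for (ConsumerRecord<String, String> rec : records) {
        actual.add(rec.value());
    }
}
assertEquals(List.of("A", "B"), actual);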
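import static java.util.concurrent.TimeUnit.SECONDS;
import org.awaitility.Awaitility;
// `actual` is filled by a background consumer thread, so it must be thread-safe
// (e.g. a CopyOnWriteArrayList); the wait ends as soon as the condition holds.
Awaitility.await().atMost(10, SECONDS).until(
        () -> List.of("A", "B").equals(actual));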
Cooperative termination
Thread-safe data structure
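The two ideas above, cooperative termination and a thread-safe data structure, can be illustrated without Kafka. The sketch below is a hand-rolled stand-in for Awaitility's await().atMost(...).until(...), not Awaitility itself. A background thread, standing in for the consumer, appends results to a CopyOnWriteArrayList. The test thread polls the condition and stops as soon as it holds. A green run therefore takes roughly as long as the output takes to arrive, not a fixed 5-second poll. Class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BooleanSupplier;

class CooperativeWait {

    // Polls `condition` every pollMillis until it holds (true) or the timeout expires (false).
    static boolean awaitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) return false;
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // preserve the interrupt and give up
                return false;
            }
        }
        return true;
    }

    // A background "consumer" appends results to a thread-safe list; the test thread
    // returns as soon as the expected output is there. Returns the elapsed milliseconds.
    static long millisUntilGreen() {
        List<String> actual = new CopyOnWriteArrayList<>();
        Thread consumer = new Thread(() -> {
            for (String v : List.of("A", "B")) {
                try {
                    Thread.sleep(100);                // simulated processing latency
                } catch (InterruptedException e) {
                    return;
                }
                actual.add(v);
            }
        });
        long start = System.nanoTime();
        consumer.start();
        boolean green = awaitUntil(() -> actual.equals(List.of("A", "B")), 10_000, 20);
        long elapsed = (System.nanoTime() - start) / 1_000_000L;
        if (!green) throw new AssertionError("expected [A, B] but got " + actual);
        return elapsed;
    }
}
```

A red test still waits for the full timeout before failing. Only green runs get faster.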
Green test runs faster
We can wait an extra 5 seconds (bad choice)
We can put a 'marker record' at the end of the input and wait for it to appear in the output (not always possible)
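Below is a JDK-only sketch of the marker-record idea. A BlockingQueue stands in for the consumer of the output topic. MARKER is a hypothetical sentinel value that the test appends after its real input. Reading until the marker yields the complete output, so an unexpected extra record (here a duplicate B) makes the assertion fail instead of slipping past a wait that stopped as soon as [A, B] appeared. In a real topology this only works if the marker passes through unchanged and stays ordered after the other records, for example in the same partition. That constraint is why it is not always possible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class MarkerWait {
    static final String MARKER = "__END_OF_TEST__"; // hypothetical marker value

    // Reads output records until the marker arrives and returns everything before it.
    // Throws if the marker does not show up within the timeout.
    static List<String> readUntilMarker(BlockingQueue<String> output, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        List<String> received = new ArrayList<>();
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                String next = output.poll(Math.max(remaining, 0L), TimeUnit.NANOSECONDS);
                if (next == null) throw new IllegalStateException("marker not seen within timeout");
                if (MARKER.equals(next)) return received;
                received.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for marker", e);
        }
    }
}
```

With output A, B, B followed by the marker, readUntilMarker returns [A, B, B]. An assertion against [A, B] then correctly fails.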
Both TopologyTestDriver and integration tests are needed
Write unit tests with TopologyTestDriver. When it fails to surface the problem, use integration tests.
Know the limitations of TopologyTestDriver.
Understand the difficulties and limitations of asynchronous testing.
Confluent blog: Testing Kafka Streams – A Deep Dive
pro.kafka: Russian-language Kafka chat on Telegram: https://t.me/proKafka
Confluent community Slack: https://cnfl.io/slack