Ivan Ponomarev, John Roesler
Purpose: cover testing methodologies for Kafka Streams
"Unit" Testing: TopologyTestDriver
Integration Testing: KafkaStreams
Start with a motivating example (from Ivan’s production experience)
A flawed testing approach: unit testing doesn’t work for this example
Deep-dive into the testing framework
Correctly testing the example with integration tests
Save different source IDs in the database

Too many writes to the database

Let’s deduplicate using Kafka Streams!
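A minimal plain-Java sketch of the behaviour we are after, assuming "deduplicate" means forwarding each distinct source ID only the first time it is seen. The class `FirstSeenFilter` is hypothetical and an in-memory set stands in for a Kafka Streams state store; this is not the topology from the talk.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration: forwards an ID only the first time it is seen. */
class FirstSeenFilter {
    // Stand-in for a state store that remembers which IDs were already forwarded.
    private final Set<String> seen = new HashSet<>();

    /** Returns true only for the first occurrence of an ID. */
    boolean firstTime(String id) {
        return seen.add(id);
    }

    /** Applies the filter to a whole input sequence, preserving input order. */
    static List<String> deduplicate(List<String> input) {
        FirstSeenFilter filter = new FirstSeenFilter();
        List<String> output = new ArrayList<>();
        for (String id : input) {
            if (filter.firstTime(id)) {
                output.add(id);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(deduplicate(List.of("A", "A", "B", "A"))); // prints [A, B]
    }
}
```

With this behaviour, repeated IDs cause one database write instead of many.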

TopologyTestDriver
TopologyTestDriver capabilities
What is being sent/received | TestInputTopic methods | TestOutputTopic methods |
A single value | pipeInput(value) | readValue() |
A key/value pair | pipeInput(key, value) | readKeyValue() |
A list of values | pipeValueList(values) | readValuesToList() |
A list of key/value pairs | pipeKeyValueList(keyValues) | readKeyValuesToList(), readKeyValuesToMap() |
A list of Records | pipeRecordList(records) | readRecordsToList() |
Spring Boot app
Let’s do some test-driven development and first write a test
Writing a test with TopologyTestDriver





Writing the topology
TopologyTestDriver test is green
Should we run this in production?

Kafka Streams | TopologyTestDriver |
is a big data streaming framework | is a fast, deterministic testing framework |
Caching in Kafka Streams
don’t immediately emit every aggregation result
"soak up" repeated updates to the same key’s aggregation
configure cache size: cache.max.bytes.buffering (default 10 MB)
configure cache flush interval: commit.interval.ms (default 30 s)
emit latest result on flush or eviction
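The points above can be illustrated with a toy model in plain Java. This is a sketch only, not the Kafka Streams cache implementation: the class `ToyRecordCache` is hypothetical, `flush()` stands in for a commit, and eviction on a full cache is not modelled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a record cache; NOT the Kafka Streams implementation. */
class ToyRecordCache {
    // Latest buffered value per key; a later update overwrites the earlier one.
    private final Map<String, String> dirty = new LinkedHashMap<>();
    // What a downstream processor would actually see.
    private final List<String> downstream = new ArrayList<>();

    /** Buffers an update instead of emitting it immediately. */
    void put(String key, String value) {
        dirty.put(key, value);
    }

    /** Models a commit: forwards only the latest value per key, then clears the buffer. */
    void flush() {
        dirty.forEach((key, value) -> downstream.add(key + "=" + value));
        dirty.clear();
    }

    List<String> emitted() {
        return downstream;
    }

    public static void main(String[] args) {
        ToyRecordCache cache = new ToyRecordCache();
        cache.put("k", "1");
        cache.put("k", "2");
        cache.put("k", "3");
        System.out.println(cache.emitted()); // prints [] because nothing was flushed yet
        cache.flush();
        System.out.println(cache.emitted()); // prints [k=3]
    }
}
```

Intermediate results "1" and "2" never reach downstream in this model, so logic that relies on seeing every intermediate update can pass in one environment and fail in another.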






TopologyTestDriver vs. Kafka Streams execution loop















What are other problems that can’t be surfaced with TopologyTestDriver?
TopologyTestDriver: single partition
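A hedged sketch of why a single partition can hide problems, assuming a deduplication by value whose state is kept separately per partition. The class `PartitionedDedup` is hypothetical, and `String.hashCode()` stands in for Kafka's real default partitioner (murmur2 over the serialized key bytes).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model: value-based deduplication with one state store per partition. */
class PartitionedDedup {
    /** Stand-in partitioner; Kafka's default hashes the serialized key with murmur2. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Each record is {key, value}; a value passes if its partition has not seen it yet. */
    static List<String> run(List<String[]> records, int numPartitions) {
        List<Set<String>> stores = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            stores.add(new HashSet<>());
        }
        List<String> output = new ArrayList<>();
        for (String[] record : records) {
            Set<String> store = stores.get(partitionFor(record[0], numPartitions));
            if (store.add(record[1])) {
                output.add(record[1]);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(new String[] {"a", "X"}, new String[] {"b", "X"});
        System.out.println(run(records, 1)); // one partition: the duplicate is caught
        System.out.println(run(records, 2)); // two partitions: keys "a" and "b" are separated
    }
}
```

With one partition the second "X" is dropped; with two partitions the keys land in different partitions in this model and the duplicate slips through, which a single-partition test would never show.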

TopologyTestDriver: "Fused" subtopologies
TopologyTestDriver

Kafka Streams

stream-stream joins can behave differently (pipeInput order vs. timestamp order)
logic that depends on stream time (such as suppress) can behave differently
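A small sketch of what "stream time" means, under the assumption that it is the largest record timestamp observed so far and that a window counts as closed once stream time reaches the window end plus the grace period. The class `StreamTimeClock` is hypothetical and only models the bookkeeping, not suppress itself.

```java
/** Hypothetical model of stream time: it advances only when records arrive. */
class StreamTimeClock {
    private long streamTime = Long.MIN_VALUE;

    /** Observes a record timestamp; out-of-order records never move stream time back. */
    long observe(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        return streamTime;
    }

    /** A window is treated as closed once stream time reaches windowEnd + grace. */
    boolean isClosed(long windowEndMs, long graceMs) {
        return streamTime >= windowEndMs + graceMs;
    }

    public static void main(String[] args) {
        StreamTimeClock clock = new StreamTimeClock();
        clock.observe(5);
        System.out.println(clock.isClosed(10, 0)); // prints false
        clock.observe(3);                          // late record, stream time stays at 5
        clock.observe(10);
        System.out.println(clock.isClosed(10, 0)); // prints true
    }
}
```

Because stream time moves only when a record is observed, the order and the timestamps of the records a test pipes in decide when time-dependent results become visible.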






EmbeddedKafka
TestContainers

Writing TestContainers test
An easy part: pushing messages to Kafka
A not so easy part: how do we check the output?
Deduplication: the correct implementation
Now the test is green, but takes 5 seconds!
// First attempt: poll until a 5-second poll comes back empty, then assert.
List<String> actual = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records =
        KafkaTestUtils.getRecords(consumer, 5000 /* timeout in ms */);
    // An empty batch means nothing arrived for 5 s, so the output is assumed complete.
    if (records.isEmpty()) break;
    for (ConsumerRecord<String, String> rec : records) {
        actual.add(rec.value());
    }
}
assertEquals(List.of("A", "B"), actual);

// Alternative: return as soon as the expected output is seen
// (assuming 'actual' is filled by a separate polling thread).
Awaitility.await().atMost(10, SECONDS).until(
    () -> List.of("A", "B").equals(actual));

Cooperative termination
Thread-safe data structure
Green test runs faster
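The three points above can be sketched with the standard library alone. Everything here is an assumption made for illustration: `BackgroundCollector` is a hypothetical class, a `BlockingQueue` stands in for the Kafka consumer, and `awaitUntil` is a hand-rolled substitute for Awaitility.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical collector: polls a source in the background into a thread-safe list. */
class BackgroundCollector implements AutoCloseable {
    // Thread-safe data structure: written by the poller, read by the test thread.
    private final List<String> actual = new CopyOnWriteArrayList<>();
    // Cooperative termination: the poller checks this flag on every iteration.
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Thread poller;

    BackgroundCollector(BlockingQueue<String> source) {
        poller = new Thread(() -> {
            while (running.get()) {
                try {
                    String value = source.poll(50, TimeUnit.MILLISECONDS);
                    if (value != null) {
                        actual.add(value);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        poller.setDaemon(true);
        poller.start();
    }

    List<String> actual() {
        return actual;
    }

    boolean isRunning() {
        return poller.isAlive();
    }

    /** Re-checks the condition every 10 ms; returns false if the timeout expires first. */
    static boolean awaitUntil(BooleanSupplier condition, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            while (System.nanoTime() < deadline) {
                if (condition.getAsBoolean()) {
                    return true;
                }
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return condition.getAsBoolean();
    }

    /** Asks the poller to stop and waits for it to finish. */
    @Override
    public void close() {
        running.set(false);
        try {
            poller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> source = new LinkedBlockingQueue<>(List.of("A", "B"));
        try (BackgroundCollector collector = new BackgroundCollector(source)) {
            boolean ok = awaitUntil(() -> List.of("A", "B").equals(collector.actual()), 10_000);
            System.out.println(ok ? "green" : "red");
        }
    }
}
```

The green path finishes as soon as the expected records are collected; only a failing test pays the full timeout.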
We can wait for an extra 5 seconds (bad choice)
We can put a 'marker record' at the end of the input and wait for it to appear in the output (not always possible)
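A sketch of the marker-record idea in plain Java. The marker value `<END>`, the class `MarkerRecordReader`, and the `BlockingQueue` standing in for the output topic are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical reader: collects output until a marker record shows up. */
class MarkerRecordReader {
    static final String MARKER = "<END>"; // hypothetical marker value

    /** Returns everything read before the marker; throws if the marker never arrives. */
    static List<String> readUntilMarker(BlockingQueue<String> output, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        List<String> seen = new ArrayList<>();
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                String value = remaining > 0 ? output.poll(remaining, TimeUnit.NANOSECONDS) : null;
                if (value == null) {
                    throw new IllegalStateException("marker not seen; got " + seen);
                }
                if (MARKER.equals(value)) {
                    return seen;
                }
                seen.add(value);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for marker", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> output = new LinkedBlockingQueue<>(List.of("A", "B", MARKER));
        System.out.println(readUntilMarker(output, 1000)); // prints [A, B]
    }
}
```

This only works when the topology forwards the marker and the marker is guaranteed to arrive after all other output, which is one reason the approach is not always possible.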
Both TopologyTestDriver and integration tests are needed
Write unit tests with TopologyTestDriver. When it fails to surface the problem, use integration tests.
Know the limitations of TopologyTestDriver.
Understand the difficulties and limitations of asynchronous testing.

Confluent blog: Testing Kafka Streams – A Deep Dive
pro.kafka: Russian Kafka chat in Telegram: https://t.me/proKafka
Confluent community Slack: https://cnfl.io/slack