Kafka Streams Testing

A Deep Dive

Ivan Ponomarev, John Roesler

Who Are We

Ivan Ponomarev:

  • Software Engineer at KURS, tutor at MIPT

  • Apache Kafka Contributor

Who Are We

John Roesler:

  • Software Engineer at Confluent

  • Apache Kafka Committer and PMC member

Kafka Streams Testing: A Deep Dive

  1. Purpose: cover testing methodologies for Kafka Streams

    • "Unit" Testing: TopologyTestDriver

    • Integration Testing: KafkaStreams

  2. Start with motivating example (from Ivan’s production)

  3. A flawed testing approach: unit testing doesn’t work for this example

  4. Deep-dive into the testing framework

  5. Correctly testing the example with integration tests

The task

Save different source IDs in the database

dedup problem1.png

The problem

Too many writes to the database

dedup problem2.png

The solution

Let’s deduplicate using Kafka Streams!

dedup solution.png

TopologyTestDriver

TopologyTestDriver capabilities

pipeinput.png

What is being sent/received | TestInputTopic methods | TestOutputTopic methods
A single value | pipeInput(V) | V readValue()
A key/value pair | pipeInput(K, V) | KeyValue<K,V> readKeyValue()

TopologyTestDriver capabilities

pipelist.png

What is being sent/received | TestInputTopic methods | TestOutputTopic methods
A list of values | pipeValueList(List<V>) | List<V> readValuesToList()
A list of key/value pairs | pipeKeyValueList(List<KeyValue<K,V>>) | List<KeyValue<K,V>> readKeyValuesToList(), Map<K,V> readKeyValuesToMap()

TopologyTestDriver capabilities

piperecords.png

What is being sent/received | TestInputTopic methods | TestOutputTopic methods
A list of records | pipeRecordList(List<? extends TestRecord<K, V>>) | List<TestRecord<K, V>> readRecordsToList()
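
A minimal sketch of how the TestInputTopic and TestOutputTopic methods above fit together. The topology (upper-casing values), the topic names "input" and "output", and the class name are assumptions made for illustration; they are not the topology from the demo. It assumes the kafka-streams-test-utils dependency is on the classpath.

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical example class; not part of the talk's demo project.
final class UppercaseTopologySketch {

    // Pipes two records through a toy topology and returns everything that came out.
    static List<KeyValue<String, String>> run() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> value.toUpperCase())
               .to("output", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ttd-sketch");
        // Required by the config, but the test driver never contacts a broker.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "input", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                    "output", new StringDeserializer(), new StringDeserializer());

            // Each pipe call is processed synchronously before it returns.
            input.pipeInput("k1", "a");
            input.pipeKeyValueList(List.of(KeyValue.pair("k2", "b")));

            return output.readKeyValuesToList();
        }
    }
}
```

Because processing is synchronous, the output can be read immediately after piping the input, with no waiting or polling.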

Demo

  1. Spring Boot app

  2. Let’s do some test-driven development and first write a test

  3. Writing a test with TopologyTestDriver

A "Simple Solution"

wrong topology1.png
wrong topology2.png
wrong topology3.png
wrong topology4.png
wrong topology5.png

Demo

  • writing the topology

  • TopologyTestDriver test is green

Tests are green

build

Should we run this in production?

What we saw in production:

monitoring

Why it’s not working

Kafka Streams is a big data streaming framework

  • designed for high throughput

  • throughput demands batching, buffering, caching, etc.

  • caching is the culprit in this example

TopologyTestDriver is a fast, deterministic testing framework

  • designed for synchronous, immediate results

  • flush cache after every update

Why it’s not working

Caching in Kafka Streams

  • don’t immediately emit every aggregation result

  • "soak up" repeated updates to the same key’s aggregation

  • configure cache size: cache.max.bytes.buffering (default 10 MB)

  • configure cache flush interval: commit.interval.ms (default 30 s)

  • emit latest result on flush or eviction
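
The effect of the bullets above can be sketched with a toy model. This is not Kafka Streams' cache implementation; the class and method names are hypothetical. It only shows how buffering the latest aggregate per key changes what is emitted, depending on when the flush happens.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a record cache: only the idea, not the real implementation.
final class RecordCacheSketch {
    private final Map<String, Long> dirty = new LinkedHashMap<>();
    private final List<String> emitted = new ArrayList<>();

    // Buffer the latest aggregate for a key, overwriting any pending one.
    void update(String key, long aggregate) {
        dirty.put(key, aggregate);
    }

    // Emit the latest buffered value per key downstream and clear the buffer.
    void flush() {
        dirty.forEach((key, value) -> emitted.add(key + "=" + value));
        dirty.clear();
    }

    List<String> emitted() {
        return emitted;
    }

    // Counts key "A" three times, flushing either after every update or once at the end.
    static List<String> countThreeTimes(boolean flushAfterEveryUpdate) {
        RecordCacheSketch cache = new RecordCacheSketch();
        for (long count = 1; count <= 3; count++) {
            cache.update("A", count);
            if (flushAfterEveryUpdate) {
                cache.flush();
            }
        }
        cache.flush(); // final flush, e.g. at commit time
        return cache.emitted();
    }

    public static void main(String[] args) {
        System.out.println(countThreeTimes(true));  // prints [A=1, A=2, A=3]
        System.out.println(countThreeTimes(false)); // prints [A=3]
    }
}
```

A topology whose correctness depends on seeing every intermediate result passes when the cache is flushed after every update, and fails when only the last value per key reaches downstream.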

Why it’s not working

cache1.png
cache1b.png
cache2.png
cache2b.png
cache3.png
cache3b.png

Demo

TopologyTestDriver vs. Kafka Streams execution loop

Kafka Streams execution loop

streamsloop1.png
streamsloop2.png
streamsloop3.png
streamsloop4.png
streamsloop5.png
streamsloop6.png
streamsloop7.png
streamsloop8.png

TopologyTestDriver execution loop

ttdloop1.png
ttdloop2.png
ttdloop3.png
ttdloop4.png
ttdloop5.png
ttdloop6.png
ttdloop7.png

What else?

What are other problems that can’t be surfaced with TopologyTestDriver?

TopologyTestDriver: single partition

copart norm.png

Kafka Streams: co-partitioning problems

copart diff algorithm.png
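
A rough sketch of why differing partitioning algorithms break co-partitioning. Both partitioners here are hypothetical stand-ins chosen because they need only the JDK; neither is claimed to be what any particular Kafka client uses. With a single partition, as in TopologyTestDriver, any partitioner maps every key to partition 0, so a mismatch like this cannot show up.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.ToIntFunction;
import java.util.zip.CRC32;

// Hypothetical illustration: two producers hashing the same key differently.
final class CoPartitioningSketch {
    static final int PARTITIONS = 4;

    // Stand-in partitioner A: based on String.hashCode().
    static int byHashCode(String key) {
        return Math.floorMod(key.hashCode(), PARTITIONS);
    }

    // Stand-in partitioner B: based on the CRC32 of the UTF-8 bytes.
    static int byCrc32(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % PARTITIONS);
    }

    // Counts keys "key-0" .. "key-(n-1)" that the two partitioners place differently.
    static int mismatches(ToIntFunction<String> left, ToIntFunction<String> right, int n) {
        int count = 0;
        for (int i = 0; i < n; i++) {
            String key = "key-" + i;
            if (left.applyAsInt(key) != right.applyAsInt(key)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Same algorithm on both sides: every key lands in the same partition number.
        System.out.println(mismatches(CoPartitioningSketch::byHashCode,
                                      CoPartitioningSketch::byHashCode, 100));
        // Different algorithms: any non-zero count means a join would miss those keys.
        System.out.println(mismatches(CoPartitioningSketch::byHashCode,
                                      CoPartitioningSketch::byCrc32, 100));
    }
}
```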

TopologyTestDriver: "Fused" subtopologies

TopologyTestDriver

fused.png

Kafka Streams

repart.png

Timing

  • stream-stream joins can behave differently (pipeInput order vs. timestamp order)

  • logic that depends on stream time (such as suppress) can behave differently

timing1.png
timing2.png
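
To make the stream-time bullet concrete, here is a toy model of stream time, assuming the usual definition: the highest record timestamp observed so far. The class is hypothetical and greatly simplified; it only shows that a window "closes" when a later record arrives, not when wall-clock time passes.

```java
// Toy model of stream time; not the Kafka Streams implementation.
final class StreamTimeSketch {
    private long streamTime = Long.MIN_VALUE;

    // Stream time advances only when a record with a larger timestamp arrives.
    void observe(long recordTimestampMs) {
        streamTime = Math.max(streamTime, recordTimestampMs);
    }

    long streamTime() {
        return streamTime;
    }

    // A window is treated as closed once stream time reaches window end + grace.
    boolean isWindowClosed(long windowEndMs, long graceMs) {
        return streamTime >= windowEndMs + graceMs;
    }

    public static void main(String[] args) {
        StreamTimeSketch time = new StreamTimeSketch();
        time.observe(5);
        System.out.println(time.isWindowClosed(10, 0)); // prints false
        time.observe(3);                                // out-of-order record: no advance
        System.out.println(time.streamTime());          // prints 5
        time.observe(10);
        System.out.println(time.isWindowClosed(10, 0)); // prints true
    }
}
```

In a test this means that logic such as suppress emits nothing until some later record pushes stream time forward, no matter how long the test waits.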

Should we trust StackOverflow?

so

Using Transformer

transformer1.png
transformer2.png
transformer4.png
transformer5.png
transformer6.png
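
The transformer code itself is only on the slide images, so the following is a guess at the general shape of the logic, not the speakers' implementation. It assumes the deduplication rule is "forward a record only when its value differs from the last value seen for the same key", and uses a plain map as a stand-in for a key-value state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical deduplication logic; a Map stands in for the state store.
final class DedupSketch {
    private final Map<String, String> lastSeen = new HashMap<>();

    // Returns the value to forward downstream, or empty if it is a repeat.
    Optional<String> process(String key, String value) {
        String previous = lastSeen.put(key, value);
        return value.equals(previous) ? Optional.empty() : Optional.of(value);
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        System.out.println(dedup.process("k", "A")); // prints Optional[A]
        System.out.println(dedup.process("k", "A")); // prints Optional.empty
        System.out.println(dedup.process("k", "B")); // prints Optional[B]
    }
}
```

Because the decision is made per record against explicit state, the result does not depend on when a cache happens to flush.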

Let’s run tests on real Kafka!

  • EmbeddedKafka

  • TestContainers

testcontainers

EmbeddedKafka vs TestContainers

EmbeddedKafka

  • Pro:

    • Just pull in a dependency

  • Contra:

    • Pulls in Scala

    • Runs in the same JVM

TestContainers

  • Pro:

    • Runs Kafka isolated in Docker

    • Not only for Kafka testing

  • Contra:

    • Needs Docker

    • Requires some time for the first start

Demo

  • Writing TestContainers test

    • An easy part: pushing messages to Kafka

    • A not so easy part: how do we check the output?

Demo

  • Deduplication: the correct implementation

  • Now the test is green, but takes 5 seconds!

Does it have to be so slow?

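import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.test.utils.KafkaTestUtils;

List<String> actual = new ArrayList<>();

// Keep polling until a poll comes back empty. That last, empty poll
// blocks for the full timeout, which is why the test takes 5 seconds.
while (true) {
  ConsumerRecords<String, String> records =
    KafkaTestUtils.getRecords(consumer, 5000 /* timeout in ms */);
  if (records.isEmpty()) break;
  for (ConsumerRecord<String, String> rec : records) {
    actual.add(rec.value());
  }
}

assertEquals(List.of("A", "B"), actual);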

Awaitility

Awaitility.await().atMost(10, SECONDS).until(
                 () -> List.of("A", "B").equals(actual));

awaitility pass
awaitility fail

Things we must keep in mind

  • Cooperative termination

  • Thread-safe data structure
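
A sketch of the two points above, using only the JDK. A BlockingQueue stands in for the Kafka consumer and a hand-rolled deadline loop stands in for Awaitility; both substitutions, and all names here, are assumptions made so that the example is self-contained.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: collects records in a background thread until the
// expected list is seen or the timeout expires.
final class AsyncCollectSketch {

    static List<String> collect(BlockingQueue<String> source, List<String> expected, long timeoutMs) {
        // Thread-safe: written by the poller thread, read by the test thread.
        List<String> actual = new CopyOnWriteArrayList<>();
        // Cooperative termination: the poller checks this flag on every iteration.
        AtomicBoolean running = new AtomicBoolean(true);

        Thread poller = new Thread(() -> {
            try {
                while (running.get()) {
                    // Short poll so the flag is re-checked frequently.
                    String value = source.poll(50, TimeUnit.MILLISECONDS);
                    if (value != null) {
                        actual.add(value);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();

        try {
            // Stand-in for Awaitility.await().atMost(...).until(...).
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (!expected.equals(actual) && System.nanoTime() - deadline < 0) {
                Thread.sleep(10);
            }
            running.set(false); // ask the poller to stop ...
            poller.join();      // ... and wait until it actually has
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return List.copyOf(actual);
    }

    public static void main(String[] args) {
        BlockingQueue<String> source = new LinkedBlockingQueue<>(List.of("A", "B"));
        System.out.println(collect(source, List.of("A", "B"), 2000)); // prints [A, B]
    }
}
```

The wait returns as soon as the expected output is present, so a passing test no longer pays the full timeout; only a failing test waits until the deadline.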

Demo

  • Green test runs faster

Will any extra messages appear?

  • We can wait an extra 5 seconds (bad choice)

  • We can put a 'marker record' at the end of the input and wait for it to appear in the output (not always possible)
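
The marker-record idea can be sketched as follows, again with a BlockingQueue standing in for the output topic. The marker value and all names are hypothetical. The approach assumes the marker passes through the topology unchanged and arrives after every real record, which is one reason it is not always possible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: reads output until a sentinel record shows up.
final class MarkerRecordSketch {
    static final String MARKER = "__END__"; // assumed sentinel value

    // Returns every record seen before the marker; fails if the marker never arrives.
    static List<String> readUntilMarker(BlockingQueue<String> output, long timeoutMs) {
        List<String> seen = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    throw new IllegalStateException("marker not seen, got " + seen);
                }
                String value = output.poll(remaining, TimeUnit.NANOSECONDS);
                if (value == null) {
                    continue; // timed out; the deadline check above reports it
                }
                if (MARKER.equals(value)) {
                    return seen; // everything before the marker is the complete output
                }
                seen.add(value);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for marker", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> output = new LinkedBlockingQueue<>(List.of("A", "B", MARKER));
        System.out.println(readUntilMarker(output, 1000)); // prints [A, B]
    }
}
```

Once the marker is seen, an exact comparison against the expected list also catches extra records, without an additional fixed wait.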

Summary

  • Both TopologyTestDriver and integration tests are needed

  • Write unit tests with TopologyTestDriver. When it fails to surface the problem, use integration tests.

  • Know the limitations of TopologyTestDriver.

  • Understand the difficulties and limitations of asynchronous testing.

KIP-655 is under discussion

kip 655

Thank you!
