测试 Kafka Streams
导入测试 公用事业
为了测试 Kafka Streams 应用程序,Kafka 提供了一个 test-utils 工件,可以作为常规
依赖项添加到您的测试代码库。使用 Maven 时的示例代码片段:pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>{{fullDotVersion}}</version>
<scope>test</scope>
</dependency>
测试 Streams 应用程序
test-utils 包提供了一个可以通过手动组装的管道数据
使用处理器 API 或通过 DSL 使用 .
测试驱动程序模拟库运行时,该运行时持续从输入主题和
通过遍历拓扑来处理它们。
您可以使用测试驱动程序来验证指定的处理器拓扑是否计算出正确的结果
与手动管道 数据记录。
测试驱动程序捕获结果记录并允许查询其嵌入式状态存储。TopologyTestDriver
Topology
StreamsBuilder
// Processor API
Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
topology.addProcessor("processor", ..., "sourceProcessor");
topology.addSink("sinkProcessor", "output-topic", "processor");
// or
// using DSL
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").filter(...).to("output-topic");
Topology topology = builder.build();
// create test driver
TopologyTestDriver testDriver = new TopologyTestDriver(topology);
使用测试驱动程序,您可以创建给定的主题名称和相应的序列化程序。 提供各种方法来管道化新的消息值、键和值或 KeyValue 对象列表。TestInputTopic
TestInputTopic
TestInputTopic<String, Long> inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer());
inputTopic.pipeInput("key", 42L);
要验证输出,您可以在初始化期间使用配置主题和相应反序列化器的位置。
它提供了帮助程序方法,用于仅读取结果记录或记录集合的某些部分。
例如,您可以使用标准断言验证 return
如果您只关心 key 和 value,而不关心 result 记录的时间戳。TestOutputTopic
KeyValue
TestOutputTopic<String, Long> outputTopic = testDriver.createOutputTopic("output-topic", stringSerde.deserializer(), longSerde.deserializer());
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("key", 42L)));
TopologyTestDriver
也支持标点符号。
事件时间标点符号将根据已处理记录的时间戳自动触发。
挂钟时间标点符号也可以通过提前测试驱动程序的挂钟时间 (
driver 在内部模拟 wall-clock-time 以让用户控制它)。
testDriver.advanceWallClockTime(Duration.ofSeconds(20));
此外,您可以在测试之前或之后通过测试驱动程序访问状态存储。 在测试之前访问 store 对于使用一些初始值预填充 store 很有用。 处理完数据后,可以验证存储的预期更新。
KeyValueStore store = testDriver.getKeyValueStore("store-name");
请注意,您应该始终在最后关闭测试驱动程序,以确保释放所有资源 适当地。
testDriver.close();
例
下面的示例演示如何使用 test driver 和 helper 类。 该示例创建一个拓扑,该拓扑使用 key-value-store 计算每个键的最大值。 处理时,不会生成任何输出,而只会更新存储。 输出仅根据事件时间和挂钟标点符号向下游发送。
private TopologyTestDriver testDriver;
private TestInputTopic<String, Long> inputTopic;
private TestOutputTopic<String, Long> outputTopic;
private KeyValueStore<String, Long> store;
private Serde<String> stringSerde = new Serdes.StringSerde();
private Serde<Long> longSerde = new Serdes.LongSerde();
@Before
public void setup() {
Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
topology.addStateStore(
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("aggStore"),
Serdes.String(),
Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow store pre-populating
"aggregator");
topology.addSink("sinkProcessor", "result-topic", "aggregator");
// setup test driver
Properties props = new Properties();
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
testDriver = new TopologyTestDriver(topology, props);
// setup test topics
inputTopic = testDriver.createInputTopic("input-topic", stringSerde.serializer(), longSerde.serializer());
outputTopic = testDriver.createOutputTopic("result-topic", stringSerde.deserializer(), longSerde.deserializer());
// pre-populate store
store = testDriver.getKeyValueStore("aggStore");
store.put("a", 21L);
}
@After
public void tearDown() {
testDriver.close();
}
@Test
public void shouldFlushStoreForFirstInput() {
inputTopic.pipeInput("a", 1L);
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
assertThat(outputTopic.isEmpty(), is(true));
}
@Test
public void shouldNotUpdateStoreForSmallerValue() {
inputTopic.pipeInput("a", 1L);
assertThat(store.get("a"), equalTo(21L));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
assertThat(outputTopic.isEmpty(), is(true));
}
@Test
public void shouldNotUpdateStoreForLargerValue() {
inputTopic.pipeInput("a", 42L);
assertThat(store.get("a"), equalTo(42L));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 42L)));
assertThat(outputTopic.isEmpty(), is(true));
}
@Test
public void shouldUpdateStoreForNewKey() {
inputTopic.pipeInput("b", 21L);
assertThat(store.get("b"), equalTo(21L));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("b", 21L)));
assertThat(outputTopic.isEmpty(), is(true));
}
@Test
public void shouldPunctuateIfEvenTimeAdvances() {
final Instant recordTime = Instant.now();
inputTopic.pipeInput("a", 1L, recordTime);
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
inputTopic.pipeInput("a", 1L, recordTime);
assertThat(outputTopic.isEmpty(), is(true));
inputTopic.pipeInput("a", 1L, recordTime.plusSeconds(10L));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
assertThat(outputTopic.isEmpty(), is(true));
}
@Test
public void shouldPunctuateIfWallClockTimeAdvances() {
testDriver.advanceWallClockTime(Duration.ofSeconds(60));
assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("a", 21L)));
assertThat(outputTopic.isEmpty(), is(true));
}
public class CustomMaxAggregatorSupplier implements ProcessorSupplier<String, Long> {
@Override
public Processor<String, Long> get() {
return new CustomMaxAggregator();
}
}
public class CustomMaxAggregator implements Processor<String, Long> {
ProcessorContext context;
private KeyValueStore<String, Long> store;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
this.context = context;
context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, time -> flushStore());
store = (KeyValueStore<String, Long>) context.getStateStore("aggStore");
}
@Override
public void process(String key, Long value) {
Long oldValue = store.get(key);
if (oldValue == null || value > oldValue) {
store.put(key, value);
}
}
private void flushStore() {
KeyValueIterator<String, Long> it = store.all();
while (it.hasNext()) {
KeyValue<String, Long> next = it.next();
context.forward(next.key, next.value);
}
}
@Override
public void close() {}
}
单元测试处理器
如果您编写了一个 Processor,您将需要对其进行测试。
因为 the 将其结果转发给上下文而不是返回它们,
单元测试需要一个能够捕获转发数据以供检查的模拟上下文。
因此,我们在 test-utils
中提供了一个。Processor
MockProcessorContext
首先,实例化你的处理器并使用 mock 上下文初始化它:
final Processor processorUnderTest = ...;
final MockProcessorContext context = new MockProcessorContext();
processorUnderTest.init(context);
如果你需要将配置传递给你的处理器或设置默认的 serdes,你可以使用
config:捕获的数据final Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
props.put("some.other.config", "some config value");
final MockProcessorContext context = new MockProcessorContext(props);
mock 将捕获处理器转发的任何值。你可以对它们进行断言:
processorUnderTest.process("key", "value");
final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(forwarded.next().keyValue(), new KeyValue<>(..., ...));
assertFalse(forwarded.hasNext());
// you can reset forwards to clear the captured data. This may be helpful in constructing longer scenarios.
context.resetForwards();
assertEquals(context.forwarded().size(), 0);
如果您的处理器转发到特定的子处理器,您可以通过以下方式查询捕获数据的上下文
child name:模拟还会捕获您的处理器是否已调用上下文:设置记录元数据final List<CapturedForward> captures = context.forwarded("childProcessorName");
commit()
assertTrue(context.committed());
// commit captures can also be reset.
context.resetCommit();
assertFalse(context.committed());
如果您的处理器逻辑依赖于记录元数据(主题、分区、偏移量或时间戳), 您可以在上下文中一起或单独设置它们:
context.setRecordMetadata("topicName", /*partition*/ 0, /*offset*/ 0L, /*timestamp*/ 0L);
context.setTopic("topicName");
context.setPartition(0);
context.setOffset(0L);
context.setTimestamp(0L);
设置这些值后,上下文将继续返回相同的值,直到您设置新的值。 状态存储
如果你的标点符号是有状态的,则 mock 上下文允许你注册状态存储。 建议您使用适当类型的简单内存存储(KeyValue、Windowed 或 Session),因为 mock 上下文不管理变更日志、状态目录等。
final KeyValueStore<String, Integer> store =
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("myStore"),
Serdes.String(),
Serdes.Integer()
)
.withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
.build();
store.init(context, store);
context.register(store, /*deprecated parameter*/ false, /*parameter unused in mock*/ null);
验证标点符号
处理器可以安排标点符号来处理定期任务。 模拟上下文不会自动执行标点符号,但它会将它们捕获到 还允许你对它们进行单元测试:
final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
final long interval = capturedPunctuator.getIntervalMs();
final PunctuationType type = capturedPunctuator.getType();
final boolean cancelled = capturedPunctuator.cancelled();
final Punctuator punctuator = capturedPunctuator.getPunctuator();
punctuator.punctuate(/*timestamp*/ 0L);
如果您需要编写涉及自动触发预定标点符号的测试,我们建议创建一个
使用处理器的简单拓扑并使用 TopologyTestDriver
。