编写 Streams 应用程序

目录

任何使用 Kafka Streams 库的 Java 或 Scala 应用程序都被视为 Kafka Streams 应用程序。 Kafka Streams 应用程序的计算逻辑定义为处理器拓扑, 这是流处理器 (节点) 和流 (边缘) 的图形。

您可以使用 Kafka Streams API 定义处理器拓扑:

Kafka Streams DSL
一个高级 API,提供最常见的数据转换操作,例如 、 、 和开箱即用的操作。DSL 是 Kafka Streams 新手的推荐起点,应涵盖许多使用案例和流处理需求。如果您正在编写 Scala 应用程序,则可以使用 Kafka Streams DSL for Scala 库,该库删除了许多 Java/Scala 互操作性样板,而不是直接使用 Java DSL。mapfilterjoinaggregations
处理器 API
一个低级 API,允许您添加和连接处理器,以及直接与状态存储交互。Processor API 为您提供了比 DSL 更大的灵活性,但代价是需要应用程序开发人员进行更多的手动工作(例如,更多的代码行)。

库和 Maven 工件

本节列出了可用于编写 Kafka Streams 应用程序的 Kafka Streams 相关库。

您可以为 Kafka Streams 应用程序定义以下库的依赖项。

组 ID 工件 ID 版本 描述
org.apache.kafka kafka-streams {{fullDotVersion}} (必填)Kafka Streams 的基础库。
org.apache.kafka kafka-clients {{fullDotVersion}} (必填)Kafka 客户端库。包含内置序列化器/反序列化器。
org.apache.kafka kafka-streams-scala {{fullDotVersion}} (可选)Kafka Streams DSL for Scala 库,用于编写 Scala Kafka Streams 应用程序。当不使用 SBT 时,您需要使用应用程序正在使用的正确 Scala 版本(,_2.12_2.13)

提示

有关 Serializers/Deserializers 的更多信息,请参见 数据类型和序列化 部分。

使用 Maven 时的示例代码片段:pom.xml

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>{{fullDotVersion}}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>{{fullDotVersion}}</version>
</dependency>
<!-- Optionally include Kafka Streams DSL for Scala for Scala {{scalaVersion}} -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-scala_{{scalaVersion}}</artifactId>
    <version>{{fullDotVersion}}</version>
</dependency>

在应用程序代码中使用 Kafka Streams

您可以从应用程序代码中的任何位置调用 Kafka Streams,但通常这些调用是在 您的应用程序或其某些变体。在应用程序中定义处理拓扑的基本元素 如下所述。main()

首先,您必须创建一个 实例。KafkaStreams

  • 构造函数的第一个参数采用用于定义拓扑的拓扑(用于 DSL处理器 API)。KafkaStreamsStreamsBuilder#build()Topology
  • 第二个参数是 的实例,它定义此特定拓扑的配置。java.util.Properties

代码示例:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.StreamsBuilder;
import org.apache.kafka.streams.processor.Topology;

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.  We will cover this in detail in the subsequent
// sections of this Developer Guide.

StreamsBuilder builder = ...;  // when using the DSL
Topology topology = builder.build();
//
// OR
//
Topology topology = ...; // when using the Processor API

// Use the configuration to tell your application where the Kafka cluster is,
// which Serializers/Deserializers to use by default, to specify security settings,
// and so on.
Properties props = ...;

KafkaStreams streams = new KafkaStreams(topology, props);

此时,内部结构已初始化,但处理尚未开始。 您必须通过调用以下方法显式启动 Kafka Streams 线程:KafkaStreams#start()

// Start the Kafka Streams threads
streams.start();

如果此流处理应用程序的其他实例在其他地方运行(例如,在另一台机器上),则 Kafka Streams 以透明方式将现有实例中的任务重新分配给您刚刚启动的新实例。 有关更多信息,请参阅流分区和任务线程模型

要捕获任何意外异常,您可以在启动 应用。每当流线程因意外异常终止时,都会调用此处理程序:java.lang.Thread.UncaughtExceptionHandler

// Java 8+, using lambda expressions
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
  // here you should examine the throwable/exception and perform an appropriate action!
});

要停止应用程序实例,请调用以下方法:KafkaStreams#close()

// Stop the Kafka Streams threads
streams.close();

要允许您的应用程序正常关闭以响应 SIGTERM,建议您添加 shutdown 钩子 并调用 。KafkaStreams#close

以下是 Java 8+ 中的关闭钩子示例:

// Add shutdown hook to stop the Kafka Streams threads.
// You can optionally provide a timeout to `close`.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

应用程序停止后,Kafka Streams 会将此实例中运行的所有任务迁移到剩余的可用任务 实例。

测试 Streams 应用程序

Kafka Streams 附带一个模块,可帮助您在此处测试应用程序。test-utils