编写 Streams 应用程序
目录
任何使用 Kafka Streams 库的 Java 或 Scala 应用程序都被视为 Kafka Streams 应用程序。 Kafka Streams 应用程序的计算逻辑定义为处理器拓扑, 这是流处理器 (节点) 和流 (边缘) 的图形。
您可以使用 Kafka Streams API 定义处理器拓扑:
- Kafka Streams DSL
- 一个高级 API,提供最常见的数据转换操作,例如 、 、 和开箱即用的操作。DSL 是 Kafka Streams 新手的推荐起点,应涵盖许多使用案例和流处理需求。如果您正在编写 Scala 应用程序,则可以使用 Kafka Streams DSL for Scala 库,该库删除了许多 Java/Scala 互操作性样板,而不是直接使用 Java DSL。
map
filter
join
aggregations
- 处理器 API
- 一个低级 API,允许您添加和连接处理器,以及直接与状态存储交互。Processor API 为您提供了比 DSL 更大的灵活性,但代价是需要应用程序开发人员进行更多的手动工作(例如,更多的代码行)。
库和 Maven 工件
本节列出了可用于编写 Kafka Streams 应用程序的 Kafka Streams 相关库。
您可以为 Kafka Streams 应用程序定义以下库的依赖项。
组 ID | 工件 ID | 版本 | 描述 |
---|---|---|---|
org.apache.kafka |
kafka-streams |
{{fullDotVersion}} |
(必填)Kafka Streams 的基础库。 |
org.apache.kafka |
kafka-clients |
{{fullDotVersion}} |
(必填)Kafka 客户端库。包含内置序列化器/反序列化器。 |
org.apache.kafka |
kafka-streams-scala |
{{fullDotVersion}} |
(可选)Kafka Streams DSL for Scala 库,用于编写 Scala Kafka Streams 应用程序。当不使用 SBT 时,您需要使用应用程序正在使用的正确 Scala 版本(,_2.12 _2.13 ) |
提示
有关 Serializers/Deserializers 的更多信息,请参见 数据类型和序列化 部分。
使用 Maven 时的示例代码片段:pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>{{fullDotVersion}}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>{{fullDotVersion}}</version>
</dependency>
<!-- Optionally include Kafka Streams DSL for Scala for Scala {{scalaVersion}} -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-scala_{{scalaVersion}}</artifactId>
<version>{{fullDotVersion}}</version>
</dependency>
在应用程序代码中使用 Kafka Streams
您可以从应用程序代码中的任何位置调用 Kafka Streams,但通常这些调用是在
您的应用程序或其某些变体。在应用程序中定义处理拓扑的基本元素
如下所述。main()
首先,您必须创建一个 实例。KafkaStreams
- 构造函数的第一个参数采用用于定义拓扑的拓扑(用于 DSL 或处理器 API)。
KafkaStreams
StreamsBuilder#build()
Topology
- 第二个参数是 的实例,它定义此特定拓扑的配置。
java.util.Properties
代码示例:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.StreamsBuilder;
import org.apache.kafka.streams.processor.Topology;
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on. We will cover this in detail in the subsequent
// sections of this Developer Guide.
StreamsBuilder builder = ...; // when using the DSL
Topology topology = builder.build();
//
// OR
//
Topology topology = ...; // when using the Processor API
// Use the configuration to tell your application where the Kafka cluster is,
// which Serializers/Deserializers to use by default, to specify security settings,
// and so on.
Properties props = ...;
KafkaStreams streams = new KafkaStreams(topology, props);
此时,内部结构已初始化,但处理尚未开始。
您必须通过调用以下方法显式启动 Kafka Streams 线程:KafkaStreams#start()
// Start the Kafka Streams threads
streams.start();
如果此流处理应用程序的其他实例在其他地方运行(例如,在另一台机器上),则 Kafka Streams 以透明方式将现有实例中的任务重新分配给您刚刚启动的新实例。 有关更多信息,请参阅流分区和任务和线程模型。
要捕获任何意外异常,您可以在启动
应用。每当流线程因意外异常终止时,都会调用此处理程序:java.lang.Thread.UncaughtExceptionHandler
// Java 8+, using lambda expressions
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
// here you should examine the throwable/exception and perform an appropriate action!
});
要停止应用程序实例,请调用以下方法:KafkaStreams#close()
// Stop the Kafka Streams threads
streams.close();
要允许您的应用程序正常关闭以响应 SIGTERM,建议您添加 shutdown 钩子
并调用 。KafkaStreams#close
以下是 Java 8+ 中的关闭钩子示例:
// Add shutdown hook to stop the Kafka Streams threads.
// You can optionally provide a timeout to `close`.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
应用程序停止后,Kafka Streams 会将此实例中运行的所有任务迁移到剩余的可用任务 实例。