运行 Streams 应用程序

您可以运行使用 Kafka Streams 库的 Java 应用程序,而无需任何其他配置或要求。Kafka 流 还提供接收应用程序各种状态通知的功能。监控运行时的能力 状态 在 监视指南.

启动 Kafka Streams 应用程序

您可以将 Java 应用程序打包为 fat JAR 文件,然后按如下方式启动应用程序:

# Start the application in class `com.example.MyStreamsApp`
# from the fat JAR named `path-to-app-fatjar.jar`.
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp

当您启动应用程序时,您将启动应用程序的 Kafka Streams 实例。您可以运行多个 实例的实例。一种常见情况是,应用程序有多个实例在 平行。有关更多信息,请参阅并行度模型

当应用程序实例开始运行时,定义的处理器拓扑将初始化为一个或多个流任务。 如果处理器拓扑定义了任何状态存储,则这些状态存储也会在初始化期间构造。为 有关更多信息,请参阅工作负载再平衡期间的状态还原部分)。

应用程序的弹性扩展

Kafka Streams 使您的流处理应用程序具有弹性和可扩展性。您可以添加和删除处理容量 在应用程序运行时动态运行,而不会造成任何停机或数据丢失。这使得您的应用程序 Resilient in face of failures 和 for 允许您根据需要执行维护(例如滚动升级)。

有关此弹性的更多信息,请参阅 Parallelism Model 部分。Kafka 流 利用 Kafka 组管理功能,该功能直接内置于 Kafka 有线协议中。它是启用 Kafka Streams 应用程序的弹性:组成员在消费和 在 Kafka 中处理数据。此外,Kafka Streams 还提供有状态处理并允许容错 状态。

向应用程序添加容量

如果您的流处理应用程序需要更多的处理能力,则只需启动流处理应用程序的另一个实例(例如在另一台计算机上)即可进行扩展。您的应用程序的实例将相互识别并自动开始共享处理工作。更具体地说,将从现有实例移交给新实例的是现有实例已运行的(部分)流任务。将流任务从一个实例移动到另一个实例会导致移动处理工作以及这些流任务的任何内部状态(流任务的状态将通过从相应的更改日志主题恢复状态,在目标实例中重新创建流任务的状态)。

应用程序的各个实例都在自己的 JVM 进程中运行,这意味着每个实例都可以利用其各自的 JVM 进程可用的所有处理能力(减去应用程序的任何非 Kafka-Streams 部分可能正在使用的容量)。这解释了为什么运行额外的实例将为您的应用程序提供额外的处理容量。当然,通过运行新实例将添加的确切容量取决于运行新实例的环境:可用的 CPU 内核、可用的主内存和 Java 堆空间、本地存储、网络带宽等。同样,如果您停止应用程序的任何正在运行的实例,则您正在删除并释放相应的处理容量。

在添加容量之前:只有 Kafka Streams 应用程序的单个实例正在运行。此时,您的应用程序对应的 Kafka 使用者组仅包含一个成员(此实例)。所有数据都由此单个实例读取和处理。

添加容量后:现在,您的 Kafka Streams 应用程序的另外两个实例正在运行,并且它们已自动加入应用程序的 Kafka 使用者组,总共有三个当前成员。这三个实例会自动在彼此之间拆分处理工作。拆分基于从中读取数据的 Kafka 主题分区。

从应用程序中删除容量

要删除处理容量,您可以停止运行流处理应用程序实例(例如,关闭两个 这四个实例),它将自动离开应用程序的 Consumer 组,其余的 您的应用程序将自动接管处理工作。其余实例将接管 由已停止的实例运行。将流任务从一个实例移动到另一个实例会导致移动处理 work 以及这些流任务的任何内部状态。在目标实例中重新创建流任务的状态 来自其 changelog 主题。

工作负载再平衡期间的状态恢复

迁移任务时,任务处理状态将在应用程序实例恢复之前完全恢复 加工。这保证了正确的处理结果。在 Kafka Streams 中,状态恢复通常由 重放相应的 changelog 主题以重建 state store。最小化基于更改日志的还原 latency 使用复制的本地状态存储,您可以指定 .当流任务为 初始化或重新初始化,则其 state store 将恢复,如下所示:num.standby.replicas

  • 如果不存在本地 state store,则从最早的偏移量重放到当前偏移量的变更日志。这会将本地状态存储重建为最新的快照。
  • 如果存在本地 state store,则从之前检查点的偏移量重放更改日志。将应用更改,并将状态还原到最新的快照。此方法花费的时间更少,因为它应用的更改日志的较小部分。

有关更多信息,请参阅备用副本

从版本 2.6 开始,Streams 现在将通过预热副本在后台完成任务的大部分恢复。这些将被分配给需要为任务恢复大量状态的实例。 只有当实例的状态在配置的 acceptable.recovery.lag 范围内(如果存在)时,才会将有状态活动任务分配给实例。这意味着 大多数情况下,任务迁移会导致该任务停机。它将在已经赶上 的实例上保持活动状态,而它正在 migrated 以恢复状态。Streams 将定期探测已完成还原的预热任务,并在准备就绪时将其转换为活动任务。

请注意,此任务可用性的一个例外是,没有任何实例具有该任务的赶上版本。在这种情况下,我们别无选择,只能将 active task 添加到未赶上并且必须阻止从更改日志中恢复 task 状态的进一步处理。如果高可用性很重要 对于您的应用程序,强烈建议您启用 Standbys。

确定要运行的应用程序实例数

Kafka Streams 应用程序的并行度主要取决于输入主题具有的分区数。为 例如,如果您的应用程序从具有 10 个分区的单个主题中读取数据,则您最多可以运行 10 个实例 的应用程序。您可以运行更多实例,但这些实例将是空闲的。

主题分区的数量是 Kafka Streams 应用程序的并行度上限,而 应用程序的运行实例数。

要在应用程序实例之间实现平衡的工作负载处理并防止处理热点,您应该 分发数据和处理工作负载:

  • 数据应在主题分区之间平均分布。例如,如果两个主题分区各有 100 万条消息,这比一个分区有 200 万条消息而另一个分区中没有消息要好。
  • 处理工作负载应在主题分区之间平均分配。例如,如果处理消息的时间变化很大,则最好将处理密集型消息分散到多个分区中,而不是将这些消息存储在同一个分区中。