核心概念

Kafka Streams 是一个客户端库,用于处理和分析存储在 Kafka 中的数据。 它建立在重要的流处理概念之上,例如正确区分事件时间和处理时间、窗口支持以及简单而高效的管理和实时查询应用程序状态。

Kafka Streams 的进入门槛很低:您可以在单台机器上快速编写和运行小规模的概念验证;您只需在多台计算机上运行应用程序的其他实例,即可扩展到大批量生产工作负载。 Kafka Streams 利用 Kafka 的并行模型以透明方式处理同一应用程序的多个实例的负载均衡。

Kafka Streams 的一些亮点:

  • 设计为简单轻量级的客户端库,可以轻松嵌入到任何 Java 应用程序中,并与用户用于其流式处理应用程序的任何现有打包、部署和操作工具集成。
  • 除了作为内部消息传递层的 Apache Kafka 本身之外,对系统没有外部依赖性;值得注意的是,它使用 Kafka 的分区模型来水平扩展处理,同时保持强大的排序保证。
  • 支持容错本地状态,从而实现非常快速和高效的有状态操作,如窗口联接和聚合。
  • 支持恰好一次处理语义,以保证每条记录将被处理一次,并且只处理一次,即使在处理过程中 Streams 客户端或 Kafka 代理出现故障也是如此。
  • 采用一次一条记录的处理来实现毫秒级处理延迟,并支持基于事件时间的窗口操作,记录无序到达。
  • 提供必要的流处理原语,以及高级 Streams DSL低级处理器 API

我们首先总结了 Kafka Streams 的关键概念。

流处理拓扑

  • 是 Kafka Streams 提供的最重要的抽象:它表示一个无界的、不断更新的数据集。流是有序、可重放且容错的不可变数据记录序列,其中数据记录定义为键值对。
  • 流处理应用程序是使用 Kafka Streams 库的任何程序。它通过一个或多个处理器拓扑定义其计算逻辑,其中处理器拓扑是通过流(边缘)连接的流处理器(节点)的图形。
  • 流处理器是处理器拓扑中的一个节点;它表示一个处理步骤,通过一次从拓扑中的上游处理器接收一条输入记录,将其操作应用于该记录,然后可能向其下游处理器生成一个或多个输出记录,从而在流中转换数据。
拓扑中有两个特殊处理器:
  • 源处理器:源处理器是一种特殊类型的流处理器,没有任何上游处理器。它通过使用来自这些主题的记录并将其转发到其下游处理器,从一个或多个 Kafka 主题生成到其拓扑的输入流。
  • 接收器处理器:接收器处理器是一种特殊类型的流处理器,没有下游处理器。它将从其上游处理器收到的任何记录发送到指定的 Kafka 主题。
请注意,在正常的处理器节点中,在处理当前记录时也可以访问其他远程系统。因此,处理后的结果可以流式传输回 Kafka 或写入外部系统。

Kafka Streams 提供了两种定义流处理拓扑的方法:Kafka Streams DSL 提供 最常见的数据转换操作,例如 、 和 开箱即用;较低级别的处理器 API 允许 开发人员定义和连接自定义处理器,以及与状态存储交互。mapfilterjoinaggregations

处理器拓扑只是流处理代码的逻辑抽象。 在运行时,逻辑拓扑在应用程序内部实例化和复制,以便进行并行处理(有关详细信息,请参阅 Stream Partitions and Tasks)。

时间

流处理中的一个关键方面是时间的概念,以及时间的建模和集成方式。 例如,某些操作(如窗口化)是根据时间边界定义的。

流中常见的时间概念是:

  • 事件时间 - 事件或数据记录发生的时间点,即最初在“源”处创建。例:如果事件是汽车中 GPS 传感器报告的地理位置变化,则关联的事件时间将是 GPS 传感器捕获位置变化的时间。
  • 处理时间 - 事件或数据记录恰好由流处理应用程序处理的时间点,即使用记录时。处理时间可能比原始事件时间晚几毫秒、几小时或几天等。例:想象一下,一个分析应用程序读取并处理汽车传感器报告的地理位置数据,并将其呈现给车队管理仪表板。在这里,分析应用程序中的处理时间可能是事件时间后的毫秒或秒(例如,对于基于 Apache Kafka 和 Kafka Streams 的实时管道)或小时(例如,对于基于 Apache Hadoop 或 Apache Spark 的批处理管道)。
  • 摄取时间 - Kafka 代理将事件或数据记录存储在主题分区中的时间点。与事件时间的不同之处在于,此摄取时间戳是在 Kafka 代理将记录附加到目标主题时生成的,而不是在“在源处”创建记录时生成的。与处理时间的区别在于,处理时间是流处理应用程序处理记录的时间。例如,如果从未处理过某条记录,则没有处理时间的概念,但它仍然具有摄取时间。

event-time 和 ingestion-time 之间的选择实际上是通过 Kafka(而不是 Kafka Streams)的配置完成的:从 Kafka 0.10.x 开始,时间戳会自动嵌入到 Kafka 消息中。根据 Kafka 的配置,这些时间戳表示 event-time 或 ingestion-time。可以在代理级别或按主题指定相应的 Kafka 配置设置。Kafka Streams 中的默认时间戳提取器将按原样检索这些嵌入的时间戳。因此,应用程序的有效时间语义取决于这些嵌入式时间戳的有效 Kafka 配置。

Kafka Streams 通过接口为每个数据记录分配一个时间戳。 这些每条记录的时间戳描述了流在时间方面的进度,并由与时间相关的操作(如窗口操作)利用。 因此,只有当新记录到达处理器时,此时间才会提前。 我们将此数据驱动时间称为应用程序的流时间,以区分此应用程序实际执行时的默认时钟时间。 然后,接口的具体实现将为流时间定义提供不同的语义。 例如,根据数据记录的实际内容(例如嵌入的时间戳字段)检索或计算时间戳,以提供事件时间语义。 并返回当前挂钟时间,从而产生流时间的处理时间语义。 因此,开发人员可以根据其业务需求实施不同的时间概念。TimestampExtractorTimestampExtractor

最后,每当 Kafka Streams 应用程序将记录写入 Kafka 时,它还会为这些新记录分配时间戳。时间戳的分配方式取决于上下文:

  • 当通过处理某些输入记录(例如,在函数调用中触发)生成新的输出记录时,输出记录时间戳将直接从输入记录时间戳继承。context.forward()process()
  • 当通过周期性函数生成新的输出记录时,例如 ,输出记录时间戳定义为流任务的当前内部时间(通过 获取)。Punctuator#punctuate()context.timestamp()
  • 对于聚合,结果更新记录的时间戳将是影响结果的所有输入记录的最大时间戳。

您可以通过分配 timestamps 在调用 .#forward()

对于聚合和联接,时间戳是使用 遵循规则。

  • 对于具有 left 和 right 的连接 (stream-stream, table-table) input records 时,将分配输出记录的时间戳。max(left.ts, right.ts)
  • 对于流表联接,将为输出记录分配时间戳 从 Stream 记录。
  • 对于聚合,Kafka Streams 还会计算每个键的所有记录的时间戳,无论是全局的(对于非窗口化的) 或每个窗口。max
  • 对于无状态操作,将传递输入记录时间戳。 For 和发出多条记录的同级,则所有 output records 从相应的 input 记录继承 timestamp。flatMap

流和表的对偶性

在实践中实施流处理使用案例时,您通常需要数据库。 在实践中非常常见的一个示例用例是电子商务应用程序,它丰富了传入的客户群 具有数据库表中最新客户信息的事务。换句话说,流无处不在,但数据库也无处不在。

因此,任何流处理技术都必须为流和表提供一流的支持。 Kafka 的 Streams API 通过其对 StreamsTable 的核心抽象提供了这样的功能。 我们稍后会讨论。现在,一个有趣的观察是,流和表之间实际上存在着密切的关系, 所谓的 stream-table 对偶性。Kafka 以多种方式利用了这种二元性:例如,使您的应用程序具有弹性, 为了支持容错状态处理, 或者针对应用程序的最新处理结果运行交互式查询。而且,除了内部使用之外,Kafka Streams API 还允许开发人员在自己的应用程序中利用这种二元性。

在我们讨论 Kafka Streams 中的聚合等概念之前,我们必须首先更详细地介绍,并讨论上述流-表对偶性。 从本质上讲,这种对偶性意味着流可以被视为表,而表也可以被视为流。例如,Kafka 的日志压缩功能就利用了这种二元性。

表的一种简单形式是键值对的集合,也称为映射或关联数组。此类表可能如下所示:

流-表对偶性描述了 流和表。
  • 流作为表:流可以被视为 表中,其中流中的每个数据记录都捕获一个状态更改 的桌子。因此,流是伪装的表,它可以是 通过重放 changelog 从 从 beginning 到 end 重新构造表。同样,在 more 一般类比,在流中聚合数据记录 - 例如 计算用户从 pageview events - 将返回一个表格(此处包含键和 value 作为用户及其对应的网页浏览量, 分别)。
  • Table as Stream:表可以被视为快照,位于 时间点,流中每个键的最新值(一个 stream 的数据记录是键值对)。因此,表是一个 变相的 stream 中,它很容易变成 “真实” stream 通过迭代表中的每个键值条目。

让我们用一个例子来说明这一点。假设有一个表跟踪 按用户划分的网页浏览总数(下图的第一列)。多 time,每当处理新的 pageview 事件时,该 table 会相应地更新。在这里,状态在不同的 时间点 - 以及表格的不同版本 - 可以是 表示为 changelog 流(第二列)。

有趣的是,由于 stream-table 对偶性,相同的 stream 可用于重建原始表(第三列):

例如,使用相同的机制通过 更改数据捕获 (CDC),并在 Kafka Streams 中复制其 跨计算机进行所谓的状态存储,以实现容错。流表 二元性是一个非常重要的概念,因此 Kafka Streams 明确地对其进行了建模 通过 KStream、KTable 和 GlobalKTable 接口。

聚合

聚合操作采用一个输入流或表,并通过将多个输入记录合并到单个输出记录中来生成一个新表。聚合的示例包括 Calculating counts 或 sum。

在 中,an 的输入流可以是 KStream 或 KTable,但输出流将始终是 KTable。这允许 Kafka Streams 在生成和发出值后,在更多记录无序到达时更新聚合值。当发生此类无序到达时,聚合 KStream 或 KTable 会发出新的聚合值。由于输出是 KTable,因此在后续处理步骤中,新值被视为使用相同的键覆盖旧值。Kafka Streams DSLaggregation

窗口

窗口化允许您控制如何对具有相同键的记录进行分组,以进行有状态操作,例如 OR 到所谓的窗口。按记录键跟踪窗口。aggregationsjoins

Windowing operations在 .使用窗口时,您可以为窗口指定宽限期。此宽限期控制 Kafka Streams 将等待给定窗口的无序数据记录的时间。如果记录在窗口的宽限期过后到达,则该记录将被丢弃,并且不会在该窗口中进行处理。具体来说,如果记录的时间戳指示该记录属于某个窗口,但当前流时间大于窗口结束时间加上宽限期,则会丢弃该记录。Kafka Streams DSL

无序记录在现实世界中总是可能的,应该在你的应用程序中适当地考虑。这取决于处理无序记录的有效方式。在处理时间的情况下,语义是“当记录被处理时”,这意味着无序记录的概念不适用,因为根据定义,没有记录可以乱序。因此,无序记录只能被视为事件时间。在这两种情况下,Kafka Streams 都能够正确处理无序记录。time semantics

国家

某些流处理应用程序不需要状态,这意味着消息的处理独立于 所有其他消息的处理。 但是,能够维护状态为复杂的流处理应用程序开辟了许多可能性:您 可以联接输入流,或者对数据记录进行分组和聚合。许多这样的有状态 Operator 由 Kafka Streams DSL 提供。

Kafka Streams 提供所谓的状态存储,流处理应用程序可以使用它来存储和查询数据。 在实施有状态操作时,这是一项重要的功能。 Kafka Streams 中的每个任务都嵌入了一个或多个状态存储,可以通过 API 访问这些状态存储,以存储和查询处理所需的数据。 这些状态存储可以是持久化键值存储、内存中的 hashmap 或其他方便的数据结构。 Kafka Streams 为本地状态存储提供容错和自动恢复功能。

Kafka Streams 允许创建状态存储的流处理应用程序外部的方法、线程、进程或应用程序对状态存储进行直接只读查询。这是通过一个名为 Interactive Queries 的功能提供的。所有 store 都已命名,并且 Interactive Queries 仅公开底层实现的读取操作。


加工保证

在流处理中,最常见的问题之一是“我的流处理系统是否保证每条记录只处理一次,即使在处理过程中遇到一些故障? 对于许多无法容忍任何数据丢失或数据重复的应用程序来说,不能保证 exactly-once 流处理会破坏交易,在这种情况下,通常会额外使用面向批处理的框架 到流处理管道,称为 Lambda 架构。 在 0.11.0.0 之前,Kafka 仅提供至少一次交付保证,因此任何将其用作后端存储的流处理系统都无法保证端到端的恰好一次语义。 事实上,即使那些声称支持恰好一次处理的流处理系统,只要它们以 source / sink 作为 Kafka 进行读取/写入,其应用程序实际上也无法保证 在整个管道中不会生成重复项。
自 0.11.0.0 版本以来,Kafka 增加了支持,允许其生产者以事务性和幂等的方式将消息发送到不同的主题分区。 因此,Kafka Streams 通过利用这些功能添加了端到端的 Exactly-once 处理语义。 更具体地说,它保证对于从源 Kafka 主题读取的任何记录,其处理结果将只反映在输出 Kafka 主题以及状态操作的状态存储中一次。 请注意,Kafka Streams 端到端的 Exactly-once 保证与其他流处理框架声称的保证之间的主要区别在于,Kafka Streams 与底层 Kafka 存储系统紧密集成,并确保 对输入主题的提交偏移量、对状态存储的更新和对输出主题的写入将以原子方式完成,而不是将 Kafka 视为可能具有副作用的外部系统。 有关如何在 Kafka Streams 中完成此操作的更多信息,请参阅 KIP-129
从 2.6.0 版本开始,Kafka Streams 支持改进的 exactly-once 处理实现,称为“exactly-once v2”。 这需要 Broker 版本 2.5.0 或更高版本。 此实现效率更高,因为它降低了客户端和代理资源利用率,例如客户端线程和使用的网络连接。 它支持更高的吞吐量和改进的可扩展性。 从 3.0.0 版本开始,exactly-once 的第一个版本已被弃用。鼓励用户对 从现在开始进行 Exactly-once 处理,并在必要时通过升级他们的代理来做好准备。 有关如何在代理和 Kafka Streams 中完成此操作的更多信息,请参阅 KIP-447
要在运行 Kafka Streams 应用程序时启用 exactly-once 语义, 设置 Config 值(默认值为 at_least_once) 设置为 StreamsConfig.EXACTLY_ONCE_V2(需要 Brokers 版本 2.5 或更高版本)。 有关更多信息,请参阅 Kafka Streams 配置部分。
processing.guarantee

乱序处理

除了保证每条记录都将被处理一次之外,许多流处理应用程序将面临的另一个问题是如何 处理可能影响其业务逻辑的无序数据。在 Kafka Streams 中,有两个原因可能 导致数据到达相对于其时间戳的顺序错误:

  • 在主题分区中,记录的时间戳不能随其偏移量单调增加。由于 Kafka Streams 将始终尝试按照偏移量顺序处理 topic-partition 中的记录,因此 这可能会导致同一主题分区中时间戳较大(但偏移量较小)的记录比时间戳较小(但偏移量较大)的记录更早得到处理。
  • 在可能正在处理多个主题分区的流任务中,如果用户将应用程序配置为不等待所有分区都包含一些缓冲数据,并且 从时间戳最小的分区中选择以处理下一条记录,然后稍后当为其他主题分区获取某些记录时,它们的时间戳可能小于从另一个主题分区获取的已处理记录。

对于无状态操作,乱序数据不会影响处理逻辑,因为一次只考虑一条记录,而不查看过去处理记录的历史记录; 但是,对于聚合和联接等有状态操作,无序数据可能会导致处理逻辑不正确。如果用户想要处理此类无序数据,通常他们需要允许他们的应用程序 等待更长的时间,同时在 await 时间内记账 state,即在 latency、cost 和 correctness 之间做出权衡决策。 特别是在 Kafka Streams 中,用户可以为窗口聚合配置其窗口运算符以实现此类权衡(详细信息可在开发人员指南中找到)。 对于 Join,用户可以使用版本化的状态存储来解决对无序数据的问题,但默认情况下不会处理无序数据:

  • 对于 Stream-Stream 联接,所有三种类型(inner、outer、left)都可以正确处理无序记录。
  • 对于 Stream-Table 连接,如果不使用版本控制的存储,则不会处理无序记录(即,Streams 应用程序不检查无序记录,只按偏移顺序处理所有记录), 因此,它可能会产生不可预测的结果。对于版本控制存储,流端无序数据将通过在表中执行基于时间戳的查找来正确处理。仍未处理表端无序数据。
  • 对于 Table-Table 连接,如果不使用版本控制的存储,则不会处理无序记录(即,Streams 应用程序不检查无序记录,而只按偏移顺序处理所有记录)。 但是,join 结果是一个 changelog 流,因此最终将是一致的。使用版本控制存储,表-表联接语义从基于偏移量的语义更改为基于时间戳的语义,并相应地处理无序记录。