升级指南和 API 变更
从任何旧版本升级到 {{fullDotVersion}} 是可能的:如果从 3.4 或更低版本升级,您将需要进行两次滚动退回,其中在第一个滚动退回阶段您设置配置(可能的值为 ),在第二个阶段您将其删除。这是安全处理 3 次更改所必需的。首先是引入嵌入式消费者的新协作再平衡协议。第二个是外键连接序列化格式的更改。
请注意,如果您跳过或延迟第二次滚动反弹,您将继续使用旧的急切再平衡协议,但是一旦整个组处于 2.4+,您就可以通过删除配置值和反弹来随时安全地切换到合作协议。有关更多详细信息,请参阅 KIP-429。第三个是内部重新分区主题的序列化格式的更改。有关更多详细信息,请参阅 KIP-904:upgrade.from="older version"
"0.10.0" - "3.4"
- 为滚动退回准备应用程序实例,并确保将 config 设置为要从中升级的版本。
upgrade.from
- 退回应用程序的每个实例一次
- 为第二轮滚动退回准备新部署的 {{fullDotVersion}} 应用程序实例;确保删除 config 的值
upgrade.from
- 再次退回应用程序的每个实例以完成升级
作为替代方案,也可以进行离线升级。在离线模式下从 0.10.0.x 和 {{fullDotVersion}} 的任何版本升级需要以下步骤:
- 停止所有旧的 (例如 0.10.0.x) 应用程序实例
- 更新您的代码,并将旧代码和 JAR 文件替换为新代码和新 JAR 文件
- 重新启动所有新的 ({{fullDotVersion}}) 应用程序实例
注意:自 2.4 以来,协作式再平衡协议一直是默认协议,但我们继续支持
Eager Rebalancing 协议为用户提供升级路径。此支持将在将来的发行版中取消。
因此,任何仍在使用 Eager 协议的用户都应该准备在 3.1 版中完成将他们的应用程序升级到 Cooperative 协议。
这仅影响仍在使用 2.4 以上版本的用户,以及已升级但尚未升级的用户
删除了他们在从 2.4 以下版本升级时设置的配置。
适合后一种情况的用户在升级到 3.1 之后时只需取消设置此配置,
而前一种情况下的用户如果尝试从 2.3 或更低版本升级到 3.1 以上的版本,则需要遵循略有不同的升级路径。
这些应用程序需要通过 Bridge 版本,首先升级到 2.4 - 3.1 之间的版本并设置配置,
然后删除该配置并升级到 3.1 以上的最终版本。有关更多详细信息,请参阅 KAFKA-8575。upgrade.from
upgrade.from
有关显示 Streams API 与 Kafka 代理版本兼容性的表,请参阅代理兼容性。
过去版本中的显著兼容性更改
从 3.5.x 或更高版本降级到 3.4.x 或更早版本需要特别注意:
从 3.5.0 版本开始,Kafka Streams 对重新分区主题使用新的序列化格式。
这意味着旧版本的 Kafka Streams 将无法识别新版本写入的字节。
因此,将 3.5.0 或更高版本的 Kafka Streams 降级到正在运行的旧版本更加困难。为
更多详情,请参考 KIP-904。
对于降级,请先将配置切换到要降级到的版本。
这将禁止在应用程序中写入新的序列化格式。在此状态下等待很重要
足够长的时间以确保应用程序已完成处理写入的任何 “in-fly” 消息
以新的序列化格式重新分区主题。之后,您可以将应用程序降级为
3.5.x 之前的版本。"upgrade.from"
从 3.0.x 或更高版本降级到 2.8.x 或更早版本需要特别注意: 自 3.0.0 版本起,Kafka Streams 使用更新的 RocksDB 版本,其磁盘格式发生了变化。 这意味着旧版本的 RocksDB 将无法识别该新版本的 RocksDB 写入的字节。 因此,将 3.0.0 或更高版本的 Kafka Streams 降级到正在运行的旧版本更加困难。 用户需要擦除由新版本的 Kafka Streams 写入的本地 RocksDB 状态存储,然后再换入 旧版本的 Kafka Streams 字节码,然后它将使用旧的磁盘格式从 changelogs 中。
Kafka Streams 不支持在同一物理状态目录上运行同一应用程序的多个实例作为不同的进程。从 2.8.0(以及 2.7.1 和 2.6.2)开始,
将强制执行此限制。如果要运行多个 Kafka Streams 实例,则必须为它们配置不同的值。state.dir
从 Kafka Streams 2.6.x 开始,可以使用一种名为 EOS 版本 2 的新处理模式。这可以配置
通过设置为 for
应用程序版本 3.0+,或者将其设置为 2.6 和 2.8 之间的版本。
要使用这项新功能,您的代理必须使用 2.5.x 或更高版本。
如果您想从旧版本升级 EOS 应用程序并在版本 3.0+ 中启用此功能,
您首先需要将应用程序升级到版本 3.0.x,保持 、
然后执行第二轮滚动弹跳以切换到 。如果你
正在将 EOS 应用程序从较旧的(2.6 之前)版本升级到 2.6 到 2.8 之间的版本,请按照以下步骤操作
相同的步骤,但使用 config。无需特殊步骤
要使用版本 2.6+ 升级到 3.0 或更高版本的应用程序:您可以
只需在滚动升级期间将 config 从 更改为 即可。
对于降级,请执行相反的操作:首先将配置从 切换到 以在 2.6.x 应用程序中禁用该功能。
之后,您可以将应用程序降级到 2.6.x 之前的版本。"processing.guarantee"
"exactly_once_v2"
"exactly_once_beta"
"exactly_once"
"exactly_once_v2"
"exactly_once_beta"
"exactly_once_beta"
"exactly_once_beta"
"exactly_once_v2"
"exactly_once_v2"
"exactly_once"
自 2.6.0 版本起,Kafka Streams 依赖于需要 MacOS 10.14 或更高版本的 RocksDB 版本。
要运行 Kafka Streams 应用程序版本 2.2.1、2.3.0 或更高版本,需要代理版本 0.11.0 或更高版本 并且磁盘消息格式必须为 0.11 或更高。 代理必须为版本 0.10.1 或更高版本,才能运行 Kafka Streams 应用程序版本 0.10.1 到 2.2.0。 此外,磁盘消息格式必须为 0.10 或更高,才能运行 Kafka Streams 应用程序版本 1.0 到 2.2.0。 对于 Kafka Streams 0.10.0,需要代理版本 0.10.0 或更高版本。
在已弃用的类中,当 a 通过 从源主题创建 时,其具体化状态存储
将重用源主题作为其 changelog 主题进行恢复,并将禁用日志记录以避免向源主题附加新的更新;在 1.0 中引入的类中,此行为已更改
意外地:我们仍然重用源主题作为恢复的 changelog 主题,但也会创建一个单独的 changelog 主题来附加 Update records from source 主题。在 2.0 版本中,我们修复了此问题,现在用户
可以根据 :如果要从旧类升级,因此需要更改代码以使用
新的 ,您应该将此 config 值设置为 to 以继续重用源主题;如果您从 1.0 或 1.1 升级,并且您已经在使用并且已经
创建了一个单独的 changelog 主题,你应该将此 config 值设置为 when upgrading to {{fullDotVersion}} 以便使用该 changelog 主题来恢复状态存储。
有关新配置的更多详细信息,请参阅 KIP-295。KStreamBuilder
KTable
KStreamBuilder.table()
StreamsBuilder
StreamsConfig#TOPOLOGY_OPTIMIZATION
KStreamBuilder
StreamsBuilder
StreamsConfig#OPTIMIZE
StreamsBuilder
StreamsConfig#NO_OPTIMIZATION
StreamsConfig#TOPOLOGY_OPTIMIZATION
3.9.0 中的 Streams API 更改
KIP-1033 的引入使您能够提供处理异常处理程序,以便在处理记录期间管理异常,而不是将异常完全抛出您的流应用程序。
您可以通过 as 提供配置。
指定的处理程序必须实现接口。StreamsConfig
StreamsConfig#PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG
org.apache.kafka.streams.errors.ProcessingExceptionHandler
Kafka Streams 现在允许通过新添加的 config 自定义 stream-thread runtime summary 的日志记录间隔。
默认情况下,每 2 分钟记录一次摘要。更多详细信息可以在 KIP-1049 中找到。log.summary.interval.ms
3.8.0 中的 Streams API 更改
Kafka Streams 现在支持通过配置自定义任务分配策略。可以将配置设置为自定义任务分配器的完全限定类名称
实现,该实现必须扩展新接口。
新配置还允许用户恢复在引入 .如果未配置自定义任务分配器,则使用默认任务分配器。
如果你使用的是该配置,则应改用新配置,因为内部配置将在将来的发行版中删除。
如果您之前是通过旧配置插入的,则需要确保您正在导入
切换时的新体验
转到 New Config,它是实现新 Public 接口的 版本。
有关更多详细信息,请参阅 KIP-924 的公共接口部分。task.assignor.class
org.apache.kafka.streams.processor.assignment.TaskAssignor
StickyTaskAssignor
HighAvailabilityTaskAssignor
HighAvailabilityTaskAssignor
internal.task.assignor.class
task.assignor.class
StickyTaskAssignor
internal.task.assignor.class
org.apache.kafka.streams.processor.assignment.StickTaskAssignor
task.assignor.class
StickyTaskAssignor
TaskAssignor
处理器 API 现在支持通过 KIP-813 添加的只读状态存储。
这些商店没有专门的 changelog 主题,但使用它们的源主题来实现容错,
类似于启用 Source-Topic 优化。KTables
为了改进对泄露的 state store 迭代器的检测,我们添加了新的 store 级指标来跟踪 number 和
打开迭代器的年龄。新指标包括 、 和 。这些指标可用
适用于所有状态存储,包括 RocksDB、内存和自定义存储。更多详细信息可以在 KIP-989 中找到。num-open-iterators
iterator-duration-avg
iterator-duration-max
oldest-iterator-open-since-ms
3.7.0 中的 Streams API 更改
我们在 中添加了一个新方法,即在 KIP-988 中,
用户可以在其中提供新添加的接口的自定义实现,以持续监控备用任务的更改。KafkaStreams
KafkaStreams#setStandbyUpdateListener()
StandbyUpdateListener
IQv2 支持允许指定无界、有界或半开放的键范围,这些键范围以无序(byte[]-lexicographic)顺序(每个分区)返回数据。KIP-985 通过添加 和 允许用户按降序或升序接收数据来扩展此功能。RangeQuery
.withDescendingKeys()
.withAscendingKeys()
KIP-992 添加了两种新的查询类型,即
即 和 。两者都应该用于查询带时间戳的键值存储,以检索结果。
existing 和 are 更改为始终仅返回带时间戳的键值存储的值。TimestampedKeyQuery
TimestampedRangeQuery
ValueAndTimestamp
KeyQuery
RangeQuery
IQv2 增加了对 (在 KIP-968 中引入) 的支持
这允许从版本控制状态存储中检索给定键和指定时间范围的一组记录。
用户必须使用和/或指定一半或完整的时间范围。MultiVersionedKeyQuery
fromTime(Instant)
toTime(Instant)
IQv2 增加了对 (在 KIP-960 中引入) 的支持
这允许根据其键和时间戳从版本化状态存储中检索单个记录。
用户必须使用该方法定义一个查询,该查询返回指定时间戳的记录版本。
更准确地说,key query 返回具有最大 timestamp 的记录。VersionedKeyQuery
asOf(Instant)
<= Instant
作为 KIP-962 的一部分,Kafka Streams 联接运算符的非 null 密钥要求已放宽。 以下运算符的行为已更改。
- left join KStream-KStream:不再删除带有 null-key 的左记录,并使用 'null' 调用 ValueJoiner 以获得正确的值。
- 外部连接 KStream-KStream:不再删除带有 null 键的左/右记录,并针对右/左值调用 'null' 的 ValueJoiner。
- left-foreign-key join KTable-KTable:不再删除 ForeignKeyExtractor 返回的 null-foreign-key 的左记录,并使用 'null' 调用 ValueJoiner 以获得正确的值。
- left join KStream-KTable:不再删除带有 null-key 的左记录,并针对右值使用 'null' 调用 ValueJoiner。
- left join KStream-GlobalTable:当 KeyValueMapper 返回 'null' 并调用 ValueJoiner 时,不再删除记录以获取 right 值。
//left join KStream-KStream
leftStream
.filter((key, value) -> key != null)
.leftJoin(rightStream, (leftValue, rightValue) -> join(leftValue, rightValue), windows);
//outer join KStream-KStream
rightStream
.filter((key, value) -> key != null);
leftStream
.filter((key, value) -> key != null)
.outerJoin(rightStream, (leftValue, rightValue) -> join(leftValue, rightValue), windows);
//left-foreign-key join KTable-KTable
Function<String, String> foreignKeyExtractor = leftValue -> ...
leftTable
.filter((key, value) -> foreignKeyExtractor.apply(value) != null)
.leftJoin(rightTable, foreignKeyExtractor, (leftValue, rightValue) -> join(leftValue, rightValue), Named.as("left-foreign-key-table-join"));
//left join KStream-KTable
leftStream
.filter((key, value) -> key != null)
.leftJoin(kTable, (k, leftValue, rightValue) -> join(leftValue, rightValue));
//left join KStream-GlobalTable
KeyValueMapper<String, String, String> keyValueMapper = (key, value) -> ...;
leftStream
.filter((key, value) -> keyValueMapper.apply(key,value) != null)
.leftJoin(globalTable, keyValueMapper, (leftValue, rightValue) -> join(leftValue, rightValue));
该配置已被弃用,取而代之的是新配置,以允许自定义状态存储
implementations 配置为 default。
如果您当前指定或替换了这些
分别具有 和default.dsl.store
dsl.store.suppliers.class
default.dsl.store=ROCKS_DB
default.dsl.store=IN_MEMORY
dsl.store.suppliers.class=BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class
dsl.stores.suppliers.class=BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class
在 3.7 版本中引入了一个新的配置选项。
有关更多信息,包括如何启用和进一步配置它,请参阅 Kafka Streams 开发人员指南。balance_subtopology
rack.aware.assignment.strategy
3.6.0 中的 Streams API 更改
机架感知任务分配是在 KIP-925 中引入的。
可以启用机架感知任务分配或计算任务分配,这可以在某些情况下最大限度地减少跨机架流量。
有关更多信息,包括如何启用和进一步配置它,请参阅 Kafka Streams 开发人员指南。StickyTaskAssignor
HighAvailabilityTaskAssignor
IQv2 支持允许指定无界、有界或半开放的密钥范围。用户必须使用 、 、
或指定半开或无界范围,但不能用于相同的范围。KIP-941 通过允许作为上限和下限(语义为“无界”)传入来简化类的使用,从而缩小了这一差距。RangeQuery
withUpperBound(K)
withLowerBound(K)
withNoBounds()
withRange(K lower, K upper)
null
RangeQuery
KStreams 到 KTable 联接现在具有添加宽限期的选项。
使用 with 方法在对象上启用宽限期。
此更改是在 KIP-923 中引入的。
要在 Stream-Table 联接中使用宽限期选项,必须对表进行版本控制。
有关更多信息,包括如何启用和进一步配置它,请参阅 Kafka Streams 开发人员指南。Joined
withGracePeriod()
3.5.0 中的 Streams API 更改
KIP-889 和 KIP-914 中引入了一种新的状态存储类型,即版本化键值存储。 不是为每个键存储单个记录版本(值和时间戳),不如 版本化状态存储可以为每个键存储多个记录版本。这 允许版本化状态存储支持带时间戳的检索操作 返回截至指定时间戳的最新记录 (每个键)。 有关更多信息,包括如何从非版本控制的键值升级 store 复制到现有应用程序中的版本控制存储,请参阅开发人员指南。 版本控制的键值存储仅是可选的;现有应用程序将不会 在升级到 3.5 时受到影响,但未明确更改代码。
除了 KIP-899 之外,如果用户选择使用新版本的键值存储,KIP-914 还会更新 DSL 处理语义。
使用新版本的键值存储,DSL 处理能够更好地处理乱序数据:
例如,可以删除延迟记录,并且 stream-table 联接对表执行基于时间戳的查找。
表聚合和主/外键表-表联接也得到了改进。
注意:global-KTable 不支持版本控制的键值存储,并且不适用于 .suppress()
KIP-904 改进了 KTable 聚合的实现。通常,输入 KTable 更新会触发两行的结果优化; 但是,在 KIP-904 之前,如果两个优化都发生在同一个结果行上,则会对同一行应用两个独立的更新,从而导致虚假的迭代中间结果。 KIP-904 允许我们检测这种情况,并且只应用一次更新,避免虚假的中间结果。
通过 KIP-399 改进了错误处理。
现有的 now 还涵盖了序列化错误。ProductionExceptionHandler
我们在 KIP-907 中添加了一个新的 Serde 类型Boolean
KIP-884 添加了一个新配置,允许在不更改任何代码的情况下使用自定义。default.client.supplier
KafkaClientSupplier
3.4.0 中的 Streams API 更改
KIP-770 废弃
config 支持新引入的 config 。
为了改进监控,在 DEBUG 级别添加了两个新指标。请注意,KIP 在 3.4.0 版本中仅部分实现,并且 config 尚不可用。cache.max.bytes.buffering
statestore.cache.max.bytes
input-buffer-bytes-total
cache-size-bytes-total
input.buffer.max.bytes
KIP-873 使您能够进行多播
结果记录添加到下游接收器主题的多个分区中,并添加了选择删除结果记录而不发送的功能。
该方法已弃用并替换为新添加的方法,该方法允许返回一组要将记录发送到的分区。Integer StreamPartitioner.partition()
Optiona≶Set<Integer>>StreamPartitioner.partitions()
KIP-862 为流-流自加入添加了 DSL 优化。优化是通过一个新选项启用的,该选项可以通过 existing config 进行设置。如果启用,DSL 将使用不同的
join 处理器实现,该实现使用单个 RocksDB 存储而不是两个,以避免 self-join 情况下不必要的数据重复。single.store.self.join
topology.optimization
KIP-865 通过弃用
参数,并在其位置引入新参数。--bootstrap-servers
--bootstrap-server
3.3.0 中的 Streams API 更改
Kafka Streams 在实例关闭时不会发送“离开组”请求。此行为意味着
再平衡将延迟,直到通过。KIP-812 引入了重载,它允许强制实例离开
组。
注意:由于内部限制,目前仅适用于静态消费组
(参见 KAFKA-16514 了解更多详细信息和修复
一些未来的版本)。max.poll.interval.ms
KafkaStreams.close(CloseOptions)
CloseOptions
KIP-820 将 KIP-478 的 PAPI 类型安全改进适应到 DSL 中。现有方法 、 、 和 以及 的所有重载均已弃用,取而代之的是新添加的方法KStream.transform
KStream.flatTransform
KStream.transformValues
KStream.flatTransformValues
void KStream.process
KStream<KOut,VOut> KStream.process(ProcessorSupplier, ...)
KStream<K,VOut> KStream.processValues(FixedKeyProcessorSupplier, ...)
KStream
void
process()
FixedKeyProcessor
FixedKeyRecord
FixedKeyProcessorContext
ContextualFixedKeyProcessor
processValues()
ProcessingContext
目前支持通过 Operator 仅在窗口关闭后发出窗口聚合结果。但是,使用内存中的实现,并且不会
支持 RocksDB。为了缩小这一差距,KIP-825 引入了“发出策略”,这些策略直接内置于聚合运算符中,以使用已经存在的
RocksDB 存储。 并允许在 “Emit on Window Update” (默认) 之间进行选择
和 “emit on window close” 策略。此外,还添加了一些新的发出指标,以及必要的
new 方法、.suppress()
suppress()
TimeWindowedKStream.emitStrategy(EmitStrategy)
SessionWindowedKStream.emitStrategy(EmitStrategy)
SessionStore.findSessions(long, long)
KIP-834 允许暂停
以及恢复 Kafka Streams 实例。暂停意味着处理输入记录和执行标点符号将
被跳过;Kafka Streams 将继续轮询以维护其组成员资格,并可能提交偏移量。
除了新方法 和 之外,它还
支持通过该方法检查实例是否暂停。KafkaStreams.pause()
KafkaStreams.resume()
KafkaStreams.isPaused()
为了改进对 Kafka Streams 应用程序的监控,KIP-846 在新的主题级别范围内添加了四个新指标 、 、 和 。
分别在 INFO 级别收集源节点和接收器节点的指标。bytes-consumed-total
records-consumed-total
bytes-produced-total
records-produced-total
3.2.0 中的 Streams API 更改
RocksDB 提供了许多指标,这些指标对于监控和调整其性能至关重要。Kafka Streams 开始使 RocksDB 指标可访问 与 2.4.0 版本中通过 KIP-471 的任何其他 Kafka 指标一样。 但是,KIP 仅部分实现,现在已通过 3.2.0 版本完成。 有关可用 RocksDB 指标的完整列表,请参阅监控文档。
Kafka Streams 附带了 RocksDB 和内存存储实现,用户可以选择使用哪一个。
但是,对于 DSL,选择是每个操作员的选择,因此从默认的 RocksDB 切换起来很麻烦
store to memory store (存储到内存存储) 中,尤其是对于较大的拓扑。KIP-591 添加了一个新配置,允许为全局所有 DSL 运算符设置默认存储。
请注意,需要传递给构造函数才能使用这个新配置。default.dsl.store
TopologyConfig
StreamsBuilder
对于多可用区部署,需要将 StandbyTasks 分配给在不同
AZ 比相应的活动 StreamTask 少。KIP-708 支持使用机架感知的 StandbyTask 分配策略配置 Kafka Streams 实例,方法是使用新添加的配置和相应的 .rack.aware.assignment.tags
client.tag.<myTag>
KIP-791 添加了一种新方法来公开
记录元数据。例如,这有助于在交互式查询中提供 read-your-writes 一致性保证。Optional<RecordMetadata> StateStoreContext.recordMetadata()
交互式查询允许用户
利用 Kafka Streams 处理器节点的运行状态。现有的 API 与
实际的 state store 接口,因此是 state store 的内部实现。打破这种紧密的耦合
并允许构建更高级的 IQ 功能,KIP-796 引入了
一个全新的 IQv2 API,通过 and 类,
以及 and 接口(加上其他帮助程序类)。
此外,还添加了多个内置查询类型:用于键查找和(通过 KIP-805)
对于键值存储的键范围查询,以及 和 (通过 KIP-806)
,用于键和范围查找窗口化存储。StateQueryRequest
StateQueryResult
Query
QueryResult
KeyQuery
RangeQuery
WindowKeyQuery
WindowRangeQuery
Kafka Streams DSL 可能会为某些 DSL 运算符插入所谓的重新分区主题,以确保正确分区
的数据。这些主题配置了无限保留时间,并且 Kafka Streams 会显式清除旧数据
通过 “delete record” 请求,当提交 Input Topic 偏移量时。KIP-811 添加了一个新配置,允许您独立于提交间隔配置清除间隔。repartition.purge.interval.ms
3.1.0 中的 Streams API 更改
通过 KIP-633 改进了左/外流-流连接的语义。
以前,由于 eager-emit 策略,left-/outer stream-stream join 可能会发出所谓的虚假 left/outer 结果。
将实现更改为仅在 join 窗口关闭后发出 left/outer join result 记录。
用于指定加入窗口的旧 API,即启用 eager-emit 策略的 API,
已弃用,取而代之的是 a 和 .
仅当使用新的联接窗口生成器时,才会启用新语义。
此外,KIP-633 还要求为窗口聚合设置宽限期,即 (hopping/tumbling)、 和 .
相应的 builder 方法已被弃用,取而代之的是 new 和 methods。JoinWindows.of()
JoinWindows.ofTimeDifferenceAndGrace()
JoinWindows.ofTimeDifferencWithNoGrace()
TimeWindows
SessionWindows
SlidingWindows
.of(...)
.ofTimeDifferenceAndGrace()
.ofTimeDifferencWithNoGrace()
KIP-761 添加了新的指标,允许跟踪底层使用者和生产者客户端上的阻塞时间。 有关更多详细信息,请查看 Kafka Streams 指标部分。
交互式查询通过 KIP-763 KIP-766 进行了改进。
范围查询现在接受 as lower/upper key-range bound,以指示开放式下限/上限。null
外键表-表联接现在支持通过 KIP-775 的自定义分区程序。
以前,如果 input 表由非默认分区程序分区,则联接记录可能会失败。
借助 KIP-775,您现在可以使用新添加的对象将自定义传递到联接中。StreamPartitioner
TableJoined
3.0.0 中的 Streams API 更改
我们改进了任务空闲 (max.task.idle.ms
) 的语义。
现在,Streams 提供更强大的 In-order 联接和合并处理语义。
Streams 的新默认功能会暂停对具有多个输入分区的任务的处理
当其中一个分区没有在本地缓冲数据但具有非零滞后时。在其他
words,Streams 将等待获取代理上已有的记录。这
导致 Join 语义得到改进,因为它允许 Streams 交错两个 Importing
按时间戳顺序进行分区,而不仅仅是处理恰好是哪个分区
缓冲。有一个选项可以禁用此新行为,还有一个选项可以
使 Streams 等待更长时间,以便将新记录生成到输入分区,
当您知道某些生产者可能是
慢。有关更多信息,请参阅配置参考,有关此更改的更大上下文,请参阅 KIP-695。
交互式查询可能会针对不同的错误引发新的异常:
-
UnknownStateStoreException
:如果拓扑中不存在指定的存储名称,则将引发 an 而不是前者。UnknownStateStoreException
InvalidStateStoreException
-
StreamsNotStartedException
:如果 Streams 状态为 ,将抛出 a。CREATED
StreamsNotStartedException
-
InvalidStateStorePartitionException
:如果指定的分区不存在,则会抛出 a。InvalidStateStorePartitionException
有关更多信息,请参阅 KIP-216。
我们弃用了 StreamsConfig 配置值(适用于 EOS 版本 1),取而代之的是改进的 EOS 版本 2,以前通过 .为了避免对配置名称中的术语“beta”产生混淆并突出 EOS 版本 2 的生产就绪性,我们还将“eos-beta”重命名为“eos-v2”
并弃用了 configuration 值 ,将其替换为新的配置值 exactly-once 语义的用户应计划迁移到 eos-v2 配置,并准备在 4.0 中或至少一年后删除已弃用的配置
从 3.0 版本开始,以最后到来者为准。请注意,eos-v2 需要 broker 版本 2.5 或更高版本,例如 eos-beta,因此用户应在必要时开始升级他们的 Kafka 集群。有关更多详细信息,请参阅 KIP-732。processing.guarantee
"exactly_once"
"exactly_once_beta
"exactly_once_beta"
"exactly_once_v2"
我们删除了 的默认实现。RocksDBConfigSetter#close()
我们删除了窗口化操作(如 Window 或 Session 聚合)的默认 24 小时宽限期,或者 stream-stream 连接。此时间段确定窗口结束后仍将保留任何无序记录的时间 被处理。在宽限期过后传入的记录被视为延迟,并将被丢弃。 但在 Suppression 等运算符中,较大的宽限期的缺点是会产生同样大的 输出延迟。当前的 API 使得完全错过宽限期配置变得太容易了,导致您 想知道为什么您的应用程序似乎没有产生任何输出 —— 实际上是,但不是 24 小时。
为了防止意外或无意识地退回到默认的 24 小时宽限期,我们弃用了所有
类的现有静态构造函数(如 )。这些
替换为两种风格的新静态构造函数:and(这些构造函数适用于类;类似的 API 适用于 、 和 SlidingWindows 类)。使用这些新 API,您被迫设置宽限期
句点,或者有意识地通过选择 flavor 来选择退出
如果您确实不关心宽限期,例如在测试期间,将其设置为 0 或
第一次使用 Kafka Streams 时。请注意,使用该类的新 API 还将修复虚假的左/外连接结果,如
以下段落。有关宽限期和新静态构造函数的更多详细信息,请参阅 KIP-633Windows
TimeWindows#of
#ofSizeAndGrace
#ofSizeWithNoGrace
TimeWindows
JoinWindows
SessionWindows
WithNoGrace
JoinWindows
此外,在旧版本中,Kafka Streams 急切地发出 stream-stream left/outer join 结果。此行为可能会导致虚假的左/外连接结果记录。
在此版本中,我们更改了行为以避免虚假结果,并且仅在 join 窗口关闭后(即宽限期过后)才会发出 left/outer join 结果。
为了保持向后兼容性,旧 API 保留了旧的 eager-emit 行为,并且只保留了新的
API 并启用新行为。有关更多信息,请查看 KAFKA-10847。JoinWindows#of(timeDifference)
JoinWindows#ofTimeDifferenceAndGrace()
JoinsWindows#ofTimeDifferenceNoGrace
TaskId 上的 public 和 fields 已被弃用,并替换为 getter。请改用新的 (替换 ) 和 API。此外,and 方法已被弃用
并且将被删除,因为它们从未打算供公众使用。我们还弃用了该类,并引入了一个要使用的新接口。引入此更改是为了更好地反映不应在外部实例化的事实
Kafka 代码库。
请注意,新提供的 API 可以更好地将任务 ID 表示为实际对象,而不是 String。请迁移到提供这些更好方法的 new,例如,使用 new 和 。 class 现在也被弃用,而新引入的接口将被改用。在这个新接口中,对已弃用的任何引用都将替换为新接口。
最后,也已弃用。请迁移到新的 .
我们已经弃用了几个方法,这些方法返回了上述已弃用的类:topicGroupId
partition
TaskId.subtopology()
topicGroupId
TaskId.partition()
TaskId#readFrom
TaskId#writeTo
org.apache.kafka.streams.processor.TaskMetadata
org.apache.kafka.streams.TaskMetadata
TaskMetadata
TaskMetadata
TaskId
org.apache.kafka.streams.TaskMetadata
ThreadMetadata#activeTasks
ThreadMetadata#standbyTasks
org.apache.kafka.streams.processor.ThreadMetadata
org.apache.kafka.streams.ThreadMetadata
ThreadMetadata
TaskMetadata
org.apache.kafka.streams.state.StreamsMetadata
org.apache.kafka.streams.StreamsMetadata
org.apache.kafka.streams.KafkaStreams
- 的用户将迁移到新的 .
KafkaStreams#allMetadata
KafkaStreams#metadataForAllStreamsClients
- 的用户将迁移到新的 .
KafkaStreams#allMetadataForStore(String)
KafkaStreams#streamsMetadataForStore(String)
- 的用户将迁移到新的 .
KafkaStreams#localThreadsMetadata
KafkaStreams#metadataForLocalThreads
有关更多详细信息,请参阅 KIP-740 和 KIP-744。
我们删除了以下已弃用的 API:
-
--zookeeper
应用程序重置工具的标志:在 Kafka 1.0.0 中已弃用 (KIP-198)。 -
--execute
应用程序重置工具的标志:在 Kafka 1.1.0 中已弃用 (KIP-171)。 -
StreamsBuilder#addGlobalStore
(一个重载):在 Kafka 1.1.0 (KIP-233) 中已弃用。 -
ProcessorContext#forward
(一些重载):在 Kafka 2.0.0 (KIP-251) 中已弃用。 -
WindowBytesStoreSupplier#segments
:在 Kafka 2.1.0 (KIP-319) 中已弃用。 -
segments, until, maintainMs
on 、 和 : 在 Kafka 2.1.0 (KIP-328) 中已弃用。TimeWindows
JoinWindows
SessionWindows
- 重载 、 、 和带有类型化参数的参数:在 Kafka 2.1.0 中已弃用 (KIP-358)。
JoinWindows#of, before, after
SessionWindows#with
TimeWindows#of, advanceBy
UnlimitedWindows#startOn
KafkaStreams#close
long
- 重载且参数为:在 Kafka 2.1.0 中已弃用 (KIP-372)。
KStream#groupBy, groupByKey
KTable#groupBy
Serialized
-
Joined#named, name
:在 Kafka 2.3.0 (KIP-307) 中已弃用。 -
TopologyTestDriver#pipeInput, readOutput
和类 (KIP-470) 的 API 和类。OutputVerifier
ConsumerRecordFactory
-
KafkaClientSupplier#getAdminClient
:在 Kafka 2.4.0 (KIP-476) 中已弃用。 - 重载 and parameters:在 Kafka 2.4.0 中已弃用 (KIP-479)。
KStream#join, leftJoin, outerJoin
KStream
Joined
-
WindowStore#put(K key, V value)
:在 Kafka 2.4.0 (KIP-474) 中已弃用。 -
UsePreviousTimeOnInvalidTimestamp
:在 Kafka 2.5.0 中已弃用,已重命名为 (KIP-530)。UsePartitionTimeOnInvalidTimestamp
- Overloaded :在 Kafka 2.5.0 (KIP-535) 中已弃用。
KafkaStreams#metadataForKey
- 重载 :在 Kafka 2.5.0 (KIP-562) 中已弃用。
KafkaStreams#store
以下依赖项已从 Kafka Streams 中删除:
- Connect-json:从 Kafka Streams 开始,编译时不再依赖于“connect:json”模块 (KAFKA-5146)。 依赖此传递依赖项的项目必须显式声明它。
配置参数的默认值已更改为 (即:使用代理默认复制因子)。
的值需要代理版本 2.4 或更高版本。replication.factor
-1
replication.factor
-1
引入了新的 serde 类型:ListSerde
- 向基于 (de) serialize 的对象添加了类
ListSerde
List
- 引入 和 为新功能提供支持
ListSerializer
ListDeserializer
2.8.0 中的 Streams API 更改
我们扩展了选项,并在 KIP-689 中加入了选项。StreamJoined
withLoggingEnabled()
withLoggingDisabled()
我们在 中添加了两个新方法,即 KIP-663 中的 和。
这些方法允许向正在运行的 KafkaStreams 客户端添加和删除 StreamThreads。KafkaStreams
KafkaStreams#addStreamThread()
KafkaStreams#removeStreamThread()
我们在 KIP-671 中弃用了 。
默认处理程序将关闭 Kafka Streams 客户端,并且客户端将转换为状态 ERROR。
如果您实现自定义处理程序,则新接口允许您返回 ,
这将确定应用程序将如何响应流线程故障。KafkaStreams#setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
KafkaStreams#setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)
StreamThreadExceptionResponse
KIP-663 中的更改需要 KafkaStreams 客户端
状态机进行更新,这是在 KIP-696 中完成的。
ERROR 状态现在是 TERMINAL,PENDING_ERROR 是资源正在关闭的过渡状态。
ERROR 状态表示有问题,Kafka Streams 客户端不应盲目
重新启动,但未对导致线程失败的错误进行分类。
如果错误是您要重试的类型,则应返回 return 。
当所有流线程都已失效时,不会自动转换为 ERROR,因为可以添加新的流线程。StreamsUncaughtExceptionHandler
REPLACE_THREAD
该构造函数已弃用,以鼓励用户通过 正确设置其窗口大小。
为无法通过
构造函数,例如在使用 Console Consumer 时。KIP-659 有更多细节。TimeWindowedDeserializer
TimeWindowedDeserializer(final Deserializer inner)
TimeWindowedDeserializer(final Deserializer inner, Long windowSize)
window.size.ms
为了简化测试,两个不需要参数的新构造函数已经
已添加到类中。如果传递
到构造函数中,不再需要设置强制配置参数
(参见 KIP-680)。Properties
TopologyTestDriver
Properties
我们向 interface 添加了 method 。
new 允许获取 key 以给定前缀开头的所有值。
有关更多详细信息,请参阅 KIP-614。prefixScan()
ReadOnlyKeyValueStore
prefixScan()
Kafka Streams 现在正在处理使用者、创建者和管理客户端引发的问题。
如果任务发生超时,Kafka Streams 将移至下一个任务,并重试以继续处理失败的任务
任务。
要限制 Kafka Streams 重试任务的时间,您可以设置(默认值为 5 分钟)。
如果任务在指定的任务超时(按任务跟踪)内未取得进展,
Kafka Streams 抛出一个(参见 KIP-572)。TimeoutException
task.timeout.ms
TimeoutException
我们将 和 的默认值更改为 be,而不是 。
用户现在将看到 a 如果他们的 serdes 未通过这些配置正确配置或未显式传入。
有关详细信息,请参阅 KIP-741。default.key.serde
default.value.serde
null
ByteArraySerde
ConfigException
2.7.0 中的 Streams API 更改
在我们弃用了 、 以及 并分别将它们替换为 、 和 。 在 Kafka Streams 2.5 版本中引入,其中 getter 方法具有 prefix 。
此更改的目的是将方法名称引入 Kafka 自定义,以便不使用 getter 方法的前缀。
旧方法已弃用,不受影响。
(参见 KIP-648。KeyQueryMetadata
getActiveHost()
getStandbyHosts()
getPartition()
activeHost()
standbyHosts()
partition()
KeyQueryMetadata
get
get
配置参数的变量已从 重命名为 。
旧变量已弃用。请注意,参数名称本身不受影响。
(参见 KIP-629。StreamsConfig
"topology.optimization"
TOPOLOGY_OPTIMIZATION
TOPOLOGY_OPTIMIZATION_CONFIG
配置参数已弃用,取而代之的是新参数 。
Kafka Streams 的运行时会忽略(如果设置),但是,它仍会转发参数
给其内部客户。retries
task.timeout.ms
retries
我们添加了 KIP-450 中所述的窗口化聚合选项。
滑动窗口是固定时间且数据对齐的窗口,可实现灵活高效的窗口聚合。SlidingWindows
windowedBy()
2.6 中引入的端到端延迟指标已扩展为包括商店级指标。新的商店级别 指标记录在 TRACE 级别,这是一个新的指标记录级别。启用 TRACE 级别指标将自动 打开所有更高的级别,即 INFO 和 DEBUG。有关更多信息,请参阅 KIP-613。
2.6.0 中的 Streams API 更改
我们添加了一种新的处理模式 EOS 版本 2,它使用恰好一次保证来提高应用程序的可扩展性
(通过 KIP-447)。
您可以通过将配置参数设置为
新值 .
请注意,您需要 2.5 或更高版本的代理才能使用此功能。processing.guarantee
"exactly_once_beta"
对于更高可用性的有状态应用程序,我们修改了任务分配算法,以延迟有状态活动任务到实例的移动 尚未赶上该任务的状态。相反,要将任务从一个实例迁移到另一个实例(例如,在扩展时), Streams 会将预热副本分配给目标实例,以便它可以在活动任务在实例上保持可用时开始恢复状态 这已经有了任务。实例预热任务会将其进度传达给组,以便在准备就绪后,Streams 可以处于活动状态 任务分配给新所有者。查看 KIP-441 了解完整详细信息,包括用于控制此新功能的几个新配置。
添加了新的端到端延迟指标。这些任务级别的指标将记录在 INFO 级别,并报告记录在开始/源节点的最小和最大端到端延迟 以及任务的结束/结束节点。有关更多信息,请参阅 KIP-613。
从 2.6.0 开始,Kafka Streams 弃用了新的运算符
(根据 KIP-221)。 与 类似,但 Kafka Streams 将为您管理该主题。
如果需要写入和读回您管理的主题,则可以回退以与 结合使用。
请参阅 开发人员指南 以了解有关 的更多详细信息。KStream.through()
KStream.repartition()
KStream.repartition()
KStream.through()
KStream.to()
StreamsBuilder#stream()
KStream.repartition()
s 在处理器 API 中的可用性得到了改进:现在根据 KIP-401 进行了扩展,
使用户能够提供 与 Processor/Transformer 逻辑一起提供 s,以便它们自动
已添加并连接到处理器。StateStore
ProcessorSupplier
TransformerSupplier
ConnectedStoreProvider
StateStore
我们在 StreamsResetter 中添加了一个选项,用于在配置长会话超时时强制删除代理端的剩余成员
根据 KIP-571。--force
我们添加了 and 方法来允许禁用或配置 changelog 主题,并允许配置 changelog 主题
根据 KIP-446。Suppressed.withLoggingDisabled()
Suppressed.withLoggingEnabled(config)
2.5.0 中的 Streams API 更改
我们添加了一个新运算符(通过 KIP-150)
这允许在单个操作中聚合多个流。
共分组流也可以在聚合之前进行窗口化。
有关更多详细信息,请参阅开发人员指南。cogroup()
我们添加了一个新的 API,用于根据 KIP-523 将输入事件流转换为更改日志流。KStream.toTable()
我们在 KIP-527 中添加了一个新的 Serde 类型来表示
Null 键或 Null 值 from input 主题。Void
已弃用,并将其替换为 KIP-530。UsePreviousTimeOnInvalidTimestamp
UsePartitionTimeOnInvalidTimeStamp
已弃用,并将其替换为 以允许查询
对于具有各种参数的存储,包括查询特定任务和过时的存储,分别根据 KIP-562 和 KIP-535。KafkaStreams.store(String, QueryableStoreType)
KafkaStreams.store(StoreQueryParameters)
2.4.0 中的 Streams API 更改
从 2.4.0 开始,Kafka Streams 提供了 KTable-KTable 外键连接(根据 KIP-213)。 此连接符允许在具有不同键的两个 KTable 之间连接记录。 支持 INNER 和 LEFT 外键联接。
在 2.4 版本中,您现在可以通过 KIP-307 命名 Kafka Streams DSL 拓扑中的所有运算符。
为运算符指定有意义的名称可以更轻松地理解拓扑
description () 和
了解 Kafka Streams 应用程序正在执行的操作的完整上下文。
大多数 and 方法都有新的重载
接受对象。通常,您将通过以下方式为 DSL 操作提供名称
用。聚合的重新分区主题的命名
操作仍将使用,而 Join 操作将使用
或 new 对象。Topology#describe()#toString()
KStream
KTable
Named
Named.as("my operator name")
Grouped
Joined
StreamJoined
在 Kafka Streams 2.4.0 版本之前,DSL 的用户无法命名流-流联接中涉及的状态存储。
如果用户更改了拓扑并在
join,则 state store 的内部名称会发生变化,需要在重新部署时重置应用程序。
在 2.4.0 版本中,Kafka Streams 添加了类,使用户能够命名连接处理器、重新分区主题(如果需要重新分区)、
以及 join 中涉及的 state store 的 state store 中。此外,通过命名 state store,changelog 主题
支持状态的 store 也被命名。请务必注意,命名商店不会使它们可通过 Interactive Queries 查询。
提供的另一个功能是,您现在可以配置联接中使用的状态存储的类型。
您可以选择使用内存中存储或自定义状态存储进行流-流联接。请注意,提供的 store
将无法通过 Interactive Queries 进行查询。随着
of ,stream-stream 连接操作
using 已被弃用。请使用
new 重载方法。您可以从 KIP-479 获取更多详细信息。StreamJoined
StreamJoined
StreamJoined
Joined
随着增量协作再平衡的引入,Streams 不再需要在重新平衡开始时撤销所有任务。相反,在完成再平衡时,仅将要迁移到另一个使用者的任务
对于总体负载均衡,需要关闭和撤销。这改变了 a bit 的语义,因为它不一定会再过渡到再平衡开始时。请注意,
这意味着 IQ 现在将始终可用,除了在状态恢复期间,包括在进行再平衡时。如果在再平衡开始时正在进行恢复,我们将继续主动恢复 state 存储和/或进程
standby 任务。请注意,使用这种新的再平衡协议,您有时可能会看到再平衡之后是第二次短暂的再平衡,以确保所有任务都得到安全分配。有关详细信息,请参阅 KIP-429。StateListener
REBALANCING
2.4.0 版本包含新添加和重新设计的量度。KIP-444 向现有的
线程级、任务级和处理器/状态存储级指标。
有关可用客户端级别指标的完整列表,请参阅操作指南中的 KafkaStreams 监控部分。
此外,RocksDB 指标通过 KIP-471 公开。
有关可用 RocksDB 指标的完整列表,请参阅操作指南中的 RocksDB 监控部分。KafkaStreams
Kafka Streams 通过 KIP-470 进行了改进,以简化用于测试应用程序代码的过程。
我们弃用了 、 、 以及 和 ,并分别将它们替换为 和 。
我们还引入了一个简化断言代码的新类。
有关完整详细信息,请参阅开发人员指南中的测试部分。test-utils
TopologyTestDriver
ConsumerRecordFactory
TopologyTestDriver#pipeInput()
OutputVerifier
TopologyTestDriver#readOutput()
TestInputTopic
TestOutputTopic
TestRecord
在 2.4.0 中,我们弃用了永远不应该使用的 API。
相反,应该使用现有的
(KIP-474)。WindowStore#put(K key, V value)
WindowStore#put(K key, V value, long windowStartTimestamp)
此外,接口及其相应的配置参数已弃用
(KIP-528)
,并将在下一个主要版本 (KAFKA-7785.
因此,将来将不再支持此功能,您需要相应地更新代码。
如果您使用自定义并停止使用它,则创建的任务可能会更改。
因此,您需要重置应用程序才能升级它。PartitionGrouper
partition.grouper
PartitionGrouper
2.3.0 中的 Streams API 更改
版本 2.3.0 向 Ktable API 添加了 Suppress 运算符。kafka-streams-scala
从 2.3.0 开始,Streams 现在提供窗口的内存中版本 (KIP-428)
和会话 (KIP-445) 存储,以及基于 RocksDB 的持久化存储。
新的公共接口 和 被添加到内置的内存窗口或会话存储中并提供。inMemoryWindowStore()
inMemorySessionStore()
Stores
从 2.3.0 开始,我们更新了如何启用优化。现在,要启用优化,您需要做两件事。
首先将此行添加到您的 properties 中,就像您之前所做的那样。
其次,在构建实例时,您需要在构建
topology 中使用 overloaded 方法。
例如。properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
KafkaStreams
StreamsBuilder.build(Properties)
KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)
在 2.3.0 中,我们添加了 和 for 的默认实现,以便它们可以通过 lambda 表达式实现。
有关更多详细信息,请阅读 KIP-331。close()
configure()
Serializer
Deserializer
Serde
为了改进运算符语义,添加了新的存储类型,允许为每个键值对或窗口存储额外的时间戳。
一些 DSL 运营商(例如 KTables)正在使用这些新存储。
因此,如果您指定 或 作为 .
虽然此更改主要是透明的,但在某些极端情况下可能需要更改代码:注意:如果您收到非类型化存储并使用强制转换,则可能需要更新代码以强制转换为正确的类型。
否则,您可能会收到类似于 java.lang.ClassCastException 的异常:从存储中获取值时,类 org.apache.kafka.streams.state.ValueAndTimestamp 无法强制转换为类 YOUR-VALUE-TYPE
。此外,仅返回非内置 Store,并在访问内置 Store 时引发异常。
有关更多详细信息,请阅读 KIP-258。TimestampedKeyValueStoreType
TimestampedWindowStoreType
QueryableStoreType
TopologyTestDriver#getStateStore()
为了提高类型安全性,添加了一个新运算符。
有关更多详细信息,请阅读 KIP-313。KStream#flatTransformValues
Kafka Streams 用于将配置参数设置为 。
此默认值已删除,Kafka Streams 现在使用使用者默认值。
有关更多详细信息,请阅读 KIP-442。max.poll.interval.ms
Integer.MAX_VALUE
repartition 主题的默认配置已更改:
索引文件的分段大小 () 不再为 50MB,而是使用集群默认值。
同样,配置不再是 10 分钟,而是使用集群默认配置。
最后,保留期 () 从 更改为 (无限)。
有关更多详细信息,请阅读 KIP-443。segment.index.bytes
segment.ms
retention.ms
Long.MAX_VALUE
-1
为避免内存泄漏,有一个在 shutdown 时调用的新方法。
用户应该通过关闭 RocksDB 配置对象来实现此方法以释放这些对象使用的任何内存。
有关更多详细信息,请阅读 KIP-453。RocksDBConfigSetter
close()
RocksDB 依赖项已更新到版本 。
新版本允许指定更多的 RocksDB 配置,包括有助于限制 RocksDB 堆外内存使用的配置。
有关更多详细信息,请阅读 KAFKA-8215。5.18.3
WriteBufferManager
2.2.0 中的 Streams API 更改
我们在 2.2.0 中稍微简化了启动阶段的转换图:在旧版本中,状态将从 到 ,然后是 to 以获得第一个
stream 任务分配,然后返回 ;从 2.2.0 开始,它将从 直接传输到 ,然后传输到 。
如果你已经注册了一个捕获状态转换事件的 a,你可能需要相应地调整你的 listener implementation 以实现这种简化(在实践中,你的 listener logic 应该不太可能受到影响)。KafkaStreams#state
CREATED
RUNNING
REBALANCING
RUNNING
CREATED
REBALANCING
RUNNING
StateListener
在 中,我们添加了一个新的静态构造函数,以返回具有可配置窗口大小的 。这是为了帮助用户构建时间窗口 serdes 以直接从时间窗口存储的更改日志中读取。
更多详细信息可以在 KIP-393 中找到。WindowedSerdes
TimeWindowSerde
在 2.2.0 中,我们扩展了一些公共接口,包括 to extend,以便它们可以
在 try-with-resource 语句中使用。有关受影响的公共接口的完整列表,请阅读 KIP-376。KafkaStreams
AutoCloseable
2.1.0 中的 Streams API 更改
我们更新了 API 以允许更好的运行时检查。
鼓励用户在节点上使用 and 相应地
而不是使用 ,后者已被弃用。同样,使用 和 获取节点的描述。有关更多详细信息,请参阅 KIP-321。TopologyDescription
#topicSet()
#topicPattern()
TopologyDescription.Source
#topics()
#topic()
#topicNameExtractor()
TopologyDescription.Sink
我们添加了一个新类并弃用了 .添加的目的是能够
name repartition topics (名称重新分区主题) 在执行聚合操作时创建。用户可以使用采用 a 和 作为重新分区主题名称一部分的方法命名潜在的重新分区主题。生成的重新分区
topic name 仍将遵循 .现在,该类在 、 和 中优先于 。
请注意,Kafka Streams 不会自动为聚合操作创建重新分区主题。
此外,我们还使用一种新方法更新了该类,使用户能够命名执行 Stream/Stream 或 Stream/Table 联接所需的任何重新分区主题。有关更多详细信息,请重新分区
主题命名,请参阅 KIP-372。
因此,我们更新了 Kafka Streams Scala API 并删除了该类,以支持添加 .
如果只依赖 implicit ,只需要重新编译;如果您显式传入,很抱歉,您必须更改代码。Grouped
Serialized
Grouped
Grouped#as()
String
${application-id}->name<-repartition
Grouped
Serialized
KStream#groupByKey()
KStream#groupBy()
KTable#groupBy()
Joined
Joined#withName
Serialized
Grouped
Serialized
Serialized
我们添加了一个名为 to allow users 的新配置,以允许用户指定如何处理可能正在处理多个
topic-partitions(有关更多详细信息,请参阅 Out-of-order Handling 部分)。
默认值设置为 ,以有利于在主题分区的多个输入流之间同步时最小化延迟。
如果用户希望等待更长的时间,因为某些 topic-partition 没有可供处理的数据,因此无法确定其相应的流时间,
他们可以将此配置覆盖为更大的值。max.task.idle.ms
0
我们添加了缺失项,以便与允许用户获取会话窗口化存储的指定保留期保持一致。
我们还添加了缺失项,以允许用户关闭其自定义商店的缓存。SessionBytesStoreSupplier#retentionPeriod()
WindowBytesStoreSupplier
StoreBuilder#withCachingDisabled()
我们为 UUID () 添加了一个新的 serde,您可以通过 (参见 KIP-206) 使用它。Serdes.UUIDSerde
Serdes.UUID()
我们更新了将参数作为时间戳(固定点)或持续时间(时间段)的方法列表
并将它们替换为 and 参数以改进语义。
一些基于的旧方法已被弃用,鼓励用户更新其代码。
特别是,聚合窗口(跳跃/翻转/无限时间窗口和会话窗口)以及联接窗口现在使用参数来指定窗口大小、跳跃和间隙参数。
此外,窗口大小和保留时间现在被指定为类中的类型。
该类具有新方法,并将窗口开始/结束时间戳返回为 .
对于交互式查询,有采用参数的新重载。
此外,标点符号现在通过 .
有关更多详细信息,请参阅 KIP-358。long
Instant
Duration
long
Duration
Duration
Stores
Window
#startTime()
#endTime()
Instant
#fetch(...)
Instant
ProcessorContext#schedule(Duration interval, ...)
我们弃用了它,并将其替换为接受单个 timeout 参数的
注意:新方法改进了(但略有不同)语义。
有关更多详细信息,请参阅 KIP-358。KafkaStreams#close(...)
KafkaStreams#close(Duration)
#close(Duration)
现在,在调用该方法时,可以使用新公开的指标。
有关公开指标的更多详细信息
参见 KIP-324AdminClient
KafkaStream#metrics()
AdminClients
我们弃用了 Window Store 中的段概念,因为这些是实现细节。
因此,method 和 variable 已被弃用。
如果您实现自定义窗口,则应相应地更新您的代码。
同样,已弃用并替换为 .
如果您实现自定义窗口存储,则需要相应地更新您的代码。
最后,已弃用并替换为新的重载,该重载不允许再指定段数。
有关更多详细信息,请参阅 KIP-319(注意:KIP-328 和 KIP-358 与 KIP-319 “重叠”)。Windows#segments()
Windows#segments
WindowBytesStoreSupplier#segments()
WindowBytesStoreSupplier#segmentInterval()
Stores#persistentWindowStore(...)
我们添加了一个重载方法,该方法接受 的实例,目的是使用 Kafka Streams 2.0 中添加的配置。在 2.1 之前,使用
DSL 时,Kafka Streams 会在用户对 DSL 进行调用时写入物理计划。现在,通过在
执行调用时,Kafka Streams 可以优化拓扑的物理规划,前提是配置设置为 。通过设置,除了对
重用源主题作为 changelog 主题,可以优化拓扑以将冗余的重新分区主题合并为一个
repartition 主题。的原始无参数版本仍然可供那些希望不
优化其拓扑。请注意,启用拓扑优化可能需要在重新部署应用程序时执行应用程序重置。了解更多
详情,参见 KIP-312StreamsBuilder#build
java.util.Properties
StreamsConfig#TOPOLOGY_OPTIMIZATION
java.util.Properties
StreamsBuilder#build
StreamsConfig#TOPOLOGY_OPTIMIZATION
StreamsConfig#OPTIMIZE
StreamsConfig#OPTIMIZE
KTable
StreamsBuilder#build
我们正在向 Kafka Streams 用户引入静态成员资格。此功能可减少正常应用程序升级或滚动退回期间不必要的重新平衡。
有关如何使用它的更多详细信息,请查看静态成员资格设计。
请注意,Kafka Streams 使用相同的 ,您只需确保它是在
一个应用程序中的不同 Stream 实例。ConsumerConfig#GROUP_INSTANCE_ID_CONFIG
2.0.0 中的 Streams API 更改
在 2.0.0 中,我们在界面上添加了一些新的 API(有关详细信息,请阅读下面的 Streams API 更改)。
如果您具有扩展接口的自定义窗口存储实现,则需要进行代码更改。ReadOnlyWindowStore
ReadOnlyWindowStore
此外,如果您在 Kafka Streams 代码中使用 Java 8 方法引用,则可能需要更新代码以解决方法歧义。 在这种情况下,仅热交换 jar 文件可能不起作用。 请参阅下面的 2.0.0 API 和语义更改的完整列表,这些更改允许您改进应用程序和/或简化代码库。
我们将 interface 从 移动到 ,因为它在上一个版本中被错误地放置了。如果您的代码已经使用了它,则需要在 import 语句中进行简单的单行更改。Consumed
org.apache.kafka.streams
org.apache.kafka.streams.kstream
我们还在 2.0.0 中删除了一些在 1.0.x 之前已弃用的公共 API。 有关已删除 API 的详细列表,请参阅下文。
我们已删除 and 指标。
反序列化错误以及跳过记录的所有其他原因现在都考虑在预先存在的量度和 .跳过记录时,事件为
现在记录为 WARN 级别。如果这些警告变得繁琐,我们建议显式过滤掉 unprocessable
records 而不是依赖于 record skipping 语义。有关更多详细信息,请参阅 KIP-274。
截至目前,跳过记录的潜在原因包括:skippedDueToDeserializationError-rate
skippedDueToDeserializationError-total
skipped-records-rate
skipped-records-total
null
表源中的键null
表-表内/左/外/右连接中的键null
流表联接中的键或值null
stream-stream joins 中的 key 或 valuesnull
分组流上聚合中的键或值null
分组流上 Reduction 中的键或值null
窗口化流上聚合中的键null
窗口流上的缩减中的 keynull
会话窗口化流上聚合中的键- 当配置的 Defined 决定时,产生结果的错误(默认为 to 并引发异常)。
default.production.exception.handler
CONTINUE
FAIL
- 当配置的 (默认为 to 并引发异常) 时,反序列化记录时出错。
这是之前在指标中捕获的情况。
default.deserialization.exception.handler
CONTINUE
FAIL
skippedDueToDeserializationError
- 获取的记录具有负时间戳。
我们还在 2.0 中修复了时间和会话窗口化存储操作的指标名称。因此,我们目前的内置商店
在度量名称中,其存储类型将为 、 和 。例如,RocksDB 时间窗口存储的
put 操作指标现在为 。
用户需要相应地更新其时间和会话窗口商店的指标收集和报告系统。
有关更多详细信息,请阅读 State Store Metrics 部分。in-memory-state
in-memory-lru-state
rocksdb-state
rocksdb-window-state
rocksdb-session-state
kafka.streams:type=stream-rocksdb-window-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),rocksdb-window-state-id=([-.\w]+)
我们添加了对允许查询单个窗口的键值对的方法的支持。
对于在上述界面上自定义了 Window store 实现的用户,他们还需要更新他们的代码来实现新添加的方法。
有关更多详细信息,请参阅 KIP-261。ReadOnlyWindowStore
我们添加了 public,以允许用户直接读取/写入存储窗口表更改日志的主题。
此外,如果默认的 serde 类是窗口化的 serdes,我们还添加了 和 让用户指定内部 serdes。
有关更多详细信息,请参阅 KIP-265。WindowedSerdes
StreamsConfig
default.windowed.key.serde.inner
default.windowed.value.serde.inner
我们在 Kafka 2.0.0 中添加了消息标头支持。特别是,我们添加了一个新的 API,它返回一个对象,该对象跟踪正在处理的源主题消息的标头。通过此对象,用户可以操作
标头映射也传播到整个处理器拓扑中。有关更多详细信息,请随时阅读
Developer Guide 部分。Processor API
ProcessorContext#headers()
Headers
我们已经弃用了 take a as 参数的构造函数。
请使用其他相应的 accept 构造函数。
有关更多详细信息,请参阅 KIP-245。KafkaStreams
StreamsConfig
java.util.Properties
Kafka 2.0.0 允许使用处理器 API (KIP-251) 操作输出记录的时间戳。
为了启用此新功能,已修改。
两个现有的重载 和 已弃用,并添加了新的重载。
新类允许您按名称将记录发送到所有或特定的下游处理器,并设置输出记录的时间戳。
新 API 不再支持基于子索引的转发。ProcessorContext#forward(...)
#forward(Object key, Object value, String childName)
#forward(Object key, Object value, int childIndex)
#forward(Object key, Object value, To to)
To
我们添加了支持,允许将记录动态路由到 Kafka 主题。更具体地说,在较低级别和较高级别的 API 中,我们都添加了
采用实例而不是特定的类型化主题名称,这样对于从上游处理器接收到的每条记录,库将动态确定要写入的 Kafka 主题
基于记录的键和值,以及记录上下文。请注意,所有可能使用的 Kafka 主题仍被视为用户主题,因此需要预先创建。除此之外,我们还修改了接口以添加 topic name 参数,因为现在 topic name 可能事先是未知的;具有此接口的自定义实现的用户需要在升级其应用程序时更新其代码
使用 Kafka Streams 2.0.0。Topology#addSink
KStream#to
TopicNameExtractor
String
StreamPartitioner
KIP-284 通过将重新分区主题的默认值设置为 .
Kafka Streams 不再依赖数据保留,而是使用新的清除数据 API 从这些主题中删除已使用的数据,并立即将已使用的存储空间保持在较小容量。Long.MAX_VALUE
我们修改了签名并删除了已弃用的布尔参数,因为它在 .
使用此函数将其状态存储注册到处理器拓扑中的用户只需更新其代码并从调用方中删除此参数。ProcessorStateManger#register(...)
loggingEnabled
StoreBuilder
Kafka Streams DSL for Scala 是一个新的 Kafka Streams 客户端库,可供使用 Scala 编写 Kafka Streams 应用程序的开发人员使用。它包装了核心 Kafka Streams DSL 类型,以便在 与 Scala 代码互操作。例如,它包括高阶函数作为转换的参数,从而避免了 Java 7 中的匿名类或 Scala 2.11 中的实验性 SAM 类型转换。 Java 和 Scala 集合类型之间的自动转换,一种方法 隐式提供 Serdes 以减少应用程序中的样板并使其更加类型安全,等等!有关更多信息,请参阅适用于 Scala 的 Kafka Streams DSL 文档和 KIP-270。
我们已删除这些已弃用的 API:
KafkaStreams#toString
不再返回拓扑和运行时元数据;要获取拓扑元数据,用户可以调用,要获取线程运行时元数据,用户可以调用(它们从 1.0.0 开始已弃用)。 有关如何更新代码的详细指导,请阅读此处Topology#describe()
KafkaStreams#localThreadsMetadata
TopologyBuilder
并分别被 和 删除并替换为 (它们自 1.0.0 起已弃用)。 有关如何更新代码的详细指导,请阅读此处KStreamBuilder
Topology
StreamsBuidler
StateStoreSupplier
被删除并替换为(它们从 1.0.0 开始被弃用); 以及使用它的相应和重载函数也已被删除。 有关如何更新代码的详细指导,请阅读此处StoreBuilder
Stores#create
KStream, KTable, KGroupedStream
KStream, KTable, KGroupedStream
需要 serde 和其他明确规范的重载函数将被删除,并替换为使用(它们从 1.0.0 开始已弃用)的更简单的重载函数。 有关如何更新代码的详细指导,请阅读此处Consumed, Produced, Serialized, Materialized, Joined
Processor#punctuate
、 和 ,并被删除并替换为(它们在 1.0.0 中已弃用)。ValueTransformer#punctuate
ValueTransformer#punctuate
ProcessorContext#schedule(long)
ProcessorContext#schedule(long, PunctuationType, Punctuator)
- 中的第二个类型化参数 “loggingEnabled” 已被删除;用户现在可以在创建 State Store 时使用 来指定行为。
boolean
ProcessorContext#register
StoreBuilder#withLoggingEnabled, withLoggingDisabled
KTable#writeAs, print, foreach, to, through
被删除,用户可以出于相同的目的进行调用(它们从 0.11.0.0 开始被弃用)。 有关已删除 API 的详细列表,请阅读此处KTable#tostream()#writeAs
StreamsConfig#KEY_SERDE_CLASS_CONFIG, VALUE_SERDE_CLASS_CONFIG, TIMESTAMP_EXTRACTOR_CLASS_CONFIG
分别被删除并替换为 (它们从 0.11.0.0 开始被弃用)。StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG, DEFAULT_VALUE_SERDE_CLASS_CONFIG, DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
StreamsConfig#ZOOKEEPER_CONNECT_CONFIG
被删除,因为我们不再需要 Streams 中的 ZooKeeper 依赖项(从 0.10.2.0 开始已弃用)。
1.1.0 中的 Streams API 更改
我们添加了对方法的支持,这些方法允许在无需提供键的情况下查询 s。
对于在上述界面上自定义了 Window store 实现的用户,他们还需要更新他们的代码来实现新添加的方法。
有关更多详细信息,请参阅 KIP-205。ReadOnlyWindowStore
WindowStore
有一个新工件提供 、 和 类。
您可以将新构件作为常规依赖项包含在单元测试中,并使用测试驱动程序测试 Kafka Streams 应用程序的业务逻辑。
有关更多详细信息,请参阅 KIP-247。kafka-streams-test-utils
TopologyTestDriver
ConsumerRecordFactory
OutputVerifier
KIP-220 的引入使您能够为 Kafka Streams 创建的嵌入式管理客户端提供配置参数,类似于嵌入式生产者和使用者客户端。
您可以通过添加带有定义前缀的配置来提供配置,以将它们与共享相同配置名称的其他客户端的配置区分开来。StreamsConfig
admin.
StreamsConfig#adminClientPrefix(String)
新方法KTable
-
transformValues
方法已添加到 。与 上的方法类似,这些方法允许更丰富的、有状态的、类似于 Processor API 的值转换。KTable
KStream
新方法GlobalKTable
- 提供了一个方法,如果商店名称不可查询,它将返回与 或 关联的商店名称。
GlobalKTable
null
中的新方法 :KafkaStreams
- 为构造函数添加了重载,允许覆盖用于跟踪系统挂钟时间的对象;这对于对应用程序代码进行单元测试非常有用。
Time
中的新方法 :KafkaClientSupplier
- 添加了允许覆盖用于管理请求(如内部主题创建等)的 。
getAdminClient(config)
AdminClient
新增了生产过程中异常的错误处理:
- 添加了允许实现者决定 Streams 是否应该或何时发生某些异常的接口。
ProductionExceptionHandler
FAIL
CONTINUE
- 提供了一个始终失败的实现,默认情况下保留现有行为。
DefaultProductionExceptionHandler
- 可以通过对实现此接口的类的完全限定名称进行设置来更改使用的实现。
default.production.exception.handler
更改 :StreamsResetter
- 添加了选项以根据 KIP-171 指定要重置的输入主题偏移量
1.0.0 中的 Streams API 更改
在 1.0 中,完成了一次重大的 API 重构,新的 API 更简洁、更易于使用。
此更改包括五个主要类 、 、 、 和 (以及更多其他类)。
所有更改都完全向后兼容,因为旧 API 仅被弃用,但未被删除。
我们建议您尽快迁移到新的 API。
我们将在下一段中总结所有 API 更改。KafkaStreams
KStreamBuilder
KStream
KTable
TopologyBuilder
通过 DSL () 指定拓扑的两个主要类
或处理器 API () 已弃用,并替换为 and(两个新类都位于
package 的 ) 。
请注意,这并不扩展 ,即
现在,类层次结构不同了。
新类具有与旧类基本相同的方法,用于通过 DSL 或处理器 API 构建拓扑。
但是,一些在实际 API 中是公共的但不属于实际 API 的内部方法并不存在
在新类中。
此外,与原始类相比,一些重载得到了简化。
有关完整详细信息,请参阅 KIP-120 和 KIP-182。KStreamBuilder
TopologyBuilder
StreamsBuilder
Topology
org.apache.kafka.streams
StreamsBuilder
Topology
KStreamBuilder
TopologyBuilder
更改拓扑的指定方式也会影响构造函数,
现在只接受 .
使用 DSL 构建器类,可以通过 .
此外,还添加了一个新类(以及一些更多依赖类)。
这些可用于获取指定拓扑的详细描述
,可以通过调用 来获得。
快速入门部分显示了使用此新 API 的示例。KafkaStreams
Topology
StreamsBuilder
Topology
StreamsBuilder#build()
org.apache.kafka.streams.TopologyDescription
Topology#describe()
中的新方法 :KStream
- 随着 KIP-202 的引入,由于 StreamsBuilder 类已被删除,因此创建了一个新方法。
方法签名也发生了变化:不是一次向方法提供多个 s,而是只接受一个 s。
merge()
KStream
StreamsBuilder#merge()
KStream
KStream
中的新方法 :KafkaStreams
- 通过以下方式获取有关本地线程的当前运行时信息
localThreadsMetadata()
- 通过 观察所有状态存储的恢复,其中用户可以提供其自定义的接口实现
setGlobalStateRestoreListener()
org.apache.kafka.streams.processor.StateRestoreListener
中已弃用/已修改的方法 :KafkaStreams
-
toString()
之前用于返回 static 和 runtime 信息。 它们已被弃用,取而代之的是新的类/方法 /(返回运行时信息)和 /(返回静态信息)。toString(final String indent)
localThreadsMetadata()
ThreadMetadata
TopologyDescription
Topology#describe()
- 随着 KIP-182 的引入,您不应再进入操作。
如果你不能依赖 using 来打印你的 key an 值,你应该通过调用提供 custom。
Serde
KStream#print
toString
KeyValueMapper
Printed#withKeyValueMapper
-
setStateListener()
now 只能在应用程序开始运行之前设置,即 before 被调用。KafkaStreams.start()
中已弃用的方法KGroupedStream
- 窗口化聚合已从 弃用并移至 。
现在,您可以执行窗口化聚合,例如,使用 .
KGroupedStream
WindowedKStream
KGroupedStream#windowedBy(Windows)#reduce(Reducer)
修改的方法 :Processor
-
扩展了处理器 API,允许用户根据数据驱动的流时间或挂钟时间来调度功能。 因此,原始函数已被弃用,新的重载函数接受用户可自定义的回调接口,该接口根据 . 确定用于标点符号计划的时间概念:流时间或挂钟时间(默认情况下,流时间配置为通过 表示事件时间)。 此外,里面的函数也被弃用了。
punctuate
ProcessorContext#schedule
Punctuator
punctuate
PunctuationType
PunctuationType
TimestampExtractor
punctuate
Processor
在此之前,用户只能根据流时间(即 )进行调度,因此该函数是数据驱动的,因为流时间是由输入数据派生的时间戳确定(并向前推进)。 如果没有数据到达处理器,则流时间不会提前,因此不会触发标点符号。 另一方面,当使用挂钟时间 (即 ) 时,将完全基于挂钟时间触发。 所以,例如,如果函数是根据 调度的,如果这 60 条记录在 20 秒内处理完,则会被调用 2 次(每 10 秒 1 次); 如果这 60 条记录在 5 秒内处理完毕,则根本不会调用 NO。 用户只需在处理器的方法中多次调用,就可以在同一处理器中安排具有不同 s 的多个回调。
PunctuationType.STREAM_TIME
punctuate
PunctuationType.WALL_CLOCK_TIME
punctuate
Punctuator
PunctuationType.WALL_CLOCK_TIME
punctuate
punctuate
Punctuator
PunctuationType
ProcessorContext#schedule
init()
如果您正在监控任务级别或处理器节点/状态存储级别的 Streams 指标,请注意指标传感器名称和层次结构已更改: 任务 ID、存储名称和处理器名称不再位于传感器指标名称中,而是添加为传感器的标签,以实现一致的指标层次结构。 因此,在升级到 1.0.0 时,您可能需要对指标、报告和监控工具进行相应的代码更改。 详细的指标传感器可以在 Streams Monitoring 部分找到。
KIP-161 的引入使您能够在从 Kafka 读取数据时为反序列化错误提供默认异常处理程序,而不是将异常一直抛出流应用程序。
您可以通过 as 提供配置。
指定的处理程序必须实现接口。StreamsConfig
StreamsConfig#DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
org.apache.kafka.streams.errors.DeserializationExceptionHandler
KIP-173 的引入使您能够为 Kafka Streams 创建的任何主题提供主题配置参数。
这包括 repartition 和 changelog 主题。
您可以通过添加带有 定义的前缀的配置来提供配置。
创建内部主题时,将应用 中带有前缀的任何属性。
任何不是主题配置的配置都将被忽略。
如果您已经使用 OR 为更改日志提供配置,那么它们将优先于配置中提供的配置。StreamsConfig
StreamsConfig#topicPrefix(String)
StreamsConfig
StateStoreSupplier
Materialized
0.11.0.0 中的 Streams API 更改
更新:StreamsConfig
- 添加了新的配置参数
processing.guarantee
- 配置参数已弃用,并替换为
key.serde
default.key.serde
- 配置参数已弃用,并替换为
value.serde
default.value.serde
- 配置参数已弃用,并替换为
timestamp.extractor
default.timestamp.extractor
- 方法已弃用,取而代之的是
keySerde()
defaultKeySerde()
- 方法已弃用,取而代之的是
valueSerde()
defaultValueSerde()
- 添加了 new method
defaultTimestampExtractor()
中的新方法 :TopologyBuilder
- 添加了允许定义每个源节点的重载
addSource()
TimestampExtractor
- 添加了重载,允许定义与全局存储关联的每个源节点
addGlobalStore()
TimestampExtractor
中的新方法 :KStreamBuilder
- 添加了允许定义 Per Input 流的重载
stream()
TimestampExtractor
- 添加了允许定义 Per Input Table 的重载
table()
TimestampExtractor
- 添加了允许定义 per global table 的重载
globalKTable()
TimestampExtractor
中已弃用的方法:KTable
-
void foreach(final ForeachAction<? super K, ? super V> action)
-
void print()
-
void print(final String streamName)
-
void print(final Serde<K> keySerde, final Serde<V> valSerde)
-
void print(final Serde<K> keySerde, final Serde<V> valSerde, final String streamName)
-
void writeAsText(final String filePath)
-
void writeAsText(final String filePath, final String streamName)
-
void writeAsText(final String filePath, final Serde<K> keySerde, final Serde<V> valSerde)
-
void writeAsText(final String filePath, final String streamName, final Serde<K> keySerde, final Serde<V> valSerde)
上述方法已被弃用,转而使用 Interactive Queries API。 如果要查询支持 KTable 的状态存储的当前内容,请使用以下方法:
- 拨打
KafkaStreams.store(final String storeName, final QueryableStoreType<T> queryableStoreType)
- 然后调用 以迭代 的键。
ReadOnlyKeyValueStore.all()
KTable
如果要查看 的 changelog 流,则可以调用 。KTable
KTable.toStream().print(Printed.toSysOut)
使用 exactly-once 语义的指标:
如果通过参数启用处理(EOS 版本 1),则
在内部,Streams 从每线程生产者切换到每任务生产者运行时模型。
使用 (EOS 版本 2) 确实使用每个线程的生产者,因此不会更改,
与本案相比)。
为了区分不同的生产者,生产者还对这种情况的任务 ID 进行编码。
由于创建者用于报告 JMX 指标,因此可能需要更新接收这些指标的工具。"exactly_once"
processing.guarantee
"exactly_once_beta"
client.id
"at_least_once"
client.id
client.id
创建者的命名架构:client.id
- at-least-once(默认):
[client.Id]-StreamThread-[sequence-number]
- exactly-once 中:
[client.Id]-StreamThread-[sequence-number]-[taskId]
- exactly-once-beta 版本:
[client.Id]-StreamThread-[sequence-number]
[client.Id]
通过 Streams 配置参数设置,或默认为 ( 是随机 UUID)。client.id
[application.id]-[processId]
[processId]
0.10.2.1 中的显著变化
中的参数更新 :StreamsConfig
- 嵌入式生产者和使用者的默认配置值已更改,以提高 Kafka Streams 应用程序的弹性
retries
max.poll.interval.ms
0.10.2.0 中的 Streams API 更改
中的新方法 :KafkaStreams
- 设置侦听器以对应用程序状态更改做出反应
setStateListener(StateListener listener)
- 通过以下方式检索当前应用程序状态
state()
- 通过以下方式检索全局 Metrics 注册表
metrics()
- 通过以下方式关闭应用程序时应用超时
close(long timeout, TimeUnit timeUnit)
- 在检索 Kafka Streams 信息时指定自定义缩进
toString(String indent)
中的参数更新 :StreamsConfig
- 参数已弃用;Kafka Streams 应用程序不再与 ZooKeeper 交互以进行主题管理,而是使用新的代理管理协议
(参见 KIP-4,“主题管理架构”部分
zookeeper.connect
) - 为 metrics、security 和 client configurations 添加了许多新参数
界面变化:StreamsMetrics
- 删除的方法:
addLatencySensor()
- 新增方法: , , ,
addLatencyAndThroughputSensor()
addThroughputSensor()
recordThroughput()
addSensor()
removeSensor()
中的新方法 :TopologyBuilder
- 添加了允许为每个源节点定义策略的重载
addSource()
auto.offset.reset
- 添加了添加全局 S 的方法
addGlobalStore()
StateStore
中的新方法 :KStreamBuilder
- 添加了 和 的重载,允许为每个输入流/表定义策略
stream()
table()
auto.offset.reset
- 添加了创建
globalKTable()
GlobalKTable
的新联接 :KStream
- 添加了 to join with 的重载
join()
KTable
- 添加了 和 to join with 的重载
join()
leftJoin()
GlobalKTable
- 请注意,0.10.2 中的连接语义得到了改进,因此与 0.10.0.x 和 0.10.1.x 相比,您可能会看到不同的结果 (参见 Apache Kafka wiki 中的 Kafka Streams Join Semantics)
对齐的 -key 连接处理:null
KTable
- 与所有其他 KTable 操作一样,联接不再在键记录上引发异常,而是静默删除这些记录
KTable-KTable
null
新窗口类型 Session Windows:
- 添加了用于指定会话窗口的类
SessionWindows
- 为方法 、 添加了重载,并允许会话窗口聚合
KGroupedStream
count()
reduce()
aggregate()
更改为 :TimestampExtractor
- method 现在有第二个参数
extract()
- 新的默认时间戳提取器类(它提供与旧的(和已删除的)默认提取器相同的行为
FailOnInvalidTimestamp
ConsumerRecordTimestampExtractor
) - 新的替代时间戳提取器类和
LogAndSkipOnInvalidTimestamp
UsePreviousTimeOnInvalidTimestamps
许多 DSL 接口、类和方法的宽松类型约束(参见 KIP-100)。
0.10.1.0 中的 Streams API 更改
流分组和聚合分为两种方法:
- 旧版本:KStream #aggregateByKey()、#reduceByKey() 和 #countByKey()
- 新增:KStream#groupByKey() 加上 KGroupedStream #aggregate()、#reduce() 和 #count()
- 示例:stream.countByKey() 更改为 stream.groupByKey().count()
自动重新分区:
- 不再需要在键更改运算符之后和聚合/连接之前调用 through()
- 示例:stream.selectKey(...)。through(...) 的countByKey() 更改为 stream.selectKey().groupByKey().count()
拓扑生成器:
- 方法 #sourceTopics(String applicationId) 和 #topicGroups(String applicationId) 简化为 #sourceTopics() 和 #topicGroups()
DSL:用于指定状态存储名称的新参数:
- 新的交互式查询功能需要为所有源 KTables 和窗口聚合结果 KTables 指定存储名称(以前的参数 “operator/window name” 现在是 storeName)
- KStreamBuilder#table(String topic) 更改为 #topic(String topic, String storeName)
- KTable#through(String topic) 更改为 #through(String topic, String storeName)
- KGroupedStream #aggregate()、#reduce() 和 #count() 需要额外的参数 “String storeName”
- 示例:stream.countByKey(TimeWindows.of(“windowName”, 1000)) 更改为 stream.groupByKey().count(TimeWindows.of(1000), “countStoreName”)
窗口:
- 窗口不再命名:TimeWindows.of(“name”, 1000) 更改为 TimeWindows.of(1000) (参见 DSL:用于指定状态存储名称的新参数)
- JoinWindows 不再具有默认大小:JoinWindows.of(“name”).within(1000) 更改为 JoinWindows.of(1000)
Streams API 代理兼容性
下表显示了哪些版本的 Kafka Streams API 与各种 Kafka 代理版本兼容。
Kafka Broker (列) | |||
---|---|---|---|
Kafka Streams API(行) | 0.10.0.x | 0.10.1.x 和 0.10.2.x | 0.11.0.x 和 1.0.x 和 1.1.x 和 2.0.x 和 2.1.x 和 2.2.x 和 2.3.x 和 2.4.x 和 2.5.x 和 2.6.x 和 2.7.x 和 2.8.x 和 3.0.x 和 3.1.x 和 3.2.x 和 3.3.x 和 3.4.x 和 3.5.x 和 3.6.x 和 3.7.x 和 3.8.x 和 3.9.x |
0.10.0.x | 相容 | 相容 | 相容 |
0.10.1.x 和 0.10.2.x | 相容 | 相容 | |
0.11.0.x | 与 Exactly-Once Turned OFF 兼容(需要 Broker 版本 0.11.0.x 或更高版本) |
相容 | |
1.0.x 和 1.1.x 和 2.0.x 和 2.1.x 以及 2.2.0 和 2.2.0 |
兼容 Precisely-Once Off (需要 Broker 版本 0.11.0.x 或更高版本); 需要消息格式 0.10 或更高; 不支持 消息标头(需要 Broker 版本 0.11.0.x 或更高版本 ,消息格式为 0.11 或更高版本) |
相容;需要消息格式 0.10 或更高; 如果使用消息标头,则需要消息格式 0.11 或更高 |
|
2.2.1 和 2.3.x 和 2.4.x 和 2.5.x 和 2.6.x 和 2.7.x 和 2.8.x 和 3.0.x 和 3.1.x 和 3.2.x 和 3.3.x 和 3.4.x 和 3.5.x 和 3.6.x 和 3.7.x 和 3.8.x 和 3.9.x |
相容;需要消息格式 0.11 或更高; 启用 Exactly-Once v2 需要 2.4.x 或更高版本 |