文档
Kafka 3.6 文档
早期版本: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X, 1.1.X, 2.0.X, 2.1.X, 2.2.X, 2.3.X, 2.4.X, 2.5.X, 2.6.X, 2.7.X, 2.8.X, 3.0.X, 3.1.X, 3.2.X,3.3.X、3.4.X、3.5.X.1. 开始使用
1.1 介绍
1.2 使用案例
以下是 Apache Kafka® 的一些常见使用案例的描述。 有关其中许多领域的实际应用概述,请参阅此博客文章。
消息
Kafka 可以很好地替代更传统的消息代理。 使用消息代理的原因有很多(将处理与数据创建者分离、缓冲未处理的消息等)。 与大多数消息传递系统相比,Kafka 具有更好的吞吐量、内置分区、复制和容错功能,这使其成为一个很好的 适用于大规模消息处理应用程序的解决方案。根据我们的经验,消息传递用途通常吞吐量相对较低,但可能需要较低的端到端延迟,并且通常依赖于强大的 Kafka 提供的持久性保证。
在这个领域,Kafka 可与 ActiveMQ 或 RabbitMQ 等传统消息传递系统相媲美。
网站活动跟踪
Kafka 的原始用例是能够将用户活动跟踪管道重建为一组实时发布-订阅源。 这意味着网站活动(页面查看、搜索或用户可能执行的其他作)将发布到中心主题,每个活动类型一个主题。 这些订阅源可用于一系列使用案例,包括实时处理、实时监控和加载到 Hadoop 或 用于离线处理和报告的离线数据仓库系统。活动跟踪通常非常高,因为每个用户页面视图都会生成许多活动消息。
指标
Kafka 通常用于运营监控数据。 这涉及聚合来自分布式应用程序的统计数据,以生成运营数据的集中馈送。日志聚合
许多人使用 Kafka 作为日志聚合解决方案的替代品。 日志聚合通常从服务器收集物理日志文件,并将它们放在一个中心位置(可能是文件服务器或 HDFS)进行处理。 Kafka 抽象出文件的细节,并以消息流的形式提供更清晰的日志或事件数据抽象。 这允许更低的处理延迟,并更轻松地支持多个数据源和分布式数据使用。 与 Scribe 或 Flume 等以日志为中心的系统相比,Kafka 提供了同样好的性能、更强的持久性保证(由于复制)、 以及更低的端到端延迟。流处理
Kafka 的许多用户在由多个阶段组成的处理管道中处理数据,其中原始输入数据从 Kafka 主题中使用,然后 聚合、丰富或以其他方式转换为新主题以供进一步使用或后续处理。 例如,用于推荐新闻文章的处理管道可能会从 RSS 源中抓取文章内容并将其发布到“文章”主题; 进一步的处理可能会规范化或删除重复的内容,并将清理后的文章内容发布到新主题; 最终处理阶段可能会尝试向用户推荐此内容。 此类处理管道根据各个主题创建实时数据流图。 从 0.10.0.0 开始,Apache Kafka 中提供了一个名为 Kafka Streams 的轻量级但功能强大的流处理库,用于执行上述数据处理。 除了 Kafka Streams,其他开源流处理工具还包括 Apache Storm 和 Apache Samza。事件溯源
事件溯源是一种应用程序设计风格,其中状态更改被记录为 按时间排序的记录序列。Kafka 对非常大的存储日志数据的支持使其成为以这种样式构建的应用程序的出色后端。提交日志
Kafka 可以用作分布式系统的一种外部提交日志。日志有助于在节点之间复制数据并充当重新同步 失败节点恢复其数据的机制。 Kafka 中的日志压缩功能有助于支持这种用法。 在这个用法中,Kafka 类似于 Apache BookKeeper 项目。1.3 快速开始
1.4 生态系统
在主发行版之外,还有大量工具与 Kafka 集成。 生态系统页面列出了其中的许多工具,包括流处理系统、Hadoop 集成、监控和部署工具。1.5 从以前的版本升级
2. API 接口
3. 配置
4. 设计
5. 实施
6. 运营
以下是根据 LinkedIn 的使用情况和经验,将 Kafka 作为生产系统实际运行的一些信息。请将您知道的任何其他提示发送给我们。6.1 基本 Kafka作
本节将回顾您将在 Kafka 集群上执行的最常见作。本节中回顾的所有工具都可以在 Kafka 发行版的目录下找到,如果每个工具在没有参数的情况下运行,它将打印所有可能的命令行选项的详细信息。bin/
添加和删除主题
您可以选择手动添加主题,也可以在首次将数据发布到不存在的主题时自动创建主题。如果主题是自动创建的,则可能需要调整用于自动创建主题的默认主题配置。使用主题工具添加和修改主题:
> bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name \
--partitions 20 --replication-factor 3 --config x=y
复制因子控制将复制写入的每条消息的服务器数。如果您的复制因子为 3,则在您失去对数据的访问权限之前,最多 2 台服务器可能会失败。我们建议您使用 2 或 3 的复制因子,以便您可以在不中断数据使用的情况下透明地退回计算机。分区计数控制主题将被分片到多少个日志。分区计数有多种影响。首先,每个分区必须完全适合单个服务器。因此,如果您有 20 个分区,则完整数据集(以及读写负载)将由不超过 20 个服务器(不包括副本)处理。最后,分区计数会影响使用者的最大并行度。这在概念部分中进行了更详细的讨论。
每个分片分区日志都放置在 Kafka 日志目录下自己的文件夹中。此类文件夹的名称由主题名称(附加短划线 (-) 和分区 ID 组成。由于典型的文件夹名称不能超过 255 个字符,因此主题名称的长度将受到限制。我们假设分区数永远不会超过 100,000。因此,主题名称不能超过 249 个字符。这在文件夹名称中为短划线和可能长达 5 位的分区 ID 留出了足够的空间。
在命令行上添加的配置将覆盖服务器对数据应保留时间长度等内容的默认设置。此处记录了完整的按主题配置集。
修改主题
您可以使用相同的主题工具更改主题的配置或分区。要添加分区,您可以执行
> bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
--partitions 40
请注意,分区的一个用例是对数据进行语义分区,添加分区不会更改现有数据的分区,因此,如果消费者依赖该分区,这可能会打扰他们。也就是说,如果数据已分区,则此分区可能会通过添加分区来打乱顺序,但 Kafka 不会尝试以任何方式自动重新分配数据。hash(key) % number_of_partitions
要添加配置:
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
要删除配置:最后删除主题: > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
> bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
Kafka 目前不支持减少主题的分区数量。
有关更改主题的复制因子的说明,请参阅此处。
正常关闭
Kafka 集群将自动检测任何代理关闭或故障,并为该计算机上的分区选举新的领导者。无论服务器发生故障还是因维护或配置更改而故意关闭,都会发生这种情况。对于后一种情况,Kafka 支持一种更优雅的机制来停止服务器,而不仅仅是杀死它。 当服务器正常停止时,它将利用两个优化:- 它会将其所有日志同步到磁盘,以避免在重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。日志恢复需要时间,因此这会加快有意重启的速度。
- 它会在关闭之前将服务器作为领导者的任何分区迁移到其他副本。这将使领导权转移更快,并将每个分区不可用的时间缩短到几毫秒。
controlled.shutdown.enable=true
平衡领导力
每当 broker 停止或崩溃时,该 broker 分区的领导权就会转移到其他副本。当 broker 重新启动时,它只会是其所有分区的 follower,这意味着它不会用于客户端读取和写入。为了避免这种不平衡,Kafka 有一个首选副本的概念。如果分区的副本列表为 1,5,9,则节点 1 优先于节点 5 或 9 作为领导者,因为它在副本列表中较早。默认情况下,Kafka 集群将尝试恢复对首选副本的领导地位。此行为配置为:
auto.leader.rebalance.enable=true
您也可以将其设置为 false,但随后需要通过运行以下命令来手动恢复对已恢复副本的领导地位: > bin/kafka-leader-election.sh --bootstrap-server broker_host:port --election-type preferred --all-topic-partitions
跨机架平衡副本
机架感知功能将同一分区的副本分布在不同的机架上。这将 Kafka 为代理故障提供的保证扩展到机架故障,从而限制了机架上的所有代理同时发生故障时数据丢失的风险。该功能还可以应用于其他代理分组,例如 EC2 中的可用区。 您可以通过向代理配置添加属性来指定代理属于特定机架:当创建、修改主题或重新分发副本时,将遵循机架约束,确保副本跨越尽可能多的机架(分区将跨越 min(#racks, replication-factor) 不同的机架)。 用于将副本分配给 broker 的算法可确保每个 broker 的 leader 数量保持不变,而不管 broker 在机架中的分布方式如何。这确保了平衡的吞吐量。 但是,如果为机架分配了不同数量的代理,则副本的分配将不均匀。代理较少的机架将获得更多副本,这意味着它们将使用更多存储并将更多资源投入到复制中。因此,为每个机架配置相同数量的代理是明智的。 broker.rack=my-rack-id
在集群和地理复制之间镜像数据
Kafka 管理员可以定义跨越单个 Kafka 集群、数据中心或地理区域边界的数据流。有关更多信息,请参阅异地复制部分。
检查使用者位置
有时,了解消费者的位置很有用。我们有一个工具,可以显示消费者组中所有消费者的位置,以及他们与日志末尾相差多远。要在名为 my-group 的使用者组上运行此工具,请使用名为 my-topic 的主题,如下所示: > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-topic 0 2 4 2 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
my-topic 1 2 3 1 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
my-topic 2 2 3 1 consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2 /127.0.0.1 consumer-2
管理消费组
使用 ConsumerGroupCommand 工具,我们可以列出、描述或删除使用者组。可以手动删除使用者组,也可以在该组的上次提交偏移量过期时自动删除。仅当组没有任何活动成员时,手动删除才有效。 例如,要列出所有主题中的所有使用者组:要查看偏移量,如前所述,我们像这样“描述”使用者组:还有许多其他 “describe” 选项可用于提供有关使用者组的更多详细信息: > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
test-consumer-group
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic3 0 241019 395308 154289 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic2 1 520678 803288 282610 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic3 1 241018 398817 157799 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic1 0 854144 855809 1665 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic2 0 460537 803290 342753 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic3 2 243655 398812 155157 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4
- --members:此选项提供使用者组中所有活动成员的列表。
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members CONSUMER-ID HOST CLIENT-ID #PARTITIONS consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0
- --members --verbose:除了上述 “--members” 选项报告的信息之外,此选项还提供分配给每个成员的分区。
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0) consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2) consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1) consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 -
- --offsets:这是默认的 describe 选项,提供与 “--describe” 选项相同的输出。
- --state:此选项提供有用的组级别信息。
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS localhost:9092 (0) range Stable 4
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.
要重置消费组的偏移量,可以使用 “--reset-offsets” 选项。 此选项一次支持一个使用者组。它需要定义以下范围:--all-topics 或 --topic。必须选择一个范围,除非您使用 '--from-file' 方案。此外,首先确保使用者实例处于非活动状态。 有关更多详细信息,请参阅 KIP-122。
它有 3 个执行选项:
- (默认)显示要重置的偏移量。
- --execute : 执行 --reset-offsets 进程。
- --export :将结果导出为 CSV 格式。
--reset-offsets 还有以下场景可供选择(必须至少选择一个场景):
- --to-datetime <String: datetime> : 将偏移量重置为与日期时间的偏移量。格式:'YYYY-MM-DDTHH:mm:SS.sss'
- --to-earliest : 将偏移量重置为最早的偏移量。
- --to-latest : 将偏移量重置为最新偏移量。
- --shift-by <Long: 偏移量> : 重置偏移量 'n' 移动电流偏移量,其中 'n' 可以是正数或负数。
- --from-file :将偏移量重置为 CSV 文件中定义的值。
- --to-current :将偏移量重置为当前偏移量。
- --by-duration <String: duration> : 重置当前时间戳的偏移量以按持续时间偏移量。格式:'PnDTnHnMnS'
- --to-offset : 将偏移量重置为特定偏移量。
例如,将消费组的偏移量重置为最新的偏移量:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
TOPIC PARTITION NEW-OFFSET
topic1 0 0
如果您使用的是旧的高级使用者并将组元数据存储在 ZooKeeper 中(即),则 pass 而不是 :offsets.storage=zookeeper
--zookeeper
--bootstrap-server
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
扩展您的集群
将服务器添加到 Kafka 集群非常简单,只需为它们分配一个唯一的代理 ID,然后在新服务器上启动 Kafka。但是,这些新服务器不会自动分配任何数据分区,因此,除非将分区移动到这些服务器,否则在创建新主题之前,它们不会执行任何工作。因此,通常当您将计算机添加到集群时,您将希望将一些现有数据迁移到这些计算机。迁移数据的过程是手动启动的,但完全自动化。在幕后,Kafka 会将新服务器添加为它正在迁移的分区的追随者,并允许它完全复制该分区中的现有数据。当新服务器完全复制此分区的内容并加入同步副本时,现有副本之一将删除其分区的数据。
分区重新分配工具可用于跨代理移动分区。理想的分区分布将确保所有代理之间的数据负载和分区大小均匀。分区重新分配工具无法自动研究 Kafka 集群中的数据分布并移动分区以实现均匀的负载分布。因此,管理员必须弄清楚应该移动哪些主题或分区。
分区重新分配工具可以在 3 种互斥模式下运行:
- --generate:在此模式下,给定主题列表和代理列表,该工具会生成候选重新分配,以将指定主题的所有分区移动到新代理。此选项仅提供了一种在给定主题和目标代理列表的情况下生成分区重新分配计划的便捷方法。
- --execute:在此模式下,该工具根据用户提供的重新分配计划启动分区的重新分配。(使用 --reassignment-json-file 选项)。这可以是管理员手动制作的自定义重新分配计划,也可以是使用 --generate 选项提供的
- --verify:在此模式下,该工具会验证上次 --execute 期间列出的所有分区的重新分配状态。状态可以是 successfully completed、failed 或 in progress
自动将数据迁移到新计算机
分区重新分配工具可用于将当前代理集中的某些主题移动到新添加的代理。这在扩展现有集群时通常很有用,因为与一次移动一个分区相比,将整个主题移动到新的代理集更容易。用于执行此作时,用户应提供应移动到新代理集的主题列表和新代理的目标列表。然后,该工具将给定主题列表的所有分区均匀分布到新的代理集。在此移动期间,主题的复制因子保持不变。实际上,主题输入列表的所有分区的副本都从旧的代理集移动到新添加的代理。例如,以下示例会将主题 foo1,foo2 的所有分区移动到新的代理集 5,6。在这次移动结束时,主题 foo1 和 foo2 的所有分区将仅存在于代理 5,6 上。
由于该工具接受主题输入列表作为 json 文件,因此您首先需要确定要移动的主题并创建 json 文件,如下所示:
> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}
json 文件准备就绪后,使用分区重新分配工具生成候选分配: > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[2,1]},
{"topic":"foo1","partition":1,"replicas":[1,3]},
{"topic":"foo1","partition":2,"replicas":[3,4]},
{"topic":"foo2","partition":0,"replicas":[4,2]},
{"topic":"foo2","partition":1,"replicas":[2,1]},
{"topic":"foo2","partition":2,"replicas":[1,3]}]
}
Proposed partition reassignment configuration
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[6,5]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo1","partition":2,"replicas":[6,5]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[6,5]},
{"topic":"foo2","partition":2,"replicas":[5,6]}]
}
该工具会生成一个候选分配,该分配会将所有分区从主题 foo1,foo2 移动到代理 5,6。但是请注意,此时分区移动尚未开始,它只是告诉您当前分配和建议的新分配。应保存当前分配,以防您要回滚到它。新分配应保存在 json 文件(例如 expand-cluster-reassignment.json)中,以便使用 --execute 选项输入到工具中,如下所示:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[2,1]},
{"topic":"foo1","partition":1,"replicas":[1,3]},
{"topic":"foo1","partition":2,"replicas":[3,4]},
{"topic":"foo2","partition":0,"replicas":[4,2]},
{"topic":"foo2","partition":1,"replicas":[2,1]},
{"topic":"foo2","partition":2,"replicas":[1,3]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for foo1-0,foo1-1,foo1-2,foo2-0,foo2-1,foo2-2
最后,--verify 选项可以与该工具一起使用,以检查分区重新分配的状态。请注意,相同的expand-cluster-reassignment.json(与 --execute 选项一起使用)应与 --verify 选项一起使用:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] is completed
Reassignment of partition [foo1,1] is still in progress
Reassignment of partition [foo1,2] is still in progress
Reassignment of partition [foo2,0] is completed
Reassignment of partition [foo2,1] is completed
Reassignment of partition [foo2,2] is completed
自定义分区分配和迁移
分区重新分配工具还可用于有选择地将分区的副本移动到一组特定的代理。以这种方式使用时,假定用户知道重新分配计划,并且不需要该工具生成候选重新分配,从而有效地跳过 --generate 步骤并直接进入 --execute 步骤例如,以下示例将主题 foo1 的分区 0 移动到代理 5,6,将主题 foo2 的分区 1 移动到代理 2,3:
第一步是在 json 文件中手动制作自定义重新分配计划:
> cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
然后,使用带有 --execute 选项的 json 文件开始重新分配过程: > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for foo1-0,foo2-1
--verify 选项可以与该工具一起使用,以检查分区重新分配的状态。请注意,相同的custom-reassignment.json(与 --execute 选项一起使用)应与 --verify 选项一起使用:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] is completed
Reassignment of partition [foo2,1] is completed
停用代理
分区重新分配工具尚不能自动生成用于停用代理的重新分配计划。因此,管理员必须提出一个重新分配计划,将托管在 broker 上的所有分区的副本移动到其余的 broker 中。这可能相对繁琐,因为重新分配需要确保所有副本都不会从已停用的代理移动到仅另一个代理。为了简化此过程,我们计划在未来添加对停用 broker 的工具支持。增加复制因子
增加现有分区的复制因子很容易。只需在自定义重新分配 json 文件中指定额外的副本,并将其与 --execute 选项一起使用,即可增加指定分区的复制因子。例如,以下示例将主题 foo 的分区 0 的复制因子从 1 增加到 3。在增加复制因子之前,分区的唯一副本存在于代理 5 上。作为增加复制因子的一部分,我们将在代理 6 和 7 上添加更多副本。
第一步是在 json 文件中手动制作自定义重新分配计划:
> cat increase-replication-factor.json
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
然后,使用带有 --execute 选项的 json 文件开始重新分配过程: > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignment for foo-0
--verify 选项可以与该工具一起使用,以检查分区重新分配的状态。请注意,相同的increase-replication-factor.json(与 --execute 选项一起使用)应与 --verify 选项一起使用:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [foo,0] is completed
您还可以使用 kafka-topics 工具验证复制因子的增加: > bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7
限制数据迁移期间的带宽使用
Kafka 允许您对复制流量应用限制,为用于在计算机之间移动副本的带宽设置上限。这在重新平衡集群、引导新代理或者添加或删除代理时非常有用,因为它限制了这些数据密集型作对用户的影响。 有两个接口可用于启用节流。最简单、最安全的是在调用 kafka-reassign-partitions.sh 时应用限制,但 kafka-configs.sh 也可以用于直接查看和更改限制值。 因此,例如,如果您要执行再平衡,使用以下命令,它将以不超过 50MB/s 的速度移动分区。 当您执行此脚本时,您将看到 throttle engage:$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file bigger-cluster.json --throttle 50000000
The inter-broker throttle limit was set to 50000000 B/s
Successfully started partition reassignment for foo1-0
如果您希望在重新平衡期间更改限制,例如增加吞吐量以更快地完成,您可以通过使用 --additional 选项重新运行 execute 命令来执行此作,并传递相同的 reassignment-json-file:
$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --additional --execute --reassignment-json-file bigger-cluster.json --throttle 700000000 The inter-broker throttle limit was set to 700000000 B/s
再平衡完成后,管理员可以使用 --verify 选项检查再平衡的状态。 如果再平衡已完成,将通过 --verify 命令删除限制。重要的是 重新平衡完成后,管理员通过使用 --verify 选项。否则,可能会导致常规复制流量受到限制。
执行 --verify 选项并完成重新分配时,脚本将确认已删除限制:
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --verify --reassignment-json-file bigger-cluster.json
Status of partition reassignment:
Reassignment of partition [my-topic,1] is completed
Reassignment of partition [my-topic,0] is completed
Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic my-topic
管理员还可以使用 kafka-configs.sh 验证分配的配置。有两对节气门 配置。第一对是指 throttle 值本身。这是在 broker level 中,使用动态属性:
leader.replication.throttled.rate
follower.replication.throttled.rate
然后是受限制副本的枚举集的配置对:
leader.replication.throttled.replicas
follower.replication.throttled.replicas
哪些是按主题配置的。
所有四个 config 值都由 kafka-reassign-partitions.sh 自动分配(如下所述)。
要查看限制配置,请执行以下作:
> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type brokers
Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
这显示了应用于复制协议的 leader 端和 follower 端的限制。默认情况下,两侧 分配相同的受限制吞吐量值。
要查看受限制的副本列表,请执行以下作:
> bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
follower.replication.throttled.replicas=1:101,0:102
在这里,我们看到领导限制应用于代理 102 上的分区 1 和代理 101 上的分区 0。同样, Follower Throttle 应用于分区 1 代理 101 和代理 102 上的分区 0。
默认情况下 kafka-reassign-partitions.sh 会将 leader 限制应用于 rebalance,其中任何一个都可能是 leader。 它将 follower 限制应用于所有移动目的地。因此,如果代理上有一个分区具有副本 101,102,被重新分配给 102,103,一个领导者限制, 对于该分区,将应用于 101,102,而 follower throttle 将仅应用于 103。
如果需要,您还可以使用 kafka-configs.sh 上的 --alter 开关手动更改限制配置。
安全使用受限制的复制
使用受限制的复制时应小心。特别:
(1) 油门拆卸:
重新分配完成后,应及时删除限制(通过运行 kafka-reassign-partitions.sh --verify 的 intent 函数)。(2) 确保进度:
如果 throttle 设置得太低,则与传入写入速率相比,复制可能会 捗。这在以下情况下发生:
max(BytesInPerSec) > throttle
其中 BytesInPerSec 是监控创建器对每个代理的写入吞吐量的指标。
管理员可以使用以下指标在再平衡期间监控复制是否正在进行:
kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)
在复制过程中,滞后应不断减少。如果指标没有减少,管理员应该 增加 如上所述限制吞吐量。
设置配额
配额覆盖和默认值可以在 (user, client-id)、user 或 client-id 级别进行配置,如此处所述。 默认情况下,客户端会收到无限制的配额。 可以为每个 (user, client-id)、user 或 client-id 组设置自定义配额。为 (user=user1, client-id=clientA) 配置自定义配额:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.
为 user=user1 配置自定义配额:为 client-id=clientA 配置自定义配额:可以通过指定 --entity-default 选项而不是 --entity-name 为每个(用户、client-id)、用户或 client-id 组设置默认配额。 > bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
Updated config for entity: user-principal 'user1'.
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
Updated config for entity: client-id 'clientA'.
为 user=userA 配置默认 client-id 配额:
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
Updated config for entity: user-principal 'user1', default client-id.
为用户配置默认配额:为 client-id 配置默认配额:下面介绍如何描述给定 (user, client-id) 的配额:描述给定用户的配额:描述给定 client-id 的配额:如果未指定实体名称,则描述指定类型的所有实体。例如,describe all users:类似地 for (user, client): > bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
Updated config for entity: default user-principal.
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
Updated config for entity: default client-id.
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name user1
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type clients --entity-name clientA
Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-type clients
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
6.2 数据中心
某些部署需要管理跨多个数据中心的数据管道。我们推荐的方法是在每个数据中心部署一个本地 Kafka 集群,每个数据中心中的应用程序实例仅与其本地集群交互,并在集群之间镜像数据(有关如何执行此操作,请参阅有关异地复制的文档)。这种部署模式允许数据中心充当独立实体,并允许我们集中管理和调整数据中心间复制。这允许每个设施独立运行,即使数据中心间链路不可用:发生这种情况时,镜像会滞后,直到链路恢复,此时它会赶上来。
对于需要所有数据的全局视图的应用程序,您可以使用镜像来提供集群,这些集群具有从所有数据中心的本地集群镜像的聚合数据。这些聚合集群用于需要完整数据集的应用程序的读取。
这不是唯一可能的部署模式。可以通过 WAN 读取或写入远程 Kafka 集群,但显然这会增加获取集群所需的任何延迟。
Kafka 自然会在创建者和使用者中对数据进行批处理,因此即使在高延迟连接上也可以实现高吞吐量。但为了实现这一点,可能需要使用 和 配置增加生产者、消费者和代理的 TCP 套接字缓冲区大小。此处记录了设置此项的适当方法。socket.send.buffer.bytes
socket.receive.buffer.bytes
通常不建议通过高延迟链接运行跨多个数据中心的单个 Kafka 集群。这将导致 Kafka 写入和 ZooKeeper 写入的非常高的复制延迟,如果位置之间的网络不可用,Kafka 和 ZooKeeper 都不会在所有位置保持可用。
6.3 异地复制(跨集群数据镜像)
异地复制概述
Kafka 管理员可以定义跨越单个 Kafka 集群、数据中心或地理区域边界的数据流。组织、技术或法律要求通常需要此类事件流式处理设置。常见方案包括:
- 异地复制
- 灾难恢复
- 将边缘集群馈送到一个中央聚合集群中
- 集群的物理隔离(例如生产与测试)
- 云迁移或混合云部署
- 法律和合规性要求
管理员可以使用 Kafka 的 MirrorMaker(版本 2)设置此类集群间数据流,该工具以流方式在不同 Kafka 环境之间复制数据。MirrorMaker 构建在 Kafka Connect 框架之上,支持以下功能:
- 复制主题(数据加配置)
- 复制使用者组(包括偏移量),以在集群之间迁移应用程序
- 复制 ACL
- 保留分区
- 自动检测新主题和分区
- 提供广泛的指标,例如跨多个数据中心/集群的端到端复制延迟
- 容错和水平可扩展的操作
注意:使用 MirrorMaker 进行异地复制可跨 Kafka 集群复制数据。这种集群间复制不同于 Kafka 的集群内复制,后者在同一 Kafka 集群内复制数据。
什么是复制流
借助 MirrorMaker,Kafka 管理员可以将主题、主题配置、使用者组及其偏移量以及 ACL 从一个或多个源 Kafka 集群复制到一个或多个目标 Kafka 集群,即跨集群环境。简而言之,MirrorMaker 使用 Connector 从源集群使用并生成到目标集群。
这些从源集群到目标集群的定向流称为复制流。它们使用 MirrorMaker 配置文件中的格式定义,如下所述。管理员可以基于这些流创建复杂的复制拓扑。{source_cluster}->{target_cluster}
以下是一些示例模式:
- 主动/主动高可用性部署:
A->B, B->A
- 主动/被动或主动/备用高可用性部署:
A->B
- 聚合(例如,从多个集群到一个集群):
A->K, B->K, C->K
- 扇出(例如,从一个集群到多个集群):
K->A, K->B, K->C
- 转发:
A->B, B->C, C->D
默认情况会复制所有主题和使用者组(排除的主题和使用者组除外)。但是,每个复制流都可以独立配置。例如,您可以定义仅将特定主题或使用者组从源集群复制到目标集群。
以下是有关如何配置从集群到集群的数据复制(主动/被动设置)的第一个示例:primary
secondary
# Basic settings
clusters = primary, secondary
primary.bootstrap.servers = broker3-primary:9092
secondary.bootstrap.servers = broker5-secondary:9092
# Define replication flows
primary->secondary.enabled = true
primary->secondary.topics = foobar-topic, quux-.*
配置异地复制
以下部分介绍如何配置和运行专用 MirrorMaker 集群。如果要在现有 Kafka Connect 集群或其他支持的部署设置中运行 MirrorMaker,请参阅 KIP-382:MirrorMaker 2.0,并注意配置设置的名称可能因部署模式而异。
除了以下部分涵盖的内容之外,有关配置设置的更多示例和信息,请访问:
- MirrorMakerConfig、MirrorConnectorConfig
- DefaultTopicFilter 用于主题,DefaultGroupFilter 用于使用者组
- connect-mirror-maker.properties 中的示例配置设置,KIP-382:MirrorMaker 2.0
配置文件语法
MirrorMaker 配置文件通常命名为 。您可以在此文件中配置各种组件:connect-mirror-maker.properties
- MirrorMaker 设置:全局设置,包括群集定义(别名)以及每个复制流的自定义设置
- Kafka Connect 和连接器设置
- Kafka 生产者、使用者和管理客户端设置
示例:定义 MirrorMaker 设置(稍后将更详细地解释)。
# Global settings
clusters = us-west, us-east # defines cluster aliases
us-west.bootstrap.servers = broker3-west:9092
us-east.bootstrap.servers = broker5-east:9092
topics = .* # all topics to be replicated by default
# Specific replication flow settings (here: flow from us-west to us-east)
us-west->us-east.enabled = true
us-west->us.east.topics = foo.*, bar.* # override the default above
MirrorMaker 基于 Kafka Connect 框架。有关 Kafka Connect 的文档章节中所述的任何 Kafka Connect、源连接器和接收器连接器设置都可以直接在 MirrorMaker 配置中使用,而无需更改配置设置的名称或为其添加前缀。
示例:定义 MirrorMaker 要使用的自定义 Kafka Connect 设置。
# Setting Kafka Connect defaults for MirrorMaker
tasks.max = 5
大多数默认 Kafka Connect 设置都适用于开箱即用的 MirrorMaker,但 .为了在多个 MirrorMaker 进程之间均匀分配工作负载,建议根据可用硬件资源和要复制的主题分区的总数,至少(最好更高)设置为至少(最好更高)。tasks.max
tasks.max
2
您可以进一步自定义每个源或目标集群的 MirrorMaker 的 Kafka Connect 设置(更准确地说,您可以“每个连接器”指定 Kafka Connect 工作线程级配置设置)。使用 MirrorMaker 配置文件中的 格式。{cluster}.{config_name}
示例:定义集群的自定义连接器设置。us-west
# us-west custom settings
us-west.offset.storage.topic = my-mirrormaker-offsets
MirrorMaker 在内部使用 Kafka 生产者、使用者和管理客户端。通常需要为这些客户端进行自定义设置。要覆盖默认值,请在 MirrorMaker 配置文件中使用以下格式:
{source}.consumer.{consumer_config_name}
{target}.producer.{producer_config_name}
{source_or_target}.admin.{admin_config_name}
示例:定义自定义生产者、使用者、管理员客户端设置。
# us-west cluster (from which to consume)
us-west.consumer.isolation.level = read_committed
us-west.admin.bootstrap.servers = broker57-primary:9092
# us-east cluster (to which to produce)
us-east.producer.compression.type = gzip
us-east.producer.buffer.memory = 32768
us-east.admin.bootstrap.servers = broker8-secondary:9092
恰好一次
从版本 3.5.0 开始,专用 MirrorMaker 集群支持恰好一次语义。
对于新的 MirrorMaker 群集,对于应使用恰好一次语义写入的所有目标 Kafka 群集,将属性设置为 enabled。例如,要为对集群 us-east 的写入启用 exactly-once,可以使用以下配置:exactly.once.source.support
us-east.exactly.once.source.support = enabled
对于现有的 MirrorMaker 集群,需要进行两步升级。不要立即将属性设置为 enabled,而是首先将其设置为在集群中的所有节点上。完成此作后,可以在集群中的所有节点上将其设置为启用,以进行第二轮重启。exactly.once.source.support
preparing
无论哪种情况,都必须在 MirrorMaker 节点之间启用群集内通信,如 KIP-710 中所述。为此,必须将该属性设置为 。此外,许多可用于 Kafka Connect 的 REST 相关配置属性都可以在 MirrorMaker 配置中指定。例如,要在 MirrorMaker 群集中启用群集内通信,每个节点都侦听其本地计算机的端口 8080,应将以下内容添加到 MirrorMaker 配置文件中:dedicated.mode.enable.internal.rest
true
dedicated.mode.enable.internal.rest = true
listeners = http://localhost:8080
请注意,如果在生产环境中启用了群集内通信,强烈建议保护每个 MirrorMaker 节点启动的 REST 服务器。有关如何实现此目的的信息,请参阅 Kafka Connect 的配置属性。
此外,还建议在运行 MirrorMaker 时从复制的数据中筛选出中止事务的记录。为此,请确保将用于从源集群读取的使用者配置为 set to 。如果从 cluster 复制数据,则可以通过将以下内容添加到 MirrorMaker 配置文件中,对从该集群读取的所有复制流执行此作:isolation.level
read_committed
us-west
us-west.consumer.isolation.level = read_committed
最后要注意的是,在后台,MirrorMaker 使用 Kafka Connect 源连接器来复制数据。有关对这些类型的连接器的 exactly-once 支持的更多信息,请参阅相关的文档页面。
创建和启用复制流
要定义复制流,您必须首先在 MirrorMaker 配置文件中定义相应的源和目标 Kafka 集群。
clusters
(必需):以逗号分隔的 Kafka 集群“别名”列表{clusterAlias}.bootstrap.servers
(必需):特定集群的连接信息;以逗号分隔的 “bootstrap” Kafka 代理列表
示例:定义两个集群别名 和 ,包括其连接信息。primary
secondary
clusters = primary, secondary
primary.bootstrap.servers = broker10-primary:9092,broker-11-primary:9092
secondary.bootstrap.servers = broker5-secondary:9092,broker6-secondary:9092
其次,您必须根据需要显式启用单个复制流。请记住,流是定向的:如果需要双向(双向)复制,则必须在两个方向上启用流。{source}->{target}.enabled = true
# Enable replication from primary to secondary
primary->secondary.enabled = true
默认情况下,复制流会将除少数特殊主题和使用者组之外的所有主题和使用者组从源集群复制到目标集群,并自动检测任何新创建的主题和组。目标集群中复制的主题名称将以源集群的名称为前缀(请参阅下面的部分)。例如,源集群中的主题将被复制到目标集群中命名的主题。foo
us-west
us-west.foo
us-east
后续部分将介绍如何根据需要自定义此基本设置。
配置复制流
复制流的配置是顶级默认设置(例如,)的组合,在此基础上应用了特定于流的设置(如果有)(例如,)。要更改顶级默认值,请将相应的顶级设置添加到 MirrorMaker 配置文件中。要仅覆盖特定复制流的默认值,请使用语法格式 。topics
us-west->us-east.topics
{source}->{target}.{config.name}
最重要的设置是:
topics
:主题列表或定义源集群中要复制的主题的正则表达式(默认值:topics = .*
)topics.exclude
:主题列表或正则表达式,以随后排除与设置匹配的主题(默认值:topics
topics.exclude = .*[\-\.]internal, .*\.replica, __.*
)groups
:定义源集群中要复制的使用者组的主题或正则表达式列表(默认值:groups = .*
)groups.exclude
:主题列表或正则表达式,以随后排除与设置匹配的使用者组(默认值:groups
groups.exclude = console-consumer-.*, connect-.*, __.*
){source}->{target}.enable
:设置为 以启用复制流(默认值:true
false
)
例:
# Custom top-level defaults that apply to all replication flows
topics = .*
groups = consumer-group1, consumer-group2
# Don't forget to enable a flow!
us-west->us-east.enabled = true
# Custom settings for specific replication flows
us-west->us-east.topics = foo.*
us-west->us-east.groups = bar.*
us-west->us-east.emit.heartbeats = false
支持其他配置设置,在大多数情况下,这些设置可以保留其默认值。请参阅 MirrorMaker 配置。
保护复制流
MirrorMaker 支持与 Kafka Connect 相同的安全设置,因此请参阅链接的部分以了解更多信息。
示例:加密 MirrorMaker 与集群之间的通信。us-east
us-east.security.protocol=SSL
us-east.ssl.truststore.location=/path/to/truststore.jks
us-east.ssl.truststore.password=my-secret-password
us-east.ssl.keystore.location=/path/to/keystore.jks
us-east.ssl.keystore.password=my-secret-password
us-east.ssl.key.password=my-secret-password
目标集群中复制主题的自定义命名
目标集群中的复制主题(有时称为远程主题)将根据复制策略重命名。MirrorMaker 使用此策略来确保来自不同集群的事件(又名记录、消息)不会写入同一主题分区。默认情况下,根据 DefaultReplicationPolicy,目标集群中复制的主题名称的格式为:{source}.{source_topic_name}
us-west us-east
========= =================
bar-topic
foo-topic --> us-west.foo-topic
您可以使用以下设置自定义分隔符(默认:):.
replication.policy.separator
# Defining a custom separator
us-west->us-east.replication.policy.separator = _
如果您需要进一步控制复制主题的命名方式,您可以在 MirrorMaker 配置中实施自定义和覆盖(默认为 )。ReplicationPolicy
replication.policy.class
DefaultReplicationPolicy
防止配置冲突
MirrorMaker 进程通过其目标 Kafka 集群共享配置。当针对同一目标群集运行的 MirrorMaker 进程之间的配置不同时,此行为可能会导致冲突。
例如,以下两个 MirrorMaker 进程将是 racy 的:
# Configuration of process 1
A->B.enabled = true
A->B.topics = foo
# Configuration of process 2
A->B.enabled = true
A->B.topics = bar
在这种情况下,两个进程将通过 cluster 共享配置,这会导致冲突。根据两个进程中的哪一个是当选的 “领导者”,结果将是复制主题或主题,但不能同时复制两者。B
foo
bar
因此,在到同一目标群集的复制流中保持 MirrorMaker 配置一致非常重要。例如,这可以通过自动化工具或为整个组织使用单个共享的 MirrorMaker 配置文件来实现。
最佳实践:从远程消费,从农产品到本地
为了最大限度地减少延迟(“生产者滞后”),建议将 MirrorMaker 进程放置在尽可能靠近其目标集群的位置,即它向其生成数据的集群。这是因为 Kafka 生产者通常比 Kafka 使用者更难以处理不可靠或高延迟的网络连接。
First DC Second DC
========== =========================
primary --------- MirrorMaker --> secondary
(remote) (local)
要运行这种“从远程使用,生产到本地”设置,请在靠近目标集群的位置运行 MirrorMaker 进程,最好在目标集群所在的位置运行,并在命令行参数中显式设置这些“本地”集群(以空分隔的集群别名列表):--clusters
# Run in secondary's data center, reading from the remote `primary` cluster
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters secondary
它告诉 MirrorMaker 进程给定的集群就在附近,并防止它复制数据或将配置发送到其他远程位置的集群。--clusters secondary
示例:主动/被动高可用性部署
以下示例显示了将主题从主 Kafka 环境复制到辅助 Kafka 环境的基本设置,而不是从辅助 Kafka 环境复制回主 Kafka 环境的基本设置。请注意,大多数生产设置都需要进一步配置,例如安全设置。
# Unidirectional flow (one-way) from primary to secondary cluster
primary.bootstrap.servers = broker1-primary:9092
secondary.bootstrap.servers = broker2-secondary:9092
primary->secondary.enabled = true
secondary->primary.enabled = false
primary->secondary.topics = foo.* # only replicate some topics
示例:主动/主动高可用性部署
以下示例显示了以两种方式在两个集群之间复制主题的基本设置。请注意,大多数生产设置都需要进一步配置,例如安全设置。
# Bidirectional flow (two-way) between us-west and us-east clusters
clusters = us-west, us-east
us-west.bootstrap.servers = broker1-west:9092,broker2-west:9092
Us-east.bootstrap.servers = broker3-east:9092,broker4-east:9092
us-west->us-east.enabled = true
us-east->us-west.enabled = true
关于防止复制“循环”(其中主题最初将从 A 复制到 B,然后复制的主题将再次从 B 复制到 A,依此类推)的注意事项:只要在同一个 MirrorMaker 配置文件中定义上述流,就无需显式添加设置来防止两个集群之间出现复制循环。topics.exclude
示例:多群集异地复制
让我们将前面部分的所有信息放在一个更大的示例中。假设有三个数据中心(西、东、北),每个数据中心有两个 Kafka 集群(例如,、)。本节中的示例显示了如何配置 MirrorMaker (1) 在每个数据中心内进行主动/主动复制,以及 (2) 进行跨数据中心复制 (XDCR)。west-1
west-2
首先,在配置中定义源集群和目标集群及其复制流:
# Basic settings
clusters: west-1, west-2, east-1, east-2, north-1, north-2
west-1.bootstrap.servers = ...
west-2.bootstrap.servers = ...
east-1.bootstrap.servers = ...
east-2.bootstrap.servers = ...
north-1.bootstrap.servers = ...
north-2.bootstrap.servers = ...
# Replication flows for Active/Active in West DC
west-1->west-2.enabled = true
west-2->west-1.enabled = true
# Replication flows for Active/Active in East DC
east-1->east-2.enabled = true
east-2->east-1.enabled = true
# Replication flows for Active/Active in North DC
north-1->north-2.enabled = true
north-2->north-1.enabled = true
# Replication flows for XDCR via west-1, east-1, north-1
west-1->east-1.enabled = true
west-1->north-1.enabled = true
east-1->west-1.enabled = true
east-1->north-1.enabled = true
north-1->west-1.enabled = true
north-1->east-1.enabled = true
然后,在每个数据中心中,启动一个或多个 MirrorMaker,如下所示:
# In West DC:
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters west-1 west-2
# In East DC:
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters east-1 east-2
# In North DC:
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties --clusters north-1 north-2
使用此配置,生成到任何集群的记录将在数据中心内复制,并跨其他数据中心复制。通过提供该参数,我们确保每个 MirrorMaker 进程仅向附近的集群生成数据。--clusters
注意:从技术上讲,该参数在这里不是必需的。没有它,MirrorMaker 会正常工作。但是,吞吐量可能会受到数据中心之间的“生产者滞后”的影响,并且您可能会产生不必要的数据传输成本。--clusters
启动异地复制
您可以根据需要运行尽可能少或尽可能多的 MirrorMaker 进程(想想:节点、服务器)。由于 MirrorMaker 基于 Kafka Connect,因此配置为复制相同 Kafka 集群的 MirrorMaker 进程在分布式设置中运行:它们将找到彼此、共享配置(请参阅下面的部分)、对其工作进行负载平衡等。例如,如果要提高复制流的吞吐量,则一个选项是并行运行其他 MirrorMaker 进程。
要启动 MirrorMaker 进程,请运行以下命令:
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
启动后,可能需要几分钟时间,MirrorMaker 进程才会首次开始复制数据。
(可选)如前所述,您可以设置该参数以确保 MirrorMaker 进程仅向附近的集群生成数据。--clusters
# Note: The cluster alias us-west must be defined in the configuration file
$ ./bin/connect-mirror-maker.sh connect-mirror-maker.properties \
--clusters us-west
测试消费组复制时的注意事项:默认情况下,MirrorMaker 不会复制该工具创建的使用者组,您可以使用该工具在命令行上测试 MirrorMaker 设置。如果您也要复制这些使用者组,请相应地设置配置(默认值:)。请记住在完成测试后再次更新配置。kafka-console-consumer.sh
groups.exclude
groups.exclude = console-consumer-.*, connect-.*, __.*
停止异地复制
您可以通过使用以下命令发送 SIGTERM 信号来停止正在运行的 MirrorMaker 进程:
$ kill <MirrorMaker pid>
应用配置更改
要使配置更改生效,必须重新启动 MirrorMaker 进程。
监视异地复制
建议监控 MirrorMaker 进程,以确保所有定义的复制流都已启动并正确运行。MirrorMaker 基于 Connect 框架构建,并继承了 Connect 的所有指标,例如 .此外,MirrorMaker 在指标组下生成自己的指标。指标使用以下属性进行标记:source-record-poll-rate
kafka.connect.mirror
source
:源集群的别名(例如primary
)target
:目标集群的别名(例如secondary
)topic
:目标集群上的复制主题partition
:正在复制的分区
跟踪每个复制主题的指标。源集群可以从主题名称中推断出来。例如,复制 from 将产生以下指标:topic1
primary->secondary
target=secondary
topic=primary.topic1
partition=1
将发出以下指标:
# MBean: kafka.connect.mirror:type=MirrorSourceConnector,target=([-.w]+),topic=([-.w]+),partition=([0-9]+)
record-count # number of records replicated source -> target
record-age-ms # age of records when they are replicated
record-age-ms-min
record-age-ms-max
record-age-ms-avg
replication-latency-ms # time it takes records to propagate source->target
replication-latency-ms-min
replication-latency-ms-max
replication-latency-ms-avg
byte-rate # average number of bytes/sec in replicated records
# MBean: kafka.connect.mirror:type=MirrorCheckpointConnector,source=([-.w]+),target=([-.w]+)
checkpoint-latency-ms # time it takes to replicate consumer offsets
checkpoint-latency-ms-min
checkpoint-latency-ms-max
checkpoint-latency-ms-avg
这些指标不区分 created-at 和 log-append 时间戳。
6.4 多租户
多租户概述
作为一个高度可扩展的事件流平台,Kafka 被许多用户用作他们的中枢神经系统,实时连接来自不同团队和业务线的各种不同系统和应用程序。这种多租户集群环境需要适当的控制和管理,以确保这些不同需求和平共存。本节重点介绍设置此类共享环境的功能和最佳实践,这些环境应该可以帮助您运行满足 SLA/OLA 的集群,并最大限度地减少“嘈杂邻居”造成的潜在附带损害。
多租户是一个多方面的主题,包括但不限于:
- 为租户创建用户空间(有时称为命名空间)
- 使用数据保留策略等配置主题
- 通过加密、身份验证和授权保护主题和集群
- 使用配额和速率限制隔离租户
- 监控和计量
- 集群间数据共享(参见异地复制)
使用 Topic 命名为 Tenants 创建用户空间(命名空间)
运行多租户集群的 Kafka 管理员通常需要为每个租户定义用户空间。在本部分中,“用户空间”是主题的集合,这些主题在单个实体或用户的管理下分组在一起。
在 Kafka 中,数据的主要单位是主题。用户可以创建和命名每个主题。他们也可以删除它们,但无法直接重命名主题。相反,要重命名主题,用户必须创建一个新主题,将消息从原始主题移动到新主题,然后删除原始主题。考虑到这一点,建议根据分层主题命名结构定义逻辑空间。然后,可以将此设置与安全功能(例如带前缀的 ACL)结合使用,以隔离不同的空间和租户,同时还可以最大限度地减少保护集群中数据的管理开销。
这些逻辑用户空间可以以不同的方式进行分组,具体选择取决于您的组织喜欢如何使用 Kafka 集群。最常见的分组如下。
按团队或组织单位:在这里,团队是主要的聚合器。在团队是 Kafka 基础架构的主要用户的组织中,这可能是最佳分组。
主题命名结构示例:
<organization>.<team>.<dataset>.<event-name>
(例如,“acme.infosec.telemetry.logins”)
按项目或产品:在这里,一个团队管理多个项目。每个项目的凭据都不同,因此所有控件和设置将始终与项目相关。
主题命名结构示例:
<project>.<product>.<event-name>
(例如,“mobility.payments.suspicious”)
某些信息通常不应放在主题名称中,例如可能会随时间变化的信息(例如,目标使用者的名称)或其他地方可用的技术细节或元数据(例如,主题的分区计数和其他配置设置)。
要强制实施主题命名结构,可以使用以下几个选项:
- 使用前缀 ACL(参见 KIP-290)为主题名称强制实施通用前缀。例如,可能只允许团队 A 创建名称以 .
payments.teamA.
- 定义自定义 (参见 KIP-108 和设置 create.topic.policy.class.name) 以强制实施严格的命名模式。这些策略提供了最大的灵活性,并且可以涵盖复杂的模式和规则以满足组织的需求。
CreateTopicPolicy
- 通过使用 ACL 拒绝为普通用户禁用主题创建,然后依靠外部进程代表用户创建主题(例如,脚本或您最喜欢的自动化工具包)。
- 通过在代理配置中设置来禁用 Kafka 功能以按需自动创建主题也可能很有用。请注意,您不应仅依赖此选项。
auto.create.topics.enable=false
配置主题:数据保留等
Kafka 的配置由于其细粒度而非常灵活,并且它支持大量的每个主题的配置设置,以帮助管理员设置多租户集群。例如,管理员通常需要定义数据保留策略,以控制主题中存储的数据量和/或存储时间,包括 retention.bytes(大小)和 retention.ms(time)等设置。这限制了集群内的存储消耗,并有助于遵守 GDPR 等法律要求。
保护集群和主题:身份验证、授权、加密
由于该文档有一章专门介绍适用于任何 Kafka 部署的安全性,因此本节重点介绍多租户环境的其他注意事项。
Kafka 的安全设置分为三大类,类似于管理员保护其他客户端-服务器数据系统(如关系数据库和传统消息传递系统)的方式。
- 对 Kafka 代理和 Kafka 客户端之间、代理之间、代理和 ZooKeeper 节点之间以及代理和其他可选工具之间传输的数据进行加密。
- 对从 Kafka 客户端和应用程序到 Kafka 代理的连接,以及从 Kafka 代理到 ZooKeeper 节点的连接进行身份验证。
- 授权客户端作,例如创建、删除和更改主题的配置;将事件写入主题或从主题读取事件;创建和删除 ACL。管理员还可以定义自定义策略以实施其他限制,例如 和 (请参阅 KIP-108 和设置 create.topic.policy.class.name alter.config.policy.class.name)。
CreateTopicPolicy
AlterConfigPolicy
在保护多租户 Kafka 环境时,最常见的管理任务是第三类(授权),即管理用户/客户端权限,这些权限授予或拒绝对某些主题的访问,从而授予或拒绝对集群中用户存储的数据的访问。此任务主要通过设置访问控制列表 (ACL) 来执行。在这里,多租户环境的管理员尤其受益于上一节中描述的分层主题命名结构,因为他们可以通过带前缀的 ACL () 方便地控制对主题的访问。这大大减少了在多租户环境中保护主题的管理开销:管理员可以在更高的开发人员便利性(更宽松的权限、使用更少和更广泛的 ACL)和更严格的安全性(更严格的权限、使用更多和更窄的 ACL)之间做出权衡。--resource-pattern-type Prefixed
在以下示例中,用户 Alice(ACME 公司 InfoSec 团队的新成员)被授予对名称以“acme.infosec.”开头的所有主题的写入权限,例如“acme.infosec.telemetry.logins”和“acme.infosec.syslogs.events”。
# Grant permissions to user Alice
$ bin/kafka-acls.sh \
--bootstrap-server broker1:9092 \
--add --allow-principal User:Alice \
--producer \
--resource-pattern-type prefixed --topic acme.infosec.
同样,您可以使用此方法将不同客户隔离在同一个共享集群上。
隔离租户:配额、速率限制、限制
多租户集群通常应配置配额,以防止用户(租户)占用过多的集群资源,例如,当他们尝试写入或读取非常大量的数据时,或者以过高的速率向代理创建请求时。这可能会导致网络饱和、垄断代理资源并影响其他客户端 — 所有这些都是您希望在共享环境中避免的。
客户端配额:Kafka 支持不同类型的 (每用户主体) 客户端配额。由于无论客户端写入或读取哪个主题,客户端的配额都适用,因此它们是在多租户集群中分配资源的便捷有效的工具。例如,请求速率配额通过限制代理在该用户的请求处理路径上花费的时间来帮助限制用户对代理 CPU 使用率的影响,之后将开始限制。在许多情况下,在多租户集群中,使用请求速率配额隔离用户比设置传入/传出网络带宽配额的影响更大,因为处理请求时代理 CPU 使用率过高会减少代理可以提供的有效带宽。此外,管理员还可以定义主题作(例如 create、delete 和 alter)的配额,以防止 Kafka 集群被高度并发的主题作淹没(请参阅 KIP-599 和 quota type )。controller_mutation_rate
服务器配额:Kafka 还支持不同类型的代理端配额。例如,管理员可以设置代理接受新连接的速率限制,设置每个代理的最大连接数,或设置允许来自特定 IP 地址的最大连接数。
监控和计量
监控是一个更广泛的主题,在文档的其他部分进行了介绍。任何 Kafka 环境(尤其是多租户环境)的管理员都应根据这些说明设置监控。Kafka 支持广泛的指标,例如身份验证尝试失败率、请求延迟、使用者滞后、使用者组总数、上一节中描述的配额指标等等。
例如,可以将监控配置为跟踪主题分区的大小(使用 JMX metric ),从而跟踪主题中存储的数据的总大小。然后,您可以定义当共享集群上的租户即将使用过多存储空间时发出警报。kafka.log.Log.Size.<TOPIC-NAME>
多租户和异地复制
Kafka 允许您在不同的集群之间共享数据,这些集群可能位于不同的地理区域、数据中心等。除了灾难恢复等使用案例外,当多租户设置需要集群间数据共享时,此功能也很有用。有关更多信息,请参阅异地复制(跨集群数据镜像)部分。
进一步的考虑
数据协定:您可能需要使用事件架构在集群中数据的创建者和使用者之间定义数据协定。这可确保始终可以再次正确读取写入 Kafka 的事件,并防止写入格式错误或损坏的事件。实现此目的的最佳方法是在集群旁边部署一个所谓的 schema registry。(Kafka 不包含架构注册表,但有可用的第三方实现。架构注册表管理事件架构并将架构映射到主题,以便创建者知道哪些主题正在接受哪些类型的事件(架构),并且使用者知道如何读取和解析主题中的事件。某些注册表实现提供更多功能,例如架构演变、存储所有架构的历史记录以及架构兼容性设置。
6.5 Kafka 配置
重要的客户端配置
最重要的生产者配置是:- ACK 系列
- 压缩
- 批量大小
所有配置都记录在 configuration 部分。
生产服务器配置
下面是一个生产服务器配置示例:我们的客户端配置在不同用例之间有相当大的差异。 # ZooKeeper
zookeeper.connect=[list of ZooKeeper servers]
# Log configuration
num.partitions=8
default.replication.factor=3
log.dir=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).]
# Other configurations
broker.id=[An integer. Start with 0 and increment by 1 for each new broker.]
listeners=[list of listeners]
auto.create.topics.enable=false
min.insync.replicas=2
queued.max.requests=[number of concurrent requests]
6.6 Java 版本
支持 Java 8、Java 11 和 Java 17。请注意,Java 8 支持自 Apache Kafka 3.0 起已弃用,并将在 Apache Kafka 4.0 中删除。 如果启用了 TLS,Java 11 及更高版本的性能会明显更好,因此强烈建议使用它们(它们还包括许多其他 性能改进:G1GC、CRC32C、Compact Strings、Thread-Local Handshakes 等)。 从安全角度来看,我们建议使用最新发布的补丁版本,因为较旧的免费版本已披露安全漏洞。 使用基于 OpenJDK 的 Java 实现(包括 Oracle JDK)运行 Kafka 的典型参数是:作为参考,以下是使用上述 Java 参数的 LinkedIn 最繁忙的集群之一(高峰期)的统计数据: -Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+ExplicitGCInvokesConcurrent
- 60 个经纪人
- 50k 个分区(复制因子 2)
- 800k 消息/秒
- 300 MB/秒入站,1 GB/秒 + 出站
6.7 硬件和作系统
我们使用的是具有 24GB 内存的双四核 Intel Xeon 计算机。您需要足够的内存来缓冲活动的读取器和写入器。您可以对内存需求进行粗略估计,方法是假设您希望能够缓冲 30 秒,并将内存需求计算为 write_throughput*30。
磁盘吞吐量很重要。我们有 8x7200 rpm SATA 驱动器。一般来说,磁盘吞吐量是性能瓶颈,磁盘越多越好。根据您配置刷新行为的方式,您可能会也可能不会从更昂贵的磁盘中受益(如果您经常强制刷新,则 RPM 较高的 SAS 驱动器可能会更好)。
操作系统
Kafka 应该可以在任何 unix 系统上运行良好,并且已经在 Linux 和 Solaris 上进行了测试。我们已经看到在 Windows 上运行时存在一些问题,而 Windows 目前不是一个得到良好支持的平台,但我们很乐意改变这种情况。
它不太可能需要太多的 OS 级调整,但有三种可能重要的 OS 级配置:
- 文件描述符限制:Kafka 对日志段和打开的连接使用文件描述符。如果代理托管许多分区,则除了代理建立的连接数外,请考虑代理至少需要 (number_of_partitions)*(partition_size/segment_size) 来跟踪所有日志段。我们建议代理进程至少允许 100000 个文件描述符作为起点。注: mmap() 函数会添加一个额外的引用,该引用将与文件描述符 fildes 相关联,该文件描述符上的后续 close() 不会删除该引用。当没有更多到文件的映射时,将删除此引用。
- 最大套接字缓冲区大小:可以增加以实现数据中心之间的高性能数据传输,如此处所述。
- 进程可以具有的最大内存映射区域数(也称为 vm.max_map_count)。请参阅 Linux 内核文档。在考虑 broker 可能具有的最大分区数时,您应该密切关注此 OS 级属性。默认情况下,在许多 Linux 系统上,vm.max_map_count 的值大约在 65535 左右。每个 log segment(按分区分配)需要一对 index/timeindex 文件,每个文件占用 1 个 map 区域。换句话说,每个日志段使用 2 个地图区域。因此,每个分区至少需要 2 个 map 区域,只要它托管一个日志段即可。也就是说,在代理上创建 50000 个分区将导致分配 100000 个映射区域,并可能导致代理崩溃,并在默认为 vm.max_map_count 的系统上出现 OutOfMemoryError(映射失败)。请记住,每个分区的日志段数因段大小、负载强度、保留策略而异,并且通常往往不止一个。
磁盘和文件系统
我们建议使用多个驱动器以获得良好的吞吐量,并且不要将用于 Kafka 数据的相同驱动器与应用程序日志或其他作系统文件系统活动共享,以确保良好的延迟。您可以将这些驱动器一起 RAID 到一个卷中,也可以将每个驱动器格式化并挂载为自己的目录。由于 Kafka 具有复制功能,因此 RAID 提供的冗余也可以在应用程序级别提供。此选择有几个权衡。如果配置多个数据目录,则分区将按循环方式分配给数据目录。每个分区将完全位于其中一个数据目录中。如果数据在分区之间没有很好地平衡,这可能会导致磁盘之间的负载不平衡。
RAID 在平衡磁盘之间的负载方面可能做得更好(尽管它似乎并不总是这样),因为它在较低级别平衡负载。RAID 的主要缺点是它通常会对写入吞吐量造成很大的性能影响,并会减少可用磁盘空间。
RAID 的另一个潜在好处是能够容忍磁盘故障。然而,我们的经验是,重建 RAID 阵列是如此密集的 I/O ,以至于它实际上会禁用服务器,因此这并没有提供太多真正的可用性改进。
应用程序与作系统刷新管理
Kafka 始终立即将所有数据写入文件系统,并支持配置 flush 策略的功能,该策略控制何时使用 flush 将数据从作系统缓存中强制出并放到磁盘上。可以控制此 flush 策略,以在一段时间后或写入一定数量的消息后强制数据到磁盘。此配置中有多种选择。Kafka 最终必须调用 fsync 才能知道数据已刷新。当从任何未知为 fsync 的日志段的崩溃中恢复时,Kafka 将通过检查其 CRC 来检查每条消息的完整性,并在启动时执行的恢复过程中重建随附的偏移索引文件。
请注意,Kafka 中的持久性不需要将数据同步到磁盘,因为故障节点将始终从其副本中恢复。
我们建议使用完全禁用应用程序 fsync 的默认 flush 设置。这意味着依赖于 OS 完成的后台 flush 和 Kafka 自己的后台 flush。这为大多数用途提供了最好的功能:无需调整旋钮、出色的吞吐量和延迟以及完整的恢复保证。我们通常认为复制提供的保证比同步到本地磁盘更强大,但是偏执狂可能仍然更喜欢两者兼而有之,并且仍然支持应用程序级别的 fsync 策略。
使用应用程序级 flush 设置的缺点是,它的磁盘使用模式效率较低(它给 OS 重新排序写入的余地较小),并且它可能会引入延迟,因为大多数 Linux 文件系统中的 fsync 会阻止对文件的写入,而后台 flush 会执行更精细的页面级锁定。
一般来说,你不需要对文件系统进行任何低级的调整,但在接下来的几节中,我们将介绍其中的一些内容,以防它有用。
了解 Linux OS 刷新行为
在 Linux 中,写入文件系统的数据将保留在页面缓存中,直到必须将其写出到磁盘(由于应用程序级 fsync 或作系统自己的 flush 策略)。数据的刷新由一组称为 pdflush 的后台线程完成(或在 2.6.32 之后的内核中称为“刷新线程”)。Pdflush 有一个可配置的策略,该策略控制缓存中可以保留多少脏数据,以及必须将其写回磁盘之前保留多长时间。 此处介绍了此策略。 当 Pdflush 无法跟上写入数据的速度时,它最终会导致写入过程阻塞,从而在写入中产生延迟,从而减慢数据的积累。
您可以通过执行
> cat /proc/meminfo这些值的含义在上面的链接中进行了描述。
与进程内高速缓存相比,使用 pagecache 存储将要写入磁盘的数据具有几个优点:
- I/O 计划程序会将连续的小写入批处理到较大的物理写入中,从而提高吞吐量。
- I/O 计划程序将尝试对写入重新排序,以最大限度地减少磁盘头的移动,从而提高吞吐量。
- 它会自动使用计算机上的所有可用内存
文件系统选择
Kafka 使用磁盘上的常规文件,因此它对特定文件系统没有硬性依赖性。但是,使用率最高的两个文件系统是 EXT4 和 XFS。从历史上看,EXT4 的使用量更大,但最近对 XFS 文件系统的改进表明,它对 Kafka 的工作负载具有更好的性能特征,而不会影响稳定性。
使用各种文件系统创建和挂载选项,在具有大量消息负载的集群上执行比较测试。Kafka 中受监控的主要指标是“请求本地时间”,表示追加作所花费的时间。XFS 带来了更好的本地时间(160 毫秒,而最佳 EXT4 配置为 250 毫秒+),以及更短的平均等待时间。XFS 性能还显示磁盘性能的可变性较小。
一般文件系统说明
对于用于数据目录的任何文件系统,在 Linux 系统上,建议在挂载时使用以下选项:- noatime:此选项在读取文件时禁用文件的 atime(上次访问时间)属性的更新。这可以消除大量的文件系统写入,尤其是在引导使用者的情况下。Kafka 根本不依赖 atime 属性,因此禁用此功能是安全的。
XFS 笔记
XFS 文件系统具有大量的自动调整功能,因此无论是在创建文件系统时还是在挂载时,都不需要对默认设置进行任何更改。唯一值得考虑的优化参数是:- largeio:这会影响 stat 调用报告的首选 I/O 大小。虽然这可以在较大的磁盘写入上实现更高的性能,但实际上它对性能的影响很小或没有影响。
- nobarrier:对于具有电池供电缓存的底层设备,此选项可以通过禁用定期写入刷新来提供更高的性能。但是,如果底层设备运行良好,它将向文件系统报告它不需要 flush,并且此选项将不起作用。
EXT4 注释
EXT4 是 Kafka 数据目录的文件系统的可用选择,但是要充分利用它,需要调整多个挂载选项。此外,这些选项在故障情况下通常是不安全的,并且会导致更多的数据丢失和损坏。对于单个代理故障,这并不是什么大问题,因为可以擦除磁盘并从集群重建副本。在多次故障的情况下,例如停电,这可能意味着底层文件系统(以及数据)损坏,不容易恢复。可以调整以下选项:- data=writeback:Ext4 默认为 data=ordered,这对某些写入有很强的排序。Kafka 不需要这种排序,因为它对所有未刷新的日志进行非常偏执的数据恢复。此设置消除了排序约束,并且似乎显著降低了延迟。
- 禁用日记功能:日记功能是一种权衡:它可以在服务器崩溃后更快地重启,但它引入了大量额外的锁定,这增加了写入性能的差异。那些不关心重启时间并希望减少写入延迟峰值的主要来源的人可以完全关闭日志功能。
- commit=num_secs:这将调整 ext4 提交到其元数据日志的频率。将此值设置为较低的值可以减少崩溃期间未刷新数据的丢失。将此值设置为较高的值将提高吞吐量。
- nobh:此设置控制使用 data=writeback 模式时的其他 Sequences 保证。这对于 Kafka 来说应该是安全的,因为我们不依赖于写入顺序,并且提高了吞吐量和延迟。
- delalloc:延迟分配意味着文件系统在物理写入发生之前避免分配任何块。这允许 ext4 分配较大的范围而不是较小的页面,并有助于确保按顺序写入数据。此功能非常适合吞吐量。它似乎确实涉及文件系统中的一些锁定,这增加了一些延迟变化。
更换 KRaft 控制器磁盘
当 Kafka 配置为使用 KRaft 时,控制器将集群元数据存储在 -- 或第一个日志目录(如果未配置)中指定的目录中。有关详细信息,请参阅文档。metadata.log.dir
metadata.log.dir
metadata.log.dir
如果集群元数据目录中的数据由于硬件故障或需要更换硬件而丢失,则在配置新控制器节点时应小心。在大多数控制器具有所有提交的数据之前,不应格式化和启动新的 controller 节点。要确定大多数控制器是否具有已提交的数据,请运行该工具以描述复制状态:kafka-metadata-quorum.sh
> bin/kafka-metadata-quorum.sh --bootstrap-server broker_host:port describe --replication
NodeId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTimestamp Status
1 25806 0 1662500992757 1662500992757 Leader
... ... ... ... ... ...
检查并等待,直到大多数控制器的 都很小。如果导引的末端偏移量没有增加,则可以等到多数的滞后为 0;否则,您可以选择最新的 leader end offset 并等待所有副本都到达它。检查并等待 和 对于大多数控制器来说彼此靠近。此时,格式化控制器的元数据日志目录会更安全。这可以通过运行命令来完成。Lag
LastFetchTimestamp
LastCaughtUpTimestamp
kafka-storage.sh
> bin/kafka-storage.sh format --cluster-id uuid --config server_properties
上述命令可能会失败,并显示类似 .当使用组合模式并且仅丢失元数据日志目录而其他目录未丢失时,可能会发生这种情况。在这种情况下,您才能使用选项运行命令。bin/kafka-storage.sh format
Log directory ... is already formatted
kafka-storage.sh format
--ignore-formatted
格式化日志目录后启动 KRaft 控制器。
> /bin/kafka-server-start.sh server_properties
6.8 监控
Kafka 使用 Yammer Metrics 在服务器中报告指标。Java 客户端使用 Kafka Metrics,这是一个内置的指标注册表,可最大限度地减少提取到客户端应用程序中的传递依赖项。两者都通过 JMX 公开指标,并且可以配置为使用可插拔的统计报告器来报告统计数据,以连接到您的监控系统。所有 Kafka 速率指标都有相应的 Cumulative count 指标,其 suffix .例如,具有名为 .-total
records-consumed-rate
records-consumed-total
查看可用指标的最简单方法是启动 jconsole 并将其指向正在运行的 kafka 客户端或服务器;这将允许使用 JMX 浏览所有指标。
使用 JMX 进行远程监控的安全注意事项
Apache Kafka 默认禁用远程 JMX。您可以通过为使用 CLI 或标准 Java 系统属性启动的进程设置环境变量来启用远程 JMX,从而使用 JMX 启用远程监控。 在生产场景下开启远程 JMX 时,必须开启安全性,以确保未经授权的用户无法监控或 控制您的代理或应用程序以及运行它们的平台。请注意,对 默认情况下,Kafka 和安全配置必须通过为使用 CLI 启动的进程设置环境变量或设置适当的 Java 系统属性来覆盖 JMX,以便进行生产部署。有关保护 JMX 的详细信息,请参阅使用 JMX 技术进行监控和管理。JMX_PORT
KAFKA_JMX_OPTS
我们对以下指标进行绘图和警报:
描述 | Mbean 名称 | 正常值 |
---|---|---|
消息输入率 | kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=([-.\w]+) | 每个主题的传入消息速率。省略 'topic=(...)' 将产生全主题速率。 |
来自客户端的字节传入速率 | kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=([-.\w]+) | 每个主题的字节输入(来自客户端)速率。省略 'topic=(...)' 将产生全主题速率。 |
来自其他经纪商的字节输入速率 | kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec | 所有主题的字节输入(来自其他代理)率。 |
来自 Broker 的 Controller Request rate | kafka.controller:type=ControllerChannelManager,name=RequestRateAndQueueTimeMs,brokerId=([0-9]+) | ControllerChannelManager 从 queue 的 broker 中。以及请求在此之前保持在此队列中所需的时间 它是从队列中获取的。 |
控制器事件队列大小 | kafka.controller:type=ControllerEventManager,name=EventQueueSize | ControllerEventManager 队列的大小。 |
控制器事件队列时间 | kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs | 任何事件(Idle 事件除外)在 ControllerEventManager 的 queue before being processed |
请求速率 | kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower},版本=([0-9]+) | |
错误率 | kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=([-.\w]+),error=([-.\w]+) | 按请求类型、每个错误代码计数的响应中的错误数。如果响应包含 多个错误,所有错误都被计算在内。error=NONE 表示响应成功。 |
产品请求率 | kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec,topic=([-.\w]+) | 每个主题的生成请求速率。省略 'topic=(...)' 将产生全主题速率。 |
Fetch 请求速率 | kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec,topic=([-.\w]+) | 每个主题的 Fetch request(来自客户或关注者)费率。省略 'topic=(...)' 将产生全主题速率。 |
失败的农产品请求率 | kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec,topic=([-.\w]+) | 失败 每个主题的 Produce 请求速率。省略 'topic=(...)' 将产生全主题速率。 |
失败的获取请求速率 | kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec,topic=([-.\w]+) | 每个主题的 Fetch 请求失败率(来自客户端或关注者)。省略 'topic=(...)' 将产生全主题速率。 |
请求大小(以字节为单位) | kafka.network:type=RequestMetrics,name=RequestBytes,request=([-.\w]+) | 每种请求类型的请求大小。 |
临时内存大小(以字节为单位) | kafka.network:type=RequestMetrics,name=TemporaryMemoryBytes,request={Produce|获取} | 用于消息格式转换和解压缩的临时内存。 |
消息转换时间 | kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request={Produce|获取} | 消息格式转换所花费的时间(以毫秒为单位)。 |
消息转化率 | kafka.server:type=BrokerTopicMetrics,name={生产|Fetch}MessageConversionsPerSec,topic=([-.\w]+) | 每个主题的 Produce 或 Fetch 请求的消息格式转换率。省略 'topic=(...)' 将产生全主题速率。 |
请求队列大小 | kafka.network:type=RequestChannel,name=RequestQueueSize | 请求队列的大小。 |
客户端的字节输出速率 | kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=([-.\w]+) | 每个主题的字节输出(到客户端)速率。省略 'topic=(...)' 将产生全主题速率。 |
向其他代理发送字节输出速率 | kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec | 所有主题的 Byte out(到其他代理)速率 |
被拒绝的字节率 | kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=([-.\w]+) | 由于记录批处理大小大于 max.message.bytes 配置,每个主题的字节率被拒绝。省略 'topic=(...)' 将产生全主题速率。 |
由于没有为压缩主题指定 key 而导致的消息验证失败率 | kafka.server:type=BrokerTopicMetrics,name=NoKeyCompactedTopicRecordsPerSec | 0 |
由于幻数无效而导致的消息验证失败率 | kafka.server:type=BrokerTopicMetrics,name=InvalidMagicNumberRecordsPerSec | 0 |
由于 crc 校验和不正确而导致的消息验证失败率 | kafka.server:type=BrokerTopicMetrics,name=InvalidMessageCrcRecordsPerSec | 0 |
由于批量中的非连续偏移量或序列号而导致的消息验证失败率 | kafka.server:type=BrokerTopicMetrics,name=InvalidOffsetOrSequenceRecordsPerSec | 0 |
日志刷新率和时间 | kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs | |
# 离线日志目录 | kafka.log:type=LogManager,name=OfflineLogDirectoryCount | 0 |
领导选举率 | kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs | 当存在 Broker 故障时为非零 |
不洁的领导人选举率 | kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec | 0 |
控制器在 broker 上处于活动状态 | kafka.controller:type=KafkaController,name=ActiveControllerCount | 群集中只有一个代理应具有 1 |
待处理的主题删除 | kafka.controller:type=KafkaController,name=TopicsToDeleteCount | |
待处理副本删除 | kafka.controller:type=KafkaController,name=ReplicasToDeleteCount | |
不合格的待处理主题删除 | kafka.controller:type=KafkaController,name=TopicsIneligibleToDeleteCount | |
不合格的待处理副本删除 | kafka.controller:type=KafkaController,name=ReplicasIneligibleToDeleteCount | |
# 个复制的分区数 (|ISR|< 所有副本 | kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions | 0 |
# of under minIsr 分区 (|ISR|< min.insync.replicas) | kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount | 0 |
# of at minIsr 分区 (|ISR|= min.insync.replicas) | kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount | 0 |
生产者 ID 计数 | kafka.server:type=ReplicaManager,name=ProducerIdCount | 代理上每个副本中事务性和幂等生产者创建的所有生产者 ID 的计数 |
分区计数 | kafka.server:type=ReplicaManager,name=PartitionCount | 大多数经纪商 |
离线副本计数 | kafka.server:type=ReplicaManager,name=OfflineReplicaCount | 0 |
Leader 副本计数 | kafka.server:type=ReplicaManager,name=LeaderCount | 大多数经纪商 |
ISR 收缩率 | kafka.server:type=ReplicaManager,name=IsrShrinksPerSec | 如果代理宕机,某些分区的 ISR 将 收缩。当该代理再次启动时,ISR 将扩展 一旦副本完全赶上。除此之外, ISR 收缩率和扩展率的预期值为 0。 |
ISR 扩展率 | kafka.server:type=ReplicaManager,name=IsrExpandsPerSec | 见上文 |
失败的 ISR 更新速率 | kafka.server:type=ReplicaManager,name=FailedIsrUpdatesPerSec | 0 |
消息中的最大滞后时间 btw follower 和 leader 副本 | kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica | LAG 应与 produce 请求的最大批量大小成正比。 |
每个关注者副本的消息滞后 | kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+) | LAG 应与 produce 请求的最大批量大小成正比。 |
在生产者炼狱中等待的请求 | kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce | 如果使用 ack=-1,则为非零 |
在 fetch 炼狱中等待的请求 | kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch | 大小取决于使用者中的 fetch.wait.max.ms |
请求总时间 | kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce|FetchConsumer|FetchFollower} | 分为队列、本地、远程和响应发送时间 |
请求在请求队列中等待的时间 | kafka.network:type=RequestMetrics,name=RequestQueueTimeMs,request={Produce|FetchConsumer|FetchFollower} | |
在 leader 处处理请求的时间 | kafka.network:type=RequestMetrics,name=LocalTimeMs,request={Produce|FetchConsumer|FetchFollower} | |
请求等待 follower 的时间 | kafka.network:type=RequestMetrics,name=RemoteTimeMs,request={Produce|FetchConsumer|FetchFollower} | 当 ack=-1 时,produce 请求为非零 |
请求在响应队列中等待的时间 | kafka.network:type=RequestMetrics,name=ResponseQueueTimeMs,request={Produce|FetchConsumer|FetchFollower} | |
发送响应的时间 | kafka.network:type=RequestMetrics,name=ResponseSendTimeMs,request={Produce|FetchConsumer|FetchFollower} | |
使用者落后于生成者的消息数。由使用者发布,而不是代理发布。 | kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id} 属性:records-lag-max | |
网络处理器空闲的平均时间分数 | kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent | 介于 0 和 1 之间,理想情况下> 0.3 |
由于客户端未重新验证,然后在过期时间之后将连接用于除重新验证以外的任何作而在处理器上断开连接的连接数 | kafka.server:type=socket-server-metrics,listener=[SASL_PLAINTEXT|SASL_SSL],networkProcessor=<#>,name=expired-connections-killed-count | 理想情况下,当启用重新身份验证时为 0,这意味着不再有任何较旧的 2.2.0 之前的客户端连接到此(侦听器、处理器)组合 |
由于客户端未重新验证,然后在过期时间之后将连接用于除重新验证以外的任何作而在所有处理器中断开连接的连接总数 | kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount | 理想情况下,当启用重新身份验证时为 0,这意味着不再有任何较旧的 2.2.0 之前的客户端连接到此代理 |
请求处理程序线程空闲的平均时间分数 | kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent | 介于 0 和 1 之间,理想情况下> 0.3 |
每个 (user, client-id)、user 或 client-id 的带宽配额指标 | kafka.server:type={生产|获取},user=([-.\w]+),client-id=([-.\w]+) | 两个属性。throttle-time 指示客户端受到限制的时间量(以毫秒为单位)。理想情况下 = 0。 byte-rate 表示客户端的数据生成/消耗速率(以字节/秒为单位)。 对于 (user, client-id) 配额,同时指定了 user 和 client-id。如果将 per-client-id 配额应用于客户端,则不会指定 user。如果应用了每用户配额,则不指定 client-id。 |
每个 (user, client-id)、user 或 client-id 的请求配额指标 | kafka.server:type=Request,user=([-.\w]+),client-id=([-.\w]+) | 两个属性。throttle-time 指示客户端受到限制的时间量(以毫秒为单位)。理想情况下 = 0。 request-time 表示在 broker 网络和 I/O 线程中处理来自客户端组的请求所花费的时间百分比。 对于 (user, client-id) 配额,同时指定了 user 和 client-id。如果将 per-client-id 配额应用于客户端,则不会指定 user。如果应用了每用户配额,则不指定 client-id。 |
免于限制的请求 | kafka.server:type=请求 | exempt-throttle-time 表示 broker 网络和 I/O 线程处理请求所花费的时间百分比 不受限制。 |
ZooKeeper 客户端请求延迟 | kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs | 来自代理的 ZooKeeper 请求的延迟(以毫秒为单位)。 |
ZooKeeper 连接状态 | kafka.server:type=SessionExpireListener,name=SessionState | 代理的 ZooKeeper 会话的连接状态,可能是以下之一 已断开连接 |SyncConnected|AuthFailed|ConnectedReadOnly|SaslAuthenticated|过期。 |
加载组元数据的最大时间 | kafka.server:type=group-coordinator-metrics,name=partition-load-time-max | 在过去 30 秒内加载的使用者偏移分区中加载偏移量和组元数据所用的最长时间(以毫秒为单位)(包括等待计划加载任务所花费的时间) |
加载组元数据的平均时间 | kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg | 在过去 30 秒内加载的使用者偏移分区中加载偏移量和组元数据所花费的平均时间(以毫秒为单位)(包括等待计划加载任务所花费的时间) |
加载事务元数据的最大时间 | kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max | 从过去 30 秒内加载的使用者偏移分区加载事务元数据所花费的最长时间(以毫秒为单位)(包括等待计划加载任务所花费的时间) |
加载交易元数据的平均时间 | kafka.server:类型=事务协调器指标,名称=分区加载时间-avg | 从过去 30 秒内加载的使用者偏移分区加载事务元数据所花费的平均时间(以毫秒为单位)(包括等待计划加载任务所花费的时间) |
交易验证错误率 | kafka.server:type=AddPartitionsToTxnManager,name=VerificationFailureRate | 从 AddPartitionsToTxn API 响应或通过 AddPartitionsToTxnManager 中的错误返回失败的验证速率。处于稳定状态 0,但在事务状态分区的滚动和重新分配期间预计会出现暂时性错误。 |
验证事务请求的时间 | kafka.server:type=AddPartitionsToTxnManager,name=VerificationTimeMs | 当可能的先前请求正在进行时排队的时间量,加上到事务协调器进行验证(或不验证)的往返行程 |
Consumer Group 偏移计数 | kafka.server:type=GroupMetadataManager,name=NumOffsets | 使用者组的已提交偏移量总数 |
使用者组计数 | kafka.server:type=GroupMetadataManager,name=NumGroups | 消费组总数 |
每个状态的使用者组计数 | kafka.server:type=GroupMetadataManager,name=NumGroups[PreparingRebalance,CompletingRebalance,Empty,Stable,Dead] | 每种状态下的 Consumer Group 数量:PreparingRebalance、CompletingRebalance、Empty、Stable、Dead |
重新分配分区的数量 | kafka.server:type=ReplicaManager,name=重新分配分区 | 代理上重新分配领导分区的数量。 |
重新分配流量的传出字节速率 | kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesOutPerSec | 0;当分区重新分配正在进行时,为非零。 |
重新分配流量的传入字节速率 | kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesInPerSec | 0;当分区重新分配正在进行时,为非零。 |
磁盘上分区的大小(以字节为单位) | kafka.log:type=Log,name=大小,topic=([-.\w]+),partition=([0-9]+) | 磁盘上分区的大小,以字节为单位。 |
分区中的日志段数 | kafka.log:type=Log,name=NumLogSegments,topic=([-.\w]+),partition=([0-9]+) | 分区中的日志段数。 |
分区中的第一个偏移量 | kafka.log:type=Log,name=LogStartOffset,topic=([-.\w]+),partition=([0-9]+) | 分区中的第一个偏移量。 |
分区中的最后一个偏移量 | kafka.log:type=Log,name=LogEndOffset,topic=([-.\w]+),partition=([0-9]+) | 分区中的最后一个偏移量。 |
分层存储监控
以下指标集可用于监控分层存储功能:量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
每秒远程获取字节数 | 每个主题从远程存储读取的字节数的速率。省略 'topic=(...)' 将产生全主题速率 | kafka.server:type=BrokerTopicMetrics,name=RemoteFetchBytesPerSec,topic=([-.\w]+) |
每秒远程获取请求数 | 每个主题的远程存储读取请求速率。省略 'topic=(...)' 将产生全主题速率 | kafka.server:type=BrokerTopicMetrics,name=RemoteFetchRequestsPerSec,topic=([-.\w]+) |
每秒远程获取错误数 | 每个主题的远程存储读取错误率。省略 'topic=(...)' 将产生全主题速率 | kafka.server:type=BrokerTopicMetrics,name=RemoteFetchErrorsPerSec,topic=([-.\w]+) |
每秒远程复制字节数 | 每个主题复制到远程存储的字节数速率。省略 'topic=(...)' 将产生全主题速率 | kafka.server:type=BrokerTopicMetrics,name=RemoteCopyBytesPerSec,topic=([-.\w]+) |
每秒远程复制请求数 | 每个主题的远程存储写入请求速率。省略 'topic=(...)' 将产生全主题速率 | kafka.server:type=BrokerTopicMetrics,name=RemoteCopyRequestsPerSec,topic=([-.\w]+) |
每秒远程复制错误数 | 每个主题的远程存储写入错误率。省略 'topic=(...)' 将产生全主题速率 | kafka.server:type=BrokerTopicMetrics,name=RemoteCopyErrorsPerSec,topic=([-.\w]+) |
RemoteLogReader 任务队列大小 | 保存远程存储读取任务的队列大小 | org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool,name=RemoteLogReaderTaskQueueSize |
RemoteLogReader 平均空闲百分比 | 用于处理远程存储读取任务的线程池的平均空闲百分比 | org.apache.kafka.storage.internals.log:type=RemoteStorageThreadPool,name=RemoteLogReaderAvgIdlePercent |
RemoteLogManager 任务 Avg Idle Percent | 用于将数据复制到远程存储的线程池的平均空闲百分比 | kafka.log.remote:type=RemoteLogManager,name=RemoteLogManagerTasksAvgIdlePercent |
KRaft 监控指标
允许监控 KRaft 仲裁和元数据日志的指标集。请注意,某些公开的指标取决于节点的角色,如
process.roles
KRaft Quorum 监控指标
这些指标在 KRaft 集群中的 Controller 和 Broker 上报告量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
当前状态 | 此成员的当前状态;可能的值为 leader、candidate、voted、follower、unattached、observer。 | kafka.server:type=raft-metrics,name=当前状态 |
现任领导者 | 当前 quorum leader 的 id;-1 表示未知。 | kafka.server:type=raft-metrics,name=current-leader |
当前投票 | 当前投票领导的 ID;-1 表示未投票给任何人。 | kafka.server:type=raft-metrics,name=当前投票 |
当前纪元 | 当前 quorum 纪元。 | kafka.server:type=raft-metrics,name=当前纪元 |
高水位线 | 此成员上维护的高水印;如果未知,则为 -1。 | kafka.server:type=raft-metrics,name=高水印 |
Log End Offset | 当前 raft log end offset。 | kafka.server:type=raft-metrics,name=log-end-offset |
未知选民连接数 | 连接信息未缓存的未知选民数。此指标的此值始终为 0。 | kafka.server:type=raft-metrics,name=number-unknown-voter-connections |
平均提交延迟 | 在 raft log 中提交条目的平均时间(以毫秒为单位)。 | kafka.server:type=raft-metrics,name=commit-latency-avg |
最大提交延迟 | 在 raft log 中提交条目的最长时间(以毫秒为单位)。 | kafka.server:type=raft-metrics,name=commit-latency-max |
平均选举延迟 | 选举新领导者所花费的平均时间(以毫秒为单位)。 | kafka.server:type=raft-metrics,name=election-latency-avg |
最大选择延迟 | 选举新领导者所花费的最长时间(以毫秒为单位)。 | kafka.server:type=raft-metrics,name=election-latency-max |
获取记录速率 | 从 raft 仲裁的领导层获取的平均记录数。 | kafka.server:type=raft-metrics,name=fetch-records-rate |
追加记录率 | raft 仲裁的领导者每秒附加的平均记录数。 | kafka.server:type=raft-metrics,name=append-records-rate |
平均轮询空闲比率 | 客户端的 poll() 处于空闲状态的平均时间,而不是等待用户代码处理记录的时间。 | kafka.server:type=raft-metrics,name=poll-idle-ratio-avg |
当前元数据版本 | 输出当前有效元数据版本的要素级别。 | kafka.server:type=MetadataLoader,name=CurrentMetadataVersion |
元数据快照加载计数 | 自进程启动以来我们加载 KRaft 快照的总次数。 | kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount |
最新元数据快照大小 | 节点生成的最新快照的总大小(以字节为单位)。如果尚未生成任何 ID,则此大小为 加载的最新快照。如果未生成或加载任何快照,则为 0。 | kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes |
最新元数据快照期限 | 自节点生成的最新快照以来的间隔(以毫秒为单位)。 如果尚未生成任何数据,则这大约是自进程启动以来的时间增量。 | kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs |
KRaft 控制器监控指标
量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
主动控制器计数 | 此节点上的 Active Controller 数量。有效值为 '0' 或 '1'。 | kafka.controller:type=KafkaController,name=ActiveControllerCount |
事件队列时间 Ms | 请求在 Controller Event Queue 中等待所花费的时间(以毫秒为单位)的 Histogram。 | kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs |
事件队列处理时间 Ms | 控制器事件队列中处理请求所花费的时间(以毫秒为单位)的 Histogram。 | kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs |
受防护的代理计数 | 此 Controller 观察到的受防护的 broker 的数量。 | kafka.controller:type=KafkaController,name=FencedBrokerCount |
活跃代理计数 | 此 Controller 观察到的活跃 broker 数量。 | kafka.controller:type=KafkaController,name=ActiveBrokerCount |
全局主题计数 | 此 Controller 观察到的全局主题数。 | kafka.controller:type=KafkaController,name=GlobalTopicCount |
全局分区计数 | 此 Controller 观察到的全局分区数。 | kafka.controller:type=KafkaController,name=GlobalPartitionCount |
脱机分区计数 | 此 Controller 观察到的离线主题分区 (非内部) 的数量。 | kafka.controller:type=KafkaController,name=OfflinePartitionCount |
首选副本不平衡计数 | 领导者不是首选领导者的主题分区的计数。 | kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount |
元数据错误计数 | 此控制器节点在元数据日志处理期间遇到错误的次数。 | kafka.controller:type=KafkaController,name=MetadataErrorCount |
上次应用的记录偏移量 | Controller 应用的集群元数据分区中的最后一条记录的偏移量。 | kafka.controller:type=KafkaController,name=LastAppliedRecordOffset |
上次提交的记录偏移量 | 提交到此 Controller 的最后一条记录的偏移量。 | kafka.controller:type=KafkaController,name=LastCommittedRecordOffset |
上次应用的记录时间戳 | 控制器应用的集群元数据分区中最后一条记录的时间戳。 | kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp |
上次应用的记录滞后 ms | now 与控制器应用的集群元数据分区中最后一条记录的时间戳之间的差值。 对于主动控制器,此滞后的值始终为零。 | kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs |
ZooKeeper 滞后写入 | ZooKeeper 相对于元数据日志中最高提交记录的滞后记录的滞后量。 此指标将仅由活动的 KRaft 控制器报告。 | kafka.controller:type=KafkaController,name=ZkWriteBehindLag |
ZooKeeper 元数据快照写入时间 | KRaft 控制器将快照协调到 ZooKeeper 所花费的毫秒数。 | kafka.controller:type=KafkaController,name=ZkWriteSnapshotTimeMs |
ZooKeeper 元数据增量写入时间 | KRaft 控制器将 delta 写入 ZK 所花费的毫秒数。 | kafka.controller:type=KafkaController,name=ZkWriteDeltaTimeMs |
超时的代理检测信号计数 | 自进程启动以来,此控制器上超时的代理检测信号数。请注意,只有 主动控制器处理检测信号,因此只有他们会看到此指标的增加。 | kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount |
Event Queue 中已启动的作数 | 已启动的控制器事件队列作的总数。这包括延迟的作。 | kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount |
事件队列中超时的作数 | 在执行控制器事件队列作之前超时的总数。 | kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount |
新控制器选举数量 | 计算此节点看到新控制器当选的次数。过渡到 “no leader” 状态 不计入此处。如果与之前相同的控制器变为活动状态,则仍将计算在内。 | kafka.controller:type=KafkaController,name=NewActiveControllersCount |
KRaft Broker 监控指标
量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
上次应用的记录偏移量 | 代理应用的集群元数据分区中的最后一条记录的偏移量 | kafka.server:type=broker-metadata-metrics,name=last-applied-record-offset |
上次应用的记录时间戳 | 代理应用的集群元数据分区中最后一条记录的时间戳。 | kafka.server:type=broker-metadata-metrics,name=last-applied-record-timestamp |
上次应用的记录滞后 ms | now 与代理应用的集群元数据分区中最后一条记录的时间戳之间的差值 | kafka.server:type=broker-metadata-metrics,name=last-applied-record-lag-ms |
元数据加载错误计数 | BrokerMetadataListener 在加载元数据日志并基于它生成新的 MetadataDelta 时遇到的错误数。 | kafka.server:type=broker-metadata-metrics,name=metadata-load-error-count |
元数据应用错误计数 | BrokerMetadataPublisher 在基于最新 MetadataDelta 应用新 MetadataImage 时遇到的错误数。 | kafka.server:type=broker-metadata-metrics,name=metadata-apply-error-count |
创建者/使用者/连接/流的常见监控指标
以下指标在创建者/使用者/连接器/流实例上可用。有关具体指标,请参阅以下部分。量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
connection-close-rate (连接关闭率) | 在窗口中每秒关闭的连接数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
连接-关闭-总计 | 窗口中关闭的连接总数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
连接创建速率 | 在窗口中每秒建立的新连接数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
连接创建总计 | 在窗口中建立的新连接总数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
网络 IO 速率 | 每秒对所有连接执行网络作 (读取或写入) 的平均次数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
网络 io-total | 所有连接上的网络作(读取或写入)总数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
传出字节率 | 每秒发送到所有服务器的平均传出字节数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
传出字节总计 | 发送到所有服务器的传出字节总数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
请求速率 | 每秒发送的平均请求数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
请求总计 | 发送的请求总数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
请求大小平均值 | 窗口中所有请求的平均大小。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
请求大小最大 | 窗口中发送的任何请求的最大大小。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
传入字节率 | 字节/秒读取所有套接字。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
传入字节总计 | 从所有套接字读取的总字节数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
响应率 | 每秒收到的响应数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
response-total 响应 | 收到的回复总数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
选择速率 | I/O 层每秒检查要执行的新 I/O 的次数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
选择总计 | I/O 层检查新 I/O 以执行的总次数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
io-等待时间-ns-avg | I/O 线程等待套接字准备好进行读取或写入所花费的平均时间长度(以纳秒为单位)。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
io-等待时间-ns-total | I/O 线程等待所花费的总时间(以纳秒为单位)。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
io-waittime-total | *荒废的*I/O 线程等待所花费的总时间(以纳秒为单位)。替换是 。io-wait-time-ns-total |
kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
io-wait-ratio (io-等待比率) | I/O 线程等待所用的时间的分数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
io-时间-ns-avg | 每次 select 调用的 I/O 平均时间长度(以纳秒为单位)。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
io-time-ns-total | I/O 线程执行 I/O 所花费的总时间(以纳秒为单位)。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
iotime-total | *荒废的*I/O 线程执行 I/O 所花费的总时间(以纳秒为单位)。替换是 。io-time-ns-total |
kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
io-ratio | I/O 线程执行 I/O 所花费的时间的分数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
连接计数 | 当前活动连接数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
成功身份验证率 | 使用 SASL 或 SSL 成功进行身份验证的每秒连接数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
成功身份验证总数 | 使用 SASL 或 SSL 成功进行身份验证的连接总数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
失败的身份验证速率 | 身份验证失败的每秒连接数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
失败的身份验证总数 | 身份验证失败的连接总数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
成功重新身份验证率 | 使用 SASL 成功重新进行身份验证的每秒连接数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
成功重新验证总计 | 使用 SASL 成功重新进行身份验证的连接总数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
reauthentication-latency-max (重新验证延迟最大值) | 由于重新身份验证而观察到的最大延迟(以毫秒为单位)。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
reauthentication-latency-avg (重新验证延迟平均值) | 由于重新身份验证而观察到的平均延迟(以毫秒为单位)。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
失败的重新身份验证率 | 重新身份验证失败的每秒连接数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
失败的重新身份验证总计 | 重新身份验证失败的连接总数。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
成功身份验证 no-reauth-total | 由不支持重新身份验证的较旧的 2.2.0 之前的 SASL 客户端成功验证的连接总数。只能为非零值。 | kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) |
producer/consumer/connect/streams 的常见 Per-broker 指标
以下指标在创建者/使用者/连接器/流实例上可用。有关具体指标,请参阅以下部分。量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
传出字节率 | 节点每秒发送的平均传出字节数。 | kafka.[producer|consumer|connect]:type=[消费者|生产者|连接]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
传出字节总计 | 为节点发送的传出字节总数。 | kafka.[producer|consumer|connect]:type=[消费者|生产者|连接]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
请求速率 | 节点每秒发送的平均请求数。 | kafka.[producer|consumer|connect]:type=[消费者|生产者|连接]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
请求总计 | 为节点发送的请求总数。 | kafka.[producer|consumer|connect]:type=[消费者|生产者|连接]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
请求大小平均值 | 节点的窗口中所有请求的平均大小。 | kafka.[producer|consumer|connect]:type=[消费者|生产者|连接]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
请求大小最大 | 在窗口中为节点发送的任何请求的最大大小。 | kafka.[producer|consumer|connect]:type=[消费者|生产者|连接]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
传入字节率 | 节点每秒接收的平均字节数。 | kafka.[producer|consumer|connect]:type=[消费者|生产者|连接]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
传入字节总计 | 节点接收的总字节数。 | kafka.[producer|consumer|connect]:type=[消费者|生产者|连接]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
请求延迟平均值 | 节点的平均请求延迟(以毫秒为单位)。 | kafka.[producer|consumer|connect]:type=[消费者|生产者|连接]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
请求延迟最大 | 节点的最大请求延迟(以毫秒为单位)。 | kafka.[producer|consumer|connect]:type=[消费者|生产者|连接]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
响应率 | 节点每秒收到的响应数。 | kafka.[producer|consumer|connect]:type=[消费者|生产者|连接]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
response-total 响应 | 节点收到的响应总数。 | kafka.[producer|consumer|connect]:type=[消费者|生产者|连接]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+) |
生产者监控
以下指标在创建者实例上可用。量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
等待线程 | 阻塞等待缓冲区内存将其记录排入队列的用户线程数。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
缓冲区总字节数 | 客户端可以使用的最大缓冲区内存量(无论当前是否使用)。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
缓冲区可用字节 | 未使用的缓冲区内存总量(未分配或在可用列表中)。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
缓冲池等待时间 | appender 等待空间分配的时间分数。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
缓冲池等待时间总计 | *荒废的*appender 等待空间分配的总时间,以纳秒为单位。替换是bufferpool-wait-time-ns-total |
kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
缓冲池等待时间 ns-总计 | appender 等待空间分配的总时间,以纳秒为单位。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
flush-time-ns-total (刷新时间 ns-总计) | Producer 在 Producer.flush 中花费的总时间(以纳秒为单位)。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
txn-init-time-ns-total | 创建者初始化交易所花费的总时间,以纳秒为单位(对于 EOS)。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
txn-开始时间-ns-总计 | 创建者在 beginTransaction 中花费的总时间,以纳秒为单位(对于 EOS)。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
txn-send-offsets-time-ns-total | 创建者向交易发送偏移量所花费的总时间,以纳秒为单位(对于 EOS)。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
txn-commit-time-ns-total | 创建者提交事务所花费的总时间,以纳秒为单位(对于 EOS)。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
txn-abort-time-ns-total | 创建者中止事务所花费的总时间,以纳秒为单位(对于 EOS)。 | kafka.producer:type=producer-metrics,client-id=([-.\w]+) |
Producer Sender 指标
kafka.producer:type=producer-metrics,client-id=“{client-id}” | ||
属性名称 | 描述 | |
---|---|---|
批量大小平均 | 每个请求每个分区发送的平均字节数。 | |
最大批量大小 | 每个请求每个分区发送的最大字节数。 | |
批量拆分速率 | 每秒平均批量拆分数 | |
批处理-拆分-总计 | 批量拆分的总数 | |
压缩率平均值 | 记录批处理的平均压缩率,定义为压缩的批处理大小与未压缩大小的平均比率。 | |
元数据年龄 | 正在使用的当前创建者元数据的期限(以秒为单位)。 | |
生产节流时间平均 | 代理限制请求的平均时间(以毫秒为单位) | |
生产节流时间最大 | 代理限制请求的最长时间(以毫秒为单位) | |
记录错误率 | 导致错误的每秒平均记录发送数 | |
记录错误总计 | 导致错误的记录发送总数 | |
记录队列时间平均值 | 在发送缓冲区中花费的平均记录批次时间(以毫秒为单位)。 | |
记录队列时间最大值 | 在发送缓冲区中花费的最长时间(以毫秒为单位)。 | |
记录重试率 | 每秒重试记录发送的平均次数 | |
记录重试总数 | 重试记录发送的总数 | |
记录发送速率 | 每秒发送的平均记录数。 | |
记录发送总计 | 发送的记录总数。 | |
记录大小平均值 | 平均记录大小 | |
记录大小最大值 | 最大记录大小 | |
每个请求平均记录数 | 每个请求的平均记录数。 | |
请求延迟平均值 | 平均请求延迟(毫秒) | |
请求延迟最大 | 最大请求延迟(毫秒) | |
请求中 | 当前等待响应的正在进行的请求数。 | |
kafka.producer:type=producer-topic-metrics,client-id=“{client-id}”,topic=“{topic}” | ||
属性名称 | 描述 | |
字节率 | 主题每秒发送的平均字节数。 | |
总字节数 | 为主题发送的总字节数。 | |
compression-rate 压缩率 | 主题的记录批次的平均压缩率,定义为压缩的批次大小与未压缩大小的平均比率。 | |
记录错误率 | 导致主题出错的平均每秒记录发送数 | |
记录错误总计 | 导致主题出错的记录发送总数 | |
记录重试率 | 为主题发送的平均每秒重试记录数 | |
记录重试总数 | 主题发送的重试记录总数 | |
记录发送速率 | 主题每秒发送的平均记录数。 | |
记录发送总计 | 为主题发送的记录总数。 |
消费者监控
以下指标可用于使用者实例。量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
轮询平均值 | poll() 调用之间的平均延迟。 | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+) |
轮询最大值之间的时间 | 调用 poll() 之间的最大延迟。 | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+) |
上次轮询秒前 | 自上次 poll() 调用以来的秒数。 | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+) |
poll-idle-ratio-avg (平均轮询空闲比率) | 消费者的 poll() 处于空闲状态的平均时间分数,而不是等待用户代码处理记录的时间。 | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+) |
提交时间 ns-total | Consumer 提交所花费的总时间(以纳秒为单位)。 | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+) |
提交同步时间 ns-total | Consumer 提交偏移量所花费的总时间,以纳秒为单位(对于 AOS)。 | kafka.consumer:type=consumer-metrics,client-id=([-.\w]+) |
Consumer Group 指标
量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
提交延迟平均值 | 提交请求所用的平均时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
提交延迟最大 | 提交请求所花费的最大时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
提交率 | 每秒提交调用数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
提交总计 | 提交调用的总数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
已分配分区 | 当前分配给此使用者的分区数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
心跳响应时间最大值 | 接收检测信号请求的响应所用的最长时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
心跳率 | 每秒的平均检测信号数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
心跳总 | 检测信号总数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
加入时间平均 | 组重新加入所用的平均时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
加入时间最大值 | 组重新加入所用的最长时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
加入率 | 每秒加入组的次数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
join-total | 组加入总数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
同步时间平均值 | 组同步所用的平均时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
sync-time-max (同步时间最大值) | 组同步所用的最长时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
sync-rate 同步速率 | 每秒的组同步数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
sync-total 同步总计 | 组同步的总数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
rebalance-latency-avg (重新平衡延迟平均值) | 组再平衡所用的平均时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
最大再平衡延迟 | 组再平衡所用的最长时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
rebalance-latency-total (总延迟) | 到目前为止,组再平衡所花费的总时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
再平衡总计 | 参与的组再平衡总数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
每小时再平衡率 | 每小时参与的组再平衡数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
失败的再平衡总计 | 失败的组重新平衡的总数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
每小时 failed-rebalance-rate-per -hour | 每小时失败的组再平衡事件数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
上次再平衡秒前 | 自上次再平衡事件以来的秒数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
上次心跳秒前 | 自上次控制器检测信号以来的秒数 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
partitions-revoked-latency-avg | on-partitions-revoked 再平衡侦听器回调所用的平均时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
partitions-revoked-latency-max (分区撤销延迟最大值) | on-partitions-revoked 再平衡侦听器回调所用的最大时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
分区分配延迟平均值 | on-partitions-assigned rebalance 侦听器回调所用的平均时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
分区分配延迟最大值 | on-partitions-assigned 再平衡侦听器回调所用的最大时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
分区丢失延迟平均值 | on-partitions-lost 再平衡监听器回调的平均耗时 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
分区丢失延迟最大值 | on-partitions-lost 再平衡监听器回调所用的最大时间 | kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+) |
Consumer Fetch 指标
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}” | ||
属性名称 | 描述 | |
---|---|---|
字节消耗率 | 每秒消耗的平均字节数 | |
总消耗的字节数 | 消耗的总字节数 | |
fetch-latency-avg | 获取请求所用的平均时间。 | |
fetch-latency-max (获取延迟最大值) | 任何 fetch 请求所用的最长时间。 | |
fetch-rate 获取速率 | 每秒的 fetch 请求数。 | |
获取大小平均值 | 每个请求提取的平均字节数 | |
获取大小最大 | 每个请求提取的最大字节数 | |
获取节流时间平均值 | 平均限制时间(以毫秒为单位) | |
获取节流时间最大值 | 最大限制时间(毫秒) | |
fetch-total (获取总计) | 获取请求的总数。 | |
记录消耗率 | 每秒消耗的平均记录数 | |
已消耗的记录总数 | 消耗的记录总数 | |
记录滞后最大 | 此窗口中任何分区的记录数的最大滞后。注意:这是基于当前偏移量,而不是提交的偏移量 | |
记录-潜在客户-分钟 | 此窗口中任何分区的记录数的最小领先优势 | |
每个请求平均记录数 | 每个请求中的平均记录数 | |
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”,topic=“{topic}” | ||
属性名称 | 描述 | |
字节消耗率 | 主题每秒消耗的平均字节数 | |
总消耗的字节数 | 主题消耗的总字节数 | |
获取大小平均值 | 主题的每个请求提取的平均字节数 | |
获取大小最大 | 主题的每个请求提取的最大字节数 | |
记录消耗率 | 主题每秒消耗的平均记录数 | |
已消耗的记录总数 | 主题消耗的记录总数 | |
每个请求平均记录数 | 主题的每个请求中的平均记录数 | |
kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}” | ||
属性名称 | 描述 | |
首选只读副本 | 分区的当前只读副本,如果从 leader 读取,则为 -1 | |
记录滞后 | 分区的最新滞后 | |
记录-滞后-平均值 | 分区的平均滞后 | |
记录滞后最大 | 分区的最大滞后 | |
记录潜在客户 | 分区的最新线索 | |
记录-潜在客户-平均值 | 分区的平均导联 | |
记录-潜在客户-分钟 | 分区的最小导引 |
连接监控
Connect 工作进程包含所有生产者和使用者指标以及特定于 Connect 的指标。 工作进程本身具有许多指标,而每个连接器和任务都有其他指标。 [2023-09-15 00:40:42,725]INFO Metrics 计划程序已关闭 (org.apache.kafka.common.metrics.Metrics:693) [2023-09-15 00:40:42,729]INFO Metrics Reporters 已关闭 (org.apache.kafka.common.metrics.Metrics:703)kafka.connect:type=connect-worker-metrics | ||
属性名称 | 描述 | |
---|---|---|
连接器计数 | 此工作程序中运行的连接器数量。 | |
连接器启动尝试总数 | 此 worker 尝试的连接器启动总数。 | |
连接器启动失败百分比 | 此工作线程的连接器启动失败的平均百分比。 | |
连接器启动失败总数 | 失败的连接器启动总数。 | |
连接器启动成功百分比 | 此工作线程的连接器启动成功的平均百分比。 | |
连接器-启动-成功-总计 | 成功的连接器启动总数。 | |
任务计数 | 此 worker 中运行的任务数。 | |
任务启动尝试总数 | 此工作程序尝试的任务启动总数。 | |
任务启动失败百分比 | 此工作程序的失败任务启动的平均百分比。 | |
任务启动失败总数 | 失败的任务启动总数。 | |
任务启动成功百分比 | 此工作人员的任务开始中成功的平均百分比。 | |
任务-启动-成功-总计 | 成功的任务启动总数。 | |
kafka.connect:type=connect-worker-metrics,connector=“{connector}” | ||
属性名称 | 描述 | |
连接器销毁任务计数 | worker 上 connector 的已销毁任务数。 | |
连接器失败任务计数 | 工作线程上连接器的失败任务数。 | |
连接器暂停任务计数 | worker 上连接器的暂停任务数。 | |
连接器重启任务计数 | worker 上 connector 的重启任务数。 | |
连接器运行任务计数 | worker 上连接器的运行任务数。 | |
连接器总任务计数 | worker 上连接器的任务数。 | |
连接器未分配任务计数 | 工作程序上连接器的未分配任务数。 | |
kafka.connect:type=connect-worker-rebalance-metrics | ||
属性名称 | 描述 | |
completed-rebalances-total | 此工作程序完成的再平衡总数。 | |
连接协议 | 此集群使用的 Connect 协议 | |
时代 | 此 worker 的 epoch 或代号。 | |
leader-name (领导名称) | 组长的姓名。 | |
再平衡平均时间 ms | 此工作程序重新平衡所花费的平均时间(以毫秒为单位)。 | |
重新平衡最大时间 ms | 此工作程序重新平衡所花费的最长时间(以毫秒为单位)。 | |
平衡 | 此 worker 当前是否正在重新平衡。 | |
自上次再平衡毫秒以来的时间 | 自此 worker 完成最近一次再平衡以来的时间(以毫秒为单位)。 | |
kafka.connect:type=connector-metrics,connector=“{connector}” | ||
属性名称 | 描述 | |
连接器类 | 连接器类的名称。 | |
连接器类型 | 连接器的类型。“source”或“sink”之一。 | |
连接器版本 | 连接器报告的连接器类的版本。 | |
地位 | 连接器的状态。“unassigned”、“running”、“paused”、“stopped”、“failed”或“restarting”之一。 | |
kafka.connect:type=connector-task-metrics,connector=“{connector}”,task=“{task}” | ||
属性名称 | 描述 | |
批量大小平均 | 任务到目前为止已处理的批次中的平均记录数。 | |
最大批量大小 | 到目前为止,任务已处理的最大批次中的记录数。 | |
偏移提交平均时间毫秒 | 此任务提交偏移量所用的平均时间(以毫秒为单位)。 | |
偏移提交失败百分比 | 此任务的偏移提交尝试失败的平均百分比。 | |
偏移提交最大时间毫秒 | 此任务提交偏移量所用的最长时间(以毫秒为单位)。 | |
offset-commit-success-percentage (偏移提交成功百分比) | 此任务的偏移提交尝试成功的平均百分比。 | |
暂停比率 | 此任务处于暂停状态的时间分数。 | |
运行比率 | 此任务处于 running 状态的时间分数。 | |
地位 | 连接器任务的状态。“unassigned”、“running”、“paused”、“failed”或“restarting”之一。 | |
kafka.connect:type=sink-task-metrics,connector=“{connector}”,task=“{task}” | ||
属性名称 | 描述 | |
offset-commit-completion-rate (偏移提交完成率) | 成功完成的每秒偏移提交完成的平均数量。 | |
offset-commit-completion-total | 已成功完成的偏移提交完成总数。 | |
偏移提交 seq-no | offset 提交的当前序列号。 | |
偏移提交跳过率 | 每秒收到时间过晚并跳过/忽略的偏移提交完成的平均数量。 | |
偏移提交跳过总计 | 接收时间过晚并跳过/忽略的偏移提交完成总数。 | |
分区计数 | 分配给此任务的主题分区数,属于此工作程序中的命名接收器连接器。 | |
放置批次平均时间毫秒 | 此任务放置一批 sink 记录所用的平均时间。 | |
放置批处理最大时间毫秒 | 此任务放置一批 sink 记录所用的最长时间。 | |
sink-record-active-count (接收记录主动计数) | 已从 Kafka 读取但尚未被 sink 任务完全提交/刷新/确认的记录数。 | |
sink-record-active-count-avg | 已从 Kafka 读取但尚未被 sink 任务完全提交/刷新/确认的平均记录数。 | |
接收器记录活动计数最大值 | 已从 Kafka 读取但尚未被 sink 任务完全提交/刷新/确认的最大记录数。 | |
sink-record-lag-max (接收记录滞后最大值) | 对于任何主题分区,sink 任务落后于使用者位置的记录数的最大滞后。 | |
接收记录读取速率 | 对于属于此工作程序中命名接收器连接器的此任务,每秒从 Kafka 读取的平均记录数。这是在应用转换之前。 | |
接收器记录读取总数 | 自上次重新启动任务以来,此任务属于此工作程序中命名接收器连接器的从 Kafka 读取的记录总数。 | |
sink-record-send-rate 发送速率 | 从转换输出和发送/放置到此任务的平均每秒记录数,属于此工作程序中的命名接收器连接器。这是在应用转换之后,不包括由转换筛选出的任何记录。 | |
sink-record-send-total (接收记录发送总计) | 自上次重新启动任务以来,从转换输出并发送/放入此任务的记录总数,属于此工作程序中的命名接收器连接器。 | |
kafka.connect:type=source-task-metrics,connector=“{connector}”,task=“{task}” | ||
属性名称 | 描述 | |
轮询批处理平均时间毫秒 | 此任务轮询一批源记录所用的平均时间(以毫秒为单位)。 | |
轮询批处理最大时间毫秒 | 此任务轮询一批源记录所用的最长时间(以毫秒为单位)。 | |
源记录活动计数 | 此任务已生成但尚未完全写入 Kafka 的记录数。 | |
源记录有效计数平均值 | 此任务已生成但尚未完全写入 Kafka 的平均记录数。 | |
源记录活动计数最大值 | 此任务已生成但尚未完全写入 Kafka 的最大记录数。 | |
源记录轮询率 | 此任务生成/轮询的平均每秒记录数(转换前),属于此工作程序中的指定源连接器。 | |
源记录轮询总数 | 此任务生成/轮询的记录总数(转换前),属于此工作程序中的指定源连接器。 | |
源记录写入速率 | 自上次重新启动任务以来,对于属于此工作程序中指定源连接器的此任务,每秒写入 Kafka 的平均记录数。这是在应用转换之后,不包括由转换筛选出的任何记录。 | |
源记录写入总计 | 自上次重新启动任务以来,此任务的写入 Kafka 的记录数,属于此工作程序中的指定源连接器。这是在应用转换之后,不包括由转换筛选出的任何记录。 | |
transaction-size-avg (交易大小平均) | 到目前为止,任务已提交的事务中的平均记录数。 | |
事务大小最大 | 到目前为止,任务已提交的最大事务中的记录数。 | |
事务大小最小 | 到目前为止,任务已提交的最小事务中的记录数。 | |
kafka.connect:type=task-error-metrics,connector=“{connector}”,task=“{task}” | ||
属性名称 | 描述 | |
deadletterqueue-produce-failures | 对死信队列的失败写入次数。 | |
deadletterqueue-生产请求 | 尝试写入死信队列的次数。 | |
上次错误时间戳 | 此任务上次遇到错误时的纪元时间戳。 | |
总计错误记录 | 记录的错误数。 | |
总记录错误 | 此任务中的记录处理错误数。 | |
总记录失败数 | 此任务中记录处理失败的次数。 | |
总跳过记录数 | 由于错误而跳过的记录数。 | |
总重试次数 | 重试的作数。 |
流监控
Kafka Streams 实例包含所有创建者和使用者指标以及特定于 Streams 的其他指标。 这些指标有三个记录级别:、 、 和 。info
debug
trace
请注意,这些指标具有 4 层层次结构。在顶层,每个 started 都有 client 级别的指标 Kafka Streams 客户端。每个客户端都有流线程,具有自己的指标。每个流线程都有任务,它们的 自己的指标。每个任务都有许多处理器节点,它们都有自己的指标。每个任务还具有许多状态存储 和记录缓存,它们都有自己的指标。
使用以下配置选项指定哪些指标 您希望收集:metrics.recording.level="info"
客户端指标
以下所有量度的记录级别为 :info
量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
版本 | Kafka Streams 客户端的版本。 | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
提交 ID | Kafka Streams 客户端的版本控制提交 ID。 | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
应用程序 ID | Kafka Streams 客户端的应用程序 ID。 | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
拓扑描述 | 在 Kafka Streams 客户端中执行的拓扑的描述。 | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
州 | Kafka Streams 客户端的状态。 | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
失败的流线程 | 自 Kafka Streams 客户端启动以来失败的流线程数。 | kafka.streams:type=stream-metrics,client-id=([-.\w]+) |
线程指标
以下所有量度的记录级别为 :info
量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
提交延迟平均值 | 此线程的所有正在运行的任务之间的提交平均执行时间(以毫秒为单位)。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
提交延迟最大 | 此线程的所有正在运行的任务中提交的最大执行时间(以毫秒为单位)。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
poll-latency-avg | 使用者轮询的平均执行时间(以毫秒为单位)。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
轮询延迟最大值 | 使用者轮询的最大执行时间(以毫秒为单位)。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
进程延迟平均值 | 用于处理的平均执行时间(以毫秒为单位)。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
进程延迟最大 | 用于处理的最长执行时间(以毫秒为单位)。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
punctuate-latency-avg | 标点符号的平均执行时间(以毫秒为单位)。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
punctuate-latency-max (最大延迟) | 标点符号的最长执行时间(以毫秒为单位)。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
提交率 | 每秒的平均提交数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
提交总计 | 提交调用的总数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
poll-rate 轮询率 | 每秒使用者轮询调用的平均次数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
轮询总数 | 使用者轮询调用的总数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
处理速率 | 每秒处理的平均记录数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
process-total (进程总计) | 已处理记录的总数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
punctuate-rate (点点率) | 每秒的平均标点符号调用数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
punctuate-total | 标点点调用的总数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
任务创建速率 | 每秒创建的平均任务数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
任务创建总计 | 创建的任务总数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
任务关闭率 | 每秒关闭的平均任务数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
任务已关闭总计 | 已关闭的任务总数。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
阻止时间 ns-总计 | 线程在 Kafka 上阻塞所用的总时间。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
线程启动时间 | 线程的启动时间。 | kafka.streams:type=stream-thread-metrics,thread-id=([-.\w]+) |
任务指标
以下所有量度的记录级别为 ,但 dropped-records-* 和 active-process-ratio 指标,其记录级别为:debug
info
量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
进程延迟平均值 | 用于处理的平均执行时间(以 ns 为单位)。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
进程延迟最大 | 用于处理的最长执行时间(以 ns 为单位)。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
处理速率 | 此任务的所有源处理器节点中每秒处理的平均记录数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
process-total (进程总计) | 此任务的所有源处理器节点中已处理的记录总数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
提交延迟平均值 | 提交的平均执行时间(以 ns 为单位)。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
提交延迟最大 | 提交的最长执行时间(以 ns 为单位)。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
提交率 | 每秒平均提交调用数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
提交总计 | 提交调用的总数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
记录迟到平均 | 观察到的平均记录延迟时间 (流时间 - 记录时间戳)。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
记录迟到最大 | 观察到的最大记录延迟时间(流时间 - 记录时间戳)。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
强制处理速率 | 每秒强制处理的平均数量。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
强制处理总计 | 强制处理的总数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
丢弃记录率 | 此任务中丢弃的平均记录数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
丢弃的记录总数 | 此任务中丢弃的记录总数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
主动进程比率 | 流线程在所有已分配的活动任务中处理此任务所花费的时间的分数。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
输入缓冲区字节总数 | 此任务累积的总字节数 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
缓存大小字节总数 | 此任务累积的缓存大小(以字节为单位)。 | kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+) |
处理器节点指标
以下指标仅适用于某些类型的节点,即 process-* 指标仅适用于 源处理器节点,则度量仅适用于抑制作节点,度量仅适用于窗口聚合节点,度量仅适用于源处理器节点和终端节点(没有后续节点的节点 节点)。 所有量度的记录级别 除外,但 记录级别 :suppression-emit-*
emit-final-*
record-e2e-latency-*
debug
record-e2e-latency-*
info
量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
总消耗的字节数 | 源处理器节点占用的总字节数。 | kafka.streams:type=stream-topic-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+),topic=([-.\w]+) |
总生成字节数 | 接收器处理器节点生成的字节总数。 | kafka.streams:type=stream-topic-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+),topic=([-.\w]+) |
处理速率 | 源处理器节点每秒处理的平均记录数。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
process-total (进程总计) | 源处理器节点每秒处理的记录总数。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
抑制发出率 | 从隐藏作节点向下游发出的记录的速率。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
抑制 - 发出-总计 | 已从隐藏作节点下游发出的记录总数。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
emit-final-latency-max (发出最终延迟最大) | 当可以发出记录时,发出最终记录的最大延迟。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
emit-final-latency-avg | 当可以发出记录时,发出最终记录的平均延迟。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
发出最终记录速率 | 可以发出记录时的记录发出速率。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
发出最终记录总数 | 发出的记录总数。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
记录 e2e-延迟-avg | 记录的平均端到端延迟,通过将记录时间戳与节点完全处理记录的系统时间进行比较来衡量。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
记录 e2e-延迟-max | 记录的最大端到端延迟,通过将记录时间戳与节点完全处理记录的系统时间进行比较来衡量。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
record-e2e-latency-min | 记录的最小端到端延迟,通过将记录时间戳与节点完全处理记录的系统时间进行比较来衡量。 | kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) |
已消耗的记录总数 | 源处理器节点使用的记录总数。 | kafka.streams:type=stream-topic-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+),topic=([-.\w]+) |
已生成记录总数 | 接收器处理器节点生成的记录总数。 | kafka.streams:type=stream-topic-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+),topic=([-.\w]+) |
状态存储指标
以下所有量度的记录级别为 ,但 record-e2e-latency-* 量度的记录级别 除外。 请注意,该值在 for user's customized state stores 中指定; 对于内置的 State Store,目前我们有:debug
trace
store-scope
StoreSupplier#metricsScope()
in-memory-state
in-memory-lru-state
in-memory-window-state
in-memory-suppression
(用于抑制缓冲液)rocksdb-state
(用于 RocksDB 支持的键值存储)rocksdb-window-state
(适用于 RocksDB 支持的窗口存储)rocksdb-session-state
(适用于 RocksDB 支持的会话存储)
量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
放置延迟平均值 | 平均看跌执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
放置延迟最大 | 最大 put 执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
放置如果不存在延迟平均值 | 平均 put-if-absence 执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
如果不存在延迟最大 | 最大 put-if-absence 执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
获取延迟平均值 | 平均 get 执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
获取延迟最大值 | 最大 get 执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
删除延迟平均值 | 平均删除执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
删除延迟最大值 | 最大删除执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
放置所有延迟平均值 | 平均 put-all 执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
放置所有延迟最大 | 最大 put-all 执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
所有延迟平均值 | 所有作的平均执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
所有延迟最大 | 最大 all operation 执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
range-latency-avg (平均延迟) | 平均范围执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
最大范围延迟 | 最大范围执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
flush-latency-avg (刷新延迟平均值) | 平均 flush 执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
flush-latency-max (刷新延迟最大) | 最大 flush 执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
恢复延迟平均值 | 平均还原执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
恢复延迟最大 | 最大还原执行时间(以 ns 为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
put-rate (放价) | 此商店的平均看跌汇率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
如果不存在 RATE,则放置 | 此商店的平均 put-if-present 率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
get-rate | 此商店的平均 get rate。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
删除率 | 此存储的平均删除率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
全部放率 | 此商店的平均 put-all 费率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
全速 | 此商店的平均 all 运营率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
range-rate (距离速率) | 此商店的平均范围比率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
flush-rate 刷新率 | 此商店的平均冲水率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
恢复速率 | 此存储的平均还原速率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
suppression-buffer-size-avg | 采样窗口内缓冲数据的平均总大小(以字节为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+) |
suppression-buffer-size-max (最大缓冲区大小) | 采样窗口内缓冲数据的最大总大小(以字节为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+) |
抑制缓冲区计数平均值 | 在采样时段内缓冲的平均记录数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+) |
suppression-buffer-count-max (抑制缓冲区计数最大值) | 在采样时段内缓冲的最大记录数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),in-memory-suppression-id=([-.\w]+) |
记录 e2e-延迟-avg | 记录的平均端到端延迟,通过将记录时间戳与节点完全处理记录的系统时间进行比较来衡量。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
记录 e2e-延迟-max | 记录的最大端到端延迟,通过将记录时间戳与节点完全处理记录的系统时间进行比较来衡量。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
record-e2e-latency-min | 记录的最小端到端延迟,通过将记录时间戳与节点完全处理记录的系统时间进行比较来衡量。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
RocksDB 指标
RocksDB 指标分为基于统计信息的指标和基于属性的指标。 前者记录自 RocksDB 状态存储收集的统计信息,而后者记录自 RocksDB 公开的属性。 RocksDB 收集的统计数据提供了随时间推移的累积测量值,例如写入状态存储的字节数。 RocksDB 公开的属性提供当前测量值,例如当前使用的内存量。 请注意,对于内置的 RocksDB 状态存储,目前如下:store-scope
rocksdb-state
(用于 RocksDB 支持的键值存储)rocksdb-window-state
(适用于 RocksDB 支持的窗口存储)rocksdb-session-state
(适用于 RocksDB 支持的会话存储)
debug
量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
字节写入速率 | 每秒写入 RocksDB 状态存储的平均字节数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
总写入字节数 | 写入 RocksDB 状态存储的总字节数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
字节读取速率 | 每秒从 RocksDB 状态存储读取的平均字节数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
字节读取总计 | 从 RocksDB 状态存储读取的总字节数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
memtable 字节刷新率 | 每秒从 memtable 刷新到磁盘的平均字节数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
memtable 字节刷新总计 | 从 memtable 刷新到磁盘的总字节数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
memtable-hit-ratio (内存匹配比率) | memtable 命中数相对于 memtable 的所有查找的比率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
memtable-flush-time-avg | memtable 刷新到磁盘的平均持续时间(以毫秒为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
memtable 刷新时间分钟 | memtable 刷新到磁盘的最短持续时间(以毫秒为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
memtable 刷新时间最大值 | memtable 刷新到磁盘的最长持续时间(以毫秒为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
块缓存数据命中率 | 数据块的块缓存命中数相对于数据块的所有查找到块缓存的比率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
块缓存索引命中率 | 索引块的块缓存命中数相对于索引块到块缓存的所有查找的比率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
块缓存过滤器命中率 | filter blocks 的块缓存命中数相对于 filter blocks 的所有查找到块缓存的比率。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
写入停顿持续时间平均值 | 写入停顿的平均持续时间(以毫秒为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
写入停顿持续时间总计 | 写入停顿的总持续时间(以毫秒为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
字节读取压缩率 | 压缩期间每秒读取的平均字节数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
字节写入压缩率 | 压缩期间每秒写入的平均字节数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
压缩时间平均 | 光盘压缩的平均持续时间(以毫秒为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
压缩时间-min | 光盘压缩的最短持续时间(以毫秒为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
压缩时间最大值 | 光盘压缩的最长持续时间(以毫秒为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
number-open-files 文件 | 当前打开的文件数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
number-file-errors-total 文件错误总数 | 发生的文件错误总数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
info
block-cache-*
量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
num-immutable-mem-table (不可变内存表) | 尚未刷新的不可变 memtable 的数量。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
当前大小活动内存表 | 活动 memtable 的近似大小(以字节为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
当前大小所有内存表 | 活动和未刷新的不可变 memtable 的近似大小(以字节为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
大小所有内存表 | 活动、未刷新的不可变和固定的不可变 memtable 的近似大小(以字节为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
num-entries-active-mem-table (数字条目活动内存表) | 活动 memtable 中的条目数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
num-entries-imm-mem-tables | 未刷新的不可变 memtable 中的条目数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
num-deletes-active-mem-table | 活动 memtable 中的 delete 条目数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
num-deletes-imm-mem-tables | 未刷新的不可变 memtable 中的 delete 条目数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
mem-table-flush-pending | 如果 memtable flush 处于待处理状态,则此指标报告 1,否则报告 0。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
运行刷新的 num 个 | 当前正在运行的 flush 的数量。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
压缩待定 | 如果至少有一个压缩处于待处理状态,则此指标报告 1,否则报告 0。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
num-running-comp作 | 当前正在运行的压缩数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
estimate-pending-compaction-bytes (估计待处理压缩字节数) | 压缩需要在磁盘上重写以将所有级别降至以下的估计总字节数 目标大小 (仅对关卡压缩有效)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
总 SST 文件大小 | 所有 SST 文件的总大小(以字节为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
实时 sst 文件大小 | 属于最新 LSM 树的所有 SST 文件的总大小(以字节为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
实时版本数 | LSM 树的实时版本数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
块缓存容量 | 块缓存的容量(以字节为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
块缓存使用 | 驻留在块缓存中的条目的内存大小(以字节为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
块缓存固定使用 | 被固定在块缓存中的条目的内存大小(以字节为单位)。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
估计 num 键 | 活动和未刷新的不可变 memtable 和存储中的估计键数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
估计表读取者 mem | 用于读取 SST 表的估计内存(以字节为单位),不包括块缓存中使用的内存。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
背景错误 | 后台错误的总数。 | kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+) |
记录缓存指标
以下所有量度的记录级别为 :debug
量度/属性名称 | 描述 | Mbean 名称 |
---|---|---|
平均命中率 | 平均缓存命中率定义为缓存读取命中数与总缓存读取请求的比率。 | kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+) |
命中率-最小值 | 最小缓存命中率。 | kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+) |
命中率最大值 | 最大缓存命中率。 | kafka.streams:type=stream-record-cache-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+) |
别人
我们建议监控 GC 时间和其他统计信息以及各种服务器统计信息,例如 CPU 利用率、I/O 服务时间等。 在客户端,我们建议监控消息/字节速率(全局和每个主题)、请求速率/大小/时间,在消费者端,监控所有分区中消息的最大滞后和最小获取请求速率。为了让消费者跟上,最大滞后需要小于阈值,最小获取速率需要大于 0。6.9 动物园守护者
稳定版
目前的稳定分支是 3.5。Kafka 会定期更新,以包含 3.5 系列中的最新版本。ZooKeeper 弃用
随着 Apache Kafka 3.5 的发布,Zookeeper 现在被标记为已弃用。计划在 Apache Kafka 的下一个主要版本(版本 4.0)中删除 ZooKeeper, 计划不早于 2024 年 4 月进行。在弃用阶段,Kafka 集群的元数据管理仍然支持 ZooKeeper, 但不建议用于新部署。还有一小部分功能有待在 KRaft 中实现 有关详细信息,请参阅当前缺失的功能。
迁移
将现有的基于 ZooKeeper 的 Kafka 集群迁移到 KRaft 目前是预览版,我们预计它将在 3.6 版本中准备好用于生产。建议用户开始规划迁移到 KRaft 并开始测试以提供任何反馈。有关如何执行从 ZooKeeper 到 KRaft 的实时迁移和当前限制的详细信息,请参阅 ZooKeeper 到 KRaft 的迁移。
3.x 和 ZooKeeper 支持
支持 ZooKeeper 模式的最终 3.x 次要版本将在发布后的 12 个月内收到关键错误修复和安全修复。
ZooKeeper 和 KRaft 时间线
有关 ZooKeeper 删除暂定时间表和计划发布的 KRaft 功能的详细信息和更新,请参阅 KIP-833。
实施 ZooKeeper
在作上,我们为健康的 ZooKeeper 安装执行以下作:- 物理/硬件/网络布局中的冗余:尽量不要把它们都放在同一个机架上,体面的(但不要发疯)硬件,尽量保持冗余的电源和网络路径等。典型的 ZooKeeper ensemble 有 5 或 7 台服务器,分别可以容忍 2 台和 3 台服务器宕机。如果您的部署规模较小,则使用 3 台服务器是可以接受的,但请记住,在这种情况下,您只能容忍 1 台服务器停机。
- I/O 隔离:如果您执行大量写入类型的流量,您几乎肯定会希望事务日志位于专用磁盘组上。对事务日志的写入是同步的(但为了提高性能而进行批处理),因此,并发写入会显著影响性能。ZooKeeper 快照可以是这样一个并发写入源,理想情况下应该写入与事务日志分开的磁盘组。快照以异步方式写入磁盘,因此通常可以与作系统和消息日志文件共享。您可以通过 dataLogDir 参数将服务器配置为使用单独的磁盘组。
- 应用程序隔离:除非您真正了解要安装在同一机器上的其他应用程序的应用程序模式,否则最好单独运行 ZooKeeper(尽管这可能是与硬件功能的平衡行为)。
- 小心虚拟化:它可以工作,具体取决于您的集群布局、读/写模式和 SLA,但虚拟化层引入的微小开销可能会累积并丢弃 ZooKeeper,因为它可能对时间非常敏感
- ZooKeeper 配置:它是 java,确保给它“足够”的堆空间(我们通常用 3-5G 运行它们,但这主要是由于我们这里的数据集大小)。遗憾的是,我们没有一个好的公式,但请记住,允许更多的 ZooKeeper 状态意味着快照可能会变得很大,而大型快照会影响恢复时间。事实上,如果快照变得太大(几 GB),则可能需要增加 initLimit 参数,以便为服务器提供足够的时间来恢复和加入 ensemble。
- 监控:JMX 和 4 个字母的单词 (4lw) 命令都非常有用,它们在某些情况下确实会重叠(在这些情况下,我们更喜欢 4 个字母的命令,它们似乎更可预测,或者至少,它们与 LI 监控基础设施配合得更好)
- 不要过度构建集群:大型集群,尤其是在写入密集型使用模式下,意味着大量的集群内通信(写入和后续集群成员更新的仲裁),但不要构建不足(否则可能会淹没集群)。拥有更多服务器会增加您的读取容量。
6.10 KRaft
配置
进程角色
在 KRaft 模式下,每个 Kafka 服务器都可以使用属性配置为控制器、代理或两者兼而有之。此属性可以具有以下值:process.roles
- 如果设置为 ,则服务器将充当代理。
process.roles
broker
- 如果设置为 ,则服务器将充当控制器。
process.roles
controller
- 如果设置为 ,则服务器将同时充当代理和控制器。
process.roles
broker,controller
- 如果根本未设置,则假定它处于 ZooKeeper 模式。
process.roles
同时充当代理和控制器的 Kafka 服务器称为“组合”服务器。组合服务器更易于作,适用于开发环境等小型使用案例。主要缺点是控制器与系统其余部分的隔离程度较低。例如,在组合模式下,无法独立于 broker 滚动或扩展 controller。不建议在关键部署环境中使用组合模式。
控制器
在 KRaft 模式下,选择特定的 Kafka 服务器作为控制器(与基于 ZooKeeper 的模式不同,在基于 ZooKeeper 的模式下,任何服务器都可以成为 Controller)。被选为控制器的服务器将参与元数据仲裁。每个控制器都是当前活动控制器的活动或热备用控制器。
Kafka 管理员通常会为此角色选择 3 或 5 台服务器,具体取决于成本和系统应承受的并发故障数量等因素,而不会影响可用性。大多数 Controller 必须处于活动状态才能保持可用性。使用 3 个控制器时,集群可以容忍 1 个控制器故障;如果集群有 5 个控制器,则集群可以容忍 2 个控制器故障。
Kafka 集群中的所有服务器都使用该属性发现仲裁投票者。这将标识应使用的 quorum controller 服务器。必须枚举所有控制器。每个控制器都用他们的 和 信息来标识。例如:controller.quorum.voters
id
host
port
controller.quorum.voters=id1@host1:port1,id2@host2:port2,id3@host3:port3
如果 Kafka 集群有 3 个名为 controller1、controller2 和 controller3 的控制器,则 controller1 可能具有以下配置:
process.roles=controller
node.id=1
listeners=CONTROLLER://controller1.example.com:9093
controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093
每个 broker 和 controller 都必须设置 property.属性中提供的节点 ID 必须与控制器服务器上的相应 ID 匹配。例如,在 controller1 上,node.id 必须设置为 1,依此类推。每个节点 ID 在特定集群中的所有服务器中必须是唯一的。无论其值如何,任何两个服务器都不能具有相同的节点 ID。controller.quorum.voters
controller.quorum.voters
process.roles
存储工具
该命令可用于为您的新集群生成集群 ID。使用命令格式化集群中的每个服务器时,必须使用此集群 ID。kafka-storage.sh random-uuid
kafka-storage.sh format
这与 Kafka 过去的运作方式不同。以前,Kafka 会自动格式化空白存储目录,并自动生成新的集群 ID。更改的一个原因是自动格式化有时会掩盖错误情况。这对于控制器和代理服务器维护的元数据日志尤其重要。如果大多数控制器能够从空日志目录开始,则可能能够在缺少提交数据的情况下选举 leader。
调试
元数据仲裁工具
该工具可用于描述集群元数据分区的运行时状态。例如,以下命令显示元数据仲裁的摘要:kafka-metadata-quorum
> bin/kafka-metadata-quorum.sh --bootstrap-server broker_host:port describe --status
ClusterId: fMCL8kv1SWm87L_Md-I2hg
LeaderId: 3002
LeaderEpoch: 2
HighWatermark: 10
MaxFollowerLag: 0
MaxFollowerLagTimeMs: -1
CurrentVoters: [3000,3001,3002]
CurrentObservers: [0,1,2]
转储日志工具
该工具可用于调试集群元数据目录的日志分段和快照。该工具将扫描提供的文件并解码元数据记录。例如,此命令解码并打印第一个日志段中的记录:kafka-dump-log
> bin/kafka-dump-log.sh --cluster-metadata-decoder --files metadata_log_dir/__cluster_metadata-0/00000000000000000000.log
此命令解码并打印集群元数据快照中的记录:
> bin/kafka-dump-log.sh --cluster-metadata-decoder --files metadata_log_dir/__cluster_metadata-0/00000000000000000100-0000000001.checkpoint
元数据 Shell
该工具可用于以交互方式检查集群元数据分区的状态:kafka-metadata-shell
> bin/kafka-metadata-shell.sh --snapshot metadata_log_dir/__cluster_metadata-0/00000000000000000000.log
>> ls /
brokers local metadataQuorum topicIds topics
>> ls /topics
foo
>> cat /topics/foo/0/data
{
"partitionId" : 0,
"topicId" : "5zoAlv-xEh9xRANKXt1Lbg",
"replicas" : [ 1 ],
"isr" : [ 1 ],
"removingReplicas" : null,
"addingReplicas" : null,
"leader" : 1,
"leaderEpoch" : 0,
"partitionEpoch" : 0
}
>> exit
部署注意事项
- Kafka 服务器应设置为其中一个或两个,但不能同时设置为两者。组合模式可以在开发环境中使用,但应避免在关键部署环境中使用。
process.role
broker
controller
- 为了实现冗余,Kafka 集群应使用 3 个控制器。在关键环境中,不建议使用 3 个以上的控制器。在极少数情况下,部分网络故障时,群集元数据仲裁可能会变得不可用。此限制将在 Kafka 的未来版本中得到解决。
- Kafka 控制器将集群的所有元数据存储在内存和磁盘上。我们认为,对于典型的 Kafka 集群,元数据日志控制器上的 5GB 主内存和 5GB 磁盘空间就足够了。
缺失的功能
以下功能在 KRaft 模式下未完全实现:
- 支持具有多个存储目录的 JBOD 配置
- 修改独立 KRaft 控制器上的某些动态配置
- 委托令牌
ZooKeeper 到 KRaft 的迁移
ZooKeeper 到 KRaft 的迁移被视为早期访问功能,不建议用于生产集群。
从 ZK 到 KRaft 的迁移尚不支持以下功能:
- 在迁移期间或之后降级到 ZooKeeper 模式
- KRaft 中尚不支持的其他功能
请使用 JIRA 项目和 “kraft” 组件将 ZooKeeper 迁移到 KRaft 的问题。
术语
我们在这里使用术语 “迁移” 来指代更改 Kafka 集群元数据的过程 系统从 ZooKeeper 迁移到 KRaft 并迁移现有元数据。“升级”是指安装较新版本的 Kafka。不建议 在执行元数据迁移的同时升级软件。
我们还使用术语“ZK 模式”来指代使用 ZooKeeper 作为其元数据的 Kafka 代理 系统。“KRaft 模式”是指使用 KRaft 控制器仲裁作为其元数据系统的 Kafka 代理。
准备迁移
在开始迁移之前,Kafka 代理必须升级到软件版本 3.5.0 并具有 “inter.broker.protocol.version” 配置设置为 “3.5”。请参阅升级到 3.5.0 以了解 升级说明。
建议在迁移处于活动状态时为迁移组件启用 TRACE 级别日志记录。这可以 通过将以下 log4j 配置添加到每个 KRaft 控制器的 “log4j.properties” 文件中来完成。
log4j.logger.org.apache.kafka.metadata.migration=TRACE
在迁移期间,在 KRaft 控制器和 ZK 代理上启用 DEBUG 日志记录通常很有用。
预置 KRaft 控制器仲裁
在开始迁移之前,需要做两件事。首先,必须将 broker 配置为支持迁移,其次, 必须部署 KRaft 控制器 quorum。KRaft 控制器应使用与 现有的 Kafka 集群。这可以通过检查数据目录中的 “meta.properties” 文件之一来找到 中,或者通过运行以下命令。
./bin/zookeeper-shell.sh localhost:2181 get /cluster/id
KRaft 控制器仲裁还应使用最新的 .
当您使用该工具格式化节点时,此作会自动完成。
有关 KRaft 部署的更多说明,请参阅上述文档。metadata.version
kafka-storage.sh
除了标准的 KRaft 配置外,KRaft 控制器还需要启用对迁移的支持 以及提供 ZooKeeper 连接配置。
以下是已准备好迁移的 KRaft 控制器的示例配置:
# Sample KRaft cluster controller.properties listening on 9093 process.roles=controller node.id=3000 controller.quorum.voters=3000@localhost:9093 controller.listener.names=CONTROLLER listeners=CONTROLLER://:9093 # Enable the migration zookeeper.metadata.migration.enable=true # ZooKeeper client configuration zookeeper.connect=localhost:2181 # Other configs ...
注意:KRaft 集群 node.id
值必须与任何现有的 ZK 代理 broker.id
不同。
在 KRaft 模式下,broker 和 controller 共享相同的 Node ID 命名空间。
在代理上启用迁移
一旦 KRaft 控制器 quorum 启动,就需要重新配置并重新启动 broker。经纪人 可以滚动方式重新启动,以避免影响集群可用性。每个代理都需要 以下配置与 KRaft 控制器通信并启用迁移。
- Controller.quorum.voters
- Controller.listener.names
- 还应将 controller.listener.name 添加到 listener.security.property.map 中
- zookeeper.metadata.migration.enable
以下是已准备好迁移的代理的示例配置:
# Sample ZK broker server.properties listening on 9092 broker.id=0 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://localhost:9092 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT # Set the IBP inter.broker.protocol.version=3.5 # Enable the migration zookeeper.metadata.migration.enable=true # ZooKeeper client configuration zookeeper.connect=localhost:2181 # KRaft controller quorum configuration controller.quorum.voters=3000@localhost:9093 controller.listener.names=CONTROLLER
注意:使用必要的配置重新启动最终的 ZK broker 后,迁移将自动开始。迁移完成后,可以在活动控制器上观察到 INFO 级别日志:
Completed migration of metadata from Zookeeper to KRaft
将 broker 迁移到 KRaft
一旦 KRaft 控制器完成元数据迁移,代理仍将以 ZK 模式运行。虽然 KRaft 控制器处于迁移模式,它将继续向 ZK 模式代理发送控制器 RPC。这包括 RPC,如 UpdateMetadata 和 LeaderAndIsr。
要将 broker 迁移到 KRaft,只需将它们重新配置为 KRaft brokers 并重新启动即可。使用上述
broker 配置为例,我们将 替换为 并添加 。代理在重新启动时保持相同的 Broker/Node ID 非常重要。
此时应删除 zookeeper 配置。broker.id
node.id
process.roles=broker
# Sample KRaft broker server.properties listening on 9092 process.roles=broker node.id=0 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://localhost:9092 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT # Don't set the IBP, KRaft uses "metadata.version" feature flag # inter.broker.protocol.version=3.5 # Remove the migration enabled flag # zookeeper.metadata.migration.enable=true # Remove ZooKeeper client configuration # zookeeper.connect=localhost:2181 # Keep the KRaft controller quorum configuration controller.quorum.voters=3000@localhost:9093 controller.listener.names=CONTROLLER
每个代理都使用 KRaft 配置重新启动,直到整个集群在 KRaft 模式下运行。
完成迁移
一旦所有 broker 都以 KRaft 模式重新启动,完成迁移的最后一步是执行 KRaft 控制器退出迁移模式。这是通过删除 “zookeeper.metadata.migration.enable” 来完成的 属性,然后一次重新启动一个。
# Sample KRaft cluster controller.properties listening on 9093 process.roles=controller node.id=3000 controller.quorum.voters=3000@localhost:9093 controller.listener.names=CONTROLLER listeners=CONTROLLER://:9093 # Disable the migration # zookeeper.metadata.migration.enable=true # Remove ZooKeeper client configuration # zookeeper.connect=localhost:2181 # Other configs ...
6.11 分层存储
分层存储概述
Kafka 数据主要使用尾部读取以流式方式使用。尾部读取利用作系统的页面缓存而不是磁盘读取来提供数据。 较旧的数据通常从磁盘中读取,用于回填或故障恢复,并且频率不高。
在分层存储方法中,Kafka 集群配置了两层存储 - 本地和远程。 本地层与当前 Kafka 相同,后者使用 Kafka 代理上的本地磁盘来存储日志分段。 新的远程层使用外部存储系统(如 HDFS 或 S3)来存储已完成的日志段。 请查看 KIP-405 了解更多信息。
注意:分层存储被视为一项早期访问功能,不建议在生产环境中使用
配置
代理配置
默认情况下,Kafka Server 不会启用分层存储功能。 是用于控制是否在 Broker 中启用分层存储功能的属性。将其设置为 “true” 将启用此功能。remote.log.storage.system.enable
RemoteStorageManager
是提供远程日志段和索引生命周期的接口。Kafka 服务器
不提供 RemoteStorageManager 的现成实现。配置 并指定 RemoteStorageManager 的实现。remote.log.storage.manager.class.name
remote.log.storage.manager.class.path
RemoteLogMetadataManager
是一个接口,用于提供具有强一致性语义的远程日志分段元数据的生命周期。
默认情况下,Kafka 提供将 storage 作为内部主题的实现。可以通过配置 和 来更改此实施。
当采用默认的 kafka 内部基于主题的实现时,是一个强制性属性,用于指定客户端由默认 RemoteLogMetadataManager 实现创建的侦听器。remote.log.metadata.manager.class.name
remote.log.metadata.manager.class.path
remote.log.metadata.manager.listener.name
Topic 配置
在正确配置了分层存储功能的 broker 端配置后,仍然需要设置 topic 级别的配置。 是用于确定主题是否要使用分层存储的开关。默认情况下,它设置为 false。
启用 property 后,接下来要考虑的是日志保留。
为主题启用分层存储后,还有 2 个额外的日志保留配置需要设置:remote.storage.enable
remote.storage.enable
local.retention.ms
retention.ms
local.retention.bytes
retention.bytes
local
retention.ms
retention.bytes
快速入门示例
Apache Kafka 不提供开箱即用的 RemoteStorageManager 实现。预览分层存储 功能,可以使用集成测试实现的 LocalTieredStorage,它会在本地存储中创建一个临时目录来模拟远程存储。
要采用 'LocalTieredStorage',需要在本地构建测试库
# please checkout to the specific version tag you're using before building it # ex: `git checkout 3.6.2` ./gradlew clean :storage:testJar
构建成功后,“storage/build/libs”下应该有一个“kafka-storage-x.x.x-test.jar”文件。 接下来,在 broker 端设置配置以启用分层存储功能。
# Sample Zookeeper/Kraft broker server.properties listening on PLAINTEXT://:9092 remote.log.storage.system.enable=true # Setting the listener for the clients in RemoteLogMetadataManager to talk to the brokers. remote.log.metadata.manager.listener.name=PLAINTEXT # Please provide the implementation info for remoteStorageManager. # This is the mandatory configuration for tiered storage. # Here, we use the `LocalTieredStorage` built above. remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage remote.log.storage.manager.class.path=/PATH/TO/kafka-storage-x.x.x-test.jar # These 2 prefix are default values, but customizable remote.log.storage.manager.impl.prefix=rsm.config. remote.log.metadata.manager.impl.prefix=rlmm.config. # Configure the directory used for `LocalTieredStorage` # Note, please make sure the brokers need to have access to this directory rsm.config.dir=/tmp/kafka-remote-storage # This needs to be changed if number of brokers in the cluster is more than 1 rlmm.config.remote.log.metadata.topic.replication.factor=1 # Try to speed up the log retention check interval for testing log.retention.check.interval.ms=1000
按照快速入门指南启动 Kafka 环境。 然后,创建一个使用 configs 启用了分层存储的主题:
# remote.storage.enable=true -> enables tiered storage on the topic # local.retention.ms=1000 -> The number of milliseconds to keep the local log segment before it gets deleted. Note that a local log segment is eligible for deletion only after it gets uploaded to remote. # retention.ms=3600000 -> when segments exceed this time, the segments in remote storage will be deleted # segment.bytes=1048576 -> for test only, to speed up the log segment rolling interval # file.delete.delay.ms=10000 -> for test only, to speed up the local-log segment file delete delay bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server localhost:9092 \ --config remote.storage.enable=true --config local.retention.ms=1000 --config retention.ms=3600000 \ --config segment.bytes=1048576 --config file.delete.delay.ms=1000
尝试向 'tieredTopic' 主题发送消息以滚动日志分段:
bin/kafka-producer-perf-test.sh --topic tieredTopic --num-records 1200 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092
然后,在活动 Segment 滚动后,应将旧 Segment 移动到远程存储并被删除。 这可以通过检查上面配置的远程日志目录来验证。例如:
> ls /tmp/kafka-remote-storage/kafka-tiered-storage/tieredTopic-0-jF8s79t9SrG_PNqlwv7bAA 00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.index 00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.snapshot 00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.leader_epoch_checkpoint 00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.timeindex 00000000000000000000-knnxbs3FSRyKdPcSAOQC-w.log
最后,我们可以尝试从开头使用一些数据并打印偏移量编号,以确保它能够成功从远程存储中获取偏移量 0。
bin/kafka-console-consumer.sh --topic tieredTopic --from-beginning --max-messages 1 --bootstrap-server localhost:9092 --property print.offset=true
请注意,如果要在集群级别禁用分层存储,则应显式删除已启用分层存储的主题。 尝试在集群级别禁用分层存储而不删除使用分层存储的主题将导致启动期间出现异常。
bin/kafka-topics.sh --delete --topic tieredTopic --bootstrap-server localhost:9092
删除主题后,您可以安全地在代理配置中进行设置。remote.log.storage.system.enable=false
局限性
虽然 Tiered Storage 的早期访问版本提供了试用此新功能的机会,但请务必了解以下限制:
- 不支持具有多个日志目录的集群(即 JBOD 功能)
- 不支持压缩的主题
- 无法在主题级别禁用分层存储
- 在代理级别禁用分层存储之前,需要删除已启用分层存储的主题
- 仅版本 3.0 及更高版本的客户端支持与分层存储功能相关的管理员作
有关更多信息,请查看 Tiered Storage Early Access 发行说明。
7. 安全性
8. Kafka 连接
9. kafka流
Kafka Streams 是一个客户端库,用于处理和分析存储在 Kafka 中的数据。它建立在重要的流处理概念之上,例如正确区分事件时间和处理时间、窗口支持、恰好一次处理语义以及简单而高效的应用程序状态管理。
Kafka Streams 的进入门槛很低:您可以在单台机器上快速编写和运行小规模的概念验证;您只需在多台计算机上运行应用程序的其他实例,即可扩展到大批量生产工作负载。Kafka Streams 利用 Kafka 的并行模型以透明方式处理同一应用程序的多个实例的负载均衡。
要了解有关 Kafka Streams 的更多信息,请阅读本节。