应用程序重置工具
您可以使用应用程序重置工具重置应用程序并强制它从头开始重新处理其数据。 这对于开发和测试或修复 bug 非常有用。
应用程序重置工具处理 Kafka Streams 用户主题(输入 output 和 intermediate 主题)和 internal 主题 重置应用程序时。
以下是应用程序重置工具对每种主题类型的操作:
- 输入主题:将偏移量重置到指定位置(默认为主题的开头)。
- 中级主题:跳到主题的末尾,即将应用程序为所有分区提交的 Consumer Offsets 设置为每个分区的 (for Consumer Group)。
logSize
application.id
- Internal topics:删除内部主题(这会自动删除任何提交的偏移量)。
应用程序重置工具不会:
- 重置应用程序的输出主题。如果下游使用任何输出(或中间)主题 应用程序,则您有责任在重置 upstream 应用程序。
- 重置应用程序实例的本地环境。您有责任删除本地 state 在运行 Application 实例的任何计算机上。请参阅步骤 2:重置应用程序实例的本地环境中的说明,了解如何执行此操作。
- 先决条件
必须停止应用程序的所有实例。否则,应用程序可能会进入无效状态、崩溃或产生不正确的结果。您可以使用 验证具有 ID 的使用者组是否仍处于活动状态。 如果配置了长会话超时,则活动成员可能需要更长的时间才能在 broker 上过期,从而阻止重置作业完成。使用该选项可以立即删除那些剩余的成员。确保在指定此选项时关闭所有流应用程序,以避免意外的重新平衡。
application.id
bin/kafka-consumer-groups
--force
请谨慎使用此工具并仔细检查其参数:如果您提供的参数值错误(例如,拼写错误)或指定参数不一致(例如,为应用程序指定了错误的输入主题),则此工具可能会使应用程序的状态失效,甚至影响其他应用程序、使用者组或您的 Kafka 主题。
application.id
在运行应用程序重置工具之前,您应该手动删除并重新创建任何中间主题。这将释放 Kafka 代理中的磁盘空间。
您应该在运行应用程序重置工具之前删除并重新创建中间主题,除非满足以下条件:
- 您有应用程序的中间主题的外部下游使用者。
- 您所处的开发环境中不需要手动删除和重新创建中间主题。
第 1 步:运行应用程序重置工具
从命令行调用应用程序重置工具
警告!此工具可对应用程序进行不可逆的更改。强烈建议您运行一次,以便在进行更改之前预览更改。--dry-run
$ bin/kafka-streams-application-reset
该工具接受以下参数:
Option (* = required) Description
--------------------- -----------
* --application-id <String: id> The Kafka Streams application ID
(application.id).
--bootstrap-server <String: server to REQUIRED unless --bootstrap-servers
connect to> (deprecated) is specified. The server
(s) to connect to. The broker list
string in the form HOST1:PORT1,HOST2:
PORT2.
--bootstrap-servers <String: urls> DEPRECATED: Comma-separated list of
broker urls with format: HOST1:PORT1,
HOST2:PORT2 (default: localhost:9092)
--by-duration <String: urls> Reset offsets to offset by duration from
current timestamp. Format: 'PnDTnHnMnS'
--config-file <String: file name> Property file containing configs to be
passed to admin clients and embedded
consumer.
--dry-run Display the actions that would be
performed without executing the reset
commands.
--from-file <String: urls> Reset offsets to values defined in CSV
file.
--input-topics <String: list> Comma-separated list of user input
topics. For these topics, the tool will
reset the offset to the earliest
available offset.
--intermediate-topics <String: list> Comma-separated list of intermediate user
topics (topics used in the through()
method). For these topics, the tool
will skip to the end.
--internal-topics <String: list> Comma-separated list of internal topics
to delete. Must be a subset of the
internal topics marked for deletion by
the default behaviour (do a dry-run without
this option to view these topics).
--shift-by <Long: number-of-offsets> Reset offsets shifting current offset by
'n', where 'n' can be positive or
negative
--to-datetime <String> Reset offsets to offset from datetime.
Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest Reset offsets to earliest offset.
--to-latest Reset offsets to latest offset.
--to-offset <Long> Reset offsets to a specific offset.
--force Force removing members of the consumer group
(intended to remove left-over members if
long session timeout was configured).
将以下内容视为 reset-offset 方案:input-topics
- 按持续时间
- from-file
- 移位
- 至今
- to-earliest (最早)
- 至最新版本
- to-offset (目标偏移)
只能定义其中一种情况。否则,将默认执行to-earliest
所有其他参数都可以根据需要组合。例如,如果要从
空内部状态,但不重新处理以前的数据,只需省略参数 和 。--input-topics
--intermediate-topics
第 2 步:重置应用程序实例的本地环境
要完成应用程序重置,您必须在包含 Application 实例。您必须在同一台计算机上重新启动应用程序实例之前执行此操作。您可以 使用以下任一方法:
- 应用程序代码中的 API 方法。
KafkaStreams#cleanUp()
- 手动删除相应的本地状态目录(默认位置:)。有关更多信息,请参阅 Streams javadocs。
/tmp/kafka-streams/<application.id>