Kafka Streams 开发人员指南
在 Kafka Streams DSL 应用程序中命名运算符
现在,您可以在使用 Kafka Streams DSL 时为处理器命名。
在 PAPI 中,有 和 以及
您需要明确命名每个 ID。Processors
State Stores
在 DSL 层,有运算符。单个 DSL 运算符可以
编译为多个 AND 、 和
如果需要。但是有了 Kafka Streams
DSL,所有这些名称都是为你生成的。之间存在
生成的处理器名称 State Store Names(因此 changelog 主题名称)和 repartition
主题名称。请注意,状态存储和 changelog/repartition 主题的名称
是 “有状态的”,而处理器名称是 “无状态的”。Processors
State Stores
repartition topics
这种区别
的有状态名称与无状态名称在更新拓扑时具有重要意义。
虽然内部命名使创建
DSL 的拓扑结构要简单得多,
有几个权衡。第一个权衡是我们能做的
考虑一个可读性问题。另一个
更严重的权衡是由于
DSL 运算符和生成的 , changelog
主题和重新分区主题。Processors
State Stores
可读性问题
通过说存在可读性权衡,我们指的是查看拓扑的描述。
当你通过该方法渲染拓扑的字符串描述时,你可以看到处理器是什么,但你没有任何上下文来用于其业务目的。
例如,请考虑以下简单拓扑:Topology#describe()
KStream<String,String> stream = builder.stream("input");
stream.filter((k,v) -> !v.equals("invalid_txn"))
.mapValues((v) -> v.substring(0,5))
.to("output");
运行 会生成以下字符串:Topology#describe()
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-FILTER-0000000001
Processor: KSTREAM-FILTER-0000000001 (stores: [])
--> KSTREAM-MAPVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-FILTER-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: output)
<-- KSTREAM-MAPVALUES-0000000002
从此报告中,您可以看到不同的运算符是什么,但这里更广泛的背景是什么?
例如,考虑 ,我们可以看到它是一个
filter 操作,这意味着将删除与给定谓词不匹配的记录。但是什么是
谓词的含义?此外,您还可以查看源节点和接收器节点的主题名称。
但是,如果主题没有以有意义的方式命名怎么办?然后你就只能猜测
这些主题背后的商业目的。KSTREAM-FILTER-0000000001
另请注意此处的编号:源节点的后缀表示它是拓扑中的第一个处理器。
过滤器后缀为 ,表示它是 中的第二个处理器
拓扑。在 Kafka Streams 中,现在有
两者都接受
新参数 .通过使用类 DSL,用户可以
为其拓扑中的处理器提供有意义的名称。0000000000
0000000001
KStream
KTable
Named
Named
现在让我们看看您的拓扑结构,其中所有处理器都命名为:
KStream<String,String> stream =
builder.stream("input", Consumed.as("Customer_transactions_input_topic"));
stream.filter((k,v) -> !v.equals("invalid_txn"), Named.as("filter_out_invalid_txns"))
.mapValues((v) -> v.substring(0,5), Named.as("Map_values_to_first_6_characters"))
.to("output", Produced.as("Mapped_transactions_output_topic"));
Topologies:
Sub-topology: 0
Source: Customer_transactions_input_topic (topics: [input])
--> filter_out_invalid_txns
Processor: filter_out_invalid_txns (stores: [])
--> Map_values_to_first_6_characters
<-- Customer_transactions_input_topic
Processor: Map_values_to_first_6_characters (stores: [])
--> Mapped_transactions_output_topic
<-- filter_out_invalid_txns
Sink: Mapped_transactions_output_topic (topic: output)
<-- Map_values_to_first_6_characters
现在,您可以查看拓扑描述并轻松了解每个处理器的角色
在 拓扑 中播放。但是,当您
具有在 Kafka Streams 应用程序重启之间保留的有状态运算符,
状态存储、更改日志主题和重新分区主题。
更改名称
生成的名称在拓扑中构建的位置进行编号。
名称生成策略为 。该数字是一个
全局递增的数字,表示运算符在 topology 中的顺序。
生成的数字以不同数量的“0”为前缀,以创建
字符串,该字符串的长度始终为 10 个字符。
这意味着,如果您添加/删除或移动操作顺序,则
processor shifts,这将改变处理器的名称。由于存在大多数处理器
仅在内存中,这种名称偏移对许多拓扑没有问题。但是这个名字
移位确实对具有有状态运算符或重新分区主题的拓扑有影响。
下面是具有某种状态的不同拓扑:KSTREAM|KTABLE->operator name<->number suffix<
KStream<String,String> stream = builder.stream("input");
stream.groupByKey()
.count()
.toStream()
.to("output");
此拓扑描述产生以下内容:Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-AGGREGATE-0000000002
Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
--> KTABLE-TOSTREAM-0000000003
<-- KSTREAM-SOURCE-0000000000
Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
--> KSTREAM-SINK-0000000004
<-- KSTREAM-AGGREGATE-0000000002
Sink: KSTREAM-SINK-0000000004 (topic: output)
<-- KTABLE-TOSTREAM-0000000003
从上面的拓扑描述中可以看到,状态存储名为 .以下是当您
添加一个过滤器以将某些记录排除在聚合之外:KSTREAM-AGGREGATE-STATE-STORE-0000000002
KStream<String,String> stream = builder.stream("input");
stream.filter((k,v)-> v !=null && v.length() >= 6 )
.groupByKey()
.count()
.toStream()
.to("output");
以及相应的拓扑:Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-FILTER-0000000001
Processor: KSTREAM-FILTER-0000000001 (stores: [])
--> KSTREAM-AGGREGATE-0000000003
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])
--> KTABLE-TOSTREAM-0000000004
<-- KSTREAM-FILTER-0000000001
Processor: KTABLE-TOSTREAM-0000000004 (stores: [])
--> KSTREAM-SINK-0000000005
<-- KSTREAM-AGGREGATE-0000000003
Sink: KSTREAM-SINK-0000000005 (topic: output)
<-- KTABLE-TOSTREAM-0000000004
请注意,由于您在操作之前添加了操作,因此状态
store (和 changelog 主题) 名称已更改。此名称更改意味着您不能
对更新的拓扑执行滚动重新部署。此外,您必须使用 Streams Reset Tool 重新计算聚合,因为更改日志主题在启动时已更改,并且
New changelog topic contains no data.
幸运的是,有一个简单的解决方案可以解决这种情况。给
state 存储用户定义的名称,而不是依赖生成的名称,
因此,您不必担心拓扑更改会更改 State Store 的名称。
您已经能够使用 、 、 和 类以及
使用 命名 State Store 和 ChangeLog 主题。
但值得重申的是,再次命名这些 DSL 拓扑操作的重要性。
下面是你的 DSL 代码现在为状态存储指定名称后的样子:count
Joined
StreamJoined
Grouped
Materialized
KStream<String,String> stream = builder.stream("input");
stream.filter((k, v) -> v != null && v.length() >= 6)
.groupByKey()
.count(Materialized.as("Purchase_count_store"))
.toStream()
.to("output");
这是拓扑Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-FILTER-0000000001
Processor: KSTREAM-FILTER-0000000001 (stores: [])
--> KSTREAM-AGGREGATE-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-AGGREGATE-0000000002 (stores: [Purchase_count_store])
--> KTABLE-TOSTREAM-0000000003
<-- KSTREAM-FILTER-0000000001
Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
--> KSTREAM-SINK-0000000004
<-- KSTREAM-AGGREGATE-0000000002
Sink: KSTREAM-SINK-0000000004 (topic: output)
<-- KTABLE-TOSTREAM-0000000003
现在,即使您在 state store 之前添加了处理器,store 名称及其更改日志 主题名称不会更改。这使您的拓扑更加健壮,并且能够灵活地应对 添加或删除处理器。
结论
在使用 DSL 时命名处理节点是一种很好的做法,甚至 当您拥有“有状态”处理器时,执行此操作更为重要 您的应用程序(如 repartition) topics 和 state stores(以及随附的 changelog 主题)。以下是命名 DSL 拓扑时要记住的几点:
- 如果您有一个现有的拓扑,并且尚未将 State Stores(和更改日志主题)和 repartition 主题,我们建议您 这样做。但这将是一个拓扑破坏性更改,因此您需要关闭所有 应用程序实例,进行更改,然后运行 Streams Reset Tool。 虽然一开始这可能会带来不便,但值得付出努力保护您的应用程序免受 由于拓扑更改而导致的意外错误。
- 如果您有新的拓扑,请确保命名拓扑的持久部分: 状态存储 (ChangeLog Topics) 和 Repartition 主题。这样,当您部署 应用程序,您可以免受拓扑更改的影响,否则这些更改会破坏您的 Kafka Streams 应用程序。 如果您一开始不想为无状态处理器添加名称,那很好 以后总是回去添加名称。
操作 | 命名类 |
---|---|
聚合重新分区主题 | 分组 |
KStream-KStream Join 重新分区主题 | 流已加入 |
KStream-KTable Join 重新分区主题 | 加入 |
KStream-KStream 加入状态存储 | 流已加入 |
状态存储(用于聚合和 KTable-KTable 联接) | 物化 |
Stream/Table 无状态操作 | 叫 |