Kafka Streams 开发人员指南

在 Kafka Streams DSL 应用程序中命名运算符

现在,您可以在使用 Kafka Streams DSL 时为处理器命名。 在 PAPI 中,有 和 以及 您需要明确命名每个 ID。ProcessorsState Stores

在 DSL 层,有运算符。单个 DSL 运算符可以 编译为多个 AND 、 和 如果需要。但是有了 Kafka Streams DSL,所有这些名称都是为你生成的。之间存在 生成的处理器名称 State Store Names(因此 changelog 主题名称)和 repartition 主题名称。请注意,状态存储和 changelog/repartition 主题的名称 是 “有状态的”,而处理器名称是 “无状态的”。ProcessorsState Storesrepartition topics

这种区别 的有状态名称与无状态名称在更新拓扑时具有重要意义。 虽然内部命名使创建 DSL 的拓扑结构要简单得多, 有几个权衡。第一个权衡是我们能做的 考虑一个可读性问题。另一个 更严重的权衡是由于 DSL 运算符和生成的 , changelog 主题和重新分区主题。ProcessorsState Stores

可读性问题

通过说存在可读性权衡,我们指的是查看拓扑的描述。 当你通过该方法渲染拓扑的字符串描述时,你可以看到处理器是什么,但你没有任何上下文来用于其业务目的。 例如,请考虑以下简单拓扑:Topology#describe()

KStream<String,String> stream = builder.stream("input");
stream.filter((k,v) -> !v.equals("invalid_txn"))
	  .mapValues((v) -> v.substring(0,5))
	  .to("output");

运行 会生成以下字符串:Topology#describe()

Topologies:
   Sub-topology: 0
	Source: KSTREAM-SOURCE-0000000000 (topics: [input])
	  --> KSTREAM-FILTER-0000000001
	Processor: KSTREAM-FILTER-0000000001 (stores: [])
	  --> KSTREAM-MAPVALUES-0000000002
	  <-- KSTREAM-SOURCE-0000000000
	Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
	  --> KSTREAM-SINK-0000000003
	  <-- KSTREAM-FILTER-0000000001
	Sink: KSTREAM-SINK-0000000003 (topic: output)
	  <-- KSTREAM-MAPVALUES-0000000002
从此报告中,您可以看到不同的运算符是什么,但这里更广泛的背景是什么? 例如,考虑 ,我们可以看到它是一个 filter 操作,这意味着将删除与给定谓词不匹配的记录。但是什么是 谓词的含义?此外,您还可以查看源节点和接收器节点的主题名称。 但是,如果主题没有以有意义的方式命名怎么办?然后你就只能猜测 这些主题背后的商业目的。KSTREAM-FILTER-0000000001

另请注意此处的编号:源节点的后缀表示它是拓扑中的第一个处理器。 过滤器后缀为 ,表示它是 中的第二个处理器 拓扑。在 Kafka Streams 中,现在有 两者都接受 新参数 .通过使用类 DSL,用户可以 为其拓扑中的处理器提供有意义的名称。00000000000000000001KStreamKTableNamedNamed

现在让我们看看您的拓扑结构,其中所有处理器都命名为:

KStream<String,String> stream =
builder.stream("input", Consumed.as("Customer_transactions_input_topic"));
stream.filter((k,v) -> !v.equals("invalid_txn"), Named.as("filter_out_invalid_txns"))
	  .mapValues((v) -> v.substring(0,5), Named.as("Map_values_to_first_6_characters"))
	  .to("output", Produced.as("Mapped_transactions_output_topic"));
Topologies:
   Sub-topology: 0
	Source: Customer_transactions_input_topic (topics: [input])
	  --> filter_out_invalid_txns
	Processor: filter_out_invalid_txns (stores: [])
	  --> Map_values_to_first_6_characters
	  <-- Customer_transactions_input_topic
	Processor: Map_values_to_first_6_characters (stores: [])
	  --> Mapped_transactions_output_topic
	  <-- filter_out_invalid_txns
	Sink: Mapped_transactions_output_topic (topic: output)
	  <-- Map_values_to_first_6_characters
现在,您可以查看拓扑描述并轻松了解每个处理器的角色 在 拓扑 中播放。但是,当您 具有在 Kafka Streams 应用程序重启之间保留的有状态运算符, 状态存储、更改日志主题和重新分区主题。

更改名称

生成的名称在拓扑中构建的位置进行编号。 名称生成策略为 。该数字是一个 全局递增的数字,表示运算符在 topology 中的顺序。 生成的数字以不同数量的“0”为前缀,以创建 字符串,该字符串的长度始终为 10 个字符。 这意味着,如果您添加/删除或移动操作顺序,则 processor shifts,这将改变处理器的名称。由于存在大多数处理器 仅在内存中,这种名称偏移对许多拓扑没有问题。但是这个名字 移位确实对具有有状态运算符或重新分区主题的拓扑有影响。 下面是具有某种状态的不同拓扑:KSTREAM|KTABLE->operator name<->number suffix<

KStream<String,String> stream = builder.stream("input");
 stream.groupByKey()
	   .count()
	   .toStream()
	   .to("output");
此拓扑描述产生以下内容:
Topologies:
   Sub-topology: 0
	Source: KSTREAM-SOURCE-0000000000 (topics: [input])
	 --> KSTREAM-AGGREGATE-0000000002
	Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])
	 --> KTABLE-TOSTREAM-0000000003
	 <-- KSTREAM-SOURCE-0000000000
	Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
	 --> KSTREAM-SINK-0000000004
	 <-- KSTREAM-AGGREGATE-0000000002
	Sink: KSTREAM-SINK-0000000004 (topic: output)
	 <-- KTABLE-TOSTREAM-0000000003

从上面的拓扑描述中可以看到,状态存储名为 .以下是当您 添加一个过滤器以将某些记录排除在聚合之外:KSTREAM-AGGREGATE-STATE-STORE-0000000002

KStream<String,String> stream = builder.stream("input");
stream.filter((k,v)-> v !=null && v.length() >= 6 )
      .groupByKey()
      .count()
      .toStream()
      .to("output");
以及相应的拓扑:
Topologies:
	Sub-topology: 0
	 Source: KSTREAM-SOURCE-0000000000 (topics: [input])
	  --> KSTREAM-FILTER-0000000001
	 Processor: KSTREAM-FILTER-0000000001 (stores: [])
	   --> KSTREAM-AGGREGATE-0000000003
	   <-- KSTREAM-SOURCE-0000000000
	 Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])
	   --> KTABLE-TOSTREAM-0000000004
	   <-- KSTREAM-FILTER-0000000001
	 Processor: KTABLE-TOSTREAM-0000000004 (stores: [])
	   --> KSTREAM-SINK-0000000005
	   <-- KSTREAM-AGGREGATE-0000000003
	  Sink: KSTREAM-SINK-0000000005 (topic: output)
	   <-- KTABLE-TOSTREAM-0000000004

请注意,由于您在操作之前添加了操作,因此状态 store (和 changelog 主题) 名称已更改。此名称更改意味着您不能 对更新的拓扑执行滚动重新部署。此外,您必须使用 Streams Reset Tool 重新计算聚合,因为更改日志主题在启动时已更改,并且 New changelog topic contains no data. 幸运的是,有一个简单的解决方案可以解决这种情况。给 state 存储用户定义的名称,而不是依赖生成的名称, 因此,您不必担心拓扑更改会更改 State Store 的名称。 您已经能够使用 、 、 和 类以及 使用 命名 State Store 和 ChangeLog 主题。 但值得重申的是,再次命名这些 DSL 拓扑操作的重要性。 下面是你的 DSL 代码现在为状态存储指定名称后的样子:countJoinedStreamJoinedGroupedMaterialized

KStream<String,String> stream = builder.stream("input");
stream.filter((k, v) -> v != null && v.length() >= 6)
	  .groupByKey()
	  .count(Materialized.as("Purchase_count_store"))
	  .toStream()
	  .to("output");
这是拓扑
Topologies:
   Sub-topology: 0
	Source: KSTREAM-SOURCE-0000000000 (topics: [input])
	  --> KSTREAM-FILTER-0000000001
	Processor: KSTREAM-FILTER-0000000001 (stores: [])
	  --> KSTREAM-AGGREGATE-0000000002
	  <-- KSTREAM-SOURCE-0000000000
	Processor: KSTREAM-AGGREGATE-0000000002 (stores: [Purchase_count_store])
	  --> KTABLE-TOSTREAM-0000000003
	  <-- KSTREAM-FILTER-0000000001
	Processor: KTABLE-TOSTREAM-0000000003 (stores: [])
	  --> KSTREAM-SINK-0000000004
	  <-- KSTREAM-AGGREGATE-0000000002
	Sink: KSTREAM-SINK-0000000004 (topic: output)
	  <-- KTABLE-TOSTREAM-0000000003

现在,即使您在 state store 之前添加了处理器,store 名称及其更改日志 主题名称不会更改。这使您的拓扑更加健壮,并且能够灵活地应对 添加或删除处理器。

结论

在使用 DSL 时命名处理节点是一种很好的做法,甚至 当您拥有“有状态”处理器时,执行此操作更为重要 您的应用程序(如 repartition) topics 和 state stores(以及随附的 changelog 主题)。

以下是命名 DSL 拓扑时要记住的几点:

  1. 如果您有一个现有的拓扑,并且尚未将 State Stores(和更改日志主题)和 repartition 主题,我们建议您 这样做。但这将是一个拓扑破坏性更改,因此您需要关闭所有 应用程序实例,进行更改,然后运行 Streams Reset Tool。 虽然一开始这可能会带来不便,但值得付出努力保护您的应用程序免受 由于拓扑更改而导致的意外错误。
  2. 如果您有新的拓扑,请确保命名拓扑的持久部分: 状态存储 (ChangeLog Topics) 和 Repartition 主题。这样,当您部署 应用程序,您可以免受拓扑更改的影响,否则这些更改会破坏您的 Kafka Streams 应用程序。 如果您一开始不想为无状态处理器添加名称,那很好 以后总是回去添加名称。
以下是命名 的关键部分的快速参考 Kafka Streams 应用程序,以防止拓扑名称更改破坏应用程序:
操作命名类
聚合重新分区主题分组
KStream-KStream Join 重新分区主题流已加入
KStream-KTable Join 重新分区主题加入
KStream-KStream 加入状态存储流已加入
状态存储(用于聚合和 KTable-KTable 联接)物化
Stream/Table 无状态操作