流 DSL

Kafka Streams DSL(域特定语言)构建在 Streams Processor API 之上。推荐用于 大多数用户,尤其是初学者。大多数数据处理操作都可以用几行 DSL 代码来表示。

概述

处理器 API 相比,只有 DSL 支持:

  • KStreamKTableGlobalKTable 形式的流和表的内置抽象。拥有对流和表的一流支持至关重要 因为,在实践中,大多数用例不仅需要流或数据库/表,还需要两者的组合。 例如,如果您的使用案例是创建实时更新的客户 360 度视图,则您的 应用程序将要做的是将客户相关事件的许多输入转换为一个输出表,该包含不断更新的客户 360 度视图。
  • 具有无状态转换的声明式函数式编程风格(例如 和 ) 以及有状态转换,例如聚合(例如 和 )、联接(例如 )和窗口化(例如 会话窗口)。mapfiltercountreduceleftJoin

使用 DSL,您可以定义处理器拓扑(即逻辑 处理计划)。实现此目的的步骤是:

  1. 指定从 Kafka 主题读取的一个或多个输入流
  2. 对这些流执行 Compose 转换
  3. 生成的输出流写回 Kafka 主题,或通过交互式查询(例如,通过 REST API)将应用程序的处理结果直接公开给其他应用程序。

应用程序运行后,定义的处理器拓扑将持续执行(即,将处理计划放入 操作)。下面提供了使用 DSL 编写流处理应用程序的分步指南。

有关可用 API 功能的完整列表,另请参阅 Streams API 文档。

KStream

只有 Kafka Streams DSL 具有 .KStream

KStream记录流的抽象,其中每条数据记录都表示无界数据集中的一个自包含数据。使用表类比,记录流中的数据记录始终被解释为 “INSERT” -- 想想:向仅追加的账本添加更多条目 -- 因为没有记录用相同的键替换现有行。示例包括信用卡交易、页面查看事件或服务器日志条目。

为了说明这一点,我们假设以下两条数据记录被发送到流中:

(“爱丽丝”, 1) --> (“爱丽丝”, 3)

如果您的流处理应用程序要对每个用户的值求和,它将返回 for 。为什么?因为第二个数据记录不会被视为前一个记录的更新。将 KStream 的此行为与以下内容进行比较, 这将返回 。4aliceKTable3alice

KTable

只有 Kafka Streams DSL 具有 .KTable

KTablechangelog 流的抽象,其中每条数据记录代表一个更新。更准确地说,数据记录中的值被解释为同一记录键的最后一个值的 “UPDATE” (如果有)(如果相应的键尚不存在,则更新将被视为 INSERT)。使用表类比,changelog 流中的数据记录被解释为 UPSERT 又名 INSERT/UPDATE,因为具有相同键的任何现有行都将被覆盖。此外,值以特殊方式解释:具有值的记录表示记录键的 “DELETE” 或逻辑删除。nullnull

为了说明这一点,我们假设以下两条数据记录被发送到流中:

(“爱丽丝”, 1) --> (“爱丽丝”, 3)

如果您的流处理应用程序要对每个用户的值求和,它将返回 for 。为什么?因为第二条数据记录将被视为前一条记录的更新。3alice

Kafka 对数压缩的影响:另一种思考 KStream 和 KTable 的方式如下:如果你要将 KTable 存储到 Kafka 主题中,你可能希望启用 Kafka 的日志压缩功能,例如为了节省存储空间。

但是,在 KStream 的情况下启用日志压缩并不安全,因为一旦日志压缩开始清除相同键的旧数据记录,它就会破坏数据的语义。再次使用插图示例,您会突然得到 for 而不是 a,因为日志压缩会删除数据记录。因此,日志压缩对于 KTable(更改日志流)来说是完全安全的,但对于 KStream(记录流)来说是一个错误。3alice4("alice", 1)

我们已经在 streams and tables 部分看到了一个 changelog 流的示例。另一个示例是关系数据库的更改日志中的更改数据捕获 (CDC) 记录,表示插入、更新或删除了数据库表中的哪一行。

KTable 还提供了按键查找数据记录的当前值的能力。此表查找功能可通过 join 操作(另请参阅 Developer Guide 中的 Join)以及 Interactive Queries 获得。

GlobalKTable

只有 Kafka Streams DSL 具有 GlobalKTable 的概念。

KTable 一样,GlobalKTablechangelog 流的抽象,其中每个数据记录代表一个更新。

GlobalKTable 与 KTable 的不同之处在于它们所填充的数据,即底层 Kafka 主题中的哪些数据被读取到相应的表中。稍微简化一下,假设您有一个包含 5 个分区的输入主题。在您的应用程序中,您希望将此主题读入表中。此外,您希望在 5 个应用程序实例上运行应用程序,以实现最大并行性

  • 如果你将输入主题读入 KTable,那么每个应用程序实例的 “本地” KTable 实例将只填充来自该主题的 5 个分区中的 1 个分区的数据。
  • 如果将输入主题读入 GlobalKTable,则每个应用程序实例的本地 GlobalKTable 实例将填充来自该主题的所有分区的数据。

GlobalKTable 提供了按键查找数据记录的当前值的能力。此表查找功能可通过 获得。 请注意,与 KTable 相比,GlobalKTable 没有时间概念。join operations

全局表的好处:

  • 更方便和/或更高效的连接:值得注意的是,全局表允许您执行星型连接,它们支持“外键”查找(即,您不仅可以通过记录键查找表中的数据,还可以按记录值中的数据查找表中的数据),并且在链接多个连接时效率更高。此外,当针对全局表进行联接时,输入数据不需要进行共分区
  • 可用于将信息“广播”到应用程序的所有正在运行的实例。

全局表的缺点:

  • 与(分区的)KTable 相比,本地存储消耗增加,因为跟踪了整个主题。
  • 与(分区的)KTable 相比,网络和 Kafka 代理负载增加,因为整个主题都是读取的。

从 Kafka 创建源流

您可以轻松地将 Kafka 主题中的数据读取到您的应用程序中。支持以下操作。

从 Kafka 读取 描述

  • 输入 → KStream 的主题

从指定的 Kafka 输入主题创建 KStream 并解释数据 作为记录流。 A 表示分区的记录流。(详情)KStream

对于 KStream,每个应用程序实例的本地 KStream 实例将 仅使用来自 input topic 的分区子集的数据填充。总的来说,跨越 读取和处理所有应用程序实例、所有输入主题分区。

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, Long> wordCounts = builder.stream(
    "word-counts-input-topic", /* input topic */
    Consumed.with(
      Serdes.String(), /* key serde */
      Serdes.Long()   /* value serde */
    );

如果未显式指定 Serdes,则使用配置中的默认 Serdes。

如果 Kafka 输入中记录的键或值类型,则必须显式指定 Serdes 主题与配置的默认 Serdes 不匹配。有关配置默认 Serdes 的信息,请参阅 Serdes 和实现您自己的自定义 Serdes 请参阅 数据类型和序列化

存在几种变体。例如,您可以为要从中读取的输入主题指定正则表达式模式(请注意,所有匹配的主题都将属于同一输入主题组,如果以这种方式订阅,则不会针对不同的主题并行化工作)。stream

桌子

  • → KTable 的 input topic

将指定的 Kafka 输入主题读入 KTable。主题是 解释为 changelog 流,其中具有相同键的记录被解释为 UPSERT aka INSERT/UPDATE (当记录值不是 )或 DELETE (当值为 时) 时。(详情)nullnull

对于 KTable,每个应用程序实例的本地 KTable 实例将 仅使用来自 input topic 的分区子集的数据填充。总的来说,跨越 读取和处理所有应用程序实例、所有输入主题分区。

您必须为表提供名称(更准确地说,为支持表的内部状态存储提供名称)。这是 支持对 table 的交互式查询。当 name 时,表将不可查询,并且将为状态存储提供内部名称。

如果未显式指定 Serdes,则使用配置中的默认 Serdes。

如果 Kafka 输入中记录的键或值类型,则必须显式指定 Serdes 主题与配置的默认 Serdes 不匹配。有关配置默认 Serdes 的信息,请参阅 Serdes 和实现您自己的自定义 Serdes 请参阅 数据类型和序列化

存在多个变体,例如,指定在 从 input 主题中读取。tableauto.offset.reset

全局表

  • 输入主题 → GlobalKTable

将指定的 Kafka 输入主题读取到 GlobalKTable 中。主题是 解释为 changelog 流,其中具有相同键的记录被解释为 UPSERT aka INSERT/UPDATE (当记录值不是 )或 DELETE (当值为 时) 时。(详情)nullnull

对于 GlobalKTable,每个应用程序实例的本地 GlobalKTable 实例将 填充来自 input topic 的所有分区的数据。

您必须为表提供名称(更准确地说,为支持表的内部状态存储提供名称)。这是 支持对 table 的交互式查询。当 name 时,表将不可查询,并且将为状态存储提供内部名称。

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;

StreamsBuilder builder = new StreamsBuilder();

GlobalKTable<String, Long> wordCounts = builder.globalTable(
    "word-counts-input-topic",
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
      "word-counts-global-store" /* table/store name */)
      .withKeySerde(Serdes.String()) /* key serde */
      .withValueSerde(Serdes.Long()) /* value serde */
    );

如果 Kafka 输入中记录的键或值类型,则必须显式指定 Serdes 主题与配置的默认 Serdes 不匹配。有关配置默认 Serdes 的信息,请参阅 Serdes 和实现您自己的自定义 Serdes 请参阅 数据类型和序列化

的几个变体存在,例如指定显式 Serdes。globalTable

转换流

KStream 和 KTable 接口支持各种转换操作。 这些操作中的每一个都可以转换为一个或多个连接的处理器,以进入底层处理器拓扑。 由于 KStream 和 KTable 是强类型的,因此所有这些转换操作都定义为 泛型函数,用户可以在其中指定输入和输出数据类型。

某些 KStream 转换可能会生成一个或多个 KStream 对象,例如: - 在一个 KStream 上将生成另一个 KStream - 在 KStream 上可以生成多个 KStreamfiltermapsplit

其他一些可能会生成 KTable 对象,例如 KStream 的聚合也会生成 KTable。这允许 Kafka Streams 在计算值之后到达无序记录时持续更新计算值 已生成给下游转换运算符。

所有 KTable 转换操作都只能生成另一个 KTable。但是,Kafka Streams DSL 确实提供了一个特殊的功能 将 KTable 表示形式转换为 KStream。所有这些转换方法可以链接在一起以组合 复杂的处理器拓扑。

以下小节介绍了这些转换操作:

无状态转换

无状态转换不需要状态进行处理,也不需要与 流处理器。Kafka 0.11.0 及更高版本允许您实现无状态转换的结果。这允许通过交互式查询查询结果。要实现 ,以下每个无状态操作都可以使用可选参数进行扩充KTableKTablequeryableStoreName

转型 描述

分支

  • KStream → BranchedKStream

根据提供的谓词将 a 分支(或拆分)为一个或多个实例。 (详情KStreamKStream)

谓词按顺序计算。在第一个匹配项上,将记录放置到一个且只有一个输出流: 如果第 n 个谓词的计算结果为 true,则记录将放入第 n 个流。如果记录与任何谓词都不匹配,则 它将被路由到 default 分支,如果未创建 default 分支,则将其丢弃。

分支非常有用,例如,将记录路由到不同的下游主题。

KStream<String, Long> stream = ...;
Map<String, KStream<String, Long>> branches =
    stream.split(Named.as("Branch-"))
        .branch((key, value) -> key.startsWith("A"),  /* first predicate  */
             Branched.as("A"))
        .branch((key, value) -> key.startsWith("B"),  /* second predicate */
             Branched.as("B"))
        .defaultBranch(Branched.as("C"))              /* default branch */
);

// KStream branches.get("Branch-A") contains all records whose keys start with "A"
// KStream branches.get("Branch-B") contains all records whose keys start with "B"
// KStream branches.get("Branch-C") contains all other records

滤波器

  • KStream → KStream
  • KTable → KTable

计算每个元素的布尔函数,并保留函数返回 true 的元素。 (KStream 详细信息KTable 详细信息)

KStream<String, Long> stream = ...;

// A filter that selects (keeps) only positive numbers
// Java 8+ example, using lambda expressions
KStream<String, Long> onlyPositives = stream.filter((key, value) -> value > 0);

Inverse Filter

  • KStream → KStream
  • KTable → KTable

计算每个元素的布尔函数,并删除函数返回 true 的那些元素。 (KStream 详细信息KTable 详细信息)

KStream<String, Long> stream = ...;

// An inverse filter that discards any negative numbers or zero
// Java 8+ example, using lambda expressions
KStream<String, Long> onlyPositives = stream.filterNot((key, value) -> value <= 0);

平面地图

  • KStream → KStream

获取一条记录并生成零条、一条或多条记录。您可以修改记录键和值,包括 它们的类型。 (详情)

标记流以进行数据重新分区:之后应用分组或联接将导致记录重新分区。 如果可能,请改用,这不会导致数据重新分区。flatMapflatMapValues

KStream<Long, String> stream = ...;
KStream<String, Integer> transformed = stream.flatMap(
     // Here, we generate two output records for each input record.
     // We also change the key and value types.
     // Example: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000)
    (key, value) -> {
      List<KeyValue<String, Integer>> result = new LinkedList<>();
      result.add(KeyValue.pair(value.toUpperCase(), 1000));
      result.add(KeyValue.pair(value.toLowerCase(), 9000));
      return result;
    }
  );

FlatMapValues (平面贴图值)

  • KStream → KStream

获取一条记录并生成零条、一条或多条记录,同时保留原始记录的键。 您可以修改记录值和值类型。 (详情)

flatMapValues更可取,因为它不会导致数据重新分区。但是,您 无法像修改 Key 或 Key Type 那样修改 key。flatMapflatMap

// Split a sentence into words.
KStream<byte[], String> sentences = ...;
KStream<byte[], String> words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));

Foreach (查找)

  • 虚空→
  • 虚空→
  • 可KTable → void

终端操作。对每条记录执行无状态操作。 (详情)

您将用于根据输入数据产生副作用(类似于 ),然后停止对输入数据的进一步处理(与 不同,它不是终端操作)。foreachpeekpeek

关于加工保证的说明:操作的任何副作用(例如写入外部系统)都不是 可由 Kafka 跟踪,这意味着他们通常不会从 Kafka 的处理保证中受益。

KStream<String, Long> stream = ...;

// Print the contents of the KStream to the local console.
// Java 8+ example, using lambda expressions
stream.foreach((key, value) -> System.out.println(key + " => " + value));

GroupByKey 组合键

  • KStream → KGroupedStream

按现有键对记录进行分组。 (详情)

分组是聚合流或表的先决条件,可确保对数据进行正确分区(“键控”)以供后续操作使用。

何时设置显式 Serdes:存在 的变体 覆盖应用程序的已配置默认 Serdes,如果结果的键和/或值类型与配置的默认值不匹配,则必须执行此操作 塞尔德斯。groupByKeyKGroupedStream

注意

分组与窗口化:一个相关的操作是窗口化,它允许您控制如何 将同一键的分组记录“子分组”到所谓的 Windows 中,以进行有状态操作,例如 窗口化聚合或 窗口联接

当且仅当流被标记为重新分区时,才导致数据重新分区。 更可取,因为它仅在已标记流时才对数据进行重新分区 进行重新分区。但是,不允许您像修改密钥或密钥类型那样修改密钥或密钥类型。groupByKeygroupBygroupByKeygroupBy

KStream<byte[], String> stream = ...;

// Group by the existing key, using the application's configured
// default serdes for keys and values.
KGroupedStream<byte[], String> groupedStream = stream.groupByKey();

// When the key and/or value types do not match the configured
// default serdes, we must explicitly specify serdes.
KGroupedStream<byte[], String> groupedStream = stream.groupByKey(
    Grouped.with(
      Serdes.ByteArray(), /* key */
      Serdes.String())     /* value */
  );

分组依据

  • KStream → KGroupedStream
  • KTable → KGroupedTable

按新键对记录进行分组,键可能具有不同的键类型。 对表进行分组时,您还可以指定新的值和值类型。 是 的简写形式。 (KStream 详细信息KTable 详细信息groupByselectKey(...).groupByKey())

分组是聚合流或表的先决条件,可确保对数据进行正确分区(“键控”)以供后续操作使用。

何时设置显式 Serdes:exists 的变体来覆盖应用程序配置的默认 Serdes ,如果结果的键和/或值类型或与生成的 配置了默认 Serdes。groupByKGroupedStreamKGroupedTable

注意

分组与窗口化:一个相关的操作是窗口化,它允许您控制如何 将同一键的分组记录“子分组”到所谓的 Windows 中,以进行有状态操作,例如 窗口化聚合或 窗口联接

Always causes data re-partitioning:始终导致数据重新分区。 如果可能,请改用 it,它将仅在需要时重新分区数据。groupBygroupByKey

KStream<byte[], String> stream = ...;
KTable<byte[], String> table = ...;

// Java 8+ examples, using lambda expressions

// Group the stream by a new key and key type
KGroupedStream<String, String> groupedStream = stream.groupBy(
    (key, value) -> value,
    Grouped.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.String())  /* value */
  );

// Group the table by a new key and key type, and also modify the value and value type.
KGroupedTable<String, Integer> groupedTable = table.groupBy(
    (key, value) -> KeyValue.pair(value, value.length()),
    Grouped.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.Integer()) /* value (note: type was modified) */
  );

共组

  • KGroupedStream → CogroupedKStream
  • CogroupedKStream → CogroupedKStream

Cogrouping 允许在单个操作中聚合多个 input 流。 不同的(已分组的)输入流必须具有相同的键类型,并且可能具有不同的值类型。KGroupedStream#cogroup() 使用单个输入流创建一个新的共分组流,而 CogroupedKStream#cogroup() 将一个分组流添加到现有的共分组流中。 A 可以在聚合之前进行窗口化CogroupedKStream

cogroup 不会导致重新分区,因为它具有对 Importing 流进行分组的先决条件。在创建这些组的过程中,如果流已经标记为重新分区,则它们将已经被重新分区。

KStream<byte[], String> stream = ...;
                        KStream<byte[], String> stream2 = ...;

// Group by the existing key, using the application's configured
// default serdes for keys and values.
KGroupedStream<byte[], String> groupedStream = stream.groupByKey();
KGroupedStream<byte[], String> groupedStream2 = stream2.groupByKey();
CogroupedKStream<byte[], String> cogroupedStream = groupedStream.cogroup(aggregator1).cogroup(groupedStream2, aggregator2);

KTable<byte[], String> table = cogroupedStream.aggregate(initializer);

KTable<byte[], String> table2 = cogroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(500))).aggregate(initializer);

地图

  • KStream → KStream

获取 1 条记录并生成 1 条记录。您可以修改记录键和值,包括它们的类型。 (详情)

标记流以进行数据重新分区:之后应用分组或联接将导致记录重新分区。 如果可能,请改用,这不会导致数据重新分区。mapmapValues

KStream<byte[], String> stream = ...;

// Java 8+ example, using lambda expressions
// Note how we change the key and the key type (similar to `selectKey`)
// as well as the value and the value type.
KStream<String, Integer> transformed = stream.map(
    (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));

地图(仅限值)

  • KStream → KStream
  • KTable → KTable

获取一条记录并生成一条记录,同时保留原始记录的键。 您可以修改记录值和值类型。 (KStream 详细信息KTable 详细信息)

mapValues更可取,因为它不会导致数据重新分区。但是,它没有 允许您像修改 Key 或 Key Type 一样修改 Key 或 Key Type。mapmap

KStream<byte[], String> stream = ...;

// Java 8+ example, using lambda expressions
KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase());

合并

  • KStream → KStream

将两个流的记录合并为一个更大的流。 (详情)

记录之间没有排序保证 来自合并流中的不同流。不过,每个 input 流中都保留了相对顺序(即,同一 input 流中的记录是按顺序处理的)

KStream<byte[], String> stream1 = ...;

KStream<byte[], String> stream2 = ...;

KStream<byte[], String> merged = stream1.merge(stream2);

偷看

  • KStream → KStream

对每条记录执行无状态操作,并返回未更改的流。 (详情)

您将用于根据输入数据产生副作用(类似于 ),并继续处理输入数据(与 不同,后者是终端操作)。 返回输入 按原样流式传输;如果需要修改 Input 流,请改用 OR。peekforeachforeachpeekmapmapValues

peek对于日志记录或跟踪指标等使用案例或调试和故障排除非常有用。

关于加工保证的说明:操作的任何副作用(例如写入外部系统)都不是 可由 Kafka 跟踪,这意味着他们通常不会从 Kafka 的处理保证中受益。

KStream<byte[], String> stream = ...;

// Java 8+ example, using lambda expressions
KStream<byte[], String> unmodifiedStream = stream.peek(
    (key, value) -> System.out.println("key=" + key + ", value=" + value));

打印

  • 虚空→

终端操作。将记录打印到 。请参阅 Javadocs 以获取 serde 和注意事项。 (详情System.outtoString())

跟注与跟注相同print()foreach((key, value) -> System.out.println(key + ", " + value))

print主要用于调试/测试目的,它将尝试在每个记录打印时刷新。因此,如果涉及性能要求,则不应用于生产用途。

KStream<byte[], String> stream = ...;
// print to sysout
stream.print();

// print to file with a custom label
stream.print(Printed.toFile("streams.out").withLabel("streams"));

选择键

  • KStream → KStream

为每条记录分配一个新键(可能为新的键类型)。 (详情)

调用与调用 相同。selectKey(mapper)map((key, value) -> mapper(key, value), value)

标记流以进行数据重新分区:之后应用分组或联接将导致记录重新分区。selectKey

KStream<byte[], String> stream = ...;

// Derive a new record key from the record's value.  Note how the key type changes, too.
// Java 8+ example, using lambda expressions
KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0])

表到流

  • KTable → KStream

获取此表的 changelog 流。 (详情)

KTable<byte[], String> table = ...;

// Also, a variant of `toStream` exists that allows you
// to select a new key for the resulting stream.
KStream<byte[], String> stream = table.toStream();

流到表

  • KStream → KTable

将事件流转换为表,或者说是 changelog 流。 (详情)

KStream<byte[], String> stream = ...;

KTable<byte[], String> table = stream.toTable();

重新分区

  • KStream → KStream

手动触发具有所需分区数的流的重新分区。(详情)

repartition()与此类似,Kafka Streams 将为您管理主题。 生成的主题被视为内部主题,因此数据将像任何其他内部重新分区主题一样自动清除。 此外,您可以指定所需的分区数量,从而轻松扩展/缩减下游子拓扑。 操作总是触发流的重新分区,因此它可以与嵌入式处理器 API 方法(如 et al.)一起使用,这些方法在事先执行密钥更改操作时不会触发自动重新分区。through()repartition()transform()
KStream<byte[], String> stream = ... ;
KStream<byte[], String> repartitionedStream = stream.repartition(Repartitioned.numberOfPartitions(10));

有状态转换

有状态转换依赖于状态来处理输入和生成输出,并且需要与流处理器关联的状态存储。例如,在聚合操作中,窗口状态存储用于收集 窗。在 join 操作中,窗口状态存储用于收集到目前为止在 定义的窗口边界。

注意:无论可能指定的类型如何(通过 parameter ),都会使用以下存储类型 :materialized

请注意,状态存储是容错的。 如果发生故障,Kafka Streams 保证在恢复处理之前完全恢复所有状态存储。 有关详细信息,请参阅 Fault Tolerance

DSL 中可用的有状态转换包括:

下图显示了它们的关系:

DSL 中的状态转换。

下面是一个有状态应用程序的示例:WordCount 算法。

Java 8+ 中的 WordCount 示例,使用 lambda 表达式:

// Assume the record values represent lines of text.  For the sake of this example, you can ignore
// whatever may be stored in the record keys.
KStream<String, String> textLines = ...;

KStream<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.  The text lines are the record
    // values, i.e. you can ignore whatever data is in the record keys and thus invoke
    // `flatMapValues` instead of the more generic `flatMap`.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Group the stream by word to ensure the key of the record is the word.
    .groupBy((key, word) -> word)
    // Count the occurrences of each word (record key).
    //
    // This will change the stream type from `KGroupedStream<String, String>` to
    // `KTable<String, Long>` (word -> count).
    .count()
    // Convert the `KTable<String, Long>` into a `KStream<String, Long>`.
    .toStream();

聚合

在通过键 或 - 将记录分组并因此表示为 a 或 a 后,可以聚合它们 通过诸如 .聚合是基于键的操作,这意味着它们始终对记录进行操作 (特别是 record values)的 intent 值。 您可以对窗口化或非窗口化数据执行聚合。groupByKeygroupByKGroupedStreamKGroupedTablereduce

转型 描述

骨料

  • KGroupedStream → KTable
  • KGroupedTable → KTable

滚动聚合。按分组键或协同分组聚合(非窗口化)记录的值。 聚合是 Aggregating 的泛化,它允许,例如,聚合值具有不同的 type 而不是 input 值。 (KGroupedStream 详细信息KGroupedTable 详细信息 KGroupedTable 详细信息reduce)

聚合分组流时,必须提供初始值设定项(例如 )和“加法器” 聚合器(例如 )。聚合分组表时,您必须额外提供 “subtractor” 聚合器(想想:)。aggValue = 0aggValue + curValueaggValue - oldValue

在聚合共分组流时,在之前的调用中为每个 Importing 流提供了实际的聚合器,因此你只需要提供一个初始化器(例如,cogroup()aggValue = 0)

存在多种变体,有关详细信息,请参阅 Javadocs。aggregate

KGroupedStream<byte[], String> groupedStream = ...;
KGroupedTable<byte[], String> groupedTable = ...;

// Java 8+ examples, using lambda expressions

// Aggregating a KGroupedStream (note how the value type changes from String to Long)
KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
    () -> 0L, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long()); /* serde for aggregate value */

// Aggregating a KGroupedTable (note how the value type changes from String to Long)
KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
    () -> 0L, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
    (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), /* subtractor */
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("aggregated-table-store") /* state store name */
	.withValueSerde(Serdes.Long()) /* serde for aggregate value */

的详细行为 :KGroupedStream

  • 带有键的输入记录将被忽略。null
  • 当第一次收到 record key 时,初始化器被调用(并在 adder 之前调用)。
  • 每当收到具有非值的记录时,都会调用 adder。null

的详细行为 :KGroupedTable

  • 带有键的输入记录将被忽略。null
  • 当第一次收到记录键时,将调用初始化器(并在加法器之前调用 和减法器)。请注意,与 相反,随着时间的推移,初始化程序可能会被调用 由于收到了该键的 input tombstone 记录,因此多次收到该键的 Tombstone 记录(见下文)。KGroupedStream
  • 当收到 key 的第一个 non- 值(例如 INSERT)时,仅调用 adder。null
  • 当收到 key 的后续非值(例如 UPDATE)时,则 (1) 减法器为 使用表中存储的旧值调用 Adder,并且 (2) Adder 使用 input 记录。如果提取的 old 和 new 值的分组键相同,则保证在 adder 之前调用 subtracter。 这种情况的检测取决于提取的键类型的 equals() 方法的正确实现。否则,为减法器的执行顺序 并且 adder 未定义。null
  • 当收到 key 的 tombstone 记录(即具有值的记录)时(例如 DELETE), 则仅调用 subtracter。请注意,每当减法器本身返回值时, 然后从生成的 .如果发生这种情况,则任何下一个输入 record 将再次触发初始化器。nullnullKTable

有关聚合语义的可视化效果,请参阅本节底部的示例。

聚合 (窗口化)

  • KGroupedStream → KTable

窗口聚合。按分组键聚合每个窗口的记录值。 聚合是 Aggregating 的泛化,它允许,例如,聚合值具有不同的 type 而不是 input 值。 (TimeWindowedKStream 详细信息SessionWindowedKStream 详细信息reduce)

您必须提供初始化器(例如)、“adder”聚合器(例如)、 和一个窗口。当基于会话进行窗口化时,您必须额外提供 “session merger” 聚合器 (例如, )。aggValue = 0aggValue + curValuemergedAggValue = leftAggValue + rightAggValue

窗口化将 a 或 变为窗口化 .aggregateTimeWindowedKStream<K, V>SessionWindowedKStream<K, V>KTable<Windowed<K>, V>

存在多种变体,有关详细信息,请参阅 Javadocs。aggregate

import java.time.Duration;
KGroupedStream<String, Long> groupedStream = ...;

// Java 8+ examples, using lambda expressions

// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(Duration.ofMinutes(5))
    .aggregate(
        () -> 0L, /* initializer */
        (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
        Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

// Aggregating with time-based windowing (here: with 5-minute sliding windows and 30-minute grace period)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(30)))
    .aggregate(
        () -> 0L, /* initializer */
        (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
        Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)).
    aggregate(
    	() -> 0L, /* initializer */
    	(aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
        (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, /* session merger */
        Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store") /* state store name */
        .withValueSerde(Serdes.Long())); /* serde for aggregate value */

详细行为:

  • 窗口化聚合的行为类似于上述滚动聚合。另一个转折点是 该行为适用于每个窗口
  • 通常忽略带有键的输入记录。null
  • 当首次收到给定窗口的记录键时,将调用初始值设定项(并调用 在加法器之前)。
  • 每当为给定窗口接收到具有非值的记录时,都会调用 adder。null
  • 使用会话窗口时:每当合并两个会话时,都会调用会话合并。

有关聚合语义的可视化效果,请参阅本节底部的示例。

计数

  • KGroupedStream → KTable
  • KGroupedTable → KTable

滚动聚合。按分组的键计算记录数。 (KGroupedStream 详细信息KGroupedTable 详细信息)

存在多种变体,有关详细信息,请参阅 Javadocs。count

KGroupedStream<String, Long> groupedStream = ...;
KGroupedTable<String, Long> groupedTable = ...;

// Counting a KGroupedStream
KTable<String, Long> aggregatedStream = groupedStream.count();

// Counting a KGroupedTable
KTable<String, Long> aggregatedTable = groupedTable.count();

的详细行为 :KGroupedStream

  • 将忽略带有键或值的输入记录。null

的详细行为 :KGroupedTable

  • 带有键的输入记录将被忽略。具有值的记录不会被忽略,而是被解释 作为相应键的 “Tombstones”,表示从表中删除该键。nullnull

计数(窗口)

  • KGroupedStream → KTable

窗口聚合。按分组键计算每个窗口的记录数。 (TimeWindowedKStream 详细信息SessionWindowedKStream 详细信息)

窗口化将 a 或 变为窗口化 .countTimeWindowedKStream<K, V>SessionWindowedKStream<K, V>KTable<Windowed<K>, V>

存在多种变体,有关详细信息,请参阅 Javadocs。count

import java.time.Duration;
KGroupedStream<String, Long> groupedStream = ...;

// Counting a KGroupedStream with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> aggregatedStream = groupedStream.windowedBy(
    TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))) /* time-based window */
    .count();

// Counting a KGroupedStream with time-based windowing (here: with 5-minute sliding windows and 30-minute grace period)
KTable<Windowed<String>, Long> aggregatedStream = groupedStream.windowedBy(
    SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(30))) /* time-based window */
    .count();

// Counting a KGroupedStream with session-based windowing (here: with 5-minute inactivity gaps)
KTable<Windowed<String>, Long> aggregatedStream = groupedStream.windowedBy(
    SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5))) /* session window */
    .count();

详细行为:

  • 将忽略带有键或值的输入记录。null

减少

  • KGroupedStream → KTable
  • KGroupedTable → KTable

滚动聚合。按分组键组合 (非窗口化) 记录的值。 当前记录值与上一个减少的值合并,并返回新的减少值。 结果值类型无法更改,这与 . (KGroupedStream 详细信息KGroupedTable 详细信息aggregate)

在减少分组的流时,您必须提供 “adder” reducer (例如 )。 在减少分组表时,您必须额外提供一个 “subtractor” reducer (例如 )。aggValue + curValueaggValue - oldValue

存在多种变体,有关详细信息,请参阅 Javadocs。reduce

KGroupedStream<String, Long> groupedStream = ...;
KGroupedTable<String, Long> groupedTable = ...;

// Java 8+ examples, using lambda expressions

// Reducing a KGroupedStream
KTable<String, Long> aggregatedStream = groupedStream.reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */);

// Reducing a KGroupedTable
KTable<String, Long> aggregatedTable = groupedTable.reduce(
    (aggValue, newValue) -> aggValue + newValue, /* adder */
    (aggValue, oldValue) -> aggValue - oldValue /* subtractor */);

的详细行为 :KGroupedStream

  • 通常忽略带有键的输入记录。null
  • 首次收到记录键时,该记录的值将用作初始 总值。
  • 每当收到具有非值的记录时,都会调用 adder。null

的详细行为 :KGroupedTable

  • 通常忽略带有键的输入记录。null
  • 首次收到记录键时,该记录的值将用作初始 总值。 请注意,与 相比,随着时间的推移,此初始化步骤可能会发生多次 对于某个键(由于已收到该键的 Input Tombstone 记录)(请参阅下文)。KGroupedStream
  • 当收到 key 的第一个 non- 值(例如 INSERT)时,仅调用 adder。null
  • 当收到 key 的后续非值(例如 UPDATE)时,则 (1) 减法器为 使用表中存储的旧值调用 Adder,并且 (2) Adder 使用 input 记录。如果提取的旧值和新值的分组键相同,则保证在加法器之前调用减法器。 这种情况的检测取决于提取的键类型的 equals() 方法的正确实现。否则,为减法器的执行顺序 并且 adder 未定义。null
  • 当收到 key 的 tombstone 记录(即具有值的记录)时(例如 DELETE), 则仅调用 subtracter。请注意,每当减法器本身返回值时, 然后从生成的 .如果发生这种情况,则任何下一个输入 record 将重新初始化其聚合值。nullnullKTable

有关聚合语义的可视化效果,请参阅本节底部的示例。

减少(窗口化)

  • KGroupedStream → KTable

窗口聚合。按分组键组合每个窗口的记录值。 当前记录值与上一个减少的值合并,并返回新的减少值。 具有 key 或 value 的记录将被忽略。 结果值类型无法更改,这与 . (TimeWindowedKStream 详细信息SessionWindowedKStream 详细信息nullaggregate)

窗口化 将 a 或 a 变为窗口化 。reduceTimeWindowedKStream<K, V>SessionWindowedKStream<K, V>KTable<Windowed<K>, V>

存在多种变体,有关详细信息,请参阅 Javadocs。reduce

import java.time.Duration;
KGroupedStream<String, Long> groupedStream = ...;

// Java 8+ examples, using lambda expressions

// Aggregating with time-based windowing (here: with 5-minute tumbling windows)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(
  TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)) /* time-based window */)
  .reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */
  );

// Aggregating with time-based windowing (here: with 5-minute sliding windows and 30-minute grace)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(
  SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(30))) /* time-based window */)
  .reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */
  );

// Aggregating with session-based windowing (here: with an inactivity gap of 5 minutes)
KTable<Windowed<String>, Long> sessionzedAggregatedStream = groupedStream.windowedBy(
  SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5))) /* session window */
  .reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */
  );

详细行为:

  • 窗口化 reduce 的行为类似于上述滚动 reduce。另一个变化是 行为按窗口应用。
  • 通常忽略带有键的输入记录。null
  • 当首次收到给定窗口的记录键时,该记录的值将用作 初始聚合值。
  • 每当为给定窗口接收到具有非值的记录时,都会调用 adder。null

有关聚合语义的可视化效果,请参阅本节底部的示例。

流聚合的语义示例:下面显示了一个→示例。流和表最初是空的。大胆 字体在 “KTable ” 列中用于突出显示更改的状态。诸如 demod 的条目表示 具有 key 和 value 的 record 。为了提高语义表的可读性,您可以假设所有记录 按时间戳顺序处理。KGroupedStreamKTableaggregated(hello, 1)hello1

// Key: word, value: count
KStream<String, Integer> wordCounts = ...;

KGroupedStream<String, Integer> groupedStream = wordCounts
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()));

KTable<String, Integer> aggregated = groupedStream.aggregate(
    () -> 0, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>as("aggregated-stream-store" /* state store name */)
      .withKeySerde(Serdes.String()) /* key serde */
      .withValueSerde(Serdes.Integer()); /* serde for aggregate value */

注意

记录缓存的影响: 为了便于说明,下面的 “KTable ” 列显示了表的状态随时间的变化。 非常精细的方式。实际上,只有在禁用记录缓存时,您才会以如此精细的方式观察状态更改(默认值:enabled)。 启用记录缓存后,例如,可能发生的情况是带有时间戳的行的输出结果 4 和 5 将被压缩,并且只会有 KTable 中密钥的单个状态更新(此处:从直接到 . 通常,您应该仅出于测试或调试目的禁用记录缓存 - 在正常情况下 最好保持启用记录缓存。aggregatedkafka(kafka 1)(kafka, 3)

  KStreamwordCounts KGroupedStreamgroupedStream KTableaggregated
时间戳 输入记录 分组 初始 化 蝰蛇
1 (你好,1) (你好,1) 0 (表示 hello) (你好,0 + 1)
(你好,1)
2 (kafka,1) (kafka,1) 0 (对于 Kafka) (kafka,0 + 1)
(你好,1)
(kafka,1)
3 (流,1) (流,1) 0(对于流) (流,0 + 1)
(你好,1)
(kafka,1)
(流,1)
4 (kafka,1) (kafka,1)   (kafka,1 + 1)
(你好,1)
(kafka,2)
(流,1)
5 (kafka,1) (kafka,1)   (kafka,2 + 1)
(你好,1)
(kafka,3)
(流,1)
6 (流,1) (流,1)   (流,1 + 1)
(你好,1)
“(kafka,3)
(流,2)

表聚合的语义示例:下面显示了一个→示例。这些表最初是空的。列中使用粗体字体 用于 “KTable ” 以突出显示更改的状态。诸如 such 的条目表示具有 key 和 value 的记录。为了提高语义表的可读性,您可以假定所有记录都已处理 按时间戳顺序。KGroupedTableKTableaggregated(hello, 1)hello1

// Key: username, value: user region (abbreviated to "E" for "Europe", "A" for "Asia")
KTable<String, String> userProfiles = ...;

// Re-group `userProfiles`.  Don't read too much into what the grouping does:
// its prime purpose in this example is to show the *effects* of the grouping
// in the subsequent aggregation.
KGroupedTable<String, Integer> groupedTable = userProfiles
    .groupBy((user, region) -> KeyValue.pair(region, user.length()), Serdes.String(), Serdes.Integer());

KTable<String, Integer> aggregated = groupedTable.aggregate(
    () -> 0, /* initializer */
    (aggKey, newValue, aggValue) -> aggValue + newValue, /* adder */
    (aggKey, oldValue, aggValue) -> aggValue - oldValue, /* subtractor */
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>as("aggregated-table-store" /* state store name */)
      .withKeySerde(Serdes.String()) /* key serde */
      .withValueSerde(Serdes.Integer()); /* serde for aggregate value */

注意

记录缓存的影响: 为了便于说明,下面的 “KTable ” 列显示了表的状态随时间的变化。 非常精细的方式。实际上,只有在禁用记录缓存时,您才会以如此精细的方式观察状态更改(默认值:enabled)。 启用记录缓存后,例如,可能发生的情况是带有时间戳的行的输出结果 4 和 5 将被压缩,并且只会有 KTable 中密钥的单个状态更新(此处:从直接到 . 通常,您应该仅出于测试或调试目的禁用记录缓存 - 在正常情况下 最好保持启用记录缓存。aggregatedkafka(kafka 1)(kafka, 3)

  KTableuserProfiles KGroupedTablegroupedTable KTableaggregated
时间戳 输入记录 解释为 分组 初始 化 蝰蛇 Subtractor (减法器)
1 (爱丽丝,E) 插入 alice (E,5) 0(对于 E) (E,0 + 5)  
(E,5)
2 (鲍勃,A) INSERT 鲍勃 (一,3) 0 (对于 A) (A,0 + 3)  
(一,3)
(E,5)
3 (查理,A) 插入查理 (一,7)   (A,3 + 7)  
(A, 10)
(E,5)
4 (爱丽丝,A) 更新 alice (一,5)   (A,10 + 5) (E,5 - 5)
A,15 岁))
(E, 0)
5 (查理,空) 删除 charlie (null, 7)     (甲,15 - 7)
(一,8)
(E, 0)
6 (null, E) 忽视        
(一,8)
(E, 0)
7 (鲍勃,E) 更新 bob (E,3)   (E, 0 + 3) (A,8 - 3)
(A, 5)
(E, 3)

加入

还可以联接流和表。实际上,许多流处理应用程序都编码为流式联接。 例如,支持在线商店的应用程序可能需要访问多个更新的数据库表(例如 sales prices, inventory, customer information) 来丰富新的数据记录(例如,Customer Transaction) 信息。也就是说,您需要以非常大的规模和较低的处理量执行表查找 延迟。在这里,一种流行的模式是通过所谓的变更数据捕获Kafka 的 Connect API 相结合,使数据库中的信息在 Kafka 中可用,然后实现 利用 Streams API 执行非常快速和高效的本地联接的应用程序 的 API 和 Streams 中,而不是要求应用程序通过网络对远程数据库进行查询 对于每个记录。在此示例中,Kafka Streams 中的 KTable 概念将使您能够跟踪最新状态 (例如,快照)在本地状态存储中每个表,从而大大降低了处理延迟以及 在执行此类流式联接时减少远程数据库的负载。

支持以下联接操作,另请参阅 Stateful Transformations 的概述部分中的图表。 根据操作数的不同,联接可以是窗口联接或 非窗口联接。

联接操作数 类型 (内部)加入 左联接 外部联接
KStream 到 KStream 窗口 支持 支持 支持
KTable 到 KTable 非窗口化 支持 支持 支持
KTable-to-KTable 外键连接 非窗口化 支持 支持 不支持
KStream 到 KTable 非窗口化 支持 支持 不支持
KStream 到全球 KTable 非窗口化 支持 支持 不支持
KTable 到全球KTable 不适用 不支持 不支持 不支持

后续部分将更详细地解释每种情况。

联接共分区要求

对于等值连接,连接时必须对输入数据进行共分区。这可确保 input 记录的两侧具有相同键的 join 的 API 任务在处理过程中被传送到同一个 Stream 任务。加入时,您有责任确保数据协同分区。 在执行 KTable-KTable 外键连接全局 KTable 连接时,不需要共同分区。

数据协同分区的要求是:

  • join 的 input 主题(左侧和右侧)必须具有相同的分区数
  • 写入输入主题的所有应用程序必须具有相同的分区策略,以便 相同的 key 被传送到相同的分区号。换句话说,输入数据的键空间必须为 以相同的方式分布在各个分区中。 这意味着,例如,使用 Kafka 的 Java Producer API 的应用程序必须使用 相同的分区程序(参见 producer 设置 aka ), 使用 Kafka 的 Streams API 的应用程序必须对 .好消息是,如果您碰巧在所有 应用程序,您无需担心分区策略。"partitioner.class"ProducerConfig.PARTITIONER_CLASS_CONFIGStreamPartitionerKStream#to()

为什么需要数据协同分区?由于 KStream-KStreamKTable-KTableKStream-KTable 联接 根据记录的键(例如,)执行,则要求 Join 的 Input Streams/Table 按 Key 进行共分区。leftRecord.key == rightRecord.key

有两个例外不需要共分区。对于 KStream-GlobalKTable 联接,共分区为 不是必需的,因为 的基础 changelog 流的所有分区都可供 每个实例。也就是说,每个实例都有 changelog 流的完整副本。此外,a 允许从 到 的非基于键的连接。KTable-KTable 外键联接也不需要共同分区。Kafka Streams 在内部确保外键联接的共分区。GlobalKTableKafkaStreamsKeyValueMapperKStreamGlobalKTable

注意

Kafka Streams 部分验证了共分区要求:在分区分配步骤中,即在运行时,Kafka Streams 会验证 联接的两端相同。如果不是,则 (运行时异常) 正在 扔。请注意,Kafka Streams 无法验证分区策略是否在输入之间匹配 streams/tables – 由用户来确保情况如此。TopologyBuilderException

确保数据协同分区:如果 join 的 inputs 尚未共分区,则必须手动确保这一点。 您可以遵循如下所述的程序。 建议使用较少的分区对主题进行重新分区,以匹配较大的分区数以避免瓶颈。 从技术上讲,也可以将具有更多分区的主题重新分区为较小的分区编号。 对于流表联接,建议对 KStream 重新分区,因为对 KTable 重新分区可能会导致第二个状态存储。 对于表-表联接,您可能还会考虑 KTables 的大小,并对较小的 KTable 进行重新分区。

  1. 确定其底层 Kafka 主题的分区数较少的联接中的输入 KStream/KTable。 我们将此流/表称为 “SMALLER”,并将联接的另一侧称为 “LARGER”。要了解 您可以使用 Kafka 主题的分区,例如,带有 option 的 CLI 工具。bin/kafka-topics--describe

  2. 在您的应用程序中,对 “SMALLER” 的数据进行重新分区。您必须确保在重新分区 带有 的数据与使用 “LARGER” 相同的分区程序。repartition

    • 如果 “SMALLER” 是 KStream: .KStream#repartition(Repartitioned.numberOfPartitions(...))
    • 如果 “SMALLER” 是 KTable: .KTable#toStream#repartition(Repartitioned.numberOfPartitions(...).toTable())
  3. 在您的应用程序中,执行 “LARGER” 和新 stream/table 之间的联接。

KStream-KStream 加入

KStream-KStream 联接始终是窗口联接,因为否则 用于执行 Join 的内部状态存储(例如,滑动窗口或“缓冲区”)将 无限增长。对于 stream-stream join,重要的是要强调一侧的新 input 记录将 为另一端的每个匹配记录生成一个 join 输出,并且可以有多个这样的匹配记录 在给定的 join 窗口中(例如,参见下面 join semantics 表中时间戳为 15 的行)。

利用用户提供的 :ValueJoiner

KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
转型 描述

内部联接(窗口化)

  • (KStream,KStream) → KStream

将此流与另一个流执行 INNER JOIN。 即使此操作是窗口化的,联接的流也将是 type 而不是 .(详情)KStream<K, ...>KStream<Windowed<K>, ...>

数据必须协同分区:两侧的输入数据必须协同分区

当且仅当流被标记为重新分区时,才导致流的数据重新分区(如果两个流都被标记,则两个流都被重新分区)。

exists 的几个变体,有关详细信息,请参阅 Javadocs。join

import java.time.Duration;
KStream<String, Long> left = ...;
KStream<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.join(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    Joined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

详细行为:

  • 连接是基于的,即使用 join predicate ,并且是基于窗口的,即当且仅当它们的 时间戳由 User-Supplied 定义,彼此“接近”,即窗口在记录时间戳上定义一个额外的连接谓词。leftRecord.key == rightRecord.keyJoinWindows

  • 每当收到新输入时,都会在下面列出的条件下触发联接。当它被触发时,将调用用户提供的 连接输出记录。ValueJoiner

    • 具有键或值的输入记录将被忽略,并且不会触发联接。nullnull

有关详细说明,请参阅本节底部的语义概述。

左连接(窗口化)

  • (KStream,KStream) → KStream

将此流与另一个流执行 LEFT JOIN。 即使此操作是窗口化的,联接的流也将是 type 而不是 .(详情)KStream<K, ...>KStream<Windowed<K>, ...>

数据必须协同分区:两侧的输入数据必须协同分区

当且仅当流被标记为重新分区时,才导致流的数据重新分区(如果两个流都被标记,则两个流都被重新分区)。

exists 的几个变体,有关详细信息,请参阅 Javadocs。leftJoin

import java.time.Duration;
KStream<String, Long> left = ...;
KStream<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.leftJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    Joined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

详细行为:

  • 连接是基于的,即使用 join predicate ,并且是基于窗口的,即当且仅当它们的 时间戳由 User-Supplied 定义,彼此“接近”,即窗口在记录时间戳上定义一个额外的连接谓词。leftRecord.key == rightRecord.keyJoinWindows

  • 每当收到新输入时,都会在下面列出的条件下触发联接。当它被触发时,将调用用户提供的 连接输出记录。ValueJoiner

    • 具有值的输入记录将被忽略,并且不会触发联接。null
  • 对于左侧上没有任何匹配项的每条输入记录,将使用 ; 这解释了下表中 timestamp=60 和 timestampe=80 的行,其中列出了 和 LEFT JOIN 列。 请注意,这些剩余结果将在指定的宽限期过后发出。注意:使用已弃用的 API 可能会导致急切地发出虚假的 left 结果。ValueJoinerValueJoiner#apply(leftRecord.value, null)[E, null][F, null]JoinWindows.of(...).grace(...)

有关详细说明,请参阅本节底部的语义概述。

外部联接(窗口式)

  • (KStream,KStream) → KStream

将此流与另一个流执行 OUTER JOIN。 即使此操作是窗口化的,联接的流也将是 type 而不是 .(详情)KStream<K, ...>KStream<Windowed<K>, ...>

数据必须协同分区:两侧的输入数据必须协同分区

当且仅当流被标记为重新分区时,才导致流的数据重新分区(如果两个流都被标记,则两个流都被重新分区)。

exists 的几个变体,有关详细信息,请参阅 Javadocs。outerJoin

import java.time.Duration;
KStream<String, Long> left = ...;
KStream<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.outerJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
    Joined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

详细行为:

  • 连接是基于的,即使用 join predicate ,并且是基于窗口的,即当且仅当它们的 时间戳由 User-Supplied 定义,彼此“接近”,即窗口在记录时间戳上定义一个额外的连接谓词。leftRecord.key == rightRecord.keyJoinWindows

  • 每当收到新输入时,都会在下面列出的条件下触发联接。当它被触发时,将调用用户提供的 连接输出记录。ValueJoiner

    • 具有值的输入记录将被忽略,并且不会触发联接。null
  • 对于一侧没有任何匹配项的一侧的每个 input 记录,将分别使用 或 调用 ;下表中 timestamp=60、timestamp=80 和 timestamp=100 的行对此进行了说明,其中列出了 、 和 OUTER JOIN 列。 请注意,这些 left 和 right 结果将在指定的宽限期过后发出。注意:使用已弃用的 API 可能会导致急切地发出虚假的左/右结果。ValueJoinerValueJoiner#apply(leftRecord.value, null)ValueJoiner#apply(null, rightRecord.value)[E, null][F, null][null, f]JoinWindows.of(...).grace(...)

有关详细说明,请参阅本节底部的语义概述。

stream-stream 连接的语义:下面介绍了各种 stream-stream join 变体的语义。 为了提高表的可读性,假设 (1) 所有记录都具有相同的键(因此省略了表中的键),并且 (2) 所有记录都按时间戳顺序处理。 我们假设加入窗口大小为 10 秒,宽限期为 5 秒。

注意:如果您使用旧的和现已弃用的 API 来指定宽限期,即 左/外联接结果会急切地发出,观察到的结果可能与下面显示的结果不同。JoinWindows.of(...).grace(...)

列 INNER JOIN、LEFT JOIN 和 OUTER JOIN 表示每当在联接的任一端收到新的输入记录时,分别作为参数传递给用户提供的 ValueJoiner for the , 和 methods。空表 cell 表示 根本没有调用 。joinleftJoinouterJoinValueJoiner

时间戳 左 (KStream) 右 (KStream) (内部)加入 左联接 外部联接
1        
2        
3 一个    
4   一个 [一、一] [一、一] [一、一]
5 B   [二、一] [二、一] [二、一]
6   b [A, b], [B, b] [A, b], [B, b] [A, b], [B, b]
7        
8        
9 C   [C, a], [C, b] [C, a], [C, b] [C, a], [C, b]
10   c [A, c], [B, c], [C, c] [A, c], [B, c], [C, c] [A, c], [B, c], [C, c]
11        
12        
13        
14   d [A, d], [B, d], [C, d] [A, d], [B, d], [C, d] [A, d], [B, d], [C, d]
15 D   [D, a], [D, b], [D, c], [D, d] [D, a], [D, b], [D, c], [D, d] [D, a], [D, b], [D, c], [D, d]
...
40 E
...
60 F [E, 空] [E, 空]
...
80 f [F, 空] [F, 空]
...
100 G [null, f]
KTable-KTable 等连接

KTable-KTable 等值联接始终是非窗口联接。它们的设计与 关系数据库。两个 KTable 的 changelog 流都被具体化到本地状态存储中,以表示 他们 Table Doubles 的最新快照。 join 结果是一个新的 KTable,它表示 join 操作的 changelog 流。

利用用户提供的 :ValueJoiner

KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
转型 描述

内部联接

  • (KTable、KTable) → KTable

将此表与另一个表执行 INNER JOIN。 结果是一个不断更新的 KTable,它表示联接的 “当前” 结果。(详情)

数据必须协同分区:两侧的输入数据必须协同分区

KTable<String, Long> left = ...;
KTable<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.join(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

详细行为:

  • 连接是基于的,即使用 join 谓词 。leftRecord.key == rightRecord.key

  • 每当收到新输入时,都会在下面列出的条件下触发联接。当它被触发时,将调用用户提供的 连接输出记录。ValueJoiner

    • 带有键的输入记录将被忽略,并且不会触发联接。null
    • 具有值的输入记录将解释为相应键的逻辑删除,这表示从表中删除了键。墓碑不会 触发联接。当收到输入逻辑删除时,如果需要,输出逻辑删除将直接转发到连接结果 KTable(即,仅当相应的 key 实际上已经存在于连接结果 KTable 中)。null
    • 连接版本化表时, 无序输入记录,即同一表中的另一条记录具有相同键和更大时间戳的记录, 已被处理,将被忽略并且不会触发联接。

有关详细说明,请参阅本节底部的语义概述。

左联接

  • (KTable、KTable) → KTable

将此表与另一个表执行 LEFT JOIN。(详情)

数据必须协同分区:两侧的输入数据必须协同分区

KTable<String, Long> left = ...;
KTable<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.leftJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

详细行为:

  • 连接是基于的,即使用 join 谓词 。leftRecord.key == rightRecord.key

  • 每当收到新输入时,都会在下面列出的条件下触发联接。当它被触发时,将调用用户提供的 连接输出记录。ValueJoiner

    • 带有键的输入记录将被忽略,并且不会触发联接。null
    • 具有值的输入记录将解释为相应键的逻辑删除,这表示从表中删除了键。右侧逻辑删除会触发联接, 但 left-tombstones 则不会:当收到一个输入的 tombstone 时,如果需要,输出 tombstone 会直接转发到连接结果 KTable(即,仅当相应的 key 实际上已经存在于连接结果 KTable 中)。null
    • 连接版本化表时, 无序输入记录,即同一表中的另一条记录具有相同键和更大时间戳的记录, 已被处理,将被忽略并且不会触发联接。
  • 对于左侧上没有任何匹配项的每条输入记录,将使用 ; 这解释了下表中 timestamp=3 的行,该行列在 LEFT JOIN 列中。ValueJoinerValueJoiner#apply(leftRecord.value, null)[A, null]

有关详细说明,请参阅本节底部的语义概述。

外部联接

  • (KTable、KTable) → KTable

将此表与另一个表执行 OUTER JOIN。(详情)

数据必须协同分区:两侧的输入数据必须协同分区

KTable<String, Long> left = ...;
KTable<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.outerJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

详细行为:

  • 连接是基于的,即使用 join 谓词 。leftRecord.key == rightRecord.key

  • 每当收到新输入时,都会在下面列出的条件下触发联接。当它被触发时,将调用用户提供的 连接输出记录。ValueJoiner

    • 带有键的输入记录将被忽略,并且不会触发联接。null
    • 具有值的输入记录将解释为相应键的逻辑删除,这表示从表中删除了键。逻辑删除可能会触发连接, 取决于 Left 和 Right 表中的内容。当收到输入逻辑删除时,如果需要,输出逻辑删除将直接转发到连接结果 KTable(即,仅当相应的 key 实际上已经存在于连接结果 KTable 中)。null
    • 连接版本化表时, 无序输入记录,即同一表中的另一条记录具有相同键和更大时间戳的记录, 已被处理,将被忽略并且不会触发联接。
  • 对于一侧没有任何匹配项的一侧的每个 input 记录,将分别使用 或 调用 ;这解释了下表中 timestamp=3 和 timestamp=7 的行,它们分别在 OUTER JOIN 列中列出 和 。ValueJoinerValueJoiner#apply(leftRecord.value, null)ValueJoiner#apply(null, rightRecord.value)[A, null][null, b]

有关详细说明,请参阅本节底部的语义概述。

表-表等值联接的语义:下面介绍了各种 table-table 等值联接变体的语义。 为了提高表的可读性,您可以假设 (1) 所有记录都具有相同的键(因此省略了表中的键),并且 (2) 所有记录都按时间戳顺序处理。 列 INNER JOIN、LEFT JOIN 和 OUTER JOIN 表示每当在联接的任一端收到新的输入记录时,分别作为参数传递给用户提供的 ValueJoiner for the , 和 methods。空表 cell 表示 根本没有调用 。joinleftJoinouterJoinValueJoiner

时间戳 左 (KTable) 右 (KTable) (内部)加入 左联接 外部联接
1        
2        
3 一个     [A, 空] [A, 空]
4   一个 [一、一] [一、一] [一、一]
5 B   [二、一] [二、一] [二、一]
6   b [二、二] [二、二] [二、二]
7   [null, b]
8      
9 C     [C, 空] [C, 空]
10   c [三、三] [三、三] [三、三]
11   [C, 空] [C, 空]
12    
13        
14   d     [null, d]
15 D   [D, D] [D, D] [D, D]
KTable-KTable 外键 加入

KTable-KTable 外键联接始终是非窗口联接。外键联接类似于 SQL 中的联接。举个粗略的例子:

SELECT ... FROM {this KTable} JOIN {other KTable} ON {other.key} = {result of foreignKeyExtractor(this.value)} ...
该操作的输出是一个包含 join 结果的新 KTable。

的 changelog streams 这两个 KTables 都被具体化到本地 state store 中,以 代表他们的牌桌对偶组的最新快照。外键提取器 函数应用于左侧记录,并使用新的中间 record created 的 ID 中,用于查找和连接相应的 主键位于右侧表上。 结果是一个表示 changelog 流的新 KTable 的 join 操作。

左侧 KTable 可以有多个记录映射到同一个 键。对单个左侧 KTable 条目的更新 可能会导致单个 output 事件,前提是相应的 key 存在于正确的 KTable 中。因此,对 right KTable 条目将导致 left KTable 具有相同的外键。

转型 描述

内部联接

  • (KTable、KTable) → KTable

执行此 table 替换为另一个 table。结果是不断更新 KTable 的 URL 来表示联接的 “当前” 结果。(详情)

KTable<String, Long> left = ...;
                KTable<Long, Double> right = ...;
//This foreignKeyExtractor simply uses the left-value to map to the right-key.
Function<Long, Long> foreignKeyExtractor = (x) -> x;

// Java 8+ example, using lambda expressions
                KTable<String, String> joined = left.join(right, foreignKeyExtractor,
                    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
                  );

详细行为:

  • join 是基于的,即 替换为 Join 谓词:

    foreignKeyExtractor.apply(leftRecord.value) == rightRecord.key
  • join 将在 条件,每当新输入为 收到。触发时,将调用用户提供的 join 输出记录。ValueJoiner

    • 其 produces 的记录将被忽略,并且不会触发联接。 如果要使用外键进行联接,请使用合适的 sentinel 值(即 对于 String 字段或自动递增的 Integer 字段)。foreignKeyExtractornullnull"NULL"-1
    • 具有值的输入记录将解释为相应键的逻辑删除,这表示 从表中删除键。墓碑不会 触发联接。当输入 tombstone 接收到,然后输出 Tombstone 直接转发到 Join result KTable(即仅当 相应的 key 实际上已经存在于 连接结果 KTable)。null
    • 连接版本化表时, 无序输入记录,即同一表中的另一条记录具有相同键和更大时间戳的记录, 已被处理,将被忽略并且不会触发联接。

请参阅底部的语义概述 有关详细说明。

左联接

  • (KTable、KTable) → KTable

执行此的外键 LEFT JOIN table 替换为另一个 table。(详情)

KTable<String, Long> left = ...;
                KTable<Long, Double> right = ...;
//This foreignKeyExtractor simply uses the left-value to map to the right-key.
Function<Long, Long> foreignKeyExtractor = (x) -> x;

// Java 8+ example, using lambda expressions
                KTable<String, String> joined = left.leftJoin(right, foreignKeyExtractor,
                    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
                  );

详细行为:

  • join 是基于的,即 替换为 Join 谓词:

    foreignKeyExtractor.apply(leftRecord.value) == rightRecord.key
  • join 将在 条件,每当新输入为 收到。触发时,将调用用户提供的 join 输出记录。ValueJoiner

    • 具有值的输入记录将解释为相应键的逻辑删除,这表示 从表中删除键。右逻辑删除 触发联接,但 left-tombstones 不会: 当收到输入逻辑删除时,输出 Tombstone 直接转发到 Join result KTable(即仅当 相应的 key 实际上已经存在于 连接结果 KTable)。null
    • 连接版本化表时, 无序输入记录,即同一表中的另一条记录具有相同键和更大时间戳的记录, 已被处理,将被忽略并且不会触发联接。
  • 对于左侧的每个输入记录 右侧没有任何匹配项的一侧, 将 用 ;这 解释了 timestamp=7 & 8 的行 表,其中列在 LEFT JOIN 列中。ValueJoinerValueJoiner#apply(leftRecord.value, null)(q,10,null) and (r,10,null)

请参阅底部的语义概述 有关详细说明。

表-表外键联接的语义:table-table 外键 INNER 和 LEFT JOIN 的语义 变体如下所示。 该键显示在每条记录的值旁边。 记录按递增的偏移顺序处理。 INNER JOIN 和 LEFT JOIN 列表示 每当收到新的输入记录时,分别作为参数传递给用户提供的 和 方法 ValueJoiner 在联接的两侧。空的表格单元格表示根本没有调用 the。对于此示例,只需使用 left-value 作为输出。joinleftJoinValueJoinerFunction foreignKeyExtractor

记录偏移量 左 KTable (K, extracted-FK) 右 KTable (FK, VR) (内部)加入 左联接
1 (k,1) (1,foo) (k、1、foo)
(k、1、foo)
2 (k,2)
(k,null) (k,2,null)
3 (k,3)
  (k,null) (k,3,null)
4   (3,巴)
(k,3,bar)
(k,3,bar)
5 (k,null)
  (k,null)
(k,null,null)
6 (k,1)
(k、1、foo)
(k、1、foo)
7 (问,10)
 
(q,10,null)
8 (R,10)
  (r,10,null)
9
(10,巴兹) (Q,10,BAZ)、(R,10,BAZ) (Q,10,BAZ)、(R,10,BAZ)
KStream-KTable 联接

KStream-KTable 联接始终是非窗口联接。它们允许您对 KTable 执行表查找 (changelog stream) 收到来自 KStream(记录流)的新记录。一个示例用例是扩充 具有最新用户配置文件信息 (KTable) 的用户活动流 (KStream)。

利用用户提供的 :ValueJoiner

KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
转型 描述

内部联接

  • (KStream、KTable) → KStream

使用表执行此流的 INNER JOIN,从而有效地执行表查找。(详情)

数据必须协同分区:两侧的输入数据必须协同分区

当且仅当流被标记为重新分区时,才导致流的数据重新分区。

exists 的几个变体,有关详细信息,请参阅 Javadocs。join

KStream<String, Long> left = ...;
KTable<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.join(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    Joined.keySerde(Serdes.String()) /* key */
      .withValueSerde(Serdes.Long()) /* left value */
      .withGracePeriod(Duration.ZERO) /* grace period */
  );

详细行为:

  • 连接是基于的,即使用 join 谓词 。leftRecord.key == rightRecord.key

  • 每当收到新输入时,都会在下面列出的条件下触发联接。当它被触发时,将调用用户提供的 连接输出记录。ValueJoiner

    • 只有左侧 (stream) 的输入记录才会触发联接。右侧(表)的输入记录仅更新内部右侧联接状态。
    • 具有键或值的流的输入记录将被忽略,并且不会触发联接。nullnull
    • 具有值的表的输入记录将解释为相应键的逻辑删除,这表示从表中删除了键。 逻辑删除不会触发联接。null
  • 对表进行版本控制后, 要联接的表记录是通过执行带时间戳的查找来确定的,即联接的表记录将是具有时间戳的最新时间戳记录 小于或等于流记录时间戳。如果流记录时间戳早于表的历史记录保留期,则会删除该记录。
  • 要使用宽限期,需要对表进行版本控制。 这将导致流在尝试在表中查找具有正确时间戳的匹配记录之前缓冲指定的宽限期。 使用宽限期的情况是,表中记录的时间戳小于或等于流记录时间戳,但在流记录之后到达。 如果表记录在宽限期内到达,则仍将进行联接。 如果表记录在宽限期之前未到达,则联接将照常继续。

有关详细说明,请参阅本节底部的语义概述。

左联接

  • (KStream、KTable) → KStream

对表执行此流的 LEFT JOIN,从而有效地执行表查找。(详情)

数据必须协同分区:两侧的输入数据必须协同分区

当且仅当流被标记为重新分区时,才导致流的数据重新分区。

exists 的几个变体,有关详细信息,请参阅 Javadocs。leftJoin

KStream<String, Long> left = ...;
KTable<String, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.leftJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    Joined.keySerde(Serdes.String()) /* key */
      .withValueSerde(Serdes.Long()) /* left value */
      .withGracePeriod(Duration.ZERO) /* grace period */
  );

详细行为:

  • 连接是基于的,即使用 join 谓词 。leftRecord.key == rightRecord.key

  • 每当收到新输入时,都会在下面列出的条件下触发联接。当它被触发时,将调用用户提供的 连接输出记录。ValueJoiner

    • 只有左侧 (stream) 的输入记录才会触发联接。右侧(表)的输入记录仅更新内部右侧联接状态。
    • 具有值的流的输入记录将被忽略,并且不会触发联接。null
    • 具有值的表的输入记录将解释为相应键的逻辑删除,这表示从表中删除了键。 逻辑删除不会触发联接。null
  • 对于左侧上没有任何匹配项的每条输入记录,将使用 ; 这解释了下表中 timestamp=3 的行,该行列在 LEFT JOIN 列中。ValueJoinerValueJoiner#apply(leftRecord.value, null)[A, null]

  • 对表进行版本控制后, 要联接的表记录是通过执行带时间戳的查找来确定的,即联接的表记录将是具有时间戳的最新时间戳记录 小于或等于流记录时间戳。如果流记录时间戳早于表的历史记录保留期,则联接的记录将为 。null
  • 要使用宽限期,需要对表进行版本控制。 这将导致流在尝试在表中查找具有正确时间戳的匹配记录之前缓冲指定的宽限期。 使用宽限期的情况是,表中记录的时间戳小于或等于流记录时间戳,但在流记录之后到达。 如果表记录在宽限期内到达,则仍将进行联接。 如果表记录在宽限期之前未到达,则联接将照常继续。

有关详细说明,请参阅本节底部的语义概述。

流表联接的语义:下面介绍了各种 stream-table 联接变体的语义。 为了提高表的可读性,我们假设 (1) 所有记录都具有相同的键(因此我们省略了 表),并且 (2) 所有记录都按时间戳顺序处理。 列 INNER JOIN 和 LEFT JOIN 表示每当在联接的任一端收到新的输入记录时,分别作为参数传递给用户提供的 value Joiner for 和 方法的内容。空表 cell 表示 根本没有调用 。joinleftJoinValueJoiner

时间戳 左 (KStream) 右 (KTable) (内部)加入 左联接
1      
2      
3 一个     [A, 空]
4   一个    
5 B   [二、一] [二、一]
6   b    
7      
8      
9 C     [C, 空]
10   c    
11      
12      
13      
14   d    
15 D   [D, D] [D, D]
KStream-GlobalKTable 加入

KStream-GlobalKTable 联接始终是非窗口联接。它们允许您在从 KStream(记录流)。一个示例使用案例是 “star queries” 或 “star joins”,您可以在其中扩充流 用户活动 (KStream) 与最新的用户配置文件信息 (GlobalKTable) 和更多上下文信息 (更多 GlobalKTables)。但是,由于 GlobalKTables 没有时间概念,因此 KStream-GlobalKTable 联接不是临时联接,并且对 GlobalKTable 的更新与 KStream 记录的处理之间没有事件时间同步。

概括地说,KStream-GlobalKTable 联接与 KStream-KTable 联接非常相似。但是,全局表为您提供 与分区相比,具有更大的灵活性,但代价更高 表:

  • 它们不需要数据协同分区
  • 它们允许高效的 “星形连接”;即,将大规模的 “facts” 流与 “dimension” 表联接
  • 它们允许针对外键进行连接;即,您可以在表中查找数据,而不仅仅是通过 stream 的 SET 和记录值中的 DATA。
  • 它们使许多用例成为可能,在这些用例中,您必须处理严重倾斜的数据,从而遭受热分区的困扰。
  • 当您需要在 继承。

利用用户提供的 :ValueJoiner

KeyValue<K, LV> leftRecord = ...;
KeyValue<K, RV> rightRecord = ...;
ValueJoiner<LV, RV, JV> joiner = ...;

KeyValue<K, JV> joinOutputRecord = KeyValue.pair(
    leftRecord.key, /* by definition, leftRecord.key == rightRecord.key */
    joiner.apply(leftRecord.value, rightRecord.value)
  );
转型 描述

内部联接

  • (KStream,GlobalKTable) → KStream

使用全局表执行此流的 INNER JOIN,从而有效地执行表查找。(详情)

在实例(重新)启动时完全引导,这意味着该表完全填充了底层主题中的所有数据,即 在启动时可用。实际的数据处理仅在引导完成后才开始。GlobalKTableKafkaStreams

当且仅当流被标记为重新分区时,才导致流的数据重新分区。

KStream<String, Long> left = ...;
GlobalKTable<Integer, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.join(right,
    (leftKey, leftValue) -> leftKey.length(), /* derive a (potentially) new key by which to lookup against the table */
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

详细行为:

  • 连接是间接基于键的,即使用 join 谓词 。KeyValueMapper#apply(leftRecord.key, leftRecord.value) == rightRecord.key

  • 每当收到新输入时,都会在下面列出的条件下触发联接。当它被触发时,将调用用户提供的 连接输出记录。ValueJoiner

    • 只有左侧 (stream) 的输入记录才会触发联接。右侧(表)的输入记录仅更新内部右侧联接状态。
    • 具有键或值的流的输入记录将被忽略,并且不会触发联接。nullnull
    • 具有值的表的输入记录将解释为逻辑删除,这表示从表中删除了记录键。逻辑删除不会触发 加入。null

左联接

  • (KStream,GlobalKTable) → KStream

使用全局表执行此流的 LEFT JOIN,从而有效地执行表查找。(详情)

在实例(重新)启动时完全引导,这意味着该表完全填充了底层主题中的所有数据,即 在启动时可用。实际的数据处理仅在引导完成后才开始。GlobalKTableKafkaStreams

当且仅当流被标记为重新分区时,才导致流的数据重新分区。

KStream<String, Long> left = ...;
GlobalKTable<Integer, Double> right = ...;

// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.leftJoin(right,
    (leftKey, leftValue) -> leftKey.length(), /* derive a (potentially) new key by which to lookup against the table */
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
  );

详细行为:

  • 连接是间接基于键的,即使用 join 谓词 。KeyValueMapper#apply(leftRecord.key, leftRecord.value) == rightRecord.key

  • 每当收到新输入时,都会在下面列出的条件下触发联接。当它被触发时,将调用用户提供的 连接输出记录。ValueJoiner

    • 只有左侧 (stream) 的输入记录才会触发联接。右侧(表)的输入记录仅更新内部右侧联接状态。
    • 具有值的流的输入记录将被忽略,并且不会触发联接。null
    • 具有值的表的输入记录将解释为逻辑删除,这表示从表中删除了记录键。逻辑删除不会触发 加入。null
  • 对于左侧上没有任何匹配项的每条输入记录,将使用 调用 。ValueJoinerValueJoiner#apply(leftRecord.value, null)

stream-global-table 联接的语义:联接语义与 KStream-KTable 联接不同,因为它不是临时联接。 另一个区别是,对于 KStream-GlobalKTable 联接,左侧输入记录首先被 “映射” 为 在表查找之前提供给表的键空间的用户。KeyValueMapper

窗口

开窗允许您控制如何对具有相同键的记录进行分组,以进行有状态操作(例如聚合联接)到 所谓的 Windows。按记录键跟踪窗口。

注意

相关操作是 grouping,它将所有 具有相同键的记录,以确保数据被正确分区(“键控”)以供后续操作使用。 分组后,窗口允许您进一步对键的记录进行子分组。

例如,在 join 操作中,窗口状态存储用于存储到目前为止在 定义的窗口边界。在聚合操作中,窗口状态存储用于存储最新的聚合 每个窗口的结果。 状态存储中的旧记录将在指定的窗口保留期后清除。 Kafka Streams 保证至少在此指定时间内保留一个窗口;默认值为 1 天,可以是 通过 更改。Materialized#withRetention()

DSL 支持以下类型的窗口:

窗口名称 行为 简短描述
跳跃时间窗口 基于时间 固定大小的重叠窗口
翻转时间窗口 基于时间 固定大小、不重叠、无间隙的窗口
滑动时间窗口 基于时间 固定大小的重叠窗口,用于处理记录时间戳之间的差异
Session 窗口 基于会话 动态大小、非重叠、数据驱动的窗口
跳跃时间窗口

跳跃时间窗口是基于时间间隔的窗口。他们对固定大小的(可能)重叠的窗口进行建模。 跳跃窗口由两个属性定义:窗口的大小及其提前间隔(又名“跳跃”)。进步 interval 指定窗口相对于前一个窗口向前移动的程度。例如,您可以配置 大小为 5 分钟且提前间隔为 1 分钟的跳跃窗口。由于跳跃窗口可以重叠 - 并且 一般他们这样做 – 数据记录可能属于多个此类窗口。

注意

跳跃窗口与滑动窗口:在其他流处理工具中,跳跃窗口有时称为 “滑动窗口”。Kafka Streams 遵循 学术文献中的术语,其中滑动窗口的语义与跳跃窗口的语义不同。

以下代码定义了一个大小为 5 分钟且提前间隔为 1 分钟的跳跃窗口:

import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

// A hopping time window with a size of 5 minutes and an advance interval of 1 minute.
// The window's name -- the string parameter -- is used to e.g. name the backing state store.
Duration windowSize = Duration.ofMinutes(5);
Duration advance = Duration.ofMinutes(1);
TimeWindows.ofSizeWithNoGrace(windowSize).advanceBy(advance);

此图显示了使用跳跃窗口对数据记录流进行窗口化。在此图中,时间数字 表示分钟;例如,t=5 表示“在 5 分钟标记处”。实际上,Kafka Streams 中的时间单位是 毫秒,这意味着时间数字需要乘以 60 * 1000 才能将分钟转换为 毫秒(例如 t=5 将变为 t=300,000)。

跳跃时间窗口与 epoch 对齐,包括区间下限和上限 排他性。“与纪元对齐”意味着第一个窗口从时间戳 0 开始。例如,hopping 大小为 5000 毫秒且高级间隔(“跃点”)为 3000 毫秒的窗口具有可预测的窗口边界,而不是甚至像 .[0;5000),[3000;8000),...[1000;6000),[4000;9000),...[1452;6452),[4452;9452),...

与我们之前看到的非窗口化聚合不同,窗口化聚合返回一个窗口化的 KTable,其键 type 为 。这是为了区分来自不同窗口的具有相同键的聚合值。这 相应的 window 实例和嵌入的 key 可以作为 和 检索。 分别。Windowed<K>Windowed#window()Windowed#key()

翻转时间窗口

滚动时间窗口是跳跃时间窗口的一种特殊情况,与后者一样,是基于时间的窗口 间隔。它们对固定大小、不重叠、无间隙的窗口进行建模。 翻转窗口由单个属性定义:窗口的大小。 翻转窗口是窗口大小等于其提前间隔的跳跃窗口。 由于翻转窗口永远不会重叠,因此数据记录将属于一个且仅属于一个窗口。

此图显示了使用滚动窗口对数据记录流进行窗口化。窗口不重叠,因为 定义,则提前间隔与窗口大小相同。在此图中,时间数字表示分钟; 例如,t=5 表示“在 5 分钟标记处”。实际上,Kafka Streams 中的时间单位是毫秒,这意味着 时间数字需要乘以 60 * 1,000 才能从分钟转换为毫秒(例如 t=5 将 变为 t=300,000)。

滚动时间窗口与 epoch 对齐,包括区间下限和上限 排他性。“与纪元对齐”意味着第一个窗口从时间戳 0 开始。例如,翻滚 大小为 5000ms 的窗口具有可预测的窗口边界 — 而不是,甚至不是像 .[0;5000),[5000;10000),...[1000;6000),[6000;11000),...[1452;6452),[6452;11452),...

以下代码定义了一个大小为 5 分钟的滚动窗口:

import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

// A tumbling time window with a size of 5 minutes (and, by definition, an implicit
// advance interval of 5 minutes), and grace period of 1 minute.
Duration windowSize = Duration.ofMinutes(5);
Duration gracePeriod = Duration.ofMinutes(1);
TimeWindows.ofSizeAndGrace(windowSize, gracePeriod);

// The above is equivalent to the following code:
TimeWindows.ofSizeAndGrace(windowSize, gracePeriod).advanceBy(windowSize);
滑动时间窗口

滑动窗口实际上与跳跃和翻转的窗口完全不同。在 Kafka Streams 中,滑动窗口 用于使用 class 指定的 join 操作和使用 class 指定的窗口化聚合。JoinWindowsSlidingWindows

滑动窗口对在时间轴上连续滑动的固定大小的窗口进行建模。在此模型中,两条数据记录是 如果(在对称窗口的情况下)它们的时间戳之差为 在窗口大小内。当滑动窗口沿时间轴移动时,记录可能会落入 滑动窗口,但每个唯一的记录组合仅显示在一个滑动窗口快照中。

以下代码定义了一个时差为 10 分钟、宽限期为 30 分钟的滑动窗口:

import org.apache.kafka.streams.kstream.SlidingWindows;

// A sliding time window with a time difference of 10 minutes and grace period of 30 minutes
Duration timeDifference = Duration.ofMinutes(10);
Duration gracePeriod = Duration.ofMinutes(30);
SlidingWindows.ofTimeDifferenceAndGrace(timeDifference, gracePeriod);

此图显示了使用滑动窗口对数据记录流进行窗口化。的 滑动窗口快照因记录时间而异。在此图中,时间数字表示毫秒。例如 t=5 表示“在 5 毫秒处”。

滑动窗口与数据记录时间戳对齐,而不是与 epoch 对齐。与跳跃和翻滚的窗户相比, 滑动窗口的窗口下限和上限时间间隔边界均包括在内。

会话窗口

会话窗口用于将基于键的事件聚合到所谓的会话中,其过程称为 作为会话化。会话表示由定义的不活动间隙(或 “闲暇”)。任何已处理的事件如果落在任何现有会话的不活动间隙内,都会合并到 现有会话。如果事件超出会话间隙,则将创建一个新会话。

会话窗口与其他窗口类型的区别在于:

  • 所有窗口都跨键独立跟踪 - 例如,不同键的窗口通常具有不同的 Start 和结束时间
  • 它们的窗口大小大小各不相同 - 即使是同一键的窗口通常也具有不同的大小

会话窗口的主要应用领域是用户行为分析。基于会话的分析范围包括 从简单的量度(例如新闻网站或社交平台上的用户访问次数)到更复杂的量度(例如顾客 转化漏斗和事件流)。

以下代码定义了一个不活动间隔为 5 分钟的会话窗口:

import java.time.Duration;
import org.apache.kafka.streams.kstream.SessionWindows;

// A session window with an inactivity gap of 5 minutes.
SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5));

给定前面的会话窗口示例,以下是对 6 条记录的输入流会发生什么情况。 当前三条记录到达时(下图的上半部分),我们将有三个会话(请参阅下半部分) 处理完这些记录后:绿色记录键为 2,1 个会话在 0 分钟标记(仅由于插图看起来会话从 0 变为 1),另一个开始和 在 6 分钟处结束;以及一个用于蓝色记录键的会话,从 2 分钟标记开始和结束。

在收到三条输入记录后检测到的会话:两条绿色记录键的 t=0 和 t=6 记录,以及 t=2 时蓝色记录键的一条记录。 在此图中,时间数字表示分钟;例如,t=5 表示“在 5 分钟标记处”。实际上,该单位 Kafka Streams 中的时间是毫秒,这意味着时间数字需要乘以 60 * 1000 到 从分钟转换为毫秒(例如 t=5 将变为 t=300,000)。

如果我们随后收到 3 个额外的记录(包括 2 个乱序记录),则 2 个 绿色记录键的现有会话将合并为一个会话,从时间 0 开始,到时间 6 结束。 由总共三条记录组成。蓝色记录键的现有会话将延长至在时间 5 结束。 由总共两条记录组成。最后,蓝色键将有一个新会话,开始和结束于 时间 11.

在收到 6 条输入记录后检测到的会话。请注意 t=4(绿色)和 t=5(蓝色),分别导致会话合并和会话扩展。

Window 最终结果

在 Kafka Streams 中,窗口计算会不断更新其结果。 当窗口的新数据到达时,新计算的结果会向下游发出。 对于许多应用来说,这是理想的选择,因为总是有新的结果。 Kafka Streams 旨在实现无缝编程连续计算。 但是,某些应用程序只需要对窗口计算的最终结果执行操作。 这方面的常见示例是发送警报或将结果传送到不支持更新的系统。

假设您每个用户每小时有一次窗口化事件计数。 如果您想在用户一小时内的事件少于 3 个时发送警报,那么您将面临真正的挑战。 所有用户一开始都会匹配此条件,直到他们累积了足够的事件,因此您不能简单地 当有人符合条件时发送警报;您必须等待,直到您知道不会再看到特定窗口的任何事件 ,然后发送警报。

Kafka Streams 提供了一种定义此逻辑的简洁方法:定义窗口计算后,您可以抑制中间结果,在窗口关闭时发出每个用户的最终计数。

例如:

KGroupedStream<UserId, Event> grouped = ...;
grouped
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(10)))
    .count()
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .filter((windowedUserId, count) -> count < 3)
    .toStream()
    .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));

该程序的关键部分是:

ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(10))
指定的 10 分钟宽限期(即参数)允许我们限制窗口将接受的事件的延迟。 例如,09:00 到 10:00 窗口将接受无序记录,直到 10:10,此时窗口将关闭Duration.ofMinutes(10)
.suppress(Suppressed.untilWindowCloses(...))
这会将 suppression 运算符配置为在窗口关闭之前不为窗口发出任何内容,然后发出最终结果。 例如,如果用户在 09:00 到 10:10 之间收到 10 个事件,则抑制的下游 在 10:10 之前不会获取窗口化键的任何事件,然后它将恰好获得值为 . 这是窗口计数的最终结果。UfilterU@09:00-10:0010
unbounded()
这将配置用于存储事件的缓冲区 直到他们的窗户关上。 生产代码能够对数量设置上限 用于缓冲区的内存, 但是这个简单的示例创建了一个没有 上界。

需要注意的一点是,抑制就像任何其他 Kafka Streams Operator 的 Topology 中,因此您可以构建一个包含两个 从 、 中出现的分支 一个被抑制,一个没有,甚至多个不同 配置的抑制。 这允许在需要的地方应用抑制 ,否则依赖于默认的持续更新行为。count

有关更多详细信息,请参阅 config 对象上的 JavaDoc 和 KIP-328Suppressed

应用处理器和转换器(处理器 API 集成)

除了上述无状态有状态转换之外,您还可以 利用 DSL 中的处理器 API。 在许多情况下,这可能会有所帮助:

  • 定制:您需要实现 DSL 中尚未或尚不可用的特殊自定义逻辑。
  • 在需要的地方将易用性与充分的灵活性相结合:即使您通常更喜欢使用 DSL 的表现力,您的处理中有一些步骤需要更大的灵活性和 修补比 DSL 提供的要多。例如,只有处理器 API 提供对 记录的元数据,例如其主题、分区和偏移量信息。 但是,您不希望仅仅因为这个原因就完全切换到 Processor API。
  • 从其他工具迁移:您正在从其他提供 命令式 API,并且将一些遗留代码迁移到处理器 API 比 立即完全迁移到 DSL。
转型 描述

过程

  • KStream -> 虚空

终端操作。将 a 应用于每条记录。 允许您从 DSL 利用处理器 API。 (详情Processorprocess())

这基本上等同于将 via 添加到您的处理器拓扑中。ProcessorTopology#addProcessor()

javadocs 中提供了一个示例。

变换

  • KStream -> KStream

将 a 应用于每条记录。 允许您从 DSL 利用处理器 API。 (详情Transformertransform())

每个输入记录都转换为零条、一条或多条输出记录(类似于 stateless )。 对于零输出,必须返回。 您可以修改记录的键和值,包括它们的类型。flatMapTransformernull

标记流以进行数据重新分区:之后应用分组或联接将导致记录重新分区。 如果可能,请改用,这不会导致数据重新分区。transformtransformValues

transform本质上等同于将 via 添加到您的处理器拓扑中。TransformerTopology#addProcessor()

javadocs 中提供了一个示例。

Transform (仅值)

  • KStream -> KStream
  • KTable -> KTable

将 a 应用于每条记录,同时保留原始记录的键。 允许您从 DSL 利用处理器 API。 (详情ValueTransformertransformValues())

每个输入记录都转换为一个输出记录(不能零个输出记录或多个输出记录)。 可能会作为记录的新值返回。ValueTransformernull

transformValues更可取,因为它不会导致数据重新分区。transform

transformValues本质上等同于将 via 添加到您的处理器拓扑中。ValueTransformerTopology#addProcessor()

javadocs 中提供了一个示例。

以下示例演示如何通过该方法利用自定义,该自定义将 每当页面查看计数达到预定义的阈值时发送电子邮件通知。KStream#process()Processor

首先,我们需要实现一个自定义流处理器 ,它实现以下接口:PopularPageEmailAlertProcessor

// A processor that sends an alert message about a popular page to a configurable email address
public class PopularPageEmailAlert implements Processor<PageId, Long, Void, Void> {

  private final String emailAddress;
  private ProcessorContext<Void, Void> context;

  public PopularPageEmailAlert(String emailAddress) {
    this.emailAddress = emailAddress;
  }

  @Override
  public void init(ProcessorContext<Void, Void> context) {
    this.context = context;

    // Here you would perform any additional initializations such as setting up an email client.
  }

  @Override
  void process(Record<PageId, Long> record) {
    // Here you would format and send the alert email.
    //
    // In this specific example, you would be able to include
    // information about the page's ID and its view count
  }

  @Override
  void close() {
    // Any code for clean up would go here, for example tearing down the email client and anything
    // else you created in the init() method
    // This processor instance will not be used again after this call.
  }

}

提示

尽管我们在此示例中没有演示它,但流处理器可以通过以下方式访问任何可用的状态存储 叫。 仅当状态存储已连接到处理器,或者它们是全局存储时,它们才可用。虽然全局存储不需要显式连接,但它们只允许只读访问。 有两种方法可以将状态存储连接到处理器:ProcessorContext#getStateStore()

  • 通过将已添加的 store 的名称传递给相应的方法调用。Topology#addStateStore()KStream#process()
  • 在传递给 .在这种情况下,无需事先致电,商店将自动为您添加。您还可以在 或 supplier 变体上实施,或者 / 或其任何变体。ConnectedStoreProvider#stores()ProcessorSupplierKStream#process()StreamsBuilder#addStateStore()ConnectedStoreProvider#stores()Value**WithKeyTransformerSupplier

然后我们可以通过 来利用 DSL 中的处理器。PopularPageEmailAlertKStream#process

KStream<String, GenericRecord> pageViews = ...;

// Send an email notification when the view count of a page reaches one thousand.
pageViews.groupByKey()
         .count()
         .filter((PageId pageId, Long viewCount) -> viewCount == 1000)
         // PopularPageEmailAlert is your custom processor that implements the
         // `Processor` interface, see further down below.
         .process(() -> new PopularPageEmailAlert("alerts@yourcompany.com"));

Streams DSL 应用程序中的命名运算符

Kafka Streams 允许您命名通过 Streams DSL 创建的处理器

控制 KTable 的发射速率

KTable 在逻辑上是一个持续更新的表。 只要有新数据可用,这些更新就会发送给下游运营商,确保整个计算尽可能新鲜。 从逻辑上讲,大多数程序描述的是一系列转换,更新率不是程序行为的一个因素。 在这些情况下,更新速率更像是一个性能问题。 操作员可以通过调整来优化网络流量(到 Kafka 代理)和磁盘流量(到本地状态存储) commit interval 和 batch size 配置。

但是,某些应用程序需要执行其他操作,例如调用外部系统、 因此需要对调用速率进行一些控制,例如 .KStream#foreach

与其作为 KTable 记录缓存的副作用来实现这一点, 您可以通过 Operator 直接施加速率限制。KTable#suppress

例如:

KGroupedTable<String, String> groupedTable = ...;
groupedTable
    .count()
    .suppress(untilTimeLimit(ofMinutes(5), maxBytes(1_000_000L).emitEarlyWhenFull()))
    .toStream()
    .foreach((key, count) -> updateCountsDatabase(key, count));