处理器 API

处理器 API 允许开发人员定义和连接自定义处理器,并与状态存储进行交互。使用 处理器 API,您可以定义任意流处理器,一次处理一条接收到的记录,并将这些 processors 及其关联的状态存储,以组成表示自定义处理的处理器拓扑 逻辑。

概述

处理器 API 可用于实现无状态有状态操作,其中后者是 通过使用 State Store 实现。

提示

结合 DSL 和处理器 API:您可以将 DSL 的便利性与处理器 API 的强大功能和灵活性相结合,如 部分 应用处理器和转换器 (处理器 API 集成)。

有关可用 API 功能的完整列表,请参阅 Streams API 文档。

定义流处理器

流处理器是处理器拓扑中的一个节点,表示单个处理步骤。 使用处理器 API,您可以定义一次处理一条接收记录的任意流处理器,然后连接 这些处理器及其关联的状态存储来组成处理器拓扑。

您可以通过实现提供 API 方法的接口来定义自定义流处理器。 该方法在每条收到的记录上调用。Processorprocess()process()

该接口还有一个方法,在 task 构造期间由 Kafka Streams 库调用 阶段。处理器实例应在此方法中执行任何所需的初始化。该方法传入一个实例,该实例提供对当前处理记录的元数据的访问,包括其源 Kafka 主题和分区。 其相应的 message offset 以及进一步的此类信息。您还可以使用此上下文实例来安排标点符号 函数 (通过 ),将新记录作为键值对转发到下游处理器 (通过 ), 并提交当前处理进度 (通过 )。 您可以在该方法中清理您设置的任何资源。请注意,Kafka Streams 可以通过在 之后再次调用单个对象来重用该对象。Processorinit()init()ProcessorContextProcessorContext#schedule()ProcessorContext#forward()ProcessorContext#commit()init()close()Processorinit()close()

该接口采用两组通用参数: .这些定义输入和输出类型 处理器实现可以处理。 并定义将传递的键和值类型 自。 同样,并定义将接受的转发的 key 和 value 类型。如果您的处理器根本不转发任何记录(或者如果它只转发键或值), 最佳做法是将 output 泛型类型参数设置为 . 如果它需要转发多个不共享公共超类的类型,你将 必须将 output 泛型类型参数设置为 .ProcessorKIn, VIn, KOut, VOutKInVInprocess()KOutVOutProcessorContext#forward()nullVoidObject

和 the 方法都以数据类的形式处理记录。此类允许您访问 Kafka 记录的主要组件: 键、值、时间戳和标头。转发记录时,您可以使用 构造函数从头开始创建一个新的,或者您可以使用 Convenience Builder 方法替换 的 的 属性之一 并复制其余部分。例如,将从 while 将 Output Record 的值设置为 . 请注意,这不会 mutize 。 而是创建一个浅层副本。请注意,这只是一个浅层副本,因此,如果您 计划更改程序中其他位置的 key、value 或 headers,您将需要 自己创建这些字段的深层副本。Processor#process()ProcessorContext#forward()Record<K, V>RecordRecordinputRecord.withValue(newValue)inputRecordnewValueinputRecord

除了通过 处理传入记录外,还通过 您可以选择安排定期调用(称为“标点符号”) 通过调用并传递一个 . 这决定了使用什么时间概念 对于标点符号计划:stream-time 或 wall-clock-time(默认情况下,stream-time 配置为通过 表示事件时间)。当使用 stream-time 时,纯粹触发 by data,因为 stream-time 是由从 Importing 数据派生的时间戳确定 (并向前推进) 的。当 没有新的输入数据到达,则 stream-time 不是高级的,因此不会被调用。Processor#process()init()ProcessorContext#schedule()PunctuatorPunctuationTypeTimestampExtractorpunctuate()punctuate()

例如,如果您根据 处理 60 条记录的流,其连续时间戳从 1(第一条记录)到 60 秒(最后一条记录), 则被调用 6 次。无论实际处理这些记录所需的时间如何,都会发生这种情况。 将调用 6 次,无论处理这 60 条记录需要一秒、一分钟还是一小时。PunctuatorPunctuationType.STREAM_TIMEpunctuate()punctuate()

当使用 wall-clock-time (即 ) 时,完全由挂钟时间触发。 重用上面的示例,如果函数是根据 进行调度的,并且如果这些 在 20 秒内处理了 60 条记录,则调用 2 次(每 10 秒 1 次)。如果这 60 条记录 在 5 秒内处理完,则根本不调用 no。请注意,您可以通过调用多个 Times Inside 方法。PunctuationType.WALL_CLOCK_TIMEpunctuate()PunctuatorPunctuationType.WALL_CLOCK_TIMEpunctuate()punctuate()PunctuatorPunctuationTypeProcessorContext#schedule()init()

注意力

仅当 Streams 处理记录时,流时间才会提前。 没有要处理的记录,或者 Streams 正在等待新记录 由于 Task Idling 配置,如果指定了流时间,则不会提前,也不会触发流时间。 此行为与配置的时间戳提取器无关,即,使用不会启用挂钟触发。punctuate()PunctuationType.STREAM_TIMEWallclockTimestampExtractorpunctuate()

以下示例定义了一个简单的字数统计算法,并执行以下操作:Processor

  • 在该方法中,安排每 1000 个时间单位使用一次标点符号(时间单位通常为毫秒,在本例中为每 1 秒转换为每 1 秒使用一次标点符号),并按其名称 “Counts” 检索本地状态存储。init()
  • 在该方法中,在每条收到的记录上,将值字符串拆分为单词,并将其计数更新到状态存储中(我们将在本节后面讨论)。process()
  • 在该方法中,迭代本地 state store 并将聚合计数发送到下游处理器(我们将在本节后面讨论下游处理器),并提交当前 stream 状态。punctuate()
public class WordCountProcessor implements Processor<String, String, String, String> {
    private KeyValueStore<String, Integer> kvStore;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
            try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Integer> entry = iter.next();
                    context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
                }
            }
        });
        kvStore = context.getStateStore("Counts");
    }

    @Override
    public void process(final Record<String, String> record) {
        final String[] words = record.value().toLowerCase(Locale.getDefault()).split("\\W+");

        for (final String word : words) {
            final Integer oldValue = kvStore.get(word);

            if (oldValue == null) {
                kvStore.put(word, 1);
            } else {
                kvStore.put(word, oldValue + 1);
            }
        }
    }

    @Override
    public void close() {
        // close any resources managed by this processor
        // Note: Do not close any StateStores as these are managed by the library
    }
}

注意

使用状态存储进行有状态处理:上面定义的可以访问其方法中当前接收到的记录,并且可以 利用状态存储来维护处理状态,例如,记住最近 到达记录以满足有状态处理需求,例如聚合和联接。有关更多信息,请参阅 State Stores 文档。WordCountProcessorprocess()

单元测试处理器

Kafka Streams 附带一个模块,可帮助您为 处理器test-utils

状态存储

要实现有状态 or ,必须向处理器提供一个或多个状态存储 或 transformer (无状态处理器或 transformer 不需要 state 存储)。State store 可用于记住 最近收到的输入记录、跟踪滚动聚合、删除重复的输入记录等。 状态存储的另一个特性是,它们可以从其他应用程序例如 基于 NodeJS 的控制面板或以 Scala 或 Go 实现的微服务。ProcessorTransformer

默认情况下,Kafka Streams 中可用的状态存储类型已启用容错能力。

定义和创建 State Store

您可以使用其中一种可用的商店类型,也可以实现自己的自定义商店类型。 通常的做法是通过工厂利用现有的 store 类型。Stores

请注意,在使用 Kafka Streams 时,您通常不会直接在代码中创建或实例化状态存储。 相反,您可以通过创建所谓的 .此构建器由 Kafka Streams 作为工厂,用于何时何地在应用程序实例中本地实例化实际状态存储 需要。StoreBuilder

以下商店类型是开箱即用的。

商店类型 存储引擎 容错? 描述
持续KeyValueStore<K, V> RocksDB 数据库 是(默认启用)
  • 大多数使用案例的推荐存储类型。
  • 将其数据存储在本地磁盘上。
  • 存储容量: 托管本地状态可以大于 Application 实例,但必须适合可用的本地磁盘 空间。
  • RocksDB 设置可以微调,参见 RocksDB 配置
  • 可用的商店变体: 带时间戳的键值存储、版本控制的键值存储、时间窗口键值存储、会话窗口键值存储。
  • 当您需要支持 put/get/delete 和 range 查询的持久性键(值/时间戳)存储时,请使用 persistentTimestampedKeyValueStore
  • 当您需要支持 put/get/delete 和带时间戳的 get 操作的持久、版本控制的键(值/时间戳)存储时,请使用 persistentVersionedKeyValueStore
  • 当您需要持久性 timeWindowedKey-value 或 timeWindowedKey-(value/timestamp) 存储时,请分别使用 persistentWindowStorepersistentTimestampedWindowStore
  • 当您需要持久 sessionWindowedKey 值存储时,请使用 persistentSessionStore
// Creating a persistent key-value store:
// here, we create a `KeyValueStore<String, Long>` named "persistent-counts".
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.
StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier =
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("persistent-counts"),
    Serdes.String(),
    Serdes.Long());
KeyValueStore<String, Long> countStore = countStoreSupplier.build();
内存中KeyValueStore<K, V> - 是(默认启用)
  • 将其数据存储在内存中。
  • 存储容量: 托管本地状态必须适合 application 实例。
  • 当应用程序实例在本地 磁盘空间不可用或本地磁盘空间被擦除 中间应用程序实例重新启动。
  • 可用的商店变体: 时间窗口键值存储、会话窗口键值存储。
  • 当您需要支持 put/get/delete 和 range 查询的 key-(value/timestamp) 存储时,请使用 TimestampedKeyValueStore
  • 当您需要存储 windowedKey-(value/timestamp) 对时,请使用 TimestampedWindowStore
  • 目前没有内置的内存中版本控制的键值存储。
// Creating an in-memory key-value store:
// here, we create a `KeyValueStore<String, Long>` named "inmemory-counts".
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.
StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier =
  Stores.keyValueStoreBuilder(
    Stores.inMemoryKeyValueStore("inmemory-counts"),
    Serdes.String(),
    Serdes.Long());
KeyValueStore<String, Long> countStore = countStoreSupplier.build();

容错状态存储

要使状态存储具有容错能力并允许在不丢失数据的情况下进行状态存储迁移,状态存储可以是 在后台持续备份到 Kafka 主题。例如,要将有状态流任务从一个 在应用程序中弹性添加或删除容量时,将 machine 移动到另一个 machine 中。 本主题有时称为 state store 的关联更改日志主题或其更改日志。例如,如果 您遇到机器故障时,状态存储和应用程序的状态可以从其更改日志中完全恢复。您可以启用或禁用此备份功能 state store 中。

容错状态存储由压缩的 changelog 主题提供支持。压缩此 topic 是为了防止 topic 无限增长,减少关联 Kafka 集群中消耗的存储空间, 如果需要从其 changelog 主题中恢复状态存储,则最大限度地减少恢复时间。

容错窗口状态存储由一个同时使用压缩和 删除。由于发送到 changelog 主题的消息键的结构,这种 Window 存储的 changelog 主题需要 deletion and compaction。对于窗口存储,消息键是 包含 “normal” 键和窗口时间戳的复合键。对于这些类型的组合键,它不会 只需启用 compaction 以防止 changelog 主题超出范围就足够了。删除 启用后,已过期的旧窗口将在日志段过期时由 Kafka 的日志清理程序清理。这 默认保留设置为 + 1 天。您可以通过在 .Windows#maintainMs()StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIGStreamsConfig

当您从状态存储中打开 an 时,您必须在完成使用 它是为了回收资源;或者,您可以从 try-with-resources 语句中使用迭代器。如果不关闭迭代器,则 您可能会遇到 OOM 错误。Iteratorclose()

启用或禁用状态存储的容错能力 (Store Changelogs)

您可以通过启用或禁用更改日志记录来启用或禁用状态存储的容错能力 的商店通过 和 。 如果需要,您还可以微调关联主题的配置。enableLogging()disableLogging()

禁用 fault-tolerance 的示例:

import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long())
  .withLoggingDisabled(); // disable backing up the store to a changelog topic

注意力

如果禁用更改日志,则附加的状态存储不再具有容错能力,并且它不能有任何备用副本

以下是启用 Fault Tolerance 的示例,其中包含额外的 changelog-topic 配置: 您可以从 kafka.log.LogConfig 添加任何日志配置。 无法识别的配置将被忽略。

import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

Map<String, String> changelogConfig = new HashMap();
// override min.insync.replicas
changelogConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")

StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("Counts"),
    Serdes.String(),
    Serdes.Long())
  .withLoggingEnabled(changelogConfig); // enable changelogging, with custom changelog settings

带时间戳的状态存储

默认情况下,KTables 始终存储时间戳。 带时间戳的状态存储改进了流处理语义,并支持 处理源 KTables 中的无序数据,检测无序连接和聚合, 以及获取 Interactive Query 中最新更新的时间戳。

您可以查询带时间戳和不带时间戳的 state store。

升级说明:所有用户都通过每个实例的一次滚动退回进行升级。
  • 对于处理器 API 用户,现有应用程序不会发生任何变化,您 可以选择使用带时间戳的存储。
  • 对于 DSL 算子,store data 在后台懒惰地升级。
  • 如果您提供自定义 XxxBytesStoreSupplier,则不会进行升级,但您可以选择加入 通过实现 TimestampedBytesStore 接口。在这种情况下,将保留旧格式,并且 Streams 使用代理存储 这会在读/写时删除/添加时间戳。

版本化的键值状态存储

版本控制的键值状态存储从 Kafka Streams 3.5 开始可用。 不是为每个键存储单个记录版本(值和时间戳),不如 版本化状态存储可以为每个键存储多个记录版本。这 允许版本化状态存储支持带时间戳的检索操作 返回截至指定时间戳的最新记录 (每个键)。

您可以通过将 VersionedBytesStoreSupplier 传递给 versionedKeyValueStoreBuilder 来创建持久的版本化状态存储。 或者通过实现您自己的 VersionedKeyValueStore

每个版本控制存储都有一个关联的固定持续时间历史记录保留参数,该参数指定应保留较长的旧记录版本。 具体而言,版本控制存储保证返回 时间戳检索操作,其中查询的时间戳在 当前观察到的流时间的历史记录保留。

历史记录保留期也是其宽限期的两倍,该宽限期确定 将接受多长时间前对 store 的无序写入。一个 在以下情况下,版本控制存储将不接受写入(插入、更新或删除) 与写入关联的时间戳早于当前观察到的时间戳 流时间超过宽限期。在这种情况时间为 按分区而不是按键跟踪,这意味着这很重要 宽限期(即历史记录保留期)设置得足够高,以便 容纳一个 key 相对于 record 获取另一个键。

因为版本控制的键值存储的内存占用量高于 对于非版本控制的键值存储,您可能需要相应地调整 RocksDB 内存设置。使用版本控制的 store 对应用程序进行基准测试也是 建议这样做,因为预计性能会比使用非版本控制时差 商店。

版本控制存储目前不支持缓存或交互式查询。 此外,窗口存储和全局表可能未进行版本控制。

升级说明:版本控制状态存储仅是可选的;无自动 将进行从未版本控制的存储区升级到受版本控制的存储区。

支持从持久的、非版本控制的键值存储进行升级 到持久的、版本控制的键值存储中,只要原始存储 与正在升级的版本控制存储具有相同的 changelog 主题格式 自。持久化键值存储和带时间戳的键值存储持久化的键值存储共享相同的 changelog 主题格式。 因此两者都有资格获得升舱。

如果您希望使用持久、非版本控制 键值存储 (key-value stores) 使用持久的、版本控制的键值存储 相反,您可以执行以下过程:

  • 停止所有应用程序实例,并清除正在升级的 store 的所有本地状态目录
  • 更新您的应用程序代码以在需要时使用版本控制的存储。
  • 更新相关 state store 的 changelog 主题配置, 将 的值设置为至少所需的历史记录保留期。历史记录保留 plus 建议使用 1 天作为经纪人挂钟时间的缓冲 在压缩期间。min.compaction.lag.ms
  • 重新启动应用程序实例,并为版本化的 stores 来重建 changelog 中的状态。

ReadOnly 状态存储

只读状态存储从其输入主题具体化数据。它还使用输入主题 for fault-tolerance,因此没有额外的 changelog 主题(输入主题是 重新用作 changelog)。因此,输入主题应配置日志压缩。 请注意,任何其他处理器都不应修改 state store 的内容,并且唯一的 writer 应该是关联的 “state update processor”;其他处理者可能会读取 read-only store 的 read-only store 中。

注意:在 加工。您可能希望确保原始 changelog 主题与处理器共分区 读取只读 StateStore。

实现自定义状态存储

您可以使用内置的 state store 类型或实现自己的 state store 类型。 要为 store 实现的主要接口是 。Kafka Streams 也有一些扩展接口,例如 as 和 .org.apache.kafka.streams.processor.StateStoreKeyValueStoreVersionedKeyValueStore

请注意,您的自定义实现还需要提供有关如何恢复状态的逻辑 通过 OR 接口。 有关如何实例化这些接口的详细信息,请参阅 javadocsorg.apache.kafka.streams.processor.StateStoreorg.apache.kafka.streams.processor.StateRestoreCallbackorg.apache.kafka.streams.processor.BatchingStateRestoreCallback

您还需要通过实现接口为存储提供 “构建器”,Kafka Streams 使用该接口创建 您的商店。org.apache.kafka.streams.state.StoreBuilder

访问处理器上下文

正如我们在 定义流处理器 一节中提到的,a 控制处理工作流,例如调度标点符号函数和提交当前已处理的状态。ProcessorContext

此对象还可用于访问与应用程序相关的元数据,如 、 、 和 ,并将相关元数据记录为 、 、 和 。applicationIdtaskIdstateDirtopicpartitionoffsettimestampheaders

以下是如何向记录添加新标头的示例实现:

public void process(String key, String value) {

    // add a header to the elements
    context().headers().add.("key", "value");
}

连接处理器和状态存储

现在,处理器 (WordCountProcessor) 和 状态存储,您可以通过将这些处理器和状态存储连接在一起来构建处理器拓扑 使用实例。此外,您还可以添加具有指定 Kafka 主题的源处理器 将输入数据流生成到拓扑中,并使用具有指定 Kafka 主题的 sink 处理器生成 从拓扑中输出数据流。Topology

这是一个示例实现:

Topology builder = new Topology();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
    // add the WordCountProcessor node which takes the source processor as its upstream processor
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")
    // add the count store associated with the WordCountProcessor processor
    .addStateStore(countStoreBuilder, "Process")
    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");

以下是此示例的快速说明:

  • 使用该方法将名为 的源处理器节点添加到拓扑中,并向其馈送一个 Kafka 主题。"Source"addSource"source-topic"
  • 然后,将以预定义 logic 命名的处理器节点添加为下游 processor 的节点。"Process"WordCountProcessor"Source"addProcessor
  • 使用 创建预定义的持久性键值状态存储并将其与节点关联。"Process"countStoreBuilder
  • 然后添加一个 sink processor 节点以使用该方法完成拓扑,采用该节点 作为其上游处理器并写入单独的 Kafka 主题(请注意,用户还可以使用另一个重载变体来动态确定从上游处理器接收的每个记录要写入的 Kafka 主题)。addSink"Process""sink-topic"addSink

在某些情况下,在将处理器添加到拓扑的同时添加和连接状态存储可能更方便。 这可以通过在 上实现而不是调用 来完成,如下所示:ConnectedStoreProvider#stores()ProcessorSupplierTopology#addStateStore()

Topology builder = new Topology();
// add the source processor node that takes Kafka "source-topic" as input
builder.addSource("Source", "source-topic")
    // add the WordCountProcessor node which takes the source processor as its upstream processor.
    // the ProcessorSupplier provides the count store associated with the WordCountProcessor
    .addProcessor("Process", new ProcessorSupplier<String, String, String, String>() {
        public Processor<String, String, String, String> get() {
            return new WordCountProcessor();
        }

        public Set<StoreBuilder<?>> stores() {
            final StoreBuilder<KeyValueStore<String, Long>> countsStoreBuilder =
                Stores
                    .keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("Counts"),
                        Serdes.String(),
                        Serdes.Long()
                    );
            return Collections.singleton(countsStoreBuilder);
        }
    }, "Source")
    // add the sink processor node that takes Kafka topic "sink-topic" as output
    // and the WordCountProcessor node as its upstream processor
    .addSink("Sink", "sink-topic", "Process");

这允许处理器 “拥有” 状态存储,有效地封装了用户对 topology 的使用情况。 共享状态存储的多个处理器可以使用此技术提供相同的存储,只要 相同。StoreBuilderinstance

在这些拓扑中,流处理器节点被视为节点的下游处理器,而 upstream 处理器。因此,每当节点将新获取的记录从 Kafka 添加到其下游节点,则触发该方法以处理记录和 更新关联的 State Store。每当在方法中调用时,聚合键值对将通过处理器节点发送到 Kafka 主题 。注意,在实现中,必须参考 访问键值 store 时,会同名,否则运行时会抛出异常, 表示找不到 State Store。如果状态存储未与处理器关联 在代码中,在处理器的方法中访问它也会在 runtime,指示无法从此处理器访问状态存储。"Process""Source""Sink""Source""Process"WordCountProcessor#process()context#forward()WordCountProcessor#punctuate()"Sink""sink-topic"WordCountProcessor"Counts"Topologyinit()

请注意,该函数采用 as 参数,并且 supplier 模式要求每次调用时都返回一个新实例。创建单个对象并返回相同的对象引用将违反供应商模式,并导致运行时异常。 因此,请记住不要向 .每次调用时,都应该总是生成一个新实例。Topology#addProcessorProcessorSupplierProcessorProcessorSupplier#get()ProcessorProcessorSupplier#get()ProcessorTopologyProcessorSupplierProcessorSupplier#get()

现在,您已经在应用程序中完全定义了处理器拓扑,您可以继续运行 Kafka Streams 应用程序