配置 Streams 应用程序

在使用 Streams 之前,必须配置 Kafka 和 Kafka Streams 配置选项。您可以通过在实例中指定参数来配置 Kafka Streams。java.util.Properties

  1. 创建实例。java.util.Properties

  2. 设置参数。例如:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    
    Properties settings = new Properties();
    // Set a few key parameters
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
    // Any further settings
    settings.put(... , ...);

配置参数参考

本节包含最常见的 Streams 配置参数。有关完整参考,请参阅 Streams Javadocs。

必需的配置参数

以下是必需的 Streams 配置参数。

参数名称 重要性 描述 默认值
application.id 必填 流处理应用程序的标识符。在 Kafka 集群中必须是唯一的。 没有
引导程序.服务器 必填 用于建立与 Kafka 集群的初始连接的主机/端口对列表。 没有

application.id

(必填)应用程序 ID。每个流处理应用程序都必须具有唯一的 ID。必须将相同的 ID 提供给 应用程序的所有实例。建议仅使用字母数字字符 (dot)、(连字符) 和 (下划线)。例子:.-_"hello_world""hello_world-v1.0.0"

此 ID 用于以下位置,以将应用程序使用的资源与其他资源隔离开来:

  • 作为默认的 Kafka 使用者和生成者前缀client.id
  • 作为协调的 Kafka 使用者group.id
  • 作为 state 目录中的子目录的名称 (cf.state.dir)
  • 作为内部 Kafka 主题名称的前缀
提示:
当应用程序更新时,除非您想重用内部主题和状态存储中的现有数据,否则应该更改 。 例如,您可以将版本信息嵌入到 、 as 和 中。application.idapplication.idmy-app-v1.0.0my-app-v1.0.2

引导程序.服务器

(必填)Kafka 引导服务器。这与底层生产者和使用者客户端用于连接到 Kafka 集群的设置相同。 例:。"kafka-broker1:9092,kafka-broker2:9092"

可选配置参数

以下是可选的 Streams javadocs,按重要性级别排序:

  • 高:这些参数可能会对性能产生重大影响。在决定这些参数的值时要小心。
  • Medium(中):这些参数可能会对性能产生一些影响。您的特定环境将决定应将多少优化工作集中在这些参数上。
  • Low(低):这些参数对性能的影响不太一般或不太显著。
.
参数名称 重要性 描述 默认值
可接受.recovery.lag 中等 将实例视为已捕获并准备好执行活动任务的最大可接受滞后 (要赶上的偏移量数)。 10000
应用程序 .server 一个 host:port 对,指向嵌入式用户定义的端点,可用于发现 状态存储。对于每个实例,this 的值必须不同 的应用程序。 空字符串
buffered.records.per.partition 每个分区要缓冲的最大记录数。 1000
statestore.cache.max.bytes 中等 用于所有线程中的记录缓存的最大内存字节数。 10485760
cache.max.bytes.buffering (已弃用。请改用 statestore.cache.max.bytes。 中等 用于所有线程中的记录缓存的最大内存字节数。 10485760 字节
client.id 中等 发出请求时要传递给服务器的 ID 字符串。 (此设置将传递给 Kafka Streams 内部使用的使用者/创建者客户端。 空字符串
commit.interval.ms 保存任务位置(源主题中的偏移量)的频率(以毫秒为单位)。 30000 毫秒
default.deserialization.exception.handler 中等 实现接口的异常处理类。DeserializationExceptionHandler LogAndContinueExceptionHandler
default.key.serde 中等 记录键的默认序列化器/反序列化器类,实现接口。必须为 由用户设置,或者必须显式传入所有 serdes (另请参见 default.value.serde)。Serde null
default.production.exception.handler 中等 实现接口的异常处理类。ProductionExceptionHandler DefaultProductionExceptionHandler
default.timestamp.extract或 中等 实现接口的 Timestamp extractor 类。TimestampExtractor 参见 Timestamp Extractor
default.value.serde 中等 记录值的默认序列化器/反序列化器类,实现接口。必须为 由用户设置或必须显式传入所有 serdes (另请参见 default.key.serde)。Serde null
default.windowed.key.serde.inner(已弃用。请改用 windowed.inner.class.serde。 中等 窗口式键的内部类的默认序列化器/解serialize器,实现接口。Serde null
default.windowed.value.serde.inner(已弃用。请改用 windowed.inner.class.serde。 中等 窗口化值内部类的默认序列化器/反序列化器,实现接口。Serde null
default.dsl.store [已弃用]DSL 运算符使用的默认状态存储类型。已弃用 青睐dsl.store.suppliers.class ROCKS_DB
dsl.store.suppliers.class 定义任何有状态 DSL 运算符要使用的默认状态存储实现 ,该 API 的 API 调用的 API API 的 API 调用的 API必须实现 界面。org.apache.kafka.streams.state.DslStoreSuppliers BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers
max.task.idle.ms 中等

此配置控制联接和合并是否会产生无序结果。 config 值是流任务保持空闲状态的最长时间(以毫秒为单位) 当它完全赶上某些(但不是全部)输入分区时 等待生产者发送额外的记录并避免潜在的 跨多个输入流的无序记录处理。 默认值(零)不会等待生产者发送更多记录, 但它确实会等待获取 broker 上已经存在的数据。 此默认值意味着,对于 broker 上已存在的记录, Streams 将按时间戳顺序处理它们。 设置为 -1 以完全禁用空闲并处理任何本地可用的数据, 即使这样做可能会产生无序处理。

0 毫秒
max.warmup.replicas 中等 可以一次分配的预热副本(超出配置的 num.standbys 的额外备用副本)的最大数量。 2
metric.reporters 要用作指标报告器的类列表。 空列表
metrics.num.samples 为计算指标而维护的样本数。 2
metrics.recording.level 指标的最高记录级别。 INFO
metrics.sample.window.ms 计算指标样本的时间窗口(以毫秒为单位)。 30000 毫秒 (30 秒)
num.standby.replicas 每个任务的备用副本数。 0
num.stream.threads 中等 要执行流处理的线程数。 1
probing.rebalance.interval.ms 在触发再平衡以探测已充分赶上预热副本之前等待的最长时间(以毫秒为单位)。 600000 毫秒 (10 分钟)
processing.exception.handler 中等 实现接口的异常处理类。ProcessingExceptionHandler LogAndFailProcessingExceptionHandler
processing.guarantee 中等 处理模式。可以是 (默认) 或(对于 EOS 版本 2,需要代理版本 2.5+)。已弃用的配置选项包括 (对于 EOS 版本 1) 和 (对于 EOS 版本 2,需要代理版本 2.5+)"at_least_once""exactly_once_v2""exactly_once""exactly_once_beta"请参阅处理保证
poll.ms 块等待输入的时间(以毫秒为单位)。 100 毫秒
rack.aware.assignment.tags 中等 用于在 Kafka Streams 之间分配备用副本的标签键列表 客户。配置后,Kafka Streams 将尽最大努力将备用任务分发到 具有不同 tag 值的 client。 空列表
replication.factor 中等 应用程序创建的 changelog 主题和 repartition 主题的复制因子。 默认值 (即:使用代理默认复制因子) 需要代理版本 2.4 或更高版本。-1 -1
retry.backoff.ms 中等 重试请求之前的时间量(以毫秒为单位)。如果参数配置为大于 0,则适用。retries 100
rocksdb.config.setter 中等 RocksDB 配置。
state.cleanup.delay.ms 迁移分区时,删除状态前等待的时间(以毫秒为单位)。 600000 毫秒 (10 分钟)
state.dir 目录 状态存储的目录位置。 /${java.io.tmpdir}/kafka-streams
task.assignor.class 中等 实现接口的任务分配器类或类名。TaskAssignor 高可用性任务分配器。
task.timeout.ms 中等 任务可能因内部错误而停止并重试直到引发错误的最长时间(以毫秒为单位)。如果超时 为 ,任务将引发第一个内部错误的错误。对于任何大于 的超时,任务将在引发错误之前至少重试一次。0 ms0 ms 300000 毫秒 (5 分钟)
topology.optimization (拓扑优化) 中等 一个配置,告诉 Kafka Streams 是否应该优化拓扑以及要应用哪些优化。可接受的值为:()、() 或以逗号分隔的特定优化列表:()、()、()。StreamsConfig.NO_OPTIMIZATIONnoneStreamsConfig.OPTIMIZEallStreamsConfig.REUSE_KTABLE_SOURCE_TOPICSreuse.ktable.source.topicsStreamsConfig.MERGE_REPARTITION_TOPICSmerge.repartition.topicsStreamsConfig.SINGLE_STORE_SELF_JOINsingle.store.self.join NO_OPTIMIZATION
upgrade.from 中等 在滚动升级期间要从中升级的版本。 请参阅升级自
windowstore.changelog.additional.retention.ms 添加到 Windows maintainMs 以确保数据不会过早地从日志中删除。允许 clock drift。 86400000 毫秒 (1 天)
window.size.ms 设置 deserializer 的窗口大小,以便计算窗口结束时间。 null
log.summary.interval.ms 添加到 Windows maintainMs 以确保数据不会过早地从日志中删除。允许 clock drift。 120000 毫秒 (2 分钟)

可接受.recovery.lag

要被视为已捕获并能够接收活动任务的实例的最大可接受滞后(要从更改日志中赶上的偏移总数)。Streams 将仅分配 状态存储位于可接受的恢复滞后范围内的实例(如果存在)的有状态活动任务,并分配预热副本以在后台恢复实例的状态 尚未赶上。对于给定工作负载,应对应于远低于一分钟的恢复时间。必须至少为 0。

注意:如果将此项设置为此项,则会有效地禁用预热副本和任务高可用性,从而允许 Streams 立即生成平衡的 分配任务并将其迁移到新实例,而无需先预热它们。Long.MAX_VALUE

default.deserialization.exception.handler

默认的反序列化异常处理程序允许您管理无法反序列化的记录异常。这 可能是由数据损坏、序列化逻辑不正确或未处理的记录类型引起的。实现的异常 handler 需要返回 OR,具体取决于记录和引发的异常。返回将指示 Streams 应关闭,并指示 Streams 应忽略该问题 并继续处理。可以使用以下库内置异常处理程序:FAILCONTINUEFAILCONTINUE

  • LogAndContinueExceptionHandler 的 此处理程序记录反序列化异常,然后向处理管道发出信号以继续处理更多记录。 这种记录和跳过策略允许 Kafka Streams 取得进展,而不是在有记录失败时失败 进行反序列化。
  • LogAndFailExceptionHandler 的 Controller。 此处理程序记录反序列化异常,然后向处理管道发出信号以停止处理更多记录。

除了库提供的异常处理程序之外,您还可以提供自己的自定义异常处理程序来满足您的需求。例如,您可以选择转发 corrupt 记录放入隔离主题(想想:“死信队列”)中,以便进一步处理。为此,请使用 Producer API 将损坏的记录直接写入 隔离主题。更具体地说,您可以在 Streams 客户端之外创建一个单独的对象,并传入此对象 以及死信队列主题名称到 Map 中,然后可以从函数调用中检索。 这种方法的缺点是 “手动” 写入是 Kafka Streams 运行时库不可见的副作用。 因此,它们不会从 Streams API 的端到端处理保证中受益:KafkaProducerPropertiesconfigure

public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
    KafkaProducer<byte[], byte[]> dlqProducer;
    String dlqTopic;

    @Override
    public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {

        log.warn("Exception caught during Deserialization, sending to the dead queue topic; " +
            "taskId: {}, topic: {}, partition: {}, offset: {}",
            context.taskId(), record.topic(), record.partition(), record.offset(),
            exception);

        dlqProducer.send(new ProducerRecord<>(dlqTopic, record.timestamp(), record.key(), record.value(), record.headers())).get();

        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        dlqProducer = .. // get a producer from the configs map
        dlqTopic = .. // get the topic name from the configs map
    }
}

default.production.exception.handler

使用默认的 production 异常处理程序,可以管理在尝试与 broker 交互时触发的异常 例如尝试生成太大的记录。默认情况下,Kafka 提供并使用 DefaultProductionExceptionHandler,当这些异常发生时,它总是失败。

每个异常处理程序都可以返回 OR,具体取决于记录和引发的异常。返回将表示 Streams 应该关闭,并将表示 Streams 应忽略该问题并继续处理。如果要提供始终忽略太大记录的异常处理程序,则可以实现如下内容:FAILCONTINUEFAILCONTINUE

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;

public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    public void configure(Map<String, Object> config) {}

    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                     final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}

Properties settings = new Properties();

// other various kafka streams settings, e.g. bootstrap servers, application id, etc

settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
             IgnoreRecordTooLargeHandler.class);

default.timestamp.extract或

时间戳提取器从 ConsumerRecord 的实例中提取时间戳。 时间戳用于控制流的进度。

默认提取程序为 FailOnInvalidTimestamp。 此提取器检索由 Kafka 生产者自动嵌入到 Kafka 消息中的内置时间戳 自 Kafka 版本 0.10 起的客户端。 根据 Kafka 服务器端代理和主题参数的设置, 此提取器为您提供:log.message.timestamp.typemessage.timestamp.type

  • 事件时间处理语义 if 设置为 AK “producer time” (这是默认设置)。这表示 Kafka 生产者发送原始消息的时间。如果您使用 Kafka 的 官方生产者客户端,时间戳表示自 epoch 以来的毫秒数。log.message.timestamp.typeCreateTime
  • ingestion-time processing semantics if 设置为 aka “broker 时间”。这表示 Kafka 代理收到原始消息的时间,以自纪元以来的毫秒为单位。log.message.timestamp.typeLogAppendTime

如果记录包含无效(即负数)内置 timestamp,因为 Kafka Streams 不会处理此记录,而是以静默方式删除它。无效的内置时间戳可能会 由于各种原因而发生:例如,如果您使用由 0.10 之前的 Kafka 生产者客户端写入的主题 或者由尚不支持新 Kafka 0.10 消息格式的第三方生产者客户端提供;另一种情况 这可能是在将 Kafka 集群从 升级到 之后发生的,其中生成的所有数据 with 不包含消息时间戳。FailOnInvalidTimestamp0.90.100.90.10

如果您的数据具有无效时间戳并希望对其进行处理,则有两个替代提取程序可用。 两者都适用于内置时间戳,但处理无效时间戳的方式不同。

  • LogAndSkipOnInvalidTimestamp 的 此提取器记录一条警告消息,并将无效的时间戳返回给 Kafka Streams,Kafka Streams 不会处理,但会处理 静默删除记录。 这种对数和跳过策略允许 Kafka Streams 取得进展,而不是在存在 输入数据中的内置时间戳无效。
  • UsePartitionTimeOnInvalidTimestamp 的 ValidTimestamp 中。 如果记录有效(即不负),则此提取器返回记录的内置时间戳。如果记录没有 具有有效的内置时间戳,则提取器会返回之前从 与当前记录相同的主题分区作为时间戳估计。如果无法估计时间戳,则 引发异常。

另一个内置提取器是 WallclockTimestampExtractor。 此提取器实际上并没有从消耗的记录中“提取”时间戳,而是返回 毫秒(想想:),这实际上意味着 Streams 将运行 根据事件的所谓处理时间System.currentTimeMillis()

您还可以提供自己的时间戳提取器,例如检索嵌入在 payload 中的时间戳 消息。如果无法提取有效的时间戳,则可以引发异常、返回负时间戳或 估计时间戳。返回负时间戳将导致数据丢失 – 相应的记录不会 已处理,但无声地丢弃。如果要估计新的时间戳,可以使用 via 提供的值(即 Kafka Streams 时间戳估计)。以下是自定义实现的示例:previousTimestampTimestampExtractor

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event-time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {

  @Override
  public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
    // `Foo` is your own custom class, which we assume has a method that returns
    // the embedded timestamp (milliseconds since midnight, January 1, 1970 UTC).
    long timestamp = -1;
    final Foo myPojo = (Foo) record.value();
    if (myPojo != null) {
      timestamp = myPojo.getTimestampInMillis();
    }
    if (timestamp < 0) {
      // Invalid timestamp!  Attempt to estimate a new timestamp,
      // otherwise fall back to wall-clock time (processing-time).
      if (previousTimestamp >= 0) {
        return previousTimestamp;
      } else {
        return System.currentTimeMillis();
      }
    }
  }

}

然后,您将在 Streams 配置中定义自定义时间戳提取器,如下所示:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);

default.key.serde

记录键的默认 Serializer/Deserializer 类,除非由用户设置,否则为 null。Kafka Streams 中的序列化和反序列化发生 每当需要具体化数据时,例如:

  • 每当从 Kafka 主题读取数据或将数据写入 Kafka 主题时(例如,通过 and 方法)。StreamsBuilder#stream()KStream#to()
  • 每当从状态存储读取数据或将数据写入状态存储时。

数据类型和序列化中对此进行了更详细的讨论。

default.value.serde

记录值的默认 Serializer/Deserializer 类,除非由用户设置,否则为 null。Kafka Streams 中的序列化和反序列化 每当需要具体化数据时都会发生,例如:

  • 每当从 Kafka 主题读取数据或将数据写入 Kafka 主题时(例如,通过 and 方法)。StreamsBuilder#stream()KStream#to()
  • 每当从状态存储读取数据或将数据写入状态存储时。

数据类型和序列化中对此进行了更详细的讨论。

default.windowed.key.serde.inner(已弃用。

窗口化键的内部类的默认 Serializer/Deserializer 类。Kafka Streams 中的序列化和反序列化发生 每当需要具体化数据时,例如:

  • 每当从 Kafka 主题读取数据或将数据写入 Kafka 主题时(例如,通过 and 方法)。StreamsBuilder#stream()KStream#to()
  • 每当从状态存储读取数据或将数据写入状态存储时。

数据类型和序列化中对此进行了更详细的讨论。

default.windowed.value.serde.inner(已弃用。

窗口化值的内部类的默认 Serializer/Deserializer 类。Kafka Streams 中的序列化和反序列化发生 每当需要具体化数据时都会发生,例如:

  • 每当从 Kafka 主题读取数据或将数据写入 Kafka 主题时(例如,通过 and 方法)。StreamsBuilder#stream()KStream#to()
  • 每当从状态存储读取数据或将数据写入状态存储时。

数据类型和序列化中对此进行了更详细的讨论。

rack.aware.assignment.non_overlap_cost

此配置设置从原始工作分配中移动任务的成本,该成本由 或 计算。䋰 它们控制优化器是倾向于最小化跨机架流量还是最小化现有分配中任务的移动。如果此配置设置为大于 的值,则 优化器将尝试维护 Task 分配器计算的现有分配。请注意,优化器会考虑这两个配置的比率,以支持维护现有分配或最小化流量成本。例如,设置为 10 和 1 比设置为 100 和 50 更有可能保持现有分配。StickyTaskAssignorHighAvailabilityTaskAssignorrack.aware.assignment.traffic_costrack.aware.assignment.traffic_costrack.aware.assignment.non_overlap_costrack.aware.assignment.traffic_costrack.aware.assignment.non_overlap_costrack.aware.assignment.traffic_cost

默认值为 null,这意味着将使用不同赋值器中的 default。在 中,它的默认值为 10,并且具有 默认值 1,这意味着在 中首选保持粘性。在 中,它的默认值为 1,并且具有 默认值 10,这意味着在 中首选最小化跨机架流量。non_overlap_costStickyTaskAssignorrack.aware.assignment.traffic_costStickyTaskAssignorHighAvailabilityTaskAssignorrack.aware.assignment.traffic_costHighAvailabilityTaskAssignor

rack.aware.assignment.strategy

此配置设置 Kafka Streams 用于机架感知任务分配的策略,以便可以减少从代理到客户端的交叉流量。此配置仅在 broker 上设置,并在 Kafka Streams 侧设置时生效。此配置有两个设置:broker.rackclient.rack

  • none.这是默认值,这意味着将禁用机架感知任务分配。
  • min_traffic.此设置意味着机架感知任务分配器将计算一个分配,以尝试最小化跨机架流量。
  • balance_subtopology.此设置意味着机架感知任务分配器将计算一个分配,该分配将尝试将任务从同一子拓扑平衡到不同的客户端,并在此基础上最大限度地减少跨机架流量。

此配置可与 rack.aware.assignment.non_overlap_costrack.aware.assignment.traffic_cost 一起使用,以平衡减少跨机架流量和维持现有分配。

rack.aware.assignment.tags

此配置设置用于在 Kafka Streams 之间分配备用副本的标签键列表 客户。配置后,Kafka Streams 将尽最大努力将备用任务分发到 具有不同 tag 值的 client。

可以通过 prefix 设置 Kafka Streams 客户端的标签。例:client.tag.


Client-1                                   | Client-2
_______________________________________________________________________
client.tag.zone: eu-central-1a             | client.tag.zone: eu-central-1b
client.tag.cluster: k8s-cluster1           | client.tag.cluster: k8s-cluster1
rack.aware.assignment.tags: zone,cluster   | rack.aware.assignment.tags: zone,cluster


Client-3                                   | Client-4
_______________________________________________________________________
client.tag.zone: eu-central-1a             | client.tag.zone: eu-central-1b
client.tag.cluster: k8s-cluster2           | client.tag.cluster: k8s-cluster2
rack.aware.assignment.tags: zone,cluster   | rack.aware.assignment.tags: zone,cluster

在上面的示例中,我们有四个 Kafka Streams 客户端,分别跨两个区域 (、) 和两个集群 (, )。 对于位于 上的活动任务,Kafka Streams 将在 上分配备用任务,因为 具有不同于和不同的 。eu-central-1aeu-central-1bk8s-cluster1k8s-cluster2Client-1Client-4Client-4zoneclusterClient-1

rack.aware.assignment.traffic_cost

此配置设置跨机架流量的成本。䋰 它们控制优化器是倾向于最小化跨机架流量还是最小化现有分配中任务的移动。如果此配置设置为大于 的值,则 优化器将尝试计算一个 assignment,以最小化跨机架流量。请注意,优化器会考虑这两个配置的比率,以支持维护现有分配或最小化流量成本。例如,设置为 10 和 1 比设置为 100 和 50 更有可能最大限度地减少跨机架流量。rack.aware.assignment.non_overlap_costrack.aware.assignment.non_overlap_costrack.aware.assignment.traffic_costrack.aware.assignment.non_overlap_costrack.aware.assignment.traffic_costrack.aware.assignment.non_overlap_cost

默认值为 null,这意味着将使用不同分配器中的默认流量成本。在 中,它的默认值为 1,默认值为 10。在 中,它的默认值为 10,默认值为 1。StickyTaskAssignorrack.aware.assignment.non_overlap_costHighAvailabilityTaskAssignorrack.aware.assignment.non_overlap_cost

max.task.idle.ms

此配置控制 Streams 将等待多长时间来获取数据,以便 提供 In-order 处理语义。

在处理具有多个输入分区的任务时(如在 join 或 merge 中), Streams 需要选择要处理下一条记录的分区。 当所有输入分区都有本地缓冲数据时,Streams 会选择该分区 其下一条记录的时间戳最低。这具有理想的效果 按时间戳顺序整理 Importing 分区,这通常是 想要在流式处理中加入或合并。但是,当 Streams 没有任何数据时 在本地缓冲,则不知道下一个分区 record 的时间戳将低于或高于其余的 partitions 的记录。

有两种情况需要考虑:要么在 broker 尚未获取,或者 Streams 已完全赶上 分区,并且创建者根本没有生成任何新记录 自 Streams 轮询上一批以来。

默认值 会导致 Streams 在检测到任务没有本地 buffered data 的 Partition,但 broker 上有可用的数据。 具体来说,当本地缓冲区中存在空分区,但 Streams 该分区的滞后不为零。但是,一旦 Streams 赶上 代理,它将继续处理,即使其中一个 分区。也就是说,它不会等待生成新数据。 此默认值旨在牺牲一些吞吐量以换取直观 更正 join 语义。0

任何大于零的配置值都表示 Streams 在具有捕获但为空的分区时将等待的额外毫秒数。 换句话说,这是等待生成新数据的时间量 添加到输入分区中,以确保在事件中按顺序处理数据 一个缓慢的生产者。

的 config 值表示 Streams 在选择 按时间戳创建下一条记录,从而以 引入无序处理。-1

max.warmup.replicas

为了保持 任务在一个实例上可用,同时在另一个实例上预热,它已被重新分配到。用于限制额外代理的数量 traffic 和 cluster state 可用于实现高可用性。增加此值将允许 Streams 一次预热更多任务,从而加快时间 对于重新分配的预热,以恢复足够的状态,以便它们转换为活动任务。必须至少为 1。num.standbys

请注意,一个预热副本对应一个 Stream Task。此外,请注意,每个预热任务只能在 再平衡(通常在所谓的探测再平衡期间,以 Config 指定的频率发生)。这意味着 活动任务从一个 Kafka Streams 实例迁移到另一个实例的最大速率可以由以下因素决定 ( / ).probing.rebalance.interval.msmax.warmup.replicasprobing.rebalance.interval.ms

num.standby.replicas

备用副本的数量。备用副本是本地状态存储的卷影副本。Kafka Streams 尝试创建 指定每个存储的副本数,只要有足够的实例运行,它们就会保持最新状态。 备用副本用于最大限度地减少任务故障转移的延迟。以前在失败实例上运行的任务是 首选在具有备用副本的实例上重新启动,以便本地状态存储从其 changelog 可以最小化。有关 Kafka Streams 如何利用备用副本来最大限度地降低 可以在 State (状态) 部分找到故障转移时恢复任务。
建议:
将备用数据库的数量增加到 1 个,以获得即时故障转移,即高可用性。 增加备用数据库的数量需要更多的客户端存储空间。 例如,如果 1 个备用数据库,则需要 2 倍空间。
注意:
如果开启 n 个备用任务,则需要预置 n+1 个实例。KafkaStreams

num.stream.threads

这指定 Kafka Streams 应用程序实例中的流线程数。流处理代码在这些线程中运行。 有关 Kafka Streams 线程模型的更多信息,请参阅线程模型

probing.rebalance.interval.ms

在触发再平衡以探测已恢复到足以被视为已赶上进度的预热副本之前等待的最长时间。Streams 只会将有状态的活动任务分配给 被赶上并在 acceptable.recovery.lag 内的实例(如果存在)。探测再平衡用于查询预热副本和转换的最新总滞后 如果准备好,他们到活动任务。只要有预热任务,它们就会继续触发,直到分配平衡为止。必须至少为 1 分钟。

processing.exception.handler

处理异常处理程序允许您管理在处理记录期间触发的异常。实现的异常 handler 需要返回 OR,具体取决于记录和引发的异常。返回将指示 Streams 应关闭,并指示 Streams 应忽略该问题 并继续处理。可以使用以下库内置异常处理程序:FAILCONTINUEFAILCONTINUE

除了库提供的异常处理程序之外,您还可以提供自己的自定义异常处理程序来满足您的需求。例如,您可以选择转发 corrupt 记录放入隔离主题(想想:“死信队列”)中,以便进一步处理。为此,请使用 Producer API 将损坏的记录直接写入 隔离主题。更具体地说,您可以在 Streams 客户端之外创建一个单独的对象,并传入此对象 以及死信队列主题名称到 Map 中,然后可以从函数调用中检索。 这种方法的缺点是 “手动” 写入是 Kafka Streams 运行时库不可见的副作用。 因此,它们不会从 Streams API 的端到端处理保证中受益:KafkaProducerPropertiesconfigure

public class SendToDeadLetterQueueExceptionHandler implements ProcessingExceptionHandler {
    KafkaProducer<byte[], byte[]> dlqProducer;
    String dlqTopic;

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record record,
                                            final Exception exception) {

        log.warn("Exception caught during message processing, sending to the dead queue topic; " +
            "processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}",
            context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(),
            exception);

        dlqProducer.send(new ProducerRecord<>(dlqTopic, null, record.timestamp(), (byte[]) record.key(), (byte[]) record.value(), record.headers()));

        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        dlqProducer = .. // get a producer from the configs map
        dlqTopic = .. // get the topic name from the configs map
    }
}

processing.guarantee

应使用的处理保证。 可能的值为 (default) 和 (适用于 EOS 版本 2)。 已弃用的配置选项包括 (EOS alpha)、 和 (适用于 EOS 版本 2)。 使用(或已弃用的)需要代理版本 2.5 或更高版本, 使用已弃用的 需要 Broker 版本 0.11.0 或更高版本。 请注意,如果启用了 exactly-once 处理,则参数的默认值将更改为 100ms。 此外,使用者配置了 perdefault,创建者配置了 per。 请注意,默认情况下,恰好一次处理需要一个至少包含三个代理的集群,这是推荐的生产设置。 对于开发,您可以通过调整 broker 设置和要使用的 broker 数量来更改此配置。 有关更多详细信息,请参阅处理保证"at_least_once""exactly_once_v2""exactly_once""exactly_once_beta""exactly_once_v2""exactly_once_beta""exactly_once"commit.interval.msisolation.level="read_committed"enable.idempotence=truetransaction.state.log.replication.factortransaction.state.log.min.isr
建议:
虽然从技术上讲可以将 EOS 与任何复制因子一起使用,但使用复制因子的 比 3 有效地使 EOS 无效。因此,强烈建议使用 replication factor 3(与 一起)。此建议适用于所有主题(即 、、Kafka Streams 内部主题和用户主题)。min.in.sync.replicas=2__transaction_state__consumer_offsets

replication.factor

这指定了 Kafka Streams 在使用本地状态或流 重新分区以进行聚合。复制对于容错非常重要。没有复制,甚至单个代理失败 可能会阻止流处理应用程序的进度。建议使用与源主题类似的复制因子。

建议:
将复制因子增加到 3,以确保内部 Kafka Streams 主题最多可以容忍 2 个代理故障。 请注意,您还需要更多的存储空间(复制因子为 3 时为 3 倍)。

rocksdb.config.setter

RocksDB 配置。Kafka Streams 使用 RocksDB 作为持久存储的默认存储引擎。更改默认值 配置,你可以通过 rocksdb.config.setter 实现和提供你的自定义类。RocksDBConfigSetter

下面是一个调整 RocksDB 消耗的内存大小的示例。

public static class CustomRocksDBConfig implements RocksDBConfigSetter {
    // This object should be a member variable so it can be closed in RocksDBConfigSetter#close.
    private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // See #1 below.
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        // See #2 below.
        tableConfig.setBlockSize(16 * 1024L);
        // See #3 below.
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        // See #4 below.
        options.setMaxWriteBufferNumber(2);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // See #5 below.
        cache.close();
    }
}

Properties streamsSettings = new Properties();
streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
例如:
  1. BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();获取对现有表配置的引用,而不是创建新的表配置,这样您就不会意外覆盖默认值,例如,这是一项重要的优化。BloomFilter
  2. tableConfig.setBlockSize(16 * 1024L);根据 RocksDB GitHub 中的这些说明修改默认数据块大小
  3. tableConfig.setCacheIndexAndFilterBlocks(true);不要让 index 和 filter 块无限增长。有关更多信息,请参阅 RocksDB GitHub
  4. options.setMaxWriteBufferNumber(2);请参阅 RocksDB GitHub 中的高级选项。
  5. cache.close();为避免内存泄漏,您必须关闭您构建的任何扩展 org.rocksdb.RocksObject 的对象。有关更多详细信息,请参阅 RocksJava 文档

state.dir 目录

状态目录。Kafka Streams 将本地状态保存在 state 目录下。每个应用程序在其主机上都有一个子目录 machine 的 state 目录。子目录的名称是应用程序 ID。关联的 state store 替换为 application 在此子目录下创建。在单台计算机上运行同一应用程序的多个实例时, 对于每个此类实例,此路径必须是唯一的。

task.assignor.class

实现接口的任务分配器类或类名。默认为 high-availability task 分配器。Apache Kafka 中提供的一种可能的替代实现是 ,这是默认任务 assignor 之前,并以有状态任务可用性为代价最大限度地减少任务移动。的替代实现 通过实现自定义并将此配置设置为自定义任务分配器类的名称,可以将任务分配算法插入到应用程序中。org.apache.kafka.streams.processor.assignment.TaskAssignororg.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignorTaskAssignor

topology.optimization (拓扑优化)

一个配置,告诉 Kafka Streams 是否应该优化拓扑以及要应用哪些优化。可接受的值为:()、() 或以逗号分隔的特定优化列表:()、()、()。StreamsConfig.NO_OPTIMIZATIONnoneStreamsConfig.OPTIMIZEallStreamsConfig.REUSE_KTABLE_SOURCE_TOPICSreuse.ktable.source.topicsStreamsConfig.MERGE_REPARTITION_TOPICSmerge.repartition.topicsStreamsConfig.SINGLE_STORE_SELF_JOINsingle.store.self.join

我们建议在生产代码的配置中列出特定的优化,以便在升级 Streams 库期间拓扑的结构不会意外更改。

这些优化包括移动/减少重新分区主题和重用源主题作为源 KTables 的更新日志。这些优化将节省 Kafka 中的网络流量和存储,而不会改变应用程序的语义。建议启用它们。

请注意,从 2.3 开始,您需要执行两项操作才能启用优化。除了将此配置设置为 之外,您还需要传入 configuration 属性。 例如。StreamsConfig.OPTIMIZEStreamsBuilder.build(Properties)KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)

windowed.inner.class.serde

Serde 用于窗口记录的内部类。必须实现 org.apache.kafka.common.serialization.Serde 接口。

请注意,此配置仅由通过 configs 设置窗口化 de/serializer 类型的普通 consumer/producer 客户端使用。对于处理窗口类型的 Kafka Streams 应用程序,在实例化拓扑的窗口化 serde 对象时,必须传入内部 serde 类型。

log.summary.interval.ms

此配置控制摘要信息的输出间隔。 如果大于等于 0,则按照设置的时间间隔输出 summary log; 如果小于 0,则禁用摘要输出。

upgrade.from

您要升级的版本。在执行滚动升级到某些版本时,请务必设置此配置,如升级指南中所述。 您应该在退回实例并将其升级到较新版本之前将此配置设置为适当的版本。一旦每个人都在 newer 版本,您应该删除此配置并执行第二次滚动退回。只需设置此配置并遵循 two-bounce 升级路径 从低于 2.0 的版本升级时,或从低于 2.4 的任何版本升级到 2.4+ 时。

Kafka 使用者、创建者和管理客户端配置参数

您可以为 Kafka 使用者生产者、 和 admin 客户端。 使用者、创建者和管理客户端设置是通过在实例中指定参数来定义的。StreamsConfig

在此示例中,Kafka 使用者会话超时在 Streams 设置中配置为 60000 毫秒:

Properties streamsSettings = new Properties();
// Example of a "normal" setting for Kafka Streams
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings of your Streams application
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);

命名

一些使用者、创建者和管理客户端配置参数使用相同的参数名称,Kafka Streams 库本身也使用一些与其嵌入式客户端共享相同名称的参数。例如,和 用于配置 TCP 缓冲区; 并控制客户端请求的重试; 用于配置在处理 Broker 请求响应中的可重试错误时允许的重试次数。 您可以通过在参数名称前加上 、 或 (例如,和 ) 来避免重复名称。send.buffer.bytesreceive.buffer.bytesrequest.timeout.msretry.backoff.msretriesconsumer.producer.admin.consumer.send.buffer.bytesproducer.send.buffer.bytes

Properties streamsSettings = new Properties();
// same value for consumer, producer, and admin client
streamsSettings.put("PARAMETER_NAME", "value");
// different values for consumer and producer
streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
streamsSettings.put("admin.PARAMETER_NAME", "admin-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");
streamsSettings.put(StreamsConfig.adminClientPrefix("PARAMETER_NAME"), "admin-value");

您可以通过添加不同的前缀来进一步分离使用者配置:

  • main.consumer.for main consumer,这是 Stream Source 的默认 consumer。
  • restore.consumer.对于负责状态存储恢复的 Restore Consumer。
  • global.consumer.面向全球消费者,用于全球 KTable 建设。

例如,如果你只想设置恢复消费者配置而不触及其他消费者的设置,你可以简单地使用 来设置配置。restore.consumer.

Properties streamsSettings = new Properties();
// same config value for all consumer types
streamsSettings.put("consumer.PARAMETER_NAME", "general-consumer-value");
// set a different restore consumer config. This would make restore consumer take restore-consumer-value,
// while main consumer and global consumer stay with general-consumer-value
streamsSettings.put("restore.consumer.PARAMETER_NAME", "restore-consumer-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.restoreConsumerPrefix("PARAMETER_NAME"), "restore-consumer-value");

如果您只想指定一个使用者类型 config,则同样适用于 和 。main.consumer.main.consumer.

此外,要配置内部 repartition/changelog 主题,您可以使用前缀,后跟任何标准主题配置。topic.

Properties streamsSettings = new Properties();
// Override default for both changelog and repartition topics
streamsSettings.put("topic.PARAMETER_NAME", "topic-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");