数据类型和序列化

每个 Kafka Streams 应用程序都必须为记录键和记录值(例如 )的数据类型提供 Serdes(序列化器/反序列化器),以便在必要时具体化数据。需要此类 Serdes 信息的操作包括: 、 。java.lang.Stringstream()table()to()repartition()groupByKey()groupBy()

您可以使用以下任一方法提供 Serdes,但必须至少使用一种:

  • 通过在 config 实例中设置默认 Serdes。java.util.Properties
  • 通过在调用适当的 API 方法时指定显式 Serdes,从而覆盖默认值。

目录

配置 Serdes

在 Streams 配置中指定的 Serdes 将用作 Kafka Streams 应用程序中的默认值。 由于此配置的默认值为 null,因此您必须使用此 配置或显式传入 Serdes,如下所述。

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Default serde for keys of data records (here: built-in serde for String type)
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for values of data records (here: built-in serde for Long type)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

覆盖默认 Serdes

您还可以通过将 Serdes 传递给相应的 API 方法来显式指定 Serde,这将覆盖默认的 serde 设置:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// The stream userCountByRegion has type `String` for record keys (for region)
// and type `Long` for record values (for user counts).
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.with(stringSerde, longSerde));

如果要有选择地覆盖 serdes,即保留某些字段的默认值,那么每当要利用默认设置时,都不要指定 serde:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Use the default serializer for record keys (here: region as String) by not specifying the key serde,
// but override the default serializer for record values (here: userCount as Long).
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.Long()));

如果你的一些 incoming records 损坏或格式不正确,它们将导致 deserializer 类报告错误。 从 1.0.x 开始,我们引入了一个接口,它允许 you 自定义如何处理此类记录。接口的自定义实现可以通过 . 有关更多详细信息,请随时阅读 配置 Streams 应用程序 部分。DeserializationExceptionHandlerStreamsConfig

可用的 Serdes

基元类型和基本类型

Apache Kafka 包含多个用于 Java 基元和基本类型的内置 serde 实现,例如 其 Maven 工件:byte[]kafka-clients

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

此工件在包 org.apache.kafka.common.serialization 下提供了以下 serde 实现,例如,您可以在 Streams 配置中定义默认序列化器时利用这些实现。

数据类型 塞尔德
字节 Serdes.ByteArray(),(请参阅下面的提示)Serdes.Bytes()
字节缓冲区 Serdes.ByteBuffer()
双整型 Serdes.Double()
整数 Serdes.Integer()
长整型 Serdes.Long()
字符串 Serdes.String()
UUID 的 Serdes.UUID()
无效 Serdes.Void()
列表 Serdes.ListSerde()
布尔 Serdes.Boolean()

提示

Bytes 是 Java(字节数组)的包装器,支持适当的相等和排序语义。您可能需要考虑在应用程序中使用 instead 而不是。byte[]Bytesbyte[]

JSON 格式

Kafka Streams 代码示例还包括 JSON 的基本 serde 实现:

如示例中所示,您可以使用 JSONSerdes 内部类来构造与 JSON 兼容的序列化程序和反序列化程序。Serdes.serdeFrom(<serializerInstance>, <deserializerInstance>)

实现自定义 Serdes

如果需要实现自定义 Serdes,最好的起点是查看 现有的 Serdes (请参阅上一节)。通常,您的工作流程将类似于:

  1. 通过实现 org.apache.kafka.common.serialization.Serializer 为您的数据类型编写序列化器T
  2. 通过实现 org.apache.kafka.common.serialization.Deserializer 编写一个反序列化器T
  3. 通过实现 org.apache.kafka.common.serialization.Serde 编写一个 serde, 您可以手动执行此操作(请参阅上一节中的现有 Serdes),或者利用 Serdes 中的辅助函数,例如 . 请注意,如果要在提供给 . 如果您的 serde 类具有泛型类型,或者您使用 ,则只能传递 serde via 方法调用(例如 )。TSerdes.serdeFrom(Serializer<T>, Deserializer<T>)KafkaStreamsSerdes.serdeFrom(Serializer<T>, Deserializer<T>)builder.stream("topicName", Consumed.with(...))

适用于 Scala 隐式 Serdes 的 Kafka Streams DSL

使用 Kafka Streams DSL for Scala 时,您不需要配置默认 Serdes。事实上,它不受支持。相反,默认情况下,Serdes 是隐式提供的,用于常见的 primitive 数据类型。有关详细信息,请参阅 DSL API 文档中的隐式 Serdes用户定义的 Serdes 部分