数据类型和序列化
每个 Kafka Streams 应用程序都必须为记录键和记录值(例如 )的数据类型提供 Serdes(序列化器/反序列化器),以便在必要时具体化数据。需要此类 Serdes 信息的操作包括: 、 。java.lang.String
stream()
table()
to()
repartition()
groupByKey()
groupBy()
您可以使用以下任一方法提供 Serdes,但必须至少使用一种:
- 通过在 config 实例中设置默认 Serdes。
java.util.Properties
- 通过在调用适当的 API 方法时指定显式 Serdes,从而覆盖默认值。
目录
配置 Serdes
在 Streams 配置中指定的 Serdes 将用作 Kafka Streams 应用程序中的默认值。 由于此配置的默认值为 null,因此您必须使用此 配置或显式传入 Serdes,如下所述。
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
Properties settings = new Properties();
// Default serde for keys of data records (here: built-in serde for String type)
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for values of data records (here: built-in serde for Long type)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
覆盖默认 Serdes
您还可以通过将 Serdes 传递给相应的 API 方法来显式指定 Serde,这将覆盖默认的 serde 设置:
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// The stream userCountByRegion has type `String` for record keys (for region)
// and type `Long` for record values (for user counts).
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.with(stringSerde, longSerde));
如果要有选择地覆盖 serdes,即保留某些字段的默认值,那么每当要利用默认设置时,都不要指定 serde:
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
// Use the default serializer for record keys (here: region as String) by not specifying the key serde,
// but override the default serializer for record values (here: userCount as Long).
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.Long()));
如果你的一些 incoming records 损坏或格式不正确,它们将导致 deserializer 类报告错误。
从 1.0.x 开始,我们引入了一个接口,它允许
you 自定义如何处理此类记录。接口的自定义实现可以通过 .
有关更多详细信息,请随时阅读 配置 Streams 应用程序 部分。DeserializationExceptionHandler
StreamsConfig
可用的 Serdes
基元类型和基本类型
Apache Kafka 包含多个用于 Java 基元和基本类型的内置 serde 实现,例如
其 Maven 工件:byte[]
kafka-clients
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
此工件在包 org.apache.kafka.common.serialization 下提供了以下 serde 实现,例如,您可以在 Streams 配置中定义默认序列化器时利用这些实现。
数据类型 | 塞尔德 |
---|---|
字节 | Serdes.ByteArray() ,(请参阅下面的提示)Serdes.Bytes() |
字节缓冲区 | Serdes.ByteBuffer() |
双整型 | Serdes.Double() |
整数 | Serdes.Integer() |
长整型 | Serdes.Long() |
字符串 | Serdes.String() |
UUID 的 | Serdes.UUID() |
无效 | Serdes.Void() |
列表 | Serdes.ListSerde() |
布尔 | Serdes.Boolean() |
提示
Bytes 是 Java(字节数组)的包装器,支持适当的相等和排序语义。您可能需要考虑在应用程序中使用 instead 而不是。byte[]
Bytes
byte[]
JSON 格式
Kafka Streams 代码示例还包括 JSON 的基本 serde 实现:
如示例中所示,您可以使用 JSONSerdes 内部类来构造与 JSON 兼容的序列化程序和反序列化程序。Serdes.serdeFrom(<serializerInstance>, <deserializerInstance>)
实现自定义 Serdes
如果需要实现自定义 Serdes,最好的起点是查看 现有的 Serdes (请参阅上一节)。通常,您的工作流程将类似于:
- 通过实现 org.apache.kafka.common.serialization.Serializer 为您的数据类型编写序列化器。
T
- 通过实现 org.apache.kafka.common.serialization.Deserializer 编写一个反序列化器。
T
- 通过实现 org.apache.kafka.common.serialization.Serde 编写一个 serde,
您可以手动执行此操作(请参阅上一节中的现有 Serdes),或者利用 Serdes 中的辅助函数,例如 .
请注意,如果要在提供给 .
如果您的 serde 类具有泛型类型,或者您使用 ,则只能传递 serde
via 方法调用(例如 )。
T
Serdes.serdeFrom(Serializer<T>, Deserializer<T>)
KafkaStreams
Serdes.serdeFrom(Serializer<T>, Deserializer<T>)
builder.stream("topicName", Consumed.with(...))
适用于 Scala 隐式 Serdes 的 Kafka Streams DSL
使用 Kafka Streams DSL for Scala 时,您不需要配置默认 Serdes。事实上,它不受支持。相反,默认情况下,Serdes 是隐式提供的,用于常见的 primitive 数据类型。有关详细信息,请参阅 DSL API 文档中的隐式 Serdes 和用户定义的 Serdes 部分