教程:编写 Kafka Streams 应用程序
在本指南中,我们将从头开始设置您自己的项目,以使用 Kafka Streams 编写流处理应用程序。 如果您尚未阅读有关如何运行以 Kafka Streams 编写的 Streams 应用程序的快速入门,强烈建议您先阅读。
设置 Maven 项目
我们将使用 Kafka Streams Maven 原型通过以下命令创建 Streams 项目结构:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.kafka \
-DarchetypeArtifactId=streams-quickstart-java \
-DarchetypeVersion={{fullDotVersion}} \
-DgroupId=streams.examples \
-DartifactId=streams-quickstart \
-Dversion=0.1 \
-Dpackage=myapps
如果需要,您可以对 和 parameters 使用不同的值。
假设使用了上述参数值,此命令将创建一个如下所示的项目结构:groupId
artifactId
package
$ tree streams-quickstart
streams-quickstart
|-- pom.xml
|-- src
|-- main
|-- java
| |-- myapps
| |-- LineSplit.java
| |-- Pipe.java
| |-- WordCount.java
|-- resources
|-- log4j.properties
项目中包含的文件已定义 Streams 依赖项。
请注意,生成的 API 以 Java 8 为目标,不适用于更高的 Java 版本。pom.xml
pom.xml
在 下已经有几个使用 Streams 库编写的示例程序。
由于我们将从头开始编写此类程序,因此我们现在可以删除这些示例:src/main/java
$ cd streams-quickstart
$ rm src/main/java/myapps/*.java
编写第一个 Streams 应用程序:Pipe
现在是编码时间!随意打开您最喜欢的 IDE 并导入此 Maven 项目,或者只需打开文本编辑器并在 下创建一个 java 文件。 让我们命名它:src/main/java/myapps
Pipe.java
package myapps;
public class Pipe {
public static void main(String[] args) throws Exception {
}
}
我们将填写函数来编写这个 pipe 程序。请注意,我们不会随时列出 import 语句,因为 IDE 通常可以自动添加它们。
但是,如果您使用的是文本编辑器,则需要手动添加导入,在本节结束时,我们将为您显示带有 import 语句的完整代码片段。main
编写 Streams 应用程序的第一步是创建一个映射,以指定 中定义的不同 Streams 执行配置值。
您需要设置的几个重要配置值是:,它指定了用于建立与 Kafka 集群的初始连接的主机/端口对列表,
和 ,它提供 Streams 应用程序的唯一标识符,以便与与同一 Kafka 集群通信的其他应用程序区分开来:java.util.Properties
StreamsConfig
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
StreamsConfig.APPLICATION_ID_CONFIG
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assuming that the Kafka broker this application is talking to runs on local machine with port 9092
此外,您还可以自定义同一 map 中的其他配置,例如,记录键值对的默认序列化和反序列化库:
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
有关 Kafka Streams 配置的完整列表,请参阅此表。
接下来,我们将定义 Streams 应用程序的计算逻辑。
在 Kafka Streams 中,此计算逻辑定义为连接的处理器节点。
我们可以使用拓扑构建器来构建这样的拓扑,topology
final StreamsBuilder builder = new StreamsBuilder();
然后,使用以下拓扑构建器从名为 Kafka 的主题创建源流:streams-plaintext-input
KStream<String, String> source = builder.stream("streams-plaintext-input");
现在,我们得到了一个从其源 Kafka 主题持续生成记录的 .
记录被组织为键入的键值对。
我们可以对这个流做的最简单的事情是将其写入另一个 Kafka 主题,比如说它被命名为:KStream
streams-plaintext-input
String
streams-pipe-output
source.to("streams-pipe-output");
请注意,我们还可以将上述两行连接成一行,如下所示:
builder.stream("streams-plaintext-input").to("streams-pipe-output");
我们可以通过执行以下操作来检查从此构建器创建的 类型 :topology
final Topology topology = builder.build();
并将其描述打印到标准输出中,如下所示:
System.out.println(topology.describe());
如果我们就到此停下来,编译并运行程序,它会输出以下信息:
$ mvn clean package
$ mvn exec:java -Dexec.mainClass=myapps.Pipe
Sub-topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001
Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output) <-- KSTREAM-SOURCE-0000000000
Global Stores:
none
如上所示,它说明了构造的拓扑有两个处理器节点,一个 source 节点和一个 sink 节点。 从 Kafka 主题中持续读取记录并将其通过管道传输到其下游节点; 将它收到的每条记录按顺序写入另一个 Kafka 主题(和箭头指示该节点的下游和上游处理器节点,即拓扑图中的“children”和“parents”)。
它还说明了这个简单的拓扑没有与之关联的全局状态存储(我们将在下面的部分中更多地讨论状态存储)。KSTREAM-SOURCE-0000000000
KSTREAM-SINK-0000000001
KSTREAM-SOURCE-0000000000
streams-plaintext-input
KSTREAM-SINK-0000000001
KSTREAM-SINK-0000000001
streams-pipe-output
-->
<--
请注意,当我们在代码中构建拓扑时,我们始终可以像上面一样在任何给定点描述拓扑,因此作为用户,您可以交互式地 “尝试并品尝” 拓扑中定义的计算逻辑,直到您对它感到满意为止。
假设我们已经完成了这个简单的拓扑结构,它只是以无休止的流式方式将数据从一个 Kafka 主题传递到另一个 Kafka 主题。
现在,我们可以使用上面刚刚构建的两个组件来构造 Streams 客户端:实例中指定的配置映射和对象。java.util.Properties
Topology
final KafkaStreams streams = new KafkaStreams(topology, props);
通过调用它的函数,我们可以触发此客户端的执行。
执行不会停止,直到在此客户端上调用。
例如,我们可以添加一个带有倒计时闩锁的关闭钩子,以捕获用户中断并在终止此程序时关闭客户端:start()
close()
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
到目前为止,完整的代码如下所示:
package myapps;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class Pipe {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("streams-plaintext-input").to("streams-pipe-output");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
如果您已经在 启动并运行 Kafka 代理,则
以及在该代理上创建的 主题 和 ,
您可以使用 Maven 在 IDE 或命令行中运行此代码:localhost:9092
streams-plaintext-input
streams-pipe-output
$ mvn clean package
$ mvn exec:java -Dexec.mainClass=myapps.Pipe
有关如何运行 Streams 应用程序并观察其计算结果的详细说明, 请阅读 Play with a Streams 应用程序 部分。 我们不会在本节的其余部分讨论这个问题。
编写第二个 Streams 应用程序:Line Split
我们已经学习了如何构造一个 Streams 客户端及其两个关键组件:和 。
现在让我们继续通过增强当前拓扑来添加一些真正的 processing logic。
我们可以先通过复制现有 class 来创建另一个程序:StreamsConfig
Topology
Pipe.java
$ cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java
并更改其类名以及应用程序 id 配置以区分与原始程序:
public class LineSplit {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
// ...
}
}
由于源流的每个记录都是一个类型化的键值对,因此
让我们将 Value String 视为文本行,并使用运算符将其拆分为单词:String
FlatMapValues
KStream<String, String> source = builder.stream("streams-plaintext-input");
KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split("\\W+"));
}
});
该运算符将流作为其输入,并生成一个新流,该流的命名方式是按顺序处理其源流中的每条记录,并将其值字符串分解为单词列表,并生成
每个单词作为新记录添加到输出流中。
这是一个无状态的运算符,不需要跟踪任何以前收到的记录或处理的结果。
请注意,如果您使用的是 JDK 8,则可以使用 lambda 表达式并将上述代码简化为:source
words
words
KStream<String, String> source = builder.stream("streams-plaintext-input");
KStream<String, String> words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));
最后,我们可以将单词 stream 写回另一个 Kafka 主题,比如 。
同样,这两个步骤可以连接如下(假设使用 lambda 表达式):streams-linesplit-output
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.to("streams-linesplit-output");
如果我们现在将这种增强拓扑描述为 ,我们将得到以下内容:System.out.println(topology.describe())
$ mvn clean package
$ mvn exec:java -Dexec.mainClass=myapps.LineSplit
Sub-topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-SINK-0000000002 <-- KSTREAM-SOURCE-0000000000
Sink: KSTREAM-SINK-0000000002(topic: streams-linesplit-output) <-- KSTREAM-FLATMAPVALUES-0000000001
Global Stores:
none
正如我们在上面看到的,一个新的处理器节点被注入到原始 source 节点和 sink 节点之间的拓扑中。
它将源节点作为其父节点,将接收器节点作为其子节点。
换句话说,源节点获取的每条记录都会首先遍历到新添加的要处理的节点,
,并且将生成一个或多个新记录。它们将继续向下遍历到 sink 节点,以写回 Kafka。
请注意,此处理器节点是 “无状态” 的,因为它不与任何 store (即 ) 关联。KSTREAM-FLATMAPVALUES-0000000001
KSTREAM-FLATMAPVALUES-0000000001
(stores: [])
完整的代码如下所示(假设使用 lambda 表达式):
package myapps;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class LineSplit {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.to("streams-linesplit-output");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
// ... same as Pipe.java above
}
}
编写第三个 Streams 应用程序:Wordcount
现在让我们更进一步,通过计算从源文本流中分割的单词的出现次数,向拓扑添加一些 “stateful” 计算。
按照类似的步骤,让我们基于该类创建另一个程序:LineSplit.java
public class WordCount {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
// ...
}
}
为了计算单词数,我们可以首先修改运算符,将所有单词都视为小写(假设使用 lambda 表达式):flatMapValues
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
}
});
为了进行计数聚合,我们必须首先指定我们要使用运算符在值字符串(即小写单词)上对流进行键控。
此运算符生成一个新的分组流,然后可以通过运算符聚合该流,该运算符在每个分组的键上生成运行计数:groupBy
count
KTable<String, Long> counts =
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
}
})
.groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String value) {
return value;
}
})
// Materialize the result into a KeyValueStore named "counts-store".
// The Materialized store is always of type <Bytes, byte[]> as this is the format of the inner most store.
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));
请注意,该运算符有一个参数,该参数指定
Running Count 应存储在名为 的状态存储中。
可以实时查询此存储,详细信息在 开发人员手册.count
Materialized
counts-store
counts-store
我们还可以将 KTable 的 changelog 流写回另一个 Kafka 主题,比如 。
由于结果是 changelog 流,因此输出主题应在启用日志压缩的情况下进行配置。
请注意,这次值类型不再是 but ,因此默认序列化类不再可用于将其写入 Kafka。
我们需要为类型提供覆盖的序列化方法,否则将抛出运行时异常:counts
streams-wordcount-output
streams-wordcount-output
String
Long
Long
counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
请注意,为了从 topic 读取 changelog 流 ,
需要将值 deserialization 设置为 。
有关此内容的详细信息,请参阅 Play with a Streams Application 部分。
假设可以使用 JDK 8 中的 lambda 表达式,则上述代码可以简化为:streams-wordcount-output
org.apache.kafka.common.serialization.LongDeserializer
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
.toStream()
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
如果我们再次将这种增强拓扑描述为 ,我们将得到以下内容:System.out.println(topology.describe())
$ mvn clean package
$ mvn exec:java -Dexec.mainClass=myapps.WordCount
Sub-topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
Sink: KSTREAM-SINK-0000000004(topic: counts-store-repartition) <-- KSTREAM-FILTER-0000000005
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000006(topics: counts-store-repartition) --> KSTREAM-AGGREGATE-0000000003
Processor: KSTREAM-AGGREGATE-0000000003(stores: [counts-store]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
Global Stores:
none
正如我们在上面看到的,拓扑现在包含两个互不相连的子拓扑。
第一个子拓扑的 sink 节点将写入 repartition 主题 ,
,该节点将由第二个子拓扑的 source node 读取。
repartition 主题用于按其聚合键(在本例中为值 string)对源流进行 “shuffle”。
此外,在第一个子拓扑中,在分组节点和 sink 节点之间注入一个无状态节点,以过滤掉聚合键为空的任何中间记录。KSTREAM-SINK-0000000004
counts-store-repartition
KSTREAM-SOURCE-0000000006
KSTREAM-FILTER-0000000005
KSTREAM-KEY-SELECT-0000000002
在第二个子拓扑中,聚合节点与一个名为的状态存储相关联(名称由用户在 operator 中指定)。
从即将到来的流源节点收到每条记录后,聚合处理器将首先查询其关联的存储以获取该键的当前计数,增加 1,然后将新计数写回存储。
密钥的每个更新计数也将通过管道传输到下游节点,节点将此更新流解释为记录流,然后进一步通过管道传输到 sink 节点以写回 Kafka。KSTREAM-AGGREGATE-0000000003
counts-store
count
counts-store
KTABLE-TOSTREAM-0000000007
KSTREAM-SINK-0000000008
完整的代码如下所示(假设使用 lambda 表达式):
package myapps;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class WordCount {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
.toStream()
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
// ... same as Pipe.java above
}
}