教程:编写 Kafka Streams 应用程序

在本指南中,我们将从头开始设置您自己的项目,以使用 Kafka Streams 编写流处理应用程序。 如果您尚未阅读有关如何运行以 Kafka Streams 编写的 Streams 应用程序的快速入门,强烈建议您先阅读。

设置 Maven 项目

我们将使用 Kafka Streams Maven 原型通过以下命令创建 Streams 项目结构:

$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.kafka \
-DarchetypeArtifactId=streams-quickstart-java \
-DarchetypeVersion={{fullDotVersion}} \
-DgroupId=streams.examples \
-DartifactId=streams-quickstart \
-Dversion=0.1 \
-Dpackage=myapps

如果需要,您可以对 和 parameters 使用不同的值。 假设使用了上述参数值,此命令将创建一个如下所示的项目结构:groupIdartifactIdpackage

$ tree streams-quickstart
streams-quickstart
|-- pom.xml
|-- src
    |-- main
        |-- java
        |   |-- myapps
        |       |-- LineSplit.java
        |       |-- Pipe.java
        |       |-- WordCount.java
        |-- resources
            |-- log4j.properties

项目中包含的文件已定义 Streams 依赖项。 请注意,生成的 API 以 Java 8 为目标,不适用于更高的 Java 版本。pom.xmlpom.xml

在 下已经有几个使用 Streams 库编写的示例程序。 由于我们将从头开始编写此类程序,因此我们现在可以删除这些示例:src/main/java

$ cd streams-quickstart
$ rm src/main/java/myapps/*.java

编写第一个 Streams 应用程序:Pipe

现在是编码时间!随意打开您最喜欢的 IDE 并导入此 Maven 项目,或者只需打开文本编辑器并在 下创建一个 java 文件。 让我们命名它:src/main/java/myappsPipe.java
package myapps;

public class Pipe {

    public static void main(String[] args) throws Exception {

    }
}

我们将填写函数来编写这个 pipe 程序。请注意,我们不会随时列出 import 语句,因为 IDE 通常可以自动添加它们。 但是,如果您使用的是文本编辑器,则需要手动添加导入,在本节结束时,我们将为您显示带有 import 语句的完整代码片段。main

编写 Streams 应用程序的第一步是创建一个映射,以指定 中定义的不同 Streams 执行配置值。 您需要设置的几个重要配置值是:,它指定了用于建立与 Kafka 集群的初始连接的主机/端口对列表, 和 ,它提供 Streams 应用程序的唯一标识符,以便与与同一 Kafka 集群通信的其他应用程序区分开来:java.util.PropertiesStreamsConfigStreamsConfig.BOOTSTRAP_SERVERS_CONFIGStreamsConfig.APPLICATION_ID_CONFIG

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assuming that the Kafka broker this application is talking to runs on local machine with port 9092

此外,您还可以自定义同一 map 中的其他配置,例如,记录键值对的默认序列化和反序列化库:

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

有关 Kafka Streams 配置的完整列表,请参阅此

接下来,我们将定义 Streams 应用程序的计算逻辑。 在 Kafka Streams 中,此计算逻辑定义为连接的处理器节点。 我们可以使用拓扑构建器来构建这样的拓扑,topology

final StreamsBuilder builder = new StreamsBuilder();

然后,使用以下拓扑构建器从名为 Kafka 的主题创建源流:streams-plaintext-input

KStream<String, String> source = builder.stream("streams-plaintext-input");

现在,我们得到了一个从其源 Kafka 主题持续生成记录的 . 记录被组织为键入的键值对。 我们可以对这个流做的最简单的事情是将其写入另一个 Kafka 主题,比如说它被命名为:KStreamstreams-plaintext-inputStringstreams-pipe-output

source.to("streams-pipe-output");

请注意,我们还可以将上述两行连接成一行,如下所示:

builder.stream("streams-plaintext-input").to("streams-pipe-output");

我们可以通过执行以下操作来检查从此构建器创建的 类型 :topology

final Topology topology = builder.build();

并将其描述打印到标准输出中,如下所示:

System.out.println(topology.describe());

如果我们就到此停下来,编译并运行程序,它会输出以下信息:

$ mvn clean package
$ mvn exec:java -Dexec.mainClass=myapps.Pipe
Sub-topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output) <-- KSTREAM-SOURCE-0000000000
Global Stores:
  none

如上所示,它说明了构造的拓扑有两个处理器节点,一个 source 节点和一个 sink 节点。 从 Kafka 主题中持续读取记录并将其通过管道传输到其下游节点; 将它收到的每条记录按顺序写入另一个 Kafka 主题(和箭头指示该节点的下游和上游处理器节点,即拓扑图中的“children”和“parents”)。 它还说明了这个简单的拓扑没有与之关联的全局状态存储(我们将在下面的部分中更多地讨论状态存储)。KSTREAM-SOURCE-0000000000KSTREAM-SINK-0000000001KSTREAM-SOURCE-0000000000streams-plaintext-inputKSTREAM-SINK-0000000001KSTREAM-SINK-0000000001streams-pipe-output--><--

请注意,当我们在代码中构建拓扑时,我们始终可以像上面一样在任何给定点描述拓扑,因此作为用户,您可以交互式地 “尝试并品尝” 拓扑中定义的计算逻辑,直到您对它感到满意为止。 假设我们已经完成了这个简单的拓扑结构,它只是以无休止的流式方式将数据从一个 Kafka 主题传递到另一个 Kafka 主题。 现在,我们可以使用上面刚刚构建的两个组件来构造 Streams 客户端:实例中指定的配置映射和对象。java.util.PropertiesTopology

final KafkaStreams streams = new KafkaStreams(topology, props);

通过调用它的函数,我们可以触发此客户端的执行。 执行不会停止,直到在此客户端上调用。 例如,我们可以添加一个带有倒计时闩锁的关闭钩子,以捕获用户中断并在终止此程序时关闭客户端:start()close()

final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});

try {
    streams.start();
    latch.await();
} catch (Throwable e) {
    System.exit(1);
}
System.exit(0);

到目前为止,完整的代码如下所示:

package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final Topology topology = builder.build();

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

如果您已经在 启动并运行 Kafka 代理,则 以及在该代理上创建的 主题 和 , 您可以使用 Maven 在 IDE 或命令行中运行此代码:localhost:9092streams-plaintext-inputstreams-pipe-output

$ mvn clean package
$ mvn exec:java -Dexec.mainClass=myapps.Pipe

有关如何运行 Streams 应用程序并观察其计算结果的详细说明, 请阅读 Play with a Streams 应用程序 部分。 我们不会在本节的其余部分讨论这个问题。

编写第二个 Streams 应用程序:Line Split

我们已经学习了如何构造一个 Streams 客户端及其两个关键组件:和 。 现在让我们继续通过增强当前拓扑来添加一些真正的 processing logic。 我们可以先通过复制现有 class 来创建另一个程序:StreamsConfigTopologyPipe.java

$ cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java

并更改其类名以及应用程序 id 配置以区分与原始程序:

public class LineSplit {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        // ...
    }
}

由于源流的每个记录都是一个类型化的键值对,因此 让我们将 Value String 视为文本行,并使用运算符将其拆分为单词:StringFlatMapValues

KStream<String, String> source = builder.stream("streams-plaintext-input");
KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split("\\W+"));
            }
        });

该运算符将流作为其输入,并生成一个新流,该流的命名方式是按顺序处理其源流中的每条记录,并将其值字符串分解为单词列表,并生成 每个单词作为新记录添加到输出流中。 这是一个无状态的运算符,不需要跟踪任何以前收到的记录或处理的结果。 请注意,如果您使用的是 JDK 8,则可以使用 lambda 表达式并将上述代码简化为:sourcewordswords

KStream<String, String> source = builder.stream("streams-plaintext-input");
KStream<String, String> words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));

最后,我们可以将单词 stream 写回另一个 Kafka 主题,比如 。 同样,这两个步骤可以连接如下(假设使用 lambda 表达式):streams-linesplit-output

KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
      .to("streams-linesplit-output");

如果我们现在将这种增强拓扑描述为 ,我们将得到以下内容:System.out.println(topology.describe())

$ mvn clean package
$ mvn exec:java -Dexec.mainClass=myapps.LineSplit
Sub-topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-SINK-0000000002 <-- KSTREAM-SOURCE-0000000000
    Sink: KSTREAM-SINK-0000000002(topic: streams-linesplit-output) <-- KSTREAM-FLATMAPVALUES-0000000001
  Global Stores:
    none

正如我们在上面看到的,一个新的处理器节点被注入到原始 source 节点和 sink 节点之间的拓扑中。 它将源节点作为其父节点,将接收器节点作为其子节点。 换句话说,源节点获取的每条记录都会首先遍历到新添加的要处理的节点, ,并且将生成一个或多个新记录。它们将继续向下遍历到 sink 节点,以写回 Kafka。 请注意,此处理器节点是 “无状态” 的,因为它不与任何 store (即 ) 关联。KSTREAM-FLATMAPVALUES-0000000001KSTREAM-FLATMAPVALUES-0000000001(stores: [])

完整的代码如下所示(假设使用 lambda 表达式):

package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class LineSplit {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
              .to("streams-linesplit-output");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // ... same as Pipe.java above
    }
}

编写第三个 Streams 应用程序:Wordcount

现在让我们更进一步,通过计算从源文本流中分割的单词的出现次数,向拓扑添加一些 “stateful” 计算。 按照类似的步骤,让我们基于该类创建另一个程序:LineSplit.java

public class WordCount {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        // ...
    }
}

为了计算单词数,我们可以首先修改运算符,将所有单词都视为小写(假设使用 lambda 表达式):flatMapValues

source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
    }
});

为了进行计数聚合,我们必须首先指定我们要使用运算符在值字符串(即小写单词)上对流进行键控。 此运算符生成一个新的分组流,然后可以通过运算符聚合该流,该运算符在每个分组的键上生成运行计数:groupBycount

KTable<String, Long> counts =
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        })
      .groupBy(new KeyValueMapper<String, String, String>() {
           @Override
           public String apply(String key, String value) {
               return value;
           }
        })
      // Materialize the result into a KeyValueStore named "counts-store".
      // The Materialized store is always of type <Bytes, byte[]> as this is the format of the inner most store.
      .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));

请注意,该运算符有一个参数,该参数指定 Running Count 应存储在名为 的状态存储中。 可以实时查询此存储,详细信息在 开发人员手册.countMaterializedcounts-storecounts-store

我们还可以将 KTable 的 changelog 流写回另一个 Kafka 主题,比如 。 由于结果是 changelog 流,因此输出主题应在启用日志压缩的情况下进行配置。 请注意,这次值类型不再是 but ,因此默认序列化类不再可用于将其写入 Kafka。 我们需要为类型提供覆盖的序列化方法,否则将抛出运行时异常:countsstreams-wordcount-outputstreams-wordcount-outputStringLongLong

counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

请注意,为了从 topic 读取 changelog 流 , 需要将值 deserialization 设置为 。 有关此内容的详细信息,请参阅 Play with a Streams Application 部分。 假设可以使用 JDK 8 中的 lambda 表达式,则上述代码可以简化为:streams-wordcount-outputorg.apache.kafka.common.serialization.LongDeserializer

KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
      .groupBy((key, value) -> value)
      .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
      .toStream()
      .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

如果我们再次将这种增强拓扑描述为 ,我们将得到以下内容:System.out.println(topology.describe())

$ mvn clean package
$ mvn exec:java -Dexec.mainClass=myapps.WordCount
Sub-topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
    Sink: KSTREAM-SINK-0000000004(topic: counts-store-repartition) <-- KSTREAM-FILTER-0000000005
  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006(topics: counts-store-repartition) --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003(stores: [counts-store]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
    Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
    Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
Global Stores:
  none

正如我们在上面看到的,拓扑现在包含两个互不相连的子拓扑。 第一个子拓扑的 sink 节点将写入 repartition 主题 , ,该节点将由第二个子拓扑的 source node 读取。 repartition 主题用于按其聚合键(在本例中为值 string)对源流进行 “shuffle”。 此外,在第一个子拓扑中,在分组节点和 sink 节点之间注入一个无状态节点,以过滤掉聚合键为空的任何中间记录。KSTREAM-SINK-0000000004counts-store-repartitionKSTREAM-SOURCE-0000000006KSTREAM-FILTER-0000000005KSTREAM-KEY-SELECT-0000000002

在第二个子拓扑中,聚合节点与一个名为的状态存储相关联(名称由用户在 operator 中指定)。 从即将到来的流源节点收到每条记录后,聚合处理器将首先查询其关联的存储以获取该键的当前计数,增加 1,然后将新计数写回存储。 密钥的每个更新计数也将通过管道传输到下游节点,节点将此更新流解释为记录流,然后进一步通过管道传输到 sink 节点以写回 Kafka。KSTREAM-AGGREGATE-0000000003counts-storecountcounts-storeKTABLE-TOSTREAM-0000000007KSTREAM-SINK-0000000008

完整的代码如下所示(假设使用 lambda 表达式):

package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCount {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
              .groupBy((key, value) -> value)
              .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
              .toStream()
              .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // ... same as Pipe.java above
    }
}