第 1 步:获取 Kafka

下载最新的 Kafka 版本并提取它:

$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
$ cd kafka_{{scalaVersion}}-{{fullDotVersion}}

第 2 步:启动 Kafka 环境

注意:您的本地环境必须安装 Java 8+。

Apache Kafka 可以使用 KRaft 或 ZooKeeper 启动。要开始使用任一配置,请遵循以下部分之一,但不能同时执行这两部分。

带有 KRaft 的 Kafka

Kafka 可以使用本地脚本和下载的文件或 docker 镜像通过 KRaft 模式运行。按照以下部分之一(但不能同时执行)操作启动 Kafka 服务器。

使用下载的文件

生成集群 UUID

$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

设置日志目录的格式

$ bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties

启动 Kafka 服务器

$ bin/kafka-server-start.sh config/kraft/reconfig-server.properties

成功启动 Kafka 服务器后,您将有一个基本的 Kafka 环境运行并可供使用。

使用基于 JVM 的 Apache Kafka Docker 镜像

获取 Docker 镜像:

$ docker pull apache/kafka:{{fullDotVersion}}

启动 Kafka Docker 容器:

$ docker run -p 9092:9092 apache/kafka:{{fullDotVersion}}
使用基于 GraalVM 的原生 Apache Kafka Docker 镜像

获取 Docker 镜像:

$ docker pull apache/kafka-native:{{fullDotVersion}}

启动 Kafka Docker 容器:

$ docker run -p 9092:9092 apache/kafka-native:{{fullDotVersion}}
带有 ZooKeeper 的 Kafka

运行以下命令以按正确顺序启动所有服务:

# Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties

打开另一个终端会话并运行:

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

成功启动所有服务后,您将有一个基本的 Kafka 环境运行并可供使用。

第 3 步:创建主题以存储您的事件

Kafka 是一个分布式事件流平台,可让您读取、写入、存储和处理事件(在文档中也称为记录消息) 跨多台机器。

示例事件包括付款交易、手机的地理位置更新、发货订单、传感器测量 来自 IoT 设备或医疗设备等等。这些事件被组织并存储在主题中。 非常简单,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。

因此,在编写第一个事件之前,必须创建一个主题。打开另一个终端会话并运行:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

Kafka 的所有命令行工具都有额外的选项:在没有任何 参数来显示使用信息。例如,它还可以显示新主题的分区计数等详细信息kafka-topics.sh

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: NPmZHyhbR9y00wMglMH2sg PartitionCount: 1       ReplicationFactor: 1	Configs:
Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0

步骤 4:将一些事件写入主题

Kafka 客户端通过网络与 Kafka 代理通信以写入(或读取)事件。 收到事件后,代理将以持久且容错的方式存储事件,只要您 需要——甚至永远。

运行控制台创建者客户端,将一些事件写入您的主题中。 默认情况下,您输入的每一行都会导致将单独的事件写入主题。

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event

您可以随时停止生产者客户端。Ctrl-C

步骤 5:读取事件

打开另一个终端会话并运行控制台使用者客户端以读取您刚刚创建的事件:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

您可以随时停止使用者客户端。Ctrl-C

随意尝试:例如,切换回您的 producer 终端(上一步)以编写 additional events 的 EVENTS 中,并查看事件如何立即显示在您的消费终端中。

由于事件持久存储在 Kafka 中,因此您可以根据需要多次读取这些事件,并由任意数量的使用者读取。 您可以通过打开另一个终端会话并再次重新运行上一个命令来轻松验证这一点。

第 6 步:使用 Kafka Connect 将数据导入/导出为事件流

您可能在现有系统(如关系数据库或传统消息传递系统)中拥有大量数据。 以及许多已经使用这些系统的应用程序。Kafka Connect 允许您持续摄取 数据从外部系统到 Kafka,反之亦然。它是一个运行连接器的可扩展工具,这些连接器实现用于与外部系统交互的自定义逻辑。 因此,将现有系统与 Kafka 集成非常容易。为了使此过程更加简单, 有数百个这样的连接器随时可用。

在本快速入门中,我们将了解如何使用导入数据的简单连接器运行 Kafka Connect 从文件导出到 Kafka 主题,并将数据从 Kafka 主题导出到文件。

首先,确保添加到 Connect 工作程序配置中的属性。 在本快速入门中,我们将使用相对路径,并将连接器的包视为一个 uber jar,当从安装目录运行快速入门命令时,该 jar 将起作用。 但是,值得注意的是,对于生产部署,使用绝对路径始终是可取的。有关如何设置此配置的详细说明,请参阅 plugin.pathconnect-file-{{fullDotVersion}}.jarplugin.path

编辑文件,添加或更改符合以下内容的配置属性,然后保存文件:config/connect-standalone.propertiesplugin.path

$ echo "plugin.path=libs/connect-file-{{fullDotVersion}}.jar" >> config/connect-standalone.properties

然后,首先创建一些种子数据以进行测试:

$ echo -e "foo\nbar" > test.txt
或者在 Windows 上:
$ echo foo > test.txt
$ echo bar >> test.txt

接下来,我们将启动两个以独立模式运行的连接器,这意味着它们在单个本地专用 过程。我们提供了三个配置文件作为参数。第一个始终是 Kafka Connect 的配置 进程,其中包含常见配置,例如要连接的 Kafka 代理和数据序列化格式。 其余配置文件分别指定要创建的连接器。这些文件包括一个唯一的连接器名称,即 类来实例化,以及连接器所需的任何其他配置。

$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

这些示例配置文件(包含在 Kafka 中)使用您之前启动的默认本地集群配置 并创建两个连接器:第一个是源连接器,它从输入文件中读取行并将每个行生成到 Kafka 主题 第二个是接收器连接器,它从 Kafka 主题中读取消息,并在输出文件中将每个消息生成为一行。

在启动过程中,您将看到许多日志消息,其中一些消息指示连接器正在实例化。 Kafka Connect 进程启动后,源连接器应开始从 和 读取行 将它们生成到 topic ,sink 连接器应开始从主题中读取消息并将其写入 file 。我们可以验证数据是否已通过整个管道交付 通过检查输出文件的内容:test.txtconnect-testconnect-testtest.sink.txt

$ more test.sink.txt
foo
bar

请注意,数据存储在 Kafka topic 中,因此我们还可以运行控制台使用者来查看 data 中(或使用自定义使用者代码对其进行处理):connect-test

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
…

连接器继续处理数据,因此我们可以将数据添加到文件中,并查看它在管道中移动:

$ echo "Another line" >> test.txt

您应该会看到该行出现在控制台使用者输出和 sink 文件中。

第 7 步:使用 Kafka Streams 处理事件

将数据作为事件存储在 Kafka 中后,您可以使用适用于 Java/Scala 的 Kafka Streams 客户端库处理数据。 它允许您实施任务关键型实时应用程序和微服务,其中输入 和/或输出数据存储在 Kafka 主题中。Kafka Streams 结合了编写和部署的简单性 客户端的标准 Java 和 Scala 应用程序,具有 Kafka 服务器端集群的优势 技术使这些应用程序具有高度可扩展性、弹性、容错性和分布式。库 支持恰好一次处理、有状态操作和聚合、窗口化、联接、基于处理 在事件时间等等。

为了让您初步体验,以下是如何实现流行的算法:WordCount

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

Kafka Streams 演示应用程序开发教程演示了如何从头到尾编写和运行此类流式处理应用程序。

步骤 8:终止 Kafka 环境

现在,您已经完成了快速入门的结尾,请随时拆除 Kafka 环境,或者 继续玩。

  1. 使用 停止创建者和使用者客户端(如果尚未执行此操作)。Ctrl-C
  2. 使用 停止 Kafka 代理。Ctrl-C
  3. 最后,如果遵循 Kafka with ZooKeeper 部分,请使用 .Ctrl-C

如果您还想删除本地 Kafka 环境的任何数据,包括您创建的任何事件 在此过程中,运行以下命令:

$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs

祝贺!

您已成功完成 Apache Kafka 快速入门。

要了解更多信息,我们建议执行以下步骤: