Kafka 流
编写任务关键型实时应用程序和微服务的最简单方法
Kafka Streams 是一个用于构建应用程序和微服务的客户端库,其中输入和输出数据存储在 Kafka 集群中。它将在客户端编写和部署标准 Java 和 Scala 应用程序的简单性与 Kafka 服务器端集群技术的优势相结合。
STREAMS API 导览
1Streams 简介
2创建 Streams 应用程序
3转换数据 第 1 部分
4转换数据 第 2 部分
为什么您会喜欢使用 Kafka Streams!
- 弹性、高度可扩展、容错
- 部署到容器、VM、裸机、云
- 同样适用于小型、中型和大型用例
- 与 Kafka 安全性完全集成
- 编写标准 Java 和 Scala 应用程序
- Exactly-once 处理语义
- 无需单独的处理集群
- 在 Mac、Linux、Windows 上开发
Kafka Streams 使用案例
《纽约时报》使用 Apache Kafka 和 Kafka Streams 实时存储和分发已发布的内容,并将其分发到各种应用程序和系统,以便读者可以使用这些内容。
作为欧洲领先的在线时尚零售商,Zalando 使用 Kafka 作为 ESB(企业服务总线),这有助于我们从整体架构过渡到微服务架构。使用 Kafka 处理事件流使我们的技术团队能够实现近乎实时的商业智能。
LINE 使用 Apache Kafka 作为我们的服务相互通信的中央数据中心。每天产生数千亿条消息,用于执行各种业务逻辑、威胁检测、搜索索引和数据分析。LINE 利用 Kafka Streams 可靠地转换和筛选主题,使消费者能够有效地使用子主题,同时由于其复杂而最小的代码库,保持易于维护性。
Pinterest 大规模使用 Apache Kafka 和 Kafka Streams 来支持其广告基础设施的实时预测性预算系统。使用 Kafka Streams,支出预测比以往任何时候都更加准确。
Rabobank 是荷兰 3 大银行之一。其数字神经系统 Business Event Bus 由 Apache Kafka 提供支持。它被越来越多的财务流程和服务使用,其中之一是 Rabo Alerts。该服务使用 Kafka Streams 构建,可实时提醒客户注意财务事件。
Trivago 是一个全球性的酒店搜索平台。我们专注于重塑旅客搜索和比较酒店的方式,同时通过我们的网站和应用程序向广大旅客提供访问权限,使酒店广告主能够发展业务。截至 2017 年,我们提供 190 多个国家/地区的约 180 万家酒店和其他住宿。我们使用 Kafka、Kafka Connect 和 Kafka Streams,使我们的开发人员能够自由访问公司中的数据。Kafka Streams 为我们的部分分析管道提供支持,并提供无限的选项来探索和操作我们手头的数据源。
Hello Kafka 流
下面的代码示例实现了一个 WordCount 应用程序,该应用程序具有弹性、高度可伸缩性、容错性、有状态,并且已准备好在大规模生产环境中运行
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Properties;
public class WordCountApplication {
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
object WordCountApplication extends App {
import Serdes._
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
p
}
val builder: StreamsBuilder = new StreamsBuilder
val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
.groupBy((_, word) => word)
.count()(Materialized.as("counts-store"))
wordCounts.toStream.to("WordsWithCountsTopic")
val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()
sys.ShutdownHookThread {
streams.close(10, TimeUnit.SECONDS)
}
}