Kafka 流

编写任务关键型实时应用程序和微服务的最简单方法

Kafka Streams 是一个用于构建应用程序和微服务的客户端库,其中输入和输出数据存储在 Kafka 集群中。它将在客户端编写和部署标准 Java 和 Scala 应用程序的简单性与 Kafka 服务器端集群技术的优势相结合。

(点击图片将加载来自 YouTube 的视频) (点击图片将加载来自 YouTube 的视频)( 点击图片将加载来自 YouTube 的视频) (点击图片将加载来自 YouTube 的视频)

STREAMS API 导览

1Streams 简介

2创建 Streams 应用程序

3转换数据 第 1 部分

4转换数据 第 2 部分


为什么您会喜欢使用 Kafka Streams!

  • 弹性、高度可扩展、容错
  • 部署到容器、VM、裸机、云
  • 同样适用于小型、中型和大型用例
  • 与 Kafka 安全性完全集成
  • 编写标准 Java 和 Scala 应用程序
  • Exactly-once 处理语义
  • 无需单独的处理集群
  • 在 Mac、Linux、Windows 上开发

Kafka Streams 使用案例

《纽约时报》使用 Apache Kafka 和 Kafka Streams 实时存储和分发已发布的内容,并将其分发到各种应用程序和系统,以便读者可以使用这些内容。

作为欧洲领先的在线时尚零售商,Zalando 使用 Kafka 作为 ESB(企业服务总线),这有助于我们从整体架构过渡到微服务架构。使用 Kafka 处理事件流使我们的技术团队能够实现近乎实时的商业智能。

LINE 使用 Apache Kafka 作为我们的服务相互通信的中央数据中心。每天产生数千亿条消息,用于执行各种业务逻辑、威胁检测、搜索索引和数据分析。LINE 利用 Kafka Streams 可靠地转换和筛选主题,使消费者能够有效地使用子主题,同时由于其复杂而最小的代码库,保持易于维护性。

Pinterest 大规模使用 Apache Kafka 和 Kafka Streams 来支持其广告基础设施的实时预测性预算系统。使用 Kafka Streams,支出预测比以往任何时候都更加准确。

Rabobank 是荷兰 3 大银行之一。其数字神经系统 Business Event Bus 由 Apache Kafka 提供支持。它被越来越多的财务流程和服务使用,其中之一是 Rabo Alerts。该服务使用 Kafka Streams 构建,可实时提醒客户注意财务事件。

Trivago 是一个全球性的酒店搜索平台。我们专注于重塑旅客搜索和比较酒店的方式,同时通过我们的网站和应用程序向广大旅客提供访问权限,使酒店广告主能够发展业务。截至 2017 年,我们提供 190 多个国家/地区的约 180 万家酒店和其他住宿。我们使用 Kafka、Kafka Connect 和 Kafka Streams,使我们的开发人员能够自由访问公司中的数据。Kafka Streams 为我们的部分分析管道提供支持,并提供无限的选项来探索和操作我们手头的数据源。

Hello Kafka 流

下面的代码示例实现了一个 WordCount 应用程序,该应用程序具有弹性、高度可伸缩性、容错性、有状态,并且已准备好在大规模生产环境中运行

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

   public static void main(final String[] args) throws Exception {
       Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
       props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
       props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

       StreamsBuilder builder = new StreamsBuilder();
       KStream<String, String> textLines = builder.stream("TextLinesTopic");
       KTable<String, Long> wordCounts = textLines
           .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
           .groupBy((key, word) -> word)
           .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
       wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

       KafkaStreams streams = new KafkaStreams(builder.build(), props);
       streams.start();
   }

}
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

object WordCountApplication extends App {
  import Serdes._

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    p
  }

  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
  val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    .count()(Materialized.as("counts-store"))
  wordCounts.toStream.to("WordsWithCountsTopic")

  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()

  sys.ShutdownHookThread {
     streams.close(10, TimeUnit.SECONDS)
  }
}