Streams 安全性
Kafka Streams 与 Kafka 的安全功能原生集成,并支持所有 Kafka 中的客户端安全功能。Streams 利用 Java 创建者和使用者 API。
要保护您的 Stream processing 应用程序,请在相应的 Kafka 创建器中配置安全设置 和使用者客户端,然后在 Kafka Streams 应用程序中指定相应的配置设置。
Kafka 支持集群加密和身份验证,包括经过身份验证和未经身份验证的混合、 以及加密和非加密客户端。使用安全性是可选的。
以下是一些相关的客户端安全功能:
- 加密应用程序和 Kafka 代理之间的传输中数据
- 您可以启用应用程序与 Kafka 代理之间的客户端-服务器通信加密。 例如,您可以将应用程序配置为在读取和写入数据时始终使用加密 kafka.在跨安全域(如内部网络、公共网络)读取和写入数据时,这一点至关重要 Internet 和合作伙伴网络。
- 客户端身份验证
- 您可以为从应用程序到 Kafka 代理的连接启用客户端身份验证。例如,您可以 定义仅允许特定应用程序连接到您的 Kafka 集群。
- 客户端授权
- 您可以启用应用程序对读取和写入操作的客户端授权。例如,您可以定义 仅允许特定应用程序从 Kafka 主题中读取数据。您还可以限制对 Kafka 的写入访问 防止数据污染或欺诈活动的主题。
有关 Apache Kafka 中的安全功能的更多信息,请参阅 Kafka 安全性。
安全 Kafka 集群所需的 ACL 设置
Kafka 集群可以使用 ACL 来控制对资源的访问(例如创建主题的能力),对于此类集群,每个客户端 包括 Kafka Streams,需要以特定用户身份进行身份验证,才能获得适当的访问权限。 特别是,当 Streams 应用程序在安全的 Kafka 集群上运行时,运行该应用程序的主体必须具有 ACL 设置,以便应用程序具有创建、读取和写入内部主题的权限。
为避免向应用程序提供此权限,您可以手动创建所需的内部主题。
如果内部主题存在,Kafka Streams 将不会尝试重新创建它们。
请注意,必须使用正确数量的分区创建内部重新分区和更改日志主题,否则,Kafka Streams 将在启动时失败。
创建主题时,必须使用与输入主题相同的分区数,或者如果有多个主题,则为跨所有输入主题的最大分区数。
此外,Topology#describe()
<application.id>-<operatorName>-<suffix>
suffix
repartition
changelog
由于所有内部主题以及嵌入的消费者组名称都以应用程序 ID 为前缀,因此
建议在带前缀的资源模式上使用 ACL
配置控制列表以允许客户端管理以此前缀启动的所有主题和使用者组
如(有关详细信息,请参阅 KIP-277 和 KIP-290)。--resource-pattern-type prefixed --topic your.application.id --operation All
安全示例
目的是配置 Kafka Streams 应用程序,以便在 与其 Kafka 集群通信。
此示例假定集群中的 Kafka 代理已经设置了其安全设置,并且必要的 SSL 证书可供应用程序在本地文件系统位置使用。例如,如果您使用的是 Docker 然后,您还必须将这些 SSL 证书包含在 Docker 映像中的正确位置。
下面的代码段显示了为您的 Kafka Streams 应用程序及其从中读取和写入的 Kafka 集群:
# Essential security settings to enable client authentication and SSL encryption
bootstrap.servers=kafka.example.com:9093
security.protocol=SSL
ssl.truststore.location=/etc/security/tls/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/etc/security/tls/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
在应用程序中为您的实例配置这些设置。这些设置将加密任何
data-in-transit,您的应用程序将根据
它正在与之通信的 Kafka 代理。请注意,此示例不涵盖客户端授权。Properties
// Code of your Java application that uses the Kafka Streams library
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "secure-kafka-streams-app");
// Where to find secure Kafka brokers. Here, it's on port 9093.
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9093");
//
// ...further non-security related settings may follow here...
//
// Security settings.
// 1. These settings must match the security settings of the secure Kafka cluster.
// 2. The SSL trust store and key store files must be locally accessible to the application.
settings.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
settings.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.truststore.jks");
settings.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
settings.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.keystore.jks");
settings.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
settings.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
如果您在应用程序中错误地配置了安全设置,它将在运行时失败,通常在您之后
启动它。例如,如果为设置输入的密码不正确,则会显示错误消息
与此类似,应用程序将终止:ssl.keystore.password
# Misconfigured ssl.keystore.password
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka producer
[...snip...]
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException:
java.io.IOException: Keystore was tampered with, or password was incorrect
[...snip...]
Caused by: java.security.UnrecoverableKeyException: Password verification failed
监控 Kafka Streams 应用程序日志文件中是否存在此类错误消息,以快速发现任何配置错误的应用程序。