交互式查询

交互式查询允许您从应用程序外部利用应用程序的状态。Kafka Streams 使您的应用程序可查询。

应用程序的完整状态通常拆分到应用程序的许多分布式实例中,以及由这些应用程序实例在本地管理的许多状态存储中。

有本地和远程组件,用于以交互方式查询应用程序的状态。

本地状态
应用程序实例可以查询状态的本地管理部分,并直接查询自己的本地状态存储。您可以在应用程序代码的其他部分使用相应的本地数据,只要它不需要调用 Kafka Streams API。查询 state store 始终是只读的,以保证底层 state store 永远不会在带外发生突变(例如,您不能添加新条目)。状态存储只能由相应的处理器拓扑和它所操作的输入数据来改变。有关更多信息,请参阅查询应用程序实例的本地状态存储
远程状态

要查询应用程序的完整状态,您必须连接状态的各个片段,包括:

  • 查询本地 State Store
  • 发现网络中应用程序的所有正在运行的实例及其状态存储
  • 通过网络(例如 RPC 层)与这些实例通信

连接这些 fragment 可实现同一应用程序实例之间的通信以及来自其他应用程序的通信,以实现交互式查询。有关更多信息,请参阅查询整个应用程序的远程状态存储

Kafka Streams 原生提供了以交互方式查询应用程序状态所需的所有功能,除非您希望通过交互式查询公开应用程序的完整状态。要允许应用程序实例通过网络进行通信,您必须向应用程序添加远程过程调用 (RPC) 层(例如 REST API)。

下表显示了 Kafka Streams 对各种过程的本机通信支持。

程序 应用程序实例 整个应用程序
查询 App 实例的本地状态存储 支持 支持
使应用程序实例可供其他人发现 支持 支持
发现所有正在运行的应用程序实例及其状态存储 支持 支持
通过网络 (RPC) 与应用程序实例通信 支持 不支持(您必须配置)

查询应用程序实例的本地状态存储

Kafka Streams 应用程序通常在多个实例上运行。在任何给定实例上本地可用的状态只是应用程序整个状态的子集。查询实例上的本地存储将仅返回该特定实例上本地可用的数据。

该方法按名称和类型查找应用程序实例的本地状态存储。 请注意,版本控制状态存储目前不支持交互式查询。KafkaStreams#store(...)

每个应用程序实例都可以直接查询其任何本地状态存储。

状态存储的名称是在创建存储时定义的。您可以使用处理器 API 显式创建存储,也可以通过在 DSL 中使用有状态操作隐式创建存储。

状态存储的类型由 定义。您可以通过类 . Kafka Streams 目前有两种内置类型:QueryableStoreTypeQueryableStoreTypes

您还可以实现自己的 QueryableStoreType,如查询本地自定义状态存储部分所述。

注意

Kafka Streams 为每个流分区实现一个状态存储。这意味着您的应用程序可能会管理 许多底层 state store 的 state store 中。API 使您能够查询所有底层商店,而无需知道是哪个 分区。

查询本地键值存储

要查询本地键值存储,您必须首先创建具有键值存储的拓扑。此示例创建一个 key-value 名为 “CountsKeyValueStore” 的存储。此存储将保存在主题 “word-count-input” 上找到的任何单词的最新计数。

Properties  props = ...;
StreamsBuilder builder = ...;
KStream<String, String> textLines = ...;

// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

// Create a key-value store named "CountsKeyValueStore" for the all-time word counts
groupedByWord.count(Materialized.<String, String, KeyValueStore<Bytes, byte[]>as("CountsKeyValueStore"));

// Start an instance of the topology
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

应用程序启动后,您可以访问“CountsKeyValueStore”,然后通过 ReadOnlyKeyValueStore API 查询它:

// Get the key-value store CountsKeyValueStore
ReadOnlyKeyValueStore<String, Long> keyValueStore =
    streams.store("CountsKeyValueStore", QueryableStoreTypes.keyValueStore());

// Get value by key
System.out.println("count for hello:" + keyValueStore.get("hello"));

// Get the values for a range of keys available in this application instance
KeyValueIterator<String, Long> range = keyValueStore.range("all", "streams");
while (range.hasNext()) {
  KeyValue<String, Long> next = range.next();
  System.out.println("count for " + next.key + ": " + next.value);
}

// Get the values for all of the keys available in this application instance
KeyValueIterator<String, Long> range = keyValueStore.all();
while (range.hasNext()) {
  KeyValue<String, Long> next = range.next();
  System.out.println("count for " + next.key + ": " + next.value);
}

您还可以使用采用 a 的重载方法实现无状态运算符的结果,如以下示例所示:queryableStoreName

StreamsBuilder builder = ...;
KTable<String, Integer> regionCounts = ...;

// materialize the result of filtering corresponding to odd numbers
// the "queryableStoreName" can be subsequently queried.
KTable<String, Integer> oddCounts = numberLines.filter((region, count) -> (count % 2 != 0),
  Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>as("queryableStoreName"));

// do not materialize the result of filtering corresponding to even numbers
// this means that these results will not be materialized and cannot be queried.
KTable<String, Integer> oddCounts = numberLines.filter((region, count) -> (count % 2 == 0));

查询本地窗口存储

对于任何给定的键,窗口存储可能会有许多结果,因为该键可以存在于多个窗口中。 但是,对于给定的键,每个窗口只有一个结果。

要查询本地窗口存储,您必须首先创建具有窗口存储的拓扑。此示例创建一个窗口存储 名为 “CountsWindowStore”,其中包含 1 分钟窗口中的单词计数。

StreamsBuilder builder = ...;
KStream<String, String> textLines = ...;

// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

// Create a window state store named "CountsWindowStore" that contains the word counts for every minute
groupedByWord.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(60)))
  .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>as("CountsWindowStore"));

应用程序启动后,您可以访问“CountsWindowStore”,然后通过 ReadOnlyWindowStore API 查询它:

// Get the window store named "CountsWindowStore"
ReadOnlyWindowStore<String, Long> windowStore =
    streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());

// Fetch values for the key "world" for all of the windows available in this application instance.
// To get *all* available windows we fetch windows from the beginning of time until now.
Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
Instant timeTo = Instant.now(); // now (in processing-time)
WindowStoreIterator<Long> iterator = windowStore.fetch("world", timeFrom, timeTo);
while (iterator.hasNext()) {
  KeyValue<Long, Long> next = iterator.next();
  long windowTimestamp = next.key;
  System.out.println("Count of 'world' @ time " + windowTimestamp + " is " + next.value);
}

查询本地自定义状态存储

注意

只有 Processor API 支持自定义状态存储。

在查询自定义状态存储之前,您必须实现以下接口:

  • 您的自定义状态存储必须实现 .StateStore
  • 您必须有一个接口来表示 store 上可用的操作。
  • 您必须提供用于创建 store 实例的 实现。StoreBuilder
  • 建议您提供一个接口,以限制对只读操作的访问。这可以防止此 API 的用户在带外更改正在运行的 Kafka Streams 应用程序的状态。

自定义商店的类/接口层次结构可能如下所示:

public class MyCustomStore<K,V> implements StateStore, MyWriteableCustomStore<K,V> {
  // implementation of the actual store
}

// Read-write interface for MyCustomStore
public interface MyWriteableCustomStore<K,V> extends MyReadableCustomStore<K,V> {
  void write(K Key, V value);
}

// Read-only interface for MyCustomStore
public interface MyReadableCustomStore<K,V> {
  V read(K key);
}

public class MyCustomStoreBuilder implements StoreBuilder {
  // implementation of the supplier for MyCustomStore
}

要使此商店可查询,您必须:

  • 提供 QueryableStoreType 的实现。
  • 提供一个包装类,该类可以访问 store 的所有底层实例并用于查询。

以下是如何实现:QueryableStoreType

public class MyCustomStoreType<K,V> implements QueryableStoreType<MyReadableCustomStore<K,V>> {

  // Only accept StateStores that are of type MyCustomStore
  public boolean accepts(final StateStore stateStore) {
    return stateStore instanceOf MyCustomStore;
  }

  public MyReadableCustomStore<K,V> create(final StateStoreProvider storeProvider, final String storeName) {
      return new MyCustomStoreTypeWrapper(storeProvider, storeName, this);
  }

}

需要一个包装器类,因为 Kafka Streams 应用程序的每个实例都可以运行多个流任务并管理 特定 state store 的多个本地实例。包装器类隐藏了这种复杂性,并允许您查询 “logical” state store 的 intent 实例,而无需了解该 state store 的所有底层本地实例。

在实现包装器类时,您必须使用 StateStoreProvider 接口来访问商店的底层实例。 返回 state 的 a 具有给定 storeName 且类型由 定义的 stores 。StateStoreProvider#stores(String storeName, QueryableStoreType<T> queryableStoreType)ListqueryableStoreType

以下是包装器的示例实现 (Java 8+):

// We strongly recommended implementing a read-only interface
// to restrict usage of the store to safe read operations!
public class MyCustomStoreTypeWrapper<K,V> implements MyReadableCustomStore<K,V> {

  private final QueryableStoreType<MyReadableCustomStore<K, V>> customStoreType;
  private final String storeName;
  private final StateStoreProvider provider;

  public CustomStoreTypeWrapper(final StateStoreProvider provider,
                              final String storeName,
                              final QueryableStoreType<MyReadableCustomStore<K, V>> customStoreType) {

    // ... assign fields ...
  }

  // Implement a safe read method
  @Override
  public V read(final K key) {
    // Get all the stores with storeName and of customStoreType
    final List<MyReadableCustomStore<K, V>> stores = provider.getStores(storeName, customStoreType);
    // Try and find the value for the given key
    final Optional<V> value = stores.stream().filter(store -> store.read(key) != null).findFirst();
    // Return the value if it exists
    return value.orElse(null);
  }

}

您现在可以查找和查询您的自定义商店:

Topology topology = ...;
ProcessorSupplier processorSuppler = ...;

// Create CustomStoreSupplier for store name the-custom-store
MyCustomStoreBuilder customStoreBuilder = new MyCustomStoreBuilder("the-custom-store") //...;
// Add the source topic
topology.addSource("input", "inputTopic");
// Add a custom processor that reads from the source topic
topology.addProcessor("the-processor", processorSupplier, "input");
// Connect your custom state store to the custom processor above
topology.addStateStore(customStoreBuilder, "the-processor");

KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();

// Get access to the custom store
MyReadableCustomStore<String,String> store = streams.store("the-custom-store", new MyCustomStoreType<String,String>());
// Query the store
String value = store.read("key");

查询整个应用的远程状态存储

要查询整个应用的远程状态,您必须向其他应用程序公开应用程序的完整状态,包括 在不同计算机上运行的应用程序。

例如,您有一个 Kafka Streams 应用程序,用于处理多玩家视频游戏中的用户事件,并且您希望直接检索每个用户的最新状态并将其显示在移动应用程序中。以下是使应用程序的完整状态可查询所需的步骤:

  1. 将 RPC 层添加到您的应用程序,以便 您的应用程序实例可以通过网络(例如,REST API、Thrift、自定义协议、 等等)。实例必须响应交互式查询。您可以按照提供的参考示例来获取 开始。
  2. 公开 的 RPC 端点 应用程序的实例通过 Kafka Streams 的配置设置。由于 RPC 终端节点在网络中必须是唯一的,每个实例对于此配置设置都有自己的值。 这使得应用程序实例可被其他实例发现。application.server
  3. 在 RPC 层中,发现远程应用程序实例及其状态存储,并查询本地可用的状态存储,以使应用程序的完整状态可查询。如果特定实例缺少本地数据来响应查询,则远程应用程序实例可以将查询转发到其他应用程序实例。本地可用的状态存储可以直接响应查询。

发现同一应用程序的任何正在运行的实例以及它们为其公开的相应 RPC 端点 交互式查询

将 RPC 层添加到您的应用程序

添加 RPC 层的方法有很多种。唯一的要求是 RPC 层嵌入在 Kafka Streams 中 应用程序,并且它公开了其他应用程序实例和应用程序可以连接到的终端节点。

公开应用程序的 RPC 端点

要在分布式 Kafka Streams 应用程序中启用远程状态存储发现,您必须在 config 属性中设置 configuration 属性。 该属性定义了一个唯一的对,该对指向 Kafka Streams 应用程序的相应实例的 RPC 端点。 此配置属性的值将因应用程序的实例而异。 设置此属性后,Kafka Streams 将通过 StreamsMetadata 实例跟踪应用程序的每个实例的 RPC 端点信息、其状态存储和分配的流分区。application.serverhost:port

提示

考虑利用应用程序的公开 RPC 端点来获得更多功能,例如 搭载超越交互式查询的其他应用程序间通信。

此示例说明如何配置和运行支持发现其状态存储的 Kafka Streams 应用程序。

Properties props = new Properties();
// Set the unique RPC endpoint of this application instance through which it
// can be interactively queried.  In a real application, the value would most
// probably not be hardcoded but derived dynamically.
String rpcEndpoint = "host1:4460";
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, rpcEndpoint);
// ... further settings may follow here ...

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "word-count-input");

final KGroupedStream<String, String> groupedByWord = textLines
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word, Grouped.with(stringSerde, stringSerde));

// This call to `count()` creates a state store named "word-count".
// The state store is discoverable and can be queried interactively.
groupedByWord.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>as("word-count"));

// Start an instance of the topology
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// Then, create and start the actual RPC service for remote access to this
// application instance's local state stores.
//
// This service should be started on the same host and port as defined above by
// the property `StreamsConfig.APPLICATION_SERVER_CONFIG`.  The example below is
// fictitious, but we provide end-to-end demo applications (such as KafkaMusicExample)
// that showcase how to implement such a service to get you started.
MyRPCService rpcService = ...;
rpcService.listenAt(rpcEndpoint);

发现和访问应用程序实例及其本地状态存储

以下方法返回 StreamsMetadata 对象,这些对象提供有关应用程序实例的元信息,例如其 RPC 终端节点和本地可用的状态存储。

  • KafkaStreams#allMetadata():查找此应用程序的所有实例
  • KafkaStreams#allMetadataForStore(String storeName):查找管理状态存储 “storeName” 的本地实例的那些应用程序实例
  • KafkaStreams#metadataForKey(String storeName, K key, Serializer<K> keySerializer):使用默认流分区策略,在给定状态存储中查找保存给定键数据的一个应用程序实例
  • KafkaStreams#metadataForKey(String storeName, K key, StreamPartitioner<K, ?> partitioner):使用 ,查找在给定状态存储中保存给定键数据的一个应用程序实例partitioner

注意力

如果未为应用程序实例配置,则上述方法将找不到它的任何 StreamsMetadataapplication.server

例如,我们现在可以找到我们在 上一节中所示的代码示例:StreamsMetadata

KafkaStreams streams = ...;
// Find all the locations of local instances of the state store named "word-count"
Collection<StreamsMetadata> wordCountHosts = streams.allMetadataForStore("word-count");

// For illustrative purposes, we assume using an HTTP client to talk to remote app instances.
HttpClient http = ...;

// Get the word count for word (aka key) 'alice': Approach 1
//
// We first find the one app instance that manages the count for 'alice' in its local state stores.
StreamsMetadata metadata = streams.metadataForKey("word-count", "alice", Serdes.String().serializer());
// Then, we query only that single app instance for the latest count of 'alice'.
// Note: The RPC URL shown below is fictitious and only serves to illustrate the idea.  Ultimately,
// the URL (or, in general, the method of communication) will depend on the RPC layer you opted to
// implement.  Again, we provide end-to-end demo applications (such as KafkaMusicExample) that showcase
// how to implement such an RPC layer.
Long result = http.getLong("http://" + metadata.host() + ":" + metadata.port() + "/word-count/alice");

// Get the word count for word (aka key) 'alice': Approach 2
//
// Alternatively, we could also choose (say) a brute-force approach where we query every app instance
// until we find the one that happens to know about 'alice'.
Optional<Long> result = streams.allMetadataForStore("word-count")
    .stream()
    .map(streamsMetadata -> {
        // Construct the (fictituous) full endpoint URL to query the current remote application instance
        String url = "http://" + streamsMetadata.host() + ":" + streamsMetadata.port() + "/word-count/alice";
        // Read and return the count for 'alice', if any.
        return http.getLong(url);
    })
    .filter(s -> s != null)
    .findFirst();

此时,应用程序的完整状态是交互式可查询的:

  • 您可以发现应用程序的运行实例以及它们在本地管理的状态存储。
  • 通过添加到应用程序的 RPC 层,您可以通过 network 并查询它们的本地可用状态。
  • 应用程序实例能够提供此类查询,因为它们可以直接查询自己的本地状态存储 并通过 RPC 层进行响应。
  • 总的来说,这允许我们查询整个应用程序的完整状态。

要查看具有交互式查询的端到端应用程序,请查看演示应用程序