内存管理
您可以指定用于内部缓存和压缩记录的总内存 (RAM) 大小。此缓存发生 在将记录写入 State Store 或转发到下游其他节点之前。
记录缓存在 DSL 和处理器 API 中的实现略有不同。
在 DSL 中记录缓存
您可以指定处理拓扑实例的记录缓存的总内存 (RAM) 大小。它是杠杆的
通过以下实例:KTable
- Source :通过 或 创建的实例。
KTable
KTable
StreamsBuilder#table()
StreamsBuilder#globalTable()
- Aggregation :作为聚合的结果创建的实例。
KTable
KTable
对于此类实例,记录缓存用于:KTable
使用以下示例了解使用和不使用记录缓存的行为。在此示例中,输入是 a 和记录 。此示例的重点是
在具有键 == 的记录上。KStream<String, Integer>
<K,V>: <A, 1>, <D, 5>, <A, 20>, <A, 300>
A
聚合计算 input 并返回一个 .
KTable<String, Integer>
- Without caching:为 key 发出一系列输出记录,这些记录表示
生成的聚合表。括号 () 表示更改,左侧数字是新的聚合值
正确的数字是旧的聚合值: 。
A
()
<A, (1, null)>, <A, (21, 1)>, <A, (321, 21)>
- 使用缓存:为可能在缓存中压缩的 key 发出单个输出记录,
导致单个输出记录 。此记录将写入聚合的内部状态
store 和 forwarded 到任何下游操作。
A
<A, (321, null)>
- Without caching:为 key 发出一系列输出记录,这些记录表示
生成的聚合表。括号 () 表示更改,左侧数字是新的聚合值
正确的数字是旧的聚合值: 。
缓存大小是通过参数指定的,该参数是
处理拓扑:cache.max.bytes.buffering
// Enable record cache of size 10 MB.
Properties props = new Properties();
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
此参数控制为缓存分配的字节数。具体来说,对于具有分配用于缓存的线程和字节的处理器拓扑实例,每个线程都将有一个偶数字节来构造自己的
cache 并在其任务中根据需要使用这意味着有多少线程就有多少缓存,但没有共享
跨线程缓存时有发生。T
C
C/T
缓存的基本 API 由 和 calls 组成。记录包括
在达到缓存大小后使用简单的 LRU 方案逐出。键控记录首次在节点完成处理时,它会在缓存中标记为 dirty。具有
在该时间段内在该节点上处理的相同密钥将覆盖 ,这称为
“正在压缩”。这与 Kafka 的日志压缩具有相同的效果,但发生得更早,而
记录仍在内存中,并且位于客户端应用程序中,而不是在服务器端(即 Kafka
broker) 的 broker)刷新后,将转发到下一个处理节点,然后写入本地状态存储。put()
get()
R1 = <K1, V1>
R2 = <K1, V2>
K1
<K1, V1>
R2
缓存的语义是将数据刷新到状态存储并转发到下一个下游处理器节点
每当最早的 OR (缓存压力) 命中时。both 和 都是全局参数。因此,无法指定
各个节点的参数不同。commit.interval.ms
cache.max.bytes.buffering
commit.interval.ms
cache.max.bytes.buffering
以下是基于所需场景的两个参数的示例设置。
要关闭缓存,缓存大小可以设置为零:
// Disable record cache Properties props = new Properties(); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
要启用缓存,但仍然对记录的缓存时间有上限,您可以设置提交间隔。在此示例中,它设置为 1000 毫秒:
Properties props = new Properties(); // Enable record cache of size 10 MB. props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // Set commit interval to 1 second. props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
这两种配置的效果如下图所示。记录使用 4 个键显示:蓝色、红色、黄色和绿色。假设缓存只有 3 个键的空间。
禁用缓存 (a) 时,将输出所有输入记录。
启用缓存时 (b):
- 大多数记录在提交间隔结束时输出(例如,输出单个蓝色记录,这是该时间之前蓝色键的最终覆盖)。
t1
- 由于缓存压力(即在 commit 间隔结束之前),某些记录会输出。例如,请参阅 前面的红色记录。对于较小的缓存大小,我们预计缓存压力将是决定何时输出记录的主要因素。对于较大的缓存大小,提交间隔将是主要因素。
t2
- 输出的记录总数已从 15 条减少到 8 条。
- 大多数记录在提交间隔结束时输出(例如,输出单个蓝色记录,这是该时间之前蓝色键的最终覆盖)。
在处理器 API 中记录缓存
您可以指定处理拓扑实例的记录缓存的总内存 (RAM) 大小。它被使用 用于在输出记录从有状态处理器节点写入其 状态存储。
处理器 API 中的记录缓存不会缓存或压缩向下游转发的任何输出记录。 这意味着所有下游处理器节点都可以看到所有记录,而状态存储看到的记录数量减少。 这不会影响系统的正确性,但这是对 state store 的性能优化。例如,使用 处理器 API,您可以将记录存储在状态存储中,同时向下游转发不同的值。
根据 状态存储 部分中首次显示的示例,要禁用缓存,您可以
添加调用(请注意,默认情况下启用缓存,但有一个显式调用)。withCachingDisabled
withCachingEnabled
StoreBuilder countStoreBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("Counts"),
Serdes.String(),
Serdes.Long())
.withCachingEnabled();
版本控制状态存储不支持记录缓存。
为避免读取过时的数据,您可以在创建迭代器之前进行 store。
请注意,如果使用 RocksDB,过于频繁的 flush 会导致性能下降,因此我们建议通常避免手动 flush。flush()
RocksDB 数据库
RocksDB 的每个实例都为块缓存、索引和过滤块以及 memtable(写入缓冲区)分配堆外内存。关键配置(适用于 RocksDB 版本 4.1.0)包括 和 .这些可以通过配置指定。block_cache_size
write_buffer_size
max_write_buffer_number
rocksdb.config.setter
此外,我们建议更改 RocksDB 的默认内存分配器,因为默认分配器可能会导致内存消耗增加。
要将内存分配器更改为 ,您需要在启动 Kafka Streams 应用程序之前设置环境变量:jemalloc
LD_PRELOAD
# example: install jemalloc (on Debian)
$ apt install -y libjemalloc-dev
# set LD_PRELOAD before you start your Kafka Streams application
$ export LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
从 2.3.0 开始,所有实例的内存使用量都可以受到限制,从而限制 Kafka Streams 应用程序的总堆外内存。为此,您必须配置 RocksDB 以缓存块缓存中的索引和筛选块,通过共享 WriteBufferManager 限制 memtable 内存,并根据块缓存对其内存进行计数,然后将相同的 Cache 对象传递给每个实例。有关详细信息,请参阅 RocksDB 内存使用情况。实现此目的的示例 RocksDBConfigSetter 如下所示:
public static class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter { private static org.rocksdb.Cache cache = new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
1private static org.rocksdb.WriteBufferManager writeBufferManager = new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache); @Override public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) { BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig(); // These three options in combination will limit the memory used by RocksDB to the size passed to the block cache (TOTAL_OFF_HEAP_MEMORY) tableConfig.setBlockCache(cache); tableConfig.setCacheIndexAndFilterBlocks(true); options.setWriteBufferManager(writeBufferManager); // These options are recommended to be set when bounding the total memory tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
2tableConfig.setPinTopLevelIndexAndFilter(true); tableConfig.setBlockSize(BLOCK_SIZE);
3options.setMaxWriteBufferNumber(N_MEMTABLES); options.setWriteBufferSize(MEMTABLE_SIZE); options.setTableFormatConfig(tableConfig); } @Override public void close(final String storeName, final Options options) { // Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance. } }
2. 必须设置此项才能使 INDEX_FILTER_BLOCK_RATIO 生效(参见脚注 1),如 RocksDB 文档中所述
3. 您可能希望根据 RocksDB 文档中的这些说明修改默认块大小。较大的块大小意味着索引块会更小,但缓存的数据块可能包含更多的冷数据,否则这些数据将被逐出。
- Note: 虽然我们建议至少设置上述配置,但产生最佳性能的特定选项取决于工作负载,您应该考虑尝试这些选项,以确定适合您特定用例的最佳选择。请记住,一个应用程序的最佳配置可能不适用于具有不同拓扑或输入主题的应用程序。 除了上面推荐的配置之外,你可能还需要考虑使用 RocksDB 文档中描述的分区索引过滤器。
其他内存使用情况
Apache Kafka 中还有其他模块可以在运行时分配内存。它们包括以下内容:
- 生产者缓冲,由 producer config 管理。
buffer.memory
- Consumer buffering,目前没有严格管理,但可以通过 fetch size 间接控制,即 和 .
fetch.max.bytes
fetch.max.wait.ms
- 生产者和消费者也有单独的 TCP 发送/接收缓冲区,这些缓冲区不算作缓冲内存。
这些由 / 配置控制。
send.buffer.bytes
receive.buffer.bytes
- 反序列化对象缓冲:返回记录后,它们将被反序列化以提取
timestamp 并在 streams 空间中缓冲。目前,这只是由 间接控制的。
consumer.poll()
buffered.records.per.partition
提示
迭代器应该显式关闭以释放资源:存储迭代器(例如 和 )必须在完成时显式关闭,以释放资源,例如打开的文件处理程序和内存中读取缓冲区,或者对此 Closeable 类使用 try-with-resources 语句(自 JDK7 起可用)。KeyValueIterator
WindowStoreIterator
否则,流应用程序的内存使用量在运行时会不断增加,直到达到 OOM。