From eaabaab84fa59b43dc2523064ae94cad8ed6e0bb Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 20 May 2024 16:07:35 +0100 Subject: [PATCH] Allow configuration of heap and vheap sizes for WAL and servers --- src/ra.hrl | 1 + src/ra_log_sup.erl | 2 ++ src/ra_log_wal.erl | 4 ++++ src/ra_server_proc.erl | 5 ++++- src/ra_system.erl | 6 ++++++ 5 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/ra.hrl b/src/ra.hrl index 8c85f849..9d46ec21 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -191,6 +191,7 @@ -define(WAL_DEFAULT_MAX_SIZE_BYTES, 256 * 1000 * 1000). -define(WAL_DEFAULT_MAX_BATCH_SIZE, 8192). -define(MIN_BIN_VHEAP_SIZE, 46422). +-define(MIN_HEAP_SIZE, 233). %% define a minimum allowable wal size. If anyone tries to set a really small %% size that is smaller than the logical block size the pre-allocation code may %% fail diff --git a/src/ra_log_sup.erl b/src/ra_log_sup.erl index da1f8c31..b5e28ebb 100644 --- a/src/ra_log_sup.erl +++ b/src/ra_log_sup.erl @@ -72,6 +72,7 @@ make_wal_conf(#{data_dir := DataDir, PreAlloc = maps:get(wal_pre_allocate, Cfg, false), MinBinVheapSize = maps:get(wal_min_bin_vheap_size, Cfg, ?MIN_BIN_VHEAP_SIZE), + MinHeapSize = maps:get(wal_min_heap_size, Cfg, ?MIN_HEAP_SIZE), CompressMemTables = maps:get(compress_mem_tables, Cfg, false), #{name => WalName, names => Names, @@ -86,6 +87,7 @@ make_wal_conf(#{data_dir := DataDir, hibernate_after => HibAfter, garbage_collect => Gc, pre_allocate => PreAlloc, + min_heap_size => MinHeapSize, min_bin_vheap_size => MinBinVheapSize, compress_mem_tables => CompressMemTables }. diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl index e22acee0..3affd577 100644 --- a/src/ra_log_wal.erl +++ b/src/ra_log_wal.erl @@ -135,6 +135,7 @@ hibernate_after => non_neg_integer(), max_batch_size => non_neg_integer(), garbage_collect => boolean(), + min_heap_size => non_neg_integer(), min_bin_vheap_size => non_neg_integer(), compress_mem_tables => boolean() }. @@ -242,6 +243,7 @@ init(#{dir := Dir} = Conf0) -> write_strategy := WriteStrategy, sync_method := SyncMethod, garbage_collect := Gc, + min_heap_size := MinHeapSize, min_bin_vheap_size := MinBinVheapSize, compress_mem_tables := CompressMemTables, names := #{wal := WalName, @@ -256,6 +258,7 @@ init(#{dir := Dir} = Conf0) -> % writers process_flag(message_queue_data, off_heap), process_flag(min_bin_vheap_size, MinBinVheapSize), + process_flag(min_heap_size, MinHeapSize), CRef = ra_counters:new(WalName, ?COUNTER_FIELDS), % wait for the segment writer to process anything in flight ok = ra_log_segment_writer:await(SegWriter), @@ -971,6 +974,7 @@ merge_conf_defaults(Conf) -> garbage_collect => false, sync_method => datasync, min_bin_vheap_size => ?MIN_BIN_VHEAP_SIZE, + min_heap_size => ?MIN_HEAP_SIZE, compress_mem_tables => false}, Conf). to_binary(Term) -> diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 789d3cfe..88cd1f7d 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -288,9 +288,12 @@ do_init(#{id := Id, system_config := SysConf} = maps:merge(config_defaults(Id), Config0), MsgQData = maps:get(message_queue_data, SysConf, off_heap), - MinBinVheapSize = maps:get(server_min_bin_vheap_size, SysConf, ?MIN_BIN_VHEAP_SIZE), + MinBinVheapSize = maps:get(server_min_bin_vheap_size, SysConf, + ?MIN_BIN_VHEAP_SIZE), + MinHeapSize = maps:get(server_min_heap_size, SysConf, ?MIN_BIN_VHEAP_SIZE), process_flag(message_queue_data, MsgQData), process_flag(min_bin_vheap_size, MinBinVheapSize), + process_flag(min_heap_size, MinHeapSize), #{cluster := Cluster} = ServerState = ra_server:init(Config), LogId = ra_server:log_id(ServerState), UId = ra_server:uid(ServerState), diff --git a/src/ra_system.erl b/src/ra_system.erl index 874b1b2d..54b9722c 100644 --- a/src/ra_system.erl +++ b/src/ra_system.erl @@ -50,8 +50,10 @@ default_max_pipeline_count => non_neg_integer(), default_max_append_entries_rpc_batch_size => non_neg_integer(), message_queue_data => on_heap | off_heap, + wal_min_heap_size => non_neg_integer(), wal_min_bin_vheap_size => non_neg_integer(), server_min_bin_vheap_size => non_neg_integer(), + server_min_heap_size => non_neg_integer(), compress_mem_tables => boolean(), low_priority_commands_flush_size => non_neg_integer(), low_priority_commands_in_memory_size => non_neg_integer(), @@ -97,6 +99,8 @@ default_config() -> WalPreAllocate = application:get_env(ra, wal_pre_allocate, false), WalMinBinVheapSize = application:get_env(ra, wal_min_bin_vheap_size, ?MIN_BIN_VHEAP_SIZE), + WalMinHeapSize = application:get_env(ra, wal_min_heap_size, ?MIN_HEAP_SIZE), + ServerMinHeapSize = application:get_env(ra, server_min_heap_size, ?MIN_HEAP_SIZE), ServerMinBinVheapSize = application:get_env(ra, server_min_bin_vheap_size, ?MIN_BIN_VHEAP_SIZE), DefaultMaxPipelineCount = application:get_env(ra, default_max_pipeline_count, @@ -124,6 +128,7 @@ default_config() -> wal_garbage_collect => WalGarbageCollect, wal_pre_allocate => WalPreAllocate, wal_sync_method => WalSyncMethod, + wal_min_heap_size => WalMinHeapSize, wal_min_bin_vheap_size => WalMinBinVheapSize, segment_max_entries => SegmentMaxEntries, segment_max_pending => SegmentMaxPending, @@ -134,6 +139,7 @@ default_config() -> compress_mem_tables => CompressMemTables, snapshot_chunk_size => SnapshotChunkSize, server_min_bin_vheap_size => ServerMinBinVheapSize, + server_min_heap_size => ServerMinHeapSize, receive_snapshot_timeout => ReceiveSnapshotTimeout, low_priority_commands_flush_size => LowPriorityCommandsFlushSize, low_priority_commands_in_memory_size => LowPriorityInMemSize,