From 59734e8f629973b2130f9e8996089d19b9bd2d5a Mon Sep 17 00:00:00 2001 From: tracymacding Date: Fri, 3 Jan 2025 16:45:47 +0800 Subject: [PATCH] [Enhancement] Add skip_page_cache and skip_disk_cache options in session variables (#54653) Signed-off-by: tracymacding --- be/src/connector/lake_connector.cpp | 4 +++- be/src/fs/fs.h | 2 ++ be/src/fs/fs_starlet.cpp | 1 + .../lake/horizontal_compaction_task.cpp | 3 ++- .../storage/lake/vertical_compaction_task.cpp | 4 ++-- be/src/storage/options.h | 2 ++ be/src/storage/rowset/column_iterator.h | 2 +- be/src/storage/rowset/segment_iterator.cpp | 3 ++- be/src/storage/rowset/segment_options.h | 2 +- be/src/storage/rowset/segment_rewriter.cpp | 2 +- be/src/storage/tablet_reader_params.h | 3 +++ .../com/starrocks/planner/OlapScanNode.java | 4 ++++ .../java/com/starrocks/qe/SessionVariable.java | 18 ++++++++++++++++++ gensrc/thrift/PlanNodes.thrift | 4 ++++ 14 files changed, 46 insertions(+), 8 deletions(-) diff --git a/be/src/connector/lake_connector.cpp b/be/src/connector/lake_connector.cpp index cd119a755904b..76523edd0ed0f 100644 --- a/be/src/connector/lake_connector.cpp +++ b/be/src/connector/lake_connector.cpp @@ -249,8 +249,10 @@ Status LakeDataSource::init_reader_params(const std::vector& key _params.skip_aggregation = skip_aggregation; _params.profile = _runtime_profile; _params.runtime_state = _runtime_state; - _params.use_page_cache = !config::disable_storage_page_cache && _scan_range.fill_data_cache; + _params.use_page_cache = + !config::disable_storage_page_cache && _scan_range.fill_data_cache && !_scan_range.skip_page_cache; _params.lake_io_opts.fill_data_cache = _scan_range.fill_data_cache; + _params.lake_io_opts.skip_disk_cache = _scan_range.skip_disk_cache; _params.runtime_range_pruner = OlapRuntimeScanRangePruner(parser, _conjuncts_manager.unarrived_runtime_filters()); _params.splitted_scan_rows = _provider->get_splitted_scan_rows(); _params.scan_dop = _provider->get_scan_dop(); diff --git a/be/src/fs/fs.h b/be/src/fs/fs.h index 8a2656b3d21d8..211823a60742c 100644 --- a/be/src/fs/fs.h +++ b/be/src/fs/fs.h @@ -90,6 +90,7 @@ struct SequentialFileOptions { // Specify different buffer size for different read scenarios int64_t buffer_size = -1; FileEncryptionInfo encryption_info; + bool skip_disk_cache = false; }; struct RandomAccessFileOptions { @@ -99,6 +100,7 @@ struct RandomAccessFileOptions { // Specify different buffer size for different read scenarios int64_t buffer_size = -1; FileEncryptionInfo encryption_info; + bool skip_disk_cache = false; }; struct DirEntry { diff --git a/be/src/fs/fs_starlet.cpp b/be/src/fs/fs_starlet.cpp index c02c392fc2e42..56c80fb962fd5 100644 --- a/be/src/fs/fs_starlet.cpp +++ b/be/src/fs/fs_starlet.cpp @@ -291,6 +291,7 @@ class StarletFileSystem : public FileSystem { auto opt = ReadOptions(); opt.skip_fill_local_cache = opts.skip_fill_local_cache; opt.buffer_size = opts.buffer_size; + opt.skip_read_local_cache = opts.skip_disk_cache; if (info.size.has_value()) { opt.file_size = info.size.value(); } diff --git a/be/src/storage/lake/horizontal_compaction_task.cpp b/be/src/storage/lake/horizontal_compaction_task.cpp index 7a2fa16dc8d78..b7a5ac6ea8273 100644 --- a/be/src/storage/lake/horizontal_compaction_task.cpp +++ b/be/src/storage/lake/horizontal_compaction_task.cpp @@ -50,7 +50,8 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu reader_params.chunk_size = chunk_size; reader_params.profile = nullptr; reader_params.use_page_cache = false; - reader_params.lake_io_opts = {false, config::lake_compaction_stream_buffer_size_bytes}; + reader_params.lake_io_opts = {.fill_data_cache = false, + .buffer_size = config::lake_compaction_stream_buffer_size_bytes}; reader_params.column_access_paths = &_column_access_paths; RETURN_IF_ERROR(reader.open(reader_params)); diff --git a/be/src/storage/lake/vertical_compaction_task.cpp b/be/src/storage/lake/vertical_compaction_task.cpp index 97c76deeabeaf..0363fd3807b3d 100644 --- a/be/src/storage/lake/vertical_compaction_task.cpp +++ b/be/src/storage/lake/vertical_compaction_task.cpp @@ -158,8 +158,8 @@ Status VerticalCompactionTask::compact_column_group(bool is_key, int column_grou reader_params.profile = nullptr; reader_params.use_page_cache = false; reader_params.column_access_paths = &_column_access_paths; - reader_params.lake_io_opts = {config::lake_enable_vertical_compaction_fill_data_cache, - config::lake_compaction_stream_buffer_size_bytes}; + reader_params.lake_io_opts = {.fill_data_cache = config::lake_enable_vertical_compaction_fill_data_cache, + .buffer_size = config::lake_compaction_stream_buffer_size_bytes}; RETURN_IF_ERROR(reader.open(reader_params)); CompactionTaskStats prev_stats; diff --git a/be/src/storage/options.h b/be/src/storage/options.h index 42f7fb85dfef1..ed817b5dfaebc 100644 --- a/be/src/storage/options.h +++ b/be/src/storage/options.h @@ -75,6 +75,8 @@ struct LakeIOOptions { // Cache remote file locally on read requests. // This options can be ignored if the underlying filesystem does not support local cache. bool fill_data_cache = false; + + bool skip_disk_cache = false; // Specify different buffer size for different read scenarios int64_t buffer_size = -1; }; diff --git a/be/src/storage/rowset/column_iterator.h b/be/src/storage/rowset/column_iterator.h index 11f4ff8190d88..3c311c528fc6d 100644 --- a/be/src/storage/rowset/column_iterator.h +++ b/be/src/storage/rowset/column_iterator.h @@ -62,7 +62,7 @@ struct ColumnIteratorOptions { bool use_page_cache = false; // temporary data does not allow caching bool temporary_data = false; - LakeIOOptions lake_io_opts{.fill_data_cache = true}; + LakeIOOptions lake_io_opts{.fill_data_cache = true, .skip_disk_cache = false}; // check whether column pages are all dictionary encoding. bool check_dict_encoding = false; diff --git a/be/src/storage/rowset/segment_iterator.cpp b/be/src/storage/rowset/segment_iterator.cpp index 3253b416417bb..b538f05d0e3b5 100644 --- a/be/src/storage/rowset/segment_iterator.cpp +++ b/be/src/storage/rowset/segment_iterator.cpp @@ -562,7 +562,8 @@ Status SegmentIterator::_init_column_iterator_by_cid(const ColumnId cid, const C iter_opts.has_preaggregation = _opts.has_preaggregation; RandomAccessFileOptions opts{.skip_fill_local_cache = !_opts.lake_io_opts.fill_data_cache, - .buffer_size = _opts.lake_io_opts.buffer_size}; + .buffer_size = _opts.lake_io_opts.buffer_size, + .skip_disk_cache = _opts.lake_io_opts.skip_disk_cache}; ColumnAccessPath* access_path = nullptr; if (_column_access_paths.find(cid) != _column_access_paths.end()) { diff --git a/be/src/storage/rowset/segment_options.h b/be/src/storage/rowset/segment_options.h index af479e91f6c8e..840e6e7f2aeec 100644 --- a/be/src/storage/rowset/segment_options.h +++ b/be/src/storage/rowset/segment_options.h @@ -75,7 +75,7 @@ class SegmentReadOptions { bool use_page_cache = false; // temporary data does not allow caching bool temporary_data = false; - LakeIOOptions lake_io_opts{.fill_data_cache = true}; + LakeIOOptions lake_io_opts{.fill_data_cache = true, .skip_disk_cache = false}; ReaderType reader_type = READER_QUERY; int chunk_size = DEFAULT_CHUNK_SIZE; diff --git a/be/src/storage/rowset/segment_rewriter.cpp b/be/src/storage/rowset/segment_rewriter.cpp index 0e065f0fabe2d..5380f419472a4 100644 --- a/be/src/storage/rowset/segment_rewriter.cpp +++ b/be/src/storage/rowset/segment_rewriter.cpp @@ -213,7 +213,7 @@ Status SegmentRewriter::rewrite_auto_increment_lake( auto tablet_mgr = tablet->tablet_mgr(); // not fill data and meta cache auto fill_cache = false; - LakeIOOptions lake_io_opts{fill_cache, -1}; + LakeIOOptions lake_io_opts{.fill_data_cache = fill_cache, .buffer_size = -1}; ASSIGN_OR_RETURN(auto segment, tablet_mgr->load_segment(src, segment_id, &footer_sine_hint, lake_io_opts, fill_cache, tschema)); uint32_t num_rows = segment->num_rows(); diff --git a/be/src/storage/tablet_reader_params.h b/be/src/storage/tablet_reader_params.h index eeba0678c128f..9b9b61d373c02 100644 --- a/be/src/storage/tablet_reader_params.h +++ b/be/src/storage/tablet_reader_params.h @@ -62,6 +62,9 @@ struct TabletReaderParams { // Options only applies to cloud-native table r/w IO LakeIOOptions lake_io_opts{.fill_data_cache = true}; + // Disable local disk cache or not + bool skip_disk_cache = false; + RangeStartOperation range = RangeStartOperation::GT; RangeEndOperation end_range = RangeEndOperation::LT; std::vector start_key; diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java index 7a55fea7b00f3..704d1568a0396 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/OlapScanNode.java @@ -529,6 +529,8 @@ public void addScanRangeLocations(Partition partition, long localBeId) throws UserException { boolean enableQueryTabletAffinity = ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isEnableQueryTabletAffinity(); + boolean skipDiskCache = ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isSkipLocalDiskCache(); + boolean skipPageCache = ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isSkipPageCache(); int logNum = 0; int schemaHash = olapTable.getSchemaHashByIndexId(index.getId()); String schemaHashStr = String.valueOf(schemaHash); @@ -632,6 +634,8 @@ public void addScanRangeLocations(Partition partition, scanRangeLocations.addToLocations(scanRangeLocation); internalRange.addToHosts(new TNetworkAddress(ip, port)); internalRange.setFill_data_cache(fillDataCache); + internalRange.setSkip_page_cache(skipPageCache); + internalRange.setSkip_disk_cache(skipDiskCache); tabletIsNull = false; // for CBO diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index bdbc73b85c8d2..548277d77e853 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -264,6 +264,10 @@ public class SessionVariable implements Serializable, Writable, Cloneable { public static final String ENABLE_QUERY_TABLET_AFFINITY = "enable_query_tablet_affinity"; + public static final String SKIP_LOCAL_DISK_CACHE = "skip_local_disk_cache"; + + public static final String SKIP_PAGE_CACHE = "skip_page_cache"; + public static final String ENABLE_TABLET_INTERNAL_PARALLEL = "enable_tablet_internal_parallel"; public static final String ENABLE_TABLET_INTERNAL_PARALLEL_V2 = "enable_tablet_internal_parallel_v2"; @@ -863,6 +867,12 @@ public static MaterializedViewRewriteMode parse(String str) { @VariableMgr.VarAttr(name = ENABLE_QUERY_TABLET_AFFINITY) private boolean enableQueryTabletAffinity = false; + @VariableMgr.VarAttr(name = SKIP_LOCAL_DISK_CACHE) + private boolean skipLocalDiskCache = false; + + @VariableMgr.VarAttr(name = SKIP_PAGE_CACHE) + private boolean skipPageCache = false; + @VariableMgr.VarAttr(name = RUNTIME_FILTER_SCAN_WAIT_TIME, flag = VariableMgr.INVISIBLE) private long runtimeFilterScanWaitTime = 20L; @@ -2423,6 +2433,14 @@ public boolean isEnableQueryTabletAffinity() { return enableQueryTabletAffinity; } + public boolean isSkipLocalDiskCache() { + return skipLocalDiskCache; + } + + public boolean isSkipPageCache() { + return skipPageCache; + } + public int getStatisticCollectParallelism() { return statisticCollectParallelism; } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 32eaa0698d825..7dc8e961ad6bf 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -133,6 +133,10 @@ struct TInternalScanRange { 12: optional bool fill_data_cache = true; // used for per-bucket compute optimize 13: optional i32 bucket_sequence + // skip page cache when access page data + 14: optional bool skip_page_cache = false; + // skip local disk data cache when access page data + 15: optional bool skip_disk_cache = false; } enum TFileFormatType {