Skip to content

Commit

Permalink
[Enhancement] Add skip_page_cache and skip_disk_cache options in sess…
Browse files Browse the repository at this point in the history
…ion variables

Signed-off-by: tracymacding <[email protected]>
  • Loading branch information
tracymacding committed Jan 3, 2025
1 parent bc92694 commit df814c4
Show file tree
Hide file tree
Showing 14 changed files with 47 additions and 9 deletions.
6 changes: 4 additions & 2 deletions be/src/connector/lake_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,11 @@ Status LakeDataSource::init_reader_params(const std::vector<OlapScanRange*>& key
_params.skip_aggregation = skip_aggregation;
_params.profile = _runtime_profile;
_params.runtime_state = _runtime_state;
_params.use_page_cache = !config::disable_storage_page_cache && _scan_range.fill_data_cache;
_params.use_page_cache =
!config::disable_storage_page_cache && _scan_range.fill_data_cache && !_scan_range.skip_page_cache;
_params.lake_io_opts.fill_data_cache = _scan_range.fill_data_cache;
_params.runtime_range_pruner = OlapRuntimeScanRangePruner(parser, _conjuncts_manager.unarrived_runtime_filters());
_params.lake_io_opts.skip_disk_cache = _scan_range.skip_disk_cache;
_params.runtime_range_pruner = OlapRuntimeScanRangePruner(parser, _conjuncts_manager->unarrived_runtime_filters());
_params.splitted_scan_rows = _provider->get_splitted_scan_rows();
_params.scan_dop = _provider->get_scan_dop();

Expand Down
2 changes: 2 additions & 0 deletions be/src/fs/fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct SequentialFileOptions {
// Specify different buffer size for different read scenarios
int64_t buffer_size = -1;
FileEncryptionInfo encryption_info;
bool skip_disk_cache = false;
};

struct RandomAccessFileOptions {
Expand All @@ -99,6 +100,7 @@ struct RandomAccessFileOptions {
// Specify different buffer size for different read scenarios
int64_t buffer_size = -1;
FileEncryptionInfo encryption_info;
bool skip_disk_cache = false;
};

struct DirEntry {
Expand Down
1 change: 1 addition & 0 deletions be/src/fs/fs_starlet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ class StarletFileSystem : public FileSystem {
auto opt = ReadOptions();
opt.skip_fill_local_cache = opts.skip_fill_local_cache;
opt.buffer_size = opts.buffer_size;
opt.skip_read_local_cache = opts.skip_disk_cache;
if (info.size.has_value()) {
opt.file_size = info.size.value();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/storage/lake/horizontal_compaction_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu
reader_params.chunk_size = chunk_size;
reader_params.profile = nullptr;
reader_params.use_page_cache = false;
reader_params.lake_io_opts = {false, config::lake_compaction_stream_buffer_size_bytes};
reader_params.lake_io_opts = {.fill_data_cache = false,
.buffer_size = config::lake_compaction_stream_buffer_size_bytes};
reader_params.column_access_paths = &_column_access_paths;
RETURN_IF_ERROR(reader.open(reader_params));

Expand Down
4 changes: 2 additions & 2 deletions be/src/storage/lake/vertical_compaction_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ Status VerticalCompactionTask::compact_column_group(bool is_key, int column_grou
reader_params.profile = nullptr;
reader_params.use_page_cache = false;
reader_params.column_access_paths = &_column_access_paths;
reader_params.lake_io_opts = {config::lake_enable_vertical_compaction_fill_data_cache,
config::lake_compaction_stream_buffer_size_bytes};
reader_params.lake_io_opts = {.fill_data_cache = config::lake_enable_vertical_compaction_fill_data_cache,
.buffer_size = config::lake_compaction_stream_buffer_size_bytes};
RETURN_IF_ERROR(reader.open(reader_params));

CompactionTaskStats prev_stats;
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ struct LakeIOOptions {
// Cache remote file locally on read requests.
// This options can be ignored if the underlying filesystem does not support local cache.
bool fill_data_cache = false;

bool skip_disk_cache = false;
// Specify different buffer size for different read scenarios
int64_t buffer_size = -1;
};
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/rowset/column_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ struct ColumnIteratorOptions {
bool use_page_cache = false;
// temporary data does not allow caching
bool temporary_data = false;
LakeIOOptions lake_io_opts{.fill_data_cache = true};
LakeIOOptions lake_io_opts{.fill_data_cache = true, .skip_disk_cache = false};

// check whether column pages are all dictionary encoding.
bool check_dict_encoding = false;
Expand Down
3 changes: 2 additions & 1 deletion be/src/storage/rowset/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,8 @@ Status SegmentIterator::_init_column_iterator_by_cid(const ColumnId cid, const C
iter_opts.has_preaggregation = _opts.has_preaggregation;

RandomAccessFileOptions opts{.skip_fill_local_cache = !_opts.lake_io_opts.fill_data_cache,
.buffer_size = _opts.lake_io_opts.buffer_size};
.buffer_size = _opts.lake_io_opts.buffer_size,
.skip_disk_cache = _opts.lake_io_opts.skip_disk_cache};

ColumnAccessPath* access_path = nullptr;
if (_column_access_paths.find(cid) != _column_access_paths.end()) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/rowset/segment_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class SegmentReadOptions {
bool use_page_cache = false;
// temporary data does not allow caching
bool temporary_data = false;
LakeIOOptions lake_io_opts{.fill_data_cache = true};
LakeIOOptions lake_io_opts{.fill_data_cache = true, .skip_disk_cache = false};

ReaderType reader_type = READER_QUERY;
int chunk_size = DEFAULT_CHUNK_SIZE;
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/rowset/segment_rewriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ Status SegmentRewriter::rewrite_auto_increment_lake(
auto tablet_mgr = tablet->tablet_mgr();
// not fill data and meta cache
auto fill_cache = false;
LakeIOOptions lake_io_opts{fill_cache, -1};
LakeIOOptions lake_io_opts{.fill_data_cache = fill_cache, .buffer_size = -1};
ASSIGN_OR_RETURN(auto segment,
tablet_mgr->load_segment(src, segment_id, &footer_sine_hint, lake_io_opts, fill_cache, tschema));
uint32_t num_rows = segment->num_rows();
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/tablet_reader_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ struct TabletReaderParams {
// Options only applies to cloud-native table r/w IO
LakeIOOptions lake_io_opts{.fill_data_cache = true};

// Disable local disk cache or not
bool skip_disk_cache = false;

RangeStartOperation range = RangeStartOperation::GT;
RangeEndOperation end_range = RangeEndOperation::LT;
std::vector<OlapTuple> start_key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ public void addScanRangeLocations(Partition partition,
long localBeId) throws UserException {
boolean enableQueryTabletAffinity =
ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isEnableQueryTabletAffinity();
boolean skipDiskCache = ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isSkipLocalDiskCache();
boolean skipPageCache = ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isSkipPageCache();
int logNum = 0;
int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
String schemaHashStr = String.valueOf(schemaHash);
Expand Down Expand Up @@ -632,6 +634,8 @@ public void addScanRangeLocations(Partition partition,
scanRangeLocations.addToLocations(scanRangeLocation);
internalRange.addToHosts(new TNetworkAddress(ip, port));
internalRange.setFill_data_cache(fillDataCache);
internalRange.setSkip_page_cache(skipPageCache);
internalRange.setSkip_disk_cache(skipDiskCache);
tabletIsNull = false;

// for CBO
Expand Down
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ public class SessionVariable implements Serializable, Writable, Cloneable {

public static final String ENABLE_QUERY_TABLET_AFFINITY = "enable_query_tablet_affinity";

public static final String SKIP_LOCAL_DISK_CACHE = "skip_local_disk_cache";

public static final String SKIP_PAGE_CACHE = "skip_page_cache";

public static final String ENABLE_TABLET_INTERNAL_PARALLEL = "enable_tablet_internal_parallel";
public static final String ENABLE_TABLET_INTERNAL_PARALLEL_V2 = "enable_tablet_internal_parallel_v2";

Expand Down Expand Up @@ -863,6 +867,12 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = ENABLE_QUERY_TABLET_AFFINITY)
private boolean enableQueryTabletAffinity = false;

@VariableMgr.VarAttr(name = SKIP_LOCAL_DISK_CACHE)
private boolean skipLocalDiskCache = false;

@VariableMgr.VarAttr(name = SKIP_PAGE_CACHE)
private boolean skipPageCache = false;

@VariableMgr.VarAttr(name = RUNTIME_FILTER_SCAN_WAIT_TIME, flag = VariableMgr.INVISIBLE)
private long runtimeFilterScanWaitTime = 20L;

Expand Down Expand Up @@ -2423,6 +2433,14 @@ public boolean isEnableQueryTabletAffinity() {
return enableQueryTabletAffinity;
}

public boolean isSkipLocalDiskCache() {
return skipLocalDiskCache;
}

public boolean isSkipPageCache() {
return skipPageCache;
}

public int getStatisticCollectParallelism() {
return statisticCollectParallelism;
}
Expand Down
4 changes: 4 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ struct TInternalScanRange {
12: optional bool fill_data_cache = true;
// used for per-bucket compute optimize
13: optional i32 bucket_sequence
// skip page cache when access page data
14: optional bool skip_page_cache = false;
// skip local disk data cache when access page data
15: optional bool skip_disk_cache = false;
}

enum TFileFormatType {
Expand Down

0 comments on commit df814c4

Please sign in to comment.