Skip to content

Commit

Permalink
[BugFix] Fix the bug of decode min/max value in null page stats (#54196)
Browse files Browse the repository at this point in the history
Signed-off-by: trueeyu <[email protected]>
  • Loading branch information
trueeyu authored Dec 23, 2024
1 parent 67efdf0 commit 33da390
Show file tree
Hide file tree
Showing 15 changed files with 1,028 additions and 1,185 deletions.
46 changes: 32 additions & 14 deletions be/src/formats/parquet/file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,23 +311,35 @@ bool FileReader::_filter_group_with_more_filter(const GroupReaderPtr& group_read
return true;
}
} else if (filter_type == StatisticsHelper::StatSupportedFilter::FILTER_IN) {
if (!column_meta->statistics.__isset.null_count) continue;

std::vector<string> min_values;
std::vector<string> max_values;
std::vector<int64_t> null_counts;
std::vector<bool> null_pages;
int64_t num_rows = group_reader->get_row_group_metadata()->num_rows;

const ParquetField* field = group_reader->get_column_parquet_field(slot->id());
if (field == nullptr) {
LOG(WARNING) << "Can't get " + slot->col_name() + "'s ParquetField in _read_min_max_chunk.";
continue;
}
auto st = StatisticsHelper::get_min_max_value(_file_metadata.get(), slot->type(), column_meta,
field, min_values, max_values);
if (!st.ok()) continue;
st = StatisticsHelper::get_null_counts(column_meta, null_counts);
if (!st.ok()) continue;
Status st;

null_counts.emplace_back(column_meta->statistics.null_count);
null_pages.emplace_back(num_rows == column_meta->statistics.null_count);
if (num_rows == column_meta->statistics.null_count) {
min_values.emplace_back("");
max_values.emplace_back("");
} else {
st = StatisticsHelper::get_min_max_value(_file_metadata.get(), slot->type(), column_meta, field,
min_values, max_values);
if (!st.ok()) continue;
}

Filter selected(min_values.size(), 1);
st = StatisticsHelper::in_filter_on_min_max_stat(min_values, max_values, null_counts, ctx, field,
_scanner_ctx->timezone, selected);
st = StatisticsHelper::in_filter_on_min_max_stat(min_values, max_values, null_pages, null_counts,
ctx, field, _scanner_ctx->timezone, selected);
if (!st.ok()) continue;
if (!selected[0]) {
return true;
Expand Down Expand Up @@ -403,11 +415,6 @@ Status FileReader::_read_has_nulls(const GroupReaderPtr& group_reader, const std
// statistics not exist in parquet file
return Status::Aborted("No exist statistics");
} else {
const ParquetField* field = group_reader->get_column_parquet_field(slot->id());
if (field == nullptr) {
LOG(WARNING) << "Can't get " + slot->col_name() + "'s ParquetField in _read_has_nulls.";
return Status::InternalError(strings::Substitute("Can't get $0 field", slot->col_name()));
}
RETURN_IF_ERROR(StatisticsHelper::get_has_nulls(column_meta, *has_nulls));
}
}
Expand Down Expand Up @@ -448,8 +455,18 @@ Status FileReader::_read_min_max_chunk(const GroupReaderPtr& group_reader, const
// statistics not exist in parquet file
return Status::Aborted("No exist statistics");
} else {
size_t num_rows = group_reader->get_row_group_metadata()->num_rows;
std::vector<string> min_values;
std::vector<string> max_values;
std::vector<bool> null_pages;

// If all values in a row group are null, the statistics look like this:
// max=<null>, min=<null>, null_count=3, distinct_count=<null>, max_value=<null>, min_value=<null>
if (column_meta->statistics.__isset.null_count && column_meta->statistics.null_count == num_rows) {
(*min_chunk)->columns()[i]->append_nulls(1);
(*max_chunk)->columns()[i]->append_nulls(1);
continue;
}

const ParquetField* field = group_reader->get_column_parquet_field(slot->id());
if (field == nullptr) {
Expand All @@ -459,10 +476,11 @@ Status FileReader::_read_min_max_chunk(const GroupReaderPtr& group_reader, const

RETURN_IF_ERROR(StatisticsHelper::get_min_max_value(_file_metadata.get(), slot->type(), column_meta, field,
min_values, max_values));
null_pages.emplace_back(false);
RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column((*min_chunk)->columns()[i], min_values,
slot->type(), field, ctx.timezone));
null_pages, slot->type(), field, ctx.timezone));
RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column((*max_chunk)->columns()[i], max_values,
slot->type(), field, ctx.timezone));
null_pages, slot->type(), field, ctx.timezone));
}
}

Expand Down
22 changes: 13 additions & 9 deletions be/src/formats/parquet/page_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ Status PageIndexReader::_deal_with_min_max_conjuncts(const std::vector<ExprConte
ColumnPtr max_column = ColumnHelper::create_column(type, true);
max_chunk->append_column(max_column, id);
// deal with min_values
auto st = StatisticsHelper::decode_value_into_column(min_column, column_index.min_values, type,
_column_readers.at(id)->get_column_parquet_field(),
auto st = StatisticsHelper::decode_value_into_column(min_column, column_index.min_values, column_index.null_pages,
type, _column_readers.at(id)->get_column_parquet_field(),
_group_reader->_param.timezone);
if (!st.ok()) {
// swallow error status
Expand All @@ -107,7 +107,7 @@ Status PageIndexReader::_deal_with_min_max_conjuncts(const std::vector<ExprConte
}

// deal with max_values
st = StatisticsHelper::decode_value_into_column(max_column, column_index.max_values, type,
st = StatisticsHelper::decode_value_into_column(max_column, column_index.max_values, column_index.null_pages, type,
_column_readers.at(id)->get_column_parquet_field(),
_group_reader->_param.timezone);
if (!st.ok()) {
Expand Down Expand Up @@ -178,13 +178,17 @@ Status PageIndexReader::_deal_with_more_conjunct(const std::vector<ExprContext*>
}
}
} else if (filter_type == StatisticsHelper::StatSupportedFilter::FILTER_IN) {
RETURN_IF_ERROR(StatisticsHelper::in_filter_on_min_max_stat(
column_index.min_values, column_index.max_values, column_index.null_counts, ctx, field,
timezone, page_filter));
if (column_index.__isset.null_counts) {
RETURN_IF_ERROR(StatisticsHelper::in_filter_on_min_max_stat(
column_index.min_values, column_index.max_values, column_index.null_pages,
column_index.null_counts, ctx, field, timezone, page_filter));
}
} else if (filter_type == StatisticsHelper::StatSupportedFilter::RF_MIN_MAX) {
RETURN_IF_ERROR(StatisticsHelper::min_max_filter_on_min_max_stat(
column_index.min_values, column_index.max_values, column_index.null_counts, ctx, field,
timezone, page_filter));
if (column_index.__isset.null_counts) {
RETURN_IF_ERROR(StatisticsHelper::min_max_filter_on_min_max_stat(
column_index.min_values, column_index.max_values, column_index.null_pages,
column_index.null_counts, ctx, field, timezone, page_filter));
}
}
}
}
Expand Down
13 changes: 8 additions & 5 deletions be/src/formats/parquet/scalar_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ StatusOr<bool> ScalarColumnReader::row_group_zone_map_filter(const std::vector<c
if (get_chunk_metadata()->meta_data.statistics.__isset.null_count) {
has_null = get_chunk_metadata()->meta_data.statistics.null_count > 0;
is_all_null = get_chunk_metadata()->meta_data.statistics.null_count == rg_num_rows;
} else {
return true;
}

std::optional<ZoneMapDetail> zone_map_detail = std::nullopt;
Expand All @@ -306,13 +308,14 @@ StatusOr<bool> ScalarColumnReader::row_group_zone_map_filter(const std::vector<c
} else {
std::vector<string> min_values;
std::vector<string> max_values;
std::vector<bool> null_pages{false};
Status st =
StatisticsHelper::get_min_max_value(_opts.file_meta_data, *_col_type, &get_chunk_metadata()->meta_data,
get_column_parquet_field(), min_values, max_values);
if (st.ok()) {
RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column(min_column, min_values, *_col_type,
RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column(min_column, min_values, null_pages, *_col_type,
get_column_parquet_field(), _opts.timezone));
RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column(max_column, max_values, *_col_type,
RETURN_IF_ERROR(StatisticsHelper::decode_value_into_column(max_column, max_values, null_pages, *_col_type,
get_column_parquet_field(), _opts.timezone));

zone_map_detail = ZoneMapDetail{min_column->get(0), max_column->get(0), has_null};
Expand Down Expand Up @@ -355,19 +358,20 @@ StatusOr<bool> ScalarColumnReader::page_index_zone_map_filter(const std::vector<
ASSIGN_OR_RETURN(const tparquet::OffsetIndex* offset_index, get_offset_index(rg_first_row));

const size_t page_num = column_index.min_values.size();
const std::vector<bool> null_pages = column_index.null_pages;

ColumnPtr min_column = ColumnHelper::create_column(*_col_type, true);
ColumnPtr max_column = ColumnHelper::create_column(*_col_type, true);
// deal with min_values
auto st = StatisticsHelper::decode_value_into_column(min_column, column_index.min_values, *_col_type,
auto st = StatisticsHelper::decode_value_into_column(min_column, column_index.min_values, null_pages, *_col_type,
get_column_parquet_field(), _opts.timezone);
if (!st.ok()) {
// swallow error status
LOG(INFO) << "Error when decode min/max statistics, type " << _col_type->debug_string();
return false;
}
// deal with max_values
st = StatisticsHelper::decode_value_into_column(max_column, column_index.max_values, *_col_type,
st = StatisticsHelper::decode_value_into_column(max_column, column_index.max_values, null_pages, *_col_type,
get_column_parquet_field(), _opts.timezone);
if (!st.ok()) {
// swallow error status
Expand All @@ -379,7 +383,6 @@ StatusOr<bool> ScalarColumnReader::page_index_zone_map_filter(const std::vector<
DCHECK_EQ(page_num, max_column->size());

// fill ZoneMapDetail
const std::vector<bool> null_pages = column_index.null_pages;
std::vector<ZoneMapDetail> zone_map_details{};
for (size_t i = 0; i < page_num; i++) {
if (null_pages[i]) {
Expand Down
Loading

0 comments on commit 33da390

Please sign in to comment.