Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Use more clear type description in ParquetField #52575

Merged
merged 4 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions be/src/formats/parquet/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include <algorithm>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/iterator/iterator_facade.hpp>
#include <map>
#include <ostream>
#include <unordered_map>
Expand All @@ -30,7 +29,6 @@
#include "common/compiler_util.h"
#include "exec/exec_node.h"
#include "exec/hdfs_scanner.h"
#include "exprs/expr_context.h"
#include "formats/parquet/complex_column_reader.h"
#include "formats/parquet/scalar_column_reader.h"
#include "formats/utils.h"
Expand Down Expand Up @@ -137,7 +135,7 @@ Status ColumnDictFilterContext::rewrite_conjunct_ctxs_to_predicate(StoredColumnR

void ColumnReader::get_subfield_pos_with_pruned_type(const ParquetField& field, const TypeDescriptor& col_type,
bool case_sensitive, std::vector<int32_t>& pos) {
DCHECK(field.type.type == LogicalType::TYPE_STRUCT);
DCHECK(field.type == ColumnType::STRUCT);
if (!col_type.field_ids.empty()) {
std::unordered_map<int32_t, size_t> field_id_2_pos;
for (size_t i = 0; i < field.children.size(); i++) {
Expand Down Expand Up @@ -246,12 +244,12 @@ bool ColumnReader::_has_valid_subfield_column_reader(
Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField* field, const TypeDescriptor& col_type,
std::unique_ptr<ColumnReader>* output) {
// We will only set a complex type in ParquetField
if ((field->type.is_complex_type() || col_type.is_complex_type()) && (field->type.type != col_type.type)) {
if ((field->is_complex_type() || col_type.is_complex_type()) && !field->has_same_complex_type(col_type)) {
return Status::InternalError(
strings::Substitute("ParquetField '$0' file's type $1 is different from table's type $2", field->name,
logical_type_to_string(field->type.type), logical_type_to_string(col_type.type)));
column_type_to_string(field->type), logical_type_to_string(col_type.type)));
}
if (field->type.type == LogicalType::TYPE_ARRAY) {
if (field->type == ColumnType::ARRAY) {
std::unique_ptr<ColumnReader> child_reader;
RETURN_IF_ERROR(ColumnReader::create(opts, &field->children[0], col_type.children[0], &child_reader));
if (child_reader != nullptr) {
Expand All @@ -261,7 +259,7 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
} else {
*output = nullptr;
}
} else if (field->type.type == LogicalType::TYPE_MAP) {
} else if (field->type == ColumnType::MAP) {
std::unique_ptr<ColumnReader> key_reader = nullptr;
std::unique_ptr<ColumnReader> value_reader = nullptr;

Expand All @@ -279,7 +277,7 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
} else {
*output = nullptr;
}
} else if (field->type.type == LogicalType::TYPE_STRUCT) {
} else if (field->type == ColumnType::STRUCT) {
std::vector<int32_t> subfield_pos(col_type.children.size());
get_subfield_pos_with_pruned_type(*field, col_type, opts.case_sensitive, subfield_pos);

Expand Down Expand Up @@ -316,12 +314,13 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField* field, const TypeDescriptor& col_type,
const TIcebergSchemaField* iceberg_schema_field, std::unique_ptr<ColumnReader>* output) {
// We will only set a complex type in ParquetField
if ((field->type.is_complex_type() || col_type.is_complex_type()) && (field->type.type != col_type.type)) {
return Status::InternalError(strings::Substitute("ParquetField's type $0 is different from table's type $1",
field->type.type, col_type.type));
if ((field->is_complex_type() || col_type.is_complex_type()) && !field->has_same_complex_type(col_type)) {
return Status::InternalError(
strings::Substitute("ParquetField '$0' file's type $1 is different from table's type $2", field->name,
column_type_to_string(field->type), logical_type_to_string(col_type.type)));
}
DCHECK(iceberg_schema_field != nullptr);
if (field->type.type == LogicalType::TYPE_ARRAY) {
if (field->type == ColumnType::ARRAY) {
std::unique_ptr<ColumnReader> child_reader;
const TIcebergSchemaField* element_schema = &iceberg_schema_field->children[0];
RETURN_IF_ERROR(
Expand All @@ -333,7 +332,7 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
} else {
*output = nullptr;
}
} else if (field->type.type == LogicalType::TYPE_MAP) {
} else if (field->type == ColumnType::MAP) {
std::unique_ptr<ColumnReader> key_reader = nullptr;
std::unique_ptr<ColumnReader> value_reader = nullptr;

Expand All @@ -356,7 +355,7 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
} else {
*output = nullptr;
}
} else if (field->type.type == LogicalType::TYPE_STRUCT) {
} else if (field->type == ColumnType::STRUCT) {
std::vector<int32_t> subfield_pos(col_type.children.size());
std::vector<const TIcebergSchemaField*> iceberg_schema_subfield(col_type.children.size());
get_subfield_pos_with_pruned_type(*field, col_type, opts.case_sensitive, iceberg_schema_field, subfield_pos,
Expand Down
1 change: 0 additions & 1 deletion be/src/formats/parquet/group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "exec/hdfs_scanner.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "exprs/function_context.h"
#include "formats/parquet/metadata.h"
#include "formats/parquet/page_index_reader.h"
#include "formats/parquet/schema.h"
Expand Down
21 changes: 10 additions & 11 deletions be/src/formats/parquet/meta_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void ParquetMetaHelper::build_column_name_2_pos_in_meta(
// -- ColumnMetaData(path_in_schema=[col_tinyint])
// -- ColumnMetaData(path_in_schema=[col_struct, name])
// -- ColumnMetaData(path_in_schema=[col_struct, age])
if (field->type.is_complex_type()) continue;
if (field->is_complex_type()) continue;
// Put SlotDescriptor's origin column name here!
column_name_2_pos_in_meta.emplace(slot->col_name(), field->physical_column_index);
}
Expand Down Expand Up @@ -102,25 +102,25 @@ bool ParquetMetaHelper::_is_valid_type(const ParquetField* parquet_field, const
}
// only check for complex type now
// if complex type has none valid subfield, we will treat this struct type as invalid type.
if (!parquet_field->type.is_complex_type()) {
if (!parquet_field->is_complex_type()) {
return true;
}

if (parquet_field->type.type != type_descriptor->type) {
// complex type mismatched
// check the complex type is matched
if (!parquet_field->has_same_complex_type(*type_descriptor)) {
return false;
}

bool has_valid_child = false;

if (parquet_field->type.is_array_type() || parquet_field->type.is_map_type()) {
if (parquet_field->type == ColumnType::ARRAY || parquet_field->type == ColumnType::MAP) {
for (size_t idx = 0; idx < parquet_field->children.size(); idx++) {
if (_is_valid_type(&parquet_field->children[idx], &type_descriptor->children[idx])) {
has_valid_child = true;
break;
}
}
} else if (parquet_field->type.is_struct_type()) {
} else if (parquet_field->type == ColumnType::STRUCT) {
if (!type_descriptor->field_ids.empty()) {
std::unordered_map<int32_t, const TypeDescriptor*> field_id_2_type;
for (size_t idx = 0; idx < type_descriptor->children.size(); idx++) {
Expand Down Expand Up @@ -192,26 +192,25 @@ bool IcebergMetaHelper::_is_valid_type(const ParquetField* parquet_field, const
const TypeDescriptor* type_descriptor) const {
// only check for complex type now
// if complex type has none valid subfield, we will treat this struct type as invalid type.
if (!parquet_field->type.is_complex_type()) {
if (!parquet_field->is_complex_type()) {
return true;
}

if (parquet_field->type.type != type_descriptor->type) {
// complex type mismatched
if (!parquet_field->has_same_complex_type(*type_descriptor)) {
return false;
}

bool has_valid_child = false;

if (parquet_field->type.is_array_type() || parquet_field->type.is_map_type()) {
if (parquet_field->type == ColumnType::ARRAY || parquet_field->type == ColumnType::MAP) {
for (size_t idx = 0; idx < parquet_field->children.size(); idx++) {
if (_is_valid_type(&parquet_field->children[idx], &field_schema->children[idx],
&type_descriptor->children[idx])) {
has_valid_child = true;
break;
}
}
} else if (parquet_field->type.is_struct_type()) {
} else if (parquet_field->type == ColumnType::STRUCT) {
std::unordered_map<int32_t, const TIcebergSchemaField*> field_id_2_iceberg_schema{};
std::unordered_map<int32_t, const TypeDescriptor*> field_id_2_type{};
for (const auto& field : field_schema->children) {
Expand Down
55 changes: 39 additions & 16 deletions be/src/formats/parquet/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include "formats/parquet/schema.h"

#include <boost/algorithm/string/case_conv.hpp>
#include <boost/iterator/iterator_facade.hpp>
#include <memory>
#include <sstream>
#include <utility>
Expand All @@ -26,6 +25,21 @@

namespace starrocks::parquet {

std::string column_type_to_string(const ColumnType& column_type) {
switch (column_type) {
case SCALAR:
return "scalar";
case ARRAY:
return "array";
case MAP:
return "map";
case STRUCT:
return "struct";
default:
return "unknown";
}
}

std::string LevelInfo::debug_string() const {
std::stringstream ss;
ss << "LevelInfo(max_def_level=" << max_def_level << ",max_rep_level=" << max_rep_level
Expand All @@ -35,7 +49,7 @@ std::string LevelInfo::debug_string() const {

std::string ParquetField::debug_string() const {
std::stringstream ss;
ss << "ParquetField(name=" << name << ",type=" << type.type << ",physical_type=" << physical_type
ss << "ParquetField(name=" << name << ",type=" << column_type_to_string(type) << ",physical_type=" << physical_type
<< ",physical_column_index=" << physical_column_index << ",levels_info=" << level_info.debug_string();
if (children.size() > 0) {
ss << ",children=[";
Expand All @@ -51,6 +65,23 @@ std::string ParquetField::debug_string() const {
return ss.str();
}

bool ParquetField::is_complex_type() const {
return type == ARRAY || type == MAP || type == STRUCT;
}

// Checks whether this field's nested kind matches the table-side type:
// ARRAY <-> TYPE_ARRAY, MAP <-> TYPE_MAP, STRUCT <-> TYPE_STRUCT.
// Returns false for every other combination, including when this field is
// not a complex type at all.
bool ParquetField::has_same_complex_type(const TypeDescriptor& type_descriptor) const {
    const auto table_type = type_descriptor.type;
    switch (type) {
    case ColumnType::ARRAY:
        return table_type == LogicalType::TYPE_ARRAY;
    case ColumnType::MAP:
        return table_type == LogicalType::TYPE_MAP;
    case ColumnType::STRUCT:
        return table_type == LogicalType::TYPE_STRUCT;
    default:
        return false;
    }
}

// Returns true when the schema element reports at least one child
// (num_children > 0), i.e. it is treated as a group node rather than a leaf.
static bool is_group(const tparquet::SchemaElement* schema) {
return schema->num_children > 0;
}
Expand Down Expand Up @@ -80,6 +111,7 @@ Status SchemaDescriptor::leaf_to_field(const tparquet::SchemaElement* t_schema,
bool is_nullable, ParquetField* field) {
field->name = t_schema->name;
field->schema_element = *t_schema;
field->type = ColumnType::SCALAR;
field->is_nullable = is_nullable;
field->physical_type = t_schema->type;
field->type_length = t_schema->type_length;
Expand Down Expand Up @@ -164,8 +196,7 @@ Status SchemaDescriptor::list_to_field(const std::vector<tparquet::SchemaElement

field->name = group_schema->name;
field->field_id = group_schema->field_id;
field->type.type = TYPE_ARRAY;
field->type.children.push_back(field->children[0].type);
field->type = ColumnType::ARRAY;
field->is_nullable = is_optional(group_schema);
field->level_info = cur_level_info;
field->level_info.immediate_repeated_ancestor_def_level = last_immediate_repeated_ancestor_def_level;
Expand Down Expand Up @@ -237,9 +268,7 @@ Status SchemaDescriptor::map_to_field(const std::vector<tparquet::SchemaElement>
field->name = group_schema->name;
// Actually, we don't need to put field_id here
field->field_id = group_schema->field_id;
field->type.type = TYPE_MAP;
field->type.children.emplace_back(key_field->type);
field->type.children.emplace_back(value_field->type);
field->type = ColumnType::MAP;
field->is_nullable = is_optional(group_schema);
field->level_info = cur_level_info;
field->level_info.immediate_repeated_ancestor_def_level = last_immediate_repeated_ancestor_def_level;
Expand All @@ -262,13 +291,7 @@ Status SchemaDescriptor::group_to_struct_field(const std::vector<tparquet::Schem
field->name = group_schema->name;
field->is_nullable = is_optional(group_schema);
field->level_info = cur_level_info;
field->type.type = TYPE_STRUCT;
for (size_t i = 0; i < num_children; i++) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These children were never used.

field->type.children.emplace_back(field->children[i].type);
}
for (size_t i = 0; i < num_children; i++) {
field->type.field_names.emplace_back(field->children[i].name);
}
field->type = ColumnType::STRUCT;
field->field_id = group_schema->field_id;
return Status::OK();
}
Expand All @@ -295,7 +318,7 @@ Status SchemaDescriptor::group_to_field(const std::vector<tparquet::SchemaElemen
RETURN_IF_ERROR(group_to_struct_field(t_schemas, pos, cur_level_info, &field->children[0], next_pos));

field->name = group_schema->name;
field->type.type = TYPE_ARRAY;
field->type = ColumnType::ARRAY;
field->is_nullable = false;
field->level_info = cur_level_info;
field->level_info.immediate_repeated_ancestor_def_level = last_immediate_repeated_ancestor_def_level;
Expand Down Expand Up @@ -334,7 +357,7 @@ Status SchemaDescriptor::node_to_field(const std::vector<tparquet::SchemaElement
RETURN_IF_ERROR(leaf_to_field(node_schema, cur_level_info, false, child));

field->name = node_schema->name;
field->type.type = TYPE_ARRAY;
field->type = ColumnType::ARRAY;
field->is_nullable = false;
field->field_id = node_schema->field_id;
field->level_info = cur_level_info;
Expand Down
8 changes: 7 additions & 1 deletion be/src/formats/parquet/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,16 @@ struct LevelInfo {
std::string debug_string() const;
};

// Structural kind of a ParquetField. SCALAR is assigned to leaf fields;
// ARRAY, MAP and STRUCT are the values ParquetField::is_complex_type()
// reports as complex (nested) types.
enum ColumnType { SCALAR = 0, ARRAY, MAP, STRUCT };

// Returns a lowercase display name for `column_type`: "scalar", "array",
// "map" or "struct", and "unknown" for any other value.
std::string column_type_to_string(const ColumnType& column_type);

struct ParquetField {
std::string name;
tparquet::SchemaElement schema_element;

// Used to identify if this field is a nested field.
TypeDescriptor type;
ColumnType type;
bool is_nullable;

// Only valid when this field is a leaf node
Expand All @@ -118,6 +122,8 @@ struct ParquetField {
int16_t max_def_level() const { return level_info.max_def_level; }
int16_t max_rep_level() const { return level_info.max_rep_level; }
std::string debug_string() const;
bool is_complex_type() const;
bool has_same_complex_type(const TypeDescriptor& type_descriptor) const;
};

class SchemaDescriptor {
Expand Down
2 changes: 1 addition & 1 deletion be/src/util/system_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,10 @@ void SystemMetrics::_install_memory_metrics(MetricRegistry* registry) {
}

void SystemMetrics::_update_memory_metrics() {
size_t value = 0;
#if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER)
LOG(INFO) << "Memory tracking is not available with address sanitizer builds.";
#else
size_t value = 0;
Copy link
Contributor Author

@Smith-Cruise Smith-Cruise Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix ASAN compile warning: unused variable `value`.

// Update the statistics cached by mallctl.
uint64_t epoch = 1;
size_t sz = sizeof(epoch);
Expand Down
2 changes: 1 addition & 1 deletion be/test/formats/parquet/group_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ TEST_F(GroupReaderTest, TestGetNext) {
TEST_F(GroupReaderTest, ColumnReaderCreateTypeMismatch) {
ParquetField field;
field.name = "col0";
field.type.type = LogicalType::TYPE_ARRAY;
field.type = ColumnType::ARRAY;

TypeDescriptor col_type;
col_type.type = LogicalType::TYPE_VARCHAR;
Expand Down
6 changes: 3 additions & 3 deletions be/test/formats/parquet/parquet_cli_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,20 @@ class ParquetCLIReader {

StatusOr<TypeDescriptor> _build_type(const ParquetField& field) {
TypeDescriptor type;
if (field.type.type == TYPE_STRUCT) {
if (field.type == ColumnType::STRUCT) {
type.type = TYPE_STRUCT;
for (const auto& i : field.children) {
ASSIGN_OR_RETURN(auto child_type, _build_type(i));
type.children.emplace_back(child_type);
type.field_names.emplace_back(i.name);
}
} else if (field.type.type == TYPE_MAP) {
} else if (field.type == ColumnType::MAP) {
type.type = TYPE_MAP;
for (const auto& i : field.children) {
ASSIGN_OR_RETURN(auto child_type, _build_type(i));
type.children.emplace_back(child_type);
}
} else if (field.type.type == TYPE_ARRAY) {
} else if (field.type == ColumnType::ARRAY) {
type.type = TYPE_ARRAY;
ASSIGN_OR_RETURN(auto child_type, _build_type(field.children[0]));
type.children.emplace_back(child_type);
Expand Down
Loading
Loading