Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Use more clear type description in ParquetField (backport #52575) #52615

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions be/src/formats/parquet/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <boost/algorithm/string.hpp>

<<<<<<< HEAD
#include "column/array_column.h"
#include "column/map_column.h"
#include "column/struct_column.h"
Expand All @@ -25,6 +26,26 @@
#include "formats/parquet/column_converter.h"
#include "formats/parquet/stored_column_reader.h"
#include "gutil/strings/substitute.h"
=======
#include <algorithm>
#include <boost/algorithm/string/case_conv.hpp>
#include <map>
#include <ostream>
#include <unordered_map>
#include <utility>

#include "column/chunk.h"
#include "column/column_helper.h"
#include "column/nullable_column.h"
#include "common/compiler_util.h"
#include "exec/exec_node.h"
#include "exec/hdfs_scanner.h"
#include "formats/parquet/complex_column_reader.h"
#include "formats/parquet/scalar_column_reader.h"
#include "formats/utils.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/parquet_types.h"
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))
#include "simd/batch_run_counter.h"
#include "storage/column_or_predicate.h"
#include "util/runtime_profile.h"
Expand Down Expand Up @@ -1035,7 +1056,16 @@ class StructColumnReader : public ColumnReader {

void ColumnReader::get_subfield_pos_with_pruned_type(const ParquetField& field, const TypeDescriptor& col_type,
bool case_sensitive, std::vector<int32_t>& pos) {
<<<<<<< HEAD
DCHECK(field.type.type == LogicalType::TYPE_STRUCT);
=======
DCHECK(field.type == ColumnType::STRUCT);
if (!col_type.field_ids.empty()) {
std::unordered_map<int32_t, size_t> field_id_2_pos;
for (size_t i = 0; i < field.children.size(); i++) {
field_id_2_pos.emplace(field.children[i].field_id, i);
}
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))

// build tmp mapping for ParquetField
std::unordered_map<std::string, size_t> field_name_2_pos;
Expand Down Expand Up @@ -1118,11 +1148,18 @@ bool ColumnReader::_has_valid_subfield_column_reader(
Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField* field, const TypeDescriptor& col_type,
std::unique_ptr<ColumnReader>* output) {
// We will only set a complex type in ParquetField
<<<<<<< HEAD
if ((field->type.is_complex_type() || col_type.is_complex_type()) && (field->type.type != col_type.type)) {
return Status::InternalError(strings::Substitute("ParquetField's type $0 is different from table's type $1",
field->type.type, col_type.type));
=======
if ((field->is_complex_type() || col_type.is_complex_type()) && !field->has_same_complex_type(col_type)) {
return Status::InternalError(
strings::Substitute("ParquetField '$0' file's type $1 is different from table's type $2", field->name,
column_type_to_string(field->type), logical_type_to_string(col_type.type)));
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))
}
if (field->type.type == LogicalType::TYPE_ARRAY) {
if (field->type == ColumnType::ARRAY) {
std::unique_ptr<ColumnReader> child_reader;
RETURN_IF_ERROR(ColumnReader::create(opts, &field->children[0], col_type.children[0], &child_reader));
if (child_reader != nullptr) {
Expand All @@ -1132,7 +1169,7 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
} else {
*output = nullptr;
}
} else if (field->type.type == LogicalType::TYPE_MAP) {
} else if (field->type == ColumnType::MAP) {
std::unique_ptr<ColumnReader> key_reader = nullptr;
std::unique_ptr<ColumnReader> value_reader = nullptr;

Expand All @@ -1150,7 +1187,7 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
} else {
*output = nullptr;
}
} else if (field->type.type == LogicalType::TYPE_STRUCT) {
} else if (field->type == ColumnType::STRUCT) {
std::vector<int32_t> subfield_pos(col_type.children.size());
get_subfield_pos_with_pruned_type(*field, col_type, opts.case_sensitive, subfield_pos);

Expand Down Expand Up @@ -1186,12 +1223,13 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField* field, const TypeDescriptor& col_type,
const TIcebergSchemaField* iceberg_schema_field, std::unique_ptr<ColumnReader>* output) {
// We will only set a complex type in ParquetField
if ((field->type.is_complex_type() || col_type.is_complex_type()) && (field->type.type != col_type.type)) {
return Status::InternalError(strings::Substitute("ParquetField's type $0 is different from table's type $1",
field->type.type, col_type.type));
if ((field->is_complex_type() || col_type.is_complex_type()) && !field->has_same_complex_type(col_type)) {
return Status::InternalError(
strings::Substitute("ParquetField '$0' file's type $1 is different from table's type $2", field->name,
column_type_to_string(field->type), logical_type_to_string(col_type.type)));
}
DCHECK(iceberg_schema_field != nullptr);
if (field->type.type == LogicalType::TYPE_ARRAY) {
if (field->type == ColumnType::ARRAY) {
std::unique_ptr<ColumnReader> child_reader;
const TIcebergSchemaField* element_schema = &iceberg_schema_field->children[0];
RETURN_IF_ERROR(
Expand All @@ -1203,7 +1241,7 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
} else {
*output = nullptr;
}
} else if (field->type.type == LogicalType::TYPE_MAP) {
} else if (field->type == ColumnType::MAP) {
std::unique_ptr<ColumnReader> key_reader = nullptr;
std::unique_ptr<ColumnReader> value_reader = nullptr;

Expand All @@ -1226,7 +1264,7 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
} else {
*output = nullptr;
}
} else if (field->type.type == LogicalType::TYPE_STRUCT) {
} else if (field->type == ColumnType::STRUCT) {
std::vector<int32_t> subfield_pos(col_type.children.size());
std::vector<const TIcebergSchemaField*> iceberg_schema_subfield(col_type.children.size());
get_subfield_pos_with_pruned_type(*field, col_type, opts.case_sensitive, iceberg_schema_field, subfield_pos,
Expand Down
6 changes: 6 additions & 0 deletions be/src/formats/parquet/group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
#include "exec/hdfs_scanner.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
<<<<<<< HEAD
=======
#include "formats/parquet/metadata.h"
#include "formats/parquet/page_index_reader.h"
#include "formats/parquet/schema.h"
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))
#include "gutil/strings/substitute.h"
#include "runtime/types.h"
#include "simd/simd.h"
Expand Down
53 changes: 44 additions & 9 deletions be/src/formats/parquet/meta_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,34 @@ void ParquetMetaHelper::build_column_name_2_pos_in_meta(
break;
}
}
<<<<<<< HEAD
=======

// After the column is added, there is no new column when querying the previously
// imported parquet file. It is skipped here, and this column will be set to NULL
// in the FileReader::_read_min_max_chunk.
if (field == nullptr) continue;
// For field which type is complex, the filed physical_column_index in file meta is not same with the column index
// in row_group's column metas
// For example:
// table schema :
// -- col_tinyint tinyint
// -- col_struct struct
// ----- name string
// ----- age int
// file metadata schema :
// -- ParquetField(name=col_tinyint, physical_column_index=0)
// -- ParquetField(name=col_struct,physical_column_index=0,
// children=[ParquetField(name=name, physical_column_index=1),
// ParquetField(name=age, physical_column_index=2)])
// row group column metas:
// -- ColumnMetaData(path_in_schema=[col_tinyint])
// -- ColumnMetaData(path_in_schema=[col_struct, name])
// -- ColumnMetaData(path_in_schema=[col_struct, age])
if (field->is_complex_type()) continue;
// Put SlotDescriptor's origin column name here!
column_name_2_pos_in_meta.emplace(slot->col_name(), field->physical_column_index);
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))
}
}

Expand Down Expand Up @@ -66,24 +94,25 @@ bool ParquetMetaHelper::_is_valid_type(const ParquetField* parquet_field, const
}
// only check for complex type now
// if complex type has none valid subfield, we will treat this struct type as invalid type.
if (!parquet_field->type.is_complex_type()) {
if (!parquet_field->is_complex_type()) {
return true;
}

if (parquet_field->type.type != type_descriptor->type) {
// complex type mismatched
// check the complex type is matched
if (!parquet_field->has_same_complex_type(*type_descriptor)) {
return false;
}

bool has_valid_child = false;

if (parquet_field->type.is_array_type() || parquet_field->type.is_map_type()) {
if (parquet_field->type == ColumnType::ARRAY || parquet_field->type == ColumnType::MAP) {
for (size_t idx = 0; idx < parquet_field->children.size(); idx++) {
if (_is_valid_type(&parquet_field->children[idx], &type_descriptor->children[idx])) {
has_valid_child = true;
break;
}
}
<<<<<<< HEAD
} else if (parquet_field->type.is_struct_type()) {
std::unordered_map<std::string, const TypeDescriptor*> field_name_2_type{};
for (size_t idx = 0; idx < type_descriptor->children.size(); idx++) {
Expand All @@ -96,6 +125,13 @@ bool ParquetMetaHelper::_is_valid_type(const ParquetField* parquet_field, const
auto it = field_name_2_type.find(Utils::format_name(child_parquet_field.name, _case_sensitive));
if (it == field_name_2_type.end()) {
continue;
=======
} else if (parquet_field->type == ColumnType::STRUCT) {
if (!type_descriptor->field_ids.empty()) {
std::unordered_map<int32_t, const TypeDescriptor*> field_id_2_type;
for (size_t idx = 0; idx < type_descriptor->children.size(); idx++) {
field_id_2_type.emplace(type_descriptor->field_ids[idx], &type_descriptor->children[idx]);
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))
}

if (_is_valid_type(&child_parquet_field, it->second)) {
Expand All @@ -122,26 +158,25 @@ bool IcebergMetaHelper::_is_valid_type(const ParquetField* parquet_field, const
const TypeDescriptor* type_descriptor) const {
// only check for complex type now
// if complex type has none valid subfield, we will treat this struct type as invalid type.
if (!parquet_field->type.is_complex_type()) {
if (!parquet_field->is_complex_type()) {
return true;
}

if (parquet_field->type.type != type_descriptor->type) {
// complex type mismatched
if (!parquet_field->has_same_complex_type(*type_descriptor)) {
return false;
}

bool has_valid_child = false;

if (parquet_field->type.is_array_type() || parquet_field->type.is_map_type()) {
if (parquet_field->type == ColumnType::ARRAY || parquet_field->type == ColumnType::MAP) {
for (size_t idx = 0; idx < parquet_field->children.size(); idx++) {
if (_is_valid_type(&parquet_field->children[idx], &field_schema->children[idx],
&type_descriptor->children[idx])) {
has_valid_child = true;
break;
}
}
} else if (parquet_field->type.is_struct_type()) {
} else if (parquet_field->type == ColumnType::STRUCT) {
std::unordered_map<int32_t, const TIcebergSchemaField*> field_id_2_iceberg_schema{};
std::unordered_map<int32_t, const TypeDescriptor*> field_id_2_type{};
for (const auto& field : field_schema->children) {
Expand Down
61 changes: 46 additions & 15 deletions be/src/formats/parquet/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,35 @@

#include "formats/parquet/schema.h"

<<<<<<< HEAD
#include <boost/algorithm/string.hpp>
=======
#include <boost/algorithm/string/case_conv.hpp>
#include <memory>
#include <sstream>
#include <utility>
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))

#include "gutil/casts.h"
#include "gutil/strings/substitute.h"

namespace starrocks::parquet {

std::string column_type_to_string(const ColumnType& column_type) {
switch (column_type) {
case SCALAR:
return "scalar";
case ARRAY:
return "array";
case MAP:
return "map";
case STRUCT:
return "struct";
default:
return "unknown";
}
}

std::string LevelInfo::debug_string() const {
std::stringstream ss;
ss << "LevelInfo(max_def_level=" << max_def_level << ",max_rep_level=" << max_rep_level
Expand All @@ -30,7 +52,7 @@ std::string LevelInfo::debug_string() const {

std::string ParquetField::debug_string() const {
std::stringstream ss;
ss << "ParquetField(name=" << name << ",type=" << type.type << ",physical_type=" << physical_type
ss << "ParquetField(name=" << name << ",type=" << column_type_to_string(type) << ",physical_type=" << physical_type
<< ",physical_column_index=" << physical_column_index << ",levels_info=" << level_info.debug_string();
if (children.size() > 0) {
ss << ",children=[";
Expand All @@ -46,6 +68,23 @@ std::string ParquetField::debug_string() const {
return ss.str();
}

bool ParquetField::is_complex_type() const {
return type == ARRAY || type == MAP || type == STRUCT;
}

bool ParquetField::has_same_complex_type(const TypeDescriptor& type_descriptor) const {
// check the complex type is matched
if (type == ColumnType::ARRAY && type_descriptor.type == LogicalType::TYPE_ARRAY) {
return true;
} else if (type == ColumnType::MAP && type_descriptor.type == LogicalType::TYPE_MAP) {
return true;
} else if (type == ColumnType::STRUCT && type_descriptor.type == LogicalType::TYPE_STRUCT) {
return true;
} else {
return false;
}
}

static bool is_group(const tparquet::SchemaElement* schema) {
return schema->num_children > 0;
}
Expand Down Expand Up @@ -75,6 +114,7 @@ Status SchemaDescriptor::leaf_to_field(const tparquet::SchemaElement* t_schema,
bool is_nullable, ParquetField* field) {
field->name = t_schema->name;
field->schema_element = *t_schema;
field->type = ColumnType::SCALAR;
field->is_nullable = is_nullable;
field->physical_type = t_schema->type;
field->type_length = t_schema->type_length;
Expand Down Expand Up @@ -159,8 +199,7 @@ Status SchemaDescriptor::list_to_field(const std::vector<tparquet::SchemaElement

field->name = group_schema->name;
field->field_id = group_schema->field_id;
field->type.type = TYPE_ARRAY;
field->type.children.push_back(field->children[0].type);
field->type = ColumnType::ARRAY;
field->is_nullable = is_optional(group_schema);
field->level_info = cur_level_info;
field->level_info.immediate_repeated_ancestor_def_level = last_immediate_repeated_ancestor_def_level;
Expand Down Expand Up @@ -232,9 +271,7 @@ Status SchemaDescriptor::map_to_field(const std::vector<tparquet::SchemaElement>
field->name = group_schema->name;
// Actually, we don't need to put field_id here
field->field_id = group_schema->field_id;
field->type.type = TYPE_MAP;
field->type.children.emplace_back(key_field->type);
field->type.children.emplace_back(value_field->type);
field->type = ColumnType::MAP;
field->is_nullable = is_optional(group_schema);
field->level_info = cur_level_info;
field->level_info.immediate_repeated_ancestor_def_level = last_immediate_repeated_ancestor_def_level;
Expand All @@ -257,13 +294,7 @@ Status SchemaDescriptor::group_to_struct_field(const std::vector<tparquet::Schem
field->name = group_schema->name;
field->is_nullable = is_optional(group_schema);
field->level_info = cur_level_info;
field->type.type = TYPE_STRUCT;
for (size_t i = 0; i < num_children; i++) {
field->type.children.emplace_back(field->children[i].type);
}
for (size_t i = 0; i < num_children; i++) {
field->type.field_names.emplace_back(field->children[i].name);
}
field->type = ColumnType::STRUCT;
field->field_id = group_schema->field_id;
return Status::OK();
}
Expand All @@ -290,7 +321,7 @@ Status SchemaDescriptor::group_to_field(const std::vector<tparquet::SchemaElemen
RETURN_IF_ERROR(group_to_struct_field(t_schemas, pos, cur_level_info, &field->children[0], next_pos));

field->name = group_schema->name;
field->type.type = TYPE_ARRAY;
field->type = ColumnType::ARRAY;
field->is_nullable = false;
field->level_info = cur_level_info;
field->level_info.immediate_repeated_ancestor_def_level = last_immediate_repeated_ancestor_def_level;
Expand Down Expand Up @@ -329,7 +360,7 @@ Status SchemaDescriptor::node_to_field(const std::vector<tparquet::SchemaElement
RETURN_IF_ERROR(leaf_to_field(node_schema, cur_level_info, false, child));

field->name = node_schema->name;
field->type.type = TYPE_ARRAY;
field->type = ColumnType::ARRAY;
field->is_nullable = false;
field->field_id = node_schema->field_id;
field->level_info = cur_level_info;
Expand Down
Loading
Loading