Skip to content

Commit

Permalink
[Enhancement] Use more clear type description in ParquetField (#52575)
Browse files Browse the repository at this point in the history
Signed-off-by: Smith Cruise <[email protected]>
(cherry picked from commit 1028b6a)

# Conflicts:
#	be/src/formats/parquet/column_reader.cpp
#	be/src/formats/parquet/group_reader.cpp
#	be/src/formats/parquet/meta_helper.cpp
#	be/src/formats/parquet/schema.cpp
#	be/test/formats/parquet/group_reader_test.cpp
  • Loading branch information
Smith-Cruise authored and mergify[bot] committed Nov 5, 2024
1 parent 9be095c commit 19b62dc
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 71 deletions.
56 changes: 47 additions & 9 deletions be/src/formats/parquet/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <boost/algorithm/string.hpp>

<<<<<<< HEAD
#include "column/array_column.h"
#include "column/map_column.h"
#include "column/struct_column.h"
Expand All @@ -25,6 +26,26 @@
#include "formats/parquet/column_converter.h"
#include "formats/parquet/stored_column_reader.h"
#include "gutil/strings/substitute.h"
=======
#include <algorithm>
#include <boost/algorithm/string/case_conv.hpp>
#include <map>
#include <ostream>
#include <unordered_map>
#include <utility>

#include "column/chunk.h"
#include "column/column_helper.h"
#include "column/nullable_column.h"
#include "common/compiler_util.h"
#include "exec/exec_node.h"
#include "exec/hdfs_scanner.h"
#include "formats/parquet/complex_column_reader.h"
#include "formats/parquet/scalar_column_reader.h"
#include "formats/utils.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/parquet_types.h"
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))
#include "simd/batch_run_counter.h"
#include "storage/column_or_predicate.h"
#include "util/runtime_profile.h"
Expand Down Expand Up @@ -1035,7 +1056,16 @@ class StructColumnReader : public ColumnReader {

void ColumnReader::get_subfield_pos_with_pruned_type(const ParquetField& field, const TypeDescriptor& col_type,
bool case_sensitive, std::vector<int32_t>& pos) {
<<<<<<< HEAD
DCHECK(field.type.type == LogicalType::TYPE_STRUCT);
=======
DCHECK(field.type == ColumnType::STRUCT);
if (!col_type.field_ids.empty()) {
std::unordered_map<int32_t, size_t> field_id_2_pos;
for (size_t i = 0; i < field.children.size(); i++) {
field_id_2_pos.emplace(field.children[i].field_id, i);
}
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))

// build tmp mapping for ParquetField
std::unordered_map<std::string, size_t> field_name_2_pos;
Expand Down Expand Up @@ -1118,11 +1148,18 @@ bool ColumnReader::_has_valid_subfield_column_reader(
Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField* field, const TypeDescriptor& col_type,
std::unique_ptr<ColumnReader>* output) {
// We will only set a complex type in ParquetField
<<<<<<< HEAD
if ((field->type.is_complex_type() || col_type.is_complex_type()) && (field->type.type != col_type.type)) {
return Status::InternalError(strings::Substitute("ParquetField's type $0 is different from table's type $1",
field->type.type, col_type.type));
=======
if ((field->is_complex_type() || col_type.is_complex_type()) && !field->has_same_complex_type(col_type)) {
return Status::InternalError(
strings::Substitute("ParquetField '$0' file's type $1 is different from table's type $2", field->name,
column_type_to_string(field->type), logical_type_to_string(col_type.type)));
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))
}
if (field->type.type == LogicalType::TYPE_ARRAY) {
if (field->type == ColumnType::ARRAY) {
std::unique_ptr<ColumnReader> child_reader;
RETURN_IF_ERROR(ColumnReader::create(opts, &field->children[0], col_type.children[0], &child_reader));
if (child_reader != nullptr) {
Expand All @@ -1132,7 +1169,7 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
} else {
*output = nullptr;
}
} else if (field->type.type == LogicalType::TYPE_MAP) {
} else if (field->type == ColumnType::MAP) {
std::unique_ptr<ColumnReader> key_reader = nullptr;
std::unique_ptr<ColumnReader> value_reader = nullptr;

Expand All @@ -1150,7 +1187,7 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
} else {
*output = nullptr;
}
} else if (field->type.type == LogicalType::TYPE_STRUCT) {
} else if (field->type == ColumnType::STRUCT) {
std::vector<int32_t> subfield_pos(col_type.children.size());
get_subfield_pos_with_pruned_type(*field, col_type, opts.case_sensitive, subfield_pos);

Expand Down Expand Up @@ -1186,12 +1223,13 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField* field, const TypeDescriptor& col_type,
const TIcebergSchemaField* iceberg_schema_field, std::unique_ptr<ColumnReader>* output) {
// We will only set a complex type in ParquetField
if ((field->type.is_complex_type() || col_type.is_complex_type()) && (field->type.type != col_type.type)) {
return Status::InternalError(strings::Substitute("ParquetField's type $0 is different from table's type $1",
field->type.type, col_type.type));
if ((field->is_complex_type() || col_type.is_complex_type()) && !field->has_same_complex_type(col_type)) {
return Status::InternalError(
strings::Substitute("ParquetField '$0' file's type $1 is different from table's type $2", field->name,
column_type_to_string(field->type), logical_type_to_string(col_type.type)));
}
DCHECK(iceberg_schema_field != nullptr);
if (field->type.type == LogicalType::TYPE_ARRAY) {
if (field->type == ColumnType::ARRAY) {
std::unique_ptr<ColumnReader> child_reader;
const TIcebergSchemaField* element_schema = &iceberg_schema_field->children[0];
RETURN_IF_ERROR(
Expand All @@ -1203,7 +1241,7 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
} else {
*output = nullptr;
}
} else if (field->type.type == LogicalType::TYPE_MAP) {
} else if (field->type == ColumnType::MAP) {
std::unique_ptr<ColumnReader> key_reader = nullptr;
std::unique_ptr<ColumnReader> value_reader = nullptr;

Expand All @@ -1226,7 +1264,7 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ParquetField*
} else {
*output = nullptr;
}
} else if (field->type.type == LogicalType::TYPE_STRUCT) {
} else if (field->type == ColumnType::STRUCT) {
std::vector<int32_t> subfield_pos(col_type.children.size());
std::vector<const TIcebergSchemaField*> iceberg_schema_subfield(col_type.children.size());
get_subfield_pos_with_pruned_type(*field, col_type, opts.case_sensitive, iceberg_schema_field, subfield_pos,
Expand Down
6 changes: 6 additions & 0 deletions be/src/formats/parquet/group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
#include "exec/hdfs_scanner.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
<<<<<<< HEAD
=======
#include "formats/parquet/metadata.h"
#include "formats/parquet/page_index_reader.h"
#include "formats/parquet/schema.h"
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))
#include "gutil/strings/substitute.h"
#include "runtime/types.h"
#include "simd/simd.h"
Expand Down
53 changes: 44 additions & 9 deletions be/src/formats/parquet/meta_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,34 @@ void ParquetMetaHelper::build_column_name_2_pos_in_meta(
break;
}
}
<<<<<<< HEAD
=======

// After the column is added, there is no new column when querying the previously
// imported parquet file. It is skipped here, and this column will be set to NULL
// in the FileReader::_read_min_max_chunk.
if (field == nullptr) continue;
// For field which type is complex, the filed physical_column_index in file meta is not same with the column index
// in row_group's column metas
// For example:
// table schema :
// -- col_tinyint tinyint
// -- col_struct struct
// ----- name string
// ----- age int
// file metadata schema :
// -- ParquetField(name=col_tinyint, physical_column_index=0)
// -- ParquetField(name=col_struct,physical_column_index=0,
// children=[ParquetField(name=name, physical_column_index=1),
// ParquetField(name=age, physical_column_index=2)])
// row group column metas:
// -- ColumnMetaData(path_in_schema=[col_tinyint])
// -- ColumnMetaData(path_in_schema=[col_struct, name])
// -- ColumnMetaData(path_in_schema=[col_struct, age])
if (field->is_complex_type()) continue;
// Put SlotDescriptor's origin column name here!
column_name_2_pos_in_meta.emplace(slot->col_name(), field->physical_column_index);
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))
}
}

Expand Down Expand Up @@ -66,24 +94,25 @@ bool ParquetMetaHelper::_is_valid_type(const ParquetField* parquet_field, const
}
// only check for complex type now
// if complex type has none valid subfield, we will treat this struct type as invalid type.
if (!parquet_field->type.is_complex_type()) {
if (!parquet_field->is_complex_type()) {
return true;
}

if (parquet_field->type.type != type_descriptor->type) {
// complex type mismatched
// check the complex type is matched
if (!parquet_field->has_same_complex_type(*type_descriptor)) {
return false;
}

bool has_valid_child = false;

if (parquet_field->type.is_array_type() || parquet_field->type.is_map_type()) {
if (parquet_field->type == ColumnType::ARRAY || parquet_field->type == ColumnType::MAP) {
for (size_t idx = 0; idx < parquet_field->children.size(); idx++) {
if (_is_valid_type(&parquet_field->children[idx], &type_descriptor->children[idx])) {
has_valid_child = true;
break;
}
}
<<<<<<< HEAD
} else if (parquet_field->type.is_struct_type()) {
std::unordered_map<std::string, const TypeDescriptor*> field_name_2_type{};
for (size_t idx = 0; idx < type_descriptor->children.size(); idx++) {
Expand All @@ -96,6 +125,13 @@ bool ParquetMetaHelper::_is_valid_type(const ParquetField* parquet_field, const
auto it = field_name_2_type.find(Utils::format_name(child_parquet_field.name, _case_sensitive));
if (it == field_name_2_type.end()) {
continue;
=======
} else if (parquet_field->type == ColumnType::STRUCT) {
if (!type_descriptor->field_ids.empty()) {
std::unordered_map<int32_t, const TypeDescriptor*> field_id_2_type;
for (size_t idx = 0; idx < type_descriptor->children.size(); idx++) {
field_id_2_type.emplace(type_descriptor->field_ids[idx], &type_descriptor->children[idx]);
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))
}

if (_is_valid_type(&child_parquet_field, it->second)) {
Expand All @@ -122,26 +158,25 @@ bool IcebergMetaHelper::_is_valid_type(const ParquetField* parquet_field, const
const TypeDescriptor* type_descriptor) const {
// only check for complex type now
// if complex type has none valid subfield, we will treat this struct type as invalid type.
if (!parquet_field->type.is_complex_type()) {
if (!parquet_field->is_complex_type()) {
return true;
}

if (parquet_field->type.type != type_descriptor->type) {
// complex type mismatched
if (!parquet_field->has_same_complex_type(*type_descriptor)) {
return false;
}

bool has_valid_child = false;

if (parquet_field->type.is_array_type() || parquet_field->type.is_map_type()) {
if (parquet_field->type == ColumnType::ARRAY || parquet_field->type == ColumnType::MAP) {
for (size_t idx = 0; idx < parquet_field->children.size(); idx++) {
if (_is_valid_type(&parquet_field->children[idx], &field_schema->children[idx],
&type_descriptor->children[idx])) {
has_valid_child = true;
break;
}
}
} else if (parquet_field->type.is_struct_type()) {
} else if (parquet_field->type == ColumnType::STRUCT) {
std::unordered_map<int32_t, const TIcebergSchemaField*> field_id_2_iceberg_schema{};
std::unordered_map<int32_t, const TypeDescriptor*> field_id_2_type{};
for (const auto& field : field_schema->children) {
Expand Down
61 changes: 46 additions & 15 deletions be/src/formats/parquet/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,35 @@

#include "formats/parquet/schema.h"

<<<<<<< HEAD
#include <boost/algorithm/string.hpp>
=======
#include <boost/algorithm/string/case_conv.hpp>
#include <memory>
#include <sstream>
#include <utility>
>>>>>>> 1028b6ac2c ([Enhancement] Use more clear type description in ParquetField (#52575))

#include "gutil/casts.h"
#include "gutil/strings/substitute.h"

namespace starrocks::parquet {

std::string column_type_to_string(const ColumnType& column_type) {
switch (column_type) {
case SCALAR:
return "scalar";
case ARRAY:
return "array";
case MAP:
return "map";
case STRUCT:
return "struct";
default:
return "unknown";
}
}

std::string LevelInfo::debug_string() const {
std::stringstream ss;
ss << "LevelInfo(max_def_level=" << max_def_level << ",max_rep_level=" << max_rep_level
Expand All @@ -30,7 +52,7 @@ std::string LevelInfo::debug_string() const {

std::string ParquetField::debug_string() const {
std::stringstream ss;
ss << "ParquetField(name=" << name << ",type=" << type.type << ",physical_type=" << physical_type
ss << "ParquetField(name=" << name << ",type=" << column_type_to_string(type) << ",physical_type=" << physical_type
<< ",physical_column_index=" << physical_column_index << ",levels_info=" << level_info.debug_string();
if (children.size() > 0) {
ss << ",children=[";
Expand All @@ -46,6 +68,23 @@ std::string ParquetField::debug_string() const {
return ss.str();
}

bool ParquetField::is_complex_type() const {
return type == ARRAY || type == MAP || type == STRUCT;
}

bool ParquetField::has_same_complex_type(const TypeDescriptor& type_descriptor) const {
// check the complex type is matched
if (type == ColumnType::ARRAY && type_descriptor.type == LogicalType::TYPE_ARRAY) {
return true;
} else if (type == ColumnType::MAP && type_descriptor.type == LogicalType::TYPE_MAP) {
return true;
} else if (type == ColumnType::STRUCT && type_descriptor.type == LogicalType::TYPE_STRUCT) {
return true;
} else {
return false;
}
}

static bool is_group(const tparquet::SchemaElement* schema) {
return schema->num_children > 0;
}
Expand Down Expand Up @@ -75,6 +114,7 @@ Status SchemaDescriptor::leaf_to_field(const tparquet::SchemaElement* t_schema,
bool is_nullable, ParquetField* field) {
field->name = t_schema->name;
field->schema_element = *t_schema;
field->type = ColumnType::SCALAR;
field->is_nullable = is_nullable;
field->physical_type = t_schema->type;
field->type_length = t_schema->type_length;
Expand Down Expand Up @@ -159,8 +199,7 @@ Status SchemaDescriptor::list_to_field(const std::vector<tparquet::SchemaElement

field->name = group_schema->name;
field->field_id = group_schema->field_id;
field->type.type = TYPE_ARRAY;
field->type.children.push_back(field->children[0].type);
field->type = ColumnType::ARRAY;
field->is_nullable = is_optional(group_schema);
field->level_info = cur_level_info;
field->level_info.immediate_repeated_ancestor_def_level = last_immediate_repeated_ancestor_def_level;
Expand Down Expand Up @@ -232,9 +271,7 @@ Status SchemaDescriptor::map_to_field(const std::vector<tparquet::SchemaElement>
field->name = group_schema->name;
// Actually, we don't need to put field_id here
field->field_id = group_schema->field_id;
field->type.type = TYPE_MAP;
field->type.children.emplace_back(key_field->type);
field->type.children.emplace_back(value_field->type);
field->type = ColumnType::MAP;
field->is_nullable = is_optional(group_schema);
field->level_info = cur_level_info;
field->level_info.immediate_repeated_ancestor_def_level = last_immediate_repeated_ancestor_def_level;
Expand All @@ -257,13 +294,7 @@ Status SchemaDescriptor::group_to_struct_field(const std::vector<tparquet::Schem
field->name = group_schema->name;
field->is_nullable = is_optional(group_schema);
field->level_info = cur_level_info;
field->type.type = TYPE_STRUCT;
for (size_t i = 0; i < num_children; i++) {
field->type.children.emplace_back(field->children[i].type);
}
for (size_t i = 0; i < num_children; i++) {
field->type.field_names.emplace_back(field->children[i].name);
}
field->type = ColumnType::STRUCT;
field->field_id = group_schema->field_id;
return Status::OK();
}
Expand All @@ -290,7 +321,7 @@ Status SchemaDescriptor::group_to_field(const std::vector<tparquet::SchemaElemen
RETURN_IF_ERROR(group_to_struct_field(t_schemas, pos, cur_level_info, &field->children[0], next_pos));

field->name = group_schema->name;
field->type.type = TYPE_ARRAY;
field->type = ColumnType::ARRAY;
field->is_nullable = false;
field->level_info = cur_level_info;
field->level_info.immediate_repeated_ancestor_def_level = last_immediate_repeated_ancestor_def_level;
Expand Down Expand Up @@ -329,7 +360,7 @@ Status SchemaDescriptor::node_to_field(const std::vector<tparquet::SchemaElement
RETURN_IF_ERROR(leaf_to_field(node_schema, cur_level_info, false, child));

field->name = node_schema->name;
field->type.type = TYPE_ARRAY;
field->type = ColumnType::ARRAY;
field->is_nullable = false;
field->field_id = node_schema->field_id;
field->level_info = cur_level_info;
Expand Down
Loading

0 comments on commit 19b62dc

Please sign in to comment.