[Enhancement] Support merging invalid columns with null in files()
Signed-off-by: wyb <[email protected]>
wyb committed Nov 5, 2024
1 parent 6e5e594 commit 5f662ce
Showing 12 changed files with 376 additions and 9 deletions.
3 changes: 3 additions & 0 deletions be/src/exec/orc_scanner.cpp
@@ -86,6 +86,9 @@ Status ORCScanner::open() {
     RETURN_IF_ERROR(_orc_reader->set_timezone(_state->timezone()));
     _orc_reader->set_runtime_state(_state);
     _orc_reader->set_case_sensitive(_case_sensitive);
+    if (_scan_range.params.__isset.flexible_column_mapping && _scan_range.params.flexible_column_mapping) {
+        _orc_reader->set_invalid_as_null(true);
+    }
     RETURN_IF_ERROR(_open_next_orc_reader());
 
     return Status::OK();
2 changes: 1 addition & 1 deletion be/src/exec/parquet_reader.cpp
@@ -248,7 +248,7 @@ Status ParquetReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs) {
             for (auto index : iter->second) {
                 _parquet_column_ids.emplace_back(index);
             }
-        } else {
+        } else if (!_invalid_as_null) {
            std::stringstream str_error;
            str_error << "Column: " << slot_desc->col_name() << " is not found in file: " << _filename;
            LOG(WARNING) << str_error.str();
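A standalone sketch of the relaxed resolution rule above (plain STL; resolve_column and its types are illustrative stand-ins, not the ParquetReaderWrap API). A missing column aborts the scan only when invalid_as_null is off; with it on, the miss is tolerated and the slot is later filled with NULLs:

#include <cassert>
#include <optional>
#include <stdexcept>
#include <string>
#include <unordered_map>

// Resolve a requested column name to its index in the file schema.
std::optional<int> resolve_column(const std::unordered_map<std::string, int>& file_columns,
                                  const std::string& name, bool invalid_as_null) {
    auto it = file_columns.find(name);
    if (it != file_columns.end()) {
        return it->second;
    }
    if (!invalid_as_null) { // old behavior: a missing column is fatal
        throw std::runtime_error("Column: " + name + " is not found in file");
    }
    return std::nullopt; // tolerated: caller emits NULLs for this slot
}

int main() {
    const std::unordered_map<std::string, int> cols{{"k2", 0}, {"k5", 1}, {"k7", 2}};
    assert(resolve_column(cols, "k5", false).value() == 1);
    assert(!resolve_column(cols, "k1", true).has_value()); // k1 absent, no error
    return 0;
}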
3 changes: 3 additions & 0 deletions be/src/exec/parquet_reader.h
@@ -74,6 +74,7 @@ class ParquetReaderWrap {
     int64_t num_rows() { return _num_rows; }
 
     Status get_schema(std::vector<SlotDescriptor>* schema);
+    void set_invalid_as_null(bool invalid_as_null) { _invalid_as_null = invalid_as_null; }
 
 private:
     Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs);
@@ -107,6 +108,8 @@ class ParquetReaderWrap {
     int64_t _read_size;
 
     std::string _filename;
+
+    bool _invalid_as_null{false};
 };
 
 // Reader of broker parquet file
26 changes: 18 additions & 8 deletions be/src/exec/parquet_scanner.cpp
@@ -77,17 +77,21 @@ Status ParquetScanner::initialize_src_chunk(ChunkPtr* chunk) {
     SCOPED_RAW_TIMER(&_counter->init_chunk_ns);
     _pool.clear();
     (*chunk) = std::make_shared<Chunk>();
-    size_t column_pos = 0;
     _chunk_filter.clear();
     for (auto i = 0; i < _num_of_columns_from_file; ++i) {
         SlotDescriptor* slot_desc = _src_slot_descriptors[i];
         if (slot_desc == nullptr) {
             continue;
         }
-        auto* array = _batch->column(column_pos++).get();
         ColumnPtr column;
-        RETURN_IF_ERROR(new_column(array->type().get(), slot_desc, &column, _conv_funcs[i].get(), &_cast_exprs[i],
-                                   _pool, _strict_mode));
+        auto array_ptr = _batch->GetColumnByName(slot_desc->col_name());
+        if (array_ptr == nullptr) {
+            _cast_exprs[i] = _pool.add(new ColumnRef(slot_desc));
+            column = ColumnHelper::create_column(slot_desc->type(), slot_desc->is_nullable());
+        } else {
+            RETURN_IF_ERROR(new_column(array_ptr->type().get(), slot_desc, &column, _conv_funcs[i].get(),
+                                       &_cast_exprs[i], _pool, _strict_mode));
+        }
         column->reserve(_max_chunk_size);
         (*chunk)->append_column(column, slot_desc->id());
     }
@@ -98,18 +102,21 @@ Status ParquetScanner::append_batch_to_src_chunk(ChunkPtr* chunk) {
     SCOPED_RAW_TIMER(&_counter->fill_ns);
     size_t num_elements =
             std::min<size_t>((_max_chunk_size - _chunk_start_idx), (_batch->num_rows() - _batch_start_idx));
-    size_t column_pos = 0;
     _chunk_filter.resize(_chunk_filter.size() + num_elements, 1);
     for (auto i = 0; i < _num_of_columns_from_file; ++i) {
         SlotDescriptor* slot_desc = _src_slot_descriptors[i];
         if (slot_desc == nullptr) {
             continue;
         }
         _conv_ctx.current_slot = slot_desc;
-        auto* array = _batch->column(column_pos++).get();
         auto& column = (*chunk)->get_column_by_slot_id(slot_desc->id());
-        RETURN_IF_ERROR(convert_array_to_column(_conv_funcs[i].get(), num_elements, array, column, _batch_start_idx,
-                                                _chunk_start_idx, &_chunk_filter, &_conv_ctx));
+        auto array_ptr = _batch->GetColumnByName(slot_desc->col_name());
+        if (array_ptr == nullptr) {
+            (void)column->append_nulls(_batch->num_rows());
+        } else {
+            RETURN_IF_ERROR(convert_array_to_column(_conv_funcs[i].get(), num_elements, array_ptr.get(), column,
+                                                    _batch_start_idx, _chunk_start_idx, &_chunk_filter, &_conv_ctx));
+        }
     }
 
     _chunk_start_idx += num_elements;
@@ -460,6 +467,9 @@ Status ParquetScanner::open_next_reader() {
     auto parquet_file = std::make_shared<ParquetChunkFile>(file, 0, _counter);
     auto parquet_reader = std::make_shared<ParquetReaderWrap>(std::move(parquet_file), _num_of_columns_from_file,
                                                               range_desc.start_offset, range_desc.size);
+    if (_scan_range.params.__isset.flexible_column_mapping && _scan_range.params.flexible_column_mapping) {
+        parquet_reader->set_invalid_as_null(true);
+    }
     _next_file++;
     int64_t file_size;
     RETURN_IF_ERROR(parquet_reader->size(&file_size));
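These hunks switch the scanner from positional column access to lookup by name, padding with NULLs for columns the current file lacks. A standalone sketch of that fill strategy (plain-STL stand-ins; Column, append_batch, and the string cells are illustrative, not the StarRocks Chunk/Column API):

#include <cstddef>
#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// A cell is NULL when the optional is empty.
using Column = std::vector<std::optional<std::string>>;

// Append one file's batch to the destination chunk, matching columns by
// name; a column the file lacks is padded with NULLs instead of failing.
void append_batch(const std::vector<std::string>& dest_schema,
                  const std::unordered_map<std::string, Column>& batch, std::size_t batch_rows,
                  std::unordered_map<std::string, Column>& chunk) {
    for (const auto& name : dest_schema) {
        auto& out = chunk[name];
        auto it = batch.find(name);
        if (it == batch.end()) {
            out.insert(out.end(), batch_rows, std::nullopt); // missing column -> NULLs
        } else {
            out.insert(out.end(), it->second.begin(), it->second.end());
        }
    }
}

int main() {
    const std::vector<std::string> schema{"k1", "k2"};
    std::unordered_map<std::string, Column> chunk;
    append_batch(schema, {{"k1", {"a", "b"}}, {"k2", {"1", "2"}}}, 2, chunk); // file with both columns
    append_batch(schema, {{"k2", {"3"}}}, 1, chunk);                          // file missing k1
    for (const auto& name : schema) {
        std::cout << name << ":";
        for (const auto& cell : chunk[name]) {
            std::cout << ' ' << cell.value_or("NULL");
        }
        std::cout << '\n'; // k1: a b NULL / k2: 1 2 3
    }
    return 0;
}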
3 changes: 3 additions & 0 deletions be/src/formats/orc/orc_chunk_reader.cpp
@@ -190,6 +190,9 @@ Status OrcChunkReader::init(std::unique_ptr<orc::Reader> reader, const OrcPredic
         return Status::InternalError(s);
     }
 
+    // _batch can't be reused because the schema between files may be different
+    _batch.reset();
+
     // TODO(SmithCruise) delete _init_position_in_orc() when develop subfield lazy load.
     RETURN_IF_ERROR(_init_position_in_orc());
     RETURN_IF_ERROR(_init_cast_exprs());
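The reset matters because a row batch is allocated for one file's layout; reusing it for a file with a different schema would mis-shape the read. A minimal sketch of this reset-and-recreate-lazily pattern (illustrative types, not the orc:: or OrcChunkReader API):

#include <memory>
#include <string>
#include <vector>

// Stand-in for a columnar row batch sized to one file's schema.
struct RowBatch {
    std::vector<std::string> columns;
};

class FileReader {
public:
    // Called once per file: drop any batch built for the previous file,
    // and re-create it lazily on the first read of the new one.
    void init(std::vector<std::string> schema) {
        _schema = std::move(schema);
        _batch.reset(); // schemas may differ between files
    }

    RowBatch* batch() {
        if (!_batch) {
            _batch = std::make_unique<RowBatch>(RowBatch{_schema});
        }
        return _batch.get();
    }

private:
    std::vector<std::string> _schema;
    std::unique_ptr<RowBatch> _batch;
};

int main() {
    FileReader reader;
    reader.init({"k1", "k2", "k3"});
    reader.batch(); // batch laid out for three columns
    reader.init({"k2"});
    // Without reset() in init(), the stale three-column batch would be reused here.
    return reader.batch()->columns.size() == 1 ? 0 : 1;
}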
115 changes: 115 additions & 0 deletions test/sql/test_files/R/test_orc_files_merge
@@ -0,0 +1,115 @@
-- name: test_orc_files_merge

create database db_${uuid0};
use db_${uuid0};

shell: ossutil64 mkdir oss://${oss_bucket}/test_files/orc_format/${uuid0} >/dev/null || echo "exit 0" >/dev/null

shell: ossutil64 cp --force ./sql/test_files/orc_format/basic_type.orc oss://${oss_bucket}/test_files/orc_format/${uuid0}/ | grep -Pv "(average|elapsed)"
-- result:
0

Succeed: Total num: 1, size: 1,027. OK num: 1(upload 1 files).
-- !result

shell: ossutil64 cp --force ./sql/test_files/orc_format/basic_type_k2k5k7.orc oss://${oss_bucket}/test_files/orc_format/${uuid0}/ | grep -Pv "(average|elapsed)"
-- result:
0

Succeed: Total num: 1, size: 434. OK num: 1(upload 1 files).
-- !result


desc files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/basic_type.orc",
"format" = "orc",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
k1 boolean YES
k2 int YES
k3 bigint YES
k4 decimal(10,2) YES
k5 date YES
k6 datetime YES
k7 varchar(1048576) YES
k8 double YES
-- !result

desc files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/basic_type_k2k5k7.orc",
"format" = "orc",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
k2 int YES
k5 date YES
k7 varchar(1048576) YES
-- !result


select * from files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/basic_type.orc",
"format" = "orc",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
0 1 2 3.20 2024-10-01 2024-10-01 12:12:12 a 4.3
1 11 12 13.20 2024-10-02 2024-10-02 13:13:13 b 14.3
-- !result

select * from files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/basic_type_k2k5k7.orc",
"format" = "orc",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
21 2024-10-03 c
-- !result


select * from files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/*",
"format" = "orc",
"auto_detect_sample_files" = "2",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
None 21 None None 2024-10-03 None c None
0 1 2 3.20 2024-10-01 2024-10-01 12:12:12 a 4.3
1 11 12 13.20 2024-10-02 2024-10-02 13:13:13 b 14.3
-- !result

select k2, k5, k7 from files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/*",
"format" = "orc",
"auto_detect_sample_files" = "2",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
21 2024-10-03 c
1 2024-10-01 a
11 2024-10-02 b
-- !result

select k1, k3, k8 from files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/*",
"format" = "orc",
"auto_detect_sample_files" = "2",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
None None None
0 2 4.3
1 12 14.3
-- !result


shell: ossutil64 rm -rf oss://${oss_bucket}/test_files/orc_format/${uuid0}/ > /dev/null
115 changes: 115 additions & 0 deletions test/sql/test_files/R/test_parquet_files_merge
@@ -0,0 +1,115 @@
-- name: test_parquet_files_merge

create database db_${uuid0};
use db_${uuid0};

shell: ossutil64 mkdir oss://${oss_bucket}/test_files/parquet_format/${uuid0} >/dev/null || echo "exit 0" >/dev/null

shell: ossutil64 cp --force ./sql/test_files/parquet_format/basic_type.parquet oss://${oss_bucket}/test_files/parquet_format/${uuid0}/ | grep -Pv "(average|elapsed)"
-- result:
0

Succeed: Total num: 1, size: 2,281. OK num: 1(upload 1 files).
-- !result

shell: ossutil64 cp --force ./sql/test_files/parquet_format/basic_type_k2k5k7.parquet oss://${oss_bucket}/test_files/parquet_format/${uuid0}/ | grep -Pv "(average|elapsed)"
-- result:
0

Succeed: Total num: 1, size: 836. OK num: 1(upload 1 files).
-- !result


desc files(
"path" = "oss://${oss_bucket}/test_files/parquet_format/${uuid0}/basic_type.parquet",
"format" = "parquet",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
k1 boolean YES
k2 int YES
k3 bigint YES
k4 decimal(10,2) YES
k5 date YES
k6 datetime YES
k7 varchar(1048576) YES
k8 double YES
-- !result

desc files(
"path" = "oss://${oss_bucket}/test_files/parquet_format/${uuid0}/basic_type_k2k5k7.parquet",
"format" = "parquet",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
k2 int YES
k5 date YES
k7 varchar(1048576) YES
-- !result


select * from files(
"path" = "oss://${oss_bucket}/test_files/parquet_format/${uuid0}/basic_type.parquet",
"format" = "parquet",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
0 1 2 3.20 2024-10-01 2024-10-01 12:12:12 a 4.3
1 11 12 13.20 2024-10-02 2024-10-02 13:13:13 b 14.3
-- !result

select * from files(
"path" = "oss://${oss_bucket}/test_files/parquet_format/${uuid0}/basic_type_k2k5k7.parquet",
"format" = "parquet",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
21 2024-10-03 c
-- !result


select * from files(
"path" = "oss://${oss_bucket}/test_files/parquet_format/${uuid0}/*",
"format" = "parquet",
"auto_detect_sample_files" = "2",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
None 21 None None 2024-10-03 None c None
0 1 2 3.20 2024-10-01 2024-10-01 12:12:12 a 4.3
1 11 12 13.20 2024-10-02 2024-10-02 13:13:13 b 14.3
-- !result

select k2, k5, k7 from files(
"path" = "oss://${oss_bucket}/test_files/parquet_format/${uuid0}/*",
"format" = "parquet",
"auto_detect_sample_files" = "2",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
21 2024-10-03 c
1 2024-10-01 a
11 2024-10-02 b
-- !result

select k1, k3, k8 from files(
"path" = "oss://${oss_bucket}/test_files/parquet_format/${uuid0}/*",
"format" = "parquet",
"auto_detect_sample_files" = "2",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
-- result:
None None None
0 2 4.3
1 12 14.3
-- !result


shell: ossutil64 rm -rf oss://${oss_bucket}/test_files/parquet_format/${uuid0}/ > /dev/null
59 changes: 59 additions & 0 deletions test/sql/test_files/T/test_orc_files_merge
@@ -0,0 +1,59 @@
-- name: test_orc_files_merge

create database db_${uuid0};
use db_${uuid0};

shell: ossutil64 mkdir oss://${oss_bucket}/test_files/orc_format/${uuid0} >/dev/null || echo "exit 0" >/dev/null

shell: ossutil64 cp --force ./sql/test_files/orc_format/basic_type.orc oss://${oss_bucket}/test_files/orc_format/${uuid0}/ | grep -Pv "(average|elapsed)"
shell: ossutil64 cp --force ./sql/test_files/orc_format/basic_type_k2k5k7.orc oss://${oss_bucket}/test_files/orc_format/${uuid0}/ | grep -Pv "(average|elapsed)"

desc files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/basic_type.orc",
"format" = "orc",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
desc files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/basic_type_k2k5k7.orc",
"format" = "orc",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");

select * from files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/basic_type.orc",
"format" = "orc",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
select * from files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/basic_type_k2k5k7.orc",
"format" = "orc",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");

select * from files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/*",
"format" = "orc",
"auto_detect_sample_files" = "2",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
select k2, k5, k7 from files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/*",
"format" = "orc",
"auto_detect_sample_files" = "2",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");
select k1, k3, k8 from files(
"path" = "oss://${oss_bucket}/test_files/orc_format/${uuid0}/*",
"format" = "orc",
"auto_detect_sample_files" = "2",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}");

shell: ossutil64 rm -rf oss://${oss_bucket}/test_files/orc_format/${uuid0}/ > /dev/null