From 66b78c438226efb22ac03aed47d820118ad28191 Mon Sep 17 00:00:00 2001 From: Youngwb Date: Thu, 12 Dec 2024 15:38:26 +0800 Subject: [PATCH] [Feature] Support query deletion vector for delta lake (#53766) Signed-off-by: Youngwb --- be/src/connector/CMakeLists.txt | 1 + .../deletion_vector/deletion_vector.cpp | 170 ++++++++++++++++++ .../deletion_vector/deletion_vector.h | 57 ++++++ be/src/connector/hive_connector.cpp | 6 + be/src/exec/hdfs_scanner.cpp | 5 +- be/src/exec/hdfs_scanner.h | 4 + be/src/exec/hdfs_scanner_parquet.cpp | 4 + be/src/util/CMakeLists.txt | 1 + be/src/util/base85.cpp | 77 ++++++++ be/src/util/base85.h | 23 +++ be/test/CMakeLists.txt | 2 + .../deletion_vector/deletion_vector_test.cpp | 65 +++++++ be/test/util/base85_test.cpp | 35 ++++ .../delta/DeltaConnectorScanRangeSource.java | 12 ++ .../connector/delta/DeltaLakeMetadata.java | 13 +- .../connector/delta/FileScanTask.java | 12 +- .../connector/delta/ScanFileUtils.java | 7 +- gensrc/thrift/PlanNodes.thrift | 11 ++ test/sql/test_deltalake/R/test_deltalake_dv | 25 +++ test/sql/test_deltalake/T/test_deltalake_dv | 15 ++ 20 files changed, 528 insertions(+), 17 deletions(-) create mode 100644 be/src/connector/deletion_vector/deletion_vector.cpp create mode 100644 be/src/connector/deletion_vector/deletion_vector.h create mode 100644 be/src/util/base85.cpp create mode 100644 be/src/util/base85.h create mode 100644 be/test/connector/deletion_vector/deletion_vector_test.cpp create mode 100644 be/test/util/base85_test.cpp create mode 100644 test/sql/test_deltalake/R/test_deltalake_dv create mode 100644 test/sql/test_deltalake/T/test_deltalake_dv diff --git a/be/src/connector/CMakeLists.txt b/be/src/connector/CMakeLists.txt index 5db1d68ab0c2e..f1c586f6bff08 100644 --- a/be/src/connector/CMakeLists.txt +++ b/be/src/connector/CMakeLists.txt @@ -31,4 +31,5 @@ add_library(Connector STATIC utils.cpp async_flush_stream_poller.cpp sink_memory_manager.cpp + deletion_vector/deletion_vector.cpp ) diff --git a/be/src/connector/deletion_vector/deletion_vector.cpp b/be/src/connector/deletion_vector/deletion_vector.cpp new file mode 100644 index 0000000000000..b24835af2399e --- /dev/null +++ b/be/src/connector/deletion_vector/deletion_vector.cpp @@ -0,0 +1,170 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "deletion_vector.h" + +#include + +#include "util/base85.h" +#include "util/uuid_generator.h" + +namespace starrocks { + +Status starrocks::DeletionVector::fill_row_indexes(std::set* need_skip_rowids) { + if (_deletion_vector_descriptor->__isset.cardinality && _deletion_vector_descriptor->__isset.cardinality == 0) { + return Status::OK(); + } else if (is_inline()) { + return deserialized_inline_dv(_deletion_vector_descriptor->pathOrInlineDv, need_skip_rowids); + } else { + std::shared_ptr shared_buffered_input_stream = nullptr; + std::shared_ptr cache_input_stream = nullptr; + HdfsScanStats app_scan_stats; + HdfsScanStats fs_scan_stats; + + ASSIGN_OR_RETURN(auto path, get_absolute_path(_params.table_location)); + int64_t offset = _deletion_vector_descriptor->offset; + int64_t length = _deletion_vector_descriptor->sizeInBytes; + + ASSIGN_OR_RETURN(auto dv_file, open_random_access_file(path, fs_scan_stats, app_scan_stats, + shared_buffered_input_stream, cache_input_stream)); + // Check the dv size + uint32_t size_from_deletion_vector_file; + RETURN_IF_ERROR(dv_file->read_at_fully(offset, &size_from_deletion_vector_file, DV_SIZE_LENGTH)); + offset += DV_SIZE_LENGTH; + // the size_from_deletion_vector_file is big endian byte order + if (LittleEndian::IsLittleEndian()) { + size_from_deletion_vector_file = BigEndian::ToHost32(size_from_deletion_vector_file); + } + + if (size_from_deletion_vector_file != length) { + std::stringstream ss; + ss << "DV size mismatch, expected : " << length << " , actual : " << size_from_deletion_vector_file; + return Status::RuntimeError(ss.str()); + } + + // Check the correctness of magic number + uint32_t magic_number_from_deletion_vector_file; + RETURN_IF_ERROR(dv_file->read_at_fully(offset, &magic_number_from_deletion_vector_file, MAGIC_NUMBER_LENGTH)); + // magic_number_from_deletion_vector_file is little endian byte order + if (!LittleEndian::IsLittleEndian()) { + magic_number_from_deletion_vector_file = LittleEndian::ToHost32(magic_number_from_deletion_vector_file); + } + offset += MAGIC_NUMBER_LENGTH; + + int64_t serialized_bitmap_length = length - MAGIC_NUMBER_LENGTH; + std::unique_ptr deletion_vector(new char[serialized_bitmap_length]); + RETURN_IF_ERROR(dv_file->read_at_fully(offset, deletion_vector.get(), serialized_bitmap_length)); + + return deserialized_deletion_vector(magic_number_from_deletion_vector_file, std::move(deletion_vector), + serialized_bitmap_length, need_skip_rowids); + } +} + +StatusOr> DeletionVector::open_random_access_file( + const std::string& file_path, HdfsScanStats& fs_scan_stats, HdfsScanStats& app_scan_stats, + std::shared_ptr& shared_buffered_input_stream, + std::shared_ptr& cache_input_stream) const { + const OpenFileOptions options{.fs = _params.fs, + .path = file_path, + .fs_stats = &fs_scan_stats, + .app_stats = &app_scan_stats, + .datacache_options = _params.datacache_options}; + ASSIGN_OR_RETURN(auto file, + HdfsScanner::create_random_access_file(shared_buffered_input_stream, cache_input_stream, options)); + std::vector io_ranges{}; + int64_t offset = _deletion_vector_descriptor->offset; + int64_t length = _deletion_vector_descriptor->sizeInBytes + DV_SIZE_LENGTH; + while (offset < length) { + const int64_t remain_length = std::min(static_cast(config::io_coalesce_read_max_buffer_size), length); + io_ranges.emplace_back(offset, remain_length); + offset += remain_length; + } + + RETURN_IF_ERROR(shared_buffered_input_stream->set_io_ranges(io_ranges)); + return file; +} + +Status DeletionVector::deserialized_inline_dv(std::string& encoded_bitmap_data, + std::set* need_skip_rowids) const { + ASSIGN_OR_RETURN(auto decoded_bitmap_data, base85_decode(encoded_bitmap_data)); + uint32_t inline_magic_number; + memcpy(&inline_magic_number, decoded_bitmap_data.data(), DeletionVector::MAGIC_NUMBER_LENGTH); + + int64_t serialized_bitmap_length = decoded_bitmap_data.size() - MAGIC_NUMBER_LENGTH; + std::unique_ptr deletion_vector(new char[serialized_bitmap_length]); + memcpy(deletion_vector.get(), decoded_bitmap_data.data() + MAGIC_NUMBER_LENGTH, serialized_bitmap_length); + + return deserialized_deletion_vector(inline_magic_number, std::move(deletion_vector), serialized_bitmap_length, + need_skip_rowids); +} + +Status DeletionVector::deserialized_deletion_vector(uint32_t magic_number, std::unique_ptr serialized_dv, + int64_t serialized_bitmap_length, + std::set* need_skip_rowids) const { + if (magic_number != MAGIC_NUMBER) { + std::stringstream ss; + ss << "Unexpected magic number : " << magic_number; + return Status::RuntimeError(ss.str()); + } + + // Construct the roaring bitmap of corresponding deletion vector + roaring64_bitmap_t* bitmap = + roaring64_bitmap_portable_deserialize_safe(serialized_dv.get(), serialized_bitmap_length); + if (bitmap == nullptr) { + return Status::RuntimeError("deserialize roaring64 bitmap error"); + } + + // Construct _need_skip_rowids from bitmap + uint64_t bitmap_cardinality = roaring64_bitmap_get_cardinality(bitmap); + std::unique_ptr bitmap_array(new uint64_t[bitmap_cardinality]); + roaring64_bitmap_to_uint64_array(bitmap, bitmap_array.get()); + need_skip_rowids->insert(bitmap_array.get(), bitmap_array.get() + bitmap_cardinality); + + roaring64_bitmap_free(bitmap); + return Status::OK(); +} + +StatusOr DeletionVector::get_absolute_path(const std::string& table_location) const { + std::string& storage_type = _deletion_vector_descriptor->storageType; + std::string& path = _deletion_vector_descriptor->pathOrInlineDv; + if (storage_type == "u") { + uint32_t random_prefix_len = path.length() - ENCODED_UUID_LENGTH; + std::string random_prefix = path.substr(0, random_prefix_len); + std::string encoded_uuid = path.substr(random_prefix_len); + ASSIGN_OR_RETURN(auto decoded_uuid, base85_decode(encoded_uuid)); + if (decoded_uuid.length() < 16) { + return Status::RuntimeError("decoded uuid length less than 16"); + } + boost::uuids::uuid uuid{}; + memcpy(uuid.data, decoded_uuid.data(), 8); + memcpy(uuid.data + 8, decoded_uuid.data() + 8, 8); + return assemble_deletion_vector_path(table_location, boost::uuids::to_string(uuid), random_prefix); + } else if (storage_type == "p") { + return path; + } else { + return Status::RuntimeError(fmt::format("unsupported storage type {}", storage_type)); + } +} + +std::string DeletionVector::assemble_deletion_vector_path(const string& table_location, string&& uuid, + string& prefix) const { + std::string file_name = fmt::format("deletion_vector_{}.bin", uuid); + if (prefix.empty()) { + return fmt::format("{}/{}", table_location, file_name); + } else { + return fmt::format("{}/{}/{}", table_location, prefix, file_name); + } +} + +} // namespace starrocks diff --git a/be/src/connector/deletion_vector/deletion_vector.h b/be/src/connector/deletion_vector/deletion_vector.h new file mode 100644 index 0000000000000..7fb241b34b76c --- /dev/null +++ b/be/src/connector/deletion_vector/deletion_vector.h @@ -0,0 +1,57 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "common/status.h" +#include "exec/hdfs_scanner.h" +#include "fs/fs.h" + +namespace starrocks { + +class DeletionVector { +public: + DeletionVector(const HdfsScannerParams& scanner_params) + : _deletion_vector_descriptor(scanner_params.deletion_vector_descriptor), _params(scanner_params) {} + + Status fill_row_indexes(std::set* need_skip_rowids); + Status deserialized_inline_dv(std::string& encoded_bitmap_data, std::set* need_skip_rowids) const; + StatusOr get_absolute_path(const std::string& table_location) const; + + const bool is_inline() { + return _deletion_vector_descriptor->__isset.storageType && _deletion_vector_descriptor->storageType == "i"; + } + + static const int32_t DV_SIZE_LENGTH = 4; + static const int32_t MAGIC_NUMBER_LENGTH = 4; + static const uint32_t MAGIC_NUMBER = 1681511377; + // UUIDs always encode into 20 characters. + static const uint32_t ENCODED_UUID_LENGTH = 20; + +private: + StatusOr> open_random_access_file( + const std::string& file_path, HdfsScanStats& fs_scan_stats, HdfsScanStats& app_scan_stats, + std::shared_ptr& shared_buffered_input_stream, + std::shared_ptr& cache_input_stream) const; + + Status deserialized_deletion_vector(uint32_t magic_number, std::unique_ptr serialized_dv, + int64_t serialized_bitmap_length, std::set* need_skip_rowids) const; + + std::string assemble_deletion_vector_path(const std::string& table_location, std::string&& uuid, + std::string& prefix) const; + + const std::shared_ptr _deletion_vector_descriptor; + const HdfsScannerParams& _params; +}; +} // namespace starrocks diff --git a/be/src/connector/hive_connector.cpp b/be/src/connector/hive_connector.cpp index 09bd40fe7935c..28083fe3a8443 100644 --- a/be/src/connector/hive_connector.cpp +++ b/be/src/connector/hive_connector.cpp @@ -612,6 +612,7 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) { scanner_params.fs = _pool.add(fs.release()); scanner_params.path = native_file_path; scanner_params.file_size = _scan_range.file_length; + scanner_params.table_location = _hive_table->get_base_path(); scanner_params.tuple_desc = _tuple_desc; scanner_params.materialize_slots = _materialize_slots; scanner_params.materialize_index_in_chunk = _materialize_index_in_chunk; @@ -645,6 +646,11 @@ Status HiveDataSource::_init_scanner(RuntimeState* state) { scanner_params.deletes.emplace_back(&delete_file); } + if (scan_range.__isset.deletion_vector_descriptor) { + scanner_params.deletion_vector_descriptor = + std::make_shared(scan_range.deletion_vector_descriptor); + } + if (scan_range.__isset.paimon_deletion_file && !scan_range.paimon_deletion_file.path.empty()) { scanner_params.paimon_deletion_file = std::make_shared(scan_range.paimon_deletion_file); } diff --git a/be/src/exec/hdfs_scanner.cpp b/be/src/exec/hdfs_scanner.cpp index 5b9157be052d2..8e2a9d710c6a1 100644 --- a/be/src/exec/hdfs_scanner.cpp +++ b/be/src/exec/hdfs_scanner.cpp @@ -243,7 +243,10 @@ StatusOr> HdfsScanner::create_random_access_fi std::shared_ptr& shared_buffered_input_stream, std::shared_ptr& cache_input_stream, const OpenFileOptions& options) { ASSIGN_OR_RETURN(std::unique_ptr raw_file, options.fs->new_random_access_file(options.path)) - const int64_t file_size = options.file_size; + int64_t file_size = options.file_size; + if (file_size < 0) { + ASSIGN_OR_RETURN(file_size, raw_file->stream()->get_size()); + } raw_file->set_size(file_size); const std::string& filename = raw_file->filename(); diff --git a/be/src/exec/hdfs_scanner.h b/be/src/exec/hdfs_scanner.h index cb3e392244e8a..a36018312f974 100644 --- a/be/src/exec/hdfs_scanner.h +++ b/be/src/exec/hdfs_scanner.h @@ -178,6 +178,8 @@ struct HdfsScannerParams { std::string path; // The file size. -1 means unknown. int64_t file_size = -1; + // the table location + std::string table_location; const TupleDescriptor* tuple_desc = nullptr; @@ -213,6 +215,8 @@ struct HdfsScannerParams { std::vector deletes; + std::shared_ptr deletion_vector_descriptor = nullptr; + const TIcebergSchema* iceberg_schema = nullptr; bool is_lazy_materialization_slot(SlotId slot_id) const; diff --git a/be/src/exec/hdfs_scanner_parquet.cpp b/be/src/exec/hdfs_scanner_parquet.cpp index b45fee41fd1eb..edf49223b66a9 100644 --- a/be/src/exec/hdfs_scanner_parquet.cpp +++ b/be/src/exec/hdfs_scanner_parquet.cpp @@ -14,6 +14,7 @@ #include "exec/hdfs_scanner_parquet.h" +#include "connector/deletion_vector/deletion_vector.h" #include "exec/hdfs_scanner.h" #include "exec/iceberg/iceberg_delete_builder.h" #include "exec/paimon/paimon_delete_file_builder.h" @@ -45,6 +46,9 @@ Status HdfsParquetScanner::do_init(RuntimeState* runtime_state, const HdfsScanne std::unique_ptr paimon_delete_file_builder( new PaimonDeleteFileBuilder(scanner_params.fs, &_need_skip_rowids)); RETURN_IF_ERROR(paimon_delete_file_builder->build(scanner_params.paimon_deletion_file.get())); + } else if (scanner_params.deletion_vector_descriptor != nullptr) { + std::unique_ptr dv = std::make_unique(scanner_params); + RETURN_IF_ERROR(dv->fill_row_indexes(&_need_skip_rowids)); } return Status::OK(); } diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 2ee5b6effb60a..80853866a9440 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -24,6 +24,7 @@ set(UTIL_FILES arrow/utils.cpp await.cpp bfd_parser.cpp + base85.cpp compression/block_compression.cpp compression/compression_context_pool_singletons.cpp compression/stream_compression.cpp diff --git a/be/src/util/base85.cpp b/be/src/util/base85.cpp new file mode 100644 index 0000000000000..da7b1df0b06bb --- /dev/null +++ b/be/src/util/base85.cpp @@ -0,0 +1,77 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The code in this file was modified from https://github.com/artemkin/z85/blob/master/src/z85.c +// Copyright 2013 Stanislav Artemkin . + +#include "base85.h" + +typedef unsigned char byte; + +static byte base256[] = {0x00, 0x44, 0x00, 0x54, 0x53, 0x52, 0x48, 0x00, 0x4B, 0x4C, 0x46, 0x41, 0x00, 0x3F, + 0x3E, 0x45, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x40, 0x00, + 0x49, 0x42, 0x4A, 0x47, 0x51, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A, 0x2B, 0x2C, + 0x2D, 0x2E, 0x2F, 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3A, + 0x3B, 0x3C, 0x3D, 0x4D, 0x00, 0x4E, 0x43, 0x00, 0x00, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, + 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1A, 0x1B, 0x1C, + 0x1D, 0x1E, 0x1F, 0x20, 0x21, 0x22, 0x23, 0x4F, 0x00, 0x50, 0x00, 0x00}; + +namespace starrocks { + +char* base85_decode_impl(const char* source, const char* sourceEnd, char* dest) { + byte* src = (byte*)source; + byte* end = (byte*)sourceEnd; + byte* dst = (byte*)dest; + uint32_t value; + + for (; src != end; src += 5, dst += 4) { + value = base256[(src[0] - 32) & 127]; + value = value * 85 + base256[(src[1] - 32) & 127]; + value = value * 85 + base256[(src[2] - 32) & 127]; + value = value * 85 + base256[(src[3] - 32) & 127]; + value = value * 85 + base256[(src[4] - 32) & 127]; + + // pack big-endian frame + dst[0] = value >> 24; + dst[1] = (byte)(value >> 16); + dst[2] = (byte)(value >> 8); + dst[3] = (byte)(value); + } + + return (char*)dst; +} + +StatusOr base85_decode(const std::string& source) { + if (source.empty()) { + return Status::RuntimeError("base85 encoded source is empty"); + } + + size_t input_size = source.size(); + if (input_size % 5) { + return Status::RuntimeError("base85 encoded source size error"); + } + + std::string buf; + size_t buf_size = input_size * 4 / 5; + buf.resize(buf_size); + char* dest = &buf[0]; + + const size_t decodedBytes = base85_decode_impl(source.data(), source.data() + input_size, dest) - dest; + if (decodedBytes == 0) { + return Status::RuntimeError("base85 decoded failed"); + } + + return buf; +} +} // namespace starrocks \ No newline at end of file diff --git a/be/src/util/base85.h b/be/src/util/base85.h new file mode 100644 index 0000000000000..a71e1988fd0db --- /dev/null +++ b/be/src/util/base85.h @@ -0,0 +1,23 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include "common/statusor.h" + +namespace starrocks { +StatusOr base85_decode(const std::string& source); +} diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 8f102ad55b33c..64c9f2dd9513e 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -24,6 +24,7 @@ set(EXEC_FILES ./common/status_test.cpp ./common/tracer_test.cpp ./common/uri_test.cpp + ./connector/deletion_vector/deletion_vector_test.cpp ./connector_sink/hive_chunk_sink_test.cpp ./connector_sink/iceberg_chunk_sink_test.cpp ./connector_sink/file_chunk_sink_test.cpp @@ -410,6 +411,7 @@ set(EXEC_FILES ./util/phmap_test.cpp ./util/aes_util_test.cpp ./util/await_test.cpp + ./util/base85_test.cpp ./util/bitmap_test.cpp ./util/bit_mask_test.cpp ./util/bit_stream_utils_test.cpp diff --git a/be/test/connector/deletion_vector/deletion_vector_test.cpp b/be/test/connector/deletion_vector/deletion_vector_test.cpp new file mode 100644 index 0000000000000..7726f8e518fb2 --- /dev/null +++ b/be/test/connector/deletion_vector/deletion_vector_test.cpp @@ -0,0 +1,65 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "connector/deletion_vector/deletion_vector.h" + +#include + +#include "util/base85.h" + +namespace starrocks { + +class DeletionVectorTest : public testing::Test { +public: + void SetUp() override { dv = std::make_shared(params); } + + HdfsScannerParams params; + std::shared_ptr dv; +}; + +TEST_F(DeletionVectorTest, inlineDeletionVectorTest) { + std::string encoded_inline_dv = "^Bg9^0rr910000000000iXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L"; + std::set need_skip_rowids; + Status status = dv->deserialized_inline_dv(encoded_inline_dv, &need_skip_rowids); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(6, need_skip_rowids.size()); + std::stringstream ss; + for (int64_t rowid : need_skip_rowids) { + ss << rowid << " "; + } + ASSERT_EQ("3 4 7 11 18 29 ", ss.str()); +} + +TEST_F(DeletionVectorTest, absolutePathTest) { + // test relative path + params.deletion_vector_descriptor = std::make_shared(); + params.deletion_vector_descriptor->__set_storageType("u"); + params.deletion_vector_descriptor->__set_pathOrInlineDv("ab^-aqEH.-t@S}K{vb[*k^"); + dv = std::make_shared(params); + + std::string table_location = "s3://mytable"; + StatusOr absolute_path = dv->get_absolute_path(table_location); + ASSERT_TRUE(absolute_path.ok()); + ASSERT_EQ("s3://mytable/ab/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin", absolute_path.value()); + // test absolute path + params.deletion_vector_descriptor = std::make_shared(); + params.deletion_vector_descriptor->__set_storageType("p"); + params.deletion_vector_descriptor->__set_pathOrInlineDv( + "s3://mytable/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin"); + dv = std::make_shared(params); + absolute_path = dv->get_absolute_path(table_location); + ASSERT_TRUE(absolute_path.ok()); + ASSERT_EQ("s3://mytable/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin", absolute_path.value()); +} +} // namespace starrocks \ No newline at end of file diff --git a/be/test/util/base85_test.cpp b/be/test/util/base85_test.cpp new file mode 100644 index 0000000000000..a7972a92dfe87 --- /dev/null +++ b/be/test/util/base85_test.cpp @@ -0,0 +1,35 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "util/base85.h" + +#include + +#include "util/uuid_generator.h" + +namespace starrocks { + +class Base85Test : public testing::Test {}; + +TEST_F(Base85Test, decodeTest) { + std::string encode = "bfX+S@(X0DVnaD&Lvg?b"; + auto decode_status = base85_decode(encode); + auto decode = decode_status.value(); + boost::uuids::uuid uuid{}; + memcpy(uuid.data + 0, decode.data(), 8); + memcpy(uuid.data + 8, decode.data() + 8, 8); + ASSERT_EQ("22ccda6c-fecb-4cba-b232-3a299360b660", boost::uuids::to_string(uuid)); +} + +} // namespace starrocks \ No newline at end of file diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaConnectorScanRangeSource.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaConnectorScanRangeSource.java index 16ec900fdf63f..093ec5b411f80 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaConnectorScanRangeSource.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaConnectorScanRangeSource.java @@ -24,6 +24,7 @@ import com.starrocks.connector.RemoteFileInfoSource; import com.starrocks.connector.exception.StarRocksConnectorException; import com.starrocks.connector.hive.RemoteFileInputFormat; +import com.starrocks.thrift.TDeletionVectorDescriptor; import com.starrocks.thrift.THdfsScanRange; import com.starrocks.thrift.TNetworkAddress; import com.starrocks.thrift.TScanRange; @@ -101,6 +102,17 @@ private TScanRangeLocations toScanRange(FileScanTask fileScanTask) { hdfsScanRange.setFile_format(remoteFileInputFormat.toThrift()); hdfsScanRange.setPartition_value(table.toHdfsPartition(referencedPartitionInfo)); hdfsScanRange.setTable_id(table.getId()); + // serialize dv + if (fileScanTask.getDv() != null) { + TDeletionVectorDescriptor dv = new TDeletionVectorDescriptor(); + dv.setStorageType(fileScanTask.getDv().getStorageType()); + dv.setPathOrInlineDv(fileScanTask.getDv().getPathOrInlineDv()); + dv.setOffset(fileScanTask.getDv().getOffset().orElse(0)); + dv.setSizeInBytes(fileScanTask.getDv().getSizeInBytes()); + dv.setCardinality(fileScanTask.getDv().getCardinality()); + hdfsScanRange.setDeletion_vector_descriptor(dv); + } + TScanRange scanRange = new TScanRange(); scanRange.setHdfs_scan_range(hdfsScanRange); scanRangeLocations.setScan_range(scanRange); diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetadata.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetadata.java index 7a1aff74d32e5..a7d73f52bf48a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetadata.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeMetadata.java @@ -20,8 +20,6 @@ import com.starrocks.catalog.DeltaLakeTable; import com.starrocks.catalog.PartitionKey; import com.starrocks.catalog.Table; -import com.starrocks.common.ErrorCode; -import com.starrocks.common.ErrorReport; import com.starrocks.common.Pair; import com.starrocks.common.profile.Timer; import com.starrocks.common.profile.Tracers; @@ -39,7 +37,6 @@ import com.starrocks.connector.statistics.StatisticsUtils; import com.starrocks.credential.CloudConfiguration; import com.starrocks.qe.ConnectContext; -import com.starrocks.sql.common.ErrorType; import com.starrocks.sql.optimizer.OptimizerContext; import com.starrocks.sql.optimizer.Utils; import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator; @@ -238,14 +235,8 @@ public Pair next() { Row scanFileRow = scanFileRows.next(); DeletionVectorDescriptor dv = InternalScanFileUtils.getDeletionVectorDescriptorFromRow(scanFileRow); - if (dv != null) { - ErrorReport.reportValidateException(ErrorCode.ERR_BAD_TABLE_ERROR, ErrorType.UNSUPPORTED, - "Delta table feature [deletion vectors] is not supported"); - } - - Pair pair = - ScanFileUtils.convertFromRowToFileScanTask(enableCollectColumnStats, scanFileRow, estimateRowSize); - return pair; + return ScanFileUtils.convertFromRowToFileScanTask(enableCollectColumnStats, scanFileRow, + estimateRowSize, dv); } private void ensureOpen() { diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/FileScanTask.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/FileScanTask.java index c517eafd571ba..22ecbde0ddf8e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/FileScanTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/FileScanTask.java @@ -14,22 +14,26 @@ package com.starrocks.connector.delta; +import io.delta.kernel.internal.actions.DeletionVectorDescriptor; import io.delta.kernel.utils.FileStatus; import org.apache.commons.collections4.map.CaseInsensitiveMap; import java.util.Map; -// FileScanTask represents one `AddFile` in DeltaLake. +// FileScanTask represents one `AddFile` with DV in DeltaLake. // TODO: The file representations of different Catalogs will be unified later. public class FileScanTask { private final FileStatus fileStatus; private final long records; private final Map partitionValues; + private final DeletionVectorDescriptor dv; - public FileScanTask(FileStatus fileStatus, long records, Map partitionValues) { + public FileScanTask(FileStatus fileStatus, long records, Map partitionValues, + DeletionVectorDescriptor dv) { this.fileStatus = fileStatus; this.records = records; this.partitionValues = new CaseInsensitiveMap<>(partitionValues); + this.dv = dv; } public FileStatus getFileStatus() { @@ -47,4 +51,8 @@ public long getRecords() { public Map getPartitionValues() { return partitionValues; } + + public DeletionVectorDescriptor getDv() { + return dv; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/ScanFileUtils.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/ScanFileUtils.java index f0fab6fbdb244..2da4d436fca54 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/ScanFileUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/ScanFileUtils.java @@ -19,6 +19,7 @@ import com.starrocks.persist.gson.GsonUtils; import io.delta.kernel.data.Row; import io.delta.kernel.internal.InternalScanFileUtils; +import io.delta.kernel.internal.actions.DeletionVectorDescriptor; import io.delta.kernel.utils.FileStatus; import java.util.Map; @@ -67,7 +68,7 @@ private static Row getAddFileEntry(Row scanFileInfo) { } public static Pair convertFromRowToFileScanTask( - boolean needStats, Row file, long estimateRowSize) { + boolean needStats, Row file, long estimateRowSize, DeletionVectorDescriptor dv) { FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(file); Map partitionValues = InternalScanFileUtils.getPartitionValues(file); Row addFileRow = getAddFileEntry(file); @@ -76,11 +77,11 @@ public static Pair convertFromRowToFil if (needStats) { DeltaLakeAddFileStatsSerDe stats = ScanFileUtils.getColumnStatistics( addFileRow, fileStatus, estimateRowSize); - fileScanTask = new FileScanTask(fileStatus, stats.numRecords, partitionValues); + fileScanTask = new FileScanTask(fileStatus, stats.numRecords, partitionValues, dv); return new Pair<>(fileScanTask, stats); } else { long records = ScanFileUtils.getFileRows(addFileRow, fileStatus, estimateRowSize); - fileScanTask = new FileScanTask(fileStatus, records, partitionValues); + fileScanTask = new FileScanTask(fileStatus, records, partitionValues, dv); return new Pair<>(fileScanTask, null); } } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index c69c9006c742a..f4e71113d9472 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -315,6 +315,15 @@ struct TPaimonDeletionFile { 3: optional i64 length } +// refer to https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-descriptor-schema +struct TDeletionVectorDescriptor { + 1: optional string storageType + 2: optional string pathOrInlineDv + 3: optional i64 offset + 4: optional i64 sizeInBytes + 5: optional i64 cardinality +} + // Hdfs scan range struct THdfsScanRange { // File name (not the full path). The path is assumed to be relative to the @@ -401,6 +410,8 @@ struct THdfsScanRange { 29: optional Descriptors.THdfsPartition partition_value; 30: optional Types.TTableId table_id; + + 31:optional TDeletionVectorDescriptor deletion_vector_descriptor } struct TBinlogScanRange { diff --git a/test/sql/test_deltalake/R/test_deltalake_dv b/test/sql/test_deltalake/R/test_deltalake_dv new file mode 100644 index 0000000000000..65db1c14d6202 --- /dev/null +++ b/test/sql/test_deltalake/R/test_deltalake_dv @@ -0,0 +1,25 @@ +-- name: test_delta_lake_dv +create external catalog delta_test_${uuid0} PROPERTIES ( + "type"="deltalake", + "hive.metastore.uris"="${deltalake_catalog_hive_metastore_uris}", + "aws.s3.access_key"="${oss_ak}", + "aws.s3.secret_key"="${oss_sk}", + "aws.s3.endpoint"="${oss_endpoint}" +); +-- result: +-- !result +select * from delta_test_${uuid0}.delta_oss_db.delta_test_dv order by `key`; +-- result: +1 name1 StarRocks +2 name2 sr +3 name3 sr +5 name5 rocks +6 name6 rocks +-- !result +select count(1) from delta_test_${uuid0}.delta_oss_db.delta_test_par_dv where comment = 'starrocks'; +-- result: +15 +-- !result +drop catalog delta_test_${uuid0} +-- result: +-- !result \ No newline at end of file diff --git a/test/sql/test_deltalake/T/test_deltalake_dv b/test/sql/test_deltalake/T/test_deltalake_dv new file mode 100644 index 0000000000000..dee3c67d11b55 --- /dev/null +++ b/test/sql/test_deltalake/T/test_deltalake_dv @@ -0,0 +1,15 @@ +-- name: test_delta_lake_dv + +-- create catalog +create external catalog delta_test_${uuid0} PROPERTIES ( + "type"="deltalake", + "hive.metastore.uris"="${deltalake_catalog_hive_metastore_uris}", + "aws.s3.access_key"="${oss_ak}", + "aws.s3.secret_key"="${oss_sk}", + "aws.s3.endpoint"="${oss_endpoint}" +); + +select * from delta_test_${uuid0}.delta_oss_db.delta_test_dv order by `key`; +select count(1) from delta_test_${uuid0}.delta_oss_db.delta_test_par_dv where comment = 'starrocks'; + +drop catalog delta_test_${uuid0} \ No newline at end of file