From baee5ad71820aa72b6a2a880afa05a6d26aef57a Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 19 Jul 2022 14:16:28 +0800 Subject: [PATCH 01/11] Add parquet scan benchmark --- cpp/CMakeLists.txt | 5 + cpp/src/parquet/CMakeLists.txt | 2 + .../parquet/arrow/parquet_scan_benchmark.cc | 213 +++++++++++++++++ .../arrow/parquet_scan_string_benchmark.cc | 223 ++++++++++++++++++ cpp/src/parquet/arrow/test_utils.h | 132 +++++++++++ cpp/src/parquet/arrow/utils/exception.h | 25 ++ cpp/src/parquet/arrow/utils/macros.h | 104 ++++++++ 7 files changed, 704 insertions(+) create mode 100644 cpp/src/parquet/arrow/parquet_scan_benchmark.cc create mode 100644 cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc create mode 100644 cpp/src/parquet/arrow/test_utils.h create mode 100644 cpp/src/parquet/arrow/utils/exception.h create mode 100644 cpp/src/parquet/arrow/utils/macros.h diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 234a66c00d875..0f112af489c88 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -48,6 +48,7 @@ if(POLICY CMP0074) endif() set(ARROW_VERSION "4.0.0") +add_compile_options(-g) string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}") @@ -937,3 +938,7 @@ config_summary_message() if(${ARROW_BUILD_CONFIG_SUMMARY_JSON}) config_summary_json() endif() + + + + diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 3f3ca5a529917..0aceae25fc98b 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -399,6 +399,8 @@ add_parquet_benchmark(column_io_benchmark) add_parquet_benchmark(encoding_benchmark) add_parquet_benchmark(level_conversion_benchmark) add_parquet_benchmark(arrow/reader_writer_benchmark PREFIX "parquet-arrow") +add_parquet_benchmark(arrow/parquet_scan_benchmark PREFIX "parquet-arrow") +add_parquet_benchmark(arrow/parquet_scan_string_benchmark PREFIX "parquet-arrow") if(ARROW_WITH_BROTLI) add_definitions(-DARROW_WITH_BROTLI) diff --git a/cpp/src/parquet/arrow/parquet_scan_benchmark.cc b/cpp/src/parquet/arrow/parquet_scan_benchmark.cc new file mode 100644 index 0000000000000..2ab95e1c380d0 --- /dev/null +++ b/cpp/src/parquet/arrow/parquet_scan_benchmark.cc @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "arrow/record_batch.h" +#include "parquet/arrow/utils/macros.h" +#include "parquet/arrow/test_utils.h" + + +// namespace parquet { +// namespace benchmark { + +const int batch_buffer_size = 32768; + +class GoogleBenchmarkColumnarToRow { + public: + GoogleBenchmarkColumnarToRow(std::string file_name) { GetRecordBatchReader(file_name); } + + void GetRecordBatchReader(const std::string& input_file) { + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + + std::shared_ptr fs; + std::string file_name; + ARROW_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(input_file, &file_name)) + + ARROW_ASSIGN_OR_THROW(file, fs->OpenInputFile(file_name)); + + properties.set_batch_size(batch_buffer_size); + properties.set_pre_buffer(false); + properties.set_use_threads(false); + + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); + + ASSERT_NOT_OK(parquet_reader->GetSchema(&schema)); + + auto num_rowgroups = parquet_reader->num_row_groups(); + + for (int i = 0; i < num_rowgroups; ++i) { + row_group_indices.push_back(i); + } + + auto num_columns = schema->num_fields(); + for (int i = 0; i < num_columns; ++i) { + column_indices.push_back(i); + } + } + + virtual void operator()(benchmark::State& state) {} + + protected: + long SetCPU(uint32_t cpuindex) { + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(cpuindex, &cs); + return sched_setaffinity(0, sizeof(cs), &cs); + } + + protected: + std::string file_name; + std::shared_ptr file; + std::vector row_group_indices; + std::vector column_indices; + std::shared_ptr schema; + parquet::ArrowReaderProperties properties; +}; +class GoogleBenchmarkColumnarToRow_CacheScan_Benchmark + : public GoogleBenchmarkColumnarToRow { + public: + GoogleBenchmarkColumnarToRow_CacheScan_Benchmark(std::string filename) + : GoogleBenchmarkColumnarToRow(filename) {} + void operator()(benchmark::State& state) { + if (state.range(0) == 0xffffffff) { + SetCPU(state.thread_index()); + } else { + SetCPU(state.range(0)); + } + + arrow::Compression::type compression_type = (arrow::Compression::type)1; + + std::shared_ptr record_batch; + int64_t elapse_read = 0; + int64_t num_batches = 0; + int64_t num_rows = 0; + int64_t init_time = 0; + int64_t write_time = 0; + + + std::vector local_column_indices = column_indices; + + std::shared_ptr local_schema; + local_schema = std::make_shared(*schema.get()); + + if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl; + + for (auto _ : state) { + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + ::arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); + + std::vector> batches; + ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader( + row_group_indices, local_column_indices, &record_batch_reader)); + do { + TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); + + if (record_batch) { + // batches.push_back(record_batch); + num_batches += 1; + num_rows += record_batch->num_rows(); + } + } while (record_batch); + + std::cout << " parquet parse done elapsed time = " << elapse_read / 1000000 + << " rows = " << num_rows << std::endl; + } + + state.counters["rowgroups"] = + benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["columns"] = + benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["batches"] = benchmark::Counter( + num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["num_rows"] = benchmark::Counter( + num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["batch_buffer_size"] = + benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + + state.counters["parquet_parse"] = benchmark::Counter( + elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["init_time"] = benchmark::Counter( + init_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["write_time"] = benchmark::Counter( + write_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + } +}; + +// } // namespace columnartorow +// } // namespace sparkcolumnarplugin + +int main(int argc, char** argv) { + uint32_t iterations = 1; + uint32_t threads = 1; + std::string datafile; + uint32_t cpu = 0xffffffff; + + for (int i = 0; i < argc; i++) { + if (strcmp(argv[i], "--iterations") == 0) { + iterations = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--threads") == 0) { + threads = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--file") == 0) { + datafile = argv[i + 1]; + } else if (strcmp(argv[i], "--cpu") == 0) { + cpu = atol(argv[i + 1]); + } + } + std::cout << "iterations = " << iterations << std::endl; + std::cout << "threads = " << threads << std::endl; + std::cout << "datafile = " << datafile << std::endl; + std::cout << "cpu = " << cpu << std::endl; + + GoogleBenchmarkColumnarToRow_CacheScan_Benchmark + bck(datafile); + + benchmark::RegisterBenchmark("GoogleBenchmarkColumnarToRow::CacheScan", bck) + ->Args({ + cpu, + }) + ->Iterations(iterations) + ->Threads(threads) + ->ReportAggregatesOnly(false) + ->MeasureProcessCPUTime() + ->Unit(benchmark::kSecond); + + benchmark::Initialize(&argc, argv); + benchmark::RunSpecifiedBenchmarks(); + benchmark::Shutdown(); +} diff --git a/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc b/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc new file mode 100644 index 0000000000000..58763e2edfee0 --- /dev/null +++ b/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "arrow/record_batch.h" +#include "parquet/arrow/utils/macros.h" +#include "parquet/arrow/test_utils.h" + + +// namespace parquet { +// namespace benchmark { + +const int batch_buffer_size = 32768; + +class GoogleBenchmarkParquetStringScan { + public: + GoogleBenchmarkParquetStringScan(std::string file_name) { GetRecordBatchReader(file_name); } + + void GetRecordBatchReader(const std::string& input_file) { + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + + std::shared_ptr fs; + std::string file_name; + ARROW_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(input_file, &file_name)) + + ARROW_ASSIGN_OR_THROW(file, fs->OpenInputFile(file_name)); + + properties.set_batch_size(batch_buffer_size); + properties.set_pre_buffer(false); + properties.set_use_threads(false); + + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); + + ASSERT_NOT_OK(parquet_reader->GetSchema(&schema)); + + auto num_rowgroups = parquet_reader->num_row_groups(); + + for (int i = 0; i < num_rowgroups; ++i) { + row_group_indices.push_back(i); + } + + auto num_columns = schema->num_fields(); + std::cout << "Enter Is_binary_like Check: " << std::endl; + for (int i = 0; i < num_columns; ++i) { + auto field = schema->field(i); + auto type = field->type(); + if (arrow::is_binary_like(type->id())) { + std::cout << "Is_binary_like colIndex: " << i << std::endl; + column_indices.push_back(i); + } + } + } + + virtual void operator()(benchmark::State& state) {} + + protected: + long SetCPU(uint32_t cpuindex) { + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(cpuindex, &cs); + return sched_setaffinity(0, sizeof(cs), &cs); + } + + protected: + std::string file_name; + std::shared_ptr file; + std::vector row_group_indices; + std::vector column_indices; + std::shared_ptr schema; + parquet::ArrowReaderProperties properties; +}; +class GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark + : public GoogleBenchmarkParquetStringScan { + public: + GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark(std::string filename) + : GoogleBenchmarkParquetStringScan(filename) {} + void operator()(benchmark::State& state) { + if (state.range(0) == 0xffffffff) { + SetCPU(state.thread_index()); + } else { + SetCPU(state.range(0)); + } + + arrow::Compression::type compression_type = (arrow::Compression::type)1; + + std::shared_ptr record_batch; + int64_t elapse_read = 0; + int64_t num_batches = 0; + int64_t num_rows = 0; + int64_t init_time = 0; + int64_t write_time = 0; + + + std::vector local_column_indices = column_indices; + + for (auto val : local_column_indices){ + std::cout << "local_column_indices: is_binary_like colIndex: " << val << std::endl; + } + + std::shared_ptr local_schema; + local_schema = std::make_shared(*schema.get()); + + if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl; + + for (auto _ : state) { + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + ::arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), + properties, &parquet_reader)); + + std::vector> batches; + ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader( + row_group_indices, local_column_indices, &record_batch_reader)); + do { + TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); + + if (record_batch) { + // batches.push_back(record_batch); + num_batches += 1; + num_rows += record_batch->num_rows(); + } + } while (record_batch); + + std::cout << " parquet parse done elapsed time = " << elapse_read / 1000000 + << " rows = " << num_rows << std::endl; + } + + state.counters["rowgroups"] = + benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["columns"] = + benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["batches"] = benchmark::Counter( + num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["num_rows"] = benchmark::Counter( + num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["batch_buffer_size"] = + benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + + state.counters["parquet_parse"] = benchmark::Counter( + elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["init_time"] = benchmark::Counter( + init_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["write_time"] = benchmark::Counter( + write_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + } +}; + +// } // namespace ParquetStringScan +// } // namespace sparkcolumnarplugin + +int main(int argc, char** argv) { + uint32_t iterations = 1; + uint32_t threads = 1; + std::string datafile; + uint32_t cpu = 0xffffffff; + + for (int i = 0; i < argc; i++) { + if (strcmp(argv[i], "--iterations") == 0) { + iterations = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--threads") == 0) { + threads = atol(argv[i + 1]); + } else if (strcmp(argv[i], "--file") == 0) { + datafile = argv[i + 1]; + } else if (strcmp(argv[i], "--cpu") == 0) { + cpu = atol(argv[i + 1]); + } + } + std::cout << "iterations = " << iterations << std::endl; + std::cout << "threads = " << threads << std::endl; + std::cout << "datafile = " << datafile << std::endl; + std::cout << "cpu = " << cpu << std::endl; + + GoogleBenchmarkParquetStringScan_IteratorScan_Benchmark + bck(datafile); + + benchmark::RegisterBenchmark("GoogleBenchmarkParquetStringScan::IteratorScan", bck) + ->Args({ + cpu, + }) + ->Iterations(iterations) + ->Threads(threads) + ->ReportAggregatesOnly(false) + ->MeasureProcessCPUTime() + ->Unit(benchmark::kSecond); + + benchmark::Initialize(&argc, argv); + benchmark::RunSpecifiedBenchmarks(); + benchmark::Shutdown(); +} diff --git a/cpp/src/parquet/arrow/test_utils.h b/cpp/src/parquet/arrow/test_utils.h new file mode 100644 index 0000000000000..d3afa459dfdf6 --- /dev/null +++ b/cpp/src/parquet/arrow/test_utils.h @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "utils/macros.h" +using namespace arrow; + +using TreeExprBuilder = gandiva::TreeExprBuilder; +using FunctionNode = gandiva::FunctionNode; + +#define ASSERT_NOT_OK(status) \ + do { \ + ::arrow::Status __s = (status); \ + if (!__s.ok()) { \ + throw std::runtime_error(__s.message()); \ + } \ + } while (false); + +#define ARROW_ASSIGN_OR_THROW_IMPL(status_name, lhs, rexpr) \ + do { \ + auto status_name = (rexpr); \ + auto __s = status_name.status(); \ + if (!__s.ok()) { \ + throw std::runtime_error(__s.message()); \ + } \ + lhs = std::move(status_name).ValueOrDie(); \ + } while (false); + +#define ARROW_ASSIGN_OR_THROW_NAME(x, y) ARROW_CONCAT(x, y) + +#define ARROW_ASSIGN_OR_THROW(lhs, rexpr) \ + ARROW_ASSIGN_OR_THROW_IMPL(ARROW_ASSIGN_OR_THROW_NAME(_error_or_value, __COUNTER__), \ + lhs, rexpr); + +template +Status Equals(const T& expected, const T& actual) { + if (expected.Equals(actual)) { + return arrow::Status::OK(); + } + std::stringstream pp_expected; + std::stringstream pp_actual; + ::arrow::PrettyPrintOptions options(/*indent=*/2); + options.window = 50; + ASSERT_NOT_OK(PrettyPrint(expected, options, &pp_expected)); + ASSERT_NOT_OK(PrettyPrint(actual, options, &pp_actual)); + if (pp_expected.str() == pp_actual.str()) { + return arrow::Status::OK(); + } + return Status::Invalid("Expected RecordBatch is ", pp_expected.str(), " with schema ", + expected.schema()->ToString(), ", while actual is ", + pp_actual.str(), " with schema ", actual.schema()->ToString()); +} + +void MakeInputBatch(std::vector input_data, + std::shared_ptr sch, + std::shared_ptr* input_batch) { + // prepare input record Batch + std::vector> array_list; + int length = -1; + int i = 0; + for (auto data : input_data) { + std::shared_ptr a0; + ASSERT_NOT_OK(arrow::ipc::internal::json::ArrayFromJSON(sch->field(i++)->type(), + data.c_str(), &a0)); + if (length == -1) { + length = a0->length(); + } + assert(length == a0->length()); + array_list.push_back(a0); + } + + *input_batch = RecordBatch::Make(sch, length, array_list); + return; +} + +void ConstructNullInputBatch(std::shared_ptr* null_batch) { + std::vector> columns; + arrow::Int64Builder builder1; + builder1.AppendNull(); + builder1.Append(1); + + arrow::Int64Builder builder2; + builder2.Append(1); + builder2.AppendNull(); + + std::shared_ptr array1; + builder1.Finish(&array1); + std::shared_ptr array2; + builder2.Finish(&array2); + + columns.push_back(array1); + columns.push_back(array2); + + std::vector> schema_vec{ + arrow::field("col1", arrow::int64()), + arrow::field("col2", arrow::int64()), + }; + + std::shared_ptr schema{std::make_shared(schema_vec)}; + *null_batch = arrow::RecordBatch::Make(schema, 2, columns); + return; +} diff --git a/cpp/src/parquet/arrow/utils/exception.h b/cpp/src/parquet/arrow/utils/exception.h new file mode 100644 index 0000000000000..582903d0ef0fa --- /dev/null +++ b/cpp/src/parquet/arrow/utils/exception.h @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +class JniPendingException : public std::runtime_error { + public: + explicit JniPendingException(const std::string& arg) : runtime_error(arg) {} +}; \ No newline at end of file diff --git a/cpp/src/parquet/arrow/utils/macros.h b/cpp/src/parquet/arrow/utils/macros.h new file mode 100644 index 0000000000000..e123d46f82854 --- /dev/null +++ b/cpp/src/parquet/arrow/utils/macros.h @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +#include "parquet/arrow/utils/exception.h" + +#define TIME_NANO_DIFF(finish, start) \ + (finish.tv_sec - start.tv_sec) * 1000000000 + (finish.tv_nsec - start.tv_nsec) + +#define TIME_MICRO_OR_RAISE(time, expr) \ + do { \ + auto start = std::chrono::steady_clock::now(); \ + auto __s = (expr); \ + if (!__s.ok()) { \ + return __s; \ + } \ + auto end = std::chrono::steady_clock::now(); \ + time += std::chrono::duration_cast(end - start).count(); \ + } while (false); + +#define TIME_MICRO_OR_THROW(time, expr) \ + do { \ + auto start = std::chrono::steady_clock::now(); \ + auto __s = (expr); \ + if (!__s.ok()) { \ + throw JniPendingException(__s.message()); \ + } \ + auto end = std::chrono::steady_clock::now(); \ + time += std::chrono::duration_cast(end - start).count(); \ + } while (false); + +#define TIME_MICRO(time, res, expr) \ + do { \ + auto start = std::chrono::steady_clock::now(); \ + res = (expr); \ + auto end = std::chrono::steady_clock::now(); \ + time += std::chrono::duration_cast(end - start).count(); \ + } while (false); + +#define TIME_NANO_OR_RAISE(time, expr) \ + do { \ + auto start = std::chrono::steady_clock::now(); \ + auto __s = (expr); \ + if (!__s.ok()) { \ + return __s; \ + } \ + auto end = std::chrono::steady_clock::now(); \ + time += std::chrono::duration_cast(end - start).count(); \ + } while (false); + +#define TIME_NANO_OR_THROW(time, expr) \ + do { \ + auto start = std::chrono::steady_clock::now(); \ + auto __s = (expr); \ + if (!__s.ok()) { \ + throw JniPendingException(__s.message()); \ + } \ + auto end = std::chrono::steady_clock::now(); \ + time += std::chrono::duration_cast(end - start).count(); \ + } while (false); + +#define VECTOR_PRINT(v, name) \ + std::cout << "[" << name << "]:"; \ + for (int i = 0; i < v.size(); i++) { \ + if (i != v.size() - 1) \ + std::cout << v[i] << ","; \ + else \ + std::cout << v[i]; \ + } \ + std::cout << std::endl; + +#define THROW_NOT_OK(expr) \ + do { \ + auto __s = (expr); \ + if (!__s.ok()) { \ + throw JniPendingException(__s.message()); \ + } \ + } while (false); + +#define TIME_TO_STRING(time) \ + (time > 10000 ? time / 1000 : time) << (time > 10000 ? " ms" : " us") + +#define TIME_NANO_TO_STRING(time) \ + (time > 1e7 ? time / 1e6 : ((time > 1e4) ? time / 1e3 : time)) \ + << (time > 1e7 ? "ms" : (time > 1e4 ? "us" : "ns")) From 7fdf91400f11036980d002da9b720c531d0db606 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Fri, 22 Jul 2022 15:29:47 +0800 Subject: [PATCH 02/11] Add Usage --- cpp/README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/cpp/README.md b/cpp/README.md index b083f3fe78e74..ec6b136aa83a3 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -32,3 +32,16 @@ to install pre-compiled binary versions of the library. Please refer to our latest [C++ Development Documentation][1]. [1]: https://github.com/apache/arrow/blob/master/docs/source/developers/cpp + +## Run parquet string scan benchmark +#### Minimal benchmark build +cd arrow +mkdir -p cpp/debug +cd cpp/debug +cmake -DCMAKE_BUILD_TYPE=Release -DARROW_BUILD_BENCHMARKS=ON -DARROW_WITH_ZLIB=ON -DARROW_JEMALLOC=OFF -DARROW_PARQUET=ON -DARROW_COMPUTE=ON -DARROW_DATASET=ON -DARROW_WITH_SNAPPY=ON -DARROW_FILESYSTEM=ON .. + +#### Run benchmark and collect perf data +cpp/debug +./release/parquet-arrow-parquet-scan-string-benchmark --iterations 10 --threads 1 --file {parquet_path} --cpu 0 & +perf record -e cycles:ppp -C 0 sleep 10 + From 3b8ffa3eb3383eb43726e63b7f402ecec2efa53f Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Fri, 22 Jul 2022 15:30:28 +0800 Subject: [PATCH 03/11] perf report --- cpp/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/README.md b/cpp/README.md index ec6b136aa83a3..9f563149beb77 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -44,4 +44,5 @@ cmake -DCMAKE_BUILD_TYPE=Release -DARROW_BUILD_BENCHMARKS=ON -DARROW_WITH_ZLIB=O cpp/debug ./release/parquet-arrow-parquet-scan-string-benchmark --iterations 10 --threads 1 --file {parquet_path} --cpu 0 & perf record -e cycles:ppp -C 0 sleep 10 +perf report From a7c3879ebe44e4cacae390e053e7798d18468aee Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Thu, 11 Aug 2022 09:16:00 +0800 Subject: [PATCH 04/11] Add Optimize append --- cpp/CMakeLists.txt | 2 +- .../arrow/parquet_scan_string_benchmark.cc | 1 + cpp/src/parquet/arrow/reader_internal.cc | 17 ++- cpp/src/parquet/column_reader.cc | 85 ++++++++++++++- cpp/src/parquet/column_reader.h | 6 ++ cpp/src/parquet/encoding.cc | 102 ++++++++++++++++++ cpp/src/parquet/encoding.h | 9 ++ 7 files changed, 215 insertions(+), 7 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 0f112af489c88..822e4de7ed826 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -48,7 +48,7 @@ if(POLICY CMP0074) endif() set(ARROW_VERSION "4.0.0") -add_compile_options(-g) +add_compile_options(-g -O0) string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}") diff --git a/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc b/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc index 58763e2edfee0..79db29bf1b08b 100644 --- a/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc +++ b/cpp/src/parquet/arrow/parquet_scan_string_benchmark.cc @@ -38,6 +38,7 @@ // namespace benchmark { const int batch_buffer_size = 32768; +// const int batch_buffer_size = 2; class GoogleBenchmarkParquetStringScan { public: diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index ccd05d3c38cbe..386b3e42ef04c 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -25,6 +25,7 @@ #include #include #include +#include #include "arrow/array.h" #include "arrow/compute/api.h" @@ -330,6 +331,17 @@ std::shared_ptr TransferZeroCopy(RecordReader* reader, return ::arrow::MakeArray(data); } +std::shared_ptr TransferBinaryZeroCopy(RecordReader* reader, + const std::shared_ptr& type) { + std::vector> buffers = {reader->ReleaseIsValid(), + reader->ReleaseOffsets(), + reader->ReleaseValues()}; + auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(), + buffers, reader->null_count()); + std::cout << "::arrow::MakeArray(data)->ToString():" << ::arrow::MakeArray(data)->ToString() << std::endl; + return ::arrow::MakeArray(data); +} + Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) { int64_t length = reader->values_written(); @@ -697,8 +709,9 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr value_ case ::arrow::Type::STRING: case ::arrow::Type::LARGE_BINARY: case ::arrow::Type::LARGE_STRING: { - RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result)); - result = chunked_result; + result = TransferBinaryZeroCopy(reader, value_type); + // RETURN_NOT_OK(TransferBinary(reader, pool, value_type, &chunked_result)); + // result = chunked_result; } break; case ::arrow::Type::DECIMAL128: { switch (descr->physical_type()) { diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index ec205f3d3f935..e62008d837d34 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1554,6 +1554,8 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, : TypedRecordReader(descr, leaf_info, pool) { DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY); accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool)); + values_ = AllocateBuffer(pool); + offset_ = AllocateBuffer(pool); } ::arrow::ArrayVector GetBuilderChunks() override { @@ -1568,23 +1570,98 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, } void ReadValuesDense(int64_t values_to_read) override { - int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( - static_cast(values_to_read), &accumulator_); + // int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull( + // static_cast(values_to_read), &accumulator_); + int64_t num_decoded = this->current_decoder_->DecodeArrow_opt( + static_cast(values_to_read), 0, + NULLPTR, (reinterpret_cast(offset_->mutable_data()) + values_written_), + values_, 0, &accumulator_, &bianry_length_); DCHECK_EQ(num_decoded, values_to_read); ResetValues(); } + // void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + // int64_t num_decoded = this->current_decoder_->DecodeArrow( + // static_cast(values_to_read), static_cast(null_count), + // valid_bits_->mutable_data(), values_written_, &accumulator_); + // DCHECK_EQ(num_decoded, values_to_read - null_count); + // ResetValues(); + // } + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { - int64_t num_decoded = this->current_decoder_->DecodeArrow( + int64_t num_decoded = this->current_decoder_->DecodeArrow_opt( static_cast(values_to_read), static_cast(null_count), - valid_bits_->mutable_data(), values_written_, &accumulator_); + valid_bits_->mutable_data(), (reinterpret_cast(offset_->mutable_data()) + values_written_), + values_, values_written_, &accumulator_, &bianry_length_); DCHECK_EQ(num_decoded, values_to_read - null_count); ResetValues(); } + void ReserveValues(int64_t extra_values) { + const int64_t new_values_capacity = + UpdateCapacity(values_capacity_, values_written_, extra_values); + if (new_values_capacity > values_capacity_) { + PARQUET_THROW_NOT_OK( + values_->Resize(bytes_for_values(new_values_capacity * 20), false)); + PARQUET_THROW_NOT_OK( + offset_->Resize(bytes_for_values(new_values_capacity * 4), false)); + + auto offset = reinterpret_cast(offset_->mutable_data()); + offset[0] = 0; + + values_capacity_ = new_values_capacity; + } + if (leaf_info_.HasNullableValues()) { + int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_); + if (valid_bits_->size() < valid_bytes_new) { + int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_); + PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false)); + + // Avoid valgrind warnings + memset(valid_bits_->mutable_data() + valid_bytes_old, 0, + valid_bytes_new - valid_bytes_old); + } + } + } + + std::shared_ptr ReleaseValues() override { + auto result = values_; + // PARQUET_THROW_NOT_OK(result->Resize(bytes_for_values(values_written_), true)); + values_ = AllocateBuffer(this->pool_); + values_capacity_ = 0; + return result; + } + + std::shared_ptr ReleaseOffsets() { + auto result = offset_; + offset_ = AllocateBuffer(this->pool_); + bianry_length_ = 0; + return result; + } + + void ResetValues() { + if (values_written_ > 0) { + // Resize to 0, but do not shrink to fit + PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false)); + PARQUET_THROW_NOT_OK(offset_->Resize(0, false)); + PARQUET_THROW_NOT_OK(values_->Resize(0, false)); + + values_written_ = 0; + values_capacity_ = 0; + null_count_ = 0; + bianry_length_ = 0; + } + } + private: // Helper data structure for accumulating builder chunks typename EncodingTraits::Accumulator accumulator_; + + int32_t bianry_length_ = 0; + + // std::shared_ptr<::arrow::ResizableBuffer> values_; + std::shared_ptr<::arrow::ResizableBuffer> offset_; + // std::shared_ptr<::arrow::ResizableBuffer> valid_bits_; }; class ByteArrayDictionaryRecordReader : public TypedRecordReader, diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index a73bba6cb4e9c..535c885f53852 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -226,6 +226,8 @@ class RecordReader { /// \brief Pre-allocate space for data. Results in better flat read performance virtual void Reserve(int64_t num_values) = 0; + virtual void ReserveValues(int64_t capacity) {} + /// \brief Clear consumed values and repetition/definition levels as the /// result of calling ReadRecords virtual void Reset() = 0; @@ -234,6 +236,10 @@ class RecordReader { /// allocated in subsequent ReadRecords calls virtual std::shared_ptr ReleaseValues() = 0; + virtual std::shared_ptr ReleaseOffsets() { + return nullptr; + } + /// \brief Transfer filled validity bitmap buffer to caller. A new one will /// be allocated in subsequent ReadRecords calls virtual std::shared_ptr ReleaseIsValid() = 0; diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index eeeff1c8f9b7a..8c436ddd5bfb7 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -25,6 +25,7 @@ #include #include #include +#include #include "arrow/array.h" #include "arrow/array/builder_dict.h" @@ -1351,7 +1352,27 @@ class PlainByteArrayDecoder : public PlainDecoder, return result; } + int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits, + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer> & values, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* out, + int32_t* bianry_length) { + int result = 0; + PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits, + offset, values, + valid_bits_offset, out, &result, bianry_length)); + + // PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits, + // valid_bits_offset, out, &result)); + + return result; + } + private: + + // const int32_t* offset_arr; + Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits::Accumulator* out, @@ -1403,6 +1424,87 @@ class PlainByteArrayDecoder : public PlainDecoder, return Status::OK(); } + Status DecodeArrowDense_opt(int num_values, int null_count, const uint8_t* valid_bits, + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer> & values, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* out, + int* out_values_decoded, + int32_t* bianry_length) { + // ArrowBinaryHelper helper(out); + int values_decoded = 0; + + + + // RETURN_NOT_OK(helper.builder->Reserve(num_values)); + // RETURN_NOT_OK(helper.builder->ReserveData( + // std::min(len_, helper.chunk_space_remaining))); + + auto dst_value = values->mutable_data() + (*bianry_length); + int capacity = values->capacity(); + if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) { + values->Reserve(len_); + dst_value = values->mutable_data() + (*bianry_length); + } + + + + int i = 0; + RETURN_NOT_OK(VisitNullBitmapInline( + valid_bits, valid_bits_offset, num_values, null_count, + [&]() { + if (ARROW_PREDICT_FALSE(len_ < 4)) { + ParquetException::EofException(); + } + auto value_len = ::arrow::util::SafeLoadAs(data_); + if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) { + return Status::Invalid("Invalid or corrupted value_len '", value_len, "'"); + } + auto increment = value_len + 4; + if (ARROW_PREDICT_FALSE(len_ < increment)) { + ParquetException::EofException(); + } + // if (ARROW_PREDICT_FALSE(!helper.CanFit(value_len))) { + // // This element would exceed the capacity of a chunk + // RETURN_NOT_OK(helper.PushChunk()); + // RETURN_NOT_OK(helper.builder->Reserve(num_values - i)); + // RETURN_NOT_OK(helper.builder->ReserveData( + // std::min(len_, helper.chunk_space_remaining))); + // } + // helper.UnsafeAppend(data_ + 4, value_len); + + (*bianry_length) += value_len; + offset[i+1] = offset[i] + value_len; + memcpy(dst_value, data_ + 4, value_len); + dst_value = dst_value + value_len; + + // std::cout << "*(data_ + 4) :" << *(data_ + 4) << std::endl; + // std::cout << "*(data_ + 5) " << *(data_ + 5) << std::endl; + + data_ += increment; + len_ -= increment; + + // uint8_t* address = values->mutable_data(); + // for(int i=0; i< 10; i++) { + // std::cout << "*(address+" << i << ")" << *(address+i) << std::endl; + // } + + ++values_decoded; + ++i; + return Status::OK(); + }, + [&]() { + // helper.UnsafeAppendNull(); + offset[i+1] = offset[i]; + ++i; + return Status::OK(); + })); + + num_values_ -= values_decoded; + *out_values_decoded = values_decoded; + return Status::OK(); + } + template Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, BuilderType* builder, diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index a3d8e012b6a52..4902da1a9a395 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -316,6 +316,15 @@ class TypedDecoder : virtual public Decoder { virtual int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, typename EncodingTraits::Accumulator* out) = 0; + + virtual int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits, + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer> & values, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* out, + int32_t* bianry_length) { + return 0; + } /// \brief Decode into an ArrayBuilder or other accumulator ignoring nulls /// From 642bfa4ed8080ccb3196a13a4b14cc1e82399f48 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 16 Aug 2022 10:21:25 +0800 Subject: [PATCH 05/11] Complete plaindecoder code and passed test --- cpp/src/parquet/arrow/reader_internal.cc | 2 +- cpp/src/parquet/column_reader.cc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index 386b3e42ef04c..22d19a4c899b0 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -338,7 +338,7 @@ std::shared_ptr TransferBinaryZeroCopy(RecordReader* reader, reader->ReleaseValues()}; auto data = std::make_shared<::arrow::ArrayData>(type, reader->values_written(), buffers, reader->null_count()); - std::cout << "::arrow::MakeArray(data)->ToString():" << ::arrow::MakeArray(data)->ToString() << std::endl; + // std::cout << "::arrow::MakeArray(data)->ToString():" << ::arrow::MakeArray(data)->ToString() << std::endl; return ::arrow::MakeArray(data); } diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index e62008d837d34..a45e4da577df1 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1577,7 +1577,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, NULLPTR, (reinterpret_cast(offset_->mutable_data()) + values_written_), values_, 0, &accumulator_, &bianry_length_); DCHECK_EQ(num_decoded, values_to_read); - ResetValues(); + // ResetValues(); } // void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { @@ -1594,7 +1594,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, valid_bits_->mutable_data(), (reinterpret_cast(offset_->mutable_data()) + values_written_), values_, values_written_, &accumulator_, &bianry_length_); DCHECK_EQ(num_decoded, values_to_read - null_count); - ResetValues(); + // ResetValues(); } void ReserveValues(int64_t extra_values) { From 25ad7998e9f0cdbe6625ece027a459a464e31777 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Thu, 18 Aug 2022 14:51:47 +0800 Subject: [PATCH 06/11] Add code for DictDecoder --- cpp/src/parquet/encoding.cc | 163 +++++++++++++++++++++++++++++++++++- 1 file changed, 162 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 8c436ddd5bfb7..63b3d0637e323 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -1443,7 +1443,7 @@ class PlainByteArrayDecoder : public PlainDecoder, auto dst_value = values->mutable_data() + (*bianry_length); int capacity = values->capacity(); if (ARROW_PREDICT_FALSE((len_ + *bianry_length) >= capacity)) { - values->Reserve(len_); + values->Reserve(len_ + *bianry_length); dst_value = values->mutable_data() + (*bianry_length); } @@ -1958,6 +1958,31 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, return result; } + int DecodeArrow_opt(int num_values, int null_count, const uint8_t* valid_bits, + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer> & values, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* out, + int32_t* bianry_length) { + int result = 0; + if (null_count == 0) { + PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull_opt(num_values, + offset, values, + out, &result, bianry_length)); + } else { + PARQUET_THROW_NOT_OK(DecodeArrowDense_opt(num_values, null_count, valid_bits, + offset, values, + valid_bits_offset, out, &result, bianry_length)); + + } + + // PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, valid_bits, + // valid_bits_offset, out, &result)); + + return result; + } + + private: Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, @@ -2022,6 +2047,92 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, return Status::OK(); } + Status DecodeArrowDense_opt(int num_values, int null_count, const uint8_t* valid_bits, + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer> & values, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* out, + int* out_num_values, + int32_t* bianry_length) { + constexpr int32_t kBufferSize = 1024; + int32_t indices[kBufferSize]; + + // ArrowBinaryHelper helper(out); + + auto dst_value = values->mutable_data() + (*bianry_length); + + + + ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); + + auto dict_values = reinterpret_cast(dictionary_->data()); + int values_decoded = 0; + int num_appended = 0; + while (num_appended < num_values) { + bool is_valid = bit_reader.IsSet(); + bit_reader.Next(); + + if (is_valid) { + int32_t batch_size = + std::min(kBufferSize, num_values - num_appended - null_count); + int num_indices = idx_decoder_.GetBatch(indices, batch_size); + + if (ARROW_PREDICT_FALSE(num_indices < 1)) { + return Status::Invalid("Invalid number of indices '", num_indices, "'"); + } + + int i = 0; + while (true) { + // Consume all indices + if (is_valid) { + auto idx = indices[i]; + RETURN_NOT_OK(IndexInBounds(idx)); + const auto& val = dict_values[idx]; + // if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { + // RETURN_NOT_OK(helper.PushChunk()); + // } + // RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); + + auto value_len = val.len; + (*bianry_length) += value_len; + auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; + uint64_t capacity = values->capacity(); + if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { + capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); + values->Reserve(capacity); + dst_value = values->mutable_data() + (*bianry_length); + } + memcpy(dst_value, val.ptr, static_cast(value_len)); + dst_value = dst_value + value_len; + + + ++i; + ++values_decoded; + } else { + // RETURN_NOT_OK(helper.AppendNull()); + offset[num_appended+1] = offset[num_appended]; + --null_count; + } + ++num_appended; + if (i == num_indices) { + // Do not advance the bit_reader if we have fulfilled the decode + // request + break; + } + is_valid = bit_reader.IsSet(); + bit_reader.Next(); + } + } else { + // RETURN_NOT_OK(helper.AppendNull()); + offset[num_appended+1] = offset[num_appended]; + --null_count; + ++num_appended; + } + } + *out_num_values = values_decoded; + return Status::OK(); + } + Status DecodeArrowDenseNonNull(int num_values, typename EncodingTraits::Accumulator* out, int* out_num_values) { @@ -2051,6 +2162,56 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, return Status::OK(); } + Status DecodeArrowDenseNonNull_opt(int num_values, + int32_t* offset, + std::shared_ptr<::arrow::ResizableBuffer> & values, + typename EncodingTraits::Accumulator* out, + int* out_num_values, + int32_t* bianry_length) { + + constexpr int32_t kBufferSize = 2048; + int32_t indices[kBufferSize]; + int values_decoded = 0; + + // ArrowBinaryHelper helper(out); + auto dict_values = reinterpret_cast(dictionary_->data()); + + auto dst_value = values->mutable_data() + (*bianry_length); + int num_appended = 0; + + while (values_decoded < num_values) { + int32_t batch_size = std::min(kBufferSize, num_values - values_decoded); + int num_indices = idx_decoder_.GetBatch(indices, batch_size); + if (num_indices == 0) ParquetException::EofException(); + for (int i = 0; i < num_indices; ++i) { + auto idx = indices[i]; + RETURN_NOT_OK(IndexInBounds(idx)); + const auto& val = dict_values[idx]; + // if (ARROW_PREDICT_FALSE(!helper.CanFit(val.len))) { + // RETURN_NOT_OK(helper.PushChunk()); + // } + // RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); + + auto value_len = val.len; + (*bianry_length) += value_len; + auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; + uint64_t capacity = values->capacity(); + if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { + capacity = capacity + std::max((capacity >> 1), (uint64_t)value_len); + values->Reserve(capacity); + dst_value = values->mutable_data() + (*bianry_length); + } + memcpy(dst_value, val.ptr, static_cast(value_len)); + dst_value = dst_value + value_len; + + num_appended++; + } + values_decoded += num_indices; + } + *out_num_values = values_decoded; + return Status::OK(); + } + template Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, BuilderType* builder, From f3dce6b7df0fdab9c9f5b8767896b8fb70679131 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Fri, 19 Aug 2022 09:34:07 +0800 Subject: [PATCH 07/11] Resume CMakeLists.txt --- cpp/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 822e4de7ed826..1d58528cf7059 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -48,7 +48,7 @@ if(POLICY CMP0074) endif() set(ARROW_VERSION "4.0.0") -add_compile_options(-g -O0) +#add_compile_options(-g -O0) string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}") From 5c4d2539a50c35db330a739803fe651adfe8d67a Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Mon, 22 Aug 2022 17:48:42 +0800 Subject: [PATCH 08/11] Fix offset validate --- cpp/src/parquet/arrow/reader.cc | 6 +++--- cpp/src/parquet/encoding.cc | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 016ceacb0ef62..c8329894498c4 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -105,9 +105,9 @@ class ColumnReaderImpl : public ColumnReader { std::shared_ptr<::arrow::ChunkedArray>* out) final { RETURN_NOT_OK(LoadBatch(batch_size)); RETURN_NOT_OK(BuildArray(batch_size, out)); - for (int x = 0; x < (*out)->num_chunks(); x++) { - RETURN_NOT_OK((*out)->chunk(x)->Validate()); - } + // for (int x = 0; x < (*out)->num_chunks(); x++) { + // RETURN_NOT_OK((*out)->chunk(x)->Validate()); + // } return Status::OK(); } diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 63b3d0637e323..824fb655620bf 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -2094,7 +2094,6 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, // RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); auto value_len = val.len; - (*bianry_length) += value_len; auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; uint64_t capacity = values->capacity(); if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { @@ -2102,6 +2101,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, values->Reserve(capacity); dst_value = values->mutable_data() + (*bianry_length); } + (*bianry_length) += value_len; memcpy(dst_value, val.ptr, static_cast(value_len)); dst_value = dst_value + value_len; @@ -2193,7 +2193,6 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, // RETURN_NOT_OK(helper.Append(val.ptr, static_cast(val.len))); auto value_len = val.len; - (*bianry_length) += value_len; auto value_offset= offset[num_appended+1] = offset[num_appended] + value_len; uint64_t capacity = values->capacity(); if (ARROW_PREDICT_FALSE(value_offset >= capacity)) { @@ -2201,6 +2200,7 @@ class DictByteArrayDecoderImpl : public DictDecoderImpl, values->Reserve(capacity); dst_value = values->mutable_data() + (*bianry_length); } + (*bianry_length) += value_len; memcpy(dst_value, val.ptr, static_cast(value_len)); dst_value = dst_value + value_len; From 64123b0893d9386cd36e8e5da54ce7489df668d1 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Tue, 23 Aug 2022 17:02:03 +0800 Subject: [PATCH 09/11] reduce buffer capacity --- cpp/src/parquet/column_reader.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index a45e4da577df1..5b432967231cd 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1602,9 +1602,9 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, UpdateCapacity(values_capacity_, values_written_, extra_values); if (new_values_capacity > values_capacity_) { PARQUET_THROW_NOT_OK( - values_->Resize(bytes_for_values(new_values_capacity * 20), false)); + values_->Resize(new_values_capacity * 20, false)); PARQUET_THROW_NOT_OK( - offset_->Resize(bytes_for_values(new_values_capacity * 4), false)); + offset_->Resize(new_values_capacity * 4, false)); auto offset = reinterpret_cast(offset_->mutable_data()); offset[0] = 0; From 494e271914e2d458c634b54f18cbbd709adb0343 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Mon, 29 Aug 2022 17:04:49 +0800 Subject: [PATCH 10/11] Reuse Buffer --- cpp/src/parquet/column_reader.cc | 13 ++++++++++--- cpp/src/parquet/column_reader.h | 4 ++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 5b432967231cd..778300a41f4a9 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1602,7 +1602,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, UpdateCapacity(values_capacity_, values_written_, extra_values); if (new_values_capacity > values_capacity_) { PARQUET_THROW_NOT_OK( - values_->Resize(new_values_capacity * 20, false)); + values_->Resize(new_values_capacity * binary_per_row_length_, false)); PARQUET_THROW_NOT_OK( offset_->Resize(new_values_capacity * 4, false)); @@ -1627,14 +1627,21 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, std::shared_ptr ReleaseValues() override { auto result = values_; // PARQUET_THROW_NOT_OK(result->Resize(bytes_for_values(values_written_), true)); - values_ = AllocateBuffer(this->pool_); + // values_ = AllocateBuffer(this->pool_); values_capacity_ = 0; return result; } std::shared_ptr ReleaseOffsets() { auto result = offset_; - offset_ = AllocateBuffer(this->pool_); + if (ARROW_PREDICT_FALSE(binary_per_row_length_ == kDefaultBinaryPerRowSzie)) { + auto offsetArr = reinterpret_cast(offset_->mutable_data()); + const auto first_offset = offsetArr[0]; + const auto last_offset = offsetArr[values_written_]; + binary_per_row_length_ = (last_offset - first_offset) / values_written_ + 1; + std::cout << "binary_per_row_length_:" << binary_per_row_length_ << std::endl; + } + // offset_ = AllocateBuffer(this->pool_); bianry_length_ = 0; return result; } diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 535c885f53852..b726d75447f41 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -54,6 +54,8 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024; // 16 KB is the default expected page header size static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024; +static constexpr int32_t kDefaultBinaryPerRowSzie = 20; + class PARQUET_EXPORT LevelDecoder { public: LevelDecoder(); @@ -301,6 +303,8 @@ class RecordReader { int64_t levels_position_; int64_t levels_capacity_; + int64_t binary_per_row_length_ = kDefaultBinaryPerRowSzie; + std::shared_ptr<::arrow::ResizableBuffer> values_; // In the case of false, don't allocate the values buffer (when we directly read into // builder classes). From 7f515dc27d5e5ee2667af096da2227dbb9c59a63 Mon Sep 17 00:00:00 2001 From: zhixingheyi-tian Date: Mon, 29 Aug 2022 17:26:22 +0800 Subject: [PATCH 11/11] Just Test --- cpp/src/parquet/column_reader.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 778300a41f4a9..0836e7699b3f2 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -1602,7 +1602,7 @@ class ByteArrayChunkedRecordReader : public TypedRecordReader, UpdateCapacity(values_capacity_, values_written_, extra_values); if (new_values_capacity > values_capacity_) { PARQUET_THROW_NOT_OK( - values_->Resize(new_values_capacity * binary_per_row_length_, false)); + values_->Resize(1, false)); PARQUET_THROW_NOT_OK( offset_->Resize(new_values_capacity * 4, false));