diff --git a/cpp/examples/arrow/dataset-parquet-scan-example.cc b/cpp/examples/arrow/dataset-parquet-scan-example.cc index dc454b255b17d..8d181fc9bec24 100644 --- a/cpp/examples/arrow/dataset-parquet-scan-example.cc +++ b/cpp/examples/arrow/dataset-parquet-scan-example.cc @@ -65,7 +65,8 @@ struct Configuration { std::shared_ptr filter = ("total_amount"_ > 1000.0f).Copy(); } conf; -std::shared_ptr GetFileSystemFromUri(const std::string& uri, std::string* path) { +std::shared_ptr GetFileSystemFromUri(const std::string& uri, + std::string* path) { return fs::FileSystemFromUri(uri, path).ValueOrDie(); } diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 000e97ab84d36..9ff8bdfe297d6 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -23,6 +23,7 @@ #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/filter.h" #include "arrow/dataset/scanner.h" +#include "arrow/util/bit_util.h" #include "arrow/util/iterator.h" #include "arrow/util/make_unique.h" @@ -32,6 +33,10 @@ namespace dataset { Fragment::Fragment(std::shared_ptr scan_options) : scan_options_(std::move(scan_options)), partition_expression_(scalar(true)) {} +const std::shared_ptr& Fragment::schema() const { + return scan_options_->schema(); +} + InMemoryFragment::InMemoryFragment( std::vector> record_batches, std::shared_ptr scan_options) @@ -99,9 +104,62 @@ FragmentIterator Source::GetFragments(std::shared_ptr scan_options) return GetFragmentsImpl(std::move(simplified_scan_options)); } +struct VectorRecordBatchGenerator : InMemorySource::RecordBatchGenerator { + explicit VectorRecordBatchGenerator(std::vector> batches) + : batches_(std::move(batches)) {} + + RecordBatchIterator Get() const final { return MakeVectorIterator(batches_); } + + std::vector> batches_; +}; + +InMemorySource::InMemorySource(std::shared_ptr schema, + std::vector> batches) + : Source(std::move(schema)), + get_batches_(new VectorRecordBatchGenerator(std::move(batches))) {} + +struct TableRecordBatchGenerator : InMemorySource::RecordBatchGenerator { + explicit TableRecordBatchGenerator(std::shared_ptr table) + : table_(std::move(table)) {} + + RecordBatchIterator Get() const final { + auto reader = std::make_shared(*table_); + auto table = table_; + return MakeFunctionIterator([reader, table] { return reader->Next(); }); + } + + std::shared_ptr
<Table> table_;
+};
+
+InMemorySource::InMemorySource(std::shared_ptr<Table>
table) + : Source(table->schema()), + get_batches_(new TableRecordBatchGenerator(std::move(table))) {} + FragmentIterator InMemorySource::GetFragmentsImpl( std::shared_ptr scan_options) { - return MakeVectorIterator(fragments_); + auto schema = this->schema(); + + auto create_fragment = + [scan_options, + schema](std::shared_ptr batch) -> Result> { + if (!batch->schema()->Equals(schema)) { + return Status::TypeError("yielded batch had schema ", *batch->schema(), + " which did not match InMemorySource's: ", *schema); + } + + std::vector> batches; + + auto batch_size = scan_options->batch_size; + auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size); + + for (int i = 0; i < n_batches; i++) { + batches.push_back(batch->Slice(batch_size * i, batch_size)); + } + + return std::make_shared(std::move(batches), scan_options); + }; + + return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get()); } FragmentIterator TreeSource::GetFragmentsImpl(std::shared_ptr options) { diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 33d6a8a689cd9..19896d08e435f 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include @@ -47,7 +48,9 @@ class ARROW_DS_EXPORT Fragment { /// scanning this fragment. May be nullptr, which indicates that no filtering /// or schema reconciliation will be performed and all partitions will be /// scanned. - std::shared_ptr scan_options() const { return scan_options_; } + const std::shared_ptr& scan_options() const { return scan_options_; } + + const std::shared_ptr& schema() const; virtual ~Fragment() = default; @@ -125,18 +128,33 @@ class ARROW_DS_EXPORT Source { std::shared_ptr partition_expression_; }; -/// \brief A Source consisting of a flat sequence of Fragments +/// \brief A Source which yields fragments wrapping a stream of record batches. +/// +/// The record batches must match the schema provided to the source at construction. class ARROW_DS_EXPORT InMemorySource : public Source { public: - explicit InMemorySource(std::shared_ptr schema, FragmentVector fragments) - : Source(std::move(schema)), fragments_(std::move(fragments)) {} + class RecordBatchGenerator { + public: + virtual ~RecordBatchGenerator() = default; + virtual RecordBatchIterator Get() const = 0; + }; + + InMemorySource(std::shared_ptr schema, + std::unique_ptr get_batches) + : Source(std::move(schema)), get_batches_(std::move(get_batches)) {} + + // Convenience constructor taking a fixed list of batches + InMemorySource(std::shared_ptr schema, + std::vector> batches); + + explicit InMemorySource(std::shared_ptr
table); FragmentIterator GetFragmentsImpl(std::shared_ptr options) override; std::string type_name() const override { return "in-memory"; } private: - FragmentVector fragments_; + std::unique_ptr get_batches_; }; /// \brief A recursive Source with child Sources. diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc index ff2bc18655f69..a437c1a769ae9 100644 --- a/cpp/src/arrow/dataset/dataset_test.cc +++ b/cpp/src/arrow/dataset/dataset_test.cc @@ -33,6 +33,8 @@ namespace dataset { class TestInMemoryFragment : public DatasetFixtureMixin {}; +using RecordBatchVector = std::vector>; + TEST_F(TestInMemoryFragment, Scan) { constexpr int64_t kBatchSize = 1024; constexpr int64_t kNumberBatches = 16; @@ -51,21 +53,18 @@ TEST_F(TestInMemoryFragment, Scan) { class TestInMemorySource : public DatasetFixtureMixin {}; TEST_F(TestInMemorySource, GetFragments) { - constexpr int64_t kNumberFragments = 4; constexpr int64_t kBatchSize = 1024; constexpr int64_t kNumberBatches = 16; SetSchema({field("i32", int32()), field("f64", float64())}); auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_); - auto reader = ConstantArrayGenerator::Repeat(kNumberBatches * kNumberFragments, batch); + auto reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch); - std::vector> batches{static_cast(kNumberBatches), - batch}; + RecordBatchVector batches{static_cast(kNumberBatches), batch}; auto fragment = std::make_shared(batches, options_); // It is safe to copy fragment multiple time since Scan() does not consume // the internal array. - auto source = - InMemorySource(schema_, {static_cast(kNumberFragments), fragment}); + auto source = InMemorySource(schema_, {static_cast(kNumberBatches), batch}); AssertSourceEquals(reader.get(), &source); } @@ -74,7 +73,6 @@ class TestTreeSource : public DatasetFixtureMixin {}; TEST_F(TestTreeSource, GetFragments) { constexpr int64_t kBatchSize = 1024; - constexpr int64_t kNumberBatches = 16; constexpr int64_t kChildPerNode = 2; constexpr int64_t kCompleteBinaryTreeDepth = 4; @@ -82,17 +80,13 @@ TEST_F(TestTreeSource, GetFragments) { auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_); auto n_leaves = 1U << kCompleteBinaryTreeDepth; - auto reader = ConstantArrayGenerator::Repeat(kNumberBatches * n_leaves, batch); - - std::vector> batches{static_cast(kNumberBatches), - batch}; - auto fragment = std::make_shared(batches, options_); + auto reader = ConstantArrayGenerator::Repeat(n_leaves, batch); // Creates a complete binary tree of depth kCompleteBinaryTreeDepth where the // leaves are InMemorySource containing kChildPerNode fragments. 
auto l1_leaf_source = std::make_shared( - schema_, FragmentVector{static_cast(kChildPerNode), fragment}); + schema_, RecordBatchVector{static_cast(kChildPerNode), batch}); auto l2_leaf_tree_source = std::make_shared( schema_, SourceVector{static_cast(kChildPerNode), l1_leaf_source}); @@ -109,7 +103,6 @@ TEST_F(TestTreeSource, GetFragments) { class TestDataset : public DatasetFixtureMixin {}; TEST_F(TestDataset, TrivialScan) { - constexpr int64_t kNumberFragments = 4; constexpr int64_t kNumberBatches = 16; constexpr int64_t kBatchSize = 1024; @@ -118,15 +111,13 @@ TEST_F(TestDataset, TrivialScan) { std::vector> batches{static_cast(kNumberBatches), batch}; - auto fragment = std::make_shared(batches, options_); - FragmentVector fragments{static_cast(kNumberFragments), fragment}; SourceVector sources = { - std::make_shared(schema_, fragments), - std::make_shared(schema_, fragments), + std::make_shared(schema_, batches), + std::make_shared(schema_, batches), }; - const int64_t total_batches = sources.size() * kNumberFragments * kNumberBatches; + const int64_t total_batches = sources.size() * kNumberBatches; auto reader = ConstantArrayGenerator::Repeat(total_batches, batch); ASSERT_OK_AND_ASSIGN(auto dataset, Dataset::Make(sources, schema_)); diff --git a/cpp/src/arrow/dataset/discovery_test.cc b/cpp/src/arrow/dataset/discovery_test.cc index d464b01e564db..f117e583f2850 100644 --- a/cpp/src/arrow/dataset/discovery_test.cc +++ b/cpp/src/arrow/dataset/discovery_test.cc @@ -67,7 +67,7 @@ class MockSourceFactory : public SourceFactory { Result> Finish(const std::shared_ptr& schema) override { return std::make_shared(schema, - std::vector>{}); + std::vector>{}); } protected: diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index cb4ab89cd4f51..fcca3fb3e6a87 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -40,7 +40,7 @@ Result> FileSource::Open() const { } Result FileFragment::Scan(std::shared_ptr context) { - return format_->ScanFile(source_, scan_options_, context); + return format_->ScanFile(source_, scan_options_, std::move(context)); } FileSystemSource::FileSystemSource(std::shared_ptr schema, diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index bcae386b08a2a..331a089d33fbc 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -130,7 +130,7 @@ class ARROW_DS_EXPORT FileFormat { /// \brief Open a fragment virtual Result> MakeFragment( - const FileSource& location, std::shared_ptr options) = 0; + FileSource location, std::shared_ptr options) = 0; }; /// \brief A Fragment that is stored in a file with a known format diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index 30474fc4c968d..d4e95700558aa 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -130,8 +130,8 @@ Result IpcFileFormat::ScanFile( } Result> IpcFileFormat::MakeFragment( - const FileSource& source, std::shared_ptr options) { - return std::make_shared(source, options); + FileSource source, std::shared_ptr options) { + return std::make_shared(std::move(source), std::move(options)); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h index 716524a509ede..9d8958b6d25c3 100644 --- a/cpp/src/arrow/dataset/file_ipc.h +++ b/cpp/src/arrow/dataset/file_ipc.h @@ -19,6 +19,7 @@ #include #include +#include #include "arrow/dataset/file_base.h" #include 
"arrow/dataset/type_fwd.h" @@ -43,13 +44,13 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat { std::shared_ptr context) const override; Result> MakeFragment( - const FileSource& source, std::shared_ptr options) override; + FileSource source, std::shared_ptr options) override; }; class ARROW_DS_EXPORT IpcFragment : public FileFragment { public: - IpcFragment(const FileSource& source, std::shared_ptr options) - : FileFragment(source, std::make_shared(), options) {} + IpcFragment(FileSource source, std::shared_ptr options) + : FileFragment(std::move(source), std::make_shared(), options) {} bool splittable() const override { return true; } }; diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index ad9ad213ea4aa..dc8a578961f22 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -31,6 +31,7 @@ #include "parquet/arrow/reader.h" #include "parquet/arrow/schema.h" #include "parquet/file_reader.h" +#include "parquet/properties.h" #include "parquet/statistics.h" namespace arrow { @@ -76,14 +77,119 @@ class ParquetScanTask : public ScanTask { std::shared_ptr reader_; }; +static Result> OpenReader( + const FileSource& source, parquet::ReaderProperties properties) { + ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); + try { + return parquet::ParquetFileReader::Open(std::move(input), std::move(properties)); + } catch (const ::parquet::ParquetException& e) { + return Status::IOError("Could not open parquet input source '", source.path(), + "': ", e.what()); + } + + return Status::UnknownError("unknown exception caught"); +} + +static parquet::ReaderProperties MakeReaderProperties( + const ParquetFileFormat& format, MemoryPool* pool = default_memory_pool()) { + parquet::ReaderProperties properties(pool); + if (format.reader_options.use_buffered_stream) { + properties.enable_buffered_stream(); + } else { + properties.disable_buffered_stream(); + } + properties.set_buffer_size(format.reader_options.buffer_size); + properties.file_decryption_properties(format.reader_options.file_decryption_properties); + return properties; +} + +static parquet::ArrowReaderProperties MakeArrowReaderProperties( + const ParquetFileFormat& format, int64_t batch_size, + const parquet::ParquetFileReader& reader) { + parquet::ArrowReaderProperties properties(/* use_threads = */ false); + for (const std::string& name : format.reader_options.dict_columns) { + auto column_index = reader.metadata()->schema()->ColumnIndex(name); + properties.set_read_dictionary(column_index, true); + } + properties.set_batch_size(batch_size); + return properties; +} + +template +static Result GetSchemaManifest( + const M& metadata, const parquet::ArrowReaderProperties& properties) { + SchemaManifest manifest; + const std::shared_ptr& key_value_metadata = nullptr; + RETURN_NOT_OK( + SchemaManifest::Make(metadata.schema(), key_value_metadata, properties, &manifest)); + return manifest; +} + +static std::shared_ptr ColumnChunkStatisticsAsExpression( + const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) { + // For the remaining of this function, failure to extract/parse statistics + // are ignored by returning the `true` scalar. The goal is two fold. First + // avoid that an optimization break the computation. Second, allow the + // following columns to maybe succeed in extracting column statistics. + + // For now, only leaf (primitive) types are supported. 
+ if (!schema_field.is_leaf()) { + return scalar(true); + } + + auto column_metadata = metadata.ColumnChunk(schema_field.column_index); + auto field = schema_field.field; + auto field_expr = field_ref(field->name()); + + // In case of missing statistics, return nothing. + if (!column_metadata->is_stats_set()) { + return scalar(true); + } + + auto statistics = column_metadata->statistics(); + if (statistics == nullptr) { + return scalar(true); + } + + // Optimize for corner case where all values are nulls + if (statistics->num_values() == statistics->null_count()) { + return equal(field_expr, scalar(MakeNullScalar(field->type()))); + } + + std::shared_ptr min, max; + if (!StatisticsAsScalars(*statistics, &min, &max).ok()) { + return scalar(true); + } + + return and_(greater_equal(field_expr, scalar(min)), + less_equal(field_expr, scalar(max))); +} + +static Result> RowGroupStatisticsAsExpression( + const parquet::RowGroupMetaData& metadata, + const parquet::ArrowReaderProperties& properties) { + ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata, properties)); + + ExpressionVector expressions; + for (const auto& schema_field : manifest.schema_fields) { + expressions.emplace_back(ColumnChunkStatisticsAsExpression(schema_field, metadata)); + } + + return expressions.empty() ? scalar(true) : and_(expressions); +} + // Skip RowGroups with a filter and metadata class RowGroupSkipper { public: static constexpr int kIterationDone = -1; RowGroupSkipper(std::shared_ptr metadata, + parquet::ArrowReaderProperties arrow_properties, std::shared_ptr filter) - : metadata_(std::move(metadata)), filter_(std::move(filter)), row_group_idx_(0) { + : metadata_(std::move(metadata)), + arrow_properties_(std::move(arrow_properties)), + filter_(std::move(filter)), + row_group_idx_(0) { num_row_groups_ = metadata_->num_row_groups(); } @@ -106,7 +212,7 @@ class RowGroupSkipper { private: bool CanSkip(const parquet::RowGroupMetaData& metadata) const { - auto maybe_stats_expr = RowGroupStatisticsAsExpression(metadata); + auto maybe_stats_expr = RowGroupStatisticsAsExpression(metadata, arrow_properties_); // Errors with statistics are ignored and post-filtering will apply. 
if (!maybe_stats_expr.ok()) { return false; @@ -118,36 +224,33 @@ class RowGroupSkipper { } std::shared_ptr metadata_; + parquet::ArrowReaderProperties arrow_properties_; std::shared_ptr filter_; int row_group_idx_; int num_row_groups_; int64_t rows_skipped_; }; -template -static Result GetSchemaManifest(const M& metadata) { - SchemaManifest manifest; - RETURN_NOT_OK(SchemaManifest::Make( - metadata.schema(), nullptr, parquet::default_arrow_reader_properties(), &manifest)); - return manifest; -} - class ParquetScanTaskIterator { public: - static Result Make( - std::shared_ptr options, std::shared_ptr context, - std::unique_ptr reader) { + static Result Make(std::shared_ptr options, + std::shared_ptr context, + std::unique_ptr reader, + parquet::ArrowReaderProperties arrow_properties) { auto metadata = reader->metadata(); - auto column_projection = InferColumnProjection(*metadata, options); + auto column_projection = InferColumnProjection(*metadata, arrow_properties, options); std::unique_ptr arrow_reader; RETURN_NOT_OK(parquet::arrow::FileReader::Make(context->pool, std::move(reader), - &arrow_reader)); + arrow_properties, &arrow_reader)); + + RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties), + options->filter); return ScanTaskIterator(ParquetScanTaskIterator( std::move(options), std::move(context), std::move(column_projection), - std::move(metadata), std::move(arrow_reader))); + std::move(skipper), std::move(arrow_reader))); } Result> Next() { @@ -166,8 +269,9 @@ class ParquetScanTaskIterator { // Compute the column projection out of an optional arrow::Schema static std::vector InferColumnProjection( const parquet::FileMetaData& metadata, + const parquet::ArrowReaderProperties& arrow_properties, const std::shared_ptr& options) { - auto maybe_manifest = GetSchemaManifest(metadata); + auto maybe_manifest = GetSchemaManifest(metadata, arrow_properties); if (!maybe_manifest.ok()) { return internal::Iota(metadata.num_columns()); } @@ -210,13 +314,12 @@ class ParquetScanTaskIterator { ParquetScanTaskIterator(std::shared_ptr options, std::shared_ptr context, - std::vector column_projection, - std::shared_ptr metadata, + std::vector column_projection, RowGroupSkipper skipper, std::unique_ptr reader) : options_(std::move(options)), context_(std::move(context)), column_projection_(std::move(column_projection)), - skipper_(std::move(metadata), options_->filter), + skipper_(std::move(skipper)), reader_(std::move(reader)) {} std::shared_ptr options_; @@ -226,10 +329,19 @@ class ParquetScanTaskIterator { std::shared_ptr reader_; }; +ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties) { + reader_options.use_buffered_stream = reader_properties.is_buffered_stream_enabled(); + reader_options.buffer_size = reader_properties.buffer_size(); + reader_options.file_decryption_properties = + reader_properties.file_decryption_properties(); +} + Result ParquetFileFormat::IsSupported(const FileSource& source) const { try { ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); - auto reader = parquet::ParquetFileReader::Open(input); + auto properties = MakeReaderProperties(*this); + auto reader = + parquet::ParquetFileReader::Open(std::move(input), std::move(properties)); auto metadata = reader->metadata(); return metadata != nullptr && metadata->can_decompress(); } catch (const ::parquet::ParquetInvalidOrCorruptedFileException& e) { @@ -245,11 +357,15 @@ Result ParquetFileFormat::IsSupported(const FileSource& source) const { Result> ParquetFileFormat::Inspect( 
const FileSource& source) const { - auto pool = default_memory_pool(); - ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, pool)); + auto properties = MakeReaderProperties(*this); + ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties))); + auto arrow_properties = + MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader); std::unique_ptr arrow_reader; - RETURN_NOT_OK(parquet::arrow::FileReader::Make(pool, std::move(reader), &arrow_reader)); + RETURN_NOT_OK(parquet::arrow::FileReader::Make(default_memory_pool(), std::move(reader), + std::move(arrow_properties), + &arrow_reader)); std::shared_ptr schema; RETURN_NOT_OK(arrow_reader->GetSchema(&schema)); @@ -259,78 +375,18 @@ Result> ParquetFileFormat::Inspect( Result ParquetFileFormat::ScanFile( const FileSource& source, std::shared_ptr options, std::shared_ptr context) const { - ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, context->pool)); - return ParquetScanTaskIterator::Make(options, context, std::move(reader)); -} + auto properties = MakeReaderProperties(*this, context->pool); + ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties))); -Result> ParquetFileFormat::MakeFragment( - const FileSource& source, std::shared_ptr options) { - return std::make_shared(source, options); + auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader); + return ParquetScanTaskIterator::Make(options, context, std::move(reader), + std::move(arrow_properties)); } -Result> ParquetFileFormat::OpenReader( - const FileSource& source, MemoryPool* pool) const { - ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); - try { - return parquet::ParquetFileReader::Open(input); - } catch (const ::parquet::ParquetException& e) { - return Status::IOError("Could not open parquet input source '", source.path(), - "': ", e.what()); - } - - return Status::UnknownError("unknown exception caught"); -} - -static std::shared_ptr ColumnChunkStatisticsAsExpression( - const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) { - // For the remaining of this function, failure to extract/parse statistics - // are ignored by returning the `true` scalar. The goal is two fold. First - // avoid that an optimization break the computation. Second, allow the - // following columns to maybe succeed in extracting column statistics. - - // For now, only leaf (primitive) types are supported. - if (!schema_field.is_leaf()) { - return scalar(true); - } - - auto column_metadata = metadata.ColumnChunk(schema_field.column_index); - auto field = schema_field.field; - auto field_expr = field_ref(field->name()); - - // In case of missing statistics, return nothing. 
- if (!column_metadata->is_stats_set()) { - return scalar(true); - } - - auto statistics = column_metadata->statistics(); - if (statistics == nullptr) { - return scalar(true); - } - - // Optimize for corner case where all values are nulls - if (statistics->num_values() == statistics->null_count()) { - return equal(field_expr, scalar(MakeNullScalar(field->type()))); - } - - std::shared_ptr min, max; - if (!StatisticsAsScalars(*statistics, &min, &max).ok()) { - return scalar(true); - } - - return and_(greater_equal(field_expr, scalar(min)), - less_equal(field_expr, scalar(max))); -} - -Result> RowGroupStatisticsAsExpression( - const parquet::RowGroupMetaData& metadata) { - ARROW_ASSIGN_OR_RAISE(auto manifest, GetSchemaManifest(metadata)); - - ExpressionVector expressions; - for (const auto& schema_field : manifest.schema_fields) { - expressions.emplace_back(ColumnChunkStatisticsAsExpression(schema_field, metadata)); - } - - return expressions.empty() ? scalar(true) : and_(expressions); +Result> ParquetFileFormat::MakeFragment( + FileSource source, std::shared_ptr options) { + return std::make_shared(std::move(source), shared_from_this(), + std::move(options)); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 58a51a3368675..a86bf95821a1e 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -19,6 +19,8 @@ #include #include +#include +#include #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" @@ -28,14 +30,51 @@ namespace parquet { class ParquetFileReader; class RowGroupMetaData; class FileMetaData; +class FileDecryptionProperties; +class ReaderProperties; +class ArrowReaderProperties; } // namespace parquet namespace arrow { namespace dataset { /// \brief A FileFormat implementation that reads from Parquet files -class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { +class ARROW_DS_EXPORT ParquetFileFormat + : public FileFormat, + public std::enable_shared_from_this { public: + ParquetFileFormat() = default; + + /// Convenience constructor which copies properties from a parquet::ReaderProperties. + /// memory_pool will be ignored. + explicit ParquetFileFormat(const parquet::ReaderProperties& reader_properties); + + struct ReaderOptions { + /// \defgroup parquet-file-format-reader-properties properties which correspond to + /// members of parquet::ReaderProperties. + /// + /// We don't embed parquet::ReaderProperties directly because we get memory_pool from + /// ScanContext at scan time and provide differing defaults. + /// + /// @{ + bool use_buffered_stream = false; + int64_t buffer_size = 1 << 13; + std::shared_ptr file_decryption_properties; + /// @} + + /// \defgroup parquet-file-format-arrow-reader-properties properties which correspond + /// to members of parquet::ArrowReaderProperties. + /// + /// We don't embed parquet::ReaderProperties directly because we get batch_size from + /// ScanOptions at scan time, and we will never pass use_threads == true (since we + /// defer parallelization of the scan). Additionally column names (rather than + /// indices) are used to indicate dictionary columns. 
+ /// + /// @{ + std::unordered_set dict_columns; + /// @} + } reader_options; + std::string type_name() const override { return "parquet"; } Result IsSupported(const FileSource& source) const override; @@ -49,23 +88,17 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { std::shared_ptr context) const override; Result> MakeFragment( - const FileSource& source, std::shared_ptr options) override; - - private: - Result> OpenReader( - const FileSource& source, MemoryPool* pool) const; + FileSource source, std::shared_ptr options) override; }; class ARROW_DS_EXPORT ParquetFragment : public FileFragment { public: - ParquetFragment(const FileSource& source, std::shared_ptr options) - : FileFragment(source, std::make_shared(), options) {} + ParquetFragment(FileSource source, std::shared_ptr format, + std::shared_ptr options) + : FileFragment(std::move(source), std::move(format), std::move(options)) {} bool splittable() const override { return true; } }; -Result> RowGroupStatisticsAsExpression( - const parquet::RowGroupMetaData& metadata); - } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index c3ebd86ed1182..0ee2a5c41c4b2 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -159,6 +159,7 @@ class ParquetBufferFixtureMixin : public ArrowParquetWriterMixin { class TestParquetFileFormat : public ParquetBufferFixtureMixin { protected: + std::shared_ptr format_ = std::make_shared(); std::shared_ptr opts_; std::shared_ptr ctx_ = std::make_shared(); }; @@ -168,7 +169,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { auto source = GetFileSource(reader.get()); opts_ = ScanOptions::Make(reader->schema()); - auto fragment = std::make_shared(*source, opts_); + auto fragment = std::make_shared(*source, format_, opts_); ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_)); int64_t row_count = 0; @@ -185,18 +186,45 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { ASSERT_EQ(row_count, kNumRows); } -TEST_F(TestParquetFileFormat, OpenFailureWithRelevantError) { - auto format = ParquetFileFormat(); +TEST_F(TestParquetFileFormat, ScanRecordBatchReaderDictEncoded) { + schema_ = schema({field("utf8", utf8())}); + + auto reader = GetRecordBatchReader(); + auto source = GetFileSource(reader.get()); + + opts_ = ScanOptions::Make(reader->schema()); + + format_->reader_options.dict_columns = {"utf8"}; + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + + ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_)); + int64_t row_count = 0; + Schema expected_schema({field("utf8", dictionary(int32(), utf8()))}); + + for (auto maybe_task : scan_task_it) { + ASSERT_OK_AND_ASSIGN(auto task, std::move(maybe_task)); + ASSERT_OK_AND_ASSIGN(auto rb_it, task->Execute()); + for (auto maybe_batch : rb_it) { + ASSERT_OK_AND_ASSIGN(auto batch, std::move(maybe_batch)); + row_count += batch->num_rows(); + AssertSchemaEqual(*batch->schema(), expected_schema, /* check_metadata = */ false); + } + } + + ASSERT_EQ(row_count, kNumRows); +} + +TEST_F(TestParquetFileFormat, OpenFailureWithRelevantError) { std::shared_ptr buf = std::make_shared(util::string_view("")); - auto result = format.Inspect(FileSource(buf)); + auto result = format_->Inspect(FileSource(buf)); EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, testing::HasSubstr(""), result.status()); constexpr auto file_name = "herp/derp"; ASSERT_OK_AND_ASSIGN( auto fs, 
fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)})); - result = format.Inspect({file_name, fs.get()}); + result = format_->Inspect({file_name, fs.get()}); EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, testing::HasSubstr(file_name), result.status()); } @@ -215,7 +243,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjected) { auto reader = GetRecordBatchReader(); auto source = GetFileSource(reader.get()); - auto fragment = std::make_shared(*source, opts_); + auto fragment = std::make_shared(*source, format_, opts_); ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_)); int64_t row_count = 0; @@ -257,7 +285,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjectedMissingCols) { auto source = GetFileSource({reader.get(), reader_without_i32.get(), reader_without_f64.get()}); - auto fragment = std::make_shared(*source, opts_); + auto fragment = std::make_shared(*source, format_, opts_); ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_)); int64_t row_count = 0; @@ -277,28 +305,39 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjectedMissingCols) { TEST_F(TestParquetFileFormat, Inspect) { auto reader = GetRecordBatchReader(); auto source = GetFileSource(reader.get()); - auto format = ParquetFileFormat(); - ASSERT_OK_AND_ASSIGN(auto actual, format.Inspect(*source.get())); + ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get())); AssertSchemaEqual(*actual, *schema_, /*check_metadata=*/false); } +TEST_F(TestParquetFileFormat, InspectDictEncoded) { + schema_ = schema({field("utf8", utf8())}); + + auto reader = GetRecordBatchReader(); + auto source = GetFileSource(reader.get()); + + format_->reader_options.dict_columns = {"utf8"}; + ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get())); + + Schema expected_schema({field("utf8", dictionary(int32(), utf8()))}); + AssertSchemaEqual(*actual, expected_schema, /* check_metadata = */ false); +} + TEST_F(TestParquetFileFormat, IsSupported) { auto reader = GetRecordBatchReader(); auto source = GetFileSource(reader.get()); - auto format = ParquetFileFormat(); bool supported = false; std::shared_ptr buf = std::make_shared(util::string_view("")); - ASSERT_OK_AND_ASSIGN(supported, format.IsSupported(FileSource(buf))); + ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(FileSource(buf))); ASSERT_EQ(supported, false); buf = std::make_shared(util::string_view("corrupted")); - ASSERT_OK_AND_ASSIGN(supported, format.IsSupported(FileSource(buf))); + ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(FileSource(buf))); ASSERT_EQ(supported, false); - ASSERT_OK_AND_ASSIGN(supported, format.IsSupported(*source)); + ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source)); EXPECT_EQ(supported, true); } @@ -323,12 +362,12 @@ void CountRowsInScan(ScanTaskIterator& it, int64_t expected_rows, class TestParquetFileFormatPushDown : public TestParquetFileFormat { public: - void CountRowsAndBatchesInScan(Fragment& fragment, int64_t expected_rows, - int64_t expected_batches) { + void CountRowsAndBatchesInScan(const std::shared_ptr& fragment, + int64_t expected_rows, int64_t expected_batches) { int64_t actual_rows = 0; int64_t actual_batches = 0; - ASSERT_OK_AND_ASSIGN(auto it, fragment.Scan(ctx_)); + ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(ctx_)); for (auto maybe_scan_task : it) { ASSERT_OK_AND_ASSIGN(auto scan_task, std::move(maybe_scan_task)); ASSERT_OK_AND_ASSIGN(auto rb_it, scan_task->Execute()); @@ -364,35 +403,35 @@ TEST_F(TestParquetFileFormatPushDown, Basic) { auto source = 
GetFileSource(reader.get()); opts_ = ScanOptions::Make(reader->schema()); - auto fragment = std::make_shared(*source, opts_); + auto fragment = std::make_shared(*source, format_, opts_); opts_->filter = scalar(true); - CountRowsAndBatchesInScan(*fragment, kTotalNumRows, kNumRowGroups); + CountRowsAndBatchesInScan(fragment, kTotalNumRows, kNumRowGroups); for (int64_t i = 1; i <= kNumRowGroups; i++) { opts_->filter = ("i64"_ == int64_t(i)).Copy(); - CountRowsAndBatchesInScan(*fragment, i, 1); + CountRowsAndBatchesInScan(fragment, i, 1); } /* Out of bound filters should skip all RowGroups. */ opts_->filter = scalar(false); - CountRowsAndBatchesInScan(*fragment, 0, 0); + CountRowsAndBatchesInScan(fragment, 0, 0); opts_->filter = ("i64"_ == int64_t(kNumRowGroups + 1)).Copy(); - CountRowsAndBatchesInScan(*fragment, 0, 0); + CountRowsAndBatchesInScan(fragment, 0, 0); opts_->filter = ("i64"_ == int64_t(-1)).Copy(); - CountRowsAndBatchesInScan(*fragment, 0, 0); + CountRowsAndBatchesInScan(fragment, 0, 0); // No rows match 1 and 2. opts_->filter = ("i64"_ == int64_t(1) and "u8"_ == uint8_t(2)).Copy(); - CountRowsAndBatchesInScan(*fragment, 0, 0); + CountRowsAndBatchesInScan(fragment, 0, 0); opts_->filter = ("i64"_ == int64_t(2) or "i64"_ == int64_t(4)).Copy(); - CountRowsAndBatchesInScan(*fragment, 2 + 4, 2); + CountRowsAndBatchesInScan(fragment, 2 + 4, 2); opts_->filter = ("i64"_ < int64_t(6)).Copy(); - CountRowsAndBatchesInScan(*fragment, 5 * (5 + 1) / 2, 5); + CountRowsAndBatchesInScan(fragment, 5 * (5 + 1) / 2, 5); opts_->filter = ("i64"_ >= int64_t(6)).Copy(); - CountRowsAndBatchesInScan(*fragment, kTotalNumRows - (5 * (5 + 1) / 2), + CountRowsAndBatchesInScan(fragment, kTotalNumRows - (5 * (5 + 1) / 2), kNumRowGroups - 5); } diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 2ae4e86929a80..cd564494056c5 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -133,6 +133,14 @@ Status ScannerBuilder::UseThreads(bool use_threads) { return Status::OK(); } +Status ScannerBuilder::BatchSize(int64_t batch_size) { + if (batch_size <= 0) { + return Status::Invalid("BatchSize must be greater than 0, got ", batch_size); + } + options_->batch_size = batch_size; + return Status::OK(); +} + Result> ScannerBuilder::Finish() const { std::shared_ptr options; if (has_projection_ && !project_columns_.empty()) { diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 690d1cee5c8fc..389de1690db0b 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -74,6 +74,9 @@ class ARROW_DS_EXPORT ScanOptions { // Projector for reconciling the final RecordBatch to the requested schema. RecordBatchProjector projector; + // Maximum row count for scanned batches. + int64_t batch_size = 1 << 15; + // Return a vector of fields that requires materialization. // // This is usually the union of the fields referenced in the projection and the @@ -212,6 +215,14 @@ class ARROW_DS_EXPORT ScannerBuilder { /// ThreadPool found in ScanContext; Status UseThreads(bool use_threads = true); + /// \brief Set the maximum number of rows per RecordBatch. + /// + /// \param[in] batch_size the maximum number of rows. + /// \returns An error if the number for batch is not greater than 0. + /// + /// This option provides a control limiting the memory owned by any RecordBatch. 
+ Status BatchSize(int64_t batch_size); + /// \brief Return the constructed now-immutable Scanner object Result> Finish() const; diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index c683f8848c72a..d698c3edd7ce5 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -31,7 +31,6 @@ namespace dataset { class TestScanner : public DatasetFixtureMixin { protected: static constexpr int64_t kNumberSources = 2; - static constexpr int64_t kNumberFragments = 4; static constexpr int64_t kNumberBatches = 16; static constexpr int64_t kBatchSize = 1024; @@ -39,18 +38,16 @@ class TestScanner : public DatasetFixtureMixin { std::vector> batches{static_cast(kNumberBatches), batch}; - FragmentVector fragments{static_cast(kNumberFragments), - std::make_shared(batches, options_)}; - SourceVector sources{static_cast(kNumberSources), - std::make_shared(batch->schema(), fragments)}; + std::make_shared(batch->schema(), batches)}; return Scanner{sources, options_, ctx_}; } void AssertScannerEqualsRepetitionsOf(Scanner scanner, - std::shared_ptr batch) { - const int64_t total_batches = kNumberSources * kNumberBatches * kNumberFragments; + std::shared_ptr batch, + const int64_t total_batches = kNumberSources * + kNumberBatches) { auto expected = ConstantArrayGenerator::Repeat(total_batches, batch); // Verifies that the unified BatchReader is equivalent to flattening all the @@ -60,7 +57,6 @@ class TestScanner : public DatasetFixtureMixin { }; constexpr int64_t TestScanner::kNumberSources; -constexpr int64_t TestScanner::kNumberFragments; constexpr int64_t TestScanner::kNumberBatches; constexpr int64_t TestScanner::kBatchSize; @@ -70,6 +66,15 @@ TEST_F(TestScanner, Scan) { AssertScannerEqualsRepetitionsOf(MakeScanner(batch), batch); } +TEST_F(TestScanner, ScanWithCappedBatchSize) { + SetSchema({field("i32", int32()), field("f64", float64())}); + auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_); + options_->batch_size = kBatchSize / 2; + auto expected = batch->Slice(kBatchSize / 2); + AssertScannerEqualsRepetitionsOf(MakeScanner(batch), expected, + kNumberSources * kNumberBatches * 2); +} + TEST_F(TestScanner, FilteredScan) { SetSchema({field("f64", float64())}); @@ -122,8 +127,8 @@ TEST_F(TestScanner, MaterializeMissingColumn) { TEST_F(TestScanner, ToTable) { SetSchema({field("i32", int32()), field("f64", float64())}); auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_); - std::vector> batches{ - kNumberBatches * kNumberFragments * kNumberSources, batch}; + std::vector> batches{kNumberBatches * kNumberSources, + batch}; std::shared_ptr
expected; ASSERT_OK(Table::FromRecordBatches(batches, &expected)); diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index bb7355aa3f87b..ef1f76758e835 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -196,7 +196,7 @@ class DummyFileFormat : public FileFormat { } inline Result> MakeFragment( - const FileSource& location, std::shared_ptr options) override; + FileSource source, std::shared_ptr options) override; protected: std::shared_ptr schema_; @@ -211,7 +211,7 @@ class DummyFragment : public FileFragment { }; Result> DummyFileFormat::MakeFragment( - const FileSource& source, std::shared_ptr options) { + FileSource source, std::shared_ptr options) { return std::make_shared(source, options); } @@ -251,7 +251,7 @@ class JSONRecordBatchFileFormat : public FileFormat { } inline Result> MakeFragment( - const FileSource& location, std::shared_ptr options) override; + FileSource source, std::shared_ptr options) override; protected: SchemaResolver resolver_; @@ -268,7 +268,7 @@ class JSONRecordBatchFragment : public FileFragment { }; Result> JSONRecordBatchFileFormat::MakeFragment( - const FileSource& source, std::shared_ptr options) { + FileSource source, std::shared_ptr options) { return std::make_shared(source, resolver_(source), options); } diff --git a/cpp/src/arrow/status.cc b/cpp/src/arrow/status.cc index 785db45975227..480bbd3e46809 100644 --- a/cpp/src/arrow/status.cc +++ b/cpp/src/arrow/status.cc @@ -47,9 +47,12 @@ std::string Status::CodeAsString() const { if (state_ == nullptr) { return "OK"; } + return CodeAsString(code()); +} +std::string Status::CodeAsString(StatusCode code) { const char* type; - switch (code()) { + switch (code) { case StatusCode::OK: type = "OK"; break; diff --git a/cpp/src/arrow/status.h b/cpp/src/arrow/status.h index df3ea6b9af657..dbe1b7e6e2858 100644 --- a/cpp/src/arrow/status.h +++ b/cpp/src/arrow/status.h @@ -302,6 +302,7 @@ class ARROW_EXPORT Status : public util::EqualityComparable, /// \brief Return a string representation of the status code, without the message /// text or POSIX code information. std::string CodeAsString() const; + static std::string CodeAsString(StatusCode); /// \brief Return the StatusCode value attached to this status. StatusCode code() const { return ok() ? 
StatusCode::OK : state_->code; } diff --git a/cpp/src/arrow/testing/generator.cc b/cpp/src/arrow/testing/generator.cc index 007321cb7f1cf..41c1f752160f3 100644 --- a/cpp/src/arrow/testing/generator.cc +++ b/cpp/src/arrow/testing/generator.cc @@ -36,9 +36,9 @@ namespace arrow { template ::CType, typename BuilderType = typename TypeTraits::BuilderType> -static inline std::shared_ptr ConstantArray(int64_t size, CType value = 0) { +static inline std::shared_ptr ConstantArray(int64_t size, CType value) { auto type = TypeTraits::type_singleton(); - auto builder_fn = [](BuilderType* builder) { builder->UnsafeAppend(CType(0)); }; + auto builder_fn = [&](BuilderType* builder) { builder->UnsafeAppend(value); }; return ArrayFromBuilderVisitor(type, size, builder_fn).ValueOrDie(); } @@ -90,4 +90,9 @@ std::shared_ptr ConstantArrayGenerator::Float64(int64_t size, return ConstantArray(size, value); } +std::shared_ptr ConstantArrayGenerator::String(int64_t size, + std::string value) { + return ConstantArray(size, value); +} + } // namespace arrow diff --git a/cpp/src/arrow/testing/generator.h b/cpp/src/arrow/testing/generator.h index e67bd11dcfea0..b43cec1d5c3b9 100644 --- a/cpp/src/arrow/testing/generator.h +++ b/cpp/src/arrow/testing/generator.h @@ -19,6 +19,7 @@ #include #include +#include #include #include "arrow/record_batch.h" @@ -118,6 +119,14 @@ class ARROW_EXPORT ConstantArrayGenerator { /// \return a generated Array static std::shared_ptr Float64(int64_t size, double value = 0); + /// \brief Generates a constant StringArray + /// + /// \param[in] size the size of the array to generate + /// \param[in] value to repeat + /// + /// \return a generated Array + static std::shared_ptr String(int64_t size, std::string value = ""); + template static std::shared_ptr Numeric(int64_t size, CType value = 0) { switch (ArrowType::type_id) { @@ -179,6 +188,8 @@ class ARROW_EXPORT ConstantArrayGenerator { return Float32(size); case Type::DOUBLE: return Float64(size); + case Type::STRING: + return String(size); default: return nullptr; } diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h index 40a954bb85fc4..cee62a6a90432 100644 --- a/cpp/src/arrow/type.h +++ b/cpp/src/arrow/type.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-#ifndef ARROW_TYPE_H -#define ARROW_TYPE_H +#pragma once #include #include @@ -1790,5 +1789,3 @@ Result> UnifySchemas( Field::MergeOptions field_merge_options = Field::MergeOptions::Defaults()); } // namespace arrow - -#endif // ARROW_TYPE_H diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 52ec7d70aad72..31e199cce98d1 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -2689,9 +2689,9 @@ void TryReadDataFile(const std::string& path, s = arrow_reader->ReadTable(&table); } - ASSERT_TRUE(s.code() == expected_code) - << "Expected reading file to return " - << arrow::Status(expected_code, "").CodeAsString() << ", but got " << s.ToString(); + ASSERT_EQ(s.code(), expected_code) + << "Expected reading file to return " << arrow::Status::CodeAsString(expected_code) + << ", but got " << s.ToString(); } TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) { diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 5d97048a36804..90d485e65aeaa 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef PARQUET_ARROW_READER_H -#define PARQUET_ARROW_READER_H +#pragma once #include #include @@ -324,5 +323,3 @@ ::arrow::Status FuzzReader(const uint8_t* data, int64_t size); } // namespace internal } // namespace arrow } // namespace parquet - -#endif // PARQUET_ARROW_READER_H diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index 57f39f89fffe3..b5b63b9f20ba2 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -243,7 +243,7 @@ class SerializedFile : public ParquetFileReader::Contents { ParseUnencryptedFileMetadata(footer_buffer, footer_read_size, &metadata_buffer, &metadata_len, &read_metadata_len); - auto file_decryption_properties = properties_.file_decryption_properties(); + auto file_decryption_properties = properties_.file_decryption_properties().get(); if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file. if (file_decryption_properties != nullptr) { if (!file_decryption_properties->plaintext_files_allowed()) { @@ -341,7 +341,7 @@ void SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter( std::to_string(crypto_metadata_buffer->size()) + " bytes)"); } } - auto file_decryption_properties = properties_.file_decryption_properties(); + auto file_decryption_properties = properties_.file_decryption_properties().get(); if (file_decryption_properties == nullptr) { throw ParquetException( "Could not read encrypted metadata, no decryption found in reader's properties"); diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h index 65504bf568be4..fec6219e79e71 100644 --- a/cpp/src/parquet/file_reader.h +++ b/cpp/src/parquet/file_reader.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-#ifndef PARQUET_FILE_READER_H -#define PARQUET_FILE_READER_H +#pragma once #include #include @@ -137,5 +136,3 @@ int64_t ScanFileContents(std::vector columns, const int32_t column_batch_si ParquetFileReader* reader); } // namespace parquet - -#endif // PARQUET_FILE_READER_H diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index c4ef07c60e6e5..1abfd895d1ab8 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef PARQUET_FILE_METADATA_H -#define PARQUET_FILE_METADATA_H +#pragma once #include #include @@ -396,5 +395,3 @@ class PARQUET_EXPORT FileMetaDataBuilder { PARQUET_EXPORT std::string ParquetVersionToString(ParquetVersion::type ver); } // namespace parquet - -#endif // PARQUET_FILE_METADATA_H diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 2c4c86fc3bd11..9ee1a07f3e427 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef PARQUET_COLUMN_PROPERTIES_H -#define PARQUET_COLUMN_PROPERTIES_H +#pragma once #include #include @@ -69,8 +68,8 @@ class PARQUET_EXPORT ReaderProperties { file_decryption_properties_ = std::move(decryption); } - FileDecryptionProperties* file_decryption_properties() { - return file_decryption_properties_.get(); + const std::shared_ptr& file_decryption_properties() const { + return file_decryption_properties_; } private: @@ -721,5 +720,3 @@ PARQUET_EXPORT std::shared_ptr default_arrow_writer_properties(); } // namespace parquet - -#endif // PARQUET_COLUMN_PROPERTIES_H diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 39089c9c54407..bea40e84d004a 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -73,10 +73,59 @@ cdef class FileFormat: return self.wrapped +cdef class ParquetFileFormatReaderOptions: + cdef: + CParquetFileFormatReaderOptions* options + + def __init__(self, ParquetFileFormat fmt): + self.options = &fmt.parquet_format.reader_options + + @property + def use_buffered_stream(self): + """Read files through buffered input streams rather than + loading entire row groups at once. This may be enabled to + reduce memory overhead. Disabled by default.""" + return self.options.use_buffered_stream + + @use_buffered_stream.setter + def use_buffered_stream(self, bint value): + self.options.use_buffered_stream = value + + @property + def buffer_size(self): + """Size of buffered stream, if enabled. 
Default is 8KB.""" + return self.options.buffer_size + + @buffer_size.setter + def buffer_size(self, int value): + self.options.buffer_size = value + + @property + def dict_columns(self): + """Names of columns which should be read as dictionaries.""" + return self.options.dict_columns + + @dict_columns.setter + def dict_columns(self, values): + self.options.dict_columns.clear() + for value in set(values): + self.options.dict_columns.insert(tobytes(value)) + + cdef class ParquetFileFormat(FileFormat): - def __init__(self): - self.init(shared_ptr[CFileFormat](new CParquetFileFormat())) + cdef: + CParquetFileFormat* parquet_format + + def __init__(self, dict reader_options=dict()): + self.init( make_shared[CParquetFileFormat]()) + self.parquet_format = self.wrapped.get() + for name, value in reader_options.items(): + setattr(self.reader_options, name, value) + + @property + def reader_options(self): + return ParquetFileFormatReaderOptions(self) cdef class IpcFileFormat(FileFormat): @@ -841,7 +890,7 @@ cdef class Dataset: return scanner.scan() def to_batches(self, columns=None, filter=None, - MemoryPool memory_pool=None): + batch_size=32*2**10, MemoryPool memory_pool=None): """Read the dataset as materialized record batches. Builds a scan operation against the dataset and sequentially executes @@ -863,6 +912,10 @@ cdef class Dataset: partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them. + batch_size : int, default 32K + The maximum row count for scanned record batches. If scanned + record batches are overflowing memory then this method can be + called to reduce their size. memory_pool : MemoryPool, default None For memory allocations, if required. If not specified, uses the default pool. @@ -872,13 +925,13 @@ cdef class Dataset: record_batches : iterator of RecordBatch """ scanner = Scanner(self, columns=columns, filter=filter, - memory_pool=memory_pool) + batch_size=batch_size, memory_pool=memory_pool) for task in scanner.scan(): for batch in task.execute(): yield batch def to_table(self, columns=None, filter=None, use_threads=True, - MemoryPool memory_pool=None): + batch_size=32*2**10, MemoryPool memory_pool=None): """Read the dataset to an arrow table. Note that this method reads all the selected data from the dataset @@ -903,6 +956,10 @@ cdef class Dataset: use_threads : boolean, default True If enabled, then maximum paralellism will be used determined by the number of available CPU cores. + batch_size : int, default 32K + The maximum row count for scanned record batches. If scanned + record batches are overflowing memory then this method can be + called to reduce their size. memory_pool : MemoryPool, default None For memory allocations, if required. If not specified, uses the default pool. @@ -912,7 +969,8 @@ cdef class Dataset: table : Table instance """ scanner = Scanner(self, columns=columns, filter=filter, - use_threads=use_threads, memory_pool=memory_pool) + use_threads=use_threads, batch_size=batch_size, + memory_pool=memory_pool) return scanner.to_table() @property @@ -1006,6 +1064,10 @@ cdef class Scanner: use_threads : boolean, default True If enabled, then maximum paralellism will be used determined by the number of available CPU cores. + batch_size : int, default 32K + The maximum row count for scanned record batches. If scanned + record batches are overflowing memory then this method can be + called to reduce their size. 
memory_pool : MemoryPool, default None For memory allocations, if required. If not specified, uses the default pool. @@ -1017,7 +1079,7 @@ cdef class Scanner: def __init__(self, Dataset dataset, list columns=None, Expression filter=None, bint use_threads=True, - MemoryPool memory_pool=None): + int batch_size=32*2**10, MemoryPool memory_pool=None): cdef: shared_ptr[CScanContext] context shared_ptr[CScannerBuilder] builder @@ -1048,6 +1110,8 @@ cdef class Scanner: if use_threads is not None: check_status(builder.get().UseThreads(use_threads)) + check_status(builder.get().BatchSize(batch_size)) + # instantiate the scanner object scanner = GetResultValue(builder.get().Finish()) self.init(scanner) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 1a35542d3bce2..6972420bfa6b0 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -293,7 +293,7 @@ def dataset(sources, filesystem=None, partitioning=None, format=None): case, the additional keywords will be ignored). filesystem : FileSystem, default None By default will be inferred from the path. - partitioning : Partitioning(Factory), str, list of str + partitioning : Partitioning, PartitioningFactory, str, list of str The partitioning scheme specified with the ``partitioning()`` function. A flavor string can be used as shortcut, and with a list of field names a DirectionaryPartitioning will be inferred. diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index e9952f4aec5d3..3c9dc100179e5 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -54,13 +54,13 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CExpression "arrow::dataset::Expression": CExpression(CExpressionType type) - c_bool Equals(const CExpression& other) const - c_bool Equals(const shared_ptr[CExpression]& other) const + c_bool Equals(const CExpression & other) const + c_bool Equals(const shared_ptr[CExpression] & other) const c_bool IsNull() const - CResult[shared_ptr[CDataType]] Validate(const CSchema& schema) const - shared_ptr[CExpression] Assume(const CExpression& given) const + CResult[shared_ptr[CDataType]] Validate(const CSchema & schema) const + shared_ptr[CExpression] Assume(const CExpression & given) const shared_ptr[CExpression] Assume( - const shared_ptr[CExpression]& given) const + const shared_ptr[CExpression] & given) const c_string ToString() const CExpressionType type() const shared_ptr[CExpression] Copy() const @@ -70,17 +70,17 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CUnaryExpression "arrow::dataset::UnaryExpression"( CExpression): - const shared_ptr[CExpression]& operand() const + const shared_ptr[CExpression] & operand() const cdef cppclass CBinaryExpression "arrow::dataset::BinaryExpression"( CExpression): - const shared_ptr[CExpression]& left_operand() const - const shared_ptr[CExpression]& right_operand() const + const shared_ptr[CExpression] & left_operand() const + const shared_ptr[CExpression] & right_operand() const cdef cppclass CScalarExpression "arrow::dataset::ScalarExpression"( CExpression): - CScalarExpression(const shared_ptr[CScalar]& value) - const shared_ptr[CScalar]& value() const + CScalarExpression(const shared_ptr[CScalar] & value) + const shared_ptr[CScalar] & value() const cdef cppclass CFieldExpression "arrow::dataset::FieldExpression"( CExpression): @@ -123,30 +123,28 @@ cdef extern from 
"arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef shared_ptr[CNotExpression] CMakeNotExpression "arrow::dataset::not_"( shared_ptr[CExpression] operand) cdef shared_ptr[CExpression] CMakeAndExpression "arrow::dataset::and_"( - const CExpressionVector& subexpressions) + const CExpressionVector & subexpressions) cdef shared_ptr[CExpression] CMakeOrExpression "arrow::dataset::or_"( - const CExpressionVector& subexpressions) + const CExpressionVector & subexpressions) cdef CResult[shared_ptr[CExpression]] CInsertImplicitCasts \ "arrow::dataset::InsertImplicitCasts"( - const CExpression&, const CSchema&) + const CExpression &, const CSchema&) cdef cppclass CFilter "arrow::dataset::Filter": pass - cdef cppclass CWriteOptions "arrow::dataset::WriteOptions": - pass - cdef cppclass CScanOptions "arrow::dataset::ScanOptions": shared_ptr[CExpression] filter shared_ptr[CSchema] schema c_bool use_threads + int64_t batch_size @staticmethod shared_ptr[CScanOptions] Defaults() cdef cppclass CScanContext "arrow::dataset::ScanContext": - CMemoryPool* pool + CMemoryPool * pool cdef cppclass CScanTask" arrow::dataset::ScanTask": CResult[CRecordBatchIterator] Execute() @@ -161,10 +159,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CScannerBuilder "arrow::dataset::ScannerBuilder": CScannerBuilder(shared_ptr[CDataset], shared_ptr[CScanContext] scan_context) - CStatus Project(const vector[c_string]& columns) - CStatus Filter(const CExpression& filter) + CStatus Project(const vector[c_string] & columns) + CStatus Filter(const CExpression & filter) CStatus Filter(shared_ptr[CExpression] filter) CStatus UseThreads(c_bool use_threads) + CStatus BatchSize(int64_t batch_size) CResult[shared_ptr[CScanner]] Finish() shared_ptr[CSchema] schema() const @@ -186,8 +185,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CSource "arrow::dataset::Source": CFragmentIterator GetFragments(shared_ptr[CScanOptions] options) - const shared_ptr[CSchema]& schema() - const shared_ptr[CExpression]& partition_expression() + const shared_ptr[CSchema] & schema() + const shared_ptr[CExpression] & partition_expression() c_string type_name() ctypedef vector[shared_ptr[CSource]] CSourceVector \ @@ -204,59 +203,52 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CResult[shared_ptr[CScannerBuilder]] NewScanWithContext "NewScan"( shared_ptr[CScanContext] context) CResult[shared_ptr[CScannerBuilder]] NewScan() - const CSourceVector& sources() + const CSourceVector & sources() shared_ptr[CSchema] schema() cdef cppclass CDatasetFactory "arrow::dataset::DatasetFactory": @staticmethod CResult[shared_ptr[CDatasetFactory]] Make( vector[shared_ptr[CSourceFactory]] factories) - const vector[shared_ptr[CSourceFactory]]& factories() const + const vector[shared_ptr[CSourceFactory]] & factories() const CResult[vector[shared_ptr[CSchema]]] InspectSchemas() CResult[shared_ptr[CSchema]] Inspect() CResult[shared_ptr[CDataset]] FinishWithSchema "Finish"( - const shared_ptr[CSchema]& schema) + const shared_ptr[CSchema] & schema) CResult[shared_ptr[CDataset]] Finish() - cdef cppclass CFileScanOptions "arrow::dataset::FileScanOptions"( - CScanOptions): - c_string file_type() - cdef cppclass CFileSource "arrow::dataset::FileSource": - CFileSource(c_string path, CFileSystem* filesystem, + CFileSource(c_string path, CFileSystem * filesystem, CompressionType compression) - c_bool operator==(const CFileSource& other) const + c_bool operator == (const 
CFileSource & other) const CompressionType compression() c_string path() - CFileSystem* filesystem() + CFileSystem * filesystem() shared_ptr[CBuffer] buffer() - CStatus Open(shared_ptr[CRandomAccessFile]* out) - - cdef cppclass CFileWriteOptions "arrow::dataset::WriteOptions"( - CWriteOptions): - c_string file_type() + CStatus Open(shared_ptr[CRandomAccessFile] * out) cdef cppclass CFileFormat "arrow::dataset::FileFormat": c_string type_name() - CStatus IsSupported(const CFileSource& source, c_bool* supported) const - CStatus Inspect(const CFileSource& source, - shared_ptr[CSchema]* out) const - CStatus ScanFile(const CFileSource& source, + CStatus IsSupported(const CFileSource & source, + c_bool * supported) const + CStatus Inspect(const CFileSource & source, + shared_ptr[CSchema] * out) const + CStatus ScanFile(const CFileSource & source, shared_ptr[CScanOptions] scan_options, shared_ptr[CScanContext] scan_context, - CScanTaskIterator* out) const - CStatus MakeFragment(const CFileSource& location, + CScanTaskIterator * out) const + CStatus MakeFragment(const CFileSource & location, shared_ptr[CScanOptions] opts, - shared_ptr[CFragment]* out) + shared_ptr[CFragment] * out) cdef cppclass CFileFragment "arrow::dataset::FileFragment"( CFragment): - CFileFragment(const CFileSource& source, + CFileFragment(const CFileSource & source, shared_ptr[CFileFormat] format, shared_ptr[CScanOptions] scan_options) CStatus Scan(shared_ptr[CScanContext] scan_context, - shared_ptr[CScanTaskIterator]* out) - const CFileSource& source() + shared_ptr[CScanTaskIterator] * out) + const CFileSource & source() shared_ptr[CFileFormat] format() shared_ptr[CScanOptions] scan_options() @@ -275,21 +267,19 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CFragmentIterator] GetFragments( shared_ptr[CScanOptions] options) - cdef cppclass CParquetScanOptions "arrow::dataset::ParquetScanOptions"( - CFileScanOptions): - c_string file_type() - - cdef cppclass CParquetWriterOptions "arrow::dataset::ParquetWriterOptions"( - CFileWriteOptions): - c_string file_type() + cdef cppclass CParquetFileFormatReaderOptions \ + "arrow::dataset::ParquetFileFormat::ReaderOptions": + c_bool use_buffered_stream + int64_t buffer_size + unordered_set[c_string] dict_columns cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"( CFileFormat): - pass + CParquetFileFormatReaderOptions reader_options cdef cppclass CParquetFragment "arrow::dataset::ParquetFragment"( CFileFragment): - CParquetFragment(const CFileSource& source, + CParquetFragment(const CFileSource & source, shared_ptr[CScanOptions] options) c_bool splittable() @@ -299,8 +289,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CPartitioning "arrow::dataset::Partitioning": c_string type_name() const - CResult[shared_ptr[CExpression]] Parse(const c_string& path) const - const shared_ptr[CSchema]& schema() + CResult[shared_ptr[CExpression]] Parse(const c_string & path) const + const shared_ptr[CSchema] & schema() cdef cppclass CPartitioningFactory "arrow::dataset::PartitioningFactory": pass @@ -322,8 +312,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::PartitioningOrFactory": CPartitioningOrFactory(shared_ptr[CPartitioning]) CPartitioningOrFactory(shared_ptr[CPartitioningFactory]) - CPartitioningOrFactory& operator=(shared_ptr[CPartitioning]) - CPartitioningOrFactory& operator=( + CPartitioningOrFactory & operator = (shared_ptr[CPartitioning]) + 
CPartitioningOrFactory & operator = ( shared_ptr[CPartitioningFactory]) shared_ptr[CPartitioning] partitioning() const shared_ptr[CPartitioningFactory] factory() const diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index f52c1a61057b6..6bf68a84f6d5a 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -75,11 +75,13 @@ def mockfs(): data = [ list(range(5)), - list(map(float, range(5))) + list(map(float, range(5))), + list(map(str, range(5))) ] schema = pa.schema([ pa.field('i64', pa.int64()), - pa.field('f64', pa.float64()) + pa.field('f64', pa.float64()), + pa.field('str', pa.string()) ]) batch = pa.record_batch(data, schema=schema) table = pa.Table.from_batches([batch]) @@ -431,7 +433,7 @@ def test_expression_ergonomics(): ] ]) def test_file_system_factory(mockfs, paths_or_selector): - format = ds.ParquetFileFormat() + format = ds.ParquetFileFormat(reader_options=dict(dict_columns={"str"})) options = ds.FileSystemFactoryOptions('subdir') options.partitioning = ds.DirectoryPartitioning( @@ -464,23 +466,27 @@ def test_file_system_factory(mockfs, paths_or_selector): scanner = ds.Scanner(dataset) expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64()) expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64()) + expected_str = pa.DictionaryArray.from_arrays( + pa.array([0, 1, 2, 3, 4], type=pa.int32()), + pa.array("0 1 2 3 4".split(), type=pa.string())) for task, group, key in zip(scanner.scan(), [1, 2], ['xxx', 'yyy']): expected_group_column = pa.array([group] * 5, type=pa.int32()) expected_key_column = pa.array([key] * 5, type=pa.string()) for batch in task.execute(): - assert batch.num_columns == 4 + assert batch.num_columns == 5 assert batch[0].equals(expected_i64) assert batch[1].equals(expected_f64) - assert batch[2].equals(expected_group_column) - assert batch[3].equals(expected_key_column) + assert batch[2].equals(expected_str) + assert batch[3].equals(expected_group_column) + assert batch[4].equals(expected_key_column) table = dataset.to_table() assert isinstance(table, pa.Table) assert len(table) == 10 - assert table.num_columns == 4 + assert table.num_columns == 5 -def test_paritioning_factory(mockfs): +def test_partitioning_factory(mockfs): paths_or_selector = fs.FileSelector('subdir', recursive=True) format = ds.ParquetFileFormat() @@ -497,6 +503,7 @@ def test_paritioning_factory(mockfs): expected_schema = pa.schema([ ("i64", pa.int64()), ("f64", pa.float64()), + ("str", pa.string()), ("group", pa.int32()), ("key", pa.string()), ]) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 052dc6d0c6597..46c0b982ffa78 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -360,8 +360,8 @@ dataset___FileFormat__type_name <- function(format){ .Call(`_arrow_dataset___FileFormat__type_name` , format) } -dataset___ParquetFileFormat__Make <- function(){ - .Call(`_arrow_dataset___ParquetFileFormat__Make` ) +dataset___ParquetFileFormat__Make <- function(use_buffered_stream, buffer_size, dict_columns){ + .Call(`_arrow_dataset___ParquetFileFormat__Make` , use_buffered_stream, buffer_size, dict_columns) } dataset___IpcFileFormat__Make <- function(){ @@ -456,6 +456,10 @@ dataset___ScannerBuilder__UseThreads <- function(sb, threads){ invisible(.Call(`_arrow_dataset___ScannerBuilder__UseThreads` , sb, threads)) } +dataset___ScannerBuilder__BatchSize <- function(sb, batch_size){ + invisible(.Call(`_arrow_dataset___ScannerBuilder__BatchSize` , sb, batch_size)) +} + dataset___ScannerBuilder__schema <- 
function(sb){ .Call(`_arrow_dataset___ScannerBuilder__schema` , sb) } diff --git a/r/R/dataset.R b/r/R/dataset.R index b9a6526809926..078f48246ecc0 100644 --- a/r/R/dataset.R +++ b/r/R/dataset.R @@ -254,7 +254,11 @@ SourceFactory$create <- function(path, recursive = recursive ) - format <- FileFormat$create(match.arg(format)) + if (is.character(format)) { + format <- FileFormat$create(match.arg(format)) + } else { + assert_is(format, "FileFormat") + } if (!is.null(partitioning)) { if (inherits(partitioning, "Schema")) { @@ -351,6 +355,12 @@ FileSystemSourceFactory$create <- function(filesystem, #' Currently supported options are "parquet", "arrow", and "ipc" (an alias for #' the Arrow file format) #' * `...`: Additional format-specific options +#' format="parquet": +#' * `use_buffered_stream`: Read files through buffered input streams rather than +#' loading entire row groups at once. This may be enabled +#' to reduce memory overhead. Disabled by default. +#' * `buffer_size`: Size of buffered stream, if enabled. Default is 8KB. +#' * `dict_columns`: Names of columns which should be read as dictionaries. #' #' It returns the appropriate subclass of `FileFormat` (e.g. `ParquetFileFormat`) #' @rdname FileFormat @@ -376,10 +386,8 @@ FileFormat <- R6Class("FileFormat", inherit = Object, ) ) FileFormat$create <- function(format, ...) { - # TODO: pass list(...) options to the initializers - # https://issues.apache.org/jira/browse/ARROW-7547 if (format == "parquet") { - shared_ptr(ParquetFileFormat, dataset___ParquetFileFormat__Make()) + ParquetFileFormat$create(...) } else if (format %in% c("ipc", "arrow")) { # These are aliases for the same thing shared_ptr(IpcFileFormat, dataset___IpcFileFormat__Make()) } else { @@ -392,6 +400,12 @@ FileFormat$create <- function(format, ...) { #' @rdname FileFormat #' @export ParquetFileFormat <- R6Class("ParquetFileFormat", inherit = FileFormat) +ParquetFileFormat$create <- function(use_buffered_stream = FALSE, + buffer_size = 8196, + dict_columns = character(0)) { + shared_ptr(ParquetFileFormat, dataset___ParquetFileFormat__Make( + use_buffered_stream, buffer_size, dict_columns)) +} #' @usage NULL #' @format NULL @@ -415,6 +429,9 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat) #' - `$UseThreads(threads)`: logical: should the scan use multithreading? #' The method's default input is `TRUE`, but you must call the method to enable #' multithreading because the scanner default is `FALSE`. +#' - `$BatchSize(batch_size)`: integer: Maximum row count of scanned record +#' batches, default is 32K. If scanned record batches are overflowing memory +#' then this method can be called to reduce their size. 
#' - `$schema`: Active binding, returns the [Schema] of the Dataset #' - `$Finish()`: Returns a `Scanner` #' @@ -449,6 +466,10 @@ ScannerBuilder <- R6Class("ScannerBuilder", inherit = Object, dataset___ScannerBuilder__UseThreads(self, threads) self }, + BatchSize = function(batch_size) { + dataset___ScannerBuilder__BatchSize(self, batch_size) + self + }, Finish = function() unique_ptr(Scanner, dataset___ScannerBuilder__Finish(self)) ), active = list( diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index d72f076877304..9c714bb456cff 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1425,14 +1425,17 @@ RcppExport SEXP _arrow_dataset___FileFormat__type_name(SEXP format_sexp){ // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr dataset___ParquetFileFormat__Make(); -RcppExport SEXP _arrow_dataset___ParquetFileFormat__Make(){ +std::shared_ptr dataset___ParquetFileFormat__Make(bool use_buffered_stream, int64_t buffer_size, CharacterVector dict_columns); +RcppExport SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){ BEGIN_RCPP - return Rcpp::wrap(dataset___ParquetFileFormat__Make()); + Rcpp::traits::input_parameter::type use_buffered_stream(use_buffered_stream_sexp); + Rcpp::traits::input_parameter::type buffer_size(buffer_size_sexp); + Rcpp::traits::input_parameter::type dict_columns(dict_columns_sexp); + return Rcpp::wrap(dataset___ParquetFileFormat__Make(use_buffered_stream, buffer_size, dict_columns)); END_RCPP } #else -RcppExport SEXP _arrow_dataset___ParquetFileFormat__Make(){ +RcppExport SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){ Rf_error("Cannot call dataset___ParquetFileFormat__Make(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif @@ -1789,6 +1792,23 @@ RcppExport SEXP _arrow_dataset___ScannerBuilder__UseThreads(SEXP sb_sexp, SEXP t } #endif +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +void dataset___ScannerBuilder__BatchSize(const std::shared_ptr& sb, int64_t batch_size); +RcppExport SEXP _arrow_dataset___ScannerBuilder__BatchSize(SEXP sb_sexp, SEXP batch_size_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter&>::type sb(sb_sexp); + Rcpp::traits::input_parameter::type batch_size(batch_size_sexp); + dataset___ScannerBuilder__BatchSize(sb, batch_size); + return R_NilValue; +END_RCPP +} +#else +RcppExport SEXP _arrow_dataset___ScannerBuilder__BatchSize(SEXP sb_sexp, SEXP batch_size_sexp){ + Rf_error("Cannot call dataset___ScannerBuilder__BatchSize(). Please use arrow::install_arrow() to install required runtime libraries. 
"); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr dataset___ScannerBuilder__schema(const std::shared_ptr& sb); @@ -5882,7 +5902,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___FSSFactory__Make1", (DL_FUNC) &_arrow_dataset___FSSFactory__Make1, 3}, { "_arrow_dataset___FSSFactory__Make3", (DL_FUNC) &_arrow_dataset___FSSFactory__Make3, 4}, { "_arrow_dataset___FileFormat__type_name", (DL_FUNC) &_arrow_dataset___FileFormat__type_name, 1}, - { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 0}, + { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3}, { "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, { "_arrow_dataset___SFactory__Finish1", (DL_FUNC) &_arrow_dataset___SFactory__Finish1, 1}, { "_arrow_dataset___SFactory__Finish2", (DL_FUNC) &_arrow_dataset___SFactory__Finish2, 2}, @@ -5906,6 +5926,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___ScannerBuilder__Project", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Project, 2}, { "_arrow_dataset___ScannerBuilder__Filter", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Filter, 2}, { "_arrow_dataset___ScannerBuilder__UseThreads", (DL_FUNC) &_arrow_dataset___ScannerBuilder__UseThreads, 2}, + { "_arrow_dataset___ScannerBuilder__BatchSize", (DL_FUNC) &_arrow_dataset___ScannerBuilder__BatchSize, 2}, { "_arrow_dataset___ScannerBuilder__schema", (DL_FUNC) &_arrow_dataset___ScannerBuilder__schema, 1}, { "_arrow_dataset___ScannerBuilder__Finish", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Finish, 1}, { "_arrow_dataset___Scanner__ToTable", (DL_FUNC) &_arrow_dataset___Scanner__ToTable, 1}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 64093dd37eaac..6a1c318d29f54 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -19,6 +19,9 @@ #if defined(ARROW_R_WITH_ARROW) +using Rcpp::CharacterVector; +using Rcpp::String; + // [[arrow::export]] std::shared_ptr dataset___FSSFactory__Make2( const std::shared_ptr& fs, @@ -64,8 +67,19 @@ std::string dataset___FileFormat__type_name( } // [[arrow::export]] -std::shared_ptr dataset___ParquetFileFormat__Make() { - return std::make_shared(); +std::shared_ptr dataset___ParquetFileFormat__Make( + bool use_buffered_stream, int64_t buffer_size, CharacterVector dict_columns) { + auto fmt = std::make_shared(); + + fmt->reader_options.use_buffered_stream = use_buffered_stream; + fmt->reader_options.buffer_size = buffer_size; + + auto dict_columns_vector = Rcpp::as>(dict_columns); + auto& d = fmt->reader_options.dict_columns; + std::move(dict_columns_vector.begin(), dict_columns_vector.end(), + std::inserter(d, d.end())); + + return fmt; } // [[arrow::export]] @@ -210,6 +224,12 @@ void dataset___ScannerBuilder__UseThreads(const std::shared_ptrUseThreads(threads)); } +// [[arrow::export]] +void dataset___ScannerBuilder__BatchSize(const std::shared_ptr& sb, + int64_t batch_size) { + STOP_IF_NOT_OK(sb->BatchSize(batch_size)); +} + // [[arrow::export]] std::shared_ptr dataset___ScannerBuilder__schema( const std::shared_ptr& sb) { diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index bcde9721894d4..aae0b6f937f00 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -97,6 +97,12 @@ test_that("Simple interface for datasets", { ) }) +test_that("Simple interface for datasets (custom ParquetFileFormat)", { + ds <- open_dataset(dataset_dir, partitioning = 
schema(part = uint8()), + format = FileFormat$create("parquet", dict_columns = c("chr"))) + expect_equivalent(ds$schema$GetFieldByName("chr")$type, dictionary()) +}) + test_that("Hive partitioning", { ds <- open_dataset(hive_dir, partitioning = hive_partition(other = utf8(), group = uint8())) expect_is(ds, "Dataset")
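The R test above checks that a column named in `dict_columns` surfaces with a dictionary type. A comparable check through the Python bindings touched by this change might look like the following sketch; the temporary directory, file name, and column name are hypothetical, and the calls assume the `pyarrow.dataset` API as modified in this diff.

    import pathlib

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    # Hypothetical round trip: write a tiny Parquet file, then reopen it with
    # dict_columns so the string column is exposed as dictionary-encoded.
    path = pathlib.Path('/tmp/dict_example')
    path.mkdir(exist_ok=True)
    pq.write_table(pa.table({'chr': ['a', 'b', 'a'], 'val': [1, 2, 3]}),
                   str(path / 'part-0.parquet'))

    fmt = ds.ParquetFileFormat(reader_options=dict(dict_columns={'chr'}))
    dataset = ds.dataset(str(path), format=fmt)

    # Mirrors expect_equivalent(ds$schema$GetFieldByName("chr")$type, dictionary())
    assert pa.types.is_dictionary(dataset.schema.field('chr').type)

Because the dictionary decoding happens in the Parquet reader itself, the dictionary type shows up both in the inspected dataset schema (as the R test asserts) and in every scanned batch (as the Python test above asserts).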