diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 4fc894de04d0c..cb4ab89cd4f51 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -84,6 +84,19 @@ Result> FileSystemSource::Make( std::move(filesystem), std::move(forest), std::move(partitions))); } +std::vector FileSystemSource::files() const { + std::vector files; + + DCHECK_OK(forest_.Visit([&](fs::PathForest::Ref ref) { + if (ref.stats().IsFile()) { + files.push_back(ref.stats().path()); + } + return Status::OK(); + })); + + return files; +} + std::string FileSystemSource::ToString() const { std::string repr = "FileSystemSource:"; diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index c2c73fc5fca8a..bcae386b08a2a 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -212,10 +212,12 @@ class ARROW_DS_EXPORT FileSystemSource : public Source { std::string type_name() const override { return "filesystem"; } - std::string ToString() const; - const std::shared_ptr& format() const { return format_; } + std::vector files() const; + + std::string ToString() const; + protected: FragmentIterator GetFragmentsImpl(std::shared_ptr options) override; diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index c6ef025a43bca..0bbcca250b301 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -84,10 +84,12 @@ TEST_F(TestFileSystemSource, Basic) { MakeSource({fs::File("a"), fs::File("b"), fs::File("c")}); AssertFragmentsAreFromPath(source_->GetFragments(options_), {"a", "b", "c"}); + AssertFilesAre(source_, {"a", "b", "c"}); // Should not create fragment from directories. MakeSource({fs::Dir("A"), fs::Dir("A/B"), fs::File("A/a"), fs::File("A/B/b")}); AssertFragmentsAreFromPath(source_->GetFragments(options_), {"A/a", "A/B/b"}); + AssertFilesAre(source_, {"A/a", "A/B/b"}); } TEST_F(TestFileSystemSource, RootPartitionPruning) { diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index fb405abe004f3..bb7355aa3f87b 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -305,6 +305,12 @@ class TestFileSystemSource : public ::testing::Test { std::shared_ptr options_ = ScanOptions::Make(schema({})); }; +void AssertFilesAre(const std::shared_ptr& source, + std::vector expected) { + auto fs_source = internal::checked_cast(source.get()); + EXPECT_THAT(fs_source->files(), testing::UnorderedElementsAreArray(expected)); +} + void AssertFragmentsAreFromPath(FragmentIterator it, std::vector expected) { std::vector actual; diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index c44476df11ec0..39089c9c54407 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -681,6 +681,12 @@ cdef class FileSystemSource(Source): Source.init(self, sp) self.filesystem_source = sp.get() + @property + def files(self): + """List of the files""" + cdef vector[c_string] files = self.filesystem_source.files() + return [frombytes(f) for f in files] + cdef class DatasetFactory: """ diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 9c8c2fe3f568a..e9952f4aec5d3 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -271,6 +271,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CFileStatsVector stats, CExpressionVector partitions) c_string type() + vector[c_string] files() shared_ptr[CFragmentIterator] GetFragments( shared_ptr[CScanOptions] options) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 7996834620328..f52c1a61057b6 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -203,6 +203,7 @@ def test_filesystem_source(mockfs): filesystem=mockfs, partitions=partitions, file_format=file_format) assert source.partition_expression.equals(root_partition) + assert set(source.files) == set(paths) def test_dataset(dataset): @@ -455,7 +456,7 @@ def test_file_system_factory(mockfs, paths_or_selector): assert factory.root_partition.equals(ds.ScalarExpression(True)) source = factory.finish() - assert isinstance(source, ds.Source) + assert isinstance(source, ds.FileSystemSource) dataset = ds.Dataset([source], inspected_schema) assert len(list(dataset.scan())) == 2 diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index c4c1939ad6f6e..052dc6d0c6597 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -356,6 +356,10 @@ dataset___FSSFactory__Make3 <- function(fs, selector, format, factory){ .Call(`_arrow_dataset___FSSFactory__Make3` , fs, selector, format, factory) } +dataset___FileFormat__type_name <- function(format){ + .Call(`_arrow_dataset___FileFormat__type_name` , format) +} + dataset___ParquetFileFormat__Make <- function(){ .Call(`_arrow_dataset___ParquetFileFormat__Make` ) } @@ -372,12 +376,24 @@ dataset___SFactory__Finish2 <- function(factory, schema){ .Call(`_arrow_dataset___SFactory__Finish2` , factory, schema) } +dataset___SFactory__Inspect <- function(factory){ + .Call(`_arrow_dataset___SFactory__Inspect` , factory) +} + dataset___Source__schema <- function(source){ .Call(`_arrow_dataset___Source__schema` , source) } -dataset___SFactory__Inspect <- function(factory){ - .Call(`_arrow_dataset___SFactory__Inspect` , factory) +dataset___Source__type_name <- function(source){ + .Call(`_arrow_dataset___Source__type_name` , source) +} + +dataset___FSSource__format <- function(source){ + .Call(`_arrow_dataset___FSSource__format` , source) +} + +dataset___FSSource__files <- function(source){ + .Call(`_arrow_dataset___FSSource__files` , source) } dataset___DFactory__Make <- function(sources){ @@ -420,6 +436,10 @@ dataset___Dataset__schema <- function(ds){ .Call(`_arrow_dataset___Dataset__schema` , ds) } +dataset___Dataset__sources <- function(ds){ + .Call(`_arrow_dataset___Dataset__sources` , ds) +} + dataset___Dataset__NewScan <- function(ds){ .Call(`_arrow_dataset___Dataset__NewScan` , ds) } diff --git a/r/R/dataset.R b/r/R/dataset.R index d188778572202..b9a6526809926 100644 --- a/r/R/dataset.R +++ b/r/R/dataset.R @@ -93,7 +93,12 @@ Dataset <- R6Class("Dataset", inherit = Object, #' @description #' Return the Dataset's `Schema` schema = function() shared_ptr(Schema, dataset___Dataset__schema(self)), - metadata = function() self$schema$metadata + metadata = function() self$schema$metadata, + #' @description + #' Return the Dataset's `Source`s + sources = function() { + map(dataset___Dataset__sources(self), ~shared_ptr(Source, .)$..dispatch()) + } ) ) Dataset$create <- function(sources, schema) { @@ -138,7 +143,7 @@ DatasetFactory$create <- function(sources) { #' fragments contained in it, and declare a partitioning. #' `FileSystemSourceFactory` is a subclass of `SourceFactory` for #' discovering files in the local file system, the only currently supported -#' file system. +#' file system, it constructs an instance of `FileSystemSource`. #' #' In general, you'll deal with `SourceFactory` rather than `Source` itself. #' @section Factory: @@ -153,24 +158,57 @@ DatasetFactory$create <- function(sources) { #' Currently supported options are "parquet", "arrow", and "ipc" (an alias for #' the Arrow file format) #' @section Methods: -#' `Source` has one defined method: +#' `Source` and its subclasses have the following method: #' #' - `$schema`: Active binding, returns the [Schema] of the `Source` #' +#' `FileSystemSource` has the following methods: +#' +#' - `$files`: Active binding, returns the files of the `FileSystemSource` +#' - `$format`: Active binding, returns the [FileFormat] of the `FileSystemSource` +#' #' `SourceFactory` and its subclasses have the following methods: #' #' - `$Inspect()`: Walks the files in the directory and returns a common [Schema] #' - `$Finish(schema)`: Returns a `Source` #' @rdname Source #' @name Source -#' @seealso [Dataset] for what do do with a `Source` +#' @seealso [Dataset] for what to do with a `Source` #' @export Source <- R6Class("Source", inherit = Object, + public = list( + ..dispatch = function() { + if (self$type == "filesystem") { + shared_ptr(FileSystemSource, self$pointer()) + } else { + self + } + } + ), active = list( #' @description #' Return the Source's `Schema` schema = function() { shared_ptr(Schema, dataset___Source__schema(self)) + }, + #' @description + #' Return the Source's type. + type = function() dataset___Source__type_name(self) + ) +) + +#' @name FileSystemSource +#' @rdname Source +#' @export +FileSystemSource <- R6Class("FileSystemSource", inherit = Source, + active = list( + #' @description + #' Return the files contained in this `Source` + files = function() dataset___FSSource__files(self), + #' @description + #' Return the format of files in this `Source` + format = function() { + shared_ptr(FileFormat, dataset___FSSource__format(self))$..dispatch() } ) ) @@ -183,10 +221,11 @@ SourceFactory <- R6Class("SourceFactory", inherit = Object, public = list( Finish = function(schema = NULL) { if (is.null(schema)) { - shared_ptr(Source, dataset___SFactory__Finish1(self)) + ptr <- dataset___SFactory__Finish1(self) } else { - shared_ptr(Source, dataset___SFactory__Finish2(self, schema)) + ptr <- dataset___SFactory__Finish2(self, schema) } + shared_ptr(Source, ptr)$..dispatch() }, Inspect = function() shared_ptr(Schema, dataset___SFactory__Inspect(self)) ) @@ -282,23 +321,21 @@ FileSystemSourceFactory$create <- function(filesystem, assert_is(filesystem, "FileSystem") assert_is(selector, "FileSelector") assert_is(format, "FileFormat") + if (is.null(partitioning)) { - shared_ptr( - FileSystemSourceFactory, - dataset___FSSFactory__Make1(filesystem, selector, format) - ) + ptr <- dataset___FSSFactory__Make1(filesystem, selector, format) } else if (inherits(partitioning, "PartitioningFactory")) { - shared_ptr( - FileSystemSourceFactory, - dataset___FSSFactory__Make3(filesystem, selector, format, partitioning) - ) + ptr <- dataset___FSSFactory__Make3(filesystem, selector, format, partitioning) + } else if (inherits(partitioning, "Partitioning")) { + ptr <- dataset___FSSFactory__Make2(filesystem, selector, format, partitioning) } else { - assert_is(partitioning, "Partitioning") - shared_ptr( - FileSystemSourceFactory, - dataset___FSSFactory__Make2(filesystem, selector, format, partitioning) + stop( + "Expected 'partitioning' to be NULL, PartitioningFactory or Partitioning", + call. = FALSE ) } + + shared_ptr(FileSystemSourceFactory, ptr) } #' Dataset file formats @@ -319,7 +356,25 @@ FileSystemSourceFactory$create <- function(filesystem, #' @rdname FileFormat #' @name FileFormat #' @export -FileFormat <- R6Class("FileFormat", inherit = Object) +FileFormat <- R6Class("FileFormat", inherit = Object, + public = list( + ..dispatch = function() { + type <- self$type + if (type == "parquet") { + shared_ptr(ParquetFileFormat, self$pointer()) + } else if (type == "ipc") { + shared_ptr(IpcFileFormat, self$pointer()) + } else { + self + } + } + ), + active = list( + #' @description + #' Return the `FileFormat`'s type + type = function() dataset___FileFormat__type_name(self) + ) +) FileFormat$create <- function(format, ...) { # TODO: pass list(...) options to the initializers # https://issues.apache.org/jira/browse/ARROW-7547 diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 9259f8f44e9cd..d72f076877304 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1408,6 +1408,21 @@ RcppExport SEXP _arrow_dataset___FSSFactory__Make3(SEXP fs_sexp, SEXP selector_s } #endif +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +std::string dataset___FileFormat__type_name(const std::shared_ptr& format); +RcppExport SEXP _arrow_dataset___FileFormat__type_name(SEXP format_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter&>::type format(format_sexp); + return Rcpp::wrap(dataset___FileFormat__type_name(format)); +END_RCPP +} +#else +RcppExport SEXP _arrow_dataset___FileFormat__type_name(SEXP format_sexp){ + Rf_error("Cannot call dataset___FileFormat__type_name(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr dataset___ParquetFileFormat__Make(); @@ -1467,6 +1482,21 @@ RcppExport SEXP _arrow_dataset___SFactory__Finish2(SEXP factory_sexp, SEXP schem } #endif +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr dataset___SFactory__Inspect(const std::shared_ptr& factory); +RcppExport SEXP _arrow_dataset___SFactory__Inspect(SEXP factory_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter&>::type factory(factory_sexp); + return Rcpp::wrap(dataset___SFactory__Inspect(factory)); +END_RCPP +} +#else +RcppExport SEXP _arrow_dataset___SFactory__Inspect(SEXP factory_sexp){ + Rf_error("Cannot call dataset___SFactory__Inspect(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr dataset___Source__schema(const std::shared_ptr& source); @@ -1484,16 +1514,46 @@ RcppExport SEXP _arrow_dataset___Source__schema(SEXP source_sexp){ // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr dataset___SFactory__Inspect(const std::shared_ptr& factory); -RcppExport SEXP _arrow_dataset___SFactory__Inspect(SEXP factory_sexp){ +std::string dataset___Source__type_name(const std::shared_ptr& source); +RcppExport SEXP _arrow_dataset___Source__type_name(SEXP source_sexp){ BEGIN_RCPP - Rcpp::traits::input_parameter&>::type factory(factory_sexp); - return Rcpp::wrap(dataset___SFactory__Inspect(factory)); + Rcpp::traits::input_parameter&>::type source(source_sexp); + return Rcpp::wrap(dataset___Source__type_name(source)); END_RCPP } #else -RcppExport SEXP _arrow_dataset___SFactory__Inspect(SEXP factory_sexp){ - Rf_error("Cannot call dataset___SFactory__Inspect(). Please use arrow::install_arrow() to install required runtime libraries. "); +RcppExport SEXP _arrow_dataset___Source__type_name(SEXP source_sexp){ + Rf_error("Cannot call dataset___Source__type_name(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr dataset___FSSource__format(const std::shared_ptr& source); +RcppExport SEXP _arrow_dataset___FSSource__format(SEXP source_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter&>::type source(source_sexp); + return Rcpp::wrap(dataset___FSSource__format(source)); +END_RCPP +} +#else +RcppExport SEXP _arrow_dataset___FSSource__format(SEXP source_sexp){ + Rf_error("Cannot call dataset___FSSource__format(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +std::vector dataset___FSSource__files(const std::shared_ptr& source); +RcppExport SEXP _arrow_dataset___FSSource__files(SEXP source_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter&>::type source(source_sexp); + return Rcpp::wrap(dataset___FSSource__files(source)); +END_RCPP +} +#else +RcppExport SEXP _arrow_dataset___FSSource__files(SEXP source_sexp){ + Rf_error("Cannot call dataset___FSSource__files(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif @@ -1648,6 +1708,21 @@ RcppExport SEXP _arrow_dataset___Dataset__schema(SEXP ds_sexp){ } #endif +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +std::vector> dataset___Dataset__sources(const std::shared_ptr& ds); +RcppExport SEXP _arrow_dataset___Dataset__sources(SEXP ds_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter&>::type ds(ds_sexp); + return Rcpp::wrap(dataset___Dataset__sources(ds)); +END_RCPP +} +#else +RcppExport SEXP _arrow_dataset___Dataset__sources(SEXP ds_sexp){ + Rf_error("Cannot call dataset___Dataset__sources(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr dataset___Dataset__NewScan(const std::shared_ptr& ds); @@ -5806,12 +5881,16 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___FSSFactory__Make2", (DL_FUNC) &_arrow_dataset___FSSFactory__Make2, 4}, { "_arrow_dataset___FSSFactory__Make1", (DL_FUNC) &_arrow_dataset___FSSFactory__Make1, 3}, { "_arrow_dataset___FSSFactory__Make3", (DL_FUNC) &_arrow_dataset___FSSFactory__Make3, 4}, + { "_arrow_dataset___FileFormat__type_name", (DL_FUNC) &_arrow_dataset___FileFormat__type_name, 1}, { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 0}, { "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, { "_arrow_dataset___SFactory__Finish1", (DL_FUNC) &_arrow_dataset___SFactory__Finish1, 1}, { "_arrow_dataset___SFactory__Finish2", (DL_FUNC) &_arrow_dataset___SFactory__Finish2, 2}, - { "_arrow_dataset___Source__schema", (DL_FUNC) &_arrow_dataset___Source__schema, 1}, { "_arrow_dataset___SFactory__Inspect", (DL_FUNC) &_arrow_dataset___SFactory__Inspect, 1}, + { "_arrow_dataset___Source__schema", (DL_FUNC) &_arrow_dataset___Source__schema, 1}, + { "_arrow_dataset___Source__type_name", (DL_FUNC) &_arrow_dataset___Source__type_name, 1}, + { "_arrow_dataset___FSSource__format", (DL_FUNC) &_arrow_dataset___FSSource__format, 1}, + { "_arrow_dataset___FSSource__files", (DL_FUNC) &_arrow_dataset___FSSource__files, 1}, { "_arrow_dataset___DFactory__Make", (DL_FUNC) &_arrow_dataset___DFactory__Make, 1}, { "_arrow_dataset___DFactory__Inspect", (DL_FUNC) &_arrow_dataset___DFactory__Inspect, 1}, { "_arrow_dataset___DFactory__Finish1", (DL_FUNC) &_arrow_dataset___DFactory__Finish1, 1}, @@ -5822,6 +5901,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___HivePartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___HivePartitioning__MakeFactory, 0}, { "_arrow_dataset___Dataset__create", (DL_FUNC) &_arrow_dataset___Dataset__create, 2}, { "_arrow_dataset___Dataset__schema", (DL_FUNC) &_arrow_dataset___Dataset__schema, 1}, + { "_arrow_dataset___Dataset__sources", (DL_FUNC) &_arrow_dataset___Dataset__sources, 1}, { "_arrow_dataset___Dataset__NewScan", (DL_FUNC) &_arrow_dataset___Dataset__NewScan, 1}, { "_arrow_dataset___ScannerBuilder__Project", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Project, 2}, { "_arrow_dataset___ScannerBuilder__Filter", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Filter, 2}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 59c37d2275722..64093dd37eaac 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -57,6 +57,12 @@ std::shared_ptr dataset___FSSFactory__Make3( return VALUE_OR_STOP(ds::FileSystemSourceFactory::Make(fs, *selector, format, options)); } +// [[arrow::export]] +std::string dataset___FileFormat__type_name( + const std::shared_ptr& format) { + return format->type_name(); +} + // [[arrow::export]] std::shared_ptr dataset___ParquetFileFormat__Make() { return std::make_shared(); @@ -80,6 +86,12 @@ std::shared_ptr dataset___SFactory__Finish2( return VALUE_OR_STOP(factory->Finish(schema)); } +// [[arrow::export]] +std::shared_ptr dataset___SFactory__Inspect( + const std::shared_ptr& factory) { + return VALUE_OR_STOP(factory->Inspect()); +} + // [[arrow::export]] std::shared_ptr dataset___Source__schema( const std::shared_ptr& source) { @@ -87,9 +99,20 @@ std::shared_ptr dataset___Source__schema( } // [[arrow::export]] -std::shared_ptr dataset___SFactory__Inspect( - const std::shared_ptr& factory) { - return VALUE_OR_STOP(factory->Inspect()); +std::string dataset___Source__type_name(const std::shared_ptr& source) { + return source->type_name(); +} + +// [[arrow::export]] +std::shared_ptr dataset___FSSource__format( + const std::shared_ptr& source) { + return source->format(); +} + +// [[arrow::export]] +std::vector dataset___FSSource__files( + const std::shared_ptr& source) { + return source->files(); } // DatasetFactory @@ -154,6 +177,12 @@ std::shared_ptr dataset___Dataset__schema( return ds->schema(); } +// [[arrow::export]] +std::vector> dataset___Dataset__sources( + const std::shared_ptr& ds) { + return ds->sources(); +} + // [[arrow::export]] std::shared_ptr dataset___Dataset__NewScan( const std::shared_ptr& ds) { diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 49c92d89cce0a..bcde9721894d4 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -19,13 +19,13 @@ context("Datasets") library(dplyr) -dataset_dir <- tempfile() +dataset_dir <- normalizePath(tempfile(), winslash = "/") dir.create(dataset_dir) -hive_dir <- tempfile() +hive_dir <- normalizePath(tempfile(), winslash = "/") dir.create(hive_dir) -ipc_dir <- tempfile() +ipc_dir <- normalizePath(tempfile(), winslash = "/") dir.create(ipc_dir) first_date <- lubridate::ymd_hms("2015-04-29 03:12:39") @@ -371,6 +371,11 @@ expect_scan_result <- function(ds, schm) { ) } +files <- c( + file.path(dataset_dir, 1, "file1.parquet", fsep = "/"), + file.path(dataset_dir, 2, "file2.parquet", fsep = "/") +) + test_that("Assembling a Dataset manually and getting a Table", { fs <- LocalFileSystem$create() selector <- FileSelector$create(dataset_dir, recursive = TRUE) @@ -383,14 +388,16 @@ test_that("Assembling a Dataset manually and getting a Table", { schm <- factory$Inspect() expect_is(schm, "Schema") - phys_schm <- ParquetFileReader$create(file.path(dataset_dir, 1, "file1.parquet"))$GetSchema() + phys_schm <- ParquetFileReader$create(files[1])$GetSchema() expect_equal(names(phys_schm), names(df1)) expect_equal(names(schm), c(names(phys_schm), "part")) src <- factory$Finish(schm) - expect_is(src, "Source") + expect_is(src, "FileSystemSource") expect_is(src$schema, "Schema") + expect_is(src$format, "ParquetFileFormat") expect_equal(names(schm), names(src$schema)) + expect_equivalent(src$files, files) ds <- Dataset$create(list(src), schm) expect_is(ds, "Dataset") @@ -400,9 +407,11 @@ test_that("Assembling a Dataset manually and getting a Table", { }) test_that("Assembling multiple SourceFactories with DatasetFactory", { - src1 <- open_source(file.path(dataset_dir, 1), format = "parquet") + dir1 <- file.path(dataset_dir, 1, fsep = "/") + src1 <- open_source(dir1, format = "parquet") expect_is(src1, "FileSystemSourceFactory") - src2 <- open_source(file.path(dataset_dir, 2), format = "parquet") + dir2 <- file.path(dataset_dir, 2, fsep = "/") + src2 <- open_source(dir2, format = "parquet") expect_is(src2, "FileSystemSourceFactory") factory <- DatasetFactory$create(c(src1, src2)) @@ -411,13 +420,14 @@ test_that("Assembling multiple SourceFactories with DatasetFactory", { schm <- factory$Inspect() expect_is(schm, "Schema") - phys_schm <- ParquetFileReader$create(file.path(dataset_dir, 1, "file1.parquet"))$GetSchema() + phys_schm <- ParquetFileReader$create(files[1])$GetSchema() expect_equal(names(phys_schm), names(df1)) ds <- factory$Finish(schm) expect_is(ds, "Dataset") expect_is(ds$schema, "Schema") expect_equal(names(schm), names(ds$schema)) + expect_equivalent(map(ds$sources, ~.$files), files) expect_scan_result(ds, schm) })