From b8e45f4e432b12148e5f93bc12e60930759eebf2 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Mon, 26 Aug 2024 10:19:33 -0700 Subject: [PATCH] add --- src/daft-io/src/object_store_glob.rs | 13 +++++++--- .../io/parquet/test_reads_s3_minio.py | 24 +++++++++++++++++++ 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/src/daft-io/src/object_store_glob.rs b/src/daft-io/src/object_store_glob.rs index 43a1b3e23c..f5d874998e 100644 --- a/src/daft-io/src/object_store_glob.rs +++ b/src/daft-io/src/object_store_glob.rs @@ -26,10 +26,12 @@ const SCHEME_SUFFIX_LEN: usize = "://".len(); /// the `glob` utility can only be used with POSIX-style paths. const GLOB_DELIMITER: &str = "/"; -// NOTE: We use the following suffixes to filter out Spark marker files +// NOTE: We use the following suffixes to filter out Spark/Databricks marker files const MARKER_SUFFIXES: [&str; 1] = [".crc"]; -// NOTE: We use the following file names to filter out Spark marker files +// NOTE: We use the following file names to filter out Spark/Databricks marker files const MARKER_FILES: [&str; 3] = ["_metadata", "_common_metadata", "_success"]; +// NOTE: We use the following prefixes to filter out Spark/Databricks marker files +const MARKER_PREFIXES: [&str; 2] = ["_started", "_committed"]; #[derive(Clone)] pub(crate) struct GlobState { @@ -322,7 +324,12 @@ fn _should_return(fm: &FileMetadata) -> bool { .iter() .any(|suffix| file_path.ends_with(suffix)) || file_name - .is_some_and(|file| MARKER_FILES.iter().any(|m_file| file == *m_file)) => + .is_some_and(|file| MARKER_FILES.iter().any(|m_file| file == *m_file)) + || file_name.is_some_and(|file| { + MARKER_PREFIXES + .iter() + .any(|m_prefix| file.starts_with(m_prefix)) + }) => { false } diff --git a/tests/integration/io/parquet/test_reads_s3_minio.py b/tests/integration/io/parquet/test_reads_s3_minio.py index 075eacc91d..307e9e5dfa 100644 --- a/tests/integration/io/parquet/test_reads_s3_minio.py +++ b/tests/integration/io/parquet/test_reads_s3_minio.py @@ -62,6 +62,30 @@ def test_minio_parquet_ignore_marker_files(minio_io_config): assert read.to_pydict() == {"x": [1, 2, 3, 4] * 3} +@pytest.mark.integration() +def test_minio_parquet_ignore_marker_prefixes(minio_io_config): + from datetime import datetime + + bucket_name = "data-engineering-prod" + with minio_create_bucket(minio_io_config, bucket_name=bucket_name) as fs: + target_paths = [ + "s3://data-engineering-prod/X/part-00000-51723f93-0ba2-42f1-a58f-154f0ed40f28.c000.snappy.parquet", + "s3://data-engineering-prod/Y/part-00000-6d5c7cc6-3b4a-443e-a46a-ca9e080bda1b.c000.snappy.parquet", + ] + data = {"x": [1, 2, 3, 4]} + pa_table = pa.Table.from_pydict(data) + for path in target_paths: + pq.write_table(pa_table, path, filesystem=fs) + + marker_prefixes = ["_started", "_committed"] + for marker_prefix in marker_prefixes: + fs.touch(f"s3://{bucket_name}/X/{marker_prefix}_{datetime.now().isoformat()}") + fs.touch(f"s3://{bucket_name}/Y/{marker_prefix}_{datetime.now().isoformat()}") + + read = daft.read_parquet(f"s3://{bucket_name}/**", io_config=minio_io_config) + assert read.to_pydict() == {"x": [1, 2, 3, 4] * 2} + + @pytest.mark.integration() def test_minio_parquet_read_mismatched_schemas_no_pushdown(minio_io_config): # When we read files, we infer schema from the first file