From 5814922581b48eac48ad7941957e1e9187ce1ae4 Mon Sep 17 00:00:00 2001
From: Jay Chia <17691182+jaychia@users.noreply.github.com>
Date: Tue, 23 Jan 2024 18:30:17 -0800
Subject: [PATCH] [BUG] Fix for Iceberg schema projection (#1815)

Correctly "projects" the schema retrieved from Iceberg onto the data read from Parquet files.

This PR also introduces `ScanTask::materialized_schema()`, which returns the
schema of a ScanTask after pushdowns (column pruning) are applied.

1. When an unloaded MicroPartition is created from a ScanTask, it inherits the ScanTask's `materialized_schema`.
2. When an unloaded MicroPartition is materialized, it casts all resulting tables to its `self.schema`, which might have been modified by operations such as `.select` or `.cast`.

---------

Co-authored-by: Fokko Driesprong
Co-authored-by: Jay Chia
---
 src/daft-micropartition/src/micropartition.rs | 79 +++----------------
 src/daft-scan/src/lib.rs                      | 27 ++++++-
 .../iceberg/docker-compose/Dockerfile         |  4 +-
 .../iceberg/docker-compose/provision.py       | 16 ++++
 tests/integration/iceberg/test_table_load.py  |  1 +
 5 files changed, 57 insertions(+), 70 deletions(-)

diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs
index aff4033a55..ef3da4da43 100644
--- a/src/daft-micropartition/src/micropartition.rs
+++ b/src/daft-micropartition/src/micropartition.rs
@@ -77,8 +77,6 @@ fn materialize_scan_task(
     cast_to_schema: Option<SchemaRef>,
     io_stats: Option<IOStatsRef>,
 ) -> crate::Result<(Vec<Table>, SchemaRef)> {
-    log::debug!("Materializing ScanTask: {scan_task:?}");
-
     let column_names = scan_task
         .pushdowns
         .columns
@@ -86,10 +84,6 @@ fn materialize_scan_task(
         .map(|v| v.iter().map(|s| s.as_ref()).collect::<Vec<&str>>());
     let urls = scan_task.sources.iter().map(|s| s.get_path());
 
-    // Schema to cast resultant tables into, ensuring that all Tables have the same schema.
-    // Note that we need to apply column pruning here if specified by the ScanTask
-    let cast_to_schema = cast_to_schema.unwrap_or_else(|| scan_task.schema.clone());
-
     let table_values = match scan_task.storage_config.as_ref() {
         StorageConfig::Native(native_storage_config) => {
             let runtime_handle =
@@ -135,9 +129,10 @@
                 // Native CSV Reads
                 // ****************
                 FileFormatConfig::Csv(cfg) => {
+                    let schema_of_file = scan_task.schema.clone();
                     let col_names = if !cfg.has_headers {
                         Some(
-                            cast_to_schema
+                            schema_of_file
                                 .fields
                                 .values()
                                 .map(|f| f.name.as_str())
@@ -226,7 +221,7 @@
                         crate::python::read_parquet_into_py_table(
                             py,
                             url,
-                            cast_to_schema.clone().into(),
+                            scan_task.schema.clone().into(),
                             (*coerce_int96_timestamp_unit).into(),
                             scan_task.storage_config.clone().into(),
                             scan_task
@@ -254,7 +249,7 @@
                             *has_headers,
                             *delimiter,
                             *double_quote,
-                            cast_to_schema.clone().into(),
+                            scan_task.schema.clone().into(),
                             scan_task.storage_config.clone().into(),
                             scan_task
                                 .pushdowns
@@ -273,7 +268,7 @@
                         crate::python::read_json_into_py_table(
                             py,
                             url,
-                            cast_to_schema.clone().into(),
+                            scan_task.schema.clone().into(),
                             scan_task.storage_config.clone().into(),
                             scan_task
                                 .pushdowns
@@ -290,8 +285,10 @@
             }
         }
     };
-    let cast_to_schema = prune_fields_from_schema_ref(cast_to_schema, column_names.as_deref())
-        .context(DaftCoreComputeSnafu)?;
+
+    // Schema to cast resultant tables into, ensuring that all Tables have the same schema.
+    // Note that we need to apply column pruning here if specified by the ScanTask
+    let cast_to_schema = cast_to_schema.unwrap_or_else(|| scan_task.materialized_schema());
 
     let casted_table_values = table_values
         .iter()
@@ -305,23 +302,13 @@ impl MicroPartition {
     /// Create a new "unloaded" MicroPartition using an associated [`ScanTask`]
     ///
     /// Schema invariants:
-    /// 1. All columns in `schema` must be exist in the `scan_task` schema
-    /// 2. Each Loaded column statistic in `statistics` must be castable to the corresponding column in the MicroPartition's schema
+    /// 1. Each Loaded column statistic in `statistics` must be castable to the corresponding column in the MicroPartition's schema
     pub fn new_unloaded(
         schema: SchemaRef,
         scan_task: Arc<ScanTask>,
         metadata: TableMetadata,
         statistics: TableStatistics,
     ) -> Self {
-        assert!(
-            schema
-                .fields
-                .keys()
-                .collect::<HashSet<_>>()
-                .is_subset(&scan_task.schema.fields.keys().collect::<HashSet<_>>()),
-            "Unloaded MicroPartition's schema names must be a subset of its ScanTask's schema"
-        );
-
         MicroPartition {
             schema: schema.clone(),
             state: Mutex::new(TableState::Unloaded(scan_task)),
@@ -370,7 +357,7 @@ impl MicroPartition {
     }
 
     pub fn from_scan_task(scan_task: Arc<ScanTask>, io_stats: IOStatsRef) -> crate::Result<Self> {
-        let schema = scan_task.schema.clone();
+        let schema = scan_task.materialized_schema();
         match (
             &scan_task.metadata,
             &scan_task.statistics,
@@ -428,13 +415,7 @@ impl MicroPartition {
                 )
                 .context(DaftCoreComputeSnafu)?;
 
-                let applied_schema = Arc::new(
-                    mp.schema
-                        .apply_hints(&schema)
-                        .context(DaftCoreComputeSnafu)?,
-                );
-                mp.cast_to_schema(applied_schema)
-                    .context(DaftCoreComputeSnafu)
+                mp.cast_to_schema(schema).context(DaftCoreComputeSnafu)
             }
 
             // CASE: Last resort fallback option
@@ -555,40 +536,6 @@ fn prune_fields_from_schema(schema: Schema, columns: Option<&[&str]>) -> DaftRes
     }
 }
 
-fn prune_fields_from_schema_ref(
-    schema: SchemaRef,
-    columns: Option<&[&str]>,
-) -> DaftResult<SchemaRef> {
-    if let Some(columns) = columns {
-        let avail_names = schema
-            .fields
-            .keys()
-            .map(|f| f.as_str())
-            .collect::<HashSet<&str>>();
-        let mut names_to_keep = HashSet::new();
-        for col_name in columns.iter() {
-            if avail_names.contains(col_name) {
-                names_to_keep.insert(*col_name);
-            } else {
-                return Err(super::Error::FieldNotFound {
-                    field: col_name.to_string(),
-                    available_fields: avail_names.iter().map(|v| v.to_string()).collect(),
-                }
-                .into());
-            }
-        }
-        let filtered_columns = schema
-            .fields
-            .values()
-            .filter(|field| names_to_keep.contains(field.name.as_str()))
-            .cloned()
-            .collect::<Vec<_>>();
-        Ok(Schema::new(filtered_columns)?.into())
-    } else {
-        Ok(schema)
-    }
-}
-
 fn parquet_sources_to_row_groups(sources: &[DataFileSource]) -> Option<Vec<Option<Vec<i64>>>> {
     let row_groups = sources
         .iter()
@@ -882,7 +829,7 @@ pub(crate) fn read_parquet_into_micropartition(
         let stats = stats.eval_expression_list(exprs.as_slice(), daft_schema.as_ref())?;
 
         Ok(MicroPartition::new_unloaded(
-            scan_task.schema.clone(),
+            scan_task.materialized_schema(),
             Arc::new(scan_task),
             TableMetadata { length: total_rows },
             stats,
diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs
index 6450f02cff..b99be9fbe8 100644
--- a/src/daft-scan/src/lib.rs
+++ b/src/daft-scan/src/lib.rs
@@ -7,7 +7,10 @@ use std::{
 };
 
 use common_error::{DaftError, DaftResult};
-use daft_core::{datatypes::Field, schema::SchemaRef};
+use daft_core::{
+    datatypes::Field,
+    schema::{Schema, SchemaRef},
+};
 use daft_dsl::ExprRef;
 use daft_stats::{PartitionSpec, TableMetadata, TableStatistics};
 use file_format::FileFormatConfig;
@@ -165,8 +168,13 @@ impl DataFileSource {
 #[derive(Debug, Serialize, Deserialize)]
 pub struct ScanTask {
     pub sources: Vec<DataFileSource>,
-    pub file_format_config: Arc<FileFormatConfig>,
+
+    /// Schema to use when reading the DataFileSources.
+    /// Note that this is different than the schema of the data after pushdowns have been applied,
+    /// which can be obtained with [`ScanTask::materialized_schema`] instead.
     pub schema: SchemaRef,
+
+    pub file_format_config: Arc<FileFormatConfig>,
     pub storage_config: Arc<StorageConfig>,
     pub pushdowns: Pushdowns,
     pub size_bytes_on_disk: Option<u64>,
@@ -269,6 +277,21 @@ impl ScanTask {
         ))
     }
 
+    pub fn materialized_schema(&self) -> SchemaRef {
+        match &self.pushdowns.columns {
+            None => self.schema.clone(),
+            Some(columns) => Arc::new(Schema {
+                fields: self
+                    .schema
+                    .fields
+                    .clone()
+                    .into_iter()
+                    .filter(|(name, _)| columns.contains(name))
+                    .collect(),
+            }),
+        }
+    }
+
     pub fn num_rows(&self) -> Option<usize> {
         if self.pushdowns.filters.is_some() {
             None
diff --git a/tests/integration/iceberg/docker-compose/Dockerfile b/tests/integration/iceberg/docker-compose/Dockerfile
index 4756104f0b..6894899499 100644
--- a/tests/integration/iceberg/docker-compose/Dockerfile
+++ b/tests/integration/iceberg/docker-compose/Dockerfile
@@ -38,9 +38,9 @@ WORKDIR ${SPARK_HOME}
 
 ENV SPARK_VERSION=3.4.2
 ENV ICEBERG_SPARK_RUNTIME_VERSION=3.4_2.12
-ENV ICEBERG_VERSION=1.4.0
+ENV ICEBERG_VERSION=1.4.3
 ENV AWS_SDK_VERSION=2.20.18
-ENV PYICEBERG_VERSION=0.4.0
+ENV PYICEBERG_VERSION=0.5.1
 
 RUN curl --retry 3 -s -C - https://daft-public-data.s3.us-west-2.amazonaws.com/distribution/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
     && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
diff --git a/tests/integration/iceberg/docker-compose/provision.py b/tests/integration/iceberg/docker-compose/provision.py
index e1a7a6c0de..cce6362fdc 100644
--- a/tests/integration/iceberg/docker-compose/provision.py
+++ b/tests/integration/iceberg/docker-compose/provision.py
@@ -322,3 +322,19 @@
     ('123')
 """
 )
+
+spark.sql(
+    """
+  CREATE OR REPLACE TABLE default.add_new_column
+  USING iceberg
+  AS SELECT
+  1 AS idx
+  UNION ALL SELECT
+  2 AS idx
+  UNION ALL SELECT
+  3 AS idx
+"""
+)
+
+spark.sql("ALTER TABLE default.add_new_column ADD COLUMN name STRING")
+spark.sql("INSERT INTO default.add_new_column VALUES (3, 'abc'), (4, 'def')")
diff --git a/tests/integration/iceberg/test_table_load.py b/tests/integration/iceberg/test_table_load.py
index 81c8d7c8cb..4b7692e3c4 100644
--- a/tests/integration/iceberg/test_table_load.py
+++ b/tests/integration/iceberg/test_table_load.py
@@ -37,6 +37,7 @@ def test_daft_iceberg_table_open(local_iceberg_tables):
     # "test_table_sanitized_character", # Bug in scan().to_arrow().to_arrow()
     "test_table_version", # we have bugs when loading no files
     "test_uuid_and_fixed_unpartitioned",
+    "add_new_column",
 ]
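
For readers outside the Daft codebase, the projection that `materialized_schema()` performs is easiest to see in isolation. The sketch below uses simplified stand-in types (`Field`, `Schema`, `Pushdowns`, and `ScanTask` here are illustrative structs, not Daft's actual ones, which carry real dtypes and keep fields in an ordered map): the file schema is filtered down to the pushed-down columns while preserving field order, and `None` pushdowns mean "read everything".

```rust
use std::sync::Arc;

/// Simplified stand-ins for daft-core's Field/Schema and daft-scan's Pushdowns.
#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
    dtype: String, // a real schema would carry a DataType here
}

#[derive(Clone, Debug, PartialEq)]
struct Schema {
    fields: Vec<Field>, // ordered, like the IndexMap in the real crate
}

#[derive(Default)]
struct Pushdowns {
    /// Column-pruning pushdown: `None` means "read every column".
    columns: Option<Arc<Vec<String>>>,
}

struct ScanTask {
    schema: Arc<Schema>,
    pushdowns: Pushdowns,
}

impl ScanTask {
    /// Schema of the data this task will actually produce: the file schema
    /// with any column-pruning pushdown applied, preserving field order.
    fn materialized_schema(&self) -> Arc<Schema> {
        match &self.pushdowns.columns {
            None => self.schema.clone(),
            Some(columns) => Arc::new(Schema {
                fields: self
                    .schema
                    .fields
                    .iter()
                    .filter(|f| columns.contains(&f.name))
                    .cloned()
                    .collect(),
            }),
        }
    }
}

fn main() {
    // File schema as reported by the catalog (e.g. Iceberg): three columns.
    let schema = Arc::new(Schema {
        fields: vec![
            Field { name: "idx".into(), dtype: "int64".into() },
            Field { name: "name".into(), dtype: "string".into() },
            Field { name: "ts".into(), dtype: "timestamp".into() },
        ],
    });

    // A scan with a column-pruning pushdown keeps only the requested columns.
    let pruned = ScanTask {
        schema: schema.clone(),
        pushdowns: Pushdowns { columns: Some(Arc::new(vec!["name".to_string()])) },
    };
    let pruned_schema = pruned.materialized_schema();
    assert_eq!(
        pruned_schema.fields,
        vec![Field { name: "name".into(), dtype: "string".into() }]
    );

    // With no pushdown, the materialized schema is the file schema unchanged.
    let full = ScanTask { schema, pushdowns: Pushdowns::default() };
    assert_eq!(full.materialized_schema().fields.len(), 3);
    println!("materialized_schema sketch OK");
}
```

Under the same reading of the patch, this is also why the old subset assertion in `new_unloaded` and the `prune_fields_from_schema_ref` helper can go away: an unloaded MicroPartition's schema is now the already-pruned `materialized_schema()`, and each table produced during materialization is simply cast to that schema rather than re-pruned afterwards.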