diff --git a/src/context/iceberg.rs b/src/context/iceberg.rs index ce78def8..d7bf798a 100644 --- a/src/context/iceberg.rs +++ b/src/context/iceberg.rs @@ -1,4 +1,5 @@ use core::str; +use itertools::izip; use std::collections::HashMap; use std::error::Error; use std::pin::Pin; @@ -68,7 +69,7 @@ fn create_empty_metadata( } // Clone an arrow schema, assigning sequential field IDs starting from 1 -fn assign_field_ids(arrow_schema: Arc) -> Schema { +fn assign_field_ids(arrow_schema: &Arc) -> Schema { let mut field_id_counter = 1; let new_fields: Vec = arrow_schema .fields @@ -88,6 +89,36 @@ fn assign_field_ids(arrow_schema: Arc) -> Schema { Schema::new_with_metadata(new_fields, arrow_schema.metadata.clone()) } +fn is_schema_aligned( + new_arrow_schema: &Arc, + existing_iceberg_schema: &Arc, +) -> Result<(), DataLoadingError> { + let old_iceberg_struct = existing_iceberg_schema.as_struct(); + let old_iceberg_fields = old_iceberg_struct.fields(); + + let new_arrow_schema_with_field_ids = assign_field_ids(new_arrow_schema); + let new_iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema( + &new_arrow_schema_with_field_ids, + )?); + let new_iceberg_struct = new_iceberg_schema.as_struct(); + let new_iceberg_fields = new_iceberg_struct.fields(); + + if old_iceberg_fields.len() != new_iceberg_fields.len() { + return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Old schema has {} fields but new schema has {} fields", old_iceberg_fields.len(), new_iceberg_fields.len()))); + } + for (i, old_iceberg_field, new_iceberg_field) in + izip!(0.., old_iceberg_fields.iter(), new_iceberg_fields.iter()) + { + if old_iceberg_field.required && !new_iceberg_field.required { + return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Field {} ({}) is required in old schema but not required in new schema", i, old_iceberg_field.name))); + } + if old_iceberg_field.field_type != new_iceberg_field.field_type { + return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Field {} ({}) has data type {:?} in old schema but {:?} in new schema", i, old_iceberg_field.name, old_iceberg_field.field_type, new_iceberg_field.field_type))); + } + } + Ok(()) +} + // Create a new TableMetadata object by updating the current snapshot of an existing TableMetadata fn update_metadata_snapshot( previous_metadata: &TableMetadata, @@ -134,10 +165,6 @@ pub async fn record_batches_to_iceberg( let table_base_url = Url::parse(table_location).unwrap(); let file_io = table.file_io(); - let arrow_schema_with_ids = assign_field_ids(arrow_schema.clone()); - let iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema( - &arrow_schema_with_ids, - )?); let version_hint_location = format!("{}/metadata/version-hint.text", table_base_url); let version_hint_input = file_io.new_input(&version_hint_location)?; @@ -161,42 +188,49 @@ pub async fn record_batches_to_iceberg( } else { None }; - let (previous_metadata, previous_metadata_location) = match old_version_hint { - Some(version_hint) => { - let old_metadata_location = format!( - "{}/metadata/v{}.metadata.json", - table_base_url, version_hint - ); - let old_metadata_bytes = - file_io.new_input(&old_metadata_location)?.read().await?; - let old_metadata_string = - str::from_utf8(&old_metadata_bytes).map_err(|_| { - DataLoadingError::IcebergError(iceberg::Error::new( - iceberg::ErrorKind::DataInvalid, - "Could not parse UTF-8 in old metadata file", - )) - })?; - let old_metadata = serde_json::from_str::(old_metadata_string) + let (previous_metadata, previous_metadata_location, iceberg_schema) = + match old_version_hint { + Some(version_hint) => { + let old_metadata_location = format!( + "{}/metadata/v{}.metadata.json", + table_base_url, version_hint + ); + let old_metadata_bytes = + file_io.new_input(&old_metadata_location)?.read().await?; + let old_metadata_string = + str::from_utf8(&old_metadata_bytes).map_err(|_| { + DataLoadingError::IcebergError(iceberg::Error::new( + iceberg::ErrorKind::DataInvalid, + "Could not parse UTF-8 in old metadata file", + )) + })?; + let old_metadata = serde_json::from_str::( + old_metadata_string, + ) .map_err(|_| { DataLoadingError::IcebergError(iceberg::Error::new( iceberg::ErrorKind::DataInvalid, "Could not parse old metadata file", )) })?; - if old_metadata.current_schema() != &iceberg_schema { - return Err(DataLoadingError::IcebergError(iceberg::Error::new( - iceberg::ErrorKind::FeatureUnsupported, - "Schema changes not supported", - ))); + let old_iceberg_schema = old_metadata.current_schema(); + is_schema_aligned(&arrow_schema, old_iceberg_schema)?; + ( + old_metadata.clone(), + Some(old_metadata_location), + old_iceberg_schema.clone(), + ) } - (old_metadata, Some(old_metadata_location)) - } - None => { - let empty_metadata = - create_empty_metadata(&iceberg_schema, table_base_url.to_string())?; - (empty_metadata, None) - } - }; + None => { + let arrow_schema_with_ids = assign_field_ids(&arrow_schema); + let iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema( + &arrow_schema_with_ids, + )?); + let empty_metadata = + create_empty_metadata(&iceberg_schema, table_base_url.to_string())?; + (empty_metadata, None, iceberg_schema) + } + }; let file_writer_builder = ParquetWriterBuilder::new( WriterProperties::builder().build(), @@ -383,3 +417,211 @@ impl SeafowlContext { Ok(()) } } + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, sync::Arc}; + + use arrow_schema::{DataType, Field, Schema}; + use iceberg::spec::{NestedField, PrimitiveType, Type}; + + use super::is_schema_aligned; + + #[test] + fn test_is_schema_aligned_positive() { + let arrow_schema = Schema::new_with_metadata( + vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Boolean, false), + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "c", Type::Primitive(PrimitiveType::Boolean)) + .into(), + ]) + .build() + .unwrap(); + + assert!( + is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_ok() + ); + } + + #[test] + fn test_is_schema_aligned_positive_renamed() { + let arrow_schema = Schema::new_with_metadata( + vec![ + // Fields renamed + Field::new("x", DataType::Utf8, false), + Field::new("y", DataType::Int32, false), + Field::new("z", DataType::Boolean, false), + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)) + .into(), + ]) + .build() + .unwrap(); + + assert!( + is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_ok() + ); + } + + // OK to insert a non-nullable value into a nullable field + #[test] + fn test_is_schema_aligned_positive_nonnullable() { + let arrow_schema = Schema::new_with_metadata( + vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Boolean, false), + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "c", Type::Primitive(PrimitiveType::Boolean)) + .into(), + ]) + .build() + .unwrap(); + + assert!( + is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_ok() + ); + } + + #[test] + fn test_is_schema_aligned_negative_added_field() { + let arrow_schema = Schema::new_with_metadata( + vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Boolean, false), + Field::new("d", DataType::Boolean, false), // Added field + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)) + .into(), + ]) + .build() + .unwrap(); + + assert!( + is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)) + .is_err() + ); + } + + #[test] + fn test_is_schema_aligned_negative_different_type() { + let arrow_schema = Schema::new_with_metadata( + vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), // Mismatched type + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)) + .into(), + ]) + .build() + .unwrap(); + + assert!( + is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)) + .is_err() + ); + } + + #[test] + fn test_is_schema_aligned_negative_reordered() { + let arrow_schema = Schema::new_with_metadata( + vec![ + // Same fields but in wrong order + Field::new("b", DataType::Int32, false), + Field::new("a", DataType::Utf8, false), + Field::new("c", DataType::Boolean, false), + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)) + .into(), + ]) + .build() + .unwrap(); + + assert!( + is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)) + .is_err() + ); + } + + // Not allowed to insert a nullable value into a non-nullable field + #[test] + fn test_is_schema_aligned_negative_nullable() { + let arrow_schema = Schema::new_with_metadata( + vec![ + Field::new("a", DataType::Utf8, true), // Nullable + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Boolean, false), + ], + HashMap::new(), + ); + + let iceberg_schema = iceberg::spec::Schema::builder() + .with_fields(vec![ + NestedField::required(1, "a", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean)) + .into(), + ]) + .build() + .unwrap(); + + assert!( + is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)) + .is_err() + ); + } +}