diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 086c63080..f517b8e0d 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -18,6 +18,7 @@ //! Manifest for Iceberg. use std::cmp::min; use std::collections::HashMap; +use std::io::{Read, Write}; use std::str::FromStr; use std::sync::Arc; @@ -61,7 +62,7 @@ impl Manifest { let entries = match metadata.format_version { FormatVersion::V1 => { - let schema = manifest_schema_v1(partition_type.clone())?; + let schema = manifest_schema_v1(&partition_type)?; let reader = AvroReader::with_schema(&schema, bs)?; reader .into_iter() @@ -72,7 +73,7 @@ impl Manifest { .collect::>>()? } FormatVersion::V2 => { - let schema = manifest_schema_v2(partition_type.clone())?; + let schema = manifest_schema_v2(&partition_type)?; let reader = AvroReader::with_schema(&schema, bs)?; reader .into_iter() @@ -241,8 +242,8 @@ impl ManifestWriter { .partition_type(&manifest.metadata.schema)?; let table_schema = &manifest.metadata.schema; let avro_schema = match manifest.metadata.format_version { - FormatVersion::V1 => manifest_schema_v1(partition_type.clone())?, - FormatVersion::V2 => manifest_schema_v2(partition_type.clone())?, + FormatVersion::V1 => manifest_schema_v1(&partition_type)?, + FormatVersion::V2 => manifest_schema_v2(&partition_type)?, }; let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new()); avro_writer.add_user_metadata( @@ -656,7 +657,39 @@ mod _const_schema { }) }; - pub(super) fn manifest_schema_v2(partition_type: StructType) -> Result { + fn data_file_fields_v2(partition_type: &StructType) -> Vec { + vec![ + CONTENT.clone(), + FILE_PATH.clone(), + FILE_FORMAT.clone(), + Arc::new(NestedField::required( + 102, + "partition", + Type::Struct(partition_type.clone()), + )), + RECORD_COUNT.clone(), + FILE_SIZE_IN_BYTES.clone(), + COLUMN_SIZES.clone(), + VALUE_COUNTS.clone(), + NULL_VALUE_COUNTS.clone(), + NAN_VALUE_COUNTS.clone(), + LOWER_BOUNDS.clone(), + UPPER_BOUNDS.clone(), + KEY_METADATA.clone(), + SPLIT_OFFSETS.clone(), + EQUALITY_IDS.clone(), + SORT_ORDER_ID.clone(), + ] + } + + pub(super) fn data_file_schema_v2(partition_type: &StructType) -> Result { + let schema = Schema::builder() + .with_fields(data_file_fields_v2(partition_type)) + .build()?; + schema_to_avro_schema("data_file", &schema) + } + + pub(super) fn manifest_schema_v2(partition_type: &StructType) -> Result { let fields = vec![ STATUS.clone(), SNAPSHOT_ID_V2.clone(), @@ -665,62 +698,52 @@ mod _const_schema { Arc::new(NestedField::required( 2, "data_file", - Type::Struct(StructType::new(vec![ - CONTENT.clone(), - FILE_PATH.clone(), - FILE_FORMAT.clone(), - Arc::new(NestedField::required( - 102, - "partition", - Type::Struct(partition_type), - )), - RECORD_COUNT.clone(), - FILE_SIZE_IN_BYTES.clone(), - COLUMN_SIZES.clone(), - VALUE_COUNTS.clone(), - NULL_VALUE_COUNTS.clone(), - NAN_VALUE_COUNTS.clone(), - LOWER_BOUNDS.clone(), - UPPER_BOUNDS.clone(), - KEY_METADATA.clone(), - SPLIT_OFFSETS.clone(), - EQUALITY_IDS.clone(), - SORT_ORDER_ID.clone(), - ])), + Type::Struct(StructType::new(data_file_fields_v2(partition_type))), )), ]; let schema = Schema::builder().with_fields(fields).build()?; schema_to_avro_schema("manifest_entry", &schema) } - pub(super) fn manifest_schema_v1(partition_type: StructType) -> Result { + fn data_file_fields_v1(partition_type: &StructType) -> Vec { + vec![ + FILE_PATH.clone(), + FILE_FORMAT.clone(), + Arc::new(NestedField::required( + 102, + "partition", + Type::Struct(partition_type.clone()), + )), + RECORD_COUNT.clone(), + FILE_SIZE_IN_BYTES.clone(), + BLOCK_SIZE_IN_BYTES.clone(), + COLUMN_SIZES.clone(), + VALUE_COUNTS.clone(), + NULL_VALUE_COUNTS.clone(), + NAN_VALUE_COUNTS.clone(), + LOWER_BOUNDS.clone(), + UPPER_BOUNDS.clone(), + KEY_METADATA.clone(), + SPLIT_OFFSETS.clone(), + SORT_ORDER_ID.clone(), + ] + } + + pub(super) fn data_file_schema_v1(partition_type: &StructType) -> Result { + let schema = Schema::builder() + .with_fields(data_file_fields_v1(partition_type)) + .build()?; + schema_to_avro_schema("data_file", &schema) + } + + pub(super) fn manifest_schema_v1(partition_type: &StructType) -> Result { let fields = vec![ STATUS.clone(), SNAPSHOT_ID_V1.clone(), Arc::new(NestedField::required( 2, "data_file", - Type::Struct(StructType::new(vec![ - FILE_PATH.clone(), - FILE_FORMAT.clone(), - Arc::new(NestedField::required( - 102, - "partition", - Type::Struct(partition_type), - )), - RECORD_COUNT.clone(), - FILE_SIZE_IN_BYTES.clone(), - BLOCK_SIZE_IN_BYTES.clone(), - COLUMN_SIZES.clone(), - VALUE_COUNTS.clone(), - NULL_VALUE_COUNTS.clone(), - NAN_VALUE_COUNTS.clone(), - LOWER_BOUNDS.clone(), - UPPER_BOUNDS.clone(), - KEY_METADATA.clone(), - SPLIT_OFFSETS.clone(), - SORT_ORDER_ID.clone(), - ])), + Type::Struct(StructType::new(data_file_fields_v1(partition_type))), )), ]; let schema = Schema::builder().with_fields(fields).build()?; @@ -1189,6 +1212,49 @@ impl DataFile { self.sort_order_id } } + +/// Convert data files to avro bytes and write to writer. +/// Return the bytes written. +pub fn write_data_files_to_avro( + writer: &mut W, + data_files: impl IntoIterator, + partition_type: &StructType, + version: FormatVersion, +) -> Result { + let avro_schema = match version { + FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type).unwrap(), + FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type).unwrap(), + }; + let mut writer = AvroWriter::new(&avro_schema, writer); + + for data_file in data_files { + let value = to_value(_serde::DataFile::try_from(data_file, partition_type, true)?)? + .resolve(&avro_schema)?; + writer.append(value)?; + } + + Ok(writer.flush()?) +} + +/// Parse data files from avro bytes. +pub fn read_data_files_from_avro( + reader: &mut R, + schema: &Schema, + partition_type: &StructType, + version: FormatVersion, +) -> Result> { + let avro_schema = match version { + FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type).unwrap(), + FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type).unwrap(), + }; + + let reader = AvroReader::with_schema(&avro_schema, reader)?; + reader + .into_iter() + .map(|value| from_value::<_serde::DataFile>(&value?)?.try_into(partition_type, schema)) + .collect::>>() +} + /// Type of content stored by the data file: data, equality deletes, or /// position deletes (all v1 files are data files) #[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)] @@ -1551,6 +1617,7 @@ mod _serde { #[cfg(test)] mod tests { use std::fs; + use std::io::Cursor; use std::sync::Arc; use tempfile::TempDir; @@ -2336,4 +2403,67 @@ mod tests { // Verify manifest (fs::read(path).expect("read_file must succeed"), res) } + + #[tokio::test] + async fn test_data_file_serialize_deserialize() { + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::optional( + 1, + "v1", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "v2", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 3, + "v3", + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); + let data_files = vec![DataFile { + content: DataContentType::Data, + file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count: 1, + file_size_in_bytes: 875, + column_sizes: HashMap::from([(1,47),(2,48),(3,52)]), + value_counts: HashMap::from([(1,1),(2,1),(3,1)]), + null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]), + upper_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]), + key_metadata: None, + split_offsets: vec![4], + equality_ids: vec![], + sort_order_id: Some(0), + }]; + + let mut buffer = Vec::new(); + let _ = write_data_files_to_avro( + &mut buffer, + data_files.clone().into_iter(), + &StructType::new(vec![]), + FormatVersion::V2, + ) + .unwrap(); + + let actual_data_file = read_data_files_from_avro( + &mut Cursor::new(buffer), + &schema, + &StructType::new(vec![]), + FormatVersion::V2, + ) + .unwrap(); + + assert_eq!(data_files, actual_data_file); + } }