diff --git a/Cargo.toml b/Cargo.toml index efff593b3..71809fdb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,11 +18,12 @@ [workspace] resolver = "2" members = [ - "crates/catalog/*", - "crates/examples", - "crates/iceberg", - "crates/integrations/*", - "crates/test_utils", + "crates/catalog/*", + "crates/examples", + "crates/iceberg", + "crates/integration_tests", + "crates/integrations/*", + "crates/test_utils", ] exclude = ["bindings/python"] diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 6905e17c7..fce5fe2be 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -1376,7 +1376,7 @@ mod tests { .with_schema_id(0) .with_summary(Summary { operation: Operation::Append, - other: HashMap::from_iter([ + additional_properties: HashMap::from_iter([ ("spark.app.id", "local-1646787004168"), ("added-data-files", "1"), ("added-records", "1"), diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 9430447de..b897d1574 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -1423,7 +1423,7 @@ mod tests { .with_schema_id(1) .with_summary(Summary { operation: Operation::Append, - other: HashMap::default(), + additional_properties: HashMap::default(), }) .build(), }; @@ -1457,7 +1457,7 @@ mod tests { .with_manifest_list("s3://a/b/2.avro") .with_summary(Summary { operation: Operation::Append, - other: HashMap::default(), + additional_properties: HashMap::default(), }) .build(), }; diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 35b6a2c94..88e2d0e2d 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -185,7 +185,7 @@ mod tests { use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest, ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus, - ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID, + ManifestWriter, Struct, TableMetadata, }; use crate::table::Table; use crate::TableIdent; @@ -293,9 +293,7 @@ mod tests { .new_output(current_snapshot.manifest_list()) .unwrap(), current_snapshot.snapshot_id(), - current_snapshot - .parent_snapshot_id() - .unwrap_or(EMPTY_SNAPSHOT_ID), + current_snapshot.parent_snapshot_id(), current_snapshot.sequence_number(), ); manifest_list_write diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 89e8846f0..8f0bc38f6 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -977,7 +977,6 @@ mod tests { DataContentType, DataFileBuilder, DataFileFormat, Datum, FormatVersion, Literal, Manifest, ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus, ManifestWriter, NestedField, PrimitiveType, Schema, Struct, TableMetadata, Type, - EMPTY_SNAPSHOT_ID, }; use crate::table::Table; use crate::TableIdent; @@ -1124,9 +1123,7 @@ mod tests { .new_output(current_snapshot.manifest_list()) .unwrap(), current_snapshot.snapshot_id(), - current_snapshot - .parent_snapshot_id() - .unwrap_or(EMPTY_SNAPSHOT_ID), + current_snapshot.parent_snapshot_id(), current_snapshot.sequence_number(), ); manifest_list_write diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index a0bb89b23..a868c7b11 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -900,6 +900,12 @@ impl ManifestEntry { } } + /// Snapshot id + #[inline] + pub fn snapshot_id(&self) -> Option { + self.snapshot_id + } + /// Data sequence number. #[inline] pub fn sequence_number(&self) -> Option { @@ -1328,7 +1334,7 @@ mod _serde { Ok(Self { content: value.content as i32, file_path: value.file_path, - file_format: value.file_format.to_string(), + file_format: value.file_format.to_string().to_ascii_uppercase(), partition: RawLiteral::try_from( Literal::Struct(value.partition), &Type::Struct(partition_type.clone()), diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 3aaecf12d..5768b79d5 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -106,15 +106,17 @@ impl std::fmt::Debug for ManifestListWriter { impl ManifestListWriter { /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`]. - pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self { - let metadata = HashMap::from_iter([ + pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option) -> Self { + let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), - ( - "parent-snapshot-id".to_string(), - parent_snapshot_id.to_string(), - ), ("format-version".to_string(), "1".to_string()), ]); + if let Some(parent_snapshot_id) = parent_snapshot_id { + metadata.insert( + "parent-snapshot-id".to_string(), + parent_snapshot_id.to_string(), + ); + } Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id) } @@ -122,18 +124,20 @@ impl ManifestListWriter { pub fn v2( output_file: OutputFile, snapshot_id: i64, - parent_snapshot_id: i64, + parent_snapshot_id: Option, sequence_number: i64, ) -> Self { - let metadata = HashMap::from_iter([ + let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), - ( - "parent-snapshot-id".to_string(), - parent_snapshot_id.to_string(), - ), ("sequence-number".to_string(), sequence_number.to_string()), ("format-version".to_string(), "2".to_string()), ]); + metadata.insert( + "parent-snapshot-id".to_string(), + parent_snapshot_id + .map(|v| v.to_string()) + .unwrap_or("null".to_string()), + ); Self::new( FormatVersion::V2, output_file, @@ -580,6 +584,18 @@ pub struct ManifestFile { pub key_metadata: Vec, } +impl ManifestFile { + /// Checks if the manifest file has any added files. + pub fn has_added_files(&self) -> bool { + self.added_files_count.is_none() || self.added_files_count.unwrap() > 0 + } + + /// Checks if the manifest file has any existed files. + pub fn has_existing_files(&self) -> bool { + self.existing_files_count.is_none() || self.existing_files_count.unwrap() > 0 + } +} + /// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests #[derive(Debug, PartialEq, Clone, Eq)] pub enum ManifestContentType { @@ -1146,7 +1162,7 @@ mod test { let mut writer = ManifestListWriter::v1( file_io.new_output(full_path.clone()).unwrap(), 1646658105718557341, - 1646658105718557341, + Some(1646658105718557341), ); writer @@ -1213,7 +1229,7 @@ mod test { let mut writer = ManifestListWriter::v2( file_io.new_output(full_path.clone()).unwrap(), 1646658105718557341, - 1646658105718557341, + Some(1646658105718557341), 1, ); @@ -1335,7 +1351,7 @@ mod test { let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, 0); + let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0)); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1391,7 +1407,7 @@ mod test { let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v2(output_file, snapshot_id, 0, seq_num); + let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1445,7 +1461,7 @@ mod test { let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, 0, 1); + let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, Some(0), 1); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index b4bb2cd1f..81fd6eae6 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -59,7 +59,7 @@ pub struct Summary { pub operation: Operation, /// Other summary data. #[serde(flatten)] - pub other: HashMap, + pub additional_properties: HashMap, } impl Default for Operation { @@ -291,7 +291,7 @@ pub(super) mod _serde { }, summary: v1.summary.unwrap_or(Summary { operation: Operation::default(), - other: HashMap::new(), + additional_properties: HashMap::new(), }), schema_id: v1.schema_id, }) @@ -372,6 +372,21 @@ pub enum SnapshotRetention { }, } +impl SnapshotRetention { + /// Create a new branch retention policy + pub fn branch( + min_snapshots_to_keep: Option, + max_snapshot_age_ms: Option, + max_ref_age_ms: Option, + ) -> Self { + SnapshotRetention::Branch { + min_snapshots_to_keep, + max_snapshot_age_ms, + max_ref_age_ms, + } + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; @@ -408,7 +423,7 @@ mod tests { assert_eq!( Summary { operation: Operation::Append, - other: HashMap::new() + additional_properties: HashMap::new() }, *result.summary() ); diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 9609d9829..cd242f4b5 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -201,6 +201,18 @@ impl TableMetadata { self.last_sequence_number } + /// Returns the next sequence number for the table. + /// + /// For format version 1, it always returns the initial sequence number. + /// For other versions, it returns the last sequence number incremented by 1. + #[inline] + pub fn next_sequence_number(&self) -> i64 { + match self.format_version { + FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER, + _ => self.last_sequence_number + 1, + } + } + /// Returns last updated time. #[inline] pub fn last_updated_timestamp(&self) -> Result> { @@ -1476,7 +1488,7 @@ mod tests { .with_sequence_number(0) .with_schema_id(0) .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro") - .with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) }) + .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) }) .build(); let expected = TableMetadata { @@ -1895,7 +1907,7 @@ mod tests { .with_manifest_list("s3://a/b/1.avro") .with_summary(Summary { operation: Operation::Append, - other: HashMap::new(), + additional_properties: HashMap::new(), }) .build(); @@ -1908,7 +1920,7 @@ mod tests { .with_manifest_list("s3://a/b/2.avro") .with_summary(Summary { operation: Operation::Append, - other: HashMap::new(), + additional_properties: HashMap::new(), }) .build(); diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index 7e60d9b9d..ed4ab7902 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -1818,7 +1818,7 @@ mod tests { .with_manifest_list("/snap-1.avro") .with_summary(Summary { operation: Operation::Append, - other: HashMap::from_iter(vec![ + additional_properties: HashMap::from_iter(vec![ ( "spark.app.id".to_string(), "local-1662532784305".to_string(), @@ -1881,7 +1881,7 @@ mod tests { .with_manifest_list("/snap-1.avro") .with_summary(Summary { operation: Operation::Append, - other: HashMap::from_iter(vec![ + additional_properties: HashMap::from_iter(vec![ ( "spark.app.id".to_string(), "local-1662532784305".to_string(), @@ -1901,7 +1901,7 @@ mod tests { .with_manifest_list("/snap-1.avro") .with_summary(Summary { operation: Operation::Append, - other: HashMap::from_iter(vec![ + additional_properties: HashMap::from_iter(vec![ ( "spark.app.id".to_string(), "local-1662532784305".to_string(), @@ -1949,7 +1949,7 @@ mod tests { .with_manifest_list("/snap-1.avro") .with_summary(Summary { operation: Operation::Append, - other: HashMap::new(), + additional_properties: HashMap::new(), }) .build(); @@ -1994,7 +1994,7 @@ mod tests { .with_manifest_list("/snap-1.avro") .with_summary(Summary { operation: Operation::Append, - other: HashMap::from_iter(vec![ + additional_properties: HashMap::from_iter(vec![ ( "spark.app.id".to_string(), "local-1662532784305".to_string(), @@ -2114,7 +2114,7 @@ mod tests { .with_manifest_list("/snap-1") .with_summary(Summary { operation: Operation::Append, - other: HashMap::new(), + additional_properties: HashMap::new(), }) .build(); @@ -2140,7 +2140,7 @@ mod tests { .with_parent_snapshot_id(Some(1)) .with_summary(Summary { operation: Operation::Append, - other: HashMap::new(), + additional_properties: HashMap::new(), }) .build(); diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 552ac497f..6fb070527 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -1537,6 +1537,14 @@ impl Literal { })?; Ok(Self::decimal(decimal.mantissa())) } + + /// Attempts to convert the Literal to a PrimitiveLiteral + pub fn as_primitive_literal(&self) -> Option { + match self { + Literal::Primitive(primitive) => Some(primitive.clone()), + _ => None, + } + } } /// The partition struct stores the tuple of partition values for each file. @@ -1576,6 +1584,11 @@ impl Struct { pub fn is_null_at_index(&self, index: usize) -> bool { self.null_bitmap[index] } + + /// Return fields in the struct. + pub fn fields(&self) -> &[Literal] { + &self.fields + } } impl Index for Struct { diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index f29cf5122..edf1a8596 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -19,14 +19,26 @@ use std::cmp::Ordering; use std::collections::HashMap; +use std::future::Future; use std::mem::discriminant; +use std::ops::RangeFrom; + +use uuid::Uuid; use crate::error::Result; -use crate::spec::{FormatVersion, NullOrder, SortDirection, SortField, SortOrder, Transform}; +use crate::io::OutputFile; +use crate::spec::{ + DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile, + ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation, Snapshot, + SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType, + Summary, Transform, MAIN_BRANCH, +}; use crate::table::Table; use crate::TableUpdate::UpgradeFormatVersion; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; +const META_ROOT_PATH: &str = "metadata"; + /// Table transaction. pub struct Transaction<'a> { table: &'a Table, @@ -96,6 +108,44 @@ impl<'a> Transaction<'a> { Ok(self) } + fn generate_unique_snapshot_id(&self) -> i64 { + let generate_random_id = || -> i64 { + let (lhs, rhs) = Uuid::new_v4().as_u64_pair(); + let snapshot_id = (lhs ^ rhs) as i64; + if snapshot_id < 0 { + -snapshot_id + } else { + snapshot_id + } + }; + let mut snapshot_id = generate_random_id(); + while self + .table + .metadata() + .snapshots() + .any(|s| s.snapshot_id() == snapshot_id) + { + snapshot_id = generate_random_id(); + } + snapshot_id + } + + /// Creates a fast append action. + pub fn fast_append( + self, + commit_uuid: Option, + key_metadata: Vec, + ) -> Result> { + let snapshot_id = self.generate_unique_snapshot_id(); + FastAppendAction::new( + self, + snapshot_id, + commit_uuid.unwrap_or_else(Uuid::now_v7), + key_metadata, + HashMap::new(), + ) + } + /// Creates replace sort order action. pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> { ReplaceSortOrderAction { @@ -122,6 +172,365 @@ impl<'a> Transaction<'a> { } } +/// FastAppendAction is a transaction action for fast append data files to the table. +pub struct FastAppendAction<'a> { + snapshot_produce_action: SnapshotProduceAction<'a>, +} + +impl<'a> FastAppendAction<'a> { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + tx: Transaction<'a>, + snapshot_id: i64, + commit_uuid: Uuid, + key_metadata: Vec, + snapshot_properties: HashMap, + ) -> Result { + Ok(Self { + snapshot_produce_action: SnapshotProduceAction::new( + tx, + snapshot_id, + key_metadata, + commit_uuid, + snapshot_properties, + )?, + }) + } + + /// Add data files to the snapshot. + pub fn add_data_files( + &mut self, + data_files: impl IntoIterator, + ) -> Result<&mut Self> { + self.snapshot_produce_action.add_data_files(data_files)?; + Ok(self) + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply(self) -> Result> { + self.snapshot_produce_action + .apply(FastAppendOperation, DefaultManifestProcess) + .await + } +} + +struct FastAppendOperation; + +impl SnapshotProduceOperation for FastAppendOperation { + fn operation(&self) -> Operation { + Operation::Append + } + + async fn delete_entries( + &self, + _snapshot_produce: &SnapshotProduceAction<'_>, + ) -> Result> { + Ok(vec![]) + } + + async fn existing_manifest( + &self, + snapshot_produce: &SnapshotProduceAction<'_>, + ) -> Result> { + let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else { + return Ok(vec![]); + }; + + let manifest_list = snapshot + .load_manifest_list( + snapshot_produce.tx.table.file_io(), + &snapshot_produce.tx.table.metadata_ref(), + ) + .await?; + + Ok(manifest_list + .entries() + .iter() + .filter(|entry| entry.has_added_files() || entry.has_existing_files()) + .cloned() + .collect()) + } +} + +trait SnapshotProduceOperation: Send + Sync { + fn operation(&self) -> Operation; + #[allow(unused)] + fn delete_entries( + &self, + snapshot_produce: &SnapshotProduceAction, + ) -> impl Future>> + Send; + fn existing_manifest( + &self, + snapshot_produce: &SnapshotProduceAction, + ) -> impl Future>> + Send; +} + +struct DefaultManifestProcess; + +impl ManifestProcess for DefaultManifestProcess { + fn process_manifeset(&self, manifests: Vec) -> Vec { + manifests + } +} + +trait ManifestProcess: Send + Sync { + fn process_manifeset(&self, manifests: Vec) -> Vec; +} + +struct SnapshotProduceAction<'a> { + tx: Transaction<'a>, + snapshot_id: i64, + key_metadata: Vec, + commit_uuid: Uuid, + snapshot_properties: HashMap, + added_data_files: Vec, + // A counter used to generate unique manifest file names. + // It starts from 0 and increments for each new manifest file. + // Note: This counter is limited to the range of (0..u64::MAX). + manifest_counter: RangeFrom, +} + +impl<'a> SnapshotProduceAction<'a> { + pub(crate) fn new( + tx: Transaction<'a>, + snapshot_id: i64, + key_metadata: Vec, + commit_uuid: Uuid, + snapshot_properties: HashMap, + ) -> Result { + Ok(Self { + tx, + snapshot_id, + commit_uuid, + snapshot_properties, + added_data_files: vec![], + manifest_counter: (0..), + key_metadata, + }) + } + + // Check if the partition value is compatible with the partition type. + fn validate_partition_value( + partition_value: &Struct, + partition_type: &StructType, + ) -> Result<()> { + if partition_value.fields().len() != partition_type.fields().len() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition value is not compatitable with partition type", + )); + } + for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) { + if !field + .field_type + .as_primitive_type() + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "Partition field should only be primitive type.", + ) + })? + .compatible(&value.as_primitive_literal().unwrap()) + { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition value is not compatitable partition type", + )); + } + } + Ok(()) + } + + /// Add data files to the snapshot. + pub fn add_data_files( + &mut self, + data_files: impl IntoIterator, + ) -> Result<&mut Self> { + let data_files: Vec = data_files.into_iter().collect(); + for data_file in &data_files { + if data_file.content_type() != crate::spec::DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + "Only data content type is allowed for fast append", + )); + } + Self::validate_partition_value( + data_file.partition(), + self.tx + .table + .metadata() + .default_partition_spec() + .partition_type(), + )?; + } + self.added_data_files.extend(data_files); + Ok(self) + } + + fn new_manifest_output(&mut self) -> Result { + let new_manifest_path = format!( + "{}/{}/{}-m{}.{}", + self.tx.table.metadata().location(), + META_ROOT_PATH, + self.commit_uuid, + self.manifest_counter.next().unwrap(), + DataFileFormat::Avro + ); + self.tx.table.file_io().new_output(new_manifest_path) + } + + // Write manifest file for added data files and return the ManifestFile for ManifestList. + async fn write_added_manifest(&mut self) -> Result { + let added_data_files = std::mem::take(&mut self.added_data_files); + let manifest_entries = added_data_files + .into_iter() + .map(|data_file| { + let builder = ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .data_file(data_file); + if self.tx.table.metadata().format_version() == FormatVersion::V1 { + builder.snapshot_id(self.snapshot_id).build() + } else { + // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when + // commit failed. + builder.build() + } + }) + .collect(); + let schema = self.tx.table.metadata().current_schema(); + let manifest_meta = ManifestMetadata::builder() + .schema(schema.clone()) + .schema_id(schema.schema_id()) + .format_version(self.tx.table.metadata().format_version()) + .partition_spec( + self.tx + .table + .metadata() + .default_partition_spec() + .as_ref() + .clone(), + ) + .content(crate::spec::ManifestContentType::Data) + .build(); + let manifest = Manifest::new(manifest_meta, manifest_entries); + let writer = ManifestWriter::new( + self.new_manifest_output()?, + self.snapshot_id, + self.key_metadata.clone(), + ); + writer.write(manifest).await + } + + async fn manifest_file( + &mut self, + snapshot_produce_operation: &OP, + manifest_process: &MP, + ) -> Result> { + let added_manifest = self.write_added_manifest().await?; + let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; + // # TODO + // Support process delete entries. + + let mut manifest_files = vec![added_manifest]; + manifest_files.extend(existing_manifests); + let manifest_files = manifest_process.process_manifeset(manifest_files); + Ok(manifest_files) + } + + // # TODO + // Fulfill this function + fn summary(&self, snapshot_produce_operation: &OP) -> Summary { + Summary { + operation: snapshot_produce_operation.operation(), + additional_properties: self.snapshot_properties.clone(), + } + } + + fn generate_manifest_list_file_path(&self, attempt: i64) -> String { + format!( + "{}/{}/snap-{}-{}-{}.{}", + self.tx.table.metadata().location(), + META_ROOT_PATH, + self.snapshot_id, + attempt, + self.commit_uuid, + DataFileFormat::Avro + ) + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply( + mut self, + snapshot_produce_operation: OP, + process: MP, + ) -> Result> { + let new_manifests = self + .manifest_file(&snapshot_produce_operation, &process) + .await?; + let next_seq_num = self.tx.table.metadata().next_sequence_number(); + + let summary = self.summary(&snapshot_produce_operation); + + let manifest_list_path = self.generate_manifest_list_file_path(0); + + let mut manifest_list_writer = match self.tx.table.metadata().format_version() { + FormatVersion::V1 => ManifestListWriter::v1( + self.tx + .table + .file_io() + .new_output(manifest_list_path.clone())?, + self.snapshot_id, + self.tx.table.metadata().current_snapshot_id(), + ), + FormatVersion::V2 => ManifestListWriter::v2( + self.tx + .table + .file_io() + .new_output(manifest_list_path.clone())?, + self.snapshot_id, + self.tx.table.metadata().current_snapshot_id(), + next_seq_num, + ), + }; + manifest_list_writer.add_manifests(new_manifests.into_iter())?; + manifest_list_writer.close().await?; + + let commit_ts = chrono::Utc::now().timestamp_millis(); + let new_snapshot = Snapshot::builder() + .with_manifest_list(manifest_list_path) + .with_snapshot_id(self.snapshot_id) + .with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id()) + .with_sequence_number(next_seq_num) + .with_summary(summary) + .with_schema_id(self.tx.table.metadata().current_schema_id()) + .with_timestamp_ms(commit_ts) + .build(); + + self.tx.append_updates(vec![ + TableUpdate::AddSnapshot { + snapshot: new_snapshot, + }, + TableUpdate::SetSnapshotRef { + ref_name: MAIN_BRANCH.to_string(), + reference: SnapshotReference::new( + self.snapshot_id, + SnapshotRetention::branch(None, None, None), + ), + }, + ])?; + self.tx.append_requirements(vec![ + TableRequirement::UuidMatch { + uuid: self.tx.table.metadata().uuid(), + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: self.tx.table.metadata().current_snapshot_id(), + }, + ])?; + Ok(self.tx) + } +} + /// Transaction action for replacing sort order. pub struct ReplaceSortOrderAction<'a> { tx: Transaction<'a>, @@ -203,10 +612,13 @@ mod tests { use std::fs::File; use std::io::BufReader; - use crate::io::FileIO; - use crate::spec::{FormatVersion, TableMetadata}; + use crate::io::FileIOBuilder; + use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Struct, + TableMetadata, + }; use crate::table::Table; - use crate::transaction::Transaction; + use crate::transaction::{Transaction, MAIN_BRANCH}; use crate::{TableIdent, TableRequirement, TableUpdate}; fn make_v1_table() -> Table { @@ -223,7 +635,7 @@ mod tests { .metadata(resp) .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) - .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) + .file_io(FileIOBuilder::new("memory").build().unwrap()) .build() .unwrap() } @@ -242,7 +654,26 @@ mod tests { .metadata(resp) .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) - .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) + .file_io(FileIOBuilder::new("memory").build().unwrap()) + .build() + .unwrap() + } + + fn make_v2_minimal_table() -> Table { + let file = File::open(format!( + "{}/testdata/table_metadata/{}", + env!("CARGO_MANIFEST_DIR"), + "TableMetadataV2ValidMinimal.json" + )) + .unwrap(); + let reader = BufReader::new(file); + let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); + + Table::builder() + .metadata(resp) + .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) + .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .file_io(FileIOBuilder::new("memory").build().unwrap()) .build() .unwrap() } @@ -347,6 +778,88 @@ mod tests { ); } + #[tokio::test] + async fn test_fast_append_action() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + let mut action = tx.fast_append(None, vec![]).unwrap(); + + // check add data file with uncompatitable partition value + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/3.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::string("test"))])) + .build() + .unwrap(); + assert!(action.add_data_files(vec![data_file.clone()]).is_err()); + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/3.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(); + action.add_data_files(vec![data_file.clone()]).unwrap(); + let tx = action.apply().await.unwrap(); + + // check updates and requirements + assert!( + matches!((&tx.updates[0],&tx.updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) + ); + assert_eq!( + vec![ + TableRequirement::UuidMatch { + uuid: tx.table.metadata().uuid() + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: tx.table.metadata().current_snapshot_id + } + ], + tx.requirements + ); + + // check manifest list + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] { + snapshot + } else { + unreachable!() + }; + let manifest_list = new_snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(1, manifest_list.entries().len()); + assert_eq!( + manifest_list.entries()[0].sequence_number, + new_snapshot.sequence_number() + ); + + // check manifset + let manifest = manifest_list.entries()[0] + .load_manifest(table.file_io()) + .await + .unwrap(); + assert_eq!(1, manifest.entries().len()); + assert_eq!( + new_snapshot.sequence_number(), + manifest.entries()[0] + .sequence_number() + .expect("Inherit sequence number by load manifest") + ); + assert_eq!( + new_snapshot.snapshot_id(), + manifest.entries()[0].snapshot_id().unwrap() + ); + assert_eq!(data_file, *manifest.entries()[0].data_file()); + } + #[test] fn test_do_same_update_in_same_transaction() { let table = make_v2_table(); diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml new file mode 100644 index 000000000..f9ba9e414 --- /dev/null +++ b/crates/integration_tests/Cargo.toml @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iceberg-integration-tests" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +arrow-array = { workspace = true } +arrow-schema = { workspace = true } +futures = { workspace = true } +iceberg = { workspace = true } +iceberg-catalog-rest = { workspace = true } +iceberg_test_utils = { path = "../test_utils", features = ["tests"] } +log = { workspace = true } +parquet = { workspace = true } +port_scanner = { workspace = true } +tokio = { workspace = true } diff --git a/crates/integration_tests/src/lib.rs b/crates/integration_tests/src/lib.rs new file mode 100644 index 000000000..5777a4018 --- /dev/null +++ b/crates/integration_tests/src/lib.rs @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; +use iceberg_test_utils::docker::DockerCompose; +use iceberg_test_utils::{normalize_test_name, set_up}; +use port_scanner::scan_port_addr; +use tokio::time::sleep; + +const REST_CATALOG_PORT: u16 = 8181; + +pub struct TestFixture { + pub _docker_compose: DockerCompose, + pub rest_catalog: RestCatalog, +} + +pub async fn set_test_fixture(func: &str) -> TestFixture { + set_up(); + let docker_compose = DockerCompose::new( + normalize_test_name(format!("{}_{func}", module_path!())), + format!("{}/testdata", env!("CARGO_MANIFEST_DIR")), + ); + + // Start docker compose + docker_compose.run(); + + let rest_catalog_ip = docker_compose.get_container_ip("rest"); + + let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT); + loop { + if !scan_port_addr(&read_port) { + log::info!("Waiting for 1s rest catalog to ready..."); + sleep(std::time::Duration::from_millis(1000)).await; + } else { + break; + } + } + + let container_ip = docker_compose.get_container_ip("minio"); + let read_port = format!("{}:{}", container_ip, 9000); + + let config = RestCatalogConfig::builder() + .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT)) + .props(HashMap::from([ + (S3_ENDPOINT.to_string(), format!("http://{}", read_port)), + (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()), + (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()), + (S3_REGION.to_string(), "us-east-1".to_string()), + ])) + .build(); + let rest_catalog = RestCatalog::new(config); + + TestFixture { + _docker_compose: docker_compose, + rest_catalog, + } +} diff --git a/crates/integration_tests/testdata/docker-compose.yaml b/crates/integration_tests/testdata/docker-compose.yaml new file mode 100644 index 000000000..490f4eb94 --- /dev/null +++ b/crates/integration_tests/testdata/docker-compose.yaml @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +networks: + rest_bridge: + +services: + rest: + image: tabulario/iceberg-rest:0.10.0 + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog + - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory + - CATALOG_WAREHOUSE=s3://icebergdata/demo + - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO + - CATALOG_S3_ENDPOINT=http://minio:9000 + depends_on: + - minio + networks: + rest_bridge: + aliases: + - icebergdata.minio + ports: + - 8181:8181 + expose: + - 8181 + + minio: + image: minio/minio:RELEASE.2024-03-07T00-43-48Z + environment: + - MINIO_ROOT_USER=admin + - MINIO_ROOT_PASSWORD=password + - MINIO_DOMAIN=minio + - MINIO_DEFAULT_BUCKETS=icebergdata + hostname: icebergdata.minio + networks: + rest_bridge: + ports: + - 9001:9001 + expose: + - 9001 + - 9000 + command: ["server", "/data", "--console-address", ":9001"] + + mc: + depends_on: + - minio + image: minio/mc:RELEASE.2024-03-07T00-31-49Z + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + entrypoint: > + /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc rm -r --force minio/icebergdata; /usr/bin/mc mb minio/icebergdata; /usr/bin/mc policy set public minio/icebergdata; tail -f /dev/null " + networks: + rest_bridge: diff --git a/crates/integration_tests/tests/append_data_file_test.rs b/crates/integration_tests/tests/append_data_file_test.rs new file mode 100644 index 000000000..87e805c23 --- /dev/null +++ b/crates/integration_tests/tests/append_data_file_test.rs @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for rest catalog. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; +use futures::TryStreamExt; +use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; +use iceberg::transaction::Transaction; +use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig}; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation}; +use iceberg_integration_tests::set_test_fixture; +use parquet::arrow::arrow_reader::ArrowReaderOptions; +use parquet::file::properties::WriterProperties; + +#[tokio::test] +async fn test_append_data_file() { + let fixture = set_test_fixture("test_create_table").await; + + let ns = Namespace::with_properties( + NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "ray".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + + fixture + .rest_catalog + .create_namespace(ns.name(), ns.properties().clone()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + + let table = fixture + .rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Create the writer and write the data + let schema: Arc = Arc::new( + table + .metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); + let file_name_generator = DefaultFileNameGenerator::new( + "test".to_string(), + None, + iceberg::spec::DataFileFormat::Parquet, + ); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + table.metadata().current_schema().clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder); + let mut data_file_writer = data_file_writer_builder + .build(DataFileWriterConfig::new(None)) + .await + .unwrap(); + let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); + let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); + let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + let data_file = data_file_writer.close().await.unwrap(); + + // check parquet file schema + let content = table + .file_io() + .new_input(data_file[0].file_path()) + .unwrap() + .read() + .await + .unwrap(); + let parquet_reader = parquet::arrow::arrow_reader::ArrowReaderMetadata::load( + &content, + ArrowReaderOptions::default(), + ) + .unwrap(); + let field_ids: Vec = parquet_reader + .parquet_schema() + .columns() + .iter() + .map(|col| col.self_type().get_basic_info().id()) + .collect(); + assert_eq!(field_ids, vec![1, 2, 3]); + + // commit result + let tx = Transaction::new(&table); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + append_action.add_data_files(data_file.clone()).unwrap(); + let tx = append_action.apply().await.unwrap(); + let table = tx.commit(&fixture.rest_catalog).await.unwrap(); + + // check result + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0], batch); + + // commit result again + let tx = Transaction::new(&table); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + append_action.add_data_files(data_file.clone()).unwrap(); + let tx = append_action.apply().await.unwrap(); + let table = tx.commit(&fixture.rest_catalog).await.unwrap(); + + // check result again + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 2); + assert_eq!(batches[0], batch); + assert_eq!(batches[1], batch); +} diff --git a/crates/integration_tests/tests/conflict_commit_test.rs b/crates/integration_tests/tests/conflict_commit_test.rs new file mode 100644 index 000000000..f3dd70f16 --- /dev/null +++ b/crates/integration_tests/tests/conflict_commit_test.rs @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for rest catalog. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; +use futures::TryStreamExt; +use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; +use iceberg::transaction::Transaction; +use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig}; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation}; +use iceberg_integration_tests::set_test_fixture; +use parquet::file::properties::WriterProperties; + +#[tokio::test] +async fn test_append_data_file_conflict() { + let fixture = set_test_fixture("test_create_table").await; + + let ns = Namespace::with_properties( + NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), + HashMap::from([ + ("owner".to_string(), "ray".to_string()), + ("community".to_string(), "apache".to_string()), + ]), + ); + + fixture + .rest_catalog + .create_namespace(ns.name(), ns.properties().clone()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + + let table = fixture + .rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Create the writer and write the data + let schema: Arc = Arc::new( + table + .metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); + let file_name_generator = DefaultFileNameGenerator::new( + "test".to_string(), + None, + iceberg::spec::DataFileFormat::Parquet, + ); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + table.metadata().current_schema().clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder); + let mut data_file_writer = data_file_writer_builder + .build(DataFileWriterConfig::new(None)) + .await + .unwrap(); + let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); + let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); + let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + let data_file = data_file_writer.close().await.unwrap(); + + // start two transaction and commit one of them + let tx1 = Transaction::new(&table); + let mut append_action = tx1.fast_append(None, vec![]).unwrap(); + append_action.add_data_files(data_file.clone()).unwrap(); + let tx1 = append_action.apply().await.unwrap(); + let tx2 = Transaction::new(&table); + let mut append_action = tx2.fast_append(None, vec![]).unwrap(); + append_action.add_data_files(data_file.clone()).unwrap(); + let tx2 = append_action.apply().await.unwrap(); + let table = tx2 + .commit(&fixture.rest_catalog) + .await + .expect("The first commit should not fail."); + + // check result + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0], batch); + + // another commit should fail + assert!(tx1.commit(&fixture.rest_catalog).await.is_err()); +}