Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support metadata table "Manifests" #861

Merged
merged 17 commits into from
Jan 2, 2025
246 changes: 243 additions & 3 deletions crates/iceberg/src/metadata_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

use std::sync::Arc;

use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
use arrow_array::types::{Int64Type, TimestampMillisecondType};
use arrow_array::builder::{
BooleanBuilder, ListBuilder, MapBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
};
use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};

use crate::spec::TableMetadata;
use crate::table::Table;
Expand Down Expand Up @@ -50,6 +52,13 @@ impl MetadataTable {
}
}

/// Get the manifests table.
pub fn manifests(&self) -> ManifestsTable {
ManifestsTable {
metadata_table: self,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, I think we can simply use Table here, which suggests that MetadataTable is merely a wrapper and doesn't implement any additional API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 9fe6bd0, PTAL

}
}

fn metadata(&self) -> &TableMetadata {
self.0.metadata()
}
Expand Down Expand Up @@ -128,6 +137,135 @@ impl<'a> SnapshotsTable<'a> {
}
}

/// Manifests table.
pub struct ManifestsTable<'a> {
metadata_table: &'a MetadataTable,
}

impl<'a> ManifestsTable<'a> {
fn partition_summary_fields(&self) -> Vec<Field> {
vec![
Field::new("contains_null", DataType::Boolean, false),
Field::new("contains_nan", DataType::Boolean, true),
Field::new("lower_bound", DataType::Utf8, true),
Field::new("upper_bound", DataType::Utf8, true),
]
}

fn schema(&self) -> Schema {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to make this pub, so engines can get the schema first without fetching the data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 83e8811

Schema::new(vec![
Field::new("content", DataType::Int8, false),
Field::new("path", DataType::Utf8, false),
Field::new("length", DataType::Int64, false),
Field::new("partition_spec_id", DataType::Int32, false),
Field::new("added_snapshot_id", DataType::Int64, false),
Field::new("added_data_files_count", DataType::Int32, false),
Field::new("existing_data_files_count", DataType::Int32, false),
Field::new("deleted_data_files_count", DataType::Int32, false),
Field::new("added_delete_files_count", DataType::Int32, false),
Field::new("existing_delete_files_count", DataType::Int32, false),
Field::new("deleted_delete_files_count", DataType::Int32, false),
Field::new(
"partition_summaries",
DataType::List(Arc::new(Field::new_struct(
"item",
self.partition_summary_fields(),
false,
))),
false,
),
])
}

/// Scans the manifests table.
pub async fn scan(&self) -> Result<RecordBatch> {
let mut content = PrimitiveBuilder::<Int8Type>::new();
let mut path = StringBuilder::new();
let mut length = PrimitiveBuilder::<Int64Type>::new();
let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
let mut added_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
let mut added_data_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut existing_data_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut deleted_data_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
Fields::from(self.partition_summary_fields()),
0,
))
.with_field(Arc::new(Field::new_struct(
"item",
self.partition_summary_fields(),
false,
)));

if let Some(snapshot) = self.metadata_table.metadata().current_snapshot() {
let manifest_list = snapshot
.load_manifest_list(
self.metadata_table.0.file_io(),
&self.metadata_table.0.metadata_ref(),
)
.await?;
for manifest in manifest_list.entries() {
content.append_value(manifest.content.clone() as i8);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit unusual to see something that can use as u8 but still requires clone.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may derive Copy for ManifestContentType

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 4c6e338

path.append_value(manifest.manifest_path.clone());
length.append_value(manifest.manifest_length);
partition_spec_id.append_value(manifest.partition_spec_id);
added_snapshot_id.append_value(manifest.added_snapshot_id);
added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32);
existing_data_files_count
.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
deleted_data_files_count
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
added_delete_files_count
.append_value(manifest.added_files_count.unwrap_or(0) as i32);
existing_delete_files_count
.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
deleted_delete_files_count
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);

let partition_summaries_builder = partition_summaries.values();
for summary in &manifest.partitions {
partition_summaries_builder
.field_builder::<BooleanBuilder>(0)
.unwrap()
.append_value(summary.contains_null);
partition_summaries_builder
.field_builder::<BooleanBuilder>(1)
.unwrap()
.append_option(summary.contains_nan);
partition_summaries_builder
.field_builder::<StringBuilder>(2)
.unwrap()
.append_option(summary.lower_bound.as_ref().map(|v| v.to_string()));
partition_summaries_builder
.field_builder::<StringBuilder>(3)
.unwrap()
.append_option(summary.upper_bound.as_ref().map(|v| v.to_string()));
partition_summaries_builder.append(true);
}
partition_summaries.append(true);
}
}

Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
Arc::new(content.finish()),
Arc::new(path.finish()),
Arc::new(length.finish()),
Arc::new(partition_spec_id.finish()),
Arc::new(added_snapshot_id.finish()),
Arc::new(added_data_files_count.finish()),
Arc::new(existing_data_files_count.finish()),
Arc::new(deleted_data_files_count.finish()),
Arc::new(added_delete_files_count.finish()),
Arc::new(existing_delete_files_count.finish()),
Arc::new(deleted_delete_files_count.finish()),
Arc::new(partition_summaries.finish()),
])?)
}
}

#[cfg(test)]
mod tests {
use expect_test::{expect, Expect};
Expand Down Expand Up @@ -253,4 +391,106 @@ mod tests {
Some("committed_at"),
);
}

#[tokio::test]
async fn test_manifests_table() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

let record_batch = fixture
.table
.metadata_table()
.manifests()
.scan()
.await
.unwrap();

check_record_batch(
record_batch,
expect![[r#"
Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
expect![[r#"
content: PrimitiveArray<Int8>
[
0,
],
path: (skipped),
length: (skipped),
partition_spec_id: PrimitiveArray<Int32>
[
0,
],
added_snapshot_id: PrimitiveArray<Int64>
[
3055729675574597004,
],
added_data_files_count: PrimitiveArray<Int32>
[
1,
],
existing_data_files_count: PrimitiveArray<Int32>
[
1,
],
deleted_data_files_count: PrimitiveArray<Int32>
[
1,
],
added_delete_files_count: PrimitiveArray<Int32>
[
1,
],
existing_delete_files_count: PrimitiveArray<Int32>
[
1,
],
deleted_delete_files_count: PrimitiveArray<Int32>
[
1,
],
partition_summaries: ListArray
[
StructArray
-- validity:
[
valid,
]
[
-- child 0: "contains_null" (Boolean)
BooleanArray
[
false,
]
-- child 1: "contains_nan" (Boolean)
BooleanArray
[
false,
]
-- child 2: "lower_bound" (Utf8)
StringArray
[
"100",
]
-- child 3: "upper_bound" (Utf8)
StringArray
[
"300",
]
],
]"#]],
&["path", "length"],
Some("path"),
);
}
}
2 changes: 1 addition & 1 deletion crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ pub mod tests {
.unwrap()
}

async fn setup_manifest_files(&mut self) {
pub async fn setup_manifest_files(&mut self) {
let current_snapshot = self.table.metadata().current_snapshot().unwrap();
let parent_snapshot = current_snapshot
.parent_snapshot(self.table.metadata())
Expand Down
Loading