-
Notifications
You must be signed in to change notification settings - Fork 189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Support metadata table "Manifests" #861
Changes from 14 commits
7408b05
66dc35d
fa9d1e3
f40aa87
aefea92
52d099f
7a07c30
382bdf0
a7a942e
8626d43
49639ed
fc780a2
940ddc1
0dd6678
83e8811
4c6e338
9fe6bd0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,10 +19,12 @@ | |
|
||
use std::sync::Arc; | ||
|
||
use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; | ||
use arrow_array::types::{Int64Type, TimestampMillisecondType}; | ||
use arrow_array::builder::{ | ||
BooleanBuilder, ListBuilder, MapBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, | ||
}; | ||
use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType}; | ||
use arrow_array::RecordBatch; | ||
use arrow_schema::{DataType, Field, Schema, TimeUnit}; | ||
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; | ||
|
||
use crate::spec::TableMetadata; | ||
use crate::table::Table; | ||
|
@@ -50,6 +52,13 @@ impl MetadataTable { | |
} | ||
} | ||
|
||
/// Get the manifests table. | ||
pub fn manifests(&self) -> ManifestsTable { | ||
ManifestsTable { | ||
metadata_table: self, | ||
} | ||
} | ||
|
||
fn metadata(&self) -> &TableMetadata { | ||
self.0.metadata() | ||
} | ||
|
@@ -128,6 +137,135 @@ impl<'a> SnapshotsTable<'a> { | |
} | ||
} | ||
|
||
/// Manifests table. | ||
pub struct ManifestsTable<'a> { | ||
metadata_table: &'a MetadataTable, | ||
} | ||
|
||
impl<'a> ManifestsTable<'a> { | ||
fn partition_summary_fields(&self) -> Vec<Field> { | ||
vec![ | ||
Field::new("contains_null", DataType::Boolean, false), | ||
Field::new("contains_nan", DataType::Boolean, true), | ||
Field::new("lower_bound", DataType::Utf8, true), | ||
Field::new("upper_bound", DataType::Utf8, true), | ||
] | ||
} | ||
|
||
fn schema(&self) -> Schema { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might want to make this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed in 83e8811 |
||
Schema::new(vec![ | ||
Field::new("content", DataType::Int8, false), | ||
Field::new("path", DataType::Utf8, false), | ||
Field::new("length", DataType::Int64, false), | ||
Field::new("partition_spec_id", DataType::Int32, false), | ||
Field::new("added_snapshot_id", DataType::Int64, false), | ||
Field::new("added_data_files_count", DataType::Int32, false), | ||
Field::new("existing_data_files_count", DataType::Int32, false), | ||
Field::new("deleted_data_files_count", DataType::Int32, false), | ||
Field::new("added_delete_files_count", DataType::Int32, false), | ||
Field::new("existing_delete_files_count", DataType::Int32, false), | ||
Field::new("deleted_delete_files_count", DataType::Int32, false), | ||
Field::new( | ||
"partition_summaries", | ||
DataType::List(Arc::new(Field::new_struct( | ||
"item", | ||
self.partition_summary_fields(), | ||
false, | ||
))), | ||
false, | ||
), | ||
]) | ||
} | ||
|
||
/// Scans the manifests table. | ||
pub async fn scan(&self) -> Result<RecordBatch> { | ||
let mut content = PrimitiveBuilder::<Int8Type>::new(); | ||
let mut path = StringBuilder::new(); | ||
let mut length = PrimitiveBuilder::<Int64Type>::new(); | ||
let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new(); | ||
let mut added_snapshot_id = PrimitiveBuilder::<Int64Type>::new(); | ||
let mut added_data_files_count = PrimitiveBuilder::<Int32Type>::new(); | ||
let mut existing_data_files_count = PrimitiveBuilder::<Int32Type>::new(); | ||
let mut deleted_data_files_count = PrimitiveBuilder::<Int32Type>::new(); | ||
let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new(); | ||
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new(); | ||
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new(); | ||
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields( | ||
Fields::from(self.partition_summary_fields()), | ||
0, | ||
)) | ||
.with_field(Arc::new(Field::new_struct( | ||
"item", | ||
self.partition_summary_fields(), | ||
false, | ||
))); | ||
|
||
if let Some(snapshot) = self.metadata_table.metadata().current_snapshot() { | ||
let manifest_list = snapshot | ||
.load_manifest_list( | ||
self.metadata_table.0.file_io(), | ||
&self.metadata_table.0.metadata_ref(), | ||
) | ||
.await?; | ||
for manifest in manifest_list.entries() { | ||
content.append_value(manifest.content.clone() as i8); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a bit unusual to see something that can use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may derive There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed in 4c6e338 |
||
path.append_value(manifest.manifest_path.clone()); | ||
length.append_value(manifest.manifest_length); | ||
partition_spec_id.append_value(manifest.partition_spec_id); | ||
added_snapshot_id.append_value(manifest.added_snapshot_id); | ||
added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32); | ||
existing_data_files_count | ||
.append_value(manifest.existing_files_count.unwrap_or(0) as i32); | ||
deleted_data_files_count | ||
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32); | ||
added_delete_files_count | ||
.append_value(manifest.added_files_count.unwrap_or(0) as i32); | ||
existing_delete_files_count | ||
.append_value(manifest.existing_files_count.unwrap_or(0) as i32); | ||
deleted_delete_files_count | ||
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32); | ||
|
||
let partition_summaries_builder = partition_summaries.values(); | ||
for summary in &manifest.partitions { | ||
partition_summaries_builder | ||
.field_builder::<BooleanBuilder>(0) | ||
.unwrap() | ||
.append_value(summary.contains_null); | ||
partition_summaries_builder | ||
.field_builder::<BooleanBuilder>(1) | ||
.unwrap() | ||
.append_option(summary.contains_nan); | ||
partition_summaries_builder | ||
.field_builder::<StringBuilder>(2) | ||
.unwrap() | ||
.append_option(summary.lower_bound.as_ref().map(|v| v.to_string())); | ||
partition_summaries_builder | ||
.field_builder::<StringBuilder>(3) | ||
.unwrap() | ||
.append_option(summary.upper_bound.as_ref().map(|v| v.to_string())); | ||
partition_summaries_builder.append(true); | ||
} | ||
partition_summaries.append(true); | ||
} | ||
} | ||
|
||
Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ | ||
Arc::new(content.finish()), | ||
Arc::new(path.finish()), | ||
Arc::new(length.finish()), | ||
Arc::new(partition_spec_id.finish()), | ||
Arc::new(added_snapshot_id.finish()), | ||
Arc::new(added_data_files_count.finish()), | ||
Arc::new(existing_data_files_count.finish()), | ||
Arc::new(deleted_data_files_count.finish()), | ||
Arc::new(added_delete_files_count.finish()), | ||
Arc::new(existing_delete_files_count.finish()), | ||
Arc::new(deleted_delete_files_count.finish()), | ||
Arc::new(partition_summaries.finish()), | ||
])?) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use expect_test::{expect, Expect}; | ||
|
@@ -253,4 +391,106 @@ mod tests { | |
Some("committed_at"), | ||
); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_manifests_table() { | ||
let mut fixture = TableTestFixture::new(); | ||
fixture.setup_manifest_files().await; | ||
|
||
let record_batch = fixture | ||
.table | ||
.metadata_table() | ||
.manifests() | ||
.scan() | ||
.await | ||
.unwrap(); | ||
|
||
check_record_batch( | ||
record_batch, | ||
expect![[r#" | ||
Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, | ||
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, | ||
Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, | ||
Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, | ||
Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, | ||
Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, | ||
Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, | ||
Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, | ||
Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, | ||
Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, | ||
Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, | ||
Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], | ||
expect![[r#" | ||
content: PrimitiveArray<Int8> | ||
[ | ||
0, | ||
], | ||
path: (skipped), | ||
length: (skipped), | ||
partition_spec_id: PrimitiveArray<Int32> | ||
[ | ||
0, | ||
], | ||
added_snapshot_id: PrimitiveArray<Int64> | ||
[ | ||
3055729675574597004, | ||
], | ||
added_data_files_count: PrimitiveArray<Int32> | ||
[ | ||
1, | ||
], | ||
existing_data_files_count: PrimitiveArray<Int32> | ||
[ | ||
1, | ||
], | ||
deleted_data_files_count: PrimitiveArray<Int32> | ||
[ | ||
1, | ||
], | ||
added_delete_files_count: PrimitiveArray<Int32> | ||
[ | ||
1, | ||
], | ||
existing_delete_files_count: PrimitiveArray<Int32> | ||
[ | ||
1, | ||
], | ||
deleted_delete_files_count: PrimitiveArray<Int32> | ||
[ | ||
1, | ||
], | ||
partition_summaries: ListArray | ||
[ | ||
StructArray | ||
-- validity: | ||
[ | ||
valid, | ||
] | ||
[ | ||
-- child 0: "contains_null" (Boolean) | ||
BooleanArray | ||
[ | ||
false, | ||
] | ||
-- child 1: "contains_nan" (Boolean) | ||
BooleanArray | ||
[ | ||
false, | ||
] | ||
-- child 2: "lower_bound" (Utf8) | ||
StringArray | ||
[ | ||
"100", | ||
] | ||
-- child 3: "upper_bound" (Utf8) | ||
StringArray | ||
[ | ||
"300", | ||
] | ||
], | ||
]"#]], | ||
&["path", "length"], | ||
Some("path"), | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, I think we can simply use
Table
here, which suggests thatMetadataTable
is merely a wrapper and doesn't implement any additional API.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 9fe6bd0, PTAL