Skip to content

Commit

Permalink
Metadata table scans as streams (#870)
Browse files Browse the repository at this point in the history
  • Loading branch information
rshkv authored Jan 8, 2025
1 parent e34f428 commit 6e07faa
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 33 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 16 additions & 12 deletions crates/iceberg/src/inspect/manifests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use arrow_array::builder::{
use arrow_array::types::{Int32Type, Int64Type, Int8Type};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Fields, Schema};
use futures::{stream, StreamExt};

use crate::scan::ArrowRecordBatchStream;
use crate::table::Table;
use crate::Result;

Expand All @@ -38,7 +40,7 @@ impl<'a> ManifestsTable<'a> {
Self { table }
}

fn partition_summary_fields(&self) -> Vec<Field> {
fn partition_summary_fields() -> Vec<Field> {
vec![
Field::new("contains_null", DataType::Boolean, false),
Field::new("contains_nan", DataType::Boolean, true),
Expand All @@ -65,7 +67,7 @@ impl<'a> ManifestsTable<'a> {
"partition_summaries",
DataType::List(Arc::new(Field::new_struct(
"item",
self.partition_summary_fields(),
Self::partition_summary_fields(),
false,
))),
false,
Expand All @@ -74,7 +76,7 @@ impl<'a> ManifestsTable<'a> {
}

/// Scans the manifests table.
pub async fn scan(&self) -> Result<RecordBatch> {
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
let mut content = PrimitiveBuilder::<Int8Type>::new();
let mut path = StringBuilder::new();
let mut length = PrimitiveBuilder::<Int64Type>::new();
Expand All @@ -87,12 +89,12 @@ impl<'a> ManifestsTable<'a> {
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
Fields::from(self.partition_summary_fields()),
Fields::from(Self::partition_summary_fields()),
0,
))
.with_field(Arc::new(Field::new_struct(
"item",
self.partition_summary_fields(),
Self::partition_summary_fields(),
false,
)));

Expand Down Expand Up @@ -142,7 +144,7 @@ impl<'a> ManifestsTable<'a> {
}
}

Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
Arc::new(content.finish()),
Arc::new(path.finish()),
Arc::new(length.finish()),
Expand All @@ -155,26 +157,28 @@ impl<'a> ManifestsTable<'a> {
Arc::new(existing_delete_files_count.finish()),
Arc::new(deleted_delete_files_count.finish()),
Arc::new(partition_summaries.finish()),
])?)
])?;

Ok(stream::iter(vec![Ok(batch)]).boxed())
}
}

#[cfg(test)]
mod tests {
use expect_test::expect;

use crate::inspect::metadata_table::tests::check_record_batch;
use crate::inspect::metadata_table::tests::check_record_batches;
use crate::scan::tests::TableTestFixture;

#[tokio::test]
async fn test_manifests_table() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

let record_batch = fixture.table.inspect().manifests().scan().await.unwrap();
let batch_stream = fixture.table.inspect().manifests().scan().await.unwrap();

check_record_batch(
record_batch,
check_record_batches(
batch_stream,
expect![[r#"
Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Expand Down Expand Up @@ -259,6 +263,6 @@ mod tests {
]"#]],
&["path", "length"],
Some("path"),
);
).await;
}
}
26 changes: 18 additions & 8 deletions crates/iceberg/src/inspect/metadata_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,33 @@ use crate::table::Table;
/// - <https://iceberg.apache.org/docs/latest/spark-queries/#querying-with-sql>
/// - <https://py.iceberg.apache.org/api/#inspecting-tables>
#[derive(Debug)]
pub struct MetadataTable(Table);
pub struct MetadataTable<'a>(&'a Table);

impl MetadataTable {
impl<'a> MetadataTable<'a> {
/// Creates a new metadata scan.
pub fn new(table: Table) -> Self {
pub fn new(table: &'a Table) -> Self {
Self(table)
}

/// Get the snapshots table.
pub fn snapshots(&self) -> SnapshotsTable {
SnapshotsTable::new(&self.0)
SnapshotsTable::new(self.0)
}

/// Get the manifests table.
pub fn manifests(&self) -> ManifestsTable {
ManifestsTable::new(&self.0)
ManifestsTable::new(self.0)
}
}

#[cfg(test)]
pub mod tests {
use arrow_array::RecordBatch;
use expect_test::Expect;
use futures::TryStreamExt;
use itertools::Itertools;

use crate::scan::ArrowRecordBatchStream;

/// Snapshot testing to check the resulting record batch.
///
/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
Expand All @@ -58,13 +60,21 @@ pub mod tests {
/// Check the doc of [`expect_test`] for more details.
/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
pub fn check_record_batch(
record_batch: RecordBatch,
pub async fn check_record_batches(
batch_stream: ArrowRecordBatchStream,
expected_schema: Expect,
expected_data: Expect,
ignore_check_columns: &[&str],
sort_column: Option<&str>,
) {
let record_batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();
assert!(!record_batches.is_empty(), "Empty record batches");

// Combine record batches using the first batch's schema
let first_batch = record_batches.first().unwrap();
let record_batch =
arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap();

let mut columns = record_batch.columns().to_vec();
if let Some(sort_column) = sort_column {
let column = record_batch.column_by_name(sort_column).unwrap();
Expand Down
26 changes: 16 additions & 10 deletions crates/iceberg/src/inspect/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
use arrow_array::types::{Int64Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use futures::{stream, StreamExt};

use crate::scan::ArrowRecordBatchStream;
use crate::table::Table;
use crate::Result;

Expand Down Expand Up @@ -70,7 +72,7 @@ impl<'a> SnapshotsTable<'a> {
}

/// Scans the snapshots table.
pub fn scan(&self) -> Result<RecordBatch> {
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
let mut committed_at =
PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
Expand All @@ -92,30 +94,34 @@ impl<'a> SnapshotsTable<'a> {
summary.append(true)?;
}

Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
Arc::new(committed_at.finish()),
Arc::new(snapshot_id.finish()),
Arc::new(parent_id.finish()),
Arc::new(operation.finish()),
Arc::new(manifest_list.finish()),
Arc::new(summary.finish()),
])?)
])?;

Ok(stream::iter(vec![Ok(batch)]).boxed())
}
}

#[cfg(test)]
mod tests {
use expect_test::expect;

use crate::inspect::metadata_table::tests::check_record_batch;
use crate::inspect::metadata_table::tests::check_record_batches;
use crate::scan::tests::TableTestFixture;

#[test]
fn test_snapshots_table() {
#[tokio::test]
async fn test_snapshots_table() {
let table = TableTestFixture::new().table;
let record_batch = table.inspect().snapshots().scan().unwrap();
check_record_batch(
record_batch,

let batch_stream = table.inspect().snapshots().scan().await.unwrap();

check_record_batches(
batch_stream,
expect![[r#"
Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Expand Down Expand Up @@ -178,6 +184,6 @@ mod tests {
]"#]],
&["manifest_list"],
Some("committed_at"),
);
).await;
}
}
2 changes: 1 addition & 1 deletion crates/iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl Table {

/// Creates a metadata table which provides table-like APIs for inspecting metadata.
/// See [`MetadataTable`] for more details.
pub fn inspect(self) -> MetadataTable {
pub fn inspect(&self) -> MetadataTable<'_> {
MetadataTable::new(self)
}

Expand Down

0 comments on commit 6e07faa

Please sign in to comment.