feat(datafusion): support metadata tables for Datafusion
Signed-off-by: xxchan <[email protected]>
xxchan committed Jan 8, 2025
1 parent 6e07faa commit 90abb68
Showing 15 changed files with 519 additions and 93 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -91,6 +91,7 @@
serde_derive = "1"
serde_json = "1"
serde_repr = "0.1.16"
serde_with = "3.4"
strum = "0.26"
tempfile = "3.15"
tokio = { version = "1", default-features = false }
typed-builder = "0.20"
3 changes: 2 additions & 1 deletion crates/iceberg/Cargo.toml
@@ -58,6 +58,7 @@
bitvec = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
derive_builder = { workspace = true }
+expect-test = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
@@ -78,6 +79,7 @@
serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_repr = { workspace = true }
serde_with = { workspace = true }
+strum = { workspace = true, features = ["derive"] }
tokio = { workspace = true, optional = true }
typed-builder = { workspace = true }
url = { workspace = true }
@@ -86,7 +88,6 @@
zstd = { workspace = true }

[dev-dependencies]
ctor = { workspace = true }
-expect-test = { workspace = true }
iceberg-catalog-memory = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
pretty_assertions = { workspace = true }
8 changes: 4 additions & 4 deletions crates/iceberg/src/inspect/manifests.rs
@@ -166,9 +166,9 @@ impl<'a> ManifestsTable<'a> {
#[cfg(test)]
mod tests {
use expect_test::expect;
+use futures::TryStreamExt;

-use crate::inspect::metadata_table::tests::check_record_batches;
-use crate::scan::tests::TableTestFixture;
+use crate::{scan::tests::TableTestFixture, test_utils::check_record_batches};

#[tokio::test]
async fn test_manifests_table() {
@@ -178,7 +178,7 @@ mod tests {
let batch_stream = fixture.table.inspect().manifests().scan().await.unwrap();

check_record_batches(
-batch_stream,
+batch_stream.try_collect::<Vec<_>>().await.unwrap(),
expect![[r#"
Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -263,6 +263,6 @@ mod tests {
]"#]],
&["path", "length"],
Some("path"),
-).await;
+);
}
}
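
The change above reflects the new helper signature: `check_record_batches` (moved to `test_utils.rs`, see below) now takes `Vec<RecordBatch>` and is no longer async, so callers collect the stream first. A minimal sketch of that collection step, assuming only the public `ArrowRecordBatchStream` alias and `iceberg::Result`:

    use arrow_array::RecordBatch;
    use futures::TryStreamExt;
    use iceberg::scan::ArrowRecordBatchStream;

    // Drain a fallible stream of record batches into a Vec,
    // short-circuiting on the first error.
    async fn collect_batches(stream: ArrowRecordBatchStream) -> iceberg::Result<Vec<RecordBatch>> {
        stream.try_collect::<Vec<_>>().await
    }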
101 changes: 37 additions & 64 deletions crates/iceberg/src/inspect/metadata_table.rs
@@ -27,6 +27,43 @@
use crate::table::Table;
#[derive(Debug)]
pub struct MetadataTable<'a>(&'a Table);

+/// Metadata table type.
+#[derive(Debug, Clone, strum::EnumIter)]
+pub enum MetadataTableType {
+    /// [`SnapshotsTable`]
+    Snapshots,
+    /// [`ManifestsTable`]
+    Manifests,
+}
+
+impl MetadataTableType {
+    /// Returns the string representation of the metadata table type.
+    pub fn as_str(&self) -> &str {
+        match self {
+            MetadataTableType::Snapshots => "snapshots",
+            MetadataTableType::Manifests => "manifests",
+        }
+    }
+
+    /// Returns all the metadata table types.
+    pub fn all_types() -> impl Iterator<Item = Self> {
+        use strum::IntoEnumIterator;
+        Self::iter()
+    }
+}
+
+impl TryFrom<&str> for MetadataTableType {
+    type Error = String;
+
+    fn try_from(value: &str) -> std::result::Result<Self, String> {
+        match value {
+            "snapshots" => Ok(Self::Snapshots),
+            "manifests" => Ok(Self::Manifests),
+            _ => Err(format!("invalid metadata table type: {value}")),
+        }
+    }
+}

impl<'a> MetadataTable<'a> {
/// Creates a new metadata scan.
pub fn new(table: &'a Table) -> Self {
@@ -43,67 +80,3 @@ impl<'a> MetadataTable<'a> {
ManifestsTable::new(self.0)
}
}

-#[cfg(test)]
-pub mod tests {
-    use expect_test::Expect;
-    use futures::TryStreamExt;
-    use itertools::Itertools;
-
-    use crate::scan::ArrowRecordBatchStream;
-
-    /// Snapshot testing to check the resulting record batch.
-    ///
-    /// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
-    ///   and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
-    ///   or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
-    ///   Check the doc of [`expect_test`] for more details.
-    /// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
-    /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
-    pub async fn check_record_batches(
-        batch_stream: ArrowRecordBatchStream,
-        expected_schema: Expect,
-        expected_data: Expect,
-        ignore_check_columns: &[&str],
-        sort_column: Option<&str>,
-    ) {
-        let record_batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();
-        assert!(!record_batches.is_empty(), "Empty record batches");
-
-        // Combine record batches using the first batch's schema
-        let first_batch = record_batches.first().unwrap();
-        let record_batch =
-            arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap();
-
-        let mut columns = record_batch.columns().to_vec();
-        if let Some(sort_column) = sort_column {
-            let column = record_batch.column_by_name(sort_column).unwrap();
-            let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
-            columns = columns
-                .iter()
-                .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
-                .collect_vec();
-        }
-
-        expected_schema.assert_eq(&format!(
-            "{}",
-            record_batch.schema().fields().iter().format(",\n")
-        ));
-        expected_data.assert_eq(&format!(
-            "{}",
-            record_batch
-                .schema()
-                .fields()
-                .iter()
-                .zip_eq(columns)
-                .map(|(field, column)| {
-                    if ignore_check_columns.contains(&field.name().as_str()) {
-                        format!("{}: (skipped)", field.name())
-                    } else {
-                        format!("{}: {:?}", field.name(), column)
-                    }
-                })
-                .format(",\n")
-        ));
-    }
-}
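
The new `MetadataTableType` enum is what lets an engine map a table-name suffix onto a metadata table. A small usage sketch, assuming the type is re-exported at `iceberg::inspect::MetadataTableType` (the full module layout is not shown in this diff):

    use iceberg::inspect::MetadataTableType;

    fn demo() {
        // Parse a user-supplied suffix; unknown names give a descriptive error.
        let ty = MetadataTableType::try_from("snapshots").unwrap();
        assert_eq!(ty.as_str(), "snapshots");
        assert!(MetadataTableType::try_from("partitions").is_err());

        // Enumerate every supported metadata table (backed by strum's EnumIter).
        for ty in MetadataTableType::all_types() {
            println!("{}", ty.as_str());
        }
    }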
27 changes: 15 additions & 12 deletions crates/iceberg/src/inspect/snapshots.rs
@@ -94,14 +94,17 @@ impl<'a> SnapshotsTable<'a> {
summary.append(true)?;
}

-        let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
-            Arc::new(committed_at.finish()),
-            Arc::new(snapshot_id.finish()),
-            Arc::new(parent_id.finish()),
-            Arc::new(operation.finish()),
-            Arc::new(manifest_list.finish()),
-            Arc::new(summary.finish()),
-        ])?;
+        let batch = RecordBatch::try_new(
+            Arc::new(self.schema()),
+            vec![
+                Arc::new(committed_at.finish()),
+                Arc::new(snapshot_id.finish()),
+                Arc::new(parent_id.finish()),
+                Arc::new(operation.finish()),
+                Arc::new(manifest_list.finish()),
+                Arc::new(summary.finish()),
+            ],
+        )?;

Ok(stream::iter(vec![Ok(batch)]).boxed())
}
@@ -110,9 +113,9 @@ impl<'a> SnapshotsTable<'a> {
#[cfg(test)]
mod tests {
use expect_test::expect;
use futures::TryStreamExt;

-use crate::inspect::metadata_table::tests::check_record_batches;
-use crate::scan::tests::TableTestFixture;
+use crate::{scan::tests::TableTestFixture, test_utils::check_record_batches};

#[tokio::test]
async fn test_snapshots_table() {
@@ -121,7 +124,7 @@ mod tests {
let batch_stream = table.inspect().snapshots().scan().await.unwrap();

check_record_batches(
-batch_stream,
+batch_stream.try_collect::<Vec<_>>().await.unwrap(),
expect![[r#"
Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -184,6 +187,6 @@ mod tests {
]"#]],
&["manifest_list"],
Some("committed_at"),
-).await;
+);
}
}
1 change: 1 addition & 0 deletions crates/iceberg/src/lib.rs
@@ -83,6 +83,7 @@
pub mod transform;
mod runtime;

pub mod arrow;
+pub mod test_utils;
mod utils;
pub mod writer;

78 changes: 78 additions & 0 deletions crates/iceberg/src/test_utils.rs
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Test utilities.
//! This module is pub just for internal testing.
//! It is subject to change and is not intended to be used by external users.
use arrow_array::RecordBatch;
use expect_test::Expect;
use itertools::Itertools;

/// Snapshot testing to check the resulting record batch.
///
/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
/// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
/// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
/// Check the doc of [`expect_test`] for more details.
/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
pub fn check_record_batches(
    record_batches: Vec<RecordBatch>,
    expected_schema: Expect,
    expected_data: Expect,
    ignore_check_columns: &[&str],
    sort_column: Option<&str>,
) {
    assert!(!record_batches.is_empty(), "Empty record batches");

    // Combine record batches using the first batch's schema
    let first_batch = record_batches.first().unwrap();
    let record_batch =
        arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap();

    let mut columns = record_batch.columns().to_vec();
    if let Some(sort_column) = sort_column {
        let column = record_batch.column_by_name(sort_column).unwrap();
        let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
        columns = columns
            .iter()
            .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
            .collect_vec();
    }

    expected_schema.assert_eq(&format!(
        "{}",
        record_batch.schema().fields().iter().format(",\n")
    ));
    expected_data.assert_eq(&format!(
        "{}",
        record_batch
            .schema()
            .fields()
            .iter()
            .zip_eq(columns)
            .map(|(field, column)| {
                if ignore_check_columns.contains(&field.name().as_str()) {
                    format!("{}: (skipped)", field.name())
                } else {
                    format!("{}: {:?}", field.name(), column)
                }
            })
            .format(",\n")
    ));
}
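
Because `test_utils` is now an always-compiled public module rather than a `#[cfg(test)]` one, `expect-test` moves out of `[dev-dependencies]` into the crate's main dependencies above. A hedged usage sketch mirroring the snapshots test, with the two `expect![[""]]` placeholders to be filled in by `UPDATE_EXPECT=1 cargo test`:

    use expect_test::expect;
    use futures::TryStreamExt;
    use iceberg::table::Table;
    use iceberg::test_utils::check_record_batches;

    async fn check_snapshots(table: &Table) {
        let batches = table
            .inspect()
            .snapshots()
            .scan()
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        check_record_batches(
            batches,
            expect![[r#""#]],     // schema placeholder
            expect![[r#""#]],     // data placeholder
            &["manifest_list"],   // unstable column, skipped in comparison
            Some("committed_at"), // sort key for deterministic output
        );
    }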
1 change: 1 addition & 0 deletions crates/integrations/datafusion/Cargo.toml
@@ -37,5 +37,6 @@
iceberg = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
+expect-test = { workspace = true }
iceberg-catalog-memory = { workspace = true }
tempfile = { workspace = true }
(Remaining changed files not rendered.)
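
The DataFusion-side wiring itself is among the files not rendered above, so the exact table naming and registration are not visible here. Purely as a hypothetical sketch of how such metadata tables are usually queried (the `<table>$snapshots` suffix follows the convention other Iceberg engines use and may not match this commit; the `$` requires a quoted identifier in DataFusion SQL):

    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    async fn query_snapshots(ctx: &SessionContext) -> Result<()> {
        // Hypothetical table name; assumes the integration registered
        // "my_table$snapshots" alongside "my_table".
        let df = ctx
            .sql(r#"SELECT snapshot_id, committed_at FROM "my_table$snapshots""#)
            .await?;
        df.show().await?;
        Ok(())
    }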
