From 90abb688d5f0ace1717b61d1de02fa405a9df0e5 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Mon, 6 Jan 2025 01:54:15 +0800
Subject: [PATCH] feat(datafusion): support metadata tables for Datafusion

Signed-off-by: xxchan
---
 Cargo.lock | 5 +
 Cargo.toml | 1 +
 crates/iceberg/Cargo.toml | 3 +-
 crates/iceberg/src/inspect/manifests.rs | 8 +-
 crates/iceberg/src/inspect/metadata_table.rs | 101 +++++--------
 crates/iceberg/src/inspect/snapshots.rs | 27 ++--
 crates/iceberg/src/lib.rs | 1 +
 crates/iceberg/src/test_utils.rs | 78 ++++++++++
 crates/integrations/datafusion/Cargo.toml | 1 +
 .../src/physical_plan/metadata_scan.rs | 96 ++++++++++++
 .../datafusion/src/physical_plan/mod.rs | 1 +
 crates/integrations/datafusion/src/schema.rs | 47 ++++--
 .../datafusion/src/table/metadata_table.rs | 95 ++++++++++++
 .../integrations/datafusion/src/table/mod.rs | 10 ++
 .../tests/integration_datafusion_test.rs | 138 +++++++++++++++++-
 15 files changed, 519 insertions(+), 93 deletions(-)
 create mode 100644 crates/iceberg/src/test_utils.rs
 create mode 100644 crates/integrations/datafusion/src/physical_plan/metadata_scan.rs
 create mode 100644 crates/integrations/datafusion/src/table/metadata_table.rs

diff --git a/Cargo.lock b/Cargo.lock
index 4e5e796be..71f52ff92 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2944,6 +2944,7 @@ dependencies = [
  "serde_json",
  "serde_repr",
  "serde_with",
+ "strum",
  "tempfile",
  "tera",
  "tokio",
@@ -3071,6 +3072,7 @@ dependencies = [
  "anyhow",
  "async-trait",
  "datafusion",
+ "expect-test",
  "futures",
  "iceberg",
  "iceberg-catalog-memory",
@@ -5756,6 +5758,9 @@ name = "strum"
 version = "0.26.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
+dependencies = [
+ "strum_macros",
+]
 
 [[package]]
 name = "strum_macros"
diff --git a/Cargo.toml b/Cargo.toml
index aef6b2274..d8f119d61 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -91,6 +91,7 @@ serde_derive = "1"
 serde_json = "1"
 serde_repr = "0.1.16"
 serde_with = "3.4"
+strum = "0.26"
 tempfile = "3.15"
 tokio = { version = "1", default-features = false }
 typed-builder = "0.20"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 97e77a2c5..1adaf1066 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -58,6 +58,7 @@ bitvec = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true }
 derive_builder = { workspace = true }
+expect-test = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
@@ -78,6 +79,7 @@ serde_derive = { workspace = true }
 serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
+strum = { workspace = true, features = ["derive"] }
 tokio = { workspace = true, optional = true }
 typed-builder = { workspace = true }
 url = { workspace = true }
@@ -86,7 +88,6 @@ zstd = { workspace = true }
 
 [dev-dependencies]
 ctor = { workspace = true }
-expect-test = { workspace = true }
 iceberg-catalog-memory = { workspace = true }
 iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
 pretty_assertions = { workspace = true }
diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs
index ab63d2f6e..f01c21a17 100644
--- a/crates/iceberg/src/inspect/manifests.rs
+++ b/crates/iceberg/src/inspect/manifests.rs
@@ -166,9 +166,9 @@ impl<'a> ManifestsTable<'a> {
 #[cfg(test)]
 mod tests {
     use expect_test::expect;
+    use futures::TryStreamExt;
 
-    use crate::inspect::metadata_table::tests::check_record_batches;
-    use crate::scan::tests::TableTestFixture;
+    use crate::{scan::tests::TableTestFixture, test_utils::check_record_batches};
 
     #[tokio::test]
     async fn test_manifests_table() {
@@ -178,7 +178,7 @@ mod tests {
         let batch_stream = fixture.table.inspect().manifests().scan().await.unwrap();
 
         check_record_batches(
-            batch_stream,
+            batch_stream.try_collect::<Vec<_>>().await.unwrap(),
             expect![[r#"
                 Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
                 Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -263,6 +263,6 @@ mod tests {
                 ]"#]],
             &["path", "length"],
             Some("path"),
-        ).await;
+        );
     }
 }
diff --git a/crates/iceberg/src/inspect/metadata_table.rs b/crates/iceberg/src/inspect/metadata_table.rs
index 75dbc7472..92571db18 100644
--- a/crates/iceberg/src/inspect/metadata_table.rs
+++ b/crates/iceberg/src/inspect/metadata_table.rs
@@ -27,6 +27,43 @@ use crate::table::Table;
 #[derive(Debug)]
 pub struct MetadataTable<'a>(&'a Table);
 
+/// Metadata table type.
+#[derive(Debug, Clone, strum::EnumIter)]
+pub enum MetadataTableType {
+    /// [`SnapshotsTable`]
+    Snapshots,
+    /// [`ManifestsTable`]
+    Manifests,
+}
+
+impl MetadataTableType {
+    /// Returns the string representation of the metadata table type.
+    pub fn as_str(&self) -> &str {
+        match self {
+            MetadataTableType::Snapshots => "snapshots",
+            MetadataTableType::Manifests => "manifests",
+        }
+    }
+
+    /// Returns all the metadata table types.
+    pub fn all_types() -> impl Iterator<Item = Self> {
+        use strum::IntoEnumIterator;
+        Self::iter()
+    }
+}
+
+impl TryFrom<&str> for MetadataTableType {
+    type Error = String;
+
+    fn try_from(value: &str) -> std::result::Result<Self, String> {
+        match value {
+            "snapshots" => Ok(Self::Snapshots),
+            "manifests" => Ok(Self::Manifests),
+            _ => Err(format!("invalid metadata table type: {value}")),
+        }
+    }
+}
+
 impl<'a> MetadataTable<'a> {
     /// Creates a new metadata scan.
     pub fn new(table: &'a Table) -> Self {
@@ -43,67 +80,3 @@ impl<'a> MetadataTable<'a> {
         ManifestsTable::new(self.0)
     }
 }
-
-#[cfg(test)]
-pub mod tests {
-    use expect_test::Expect;
-    use futures::TryStreamExt;
-    use itertools::Itertools;
-
-    use crate::scan::ArrowRecordBatchStream;
-
-    /// Snapshot testing to check the resulting record batch.
-    ///
-    /// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
-    ///   and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
-    ///   or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
-    ///   Check the doc of [`expect_test`] for more details.
-    /// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
-    /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
-    pub async fn check_record_batches(
-        batch_stream: ArrowRecordBatchStream,
-        expected_schema: Expect,
-        expected_data: Expect,
-        ignore_check_columns: &[&str],
-        sort_column: Option<&str>,
-    ) {
-        let record_batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();
-        assert!(!record_batches.is_empty(), "Empty record batches");
-
-        // Combine record batches using the first batch's schema
-        let first_batch = record_batches.first().unwrap();
-        let record_batch =
-            arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap();
-
-        let mut columns = record_batch.columns().to_vec();
-        if let Some(sort_column) = sort_column {
-            let column = record_batch.column_by_name(sort_column).unwrap();
-            let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
-            columns = columns
-                .iter()
-                .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
-                .collect_vec();
-        }
-
-        expected_schema.assert_eq(&format!(
-            "{}",
-            record_batch.schema().fields().iter().format(",\n")
-        ));
-        expected_data.assert_eq(&format!(
-            "{}",
-            record_batch
-                .schema()
-                .fields()
-                .iter()
-                .zip_eq(columns)
-                .map(|(field, column)| {
-                    if ignore_check_columns.contains(&field.name().as_str()) {
-                        format!("{}: (skipped)", field.name())
-                    } else {
-                        format!("{}: {:?}", field.name(), column)
-                    }
-                })
-                .format(",\n")
-        ));
-    }
-}
diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs
index 1ee89963d..854892e88 100644
--- a/crates/iceberg/src/inspect/snapshots.rs
+++ b/crates/iceberg/src/inspect/snapshots.rs
@@ -94,14 +94,17 @@ impl<'a> SnapshotsTable<'a> {
             summary.append(true)?;
         }
 
-        let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
-            Arc::new(committed_at.finish()),
-            Arc::new(snapshot_id.finish()),
-            Arc::new(parent_id.finish()),
-            Arc::new(operation.finish()),
-            Arc::new(manifest_list.finish()),
-            Arc::new(summary.finish()),
-        ])?;
+        let batch = RecordBatch::try_new(
+            Arc::new(self.schema()),
+            vec![
+                Arc::new(committed_at.finish()),
+                Arc::new(snapshot_id.finish()),
+                Arc::new(parent_id.finish()),
+                Arc::new(operation.finish()),
+                Arc::new(manifest_list.finish()),
+                Arc::new(summary.finish()),
+            ],
+        )?;
 
         Ok(stream::iter(vec![Ok(batch)]).boxed())
     }
@@ -110,9 +113,9 @@ impl<'a> SnapshotsTable<'a> {
 #[cfg(test)]
 mod tests {
     use expect_test::expect;
+    use futures::TryStreamExt;
 
-    use crate::inspect::metadata_table::tests::check_record_batches;
-    use crate::scan::tests::TableTestFixture;
+    use crate::{scan::tests::TableTestFixture, test_utils::check_record_batches};
 
     #[tokio::test]
     async fn test_snapshots_table() {
@@ -121,7 +124,7 @@ mod tests {
         let batch_stream = table.inspect().snapshots().scan().await.unwrap();
 
         check_record_batches(
-            batch_stream,
+            batch_stream.try_collect::<Vec<_>>().await.unwrap(),
             expect![[r#"
                 Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
                 Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -184,6 +187,6 @@ mod tests {
                 ]"#]],
             &["manifest_list"],
             Some("committed_at"),
-        ).await;
+        );
     }
 }
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index fe5a52999..38eea34e7 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -83,6 +83,7 @@ pub mod transform;
 
 mod runtime;
 
 pub mod arrow;
+pub mod test_utils;
 mod utils;
 pub mod writer;
diff --git a/crates/iceberg/src/test_utils.rs b/crates/iceberg/src/test_utils.rs
new file mode 100644
index 000000000..527d37bb6
--- /dev/null
+++ b/crates/iceberg/src/test_utils.rs
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test utilities.
+//! This module is pub just for internal testing.
+//! It is subject to change and is not intended to be used by external users.
+
+use arrow_array::RecordBatch;
+use expect_test::Expect;
+use itertools::Itertools;
+
+/// Snapshot testing to check the resulting record batch.
+///
+/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
+///   and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
+///   or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
+///   Check the doc of [`expect_test`] for more details.
+/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
+/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
+pub fn check_record_batches(
+    record_batches: Vec<RecordBatch>,
+    expected_schema: Expect,
+    expected_data: Expect,
+    ignore_check_columns: &[&str],
+    sort_column: Option<&str>,
+) {
+    assert!(!record_batches.is_empty(), "Empty record batches");
+
+    // Combine record batches using the first batch's schema
+    let first_batch = record_batches.first().unwrap();
+    let record_batch =
+        arrow_select::concat::concat_batches(&first_batch.schema(), &record_batches).unwrap();
+
+    let mut columns = record_batch.columns().to_vec();
+    if let Some(sort_column) = sort_column {
+        let column = record_batch.column_by_name(sort_column).unwrap();
+        let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
+        columns = columns
+            .iter()
+            .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
+            .collect_vec();
+    }
+
+    expected_schema.assert_eq(&format!(
+        "{}",
+        record_batch.schema().fields().iter().format(",\n")
+    ));
+    expected_data.assert_eq(&format!(
+        "{}",
+        record_batch
+            .schema()
+            .fields()
+            .iter()
+            .zip_eq(columns)
+            .map(|(field, column)| {
+                if ignore_check_columns.contains(&field.name().as_str()) {
+                    format!("{}: (skipped)", field.name())
+                } else {
+                    format!("{}: {:?}", field.name(), column)
+                }
+            })
+            .format(",\n")
+    ));
+}
diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index 81a94d839..479504ec2 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -37,5 +37,6 @@ iceberg = { workspace = true }
 tokio = { workspace = true }
 
 [dev-dependencies]
+expect-test = { workspace = true }
 iceberg-catalog-memory = { workspace = true }
 tempfile = { workspace = true }
diff --git a/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs b/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs
new file mode 100644
index 000000000..b3fa6e2fb
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::catalog::TableProvider;
+use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
+use futures::{FutureExt, StreamExt};
+
+use crate::metadata_table::IcebergMetadataTableProvider;
+
+#[derive(Debug)]
+pub struct IcebergMetadataScan {
+    provider: IcebergMetadataTableProvider,
+    properties: PlanProperties,
+}
+
+impl IcebergMetadataScan {
+    pub fn new(provider: IcebergMetadataTableProvider) -> Self {
+        let properties = PlanProperties::new(
+            EquivalenceProperties::new(provider.schema()),
+            Partitioning::UnknownPartitioning(1),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+        Self {
+            provider,
+            properties,
+        }
+    }
+}
+
+impl DisplayAs for IcebergMetadataScan {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "IcebergMetadataScan")
+    }
+}
+
+impl ExecutionPlan for IcebergMetadataScan {
+    fn name(&self) -> &str {
+        "IcebergMetadataScan"
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
+    }
+
+    fn children(&self) -> Vec<&std::sync::Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: std::sync::Arc<Self>,
+        _children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
+    ) -> datafusion::error::Result<std::sync::Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: std::sync::Arc<datafusion::execution::TaskContext>,
+    ) -> datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
+        let batches = self.provider.clone().scan();
+        let schema = self.provider.schema();
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            schema,
+            batches
+                .into_stream()
+                .flat_map(|batches| futures::stream::iter(batches)),
+        )))
+    }
+}
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs
index 2fab109d7..58fb065dd 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -16,4 +16,5 @@
 // under the License.
 
 pub(crate) mod expr_to_predicate;
+pub(crate) mod metadata_scan;
 pub(crate) mod scan;
diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs
index 3be6da426..3920ee73c 100644
--- a/crates/integrations/datafusion/src/schema.rs
+++ b/crates/integrations/datafusion/src/schema.rs
@@ -22,8 +22,9 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use datafusion::catalog::SchemaProvider;
 use datafusion::datasource::TableProvider;
-use datafusion::error::Result as DFResult;
+use datafusion::error::{DataFusionError, Result as DFResult};
 use futures::future::try_join_all;
+use iceberg::inspect::MetadataTableType;
 use iceberg::{Catalog, NamespaceIdent, Result};
 
 use crate::table::IcebergTableProvider;
@@ -35,7 +36,7 @@ pub(crate) struct IcebergSchemaProvider {
     /// A `HashMap` where keys are table names
     /// and values are dynamic references to objects implementing the
     /// [`TableProvider`] trait.
-    tables: HashMap<String, Arc<dyn TableProvider>>,
+    tables: HashMap<String, Arc<IcebergTableProvider>>,
 }
 
 impl IcebergSchemaProvider {
@@ -69,13 +70,10 @@ impl IcebergSchemaProvider {
         )
         .await?;
 
-        let tables: HashMap<String, Arc<dyn TableProvider>> = table_names
+        let tables: HashMap<String, Arc<IcebergTableProvider>> = table_names
             .into_iter()
             .zip(providers.into_iter())
-            .map(|(name, provider)| {
-                let provider = Arc::new(provider) as Arc<dyn TableProvider>;
-                (name, provider)
-            })
+            .map(|(name, provider)| (name, Arc::new(provider)))
             .collect();
 
         Ok(IcebergSchemaProvider { tables })
@@ -89,14 +87,43 @@ impl SchemaProvider for IcebergSchemaProvider {
     }
 
     fn table_names(&self) -> Vec<String> {
-        self.tables.keys().cloned().collect()
+        self.tables
+            .keys()
+            .flat_map(|table_name| {
+                [table_name.clone()]
+                    .into_iter()
+                    .chain(MetadataTableType::all_types().map(|metadata_table_name| {
+                        format!("{}${}", table_name.clone(), metadata_table_name.as_str())
+                    }))
+            })
+            .collect()
     }
 
     fn table_exist(&self, name: &str) -> bool {
-        self.tables.contains_key(name)
+        if let Some((table_name, metadata_table_name)) = name.split_once('$') {
+            self.tables.contains_key(table_name)
+                && MetadataTableType::try_from(metadata_table_name).is_ok()
+        } else {
+            self.tables.contains_key(name)
+        }
     }
 
     async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
-        Ok(self.tables.get(name).cloned())
+        if let Some((table_name, metadata_table_name)) = name.split_once('$') {
+            let metadata_table_type =
+                MetadataTableType::try_from(metadata_table_name).map_err(DataFusionError::Plan)?;
+            if let Some(table) = self.tables.get(table_name) {
+                let metadata_table = table.metadata_table(metadata_table_type);
+                return Ok(Some(Arc::new(metadata_table)));
+            } else {
+                return Ok(None);
+            }
+        }
+
+        Ok(self
+            .tables
+            .get(name)
+            .cloned()
+            .map(|t| t as Arc<dyn TableProvider>))
     }
 }
diff --git a/crates/integrations/datafusion/src/table/metadata_table.rs b/crates/integrations/datafusion/src/table/metadata_table.rs
new file mode 100644
index 000000000..8a7649e58
--- /dev/null
+++ b/crates/integrations/datafusion/src/table/metadata_table.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::arrow::array::RecordBatch;
+use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::catalog::Session;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use futures::StreamExt;
+use iceberg::inspect::MetadataTableType;
+use iceberg::table::Table;
+
+use crate::physical_plan::metadata_scan::IcebergMetadataScan;
+use crate::to_datafusion_error;
+
+/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
+/// managing access to a [`MetadataTable`].
+#[derive(Debug, Clone)]
+pub struct IcebergMetadataTableProvider {
+    pub(crate) table: Table,
+    pub(crate) r#type: MetadataTableType,
+}
+
+#[async_trait]
+impl TableProvider for IcebergMetadataTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> ArrowSchemaRef {
+        let metadata_table = self.table.inspect();
+        match self.r#type {
+            MetadataTableType::Snapshots => metadata_table.snapshots().schema().into(),
+            MetadataTableType::Manifests => metadata_table.manifests().schema().into(),
+        }
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        _projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(IcebergMetadataScan::new(self.clone())))
+    }
+}
+
+impl IcebergMetadataTableProvider {
+    pub async fn scan(self) -> Vec<DFResult<RecordBatch>> {
+        let metadata_table = self.table.inspect();
+        let stream = match self.r#type {
+            MetadataTableType::Snapshots => metadata_table.snapshots().scan().await,
+            MetadataTableType::Manifests => metadata_table.manifests().scan().await,
+        };
+        let stream = match stream {
+            Ok(stream) => stream,
+            Err(e) => return vec![Err(to_datafusion_error(e))],
+        };
+        // It seems hard to convert a
+        // `Pin<Box<dyn Stream<Item = Result<RecordBatch, iceberg::Error>>>>` into a
+        // `Pin<Box<dyn Stream<Item = Result<RecordBatch, DataFusionError>>>>`,
+        // so we just collect the record batches here.
+        // Metadata tables should not hold much data, so this should be fine.
+        let record_batches: Vec<DFResult<RecordBatch>> = stream
+            .map(|res| res.map_err(to_datafusion_error))
+            .collect::<Vec<_>>()
+            .await;
+        record_batches
+    }
+}
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 00c9e1322..aaa37d914 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod metadata_table;
 pub mod table_provider_factory;
 
 use std::any::Any;
@@ -28,8 +29,10 @@
 use datafusion::error::Result as DFResult;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
 use datafusion::physical_plan::ExecutionPlan;
 use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::inspect::MetadataTableType;
 use iceberg::table::Table;
 use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
+use metadata_table::IcebergMetadataTableProvider;
 
 use crate::physical_plan::scan::IcebergTableScan;
@@ -107,6 +110,13 @@ impl IcebergTableProvider {
             schema,
         })
     }
+
+    pub(crate) fn metadata_table(&self, r#type: MetadataTableType) -> IcebergMetadataTableProvider {
+        IcebergMetadataTableProvider {
+            table: self.table.clone(),
+            r#type,
+        }
+    }
 }
 
 #[async_trait]
diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
index 715635e06..9e1223134 100644
--- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs
+++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
@@ -25,8 +25,10 @@ use datafusion::arrow::array::{Array, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
 use datafusion::execution::context::SessionContext;
 use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+use expect_test::expect;
 use iceberg::io::FileIOBuilder;
 use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type};
+use iceberg::test_utils::check_record_batches;
 use iceberg::{Catalog, NamespaceIdent, Result, TableCreation};
 use iceberg_catalog_memory::MemoryCatalog;
 use iceberg_datafusion::IcebergCatalogProvider;
@@ -153,10 +155,16 @@ async fn test_provider_list_table_names() -> Result<()> {
     let provider = ctx.catalog("catalog").unwrap();
     let schema = provider.schema("test_provider_list_table_names").unwrap();
 
-    let expected = vec!["my_table"];
     let result = schema.table_names();
-    assert_eq!(result, expected);
+    expect![[r#"
+        [
+            "my_table",
+            "my_table$snapshots",
+            "my_table$manifests",
+        ]
+    "#]]
+    .assert_debug_eq(&result);
 
     Ok(())
 }
 
@@ -295,3 +303,129 @@ async fn test_table_predict_pushdown() -> Result<()> {
     assert!(s.value(1).trim().contains(expected));
     Ok(())
 }
+
+#[tokio::test]
+async fn test_metadata_table() -> Result<()> {
+    let iceberg_catalog = get_iceberg_catalog();
+    let namespace = NamespaceIdent::new("ns".to_string());
+    set_test_namespace(&iceberg_catalog, &namespace).await?;
+
+    let schema = Schema::builder()
+        .with_schema_id(0)
+        .with_fields(vec![
+            NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::optional(2, "bar", Type::Primitive(PrimitiveType::String)).into(),
+        ])
+        .build()?;
+    let creation = get_table_creation(temp_path(), "t1", Some(schema))?;
+    iceberg_catalog.create_table(&namespace, creation).await?;
+
+    let client = Arc::new(iceberg_catalog);
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("catalog", catalog);
+    let snapshots = ctx
+        .sql("select * from catalog.ns.t1$snapshots")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    check_record_batches(
+        snapshots,
+        expect![[r#"
+            Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
+        expect![[r#"
+            committed_at: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
+            [
+            ],
+            snapshot_id: PrimitiveArray<Int64>
+            [
+            ],
+            parent_id: PrimitiveArray<Int64>
+            [
+            ],
+            operation: StringArray
+            [
+            ],
+            manifest_list: StringArray
+            [
+            ],
+            summary: MapArray
+            [
+            ]"#]],
+        &[],
+        None,
+    );
+
+    let manifests = ctx
+        .sql("select * from catalog.ns.t1$manifests")
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    check_record_batches(
+        manifests,
+        expect![[r#"
+            Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+            Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
+        expect![[r#"
+            content: PrimitiveArray<Int8>
+            [
+            ],
+            path: StringArray
+            [
+            ],
+            length: PrimitiveArray<Int64>
+            [
+            ],
+            partition_spec_id: PrimitiveArray<Int32>
+            [
+            ],
+            added_snapshot_id: PrimitiveArray<Int64>
+            [
+            ],
+            added_data_files_count: PrimitiveArray<Int32>
+            [
+            ],
+            existing_data_files_count: PrimitiveArray<Int32>
+            [
+            ],
+            deleted_data_files_count: PrimitiveArray<Int32>
+            [
+            ],
+            added_delete_files_count: PrimitiveArray<Int32>
+            [
+            ],
+            existing_delete_files_count: PrimitiveArray<Int32>
+            [
+            ],
+            deleted_delete_files_count: PrimitiveArray<Int32>
+            [
+            ],
+            partition_summaries: ListArray
+            [
+            ]"#]],
+        &[],
+        None,
+    );
+
+    Ok(())
+}