diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml
index a047d7580..e0411b316 100644
--- a/crates/integration_tests/Cargo.toml
+++ b/crates/integration_tests/Cargo.toml
@@ -27,9 +27,11 @@ rust-version = { workspace = true }
 [dependencies]
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
+datafusion = "43"
 futures = { workspace = true }
 iceberg = { workspace = true }
 iceberg-catalog-rest = { workspace = true }
+iceberg-datafusion = { workspace = true }
 iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
 parquet = { workspace = true }
 tokio = { workspace = true }
diff --git a/crates/integration_tests/testdata/spark/provision.py b/crates/integration_tests/testdata/spark/provision.py
index 364e366b4..3b0742125 100755
--- a/crates/integration_tests/testdata/spark/provision.py
+++ b/crates/integration_tests/testdata/spark/provision.py
@@ -119,3 +119,25 @@
 spark.sql("INSERT INTO rest.default.test_promote_column VALUES (19)")
 spark.sql("ALTER TABLE rest.default.test_promote_column ALTER COLUMN foo TYPE bigint")
 spark.sql("INSERT INTO rest.default.test_promote_column VALUES (25)")
+
+# Create a table with various types
+spark.sql("""
+CREATE OR REPLACE TABLE rest.default.types_test USING ICEBERG AS
+SELECT
+    CAST(s % 2 = 1 AS BOOLEAN) AS cboolean,
+    CAST(s % 256 - 128 AS TINYINT) AS ctinyint,
+    CAST(s AS SMALLINT) AS csmallint,
+    CAST(s AS INT) AS cint,
+    CAST(s AS BIGINT) AS cbigint,
+    CAST(s AS FLOAT) AS cfloat,
+    CAST(s AS DOUBLE) AS cdouble,
+    CAST(s / 100.0 AS DECIMAL(8, 2)) AS cdecimal,
+    CAST(DATE('1970-01-01') + s AS DATE) AS cdate,
+    CAST(from_unixtime(s) AS TIMESTAMP_NTZ) AS ctimestamp_ntz,
+    CAST(from_unixtime(s) AS TIMESTAMP) AS ctimestamp,
+    CAST(s AS STRING) AS cstring,
+    CAST(s AS BINARY) AS cbinary
+FROM (
+    SELECT EXPLODE(SEQUENCE(0, 1000)) AS s
+);
+""")
diff --git a/crates/integration_tests/tests/datafusion.rs b/crates/integration_tests/tests/datafusion.rs
new file mode 100644
index 000000000..1586298ff
--- /dev/null
+++ b/crates/integration_tests/tests/datafusion.rs
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow_schema::TimeUnit;
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::assert_batches_eq;
+use datafusion::catalog::TableProvider;
+use datafusion::error::DataFusionError;
+use datafusion::prelude::SessionContext;
+use iceberg::{Catalog, TableIdent};
+use iceberg_datafusion::IcebergTableProvider;
+use iceberg_integration_tests::set_test_fixture;
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+#[tokio::test]
+async fn test_basic_queries() -> Result<(), DataFusionError> {
+    let fixture = set_test_fixture("datafusion_basic_read").await;
+
+    let catalog = fixture.rest_catalog;
+
+    let table = catalog
+        .load_table(&TableIdent::from_strs(["default", "types_test"]).unwrap())
+        .await
+        .unwrap();
+
+    let ctx = SessionContext::new();
+
+    let table_provider = Arc::new(
+        IcebergTableProvider::try_new_from_table(table)
+            .await
+            .unwrap(),
+    );
+
+    let schema = table_provider.schema();
+
+    assert_eq!(
+        schema.as_ref(),
+        &Schema::new(vec![
+            Field::new("cboolean", DataType::Boolean, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("ctinyint", DataType::Int32, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+            Field::new("csmallint", DataType::Int32, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "3".to_string(),
+            )])),
+            Field::new("cint", DataType::Int32, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "4".to_string(),
+            )])),
+            Field::new("cbigint", DataType::Int64, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "5".to_string(),
+            )])),
+            Field::new("cfloat", DataType::Float32, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "6".to_string(),
+            )])),
+            Field::new("cdouble", DataType::Float64, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "7".to_string(),
+            )])),
+            Field::new("cdecimal", DataType::Decimal128(8, 2), true).with_metadata(HashMap::from(
+                [(PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string(),)]
+            )),
+            Field::new("cdate", DataType::Date32, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "9".to_string(),
+            )])),
+            Field::new(
+                "ctimestamp_ntz",
+                DataType::Timestamp(TimeUnit::Microsecond, None),
+                true
+            )
+            .with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "10".to_string(),
+            )])),
+            Field::new(
+                "ctimestamp",
+                DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("+00:00"))),
+                true
+            )
+            .with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "11".to_string(),
+            )])),
+            Field::new("cstring", DataType::Utf8, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "12".to_string(),
+            )])),
+            Field::new("cbinary", DataType::LargeBinary, true).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "13".to_string(),
+            )])),
+        ])
+    );
+
+    ctx.register_table("types_table", table_provider)?;
+
+    let batches = ctx
+        .sql("SELECT * FROM types_table ORDER BY cbigint LIMIT 3")
+        .await?
+        .collect()
+        .await?;
+    let expected = [
+        "+----------+----------+-----------+------+---------+--------+---------+----------+------------+---------------------+----------------------+---------+----------+",
+        "| cboolean | ctinyint | csmallint | cint | cbigint | cfloat | cdouble | cdecimal | cdate      | ctimestamp_ntz      | ctimestamp           | cstring | cbinary  |",
+        "+----------+----------+-----------+------+---------+--------+---------+----------+------------+---------------------+----------------------+---------+----------+",
+        "| false    | -128     | 0         | 0    | 0       | 0.0    | 0.0     | 0.00     | 1970-01-01 | 1970-01-01T00:00:00 | 1970-01-01T00:00:00Z | 0       | 00000000 |",
+        "| true     | -127     | 1         | 1    | 1       | 1.0    | 1.0     | 0.01     | 1970-01-02 | 1970-01-01T00:00:01 | 1970-01-01T00:00:01Z | 1       | 00000001 |",
+        "| false    | -126     | 2         | 2    | 2       | 2.0    | 2.0     | 0.02     | 1970-01-03 | 1970-01-01T00:00:02 | 1970-01-01T00:00:02Z | 2       | 00000002 |",
+        "+----------+----------+-----------+------+---------+--------+---------+----------+------------+---------------------+----------------------+---------+----------+",
+    ];
+    assert_batches_eq!(expected, &batches);
+    Ok(())
+}
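
For anyone who wants to exercise the new IcebergTableProvider outside the docker-based REST fixture, the sketch below mirrors the wiring in the test above but loads the table catalog-free via iceberg::table::StaticTable. It is illustrative only: the metadata path and table name are placeholders, and error handling is collapsed to unwrap() in the same style as the test.

use std::sync::Arc;

use datafusion::error::DataFusionError;
use datafusion::prelude::SessionContext;
use iceberg::io::FileIOBuilder;
use iceberg::table::StaticTable;
use iceberg::TableIdent;
use iceberg_datafusion::IcebergTableProvider;

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    // Placeholder path: point this at a real Iceberg metadata JSON file.
    let metadata_path = "/tmp/warehouse/default/types_test/metadata/v1.metadata.json";

    // Catalog-free load: local FileIO plus a static metadata file, instead of
    // the REST catalog round trip used by the integration test.
    let file_io = FileIOBuilder::new_fs_io().build().unwrap();
    let static_table = StaticTable::from_metadata_file(
        metadata_path,
        TableIdent::from_strs(["default", "types_test"]).unwrap(),
        file_io,
    )
    .await
    .unwrap();

    // From here on this is the same wiring as the test: wrap the table in a
    // provider and register it with a DataFusion session.
    let provider = Arc::new(
        IcebergTableProvider::try_new_from_table(static_table.into_table())
            .await
            .unwrap(),
    );
    let ctx = SessionContext::new();
    ctx.register_table("types_test", provider)?;

    ctx.sql("SELECT cint, cstring FROM types_test LIMIT 5")
        .await?
        .show()
        .await?;
    Ok(())
}

Besides the iceberg, iceberg-datafusion, and datafusion dependencies added in the Cargo.toml hunk above, this standalone version also needs tokio with its macros and runtime features enabled.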