Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generate_series() udtf (and introduce 'lazy' MemoryExec) #13540

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ members = [
"datafusion-examples/examples/ffi/ffi_module_loader",
"test-utils",
"benchmarks",
"datafusion/functions-table",
]
resolver = "2"

Expand Down Expand Up @@ -108,6 +109,7 @@ datafusion-functions = { path = "datafusion/functions", version = "43.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "43.0.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "43.0.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "43.0.0" }
datafusion-functions-table = { path = "datafusion/functions-table", version = "43.0.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version = "43.0.0" }
datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "43.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "43.0.0", default-features = false }
Expand Down
24 changes: 24 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ datafusion = { path = "../datafusion/core", version = "43.0.0", features = [
"unicode_expressions",
"compression",
] }
datafusion-catalog = { path = "../datafusion/catalog", version = "43.0.0" }
dirs = "5.0.1"
env_logger = "0.11"
futures = "0.3"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ use async_trait::async_trait;

use datafusion::catalog::Session;
use datafusion::common::{plan_err, Column};
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
use datafusion_catalog::TableFunctionImpl;
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
Expand Down
1 change: 1 addition & 0 deletions datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ async-trait = { workspace = true }
bytes = { workspace = true }
dashmap = { workspace = true }
datafusion = { workspace = true, default-features = true, features = ["avro"] }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-functions-window-common = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::Session;
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_catalog::TableFunctionImpl;
use datafusion_common::{plan_err, ScalarValue};
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{Expr, TableType};
Expand Down
41 changes: 40 additions & 1 deletion datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Constraints, Statistics};
use datafusion_expr::Expr;

use datafusion_expr::dml::InsertOp;
use datafusion_expr::{
CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, TableType,
CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
};
use datafusion_physical_plan::ExecutionPlan;

Expand Down Expand Up @@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send {
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>>;
}

/// A trait for table function implementations
pub trait TableFunctionImpl: Debug + Sync + Send {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub trait TableFunctionImpl: Debug + Sync + Send {
/// Returns this function's name
fn name(&self) -> &str;

This is much more consistent with other UDFs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the part of the refactor, changing this requires several fixes. We can leave it to a separate PR to make this PR smaller

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also recommend to take name() inside TableFunctionImpl

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think doing it as a follow on PR makes sense to me -- perhaps we can file a ticket

Another thing that would be nice for a follow on PR is to move this trait into its own module (catalog/src/table_function.rs perhaps)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #13613

/// Create a table provider
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
}

/// A table that uses a function to generate data
#[derive(Debug)]
pub struct TableFunction {
/// Name of the table function
name: String,
/// Function implementation
fun: Arc<dyn TableFunctionImpl>,
}

impl TableFunction {
/// Create a new table function
pub fn new(name: String, fun: Arc<dyn TableFunctionImpl>) -> Self {
Self { name, fun }
}

/// Get the name of the table function
pub fn name(&self) -> &str {
&self.name
}

/// Get the implementation of the table function
pub fn function(&self) -> &Arc<dyn TableFunctionImpl> {
&self.fun
}

/// Get the function implementation and generate a table
pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
self.fun.call(args)
}
}
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-nested = { workspace = true, optional = true }
datafusion-functions-table = { workspace = true }
datafusion-functions-window = { workspace = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
Expand Down
63 changes: 0 additions & 63 deletions datafusion/core/src/datasource/function.rs

This file was deleted.

1 change: 0 additions & 1 deletion datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub mod default_table_source;
pub mod dynamic_file;
pub mod empty;
pub mod file_format;
pub mod function;
pub mod listing;
pub mod listing_table_factory;
pub mod memory;
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ use crate::{
catalog_common::memory::MemorySchemaProvider,
catalog_common::MemoryCatalogProvider,
dataframe::DataFrame,
datasource::{
function::{TableFunction, TableFunctionImpl},
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
},
datasource::{provider_as_source, MemTable, ViewTable},
error::{DataFusionError, Result},
Expand Down Expand Up @@ -74,7 +73,9 @@ use crate::datasource::dynamic_file::DynamicListTableFactory;
use crate::execution::session_state::SessionStateBuilder;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_catalog::{DynamicFileCatalog, SessionStore, UrlTableFactory};
use datafusion_catalog::{
DynamicFileCatalog, SessionStore, TableFunction, TableFunctionImpl, UrlTableFactory,
};
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
pub use datafusion_expr::execution_props::ExecutionProps;
Expand Down
17 changes: 15 additions & 2 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::catalog_common::information_schema::{
use crate::catalog_common::MemoryCatalogProviderList;
use crate::datasource::cte_worktable::CteWorkTable;
use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
use crate::datasource::function::{TableFunction, TableFunctionImpl};
use crate::datasource::provider_as_source;
use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
use crate::execution::SessionStateDefaults;
Expand All @@ -33,7 +32,7 @@ use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_catalog::Session;
use datafusion_catalog::{Session, TableFunction, TableFunctionImpl};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
Expand Down Expand Up @@ -1074,6 +1073,7 @@ impl SessionStateBuilder {
.with_scalar_functions(SessionStateDefaults::default_scalar_functions())
.with_aggregate_functions(SessionStateDefaults::default_aggregate_functions())
.with_window_functions(SessionStateDefaults::default_window_functions())
.with_table_function_list(SessionStateDefaults::default_table_functions())
}

/// Set the session id.
Expand Down Expand Up @@ -1188,6 +1188,19 @@ impl SessionStateBuilder {
self
}

/// Set the list of [`TableFunction`]s
pub fn with_table_function_list(
mut self,
table_functions: Vec<Arc<TableFunction>>,
) -> Self {
let functions = table_functions
.into_iter()
.map(|f| (f.name().to_string(), f))
.collect();
self.table_functions = Some(functions);
self
}

/// Set the map of [`ScalarUDF`]s
pub fn with_scalar_functions(
mut self,
Expand Down
8 changes: 7 additions & 1 deletion datafusion/core/src/execution/session_state_defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use crate::datasource::provider::DefaultTableFactory;
use crate::execution::context::SessionState;
#[cfg(feature = "nested_expressions")]
use crate::functions_nested;
use crate::{functions, functions_aggregate, functions_window};
use crate::{functions, functions_aggregate, functions_table, functions_window};
use datafusion_catalog::TableFunction;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
Expand Down Expand Up @@ -119,6 +120,11 @@ impl SessionStateDefaults {
functions_window::all_default_window_functions()
}

/// returns the list of default [`TableFunction`]s
pub fn default_table_functions() -> Vec<Arc<TableFunction>> {
functions_table::all_default_table_functions()
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get the defaults from a singleton ? Like other default getters, with get_or_init or smth similar

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good idea, updated

/// returns the list of default [`FileFormatFactory']'s
pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,11 @@ pub mod functions_window {
pub use datafusion_functions_window::*;
}

/// re-export of [`datafusion_functions_table`] crate
pub mod functions_table {
pub use datafusion_functions_table::*;
}

/// re-export of variable provider for `@name` and `@@name` style runtime values.
pub mod variable {
pub use datafusion_expr::var_provider::{VarProvider, VarType};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ use arrow::csv::ReaderBuilder;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_catalog::Session;
use datafusion_catalog::TableFunctionImpl;
use datafusion_common::{assert_batches_eq, DFSchema, ScalarValue};
use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, Projection, TableType};
use std::fs::File;
Expand Down
Loading
Loading