Skip to content

Commit

Permalink
Remove Sort expression (Expr::Sort)
Browse files Browse the repository at this point in the history
Remove sort as an expression, i.e. remove `Expr::Sort` from `Expr` enum.
Use `expr::Sort` directly when sorting.

The sort expression was used in context of ordering (sort, topk, create
table, file sorting).  Those places  require their sort expression to be
of type Sort anyway and no other expression was allowed, so this change
improves static typing.  Sort as an expression was illegal in other
contexts.
  • Loading branch information
findepi committed Aug 27, 2024
1 parent 840d843 commit 1f74d8e
Show file tree
Hide file tree
Showing 65 changed files with 708 additions and 876 deletions.
2 changes: 1 addition & 1 deletion datafusion-examples/examples/advanced_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ async fn main() -> Result<()> {
let window_expr = smooth_it
.call(vec![col("speed")]) // smooth_it(speed)
.partition_by(vec![col("car")]) // PARTITION BY car
.order_by(vec![col("time").sort(true, true).to_expr()]) // ORDER BY time ASC
.order_by(vec![col("time").sort(true, true)]) // ORDER BY time ASC
.window_frame(WindowFrame::new(None))
.build()?;
let df = ctx.table("cars").await?.window(vec![window_expr])?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn expr_fn_demo() -> Result<()> {
// such as `FIRST_VALUE(price FILTER quantity > 100 ORDER BY ts )
let agg = first_value
.call(vec![col("price")])
.order_by(vec![col("ts").sort(false, false).to_expr()])
.order_by(vec![col("ts").sort(false, false)])
.filter(col("quantity").gt(lit(100)))
.build()?; // build the aggregate
assert_eq!(
Expand Down
6 changes: 3 additions & 3 deletions datafusion-examples/examples/file_stream_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ mod non_windows {
use datafusion::datasource::TableProvider;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{exec_err, Result};
use datafusion_expr::Expr;
use datafusion_expr::SortExpr;

// Number of lines written to FIFO
const TEST_BATCH_SIZE: usize = 5;
Expand All @@ -49,7 +49,7 @@ mod non_windows {
fn fifo_table(
schema: SchemaRef,
path: impl Into<PathBuf>,
sort: Vec<Vec<Expr>>,
sort: Vec<Vec<SortExpr>>,
) -> Arc<dyn TableProvider> {
let source = FileStreamProvider::new_file(schema, path.into())
.with_batch_size(TEST_BATCH_SIZE)
Expand Down Expand Up @@ -157,7 +157,7 @@ mod non_windows {
]));

// Specify the ordering:
let order = vec![vec![datafusion_expr::col("a1").sort(true, false).to_expr()]];
let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];

let provider = fifo_table(schema.clone(), fifo_path, order.clone());
ctx.register_table("fifo", provider)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/parse_sql_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ async fn query_parquet_demo() -> Result<()> {
)?
// Directly parsing the SQL text into a sort expression is not supported yet, so
// construct it programmatically
.sort(vec![col("double_col").sort(false, false).to_expr()])?
.sort(vec![col("double_col").sort(false, false)])?
.limit(0, Some(1))?;

let result = df.collect().await?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/simple_udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ async fn main() -> Result<()> {
let window_expr = smooth_it
.call(vec![col("speed")]) // smooth_it(speed)
.partition_by(vec![col("car")]) // PARTITION BY car
.order_by(vec![col("time").sort(true, true).to_expr()]) // ORDER BY time ASC
.order_by(vec![col("time").sort(true, true)]) // ORDER BY time ASC
.window_frame(WindowFrame::new(None))
.build()?;
let df = ctx.table("cars").await?.window(vec![window_expr])?;
Expand Down
73 changes: 38 additions & 35 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions,
};
use datafusion_expr::{case, is_null, lit};
use datafusion_expr::{case, is_null, lit, SortExpr};
use datafusion_expr::{
utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
};
Expand All @@ -62,7 +62,6 @@ use datafusion_functions_aggregate::expr_fn::{

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_expr::expr::sort_vec_from_expr;

/// Contains options that control how data is
/// written out from a DataFrame
Expand Down Expand Up @@ -578,7 +577,7 @@ impl DataFrame {
self,
on_expr: Vec<Expr>,
select_expr: Vec<Expr>,
sort_expr: Option<Vec<Expr>>,
sort_expr: Option<Vec<SortExpr>>,
) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan)
.distinct_on(on_expr, select_expr, sort_expr)?
Expand Down Expand Up @@ -777,6 +776,15 @@ impl DataFrame {
})
}

/// Apply a sort by provided expressions with default direction
pub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame> {
self.sort(
expr.into_iter()
.map(|e| e.sort(true, false))
.collect::<Vec<SortExpr>>(),
)
}

/// Sort the DataFrame by the specified sorting expressions.
///
/// Note that any expression can be turned into
Expand All @@ -792,16 +800,14 @@ impl DataFrame {
/// let ctx = SessionContext::new();
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
/// let df = df.sort(vec![
/// col("a").sort(true, true).to_expr(), // a ASC, nulls first
/// col("b").sort(false, false).to_expr(), // b DESC, nulls last
/// col("a").sort(true, true), // a ASC, nulls first
/// col("b").sort(false, false), // b DESC, nulls last
/// ])?;
/// # Ok(())
/// # }
/// ```
pub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan)
.sort(sort_vec_from_expr(expr))?
.build()?;
pub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
Ok(DataFrame {
session_state: self.session_state,
plan,
Expand Down Expand Up @@ -1322,7 +1328,7 @@ impl DataFrame {
/// let ctx = SessionContext::new();
/// // Sort the data by column "b" and write it to a new location
/// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
/// .sort(vec![col("b").sort(true, true).to_expr()])? // sort by b asc, nulls first
/// .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
/// .write_csv(
/// "output.csv",
/// DataFrameWriteOptions::new(),
Expand Down Expand Up @@ -1382,7 +1388,7 @@ impl DataFrame {
/// let ctx = SessionContext::new();
/// // Sort the data by column "b" and write it to a new location
/// ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
/// .sort(vec![col("b").sort(true, true).to_expr()])? // sort by b asc, nulls first
/// .sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
/// .write_json(
/// "output.json",
/// DataFrameWriteOptions::new(),
Expand Down Expand Up @@ -2406,10 +2412,7 @@ mod tests {

Expr::WindowFunction(w)
.null_treatment(NullTreatment::IgnoreNulls)
.order_by(vec![
col("c2").sort(true, true).to_expr(),
col("c3").sort(true, true).to_expr(),
])
.order_by(vec![col("c2").sort(true, true), col("c3").sort(true, true)])
.window_frame(WindowFrame::new_bounds(
WindowFrameUnits::Rows,
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
Expand Down Expand Up @@ -2499,7 +2502,7 @@ mod tests {
.unwrap()
.distinct()
.unwrap()
.sort(vec![col("c1").sort(true, true).to_expr()])
.sort(vec![col("c1").sort(true, true)])
.unwrap();

let df_results = plan.clone().collect().await?;
Expand Down Expand Up @@ -2530,7 +2533,7 @@ mod tests {
.distinct()
.unwrap()
// try to sort on some value not present in input to distinct
.sort(vec![col("c2").sort(true, true).to_expr()])
.sort(vec![col("c2").sort(true, true)])
.unwrap_err();
assert_eq!(err.strip_backtrace(), "Error during planning: For SELECT DISTINCT, ORDER BY expressions c2 must appear in select list");

Expand Down Expand Up @@ -2577,10 +2580,10 @@ mod tests {
.distinct_on(
vec![col("c1")],
vec![col("c1")],
Some(vec![col("c1").sort(true, true).to_expr()]),
Some(vec![col("c1").sort(true, true)]),
)
.unwrap()
.sort(vec![col("c1").sort(true, true).to_expr()])
.sort(vec![col("c1").sort(true, true)])
.unwrap();

let df_results = plan.clone().collect().await?;
Expand Down Expand Up @@ -2611,11 +2614,11 @@ mod tests {
.distinct_on(
vec![col("c1")],
vec![col("c1")],
Some(vec![col("c1").sort(true, true).to_expr()]),
Some(vec![col("c1").sort(true, true)]),
)
.unwrap()
// try to sort on some value not present in input to distinct
.sort(vec![col("c2").sort(true, true).to_expr()])
.sort(vec![col("c2").sort(true, true)])
.unwrap_err();
assert_eq!(err.strip_backtrace(), "Error during planning: For SELECT DISTINCT, ORDER BY expressions c2 must appear in select list");

Expand Down Expand Up @@ -3021,7 +3024,7 @@ mod tests {
)?
.sort(vec![
// make the test deterministic
col("t1.c1").sort(true, true).to_expr(),
col("t1.c1").sort(true, true),
])?
.limit(0, Some(1))?;

Expand Down Expand Up @@ -3098,7 +3101,7 @@ mod tests {
)?
.sort(vec![
// make the test deterministic
col("t1.c1").sort(true, true).to_expr(),
col("t1.c1").sort(true, true),
])?
.limit(0, Some(1))?;

Expand Down Expand Up @@ -3131,9 +3134,9 @@ mod tests {
.filter(col("c2").eq(lit(3)).and(col("c1").eq(lit("a"))))?
.sort(vec![
// make the test deterministic
col("c1").sort(true, true).to_expr(),
col("c2").sort(true, true).to_expr(),
col("c3").sort(true, true).to_expr(),
col("c1").sort(true, true),
col("c2").sort(true, true),
col("c3").sort(true, true),
])?
.limit(0, Some(1))?
.with_column("sum", col("c2") + col("c3"))?;
Expand Down Expand Up @@ -3211,12 +3214,12 @@ mod tests {
)?
.sort(vec![
// make the test deterministic
col("t1.c1").sort(true, true).to_expr(),
col("t1.c2").sort(true, true).to_expr(),
col("t1.c3").sort(true, true).to_expr(),
col("t2.c1").sort(true, true).to_expr(),
col("t2.c2").sort(true, true).to_expr(),
col("t2.c3").sort(true, true).to_expr(),
col("t1.c1").sort(true, true),
col("t1.c2").sort(true, true),
col("t1.c3").sort(true, true),
col("t2.c1").sort(true, true),
col("t2.c2").sort(true, true),
col("t2.c3").sort(true, true),
])?
.limit(0, Some(1))?;

Expand Down Expand Up @@ -3289,9 +3292,9 @@ mod tests {
.limit(0, Some(1))?
.sort(vec![
// make the test deterministic
col("c1").sort(true, true).to_expr(),
col("c2").sort(true, true).to_expr(),
col("c3").sort(true, true).to_expr(),
col("c1").sort(true, true),
col("c2").sort(true, true),
col("c3").sort(true, true),
])?
.select_columns(&["c1"])?;

Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,10 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
}

// TODO other expressions are not handled yet:
// - AGGREGATE, WINDOW and SORT should not end up in filter conditions, except maybe in some edge cases
// - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
// - Can `Wildcard` be considered as a `Literal`?
// - ScalarVariable could be `applicable`, but that would require access to the context
Expr::AggregateFunction { .. }
| Expr::Sort { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
| Expr::Unnest { .. }
Expand Down
12 changes: 3 additions & 9 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ use datafusion_physical_expr::{

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_expr::expr::sort_vec_vec_to_expr;
use futures::{future, stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::ObjectStore;
Expand Down Expand Up @@ -714,10 +713,7 @@ impl ListingTable {

/// If file_sort_order is specified, creates the appropriate physical expressions
fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
create_ordering(
&self.table_schema,
&sort_vec_vec_to_expr(self.options.file_sort_order.clone()),
)
create_ordering(&self.table_schema, &self.options.file_sort_order)
}
}

Expand Down Expand Up @@ -1068,7 +1064,6 @@ mod tests {
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::ExecutionPlanProperties;

use datafusion_expr::expr::sort_vec_vec_from_expr;
use tempfile::TempDir;

#[tokio::test]
Expand Down Expand Up @@ -1159,6 +1154,7 @@ mod tests {

use crate::datasource::file_format::parquet::ParquetFormat;
use datafusion_physical_plan::expressions::col as physical_col;
use std::ops::Add;

// (file_sort_order, expected_result)
let cases = vec![
Expand Down Expand Up @@ -1207,9 +1203,7 @@ mod tests {
];

for (file_sort_order, expected_result) in cases {
let options = options.clone().with_file_sort_order(sort_vec_vec_from_expr(
sort_vec_vec_to_expr(file_sort_order),
));
let options = options.clone().with_file_sort_order(file_sort_order);

let config = ListingTableConfig::new(table_path.clone())
.with_listing_options(options)
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use datafusion_expr::CreateExternalTable;

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_expr::expr::sort_vec_vec_from_expr;

/// A `TableProviderFactory` capable of creating new `ListingTable`s
#[derive(Debug, Default)]
Expand Down Expand Up @@ -115,7 +114,7 @@ impl TableProviderFactory for ListingTableFactory {
.with_file_extension(file_extension)
.with_target_partitions(state.config().target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_file_sort_order(sort_vec_vec_from_expr(cmd.order_exprs.clone()));
.with_file_sort_order(cmd.order_exprs.clone());

options
.validate_partitions(session_state, &table_path)
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_expr::SortExpr;
use futures::StreamExt;
use log::debug;
use parking_lot::Mutex;
Expand All @@ -64,7 +65,7 @@ pub struct MemTable {
column_defaults: HashMap<String, Expr>,
/// Optional pre-known sort order(s). Must be `SortExpr`s.
/// inserting data into this table removes the order
pub sort_order: Arc<Mutex<Vec<Vec<Expr>>>>,
pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
}

impl MemTable {
Expand Down Expand Up @@ -118,7 +119,7 @@ impl MemTable {
///
/// Note that multiple sort orders are supported, if some are known to be
/// equivalent,
pub fn with_sort_order(self, mut sort_order: Vec<Vec<Expr>>) -> Self {
pub fn with_sort_order(self, mut sort_order: Vec<Vec<SortExpr>>) -> Self {
std::mem::swap(self.sort_order.lock().as_mut(), &mut sort_order);
self
}
Expand Down
Loading

0 comments on commit 1f74d8e

Please sign in to comment.