Add generate_series() udtf (and introduce 'lazy' MemoryExec) #13540
Conversation
) -> Result<Self> {
    let cache = PlanProperties::new(
        EquivalenceProperties::new(Arc::clone(&schema)),
        Partitioning::RoundRobinBatch(generators.len()),
Does this field mean: say an exec has only 1 generator, the optimizer will try to insert a RepartitionExec at the output of StreamingMemoryExec, and use round robin to repartition output batches to the target_partitions number?
I found it works on some aggregate queries, but not for a sort query:
> explain select * from generate_series(1, 10000) as t1(v1) order by v1;
+---------------+----------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: t1.v1 ASC NULLS LAST |
| | SubqueryAlias: t1 |
| | Projection: tmp_table.value AS v1 |
| | TableScan: tmp_table projection=[value] |
| physical_plan | SortExec: expr=[v1@0 ASC NULLS LAST], preserve_partitioning=[false] |
| | ProjectionExec: expr=[value@0 as v1] |
| | StreamingMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=10000, batch_size=8192] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------+
A parallelized sort should look like:
> explain select * from lineitem order by l_orderkey;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: lineitem.l_orderkey ASC NULLS LAST |
| | TableScan: lineitem projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment] |
| physical_plan | SortPreservingMergeExec: [l_orderkey@0 ASC NULLS LAST] |
| | SortExec: expr=[l_orderkey@0 ASC NULLS LAST], preserve_partitioning=[true] |
| | ParquetExec: file_groups={14 groups: [[Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:0..11525426], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:11525426..20311205, Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:0..2739647], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:2739647..14265073], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:14265073..20193593, Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-2.parquet:0..5596906], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-2.parquet:5596906..17122332], ...]}, projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
I want to know whether this is correct here, and whether this sort query could in theory be parallelized by changing something in the optimization phase 🤔
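For intuition only, the round-robin repartitioning discussed in this question can be sketched std-only (illustrative code, not the optimizer's actual implementation): batches from one input partition are dealt across target_partitions outputs in turn.

```rust
/// Std-only intuition for RoundRobinBatch-style repartitioning: deal the
/// batches of a single input partition across `target_partitions` outputs.
fn round_robin<T>(batches: Vec<T>, target_partitions: usize) -> Vec<Vec<T>> {
    let mut out: Vec<Vec<T>> = (0..target_partitions).map(|_| Vec::new()).collect();
    for (i, batch) in batches.into_iter().enumerate() {
        // Batch i goes to output partition i mod target_partitions.
        out[i % target_partitions].push(batch);
    }
    out
}

fn main() {
    // Five batches from one partition spread over three output partitions.
    let parts = round_robin(vec!["b0", "b1", "b2", "b3", "b4"], 3);
    assert_eq!(parts, vec![vec!["b0", "b3"], vec!["b1", "b4"], vec!["b2"]]);
    println!("{parts:?}");
}
```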
Maybe you can look at https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.repartitioned or similar methods 🤔
I'm not sure about this either
/// Stream that generates record batches on demand
pub struct StreamingMemoryStream {
    schema: SchemaRef,
    generator: Box<dyn StreamingBatchGenerator>,
Should we use Arc, given it makes sense to have the generator generate batches across threads?
Good point, Arc is more flexible: the implementation can choose to let a StreamingBatchGenerator be shared between multiple streams, or create a separate generator for each stream
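A std-only sketch of the tradeoff being discussed (the names here are illustrative, not the PR's actual StreamingBatchGenerator type): wrapping shared mutable generator state in Arc plus a lock lets several streams pull from one cursor, so each value is handed out exactly once overall.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Illustrative counter standing in for a batch generator's internal state.
struct Generator {
    next: i64,
}

impl Generator {
    fn next_value(&mut self) -> i64 {
        let v = self.next;
        self.next += 1;
        v
    }
}

fn main() {
    // One generator shared by two "streams": Arc + Mutex lets both threads
    // advance the same cursor safely.
    let shared = Arc::new(Mutex::new(Generator { next: 0 }));
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let g = Arc::clone(&shared);
            thread::spawn(move || {
                let mut out = Vec::new();
                for _ in 0..3 {
                    out.push(g.lock().unwrap().next_value());
                }
                out
            })
        })
        .collect();

    let mut all: Vec<i64> = handles
        .into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect();
    all.sort();
    // Six distinct values were produced with no duplicates,
    // regardless of thread interleaving.
    assert_eq!(all, vec![0, 1, 2, 3, 4, 5]);
}
```

With Box instead of Arc, each stream would instead own an independent generator and produce its own full sequence.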
/// Schema representing the data
schema: SchemaRef,
/// Functions to generate batches for each partition
batch_generators: Vec<Box<dyn StreamingBatchGenerator>>,
- batch_generators: Vec<Box<dyn StreamingBatchGenerator>>,
+ batch_generators: Vec<Arc<dyn StreamingBatchGenerator>>,
@@ -297,3 +299,40 @@ pub trait TableProviderFactory: Debug + Sync + Send {
        cmd: &CreateExternalTable,
    ) -> Result<Arc<dyn TableProvider>>;
}

/// A trait for table function implementations
pub trait TableFunctionImpl: Debug + Sync + Send {
  pub trait TableFunctionImpl: Debug + Sync + Send {
+     /// Returns this function's name
+     fn name(&self) -> &str;
This is much more consistent with other UDFs
This is part of the refactor; changing this requires several fixes. We can leave it to a separate PR to keep this PR smaller
I also recommend taking name() inside TableFunctionImpl
I think doing it as a follow-on PR makes sense to me -- perhaps we can file a ticket.

Another thing that would be nice for a follow-on PR is to move this trait into its own module (catalog/src/table_function.rs perhaps)
statement error DataFusion error: Error during planning: Second argument must be an integer literal
SELECT * FROM generate_series(1, NULL)

statement error DataFusion error: Error during planning: generate_series expects 2 arguments
D SELECT * FROM generate_series(1, 5, null);
┌─────────────────┐
│ generate_series │
│ int64 │
├─────────────────┤
│ 0 rows │
└─────────────────┘
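The DuckDB behavior shown above (a NULL argument yields 0 rows rather than a planning error) can be modeled with a hypothetical std-only sketch, using Option<i64> to stand in for possibly-NULL integer literals; the function name and the zero-step rule here are illustrative choices, not DataFusion's implementation:

```rust
/// Hypothetical sketch: a NULL start/end/step argument yields an empty
/// series, mirroring the DuckDB output above. Treating a zero step as
/// empty is a simplification made for this sketch.
fn series(start: Option<i64>, end: Option<i64>, step: Option<i64>) -> Vec<i64> {
    let (start, end, step) = match (start, end, step) {
        (Some(s), Some(e), Some(st)) if st != 0 => (s, e, st),
        _ => return Vec::new(), // NULL argument (or zero step): 0 rows
    };
    let mut out = Vec::new();
    let mut v = start;
    // Inclusive bounds, walking up or down depending on the step's sign.
    while (step > 0 && v <= end) || (step < 0 && v >= end) {
        out.push(v);
        v += step;
    }
    out
}

fn main() {
    // Like the DuckDB example: generate_series(1, 5, NULL) -> 0 rows.
    assert_eq!(series(Some(1), Some(5), None), Vec::<i64>::new());
    assert_eq!(series(Some(1), Some(5), Some(2)), vec![1, 3, 5]);
}
```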
I'll open an issue for 3-arg support later
Thank you for the comments @jayzhan211, I have updated. Now I think the concurrent generator might not be very straightforward, so I'll first leave some rationale here. Rationale for

Given a quick look, I'm not sure whether we need the 3rd case. It seems the 1st case runs execution in parallel too. I would need to think about the advantage of the 3rd case over the 1st case. For the 3rd case, if we need it, we might use
Thank you @2010YOUY01. I reviewed the changes and LGTM. I have a few minor comments and one question: I noticed another approach to generate_series(), which can be used like this:

SELECT generate_series(1, 5)

I assume it is not a udtf in that context. Does this implementation leave room for that usage?
pub fn default_table_functions() -> Vec<Arc<TableFunction>> {
    functions_table::all_default_table_functions()
}
Can we get the defaults from a singleton? Like other default getters, with get_or_init or something similar
This is a good idea, updated
#[derive(Debug, Clone)]
struct GenerateSeriesState {
    schema: SchemaRef,
    _start: i64,
Is there any reason to keep this?
This field is for display; I added a comment to explain
// Check input `exprs` type and number. Input validity check (e.g. start <= end)
// will be performed in `TableProvider::scan`
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
    // TODO: support 3 arguments following DuckDB:
There is also a one-argument usage; perhaps it can be added as a TODO
@@ -365,8 +366,165 @@ impl RecordBatchStream for MemoryStream {
    }
}

pub trait StreamingBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
I am not very satisfied with these Streaming... names. Can we describe the behavior better with a different naming for these? Perhaps we could use the 'lazy' term
I agree Lazy* is less ambiguous, updated all prefixes
Thank you for the review @berkaysynnada. The answer to the question is yes, they are different functions with the same name. @jayzhan211 I thought about the sharing-generator issue again; I think making the interface more flexible is important, so I kept the lock, and updated
Thank you @2010YOUY01 @jayzhan211 and @berkaysynnada

I left some structural comments / thoughts about how we might be able to make LazyMemoryExec more general (StreamExec or StreamWrapperExec), but in general I think this is a really nice first step and we could do additional refactorings as follow-on PRs.
@@ -30,6 +30,7 @@ members = [
    "datafusion/functions",
    "datafusion/functions-aggregate",
    "datafusion/functions-aggregate-common",
    "datafusion/functions-table",
100%
#[derive(Debug, Clone)]
struct GenerateSeriesState {
    schema: SchemaRef,
    _start: i64, // Kept for display
Does it need to be prefixed with _ if it is used for display? 🤔
// Check input `exprs` type and number. Input validity check (e.g. start <= end)
// will be performed in `TableProvider::scan`
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
    // TODO: support 1 or 3 arguments following DuckDB:
Can we possibly file a ticket to track this?
schema: SchemaRef,
/// Functions to generate batches for each partition
batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
/// Total number of rows to generate for statistics
this comment doesn't seem quite right
@@ -365,8 +366,165 @@ impl RecordBatchStream for MemoryStream {
    }
}

pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
This looks almost identical to https://docs.rs/datafusion/latest/datafusion/execution/type.SendableRecordBatchStream.html

I wonder if we really need a new pub trait? Using the existing one allows using all the combinators in futures::stream.

That way this exec could be something generic that simply wrapped an input stream, which I think would be useful for several other reasons, for example implementing custom TableProviders as well as implementing a read_stream like:

/// Read the contents of anything that makes RecordBatchStreams
let df = df.read_stream(record_batch_stream)
    .aggregate(col("foo"))
    .collect()?
Which issue does this PR close?
Closes #10069
Rationale for this change

To make it easier to test memory-limited queries, I was looking for a data source which can generate long streams and doesn't itself consume lots of memory. There are several existing approaches that are close, but can't fully satisfy this requirement:

- MemoryExec has to buffer all output data up front
- select * from unnest(generate_series(1,10000)) also has to buffer all output during construction

So in this PR, a new StreamingMemoryExec is introduced; it can generate batches lazily by defining an 'iterator' over RecordBatches in its interface. A new user-defined table function is implemented with this new execution plan. This function's behavior is kept consistent with DuckDB's generate_series() table function, and only the 2-argument variant is implemented for now.

I also think this UDTF can be useful for tests/micro-benchmarks in general
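For intuition, the lazy-generation idea can be sketched with a std-only stand-in (the names are illustrative, not this PR's actual API, which produces Arrow RecordBatches): the generator holds just a cursor, so memory use is bounded by batch_size rather than by the total row count.

```rust
/// Illustrative stand-in for a lazy batch generator: only the cursor is
/// kept in memory, and each batch is materialized on demand.
struct SeriesGenerator {
    next: i64,
    end: i64, // inclusive, matching DuckDB's generate_series
    batch_size: usize,
}

impl SeriesGenerator {
    fn new(start: i64, end: i64, batch_size: usize) -> Self {
        Self { next: start, end, batch_size }
    }

    /// Produce the next batch lazily, or None when the series is exhausted.
    fn generate_next_batch(&mut self) -> Option<Vec<i64>> {
        if self.next > self.end {
            return None;
        }
        let batch_end = self.end.min(self.next + self.batch_size as i64 - 1);
        let batch: Vec<i64> = (self.next..=batch_end).collect();
        self.next = batch_end + 1;
        Some(batch)
    }
}

fn main() {
    // 10_000 rows with batch_size 8192 yields two batches (8192 + 1808
    // rows), but only one batch is ever materialized at a time.
    let mut g = SeriesGenerator::new(1, 10_000, 8192);
    let first = g.generate_next_batch().unwrap();
    let second = g.generate_next_batch().unwrap();
    assert_eq!(first.len(), 8192);
    assert_eq!(second.len(), 1808);
    assert!(g.generate_next_batch().is_none());
}
```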
What changes are included in this PR?

- Move TableFunctionImpl from the datafusion crate to the datafusion-catalog crate, so new UDTFs defined in the datafusion-functions-table crate won't have a circular dependency on the datafusion crate (TableFunctionImpl depends on TableProvider in the datafusion-catalog crate, so I think that's the best place to move it to)
- New StreamingMemoryExec
- New generate_series() UDTF
Are these changes tested?

Unit tests for StreamingMemoryExec, and sqllogictests for the generate_series() UDTF

Are there any user-facing changes?

No