From e40c47f9ae5c70405bc495af34ed28beadd5a86c Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 29 Nov 2023 09:55:39 +0100 Subject: [PATCH] Replace unnecessary custom logic with DF provided functions --- src/context/logical.rs | 4 ++-- src/context/physical.rs | 39 ++++++++------------------------------- 2 files changed, 10 insertions(+), 33 deletions(-) diff --git a/src/context/logical.rs b/src/context/logical.rs index 54351cf8..7934415c 100644 --- a/src/context/logical.rs +++ b/src/context/logical.rs @@ -177,11 +177,11 @@ impl SeafowlContext { )) }; - if self.try_get_delta_table(old_table_name.to_owned()).await.is_err() { + if self.inner.table_provider(old_table_name.to_owned()).await.is_err() { return Err(Error::Plan( format!("Source table {old_table_name:?} doesn't exist") )) - } else if self.try_get_delta_table(new_table_name.to_owned()).await.is_ok() { + } else if self.inner.table_provider(new_table_name.to_owned()).await.is_ok() { return Err(Error::Plan( format!("Target table {new_table_name:?} already exists") )) diff --git a/src/context/physical.rs b/src/context/physical.rs index 13d08d7b..a9b82eb9 100644 --- a/src/context/physical.rs +++ b/src/context/physical.rs @@ -29,15 +29,13 @@ use datafusion::physical_optimizer::pruning::PruningPredicate; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::projection::ProjectionExec; +use datafusion::physical_plan::{collect, execute_stream}; use datafusion::{ arrow::{datatypes::SchemaRef, record_batch::RecordBatch}, datasource::file_format::{parquet::ParquetFormat, FileFormat}, error::DataFusionError, execution::context::TaskContext, - physical_plan::{ - coalesce_partitions::CoalescePartitionsExec, EmptyRecordBatchStream, - ExecutionPlan, SendableRecordBatchStream, - }, + physical_plan::{ExecutionPlan, SendableRecordBatchStream}, sql::TableReference, }; use datafusion_expr::logical_plan::{ @@ -50,7 +48,6 @@ use deltalake::operations::transaction::commit; use deltalake::operations::vacuum::VacuumBuilder; use deltalake::protocol::{DeltaOperation, SaveMode}; use deltalake::DeltaTable; -use futures::TryStreamExt; use log::info; use std::borrow::Cow; use std::ops::Deref; @@ -777,34 +774,16 @@ impl SeafowlContext { &self, physical_plan: Arc, ) -> Result> { - let stream = self.execute_stream(physical_plan).await?; - stream.err_into().try_collect().await + let task_context = Arc::new(TaskContext::from(self.inner())); + collect(physical_plan, task_context).await } pub async fn execute_stream( &self, physical_plan: Arc, - ) -> Result { - match physical_plan.output_partitioning().partition_count() { - 0 => Ok(Box::pin(EmptyRecordBatchStream::new( - physical_plan.schema(), - ))), - 1 => self.execute_stream_partitioned(&physical_plan, 0).await, - _ => { - let plan: Arc = - Arc::new(CoalescePartitionsExec::new(physical_plan)); - self.execute_stream_partitioned(&plan, 0).await - } - } - } - - async fn execute_stream_partitioned( - &self, - physical_plan: &Arc, - partition: usize, ) -> Result { let task_context = Arc::new(TaskContext::from(self.inner())); - physical_plan.execute(partition, task_context) + execute_stream(physical_plan, task_context) } /// Append data from the provided file, creating a new schema/table if absent @@ -833,11 +812,9 @@ impl SeafowlContext { { Some(_) => { // Schema exists, check if existing table's schema matches the new one - match self.try_get_delta_table(&table_name).await { - Ok(mut table) => { - // Update table state to pick up the most recent schema - table.update().await?; - table_schema = Some(TableProvider::schema(&table)); + match self.inner.table_provider(&table_name).await { + Ok(table) => { + table_schema = Some(table.schema()); true } Err(_) => false,