Skip to content

Commit

Permalink
[FEAT] connect: add parquet support
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewgazelka committed Dec 10, 2024
1 parent c8f8490 commit 0c6b7c8
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 35 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ chrono-tz = "0.10.0"
comfy-table = "7.1.1"
common-daft-config = {path = "src/common/daft-config"}
common-error = {path = "src/common/error", default-features = false}
common-file-formats = {path = "src/common/file-formats"}
common-runtime = {path = "src/common/runtime", default-features = false}
daft-core = {path = "src/daft-core"}
daft-dsl = {path = "src/daft-dsl"}
Expand Down
2 changes: 2 additions & 0 deletions src/daft-connect/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ arrow2 = {workspace = true, features = ["io_json_integration"]}
async-stream = "0.3.6"
color-eyre = "0.6.3"
common-daft-config = {workspace = true}
common-file-formats = {workspace = true}
common-runtime = {workspace = true}
daft-core = {workspace = true}
daft-dsl = {workspace = true}
daft-local-execution = {workspace = true}
Expand Down
15 changes: 6 additions & 9 deletions src/daft-connect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,10 @@ impl DaftSparkConnectService {
#[tonic::async_trait]
impl SparkConnectService for DaftSparkConnectService {
type ExecutePlanStream = std::pin::Pin<
Box<
dyn futures::Stream<Item = Result<ExecutePlanResponse, Status>> + Send + Sync + 'static,
>,
Box<dyn futures::Stream<Item = Result<ExecutePlanResponse, Status>> + Send + 'static>,
>;
type ReattachExecuteStream = std::pin::Pin<
Box<
dyn futures::Stream<Item = Result<ExecutePlanResponse, Status>> + Send + Sync + 'static,
>,
Box<dyn futures::Stream<Item = Result<ExecutePlanResponse, Status>> + Send + 'static>,
>;

#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -190,8 +186,9 @@ impl SparkConnectService for DaftSparkConnectService {
CommandType::RegisterFunction(_) => {
unimplemented_err!("RegisterFunction not implemented")
}
CommandType::WriteOperation(_) => {
unimplemented_err!("WriteOperation not implemented")
CommandType::WriteOperation(op) => {
let result = session.handle_write_command(op, operation).await?;
return Ok(Response::new(result));
}
CommandType::CreateDataframeView(_) => {
unimplemented_err!("CreateDataframeView not implemented")
Expand Down Expand Up @@ -305,7 +302,7 @@ impl SparkConnectService for DaftSparkConnectService {
return Err(Status::invalid_argument("op_type is required to be root"));
};

let result = match translation::relation_to_schema(relation) {
let result = match translation::relation_to_schema(relation).await {
Ok(schema) => schema,
Err(e) => {
return invalid_argument_err!(
Expand Down
1 change: 1 addition & 0 deletions src/daft-connect/src/op/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use uuid::Uuid;
use crate::{DaftSparkConnectService, Session};

mod root;
mod write;

pub type ExecuteStream = <DaftSparkConnectService as SparkConnectService>::ExecutePlanStream;

Expand Down
10 changes: 8 additions & 2 deletions src/daft-connect/src/op/execute/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ impl Session {
let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(1);
tokio::spawn(async move {
let execution_fut = async {
let Plan { builder, psets } = translation::to_logical_plan(command)?;
let optimized_plan = builder.optimize()?;
let Plan { builder, psets } = translation::to_logical_plan(command).await?;

// todo: convert optimize to async (looks like A LOT of work)... it touches a lot of API
// I tried and spent about an hour and gave up ~ Andrew Gazelka 🪦 2024-12-09
let optimized_plan = tokio::task::spawn_blocking(move || builder.optimize())
.await
.unwrap()?;

let cfg = Arc::new(DaftExecutionConfig::default());
let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
let mut result_stream = native_executor.run(psets, cfg, None)?.into_stream();
Expand Down
145 changes: 145 additions & 0 deletions src/daft-connect/src/op/execute/write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use std::future::ready;

use common_daft_config::DaftExecutionConfig;
use common_file_formats::FileFormat;
use daft_local_execution::NativeExecutor;
use eyre::{bail, WrapErr};
use spark_connect::{
write_operation::{SaveMode, SaveType},
WriteOperation,
};
use tonic::Status;
use tracing::warn;

use crate::{
op::execute::{ExecuteStream, PlanIds},
session::Session,
translation,
};

impl Session {
pub async fn handle_write_command(
&self,
operation: WriteOperation,
operation_id: String,
) -> Result<ExecuteStream, Status> {
use futures::StreamExt;

let context = PlanIds {
session: self.client_side_session_id().to_string(),
server_side_session: self.server_side_session_id().to_string(),
operation: operation_id,
};

let finished = context.finished();

let result = async move {
let WriteOperation {
input,
source,
mode,
sort_column_names,
partitioning_columns,
bucket_by,
options,
clustering_columns,
save_type,
} = operation;

let Some(input) = input else {
bail!("Input is required");
};

let Some(source) = source else {
bail!("Source is required");
};

if source != "parquet" {
bail!("Unsupported source: {source}; only parquet is supported");
}

let Ok(mode) = SaveMode::try_from(mode) else {
bail!("Invalid save mode: {mode}");
};

if !sort_column_names.is_empty() {
// todo(completeness): implement sort
warn!("Ignoring sort_column_names: {sort_column_names:?} (not yet implemented)");
}

if !partitioning_columns.is_empty() {
// todo(completeness): implement partitioning
warn!(
"Ignoring partitioning_columns: {partitioning_columns:?} (not yet implemented)"
);
}

if let Some(bucket_by) = bucket_by {
// todo(completeness): implement bucketing
warn!("Ignoring bucket_by: {bucket_by:?} (not yet implemented)");
}

if !options.is_empty() {
// todo(completeness): implement options
warn!("Ignoring options: {options:?} (not yet implemented)");
}

if !clustering_columns.is_empty() {
// todo(completeness): implement clustering
warn!("Ignoring clustering_columns: {clustering_columns:?} (not yet implemented)");
}

match mode {
SaveMode::Unspecified => {}
SaveMode::Append => {}
SaveMode::Overwrite => {}
SaveMode::ErrorIfExists => {}
SaveMode::Ignore => {}
}

let Some(save_type) = save_type else {
bail!("Save type is required");
};

let path = match save_type {
SaveType::Path(path) => path,
SaveType::Table(table) => {
let name = table.table_name;
bail!("Tried to write to table {name} but it is not yet implemented. Try to write to a path instead.");
}
};

let mut plan = translation::to_logical_plan(input).await?;

plan.builder = plan
.builder
.table_write(&path, FileFormat::Parquet, None, None, None)
.wrap_err("Failed to create table write plan")?;

let optimized_plan = plan.builder.optimize()?;
let cfg = DaftExecutionConfig::default();
let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
let mut result_stream = native_executor
.run(plan.psets, cfg.into(), None)?
.into_stream();

// this is so we make sure the operation is actually done
// before we return
//
// an example where this is important is if we write to a parquet file
// and then read immediately after, we need to wait for the write to finish
while let Some(_result) = result_stream.next().await {}

Ok(())
};

use futures::TryFutureExt;

let result = result.map_err(|e| Status::internal(format!("Error in Daft server: {e:?}")));

let future = result.and_then(|()| ready(Ok(finished)));
let stream = futures::stream::once(future);

Ok(Box::pin(stream))
}
}
36 changes: 23 additions & 13 deletions src/daft-connect/src/translation/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use tracing::warn;

use crate::translation::logical_plan::{
aggregate::aggregate, local_relation::local_relation, project::project, range::range,
to_df::to_df, with_columns::with_columns,
read::read, to_df::to_df, with_columns::with_columns,
};

mod aggregate;
mod local_relation;
mod project;
mod range;
mod read;
mod to_df;
mod with_columns;

Expand All @@ -39,7 +40,7 @@ impl From<LogicalPlanBuilder> for Plan {
}
}

pub fn to_logical_plan(relation: Relation) -> eyre::Result<Plan> {
pub async fn to_logical_plan(relation: Relation) -> eyre::Result<Plan> {
if let Some(common) = relation.common {
if common.origin.is_some() {
warn!("Ignoring common metadata for relation: {common:?}; not yet implemented");
Expand All @@ -51,31 +52,40 @@ pub fn to_logical_plan(relation: Relation) -> eyre::Result<Plan> {
};

match rel_type {
RelType::Limit(l) => limit(*l).wrap_err("Failed to apply limit to logical plan"),
RelType::Limit(l) => limit(*l)
.await
.wrap_err("Failed to apply limit to logical plan"),
RelType::Range(r) => range(r).wrap_err("Failed to apply range to logical plan"),
RelType::Project(p) => project(*p).wrap_err("Failed to apply project to logical plan"),
RelType::Aggregate(a) => {
aggregate(*a).wrap_err("Failed to apply aggregate to logical plan")
}
RelType::WithColumns(w) => {
with_columns(*w).wrap_err("Failed to apply with_columns to logical plan")
}
RelType::ToDf(t) => to_df(*t).wrap_err("Failed to apply to_df to logical plan"),
RelType::Project(p) => project(*p)
.await
.wrap_err("Failed to apply project to logical plan"),
RelType::Aggregate(a) => aggregate(*a)
.await
.wrap_err("Failed to apply aggregate to logical plan"),
RelType::WithColumns(w) => with_columns(*w)
.await
.wrap_err("Failed to apply with_columns to logical plan"),
RelType::ToDf(t) => to_df(*t)
.await
.wrap_err("Failed to apply to_df to logical plan"),
RelType::LocalRelation(l) => {
local_relation(l).wrap_err("Failed to apply local_relation to logical plan")
}
RelType::Read(r) => read(r)
.await
.wrap_err("Failed to apply read to logical plan"),
plan => bail!("Unsupported relation type: {plan:?}"),
}
}

fn limit(limit: Limit) -> eyre::Result<Plan> {
async fn limit(limit: Limit) -> eyre::Result<Plan> {
let Limit { input, limit } = limit;

let Some(input) = input else {
bail!("input must be set");
};

let mut plan = to_logical_plan(*input)?;
let mut plan = Box::pin(to_logical_plan(*input)).await?;
plan.builder = plan.builder.limit(i64::from(limit), false)?; // todo: eager or no

Ok(plan)
Expand Down
4 changes: 2 additions & 2 deletions src/daft-connect/src/translation/logical_plan/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use spark_connect::aggregate::GroupType;

use crate::translation::{logical_plan::Plan, to_daft_expr, to_logical_plan};

pub fn aggregate(aggregate: spark_connect::Aggregate) -> eyre::Result<Plan> {
pub async fn aggregate(aggregate: spark_connect::Aggregate) -> eyre::Result<Plan> {
let spark_connect::Aggregate {
input,
group_type,
Expand All @@ -17,7 +17,7 @@ pub fn aggregate(aggregate: spark_connect::Aggregate) -> eyre::Result<Plan> {
bail!("input is required");
};

let mut plan = to_logical_plan(*input)?;
let mut plan = Box::pin(to_logical_plan(*input)).await?;

let group_type = GroupType::try_from(group_type)
.wrap_err_with(|| format!("Invalid group type: {group_type:?}"))?;
Expand Down
4 changes: 2 additions & 2 deletions src/daft-connect/src/translation/logical_plan/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use spark_connect::Project;

use crate::translation::{logical_plan::Plan, to_daft_expr, to_logical_plan};

pub fn project(project: Project) -> eyre::Result<Plan> {
pub async fn project(project: Project) -> eyre::Result<Plan> {
let Project { input, expressions } = project;

let Some(input) = input else {
bail!("Project input is required");
};

let mut plan = to_logical_plan(*input)?;
let mut plan = Box::pin(to_logical_plan(*input)).await?;

let daft_exprs: Vec<_> = expressions.iter().map(to_daft_expr).try_collect()?;
plan.builder = plan.builder.select(daft_exprs)?;
Expand Down
32 changes: 32 additions & 0 deletions src/daft-connect/src/translation/logical_plan/read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use eyre::{bail, WrapErr};
use spark_connect::read::ReadType;
use tracing::warn;

use crate::translation::Plan;

mod data_source;

pub async fn read(read: spark_connect::Read) -> eyre::Result<Plan> {
let spark_connect::Read {
is_streaming,
read_type,
} = read;

warn!("Ignoring is_streaming: {is_streaming}");

let Some(read_type) = read_type else {
bail!("Read type is required");
};

let builder = match read_type {
ReadType::NamedTable(table) => {
let name = table.unparsed_identifier;
bail!("Tried to read from table {name} but it is not yet implemented. Try to read from a path instead.");
}
ReadType::DataSource(source) => data_source::data_source(source)
.await
.wrap_err("Failed to create data source"),
}?;

Ok(Plan::from(builder))
}
Loading

0 comments on commit 0c6b7c8

Please sign in to comment.