Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connect): add parquet support #3360

Merged
merged 2 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ chrono-tz = "0.10.0"
comfy-table = "7.1.1"
common-daft-config = {path = "src/common/daft-config"}
common-error = {path = "src/common/error", default-features = false}
common-file-formats = {path = "src/common/file-formats"}
common-runtime = {path = "src/common/runtime", default-features = false}
daft-core = {path = "src/daft-core"}
daft-dsl = {path = "src/daft-dsl"}
Expand Down
1 change: 1 addition & 0 deletions src/daft-connect/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ arrow2 = {workspace = true, features = ["io_json_integration"]}
async-stream = "0.3.6"
color-eyre = "0.6.3"
common-daft-config = {workspace = true}
common-file-formats = {workspace = true}
daft-core = {workspace = true}
daft-dsl = {workspace = true}
daft-local-execution = {workspace = true}
Expand Down
15 changes: 6 additions & 9 deletions src/daft-connect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,10 @@ impl DaftSparkConnectService {
#[tonic::async_trait]
impl SparkConnectService for DaftSparkConnectService {
type ExecutePlanStream = std::pin::Pin<
Box<
dyn futures::Stream<Item = Result<ExecutePlanResponse, Status>> + Send + Sync + 'static,
>,
Box<dyn futures::Stream<Item = Result<ExecutePlanResponse, Status>> + Send + 'static>,
>;
type ReattachExecuteStream = std::pin::Pin<
Box<
dyn futures::Stream<Item = Result<ExecutePlanResponse, Status>> + Send + Sync + 'static,
>,
Box<dyn futures::Stream<Item = Result<ExecutePlanResponse, Status>> + Send + 'static>,
>;

#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -190,8 +186,9 @@ impl SparkConnectService for DaftSparkConnectService {
CommandType::RegisterFunction(_) => {
unimplemented_err!("RegisterFunction not implemented")
}
CommandType::WriteOperation(_) => {
unimplemented_err!("WriteOperation not implemented")
CommandType::WriteOperation(op) => {
let result = session.handle_write_command(op, operation).await?;
return Ok(Response::new(result));
}
CommandType::CreateDataframeView(_) => {
unimplemented_err!("CreateDataframeView not implemented")
Expand Down Expand Up @@ -305,7 +302,7 @@ impl SparkConnectService for DaftSparkConnectService {
return Err(Status::invalid_argument("op_type is required to be root"));
};

let result = match translation::relation_to_schema(relation) {
let result = match translation::relation_to_schema(relation).await {
Ok(schema) => schema,
Err(e) => {
return invalid_argument_err!(
Expand Down
1 change: 1 addition & 0 deletions src/daft-connect/src/op/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use uuid::Uuid;
use crate::{DaftSparkConnectService, Session};

mod root;
mod write;

pub type ExecuteStream = <DaftSparkConnectService as SparkConnectService>::ExecutePlanStream;

Expand Down
10 changes: 8 additions & 2 deletions src/daft-connect/src/op/execute/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ impl Session {
let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(1);
tokio::spawn(async move {
let execution_fut = async {
let Plan { builder, psets } = translation::to_logical_plan(command)?;
let optimized_plan = builder.optimize()?;
let Plan { builder, psets } = translation::to_logical_plan(command).await?;

// todo: convert optimize to async (looks like A LOT of work)... it touches a lot of API
// I tried and spent about an hour and gave up ~ Andrew Gazelka 🪦 2024-12-09
let optimized_plan = tokio::task::spawn_blocking(move || builder.optimize())
.await
.unwrap()?;

let cfg = Arc::new(DaftExecutionConfig::default());
let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
let mut result_stream = native_executor.run(psets, cfg, None)?.into_stream();
Expand Down
145 changes: 145 additions & 0 deletions src/daft-connect/src/op/execute/write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use std::future::ready;

use common_daft_config::DaftExecutionConfig;
use common_file_formats::FileFormat;
use daft_local_execution::NativeExecutor;
use eyre::{bail, WrapErr};
use spark_connect::{
write_operation::{SaveMode, SaveType},
WriteOperation,
};
use tonic::Status;
use tracing::warn;

use crate::{
op::execute::{ExecuteStream, PlanIds},
session::Session,
translation,
};

impl Session {
pub async fn handle_write_command(
&self,
operation: WriteOperation,
operation_id: String,
) -> Result<ExecuteStream, Status> {
use futures::StreamExt;

let context = PlanIds {
session: self.client_side_session_id().to_string(),
server_side_session: self.server_side_session_id().to_string(),
operation: operation_id,
};

let finished = context.finished();

let result = async move {
let WriteOperation {
input,
source,
mode,
sort_column_names,
partitioning_columns,
bucket_by,
options,
clustering_columns,
save_type,
} = operation;

let Some(input) = input else {
bail!("Input is required");

Check warning on line 50 in src/daft-connect/src/op/execute/write.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/op/execute/write.rs#L50

Added line #L50 was not covered by tests
};

let Some(source) = source else {
bail!("Source is required");

Check warning on line 54 in src/daft-connect/src/op/execute/write.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/op/execute/write.rs#L54

Added line #L54 was not covered by tests
};

if source != "parquet" {
bail!("Unsupported source: {source}; only parquet is supported");

Check warning on line 58 in src/daft-connect/src/op/execute/write.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/op/execute/write.rs#L58

Added line #L58 was not covered by tests
}

let Ok(mode) = SaveMode::try_from(mode) else {
bail!("Invalid save mode: {mode}");

Check warning on line 62 in src/daft-connect/src/op/execute/write.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/op/execute/write.rs#L62

Added line #L62 was not covered by tests
};

if !sort_column_names.is_empty() {
// todo(completeness): implement sort
warn!("Ignoring sort_column_names: {sort_column_names:?} (not yet implemented)");

Check warning on line 67 in src/daft-connect/src/op/execute/write.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/op/execute/write.rs#L67

Added line #L67 was not covered by tests
}

if !partitioning_columns.is_empty() {
// todo(completeness): implement partitioning
warn!(
"Ignoring partitioning_columns: {partitioning_columns:?} (not yet implemented)"

Check warning on line 73 in src/daft-connect/src/op/execute/write.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/op/execute/write.rs#L72-L73

Added lines #L72 - L73 were not covered by tests
);
}

if let Some(bucket_by) = bucket_by {
// todo(completeness): implement bucketing
warn!("Ignoring bucket_by: {bucket_by:?} (not yet implemented)");

Check warning on line 79 in src/daft-connect/src/op/execute/write.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/op/execute/write.rs#L79

Added line #L79 was not covered by tests
}

if !options.is_empty() {
// todo(completeness): implement options
warn!("Ignoring options: {options:?} (not yet implemented)");

Check warning on line 84 in src/daft-connect/src/op/execute/write.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/op/execute/write.rs#L84

Added line #L84 was not covered by tests
}

if !clustering_columns.is_empty() {
// todo(completeness): implement clustering
warn!("Ignoring clustering_columns: {clustering_columns:?} (not yet implemented)");

Check warning on line 89 in src/daft-connect/src/op/execute/write.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/op/execute/write.rs#L89

Added line #L89 was not covered by tests
}

match mode {
SaveMode::Unspecified => {}
SaveMode::Append => {}
SaveMode::Overwrite => {}
SaveMode::ErrorIfExists => {}
SaveMode::Ignore => {}

Check warning on line 97 in src/daft-connect/src/op/execute/write.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/op/execute/write.rs#L94-L97

Added lines #L94 - L97 were not covered by tests
}

let Some(save_type) = save_type else {
bail!("Save type is required");

Check warning on line 101 in src/daft-connect/src/op/execute/write.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/op/execute/write.rs#L101

Added line #L101 was not covered by tests
};

let path = match save_type {
SaveType::Path(path) => path,
SaveType::Table(table) => {
let name = table.table_name;
bail!("Tried to write to table {name} but it is not yet implemented. Try to write to a path instead.");

Check warning on line 108 in src/daft-connect/src/op/execute/write.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/op/execute/write.rs#L106-L108

Added lines #L106 - L108 were not covered by tests
}
};

let mut plan = translation::to_logical_plan(input).await?;

plan.builder = plan
.builder
.table_write(&path, FileFormat::Parquet, None, None, None)
.wrap_err("Failed to create table write plan")?;

let optimized_plan = plan.builder.optimize()?;
let cfg = DaftExecutionConfig::default();
let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
let mut result_stream = native_executor
.run(plan.psets, cfg.into(), None)?
.into_stream();

// this is so we make sure the operation is actually done
// before we return
//
// an example where this is important is if we write to a parquet file
// and then read immediately after, we need to wait for the write to finish
while let Some(_result) = result_stream.next().await {}
andrewgazelka marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
};

use futures::TryFutureExt;

let result = result.map_err(|e| Status::internal(format!("Error in Daft server: {e:?}")));

let future = result.and_then(|()| ready(Ok(finished)));
let stream = futures::stream::once(future);

Ok(Box::pin(stream))
}
}
36 changes: 23 additions & 13 deletions src/daft-connect/src/translation/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@

use crate::translation::logical_plan::{
aggregate::aggregate, local_relation::local_relation, project::project, range::range,
to_df::to_df, with_columns::with_columns,
read::read, to_df::to_df, with_columns::with_columns,
};

mod aggregate;
mod local_relation;
mod project;
mod range;
mod read;
mod to_df;
mod with_columns;

Expand All @@ -39,7 +40,7 @@
}
}

pub fn to_logical_plan(relation: Relation) -> eyre::Result<Plan> {
pub async fn to_logical_plan(relation: Relation) -> eyre::Result<Plan> {
if let Some(common) = relation.common {
if common.origin.is_some() {
warn!("Ignoring common metadata for relation: {common:?}; not yet implemented");
Expand All @@ -51,31 +52,40 @@
};

match rel_type {
RelType::Limit(l) => limit(*l).wrap_err("Failed to apply limit to logical plan"),
RelType::Limit(l) => limit(*l)
.await

Check warning on line 56 in src/daft-connect/src/translation/logical_plan.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/translation/logical_plan.rs#L56

Added line #L56 was not covered by tests
.wrap_err("Failed to apply limit to logical plan"),
RelType::Range(r) => range(r).wrap_err("Failed to apply range to logical plan"),
RelType::Project(p) => project(*p).wrap_err("Failed to apply project to logical plan"),
RelType::Aggregate(a) => {
aggregate(*a).wrap_err("Failed to apply aggregate to logical plan")
}
RelType::WithColumns(w) => {
with_columns(*w).wrap_err("Failed to apply with_columns to logical plan")
}
RelType::ToDf(t) => to_df(*t).wrap_err("Failed to apply to_df to logical plan"),
RelType::Project(p) => project(*p)
.await

Check warning on line 60 in src/daft-connect/src/translation/logical_plan.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/translation/logical_plan.rs#L60

Added line #L60 was not covered by tests
.wrap_err("Failed to apply project to logical plan"),
RelType::Aggregate(a) => aggregate(*a)
.await

Check warning on line 63 in src/daft-connect/src/translation/logical_plan.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/translation/logical_plan.rs#L63

Added line #L63 was not covered by tests
.wrap_err("Failed to apply aggregate to logical plan"),
RelType::WithColumns(w) => with_columns(*w)
.await

Check warning on line 66 in src/daft-connect/src/translation/logical_plan.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/translation/logical_plan.rs#L66

Added line #L66 was not covered by tests
.wrap_err("Failed to apply with_columns to logical plan"),
RelType::ToDf(t) => to_df(*t)
.await

Check warning on line 69 in src/daft-connect/src/translation/logical_plan.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/translation/logical_plan.rs#L69

Added line #L69 was not covered by tests
.wrap_err("Failed to apply to_df to logical plan"),
RelType::LocalRelation(l) => {
local_relation(l).wrap_err("Failed to apply local_relation to logical plan")
}
RelType::Read(r) => read(r)
.await
.wrap_err("Failed to apply read to logical plan"),
plan => bail!("Unsupported relation type: {plan:?}"),
}
}

fn limit(limit: Limit) -> eyre::Result<Plan> {
async fn limit(limit: Limit) -> eyre::Result<Plan> {
let Limit { input, limit } = limit;

let Some(input) = input else {
bail!("input must be set");
};

let mut plan = to_logical_plan(*input)?;
let mut plan = Box::pin(to_logical_plan(*input)).await?;
plan.builder = plan.builder.limit(i64::from(limit), false)?; // todo: eager or no

Ok(plan)
Expand Down
4 changes: 2 additions & 2 deletions src/daft-connect/src/translation/logical_plan/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use spark_connect::aggregate::GroupType;

use crate::translation::{logical_plan::Plan, to_daft_expr, to_logical_plan};

pub fn aggregate(aggregate: spark_connect::Aggregate) -> eyre::Result<Plan> {
pub async fn aggregate(aggregate: spark_connect::Aggregate) -> eyre::Result<Plan> {
let spark_connect::Aggregate {
input,
group_type,
Expand All @@ -17,7 +17,7 @@ pub fn aggregate(aggregate: spark_connect::Aggregate) -> eyre::Result<Plan> {
bail!("input is required");
};

let mut plan = to_logical_plan(*input)?;
let mut plan = Box::pin(to_logical_plan(*input)).await?;

let group_type = GroupType::try_from(group_type)
.wrap_err_with(|| format!("Invalid group type: {group_type:?}"))?;
Expand Down
4 changes: 2 additions & 2 deletions src/daft-connect/src/translation/logical_plan/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use spark_connect::Project;

use crate::translation::{logical_plan::Plan, to_daft_expr, to_logical_plan};

pub fn project(project: Project) -> eyre::Result<Plan> {
pub async fn project(project: Project) -> eyre::Result<Plan> {
let Project { input, expressions } = project;

let Some(input) = input else {
bail!("Project input is required");
};

let mut plan = to_logical_plan(*input)?;
let mut plan = Box::pin(to_logical_plan(*input)).await?;

let daft_exprs: Vec<_> = expressions.iter().map(to_daft_expr).try_collect()?;
plan.builder = plan.builder.select(daft_exprs)?;
Expand Down
32 changes: 32 additions & 0 deletions src/daft-connect/src/translation/logical_plan/read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use eyre::{bail, WrapErr};
use spark_connect::read::ReadType;
use tracing::warn;

use crate::translation::Plan;

mod data_source;

pub async fn read(read: spark_connect::Read) -> eyre::Result<Plan> {
let spark_connect::Read {
is_streaming,
read_type,
} = read;

warn!("Ignoring is_streaming: {is_streaming}");

let Some(read_type) = read_type else {
bail!("Read type is required");

Check warning on line 18 in src/daft-connect/src/translation/logical_plan/read.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/translation/logical_plan/read.rs#L18

Added line #L18 was not covered by tests
};

let builder = match read_type {
ReadType::NamedTable(table) => {
let name = table.unparsed_identifier;
bail!("Tried to read from table {name} but it is not yet implemented. Try to read from a path instead.");

Check warning on line 24 in src/daft-connect/src/translation/logical_plan/read.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/translation/logical_plan/read.rs#L22-L24

Added lines #L22 - L24 were not covered by tests
}
ReadType::DataSource(source) => data_source::data_source(source)
.await
.wrap_err("Failed to create data source"),
}?;

Check warning on line 29 in src/daft-connect/src/translation/logical_plan/read.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-connect/src/translation/logical_plan/read.rs#L29

Added line #L29 was not covered by tests

Ok(Plan::from(builder))
}
Loading
Loading