[FEATURE] daft-connect #3038

Closed
wants to merge 22 commits
495 changes: 411 additions & 84 deletions Cargo.lock

Large diffs are not rendered by default.

46 changes: 32 additions & 14 deletions Cargo.toml
@@ -8,10 +8,11 @@ common-system-info = {path = "src/common/system-info", default-features = false}
common-tracing = {path = "src/common/tracing", default-features = false}
common-version = {path = "src/common/version", default-features = false}
daft-compression = {path = "src/daft-compression", default-features = false}
daft-connect = {path = "src/daft-connect"}
daft-core = {path = "src/daft-core", default-features = false}
daft-csv = {path = "src/daft-csv", default-features = false}
daft-dsl = {path = "src/daft-dsl", default-features = false}
daft-functions = {path = "src/daft-functions", default-features = false}
daft-functions = {path = "src/daft-functions"}
daft-functions-json = {path = "src/daft-functions-json", default-features = false}
daft-image = {path = "src/daft-image", default-features = false}
daft-io = {path = "src/daft-io", default-features = false}
@@ -38,27 +39,28 @@ sysinfo = {workspace = true}
python = [
"dep:pyo3",
"dep:pyo3-log",
"common-daft-config/python",
"common-display/python",
"common-resource-request/python",
"common-system-info/python",
"daft-connect/python",
"daft-core/python",
"daft-csv/python",
"daft-dsl/python",
"daft-local-execution/python",
"daft-io/python",
"daft-functions-json/python",
"daft-functions/python",
"daft-image/python",
"daft-io/python",
"daft-json/python",
"daft-local-execution/python",
"daft-micropartition/python",
"daft-parquet/python",
"daft-plan/python",
"daft-scan/python",
"daft-scheduler/python",
"daft-stats/python",
"daft-sql/python",
"daft-table/python",
"daft-functions/python",
"daft-functions-json/python",
"common-daft-config/python",
"common-system-info/python",
"common-display/python",
"common-resource-request/python"
"daft-stats/python",
"daft-table/python"
]

[lib]
@@ -137,10 +139,13 @@ members = [
"src/daft-functions",
"src/daft-functions-json",
"src/daft-sql",
"src/hyperloglog"
"src/hyperloglog",
"src/spark-connect",
"src/daft-connect"
]

[workspace.dependencies]
anyhow = "1.0.89"
async-compat = "0.2.3"
async-compression = {version = "0.4.12", features = [
"tokio",
@@ -153,6 +158,18 @@ bytes = "1.6.0"
chrono = "0.4.38"
chrono-tz = "0.8.4"
comfy-table = "7.1.1"
common-daft-config = {path = "src/common/daft-config"}
common-display = {path = "src/common/display"}
common-error = {path = "src/common/error"}
daft-connect = {path = "src/daft-connect", default-features = false}
daft-core = {path = "src/daft-core"}
daft-dsl = {path = "src/daft-dsl"}
daft-local-execution = {path = "src/daft-local-execution"}
daft-micropartition = {path = "src/daft-micropartition"}
daft-physical-plan = {path = "src/daft-physical-plan"}
daft-plan = {path = "src/daft-plan"}
daft-schema = {path = "src/daft-schema"}
daft-table = {path = "src/daft-table"}
derivative = "2.2.0"
dyn-clone = "1"
futures = "0.3.30"
@@ -175,6 +192,7 @@ rstest = "0.18.2"
serde_json = "1.0.116"
sketches-ddsketch = {version = "0.2.2", features = ["use_serde"]}
snafu = {version = "0.7.4", features = ["futures"]}
spark-connect = {path = "src/spark-connect", default-features = false}
sqlparser = "0.51.0"
sysinfo = "0.30.12"
test-log = "0.2.16"
@@ -202,7 +220,7 @@ path = "src/arrow2"
version = "1.3.3"

[workspace.dependencies.derive_more]
features = ["display"]
features = ["display", "from", "constructor"]
version = "1.0.0"

[workspace.dependencies.lazy_static]
@@ -290,7 +308,7 @@ uninlined_format_args = "allow"
unnecessary_wraps = "allow"
unnested_or_patterns = "allow"
unreadable_literal = "allow"
# todo: remove?
# todo: remove this at some point
unsafe_derive_deserialize = "allow"
unused_async = "allow"
# used_underscore_items = "allow" # REMOVE
2 changes: 2 additions & 0 deletions daft/daft/__init__.pyi
@@ -17,6 +17,8 @@ if TYPE_CHECKING:
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.table import TableProperties as IcebergTableProperties

def connect_start() -> None: ...

class ImageMode(Enum):
"""
Supported image modes for Daft's image type.
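The connect_start() stub above is the Python entry point for the new Spark Connect server. A minimal usage sketch follows; the listen address is an assumption (the stub returns None rather than a port, so sc://localhost:15002 below is only the standard Spark Connect default), and a Spark Connect-capable pyspark installation is assumed on the client side.

# Hypothetical sketch -- the port and client wiring are assumptions, not part of this PR's API.
import daft.daft as native
from pyspark.sql import SparkSession

native.connect_start()  # start the Daft Spark Connect service in this process

# Attach a Spark Connect client to the (assumed) local address.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
spark.range(10).show()
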
2 changes: 1 addition & 1 deletion src/arrow2/src/array/utf8/mod.rs
@@ -67,7 +67,7 @@ impl<T: AsRef<str>> AsRef<[u8]> for StrAsBytes<T> {
/// * A slice of `values` taken from two consecutives `offsets` is valid `utf8`.
/// * `len` is equal to `validity.len()`, when defined.
#[derive(Clone)]
pub struct Utf8Array<O: Offset> {
pub struct Utf8Array<O: Offset = i64> {
data_type: DataType,
offsets: OffsetsBuffer<O>,
values: Buffer<u8>,
40 changes: 40 additions & 0 deletions src/daft-connect/Cargo.toml
@@ -0,0 +1,40 @@
[dependencies]
dashmap = "6.1.0"
# papaya = "0.1.3"
eyre = "0.6.12"
futures = "0.3.31"
pyo3 = {workspace = true, optional = true}
tokio = {version = "1.40.0", features = ["full"]}
tonic = "0.12.3"
tracing-subscriber = {version = "0.3.18", features = ["env-filter"]}
tracing-tree = "0.4.0"
uuid = {version = "1.10.0", features = ["v4"]}
arrow2.workspace = true
common-daft-config.workspace = true
common-error.workspace = true
daft-core.workspace = true
daft-dsl.workspace = true
daft-local-execution.workspace = true
daft-micropartition.workspace = true
daft-physical-plan.workspace = true
daft-plan.workspace = true
daft-schema.workspace = true
daft-table.workspace = true
spark-connect.workspace = true
tracing.workspace = true

[dev-dependencies]
tempfile = "3.4.0"

[features]
python = [
"dep:pyo3"
]

[lints]
workspace = true

[package]
edition = {workspace = true}
name = "daft-connect"
version = {workspace = true}
179 changes: 179 additions & 0 deletions src/daft-connect/src/command.rs
@@ -0,0 +1,179 @@
// Stream of Result<ExecutePlanResponse, Status>

use std::{collections::HashMap, sync::Arc, thread};

use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
use daft_local_execution::run::run_local;
use daft_micropartition::MicroPartition;
use daft_plan::LogicalPlanRef;
use daft_table::Table;
use eyre::Context;
use spark_connect::{
execute_plan_response::{ArrowBatch, ResponseType, ResultComplete},
spark_connect_service_server::SparkConnectService,
ExecutePlanResponse, Relation, WriteOperation,
};
use tonic::Status;
use uuid::Uuid;

use crate::{DaftSparkConnectService, Session};

type DaftStream = <DaftSparkConnectService as SparkConnectService>::ExecutePlanStream;

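// Sends encoded Arrow result batches back to the client by wrapping them in
// ExecutePlanResponse messages and pushing them onto an in-memory channel.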
struct ResultEncoder {
session_id: String,
server_side_session_id: String,
operation_id: String,
tx: tokio::sync::mpsc::UnboundedSender<ExecutePlanResponse>,
}

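// Abstraction over how one result batch (row count plus Arrow IPC bytes) is
// surfaced to the client.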
pub trait Encoder {
fn create_batch(&self, row_count: i64, data: Vec<u8>) -> eyre::Result<()>;
}

impl Encoder for ResultEncoder {
fn create_batch(&self, row_count: i64, data: Vec<u8>) -> eyre::Result<()> {
let response = ExecutePlanResponse {
session_id: self.session_id.clone(),
server_side_session_id: self.server_side_session_id.clone(),
operation_id: self.operation_id.clone(),
response_id: Uuid::new_v4().to_string(), // todo: implement this
metrics: None, // todo: implement this
observed_metrics: vec![],
schema: None,
response_type: Some(ResponseType::ArrowBatch(ArrowBatch {
row_count,
data,
start_offset: None,
})),
};

self.tx
.send(response)
.wrap_err("Error sending response to client")
}
}

impl Session {
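// Intended to convert the incoming Spark Relation into a Daft logical plan,
// execute it, and stream the encoded result back (see the commented-out sketch
// below); currently unimplemented.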
pub async fn handle_root_command(
&self,
command: Relation,
operation_id: String,
) -> Result<DaftStream, Status> {
// let data = thread::spawn(move || {
// // crate::convert::
//
// // let logical_plan = to_logical_plan(command).unwrap().build();
//
// let result = execute_plan(logical_plan);
// process_result(result)
// });
// let data = tokio::task::spawn_blocking(move || data.join().unwrap())
// .await
// .unwrap();
// let response = create_response(&self.id, &self.server_side_session_id, &operation_id, data);
// let result = create_stream(
// response,
// &self.id,
// &self.server_side_session_id,
// &operation_id,
// );
//
// Ok(result)
todo!()
}

pub fn write_operation(&self, operation: WriteOperation) -> Result<DaftStream, Status> {
println!("write_operation {:#?}", operation);
Err(Status::unimplemented(
"write_operation operation is not yet implemented",
))
}
}

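// Translates the logical plan into a local physical plan and runs it on the
// native executor, yielding the results as an iterator of MicroPartitions.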
pub fn execute_plan(
logical_plan: LogicalPlanRef,
) -> impl Iterator<Item = DaftResult<Arc<MicroPartition>>> {
let physical_plan = daft_physical_plan::translate(&logical_plan).unwrap();

let cfg = Arc::new(DaftExecutionConfig::default());
let psets = HashMap::new();
run_local(&physical_plan, psets, cfg, None).unwrap()
}

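// Serializes the result partitions into a single Arrow IPC stream buffer.
// Note: only the first Table of each MicroPartition is written.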
fn process_result(result: impl Iterator<Item = DaftResult<Arc<MicroPartition>>>) -> Vec<u8> {
let mut data = Vec::new();
let options = arrow2::io::ipc::write::WriteOptions { compression: None };
let mut writer = arrow2::io::ipc::write::StreamWriter::new(&mut data, options);

for elem in result {
let elem = elem.unwrap();
let tables = elem.get_tables().unwrap();
let tables = vec![tables.first().unwrap()];

for table in tables {
write_table_to_arrow(&mut writer, table);
}
}

data
}

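// Writes a single Table to the IPC stream writer (the stream header is
// written again for every table).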
fn write_table_to_arrow(
writer: &mut arrow2::io::ipc::write::StreamWriter<&mut Vec<u8>>,
table: &Table,
) {
let schema = table.schema.to_arrow().unwrap();
writer.start(&schema, None).unwrap();

let arrays = table.get_inner_arrow_arrays();
let chunk = arrow2::chunk::Chunk::new(arrays);
writer.write(&chunk, None).unwrap();
}

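// Wraps an encoded Arrow batch in an ExecutePlanResponse for the given session
// and operation (row_count is currently hard-coded to 10).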
fn create_response(
session_id: &str,
server_side_session_id: &str,
operation_id: &str,
data: Vec<u8>,
) -> ExecutePlanResponse {
let response_type = ResponseType::ArrowBatch(ArrowBatch {
row_count: 10i64,
data,
start_offset: None,
});

ExecutePlanResponse {
session_id: session_id.to_string(),
server_side_session_id: server_side_session_id.to_string(),
operation_id: operation_id.to_string(),
response_id: Uuid::new_v4().to_string(),
metrics: None,
observed_metrics: vec![],
schema: None,
response_type: Some(response_type),
}
}

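// Builds the two-message response stream a Spark Connect client expects:
// the Arrow batch followed by a ResultComplete marker.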
fn create_stream(
response: ExecutePlanResponse,
session_id: &str,
server_side_session_id: &str,
operation_id: &str,
) -> DaftStream {
let stream = futures::stream::iter(vec![
Ok(response),
Ok(ExecutePlanResponse {
session_id: session_id.to_string(),
server_side_session_id: server_side_session_id.to_string(),
operation_id: operation_id.to_string(),
response_id: Uuid::new_v4().to_string(),
metrics: None,
observed_metrics: vec![],
schema: None,
response_type: Some(ResponseType::ResultComplete(ResultComplete {})),
}),
]);
Box::pin(stream)
}