Skip to content

Commit

Permalink
[FEAT] (WIP) connect: createDataFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewgazelka committed Dec 4, 2024
1 parent 6d30e30 commit f636d74
Show file tree
Hide file tree
Showing 21 changed files with 688 additions and 43 deletions.
65 changes: 57 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ members = [

[workspace.dependencies]
approx = "0.5.1"
arrow-format = {version = "0.8", features = ["ipc"]}
async-compat = "0.2.3"
async-compression = {version = "0.4.12", features = [
"tokio",
Expand All @@ -200,6 +201,7 @@ daft-dsl = {path = "src/daft-dsl"}
daft-hash = {path = "src/daft-hash"}
daft-local-execution = {path = "src/daft-local-execution"}
daft-logical-plan = {path = "src/daft-logical-plan"}
daft-micropartition = {path = "src/daft-micropartition"}
daft-scan = {path = "src/daft-scan"}
daft-schema = {path = "src/daft-schema"}
daft-table = {path = "src/daft-table"}
Expand Down
28 changes: 28 additions & 0 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,34 @@
ManyColumnsInputType = Union[ColumnInputType, Iterable[ColumnInputType]]


def to_logical_plan_builder(*parts: MicroPartition) -> LogicalPlanBuilder:
    """Creates a LogicalPlanBuilder backed by an in-memory scan over MicroPartitions.

    Args:
        parts: One or more MicroPartitions to convert into an in-memory logical plan.

    Returns:
        LogicalPlanBuilder: Builder wrapping an in-memory scan over the provided partitions.

    Raises:
        ValueError: If no partitions are provided.
    """
    if not parts:
        raise ValueError("Can't create a DataFrame from an empty list of tables.")

    # Collect each partition into a local partition set, keyed by positional index.
    result_pset = LocalPartitionSet()
    for i, part in enumerate(parts):
        result_pset.set_partition_from_table(i, part)

    # Register the partition set with the active runner's cache so the logical
    # plan can reference it via a cache entry rather than holding data directly.
    context = get_context()
    cache_entry = context.get_or_create_runner().put_partition_set_into_cache(result_pset)
    size_bytes = result_pset.size_bytes()
    num_rows = len(result_pset)

    assert size_bytes is not None, "In-memory data should always have non-None size in bytes"
    # NOTE(review): schema is taken from the first partition only — assumes all
    # parts share the same schema; confirm callers guarantee this.
    return LogicalPlanBuilder.from_in_memory_scan(
        cache_entry, parts[0].schema(), result_pset.num_partitions(), size_bytes, num_rows=num_rows
    )


class DataFrame:
"""A Daft DataFrame is a table of data. It has columns, where each column has a type and the same
number of items (rows) as all other columns.
Expand Down
2 changes: 1 addition & 1 deletion src/arrow2/src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub fn read_record_batch<R: Read + Seek>(
file_size: u64,
scratch: &mut Vec<u8>,
) -> Result<Chunk<Box<dyn Array>>> {
assert_eq!(fields.len(), ipc_schema.fields.len());
assert_eq!(fields.len(), ipc_schema.fields.len(), "IPC schema fields and Arrow schema fields must be the same length");
let buffers = batch
.buffers()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
Expand Down
10 changes: 8 additions & 2 deletions src/daft-connect/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
[dependencies]
# arrow-format supplies the Arrow IPC flatbuffer definitions; arrow2's
# io_json_integration feature is enabled for JSON<->Arrow conversion support.
arrow-format = {workspace = true}
arrow2 = {workspace = true, features = ["io_json_integration"]}
async-stream = "0.3.6"
color-eyre = "0.6.3"
common-daft-config = {workspace = true}
daft-core = {workspace = true}
daft-dsl = {workspace = true}
daft-local-execution = {workspace = true}
daft-logical-plan = {workspace = true}
daft-micropartition = {workspace = true}
daft-scan = {workspace = true}
daft-schema = {workspace = true}
daft-table = {workspace = true}
dashmap = "6.1.0"
derive_more = {workspace = true}
eyre = "0.6.12"
futures = "0.3.31"
itertools = {workspace = true}
pyo3 = {workspace = true, optional = true}
serde_json = {workspace = true}
spark-connect = {workspace = true}
tokio = {version = "1.40.0", features = ["full"]}
tonic = "0.12.3"
tracing = {workspace = true}
uuid = {version = "1.10.0", features = ["v4"]}

[features]
# Enabling python turns on the python feature of every workspace crate this
# crate depends on, including the newly added daft-micropartition.
python = ["dep:pyo3", "common-daft-config/python", "daft-local-execution/python", "daft-logical-plan/python", "daft-scan/python", "daft-table/python", "daft-dsl/python", "daft-schema/python", "daft-core/python", "daft-micropartition/python"]

[lints]
workspace = true
Expand Down
14 changes: 12 additions & 2 deletions src/daft-connect/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,19 @@ impl SparkConnectService for DaftSparkConnectService {
#[tracing::instrument(skip_all)]
/// Handles the Spark Connect `ReleaseExecute` RPC.
///
/// Acknowledges the client's request to release resources for a finished
/// execution. Currently this only validates the session and echoes the
/// session identifiers back; no server-side state is actually freed here.
async fn release_execute(
    &self,
    request: Request<ReleaseExecuteRequest>,
) -> Result<Response<ReleaseExecuteResponse>, Status> {
    let request = request.into_inner();

    // Validate the session id; an unknown session maps to a gRPC error Status.
    let session = self.get_session(&request.session_id)?;

    let response = ReleaseExecuteResponse {
        session_id: session.client_side_session_id().to_string(),
        server_side_session_id: session.server_side_session_id().to_string(),
        // TODO: echo the request's operation_id; optional in the protocol,
        // so None is acceptable for now.
        operation_id: None,
    };

    Ok(Response::new(response))
}

#[tracing::instrument(skip_all)]
Expand Down
11 changes: 5 additions & 6 deletions src/daft-connect/src/op/execute/root.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, future::ready, sync::Arc};
use std::{future::ready, sync::Arc};

use common_daft_config::DaftExecutionConfig;
use daft_local_execution::NativeExecutor;
Expand All @@ -10,6 +10,7 @@ use crate::{
op::execute::{ExecuteStream, PlanIds},
session::Session,
translation,
translation::Plan,
};

impl Session {
Expand All @@ -31,13 +32,11 @@ impl Session {
let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(1);
tokio::spawn(async move {
let execution_fut = async {
let plan = translation::to_logical_plan(command)?;
let optimized_plan = plan.optimize()?;
let Plan { builder, psets } = translation::to_logical_plan(command)?;
let optimized_plan = builder.optimize()?;
let cfg = Arc::new(DaftExecutionConfig::default());
let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
let mut result_stream = native_executor
.run(HashMap::new(), cfg, None)?
.into_stream();
let mut result_stream = native_executor.run(psets, cfg, None)?.into_stream();

while let Some(result) = result_stream.next().await {
let result = result?;
Expand Down
4 changes: 2 additions & 2 deletions src/daft-connect/src/translation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ mod literal;
mod logical_plan;
mod schema;

// Public re-exports: helpers that translate between Spark Connect protobuf
// types and Daft's datatypes, expressions, literals, and logical plans.
pub use datatype::{deser_spark_datatype, to_daft_datatype, to_spark_datatype};
pub use expr::to_daft_expr;
pub use literal::to_daft_literal;
pub use logical_plan::{to_logical_plan, Plan};
pub use schema::relation_to_schema;
3 changes: 3 additions & 0 deletions src/daft-connect/src/translation/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use eyre::{bail, ensure, WrapErr};
use spark_connect::data_type::Kind;
use tracing::warn;

mod codec;
pub use codec::deser as deser_spark_datatype;

pub fn to_spark_datatype(datatype: &DataType) -> spark_connect::DataType {
match datatype {
DataType::Null => spark_connect::DataType {
Expand Down
Loading

0 comments on commit f636d74

Please sign in to comment.