Skip to content

Commit

Permalink
[FEAT] daft-connect range use python generator (#3308)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewgazelka authored Nov 20, 2024
1 parent 731a73e commit 066cde1
Show file tree
Hide file tree
Showing 20 changed files with 281 additions and 435 deletions.
129 changes: 11 additions & 118 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 11 additions & 24 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,20 @@ sysinfo = {workspace = true}
# maturin will turn this on
python = [
"common-daft-config/python",
"common-daft-config/python",
"common-daft-config/python",
"common-display/python",
"common-display/python",
"common-display/python",
"common-resource-request/python",
"common-resource-request/python",
"common-resource-request/python",
"common-scan-info/python",
"common-system-info/python",
"common-daft-config/python",
"common-display/python",
"common-resource-request/python",
"common-system-info/python",
"daft-catalog/python",
"common-system-info/python",
"daft-catalog-python-catalog/python",
"daft-catalog/python",
"daft-connect/python",
"daft-core/python",
"daft-csv/python",
Expand All @@ -76,14 +80,10 @@ python = [
"daft-sql/python",
"daft-stats/python",
"daft-stats/python",
"daft-writers/python",
"daft-table/python",
"dep:daft-connect",
"common-daft-config/python",
"common-system-info/python",
"common-display/python",
"common-resource-request/python",
"daft-writers/python",
"dep:daft-catalog-python-catalog",
"dep:daft-connect",
"dep:pyo3",
"dep:pyo3-log"
]
Expand Down Expand Up @@ -179,8 +179,6 @@ members = [
]

[workspace.dependencies]
ahash = "0.8.11"
anyhow = "1.0.89"
approx = "0.5.1"
async-compat = "0.2.3"
async-compression = {version = "0.4.12", features = [
Expand All @@ -195,25 +193,15 @@ chrono = "0.4.38"
chrono-tz = "0.8.4"
comfy-table = "7.1.1"
common-daft-config = {path = "src/common/daft-config"}
common-display = {path = "src/common/display"}
common-error = {path = "src/common/error", default-features = false}
common-file-formats = {path = "src/common/file-formats"}
daft-connect = {path = "src/daft-connect", default-features = false}
daft-core = {path = "src/daft-core"}
daft-dsl = {path = "src/daft-dsl"}
daft-hash = {path = "src/daft-hash"}
daft-local-execution = {path = "src/daft-local-execution"}
daft-local-plan = {path = "src/daft-local-plan"}
daft-logical-plan = {path = "src/daft-logical-plan"}
daft-micropartition = {path = "src/daft-micropartition"}
daft-physical-plan = {path = "src/daft-physical-plan"}
daft-schema = {path = "src/daft-schema"}
daft-sql = {path = "src/daft-sql"}
daft-scan = {path = "src/daft-scan"}
daft-table = {path = "src/daft-table"}
derivative = "2.2.0"
derive_builder = "0.20.2"
divan = "0.1.14"
dyn-clone = "1"
futures = "0.3.30"
html-escape = "0.2.13"
indexmap = "2.1.0"
Expand All @@ -233,7 +221,6 @@ rand = "^0.8"
rayon = "1.10.0"
regex = "1.10.4"
rstest = "0.18.2"
rustc-hash = "2.0.0"
serde_json = "1.0.116"
sha1 = "0.11.0-pre.4"
sketches-ddsketch = {version = "0.2.2", features = ["use_serde"]}
Expand Down
40 changes: 40 additions & 0 deletions daft/io/_range.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Callable

if TYPE_CHECKING:
from collections.abc import Iterator

from daft import DataType
from daft.io._generator import GeneratorScanOperator
from daft.logical.schema import Schema
from daft.table.table import Table


def _range_generators(start: int, end: int, step: int, partitions: int) -> Iterator[Callable[[], Iterator[Table]]]:
    """Build one zero-argument table generator per partition of ``range(start, end, step)``.

    The full range is split by *element count* (not by value distance), so every
    partition stays aligned to ``step`` and the concatenation of all partitions is
    exactly ``range(start, end, step)``, in order. Partition sizes differ by at most
    one element; when there are fewer elements than partitions, the trailing
    partitions are empty.

    Args:
        start: First value of the range (inclusive).
        end: End of the range (exclusive).
        step: Stride between consecutive values; may be negative, must not be zero.
        partitions: Number of generators to produce; must be at least 1.

    Returns:
        An iterator of ``partitions`` callables. Calling one returns an iterator that
        yields a single ``Table`` with an ``"id"`` column holding that partition's values.

    Raises:
        ValueError: If ``partitions`` is less than 1, or if ``step`` is zero
            (the latter is raised by ``range`` itself).
    """
    from functools import partial

    # Validate eagerly (this function is not itself a generator) so a bad argument
    # fails at call time instead of on first iteration.
    if partitions < 1:
        raise ValueError(f"partitions must be a positive integer, got {partitions}")

    # `range` supports len() and slicing without materializing the values, and
    # slicing a range preserves its step, so chunks never drift off the stride.
    full_range = range(start, end, step)
    base_size, remainder = divmod(len(full_range), partitions)

    def generator(chunk: range) -> Iterator[Table]:
        yield Table.from_pydict({"id": list(chunk)})

    def partition_generators() -> Iterator[Callable[[], Iterator[Table]]]:
        offset = 0
        for partition_idx in range(partitions):
            # The first `remainder` partitions take one extra element each.
            size = base_size + (1 if partition_idx < remainder else 0)
            yield partial(generator, full_range[offset : offset + size])
            offset += size

    return partition_generators()


class RangeScanOperator(GeneratorScanOperator):
    """Generator-backed scan operator exposing an integer range as a single ``id`` column.

    The schema has one field, ``id`` of type int64. The table generators come from
    ``_range_generators(start, end, step, partitions)``.
    """

    def __init__(self, start: int, end: int, step: int = 1, partitions: int = 1) -> None:
        # Single-column schema: ("id", int64).
        id_field = ("id", DataType.int64())
        range_schema = Schema._from_field_name_and_types([id_field])

        # One generator callable per requested partition.
        table_generators = _range_generators(start, end, step, partitions)

        super().__init__(schema=range_schema, generators=table_generators)
20 changes: 11 additions & 9 deletions src/daft-connect/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
[dependencies]
arrow2 = {workspace = true}
common-daft-config = {workspace = true}
daft-local-execution = {workspace = true}
daft-local-plan = {workspace = true}
daft-logical-plan = {workspace = true}
daft-scan = {workspace = true}
daft-table = {workspace = true}
dashmap = "6.1.0"
eyre = "0.6.12"
futures = "0.3.31"
pyo3 = {workspace = true, optional = true}
spark-connect = {workspace = true}
tokio = {version = "1.40.0", features = ["full"]}
tokio-util = {workspace = true}
tonic = "0.12.3"
tracing-subscriber = {version = "0.3.18", features = ["env-filter"]}
tracing-tracy = "0.11.3"
tracing = {workspace = true}
uuid = {version = "1.10.0", features = ["v4"]}
arrow2.workspace = true
daft-core.workspace = true
daft-schema.workspace = true
daft-table.workspace = true
spark-connect.workspace = true
tracing.workspace = true

[features]
python = ["dep:pyo3"]
python = ["dep:pyo3", "common-daft-config/python", "daft-local-execution/python", "daft-local-plan/python", "daft-logical-plan/python", "daft-scan/python", "daft-table/python"]

[lints]
workspace = true
Expand Down
6 changes: 0 additions & 6 deletions src/daft-connect/src/convert.rs

This file was deleted.

Loading

0 comments on commit 066cde1

Please sign in to comment.