Remove old non-MicroPartition/non-scan operator path; refactor optimization tests to be based on structural equality.
clarkzinzow committed Feb 23, 2024
1 parent 5b2fe98 commit dc7978e
Showing 46 changed files with 1,058 additions and 1,721 deletions.
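
As context for the second half of the commit message: "structural equality" here means comparing the optimized logical plan against an expected plan tree node-for-node, rather than asserting on string representations or node counts. The following standalone Python sketch illustrates the idea; it is not Daft's actual test code, and every name in it is made up for this illustration.

from dataclasses import dataclass

@dataclass(frozen=True)
class Scan:
    columns: tuple[str, ...]

@dataclass(frozen=True)
class Filter:
    child: object
    predicate: str

@dataclass(frozen=True)
class Project:
    child: object
    columns: tuple[str, ...]

def push_filter_below_project(plan: object) -> object:
    # Toy rewrite rule: move a Filter underneath a Project when one sits directly above it.
    if isinstance(plan, Filter) and isinstance(plan.child, Project):
        proj = plan.child
        return Project(Filter(proj.child, plan.predicate), proj.columns)
    return plan

unoptimized = Filter(Project(Scan(("a", "b")), ("a",)), "a > 0")
expected = Project(Filter(Scan(("a", "b")), "a > 0"), ("a",))
# Frozen dataclasses compare field-by-field, so == is structural equality over the plan tree.
assert push_filter_below_project(unoptimized) == expected

Tests written this way stay stable when a plan's pretty-printed form changes.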
5 changes: 0 additions & 5 deletions .github/workflows/python-package.yml
@@ -92,7 +92,6 @@ jobs:
# cargo llvm-cov --no-run --lcov --output-path report-output/rust-coverage-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.daft-runner }}-${{ matrix.pyarrow-version }}.lcov
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_MICROPARTITIONS: ${{ matrix.micropartitions }}
- name: Build library and Test with pytest (windows)
if: ${{ (matrix.os == 'windows') }}
run: |
@@ -222,7 +221,6 @@ jobs:
pytest tests/integration/test_tpch.py --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_MICROPARTITIONS: ${{ matrix.micropartitions }}
- name: Send Slack notification on failure
uses: slackapi/[email protected]
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -298,7 +296,6 @@ jobs:
pytest tests/integration/io -m 'integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_MICROPARTITIONS: ${{ matrix.micropartitions }}
- name: Send Slack notification on failure
uses: slackapi/[email protected]
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -395,7 +392,6 @@ jobs:
pytest tests/integration/io -m 'integration' --credentials --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_MICROPARTITIONS: ${{ matrix.micropartitions }}
- name: Send Slack notification on failure
uses: slackapi/[email protected]
if: ${{ failure() }}
@@ -475,7 +471,6 @@ jobs:
pytest tests/integration/iceberg -m 'integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_MICROPARTITIONS: ${{ matrix.micropartitions }}
- name: Send Slack notification on failure
uses: slackapi/[email protected]
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
11 changes: 2 additions & 9 deletions daft/daft.pyi
@@ -845,7 +845,6 @@ class PySchema:
def __getitem__(self, name: str) -> PyField: ...
def names(self) -> list[str]: ...
def union(self, other: PySchema) -> PySchema: ...
def apply_hints(self, other: PySchema) -> PySchema: ...
def eq(self, other: PySchema) -> bool: ...
@staticmethod
def from_field_name_and_types(names_and_types: list[tuple[str, PyDataType]]) -> PySchema: ...
@@ -1157,9 +1156,7 @@ class PhysicalPlanScheduler:
def num_partitions(self) -> int: ...
def partition_spec(self) -> PartitionSpec: ...
def repr_ascii(self, simple: bool) -> str: ...
def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], is_ray_runner: bool
) -> physical_plan.InProgressPhysicalPlan: ...
def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.InProgressPhysicalPlan: ...

class LogicalPlanBuilder:
"""
@@ -1175,11 +1172,7 @@ class LogicalPlanBuilder:
partition_key: str, cache_entry: PartitionCacheEntry, schema: PySchema, num_partitions: int, size_bytes: int
) -> LogicalPlanBuilder: ...
@staticmethod
def table_scan_with_scan_operator(scan_operator: ScanOperatorHandle) -> LogicalPlanBuilder: ...
@staticmethod
def table_scan(
file_infos: FileInfos, schema: PySchema, file_format_config: FileFormatConfig, storage_config: StorageConfig
) -> LogicalPlanBuilder: ...
def table_scan(scan_operator: ScanOperatorHandle) -> LogicalPlanBuilder: ...
def project(self, projection: list[PyExpr], resource_request: ResourceRequest) -> LogicalPlanBuilder: ...
def filter(self, predicate: PyExpr) -> LogicalPlanBuilder: ...
def limit(self, limit: int, eager: bool) -> LogicalPlanBuilder: ...
137 changes: 1 addition & 136 deletions daft/execution/execution_step.py
@@ -11,18 +11,7 @@
else:
from typing import Protocol

from daft.daft import (
CsvSourceConfig,
FileFormat,
FileFormatConfig,
IOConfig,
JoinType,
JsonReadOptions,
JsonSourceConfig,
ParquetSourceConfig,
ResourceRequest,
StorageConfig,
)
from daft.daft import FileFormat, IOConfig, JoinType, ResourceRequest
from daft.expressions import Expression, ExpressionsProjection, col
from daft.logical.map_partition_ops import MapPartitionOp
from daft.logical.schema import Schema
@@ -32,8 +21,6 @@
PartialPartitionMetadata,
PartitionMetadata,
PartitionT,
TableParseCSVOptions,
TableReadOptions,
)
from daft.table import MicroPartition, table_io

@@ -303,128 +290,6 @@ def num_outputs(self) -> int:
return 1


@dataclass(frozen=True)
class ReadFile(SingleOutputInstruction):
index: int | None
# Known number of rows.
file_rows: int | None
# Max number of rows to read.
limit_rows: int | None
schema: Schema
storage_config: StorageConfig
columns_to_read: list[str] | None
file_format_config: FileFormatConfig

def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
return self._read_file(inputs)

def _read_file(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
assert len(inputs) == 1
[filepaths_partition] = inputs
partition = self._handle_tabular_files_scan(
filepaths_partition=filepaths_partition,
)
return [partition]

def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]:
assert len(input_metadatas) == 1

num_rows = self.file_rows
# Only take the file read limit into account if we know how big the file is to begin with.
if num_rows is not None and self.limit_rows is not None:
num_rows = min(num_rows, self.limit_rows)

return [
PartialPartitionMetadata(
num_rows=num_rows,
size_bytes=None,
)
]

def _handle_tabular_files_scan(
self,
filepaths_partition: MicroPartition,
) -> MicroPartition:
data = filepaths_partition.to_pydict()
filepaths = data["path"]

if self.index is not None:
filepaths = [filepaths[self.index]]

# Common options for reading vPartition
read_options = TableReadOptions(
num_rows=self.limit_rows,
column_names=self.columns_to_read, # read only specified columns
)

file_format = self.file_format_config.file_format()
format_config = self.file_format_config.config
if file_format == FileFormat.Csv:
assert isinstance(format_config, CsvSourceConfig)
table = MicroPartition.concat(
[
table_io.read_csv(
file=fp,
schema=self.schema,
storage_config=self.storage_config,
csv_options=TableParseCSVOptions(
delimiter=format_config.delimiter,
header_index=0 if format_config.has_headers else None,
double_quote=format_config.double_quote,
quote=format_config.quote,
escape_char=format_config.escape_char,
comment=format_config.comment,
buffer_size=format_config.buffer_size,
chunk_size=format_config.chunk_size,
),
read_options=read_options,
)
for fp in filepaths
]
)
elif file_format == FileFormat.Json:
assert isinstance(format_config, JsonSourceConfig)
table = MicroPartition.concat(
[
table_io.read_json(
file=fp,
schema=self.schema,
storage_config=self.storage_config,
json_read_options=JsonReadOptions(
buffer_size=format_config.buffer_size, chunk_size=format_config.chunk_size
),
read_options=read_options,
)
for fp in filepaths
]
)
elif file_format == FileFormat.Parquet:
assert isinstance(format_config, ParquetSourceConfig)
table = MicroPartition.concat(
[
table_io.read_parquet(
file=fp,
schema=self.schema,
storage_config=self.storage_config,
read_options=read_options,
)
for fp in filepaths
]
)
else:
raise NotImplementedError(f"PyRunner has not implemented scan: {file_format}")

expected_schema = (
Schema._from_fields([self.schema[name] for name in read_options.column_names])
if read_options.column_names is not None
else self.schema
)
assert (
table.schema() == expected_schema
), f"Expected table to have schema:\n{expected_schema}\n\nReceived instead:\n{table.schema()}"
return table


@dataclass(frozen=True)
class WriteFile(SingleOutputInstruction):
file_format: FileFormat
74 changes: 1 addition & 73 deletions daft/execution/physical_plan.py
@@ -22,14 +22,7 @@
from typing import Generator, Generic, Iterable, Iterator, TypeVar, Union

from daft.context import get_context
from daft.daft import (
FileFormat,
FileFormatConfig,
IOConfig,
JoinType,
ResourceRequest,
StorageConfig,
)
from daft.daft import FileFormat, IOConfig, JoinType, ResourceRequest
from daft.execution import execution_step
from daft.execution.execution_step import (
Instruction,
@@ -84,71 +77,6 @@ def partition_read(
)


def file_read(
child_plan: InProgressPhysicalPlan[PartitionT],
# Max number of rows to read.
limit_rows: int | None,
schema: Schema,
storage_config: StorageConfig,
columns_to_read: list[str] | None,
file_format_config: FileFormatConfig,
) -> InProgressPhysicalPlan[PartitionT]:
"""child_plan represents partitions with filenames.
Yield a plan to read those filenames.
"""
materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()
stage_id = next(stage_id_counter)
output_partition_index = 0

while True:
# Check if any inputs finished executing.
while len(materializations) > 0 and materializations[0].done():
done_task = materializations.popleft()

vpartition = done_task.vpartition()
file_infos = vpartition.to_pydict()
file_sizes_bytes = file_infos["size"]
file_rows = file_infos["num_rows"]

# Emit one partition for each file (NOTE: hardcoded for now).
for i in range(len(vpartition)):
file_read_step = PartitionTaskBuilder[PartitionT](
inputs=[done_task.partition()],
partial_metadatas=None, # Child's metadata doesn't really matter for a file read
).add_instruction(
instruction=execution_step.ReadFile(
index=i,
file_rows=file_rows[i],
limit_rows=limit_rows,
schema=schema,
storage_config=storage_config,
columns_to_read=columns_to_read,
file_format_config=file_format_config,
),
# Set the filesize as the memory request.
# (Note: this is very conservative; file readers empirically use much more peak memory than 1x file size.)
resource_request=ResourceRequest(memory_bytes=file_sizes_bytes[i]),
)
yield file_read_step
output_partition_index += 1

# Materialize a single dependency.
try:
child_step = next(child_plan)
if isinstance(child_step, PartitionTaskBuilder):
child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
materializations.append(child_step)
yield child_step

except StopIteration:
if len(materializations) > 0:
logger.debug("file_read blocked on completion of first source in: %s", materializations)
yield None
else:
return


def file_write(
child_plan: InProgressPhysicalPlan[PartitionT],
file_format: FileFormat,
34 changes: 0 additions & 34 deletions daft/execution/rust_physical_plan_shim.py
@@ -1,19 +1,15 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterator, cast

from daft.daft import (
FileFormat,
FileFormatConfig,
IOConfig,
JoinType,
PyExpr,
PySchema,
PyTable,
ResourceRequest,
ScanTask,
StorageConfig,
)
from daft.execution import execution_step, physical_plan
from daft.expressions import Expression, ExpressionsProjection
@@ -94,36 +90,6 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
]


def tabular_scan(
schema: PySchema,
columns_to_read: list[str] | None,
file_info_table: PyTable,
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
limit: int,
is_ray_runner: bool,
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
# TODO(Clark): Fix this Ray runner hack.
part = MicroPartition._from_pytable(file_info_table)
if is_ray_runner:
import ray

parts = [ray.put(part)]
else:
parts = [part]
parts_t = cast(Iterator[PartitionT], parts)

file_info_iter = physical_plan.partition_read(iter(parts_t))
return physical_plan.file_read(
child_plan=file_info_iter,
limit_rows=limit,
schema=Schema._from_pyschema(schema),
storage_config=storage_config,
columns_to_read=columns_to_read,
file_format_config=file_format_config,
)


def project(
input: physical_plan.InProgressPhysicalPlan[PartitionT], projection: list[PyExpr], resource_request: ResourceRequest
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
4 changes: 2 additions & 2 deletions daft/io/_csv.py
@@ -14,7 +14,7 @@
)
from daft.dataframe import DataFrame
from daft.datatype import DataType
from daft.io.common import _get_tabular_files_scan
from daft.io.common import get_tabular_files_scan


@PublicAPI
@@ -84,5 +84,5 @@ def read_csv(
storage_config = StorageConfig.native(NativeStorageConfig(True, io_config))
else:
storage_config = StorageConfig.python(PythonStorageConfig(io_config=io_config))
builder = _get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
builder = get_tabular_files_scan(path, schema_hints, file_format_config, storage_config=storage_config)
return DataFrame(builder)
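
From the user's point of view this is only an internal rename: the public reader still builds its plan through the tabular-scan path (now backed by scan operators) and returns a DataFrame. A minimal usage sketch follows; the file path is a placeholder.

import daft

df = daft.read_csv("data/example.csv")  # plan is built via get_tabular_files_scan under the hood
df.collect()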
2 changes: 1 addition & 1 deletion daft/io/_iceberg.py
@@ -109,5 +109,5 @@ def read_iceberg(
iceberg_operator = IcebergScanOperator(pyiceberg_table, storage_config=storage_config)

handle = ScanOperatorHandle.from_python_scan_operator(iceberg_operator)
builder = LogicalPlanBuilder.from_tabular_scan_with_scan_operator(scan_operator=handle)
builder = LogicalPlanBuilder.from_tabular_scan(scan_operator=handle)

Check warning on line 112 in daft/io/_iceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/io/_iceberg.py#L112

Added line #L112 was not covered by tests
return DataFrame(builder)