Skip to content

Commit

Permalink
use rust scan task in iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Nov 14, 2023
1 parent 71d710d commit 3111951
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 18 deletions.
39 changes: 37 additions & 2 deletions daft/iceberg/iceberg_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,16 @@
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.table import Table

from daft.daft import (
FileFormatConfig,
NativeStorageConfig,
ParquetSourceConfig,
ScanTask,
StorageConfig,
)
from daft.datatype import DataType
from daft.expressions.expressions import col
from daft.io import IOConfig, S3Config
from daft.io.scan import PartitionField, ScanOperator, make_partition_field
from daft.logical.schema import Field, Schema

Expand Down Expand Up @@ -62,9 +70,10 @@ def iceberg_partition_spec_to_fields(iceberg_schema: IcebergSchema, spec: Iceber


class IcebergScanOperator(ScanOperator):
def __init__(self, iceberg_table: Table) -> None:
def __init__(self, iceberg_table: Table, io_config: IOConfig | None = None) -> None:
super().__init__()
self._table = iceberg_table
self._io_config = io_config
arrow_schema = schema_to_pyarrow(iceberg_table.schema())
self._schema = Schema.from_pyarrow_schema(arrow_schema)
self._partition_keys = iceberg_partition_spec_to_fields(self._table.schema(), self._table.spec())
Expand All @@ -75,6 +84,30 @@ def schema(self) -> Schema:
def partitioning_keys(self) -> list[PartitionField]:
    """Return this table's partitioning fields, precomputed in __init__ from the Iceberg partition spec."""
return self._partition_keys

def _make_scan_tasks(self) -> list[ScanTask]:
    """Plan this Iceberg table's scan and convert each planned data file into a Daft ScanTask.

    Only Parquet data files are supported; any other file format raises
    NotImplementedError.

    Returns:
        A list of catalog ScanTasks, one per data file produced by
        ``self._table.scan().plan_files()``.

    Raises:
        NotImplementedError: if a planned file is not in Parquet format.
    """
    # The storage configuration is identical for every task, so build it once.
    storage_config = StorageConfig.native(NativeStorageConfig(True, self._io_config))

    scan_tasks: list[ScanTask] = []
    for iceberg_task in self._table.scan().plan_files():
        data_file = iceberg_task.file
        fmt = data_file.file_format
        # Guard clause: only Parquet is handled today.
        if fmt != "PARQUET":
            raise NotImplementedError(f"{fmt} for iceberg not implemented!")
        scan_tasks.append(
            ScanTask.catalog_scan_task(
                file=data_file.file_path,
                file_format=FileFormatConfig.from_parquet_config(ParquetSourceConfig()),
                # NOTE(review): reaches into the Schema wrapper's private
                # `_schema` — presumably the native handle; confirm intended.
                schema=self._schema._schema,
                num_rows=data_file.record_count,
                storage_config=storage_config,
            )
        )
    return scan_tasks


def catalog() -> Catalog:
return load_catalog(
Expand All @@ -91,4 +124,6 @@ def catalog() -> Catalog:

# Ad-hoc driver: load a local test table and construct the scan operator.
cat = catalog()
tab = cat.load_table("default.test_partitioned_by_years")
# NOTE(review): diff residue — this is the pre-change construction (no
# io_config); it is immediately superseded by the construction below.
ice = IcebergScanOperator(tab)

# Credentials target a localhost:9000 S3 endpoint — presumably a local
# MinIO used for development only; TODO confirm these are never used in
# any shared environment.
io_config = IOConfig(s3=S3Config(endpoint_url="http://localhost:9000", key_id="admin", access_key="password"))
ice = IcebergScanOperator(tab, io_config=io_config)
15 changes: 0 additions & 15 deletions daft/io/scan.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,12 @@
from __future__ import annotations

import abc
from dataclasses import dataclass

from daft.daft import PartitionField
from daft.expressions.expressions import Expression
from daft.logical.schema import Field, Schema


@dataclass(frozen=True)
class ScanTask:
    """Immutable description of one unit of scan work (frozen, so instances are hashable).

    NOTE(review): this pure-Python placeholder is removed by this commit in
    favor of the native ``daft.daft.ScanTask``.
    """

    # File format / type tag — presumably e.g. "parquet"; TODO confirm against callers.
    file_type: str
    # Optional column projection; None appears to mean "all columns" — verify.
    columns: list[str] | None
    # Optional row-count limit; None appears to mean unlimited — verify.
    limit: int | None


# @dataclass(frozen=True)
# class PartitionField:
# field: Field
# source_field: Field
# transform: Expression


def make_partition_field(
field: Field, source_field: Field | None = None, transform: Expression | None = None
) -> PartitionField:
Expand Down
3 changes: 2 additions & 1 deletion src/daft-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use common_error::{DaftError, DaftResult};
use daft_core::{datatypes::Field, schema::SchemaRef};
use daft_dsl::{optimization::get_required_columns, Expr, ExprRef};
use daft_dsl::{Expr, ExprRef, optimization::get_required_columns};
use daft_stats::{PartitionSpec, TableMetadata, TableStatistics};
use file_format::FileFormatConfig;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -242,6 +242,7 @@ impl Display for PartitionField {
}
}


pub trait ScanOperator: Send + Sync + Display + Debug {
fn schema(&self) -> SchemaRef;
fn partitioning_keys(&self) -> &[PartitionField];
Expand Down
4 changes: 4 additions & 0 deletions src/daft-scan/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ pub mod pylib {
let scan_task = ScanTask::new(vec![data_source], file_format.into(), schema.schema, storage_config.into(), Pushdowns::default());
Ok(PyScanTask(scan_task.into()))
}

/// Python `repr()` support: renders the wrapped scan task via its `Debug`
/// implementation.
pub fn __repr__(&self) -> PyResult<String> {
    let rendered = format!("{:?}", self.0);
    Ok(rendered)
}
}

impl From<Arc<ScanTask>> for PyScanTask {
Expand Down

0 comments on commit 3111951

Please sign in to comment.