[BUG] Use schema_hints as hints instead of definitive schema #1636

Merged: 5 commits, Dec 3, 2023
Changes from 1 commit
7 changes: 3 additions & 4 deletions daft/daft.pyi
@@ -443,7 +443,7 @@ class ScanOperatorHandle:
         glob_path: list[str],
         file_format_config: FileFormatConfig,
         storage_config: StorageConfig,
-        schema: PySchema | None = None,
+        schema_hint: PySchema | None = None,
     ) -> ScanOperatorHandle: ...
     @staticmethod
     def from_python_scan_operator(operator: ScanOperator) -> ScanOperatorHandle: ...
@@ -636,6 +636,7 @@ class PySchema:
    def __getitem__(self, name: str) -> PyField: ...
    def names(self) -> list[str]: ...
    def union(self, other: PySchema) -> PySchema: ...
+   def apply_hints(self, other: PySchema) -> PySchema: ...
    def eq(self, other: PySchema) -> bool: ...
    @staticmethod
    def from_field_name_and_types(names_and_types: list[tuple[str, PyDataType]]) -> PySchema: ...
@@ -917,9 +918,7 @@ class LogicalPlanBuilder:
        partition_key: str, cache_entry: PartitionCacheEntry, schema: PySchema, num_partitions: int
    ) -> LogicalPlanBuilder: ...
    @staticmethod
-   def table_scan_with_scan_operator(
-       scan_operator: ScanOperatorHandle, schema_hint: PySchema | None
-   ) -> LogicalPlanBuilder: ...
+   def table_scan_with_scan_operator(scan_operator: ScanOperatorHandle) -> LogicalPlanBuilder: ...
    @staticmethod
    def table_scan(
        file_infos: FileInfos, schema: PySchema, file_format_config: FileFormatConfig, storage_config: StorageConfig
4 changes: 1 addition & 3 deletions daft/io/_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,5 @@
iceberg_operator = IcebergScanOperator(pyiceberg_table, storage_config=storage_config)

handle = ScanOperatorHandle.from_python_scan_operator(iceberg_operator)
builder = LogicalPlanBuilder.from_tabular_scan_with_scan_operator(
scan_operator=handle, schema_hint=iceberg_operator.schema()
)
builder = LogicalPlanBuilder.from_tabular_scan_with_scan_operator(scan_operator=handle)

Check warning on line 84 in daft/io/_iceberg.py

View check run for this annotation

Codecov / codecov/patch

daft/io/_iceberg.py#L84

Added line #L84 was not covered by tests
return DataFrame(builder)
28 changes: 18 additions & 10 deletions daft/io/common.py
@@ -5,6 +5,7 @@

 from daft.context import get_context
 from daft.daft import (
+    CsvSourceConfig,
     FileFormatConfig,
     NativeStorageConfig,
     PythonStorageConfig,
@@ -55,38 +56,45 @@ def _get_tabular_files_scan(
                 path,
                 file_format_config,
                 storage_config,
-                schema=schema_hint._schema if schema_hint is not None else None,
+                schema_hint=schema_hint._schema if schema_hint is not None else None,
             )
         elif isinstance(path, str):
             scan_op = ScanOperatorHandle.glob_scan(
                 [path],
                 file_format_config,
                 storage_config,
-                schema=schema_hint._schema if schema_hint is not None else None,
+                schema_hint=schema_hint._schema if schema_hint is not None else None,
             )
         else:
             raise NotImplementedError(f"_get_tabular_files_scan cannot construct ScanOperatorHandle for input: {path}")

         builder = LogicalPlanBuilder.from_tabular_scan_with_scan_operator(
             scan_operator=scan_op,
-            schema_hint=schema_hint,
         )
         return builder

     paths = path if isinstance(path, list) else [str(path)]
     runner_io = get_context().runner().runner_io()
     file_infos = runner_io.glob_paths_details(paths, file_format_config=file_format_config, io_config=io_config)

-    # Infer schema if no hints provided
-    inferred_or_provided_schema = (
-        schema_hint
-        if schema_hint is not None
-        else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)
-    )
+    # Infer schema
+    schema = runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)
+
+    # Apply hints from schema_hints if provided
+    if schema_hint is not None:
+        # If CSV and no headers, then use the schema hint as the schema
+        if isinstance(file_format_config.config, CsvSourceConfig) and file_format_config.config.has_headers == False:
+            if len(schema) != len(schema_hint):
+                raise ValueError(
+                    f"For CSV with no headers, number of columns in schema hint ({len(schema_hint)} columns were provided) must match number of columns in data: {len(schema)}."
+                )
+            schema = schema_hint
+        else:
+            schema = schema.apply_hints(schema_hint)

A contributor commented on the ValueError check above:

Why did you decide to enforce this invariant here?

Wouldn't the code still work naively if we provided partial hints like "column_0": DataType.string(), and those hints were applied as per the rest of the PR?

The PR author replied:

Based on this existing test case:

def test_create_dataframe_csv_specify_schema_no_headers(
    valid_data: list[dict[str, float]], use_native_downloader
) -> None:
    with create_temp_filename() as fname:
        with open(fname, "w") as f:
            header = list(valid_data[0].keys())
            writer = csv.writer(f, delimiter="\t")
            writer.writerows([[item[col] for col in header] for item in valid_data])
            f.flush()
            df = daft.read_csv(
                fname,
                delimiter="\t",
                schema_hints={
                    "sepal_length": DataType.float64(),
                    "sepal_width": DataType.float64(),
                    "petal_length": DataType.float64(),
                    "petal_width": DataType.float64(),
                    "variety": DataType.string(),
                },
                has_headers=False,
                use_native_downloader=use_native_downloader,
            )
            assert df.column_names == COL_NAMES
            pd_df = df.to_pandas()
            assert list(pd_df.columns) == COL_NAMES
            assert len(pd_df) == len(valid_data)

I thought maybe it made sense for schema_hints to be the definitive schema when the CSV has no headers, as a way to provide named columns instead of the defaults like "column_0" and "column_1"; that only works if hints for all columns are provided.

But I also agree that it would be simpler and more consistent to remove this invariant and let the user realize that column names default to "column_0" etc., so they can name their schema hints accordingly. And column names can always be changed with .alias anyway 😅

Removed these checks in the latest commit.
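For illustration only (not part of this diff), here is a minimal sketch of partial hints against the default column names of a headerless CSV; the file path and the second column are hypothetical, and the read_csv arguments mirror the test above:

import daft
from daft import DataType

# Headerless CSV: inferred column names default to "column_0", "column_1", ...
# A partial hint only overrides the dtype of the column it names; all other
# columns keep their inferred types.
df = daft.read_csv(
    "data.csv",  # hypothetical path
    has_headers=False,
    schema_hints={"column_0": DataType.string()},
)

# Friendlier names can still be assigned afterwards with .alias
df = df.select(daft.col("column_0").alias("id"), daft.col("column_1").alias("value"))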

f"For CSV with no headers, number of columns in schema hint ({len(schema_hint)} columns were provided) must match number of columns in data: {len(schema)}."
)
schema = schema_hint
else:
schema = schema.apply_hints(schema_hint)
# Construct plan
builder = LogicalPlanBuilder.from_tabular_scan(
file_infos=file_infos,
schema=inferred_or_provided_schema,
schema=schema,
file_format_config=file_format_config,
storage_config=storage_config,
)
Expand Down
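To make the PR title concrete, a hedged sketch of the behavioral change in the block above (the CSV path and column names are hypothetical): previously a provided schema_hint was used as the definitive schema; with this change it is layered onto the inferred schema, so unmentioned columns are still read with their inferred types.

import daft
from daft import DataType

# data.csv (hypothetical) has headers: id, name, score
df = daft.read_csv(
    "data.csv",
    schema_hints={"score": DataType.float64()},  # hint for a single column
)

# After this change, all three columns survive; only "score" has its dtype overridden.
assert set(df.column_names) == {"id", "name", "score"}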
4 changes: 1 addition & 3 deletions daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,8 @@ def from_tabular_scan_with_scan_operator(
cls,
*,
scan_operator: ScanOperatorHandle,
schema_hint: Schema | None,
) -> LogicalPlanBuilder:
pyschema = schema_hint._schema if schema_hint is not None else None
builder = _LogicalPlanBuilder.table_scan_with_scan_operator(scan_operator, pyschema)
builder = _LogicalPlanBuilder.table_scan_with_scan_operator(scan_operator)
return cls(builder)

@classmethod
Expand Down
6 changes: 6 additions & 0 deletions daft/logical/schema.py
@@ -140,6 +140,12 @@

         return Schema._from_pyschema(self._schema.union(other._schema))

+    def apply_hints(self, other: Schema) -> Schema:
+        if not isinstance(other, Schema):
+            raise ValueError(f"Expected Schema, got other: {type(other)}")
+
+        return Schema._from_pyschema(self._schema.apply_hints(other._schema))
+
     def __reduce__(self) -> tuple:
         return Schema._from_pyschema, (self._schema,)

Codecov / codecov/patch: added line daft/logical/schema.py#L145 was not covered by tests.
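As a rough illustration of the semantics this wrapper exposes (a plain-Python sketch of the rule implemented in schema.rs below, not the daft API itself): fields keep the original schema's order, a field whose name appears in the hints takes the hinted definition, and hint entries with no matching field are ignored.

def apply_hints(inferred: dict[str, str], hints: dict[str, str]) -> dict[str, str]:
    # Keys are column names; values stand in for Daft dtypes.
    # Original column order is preserved; hint-only keys are dropped.
    return {name: hints.get(name, dtype) for name, dtype in inferred.items()}

inferred = {"sepal_length": "float64", "variety": "utf8"}
hints = {"variety": "string", "not_a_column": "int64"}
assert apply_hints(inferred, hints) == {"sepal_length": "float64", "variety": "string"}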
5 changes: 5 additions & 0 deletions src/daft-core/src/python/schema.rs
@@ -33,6 +33,11 @@ impl PySchema {
         Ok(new_schema.into())
     }

+    pub fn apply_hints(&self, hints: &PySchema) -> PyResult<PySchema> {
+        let new_schema = Arc::new(self.schema.apply_hints(&hints.schema)?);
+        Ok(new_schema.into())
+    }
+
     pub fn eq(&self, other: &PySchema) -> PyResult<bool> {
         Ok(self.schema.fields.eq(&other.schema.fields))
     }
11 changes: 11 additions & 0 deletions src/daft-core/src/schema.rs
@@ -86,6 +86,17 @@ impl Schema {
         }
     }

+    pub fn apply_hints(&self, hints: &Schema) -> DaftResult<Schema> {
+        let mut fields = IndexMap::new();
+        for (name, field) in self.fields.iter() {
+            match hints.fields.get(name) {
+                None => fields.insert(name.clone(), field.clone()),
+                Some(hint_field) => fields.insert(name.clone(), hint_field.clone()),
+            };
+        }
+        Ok(Schema { fields })
+    }
+
     pub fn to_arrow(&self) -> DaftResult<arrow2::datatypes::Schema> {
         let arrow_fields: DaftResult<Vec<arrow2::datatypes::Field>> =
             self.fields.iter().map(|(_, f)| f.to_arrow()).collect();

A contributor commented on the for-loop above:

Nice! This preserves the ordering of the original schema as well, which is important.

This is completely fine as-is, but if you wanted, you could use Rust iterators instead, which would let you avoid the intermediate mut fields variable.

I think IndexMaps can be "collected" from an iterator, and something like this might work:

let applied_fields = self.fields
    .iter()
    .map(|(name, field)| match hints.fields.get(name) {
        None => (name.clone(), field.clone()),
        Some(hint_field) => (name.clone(), hint_field.clone()),
    })
    .collect::<IndexMap<String, Field>>();

Ok(Schema { fields: applied_fields })

The PR author replied:

Yup! It works, made the changes. I like it a lot better too; it's more concise and expressive (and maybe more performant? Not sure though, I'll need to learn more about Rust).

The contributor replied, regarding "more performant":

Maybe, depending on how the compiler chooses to optimize it! Iterators are pretty idiomatic in Rust :)
16 changes: 14 additions & 2 deletions src/daft-micropartition/src/micropartition.rs
@@ -348,6 +348,7 @@ impl MicroPartition {
             read_parquet_into_micropartition(
                 uris.as_slice(),
                 columns.as_deref(),
+                Some(schema),
                 None,
                 scan_task.pushdowns.limit,
                 row_groups,

@@ -615,6 +616,7 @@ pub(crate) fn read_csv_into_micropartition(
 pub(crate) fn read_parquet_into_micropartition(
     uris: &[&str],
     columns: Option<&[&str]>,
+    schema: Option<SchemaRef>,
     start_offset: Option<usize>,
     num_rows: Option<usize>,
     row_groups: Option<Vec<Option<Vec<i64>>>>,
A contributor commented on the new schema parameter:

Parquet reads differ a little from CSV reads: for Parquet, the file format itself contains a schema and thus no external schema information is required when reading that file.

Therefore for read_parquet_into_micropartition, we will probably not want to pass in the schema (unlike the CSV reads!)

Instead, we can let read_parquet_into_micropartition perform its own schema inference/data parsing, and then later on we can coerce the resultant MicroPartition into the inferred schema. The overall flow would look something like:

// Naively read Parquet file(s) into a MicroPartition, no schema coercion applied
// Note that this all happens lazily because of the nature of MicroPartitions
// being a lazy-loading abstraction
let mp = read_parquet_into_micropartition(...);

let applied_schema = mp.schema().apply(schema_hints);
let mp = mp.cast_to_schema(&applied_schema);

The PR author replied:

Ah got it, made this change in the latest commit.

@@ -659,8 +661,18 @@ pub(crate) fn read_parquet_into_micropartition(
     let schemas = metadata
         .iter()
         .map(|m| {
-            let schema = infer_schema_with_options(m, &Some((*schema_infer_options).into()))?;
-            let daft_schema = daft_core::schema::Schema::try_from(&schema)?;
+            // if schema provided use schema, else use inferred schema
+            let daft_schema = match schema.as_ref() {
+                Some(s) => Schema {
+                    fields: s.fields.clone(),
+                },
+                None => {
+                    let inferred_schema =
+                        infer_schema_with_options(m, &Some((*schema_infer_options).into()))?;
+                    daft_core::schema::Schema::try_from(&inferred_schema)?
+                }
+            };

             DaftResult::Ok(daft_schema)
         })
         .collect::<DaftResult<Vec<_>>>()?;
2 changes: 2 additions & 0 deletions src/daft-micropartition/src/python.rs
@@ -438,6 +438,7 @@ impl PyMicroPartition {
         crate::micropartition::read_parquet_into_micropartition(
             [uri].as_ref(),
             columns.as_deref(),
+            None,
             start_offset,
             num_rows,
             row_groups.map(|rg| vec![Some(rg)]),

@@ -476,6 +477,7 @@
         crate::micropartition::read_parquet_into_micropartition(
             uris.as_ref(),
             columns.as_deref(),
+            None,
             start_offset,
             num_rows,
             row_groups,
18 changes: 4 additions & 14 deletions src/daft-plan/src/builder.rs
@@ -68,11 +68,8 @@ impl LogicalPlanBuilder {
         Ok(logical_plan.into())
     }

-    pub fn table_scan_with_scan_operator(
-        scan_operator: ScanOperatorRef,
-        schema_hint: Option<SchemaRef>,
-    ) -> DaftResult<Self> {
-        let schema = schema_hint.unwrap_or_else(|| scan_operator.0.schema());
+    pub fn table_scan_with_scan_operator(scan_operator: ScanOperatorRef) -> DaftResult<Self> {
+        let schema = scan_operator.0.schema();
         let partitioning_keys = scan_operator.0.partitioning_keys();
         let source_info =
             SourceInfo::ExternalInfo(ExternalSourceInfo::Scan(ScanExternalInfo::new(

@@ -298,15 +295,8 @@ impl PyLogicalPlanBuilder {
     }

     #[staticmethod]
-    pub fn table_scan_with_scan_operator(
-        scan_operator: ScanOperatorHandle,
-        schema_hint: Option<PySchema>,
-    ) -> PyResult<Self> {
-        Ok(LogicalPlanBuilder::table_scan_with_scan_operator(
-            scan_operator.into(),
-            schema_hint.map(|s| s.into()),
-        )?
-        .into())
+    pub fn table_scan_with_scan_operator(scan_operator: ScanOperatorHandle) -> PyResult<Self> {
+        Ok(LogicalPlanBuilder::table_scan_with_scan_operator(scan_operator.into())?.into())
     }

     #[staticmethod]