Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Use schema_hints as hints instead of definitive schema #1636

Merged
merged 5 commits into from
Dec 3, 2023

Conversation

colin-ho
Copy link
Contributor

@colin-ho colin-ho commented Nov 19, 2023

Addresses #1599

Instead of using schema_hints as a definitive schema, use them as 'hints' as to the intended datatype of each column.
This is implemented via running schema inference first, then applying the 'hints' onto the inferred schema.

Tests:

  • Added tests for for read_csv, read_json, read_parquet

Feedback greatly appreciated! Let me know if this is the correctly intended behaviour, and also if the code can be optimized/refactored since this is my first time writing Rust!

Copy link

codecov bot commented Nov 19, 2023

Codecov Report

Merging #1636 (ff67fa7) into main (b679661) will decrease coverage by 0.01%.
The diff coverage is 77.77%.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1636      +/-   ##
==========================================
- Coverage   85.07%   85.06%   -0.01%     
==========================================
  Files          55       55              
  Lines        5345     5350       +5     
==========================================
+ Hits         4547     4551       +4     
- Misses        798      799       +1     
Files Coverage Δ
daft/io/common.py 89.47% <100.00%> (+0.58%) ⬆️
daft/logical/builder.py 89.28% <100.00%> (-0.10%) ⬇️
daft/io/_iceberg.py 21.27% <0.00%> (ø)
daft/logical/schema.py 90.90% <75.00%> (-0.61%) ⬇️

Copy link
Contributor

@jaychia jaychia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing work, and nice tests!! Two main questions on my end:

Added option to pass in schema into read_parquet_into_micropartition (this was necessary because the schema created from scan operator was not passed in)

I think read_parquet_into_micropartition should stay agnostic to any external schema information. Could we perform schema hint application and coercion after performing a "naive read" in read_parquet_into_micropartition? See: PR comment.

For read_csv, I added a test case to ensure that if has_headers=false, then the schema_hints should be used as definitive schema.

Any reason why you decided to go with these semantics? Wouldn't the code be simpler and easier to reason about if we allowed for partial schema hints as well for has_headers=false? Are there any fundamental limitations preventing us from doing this?

# If CSV and no headers, then use the schema hint as the schema
if isinstance(file_format_config.config, CsvSourceConfig) and file_format_config.config.has_headers == False:
if len(schema) != len(schema_hint):
raise ValueError(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you decide to enforce this invariant here?

Wouldn't the code still work naively if we provided partial hints like "column_0": DataType.string(), and those hints were applied as per the rest of the PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on this existing test case

def test_create_dataframe_csv_specify_schema_no_headers(
valid_data: list[dict[str, float]], use_native_downloader
) -> None:
with create_temp_filename() as fname:
with open(fname, "w") as f:
header = list(valid_data[0].keys())
writer = csv.writer(f, delimiter="\t")
writer.writerows([[item[col] for col in header] for item in valid_data])
f.flush()
df = daft.read_csv(
fname,
delimiter="\t",
schema_hints={
"sepal_length": DataType.float64(),
"sepal_width": DataType.float64(),
"petal_length": DataType.float64(),
"petal_width": DataType.float64(),
"variety": DataType.string(),
},
has_headers=False,
use_native_downloader=use_native_downloader,
)
assert df.column_names == COL_NAMES
pd_df = df.to_pandas()
assert list(pd_df.columns) == COL_NAMES
assert len(pd_df) == len(valid_data)

I thought maybe it makes sense for schema_hints to be the definitive schema when csv has no headers, as a way to provide named columns instead of the default "column_0" or "column_1", and this would only work if hints for all columns are provided.

But I also agree that it would be simpler and consistent to remove this invariant and let the user realize that column names will default to "column_0" etc., and they can rename their schema hints accordingly. and I also realize that column names can be changed with .alias 😅

removed these checks in latest commit

@@ -86,6 +86,17 @@ impl Schema {
}
}

pub fn apply_hints(&self, hints: &Schema) -> DaftResult<Schema> {
let mut fields = IndexMap::new();
for (name, field) in self.fields.iter() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! This preserves ordering of the original schema as well which is important.

This is completely fine as-is, but if you wanted you can try to use Rust iterators instead which would help you avoid needing an intermediate mut fields variable.

I think IndexMaps can be "collected" from an iterator and something like this might work:

let applied_fields = self.fields
    .iter()
    .map(|(name, field)| match hints.fields.get(name) {
        None => (name.clone(), field.clone()),
        Some(hint_field) => (name.clone(), hint_field.clone()),
    })
    .collect::<IndexMap<String, Field>>();

Ok(Schema {fields: applied_fields});

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup! it works, made the changes. I like it a lot better too, it's more concise and expressive (and more performant? not sure tho will need to learn more about rust)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more performant

Maybe -- depending on how the compiler chooses to optimize it!

Iterators are pretty idiomatic in Rust :)

@@ -615,6 +616,7 @@ pub(crate) fn read_csv_into_micropartition(
pub(crate) fn read_parquet_into_micropartition(
uris: &[&str],
columns: Option<&[&str]>,
schema: Option<SchemaRef>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parquet reads differ a little from CSV reads: for Parquet, the file format itself contains a schema and thus no external schema information is required when reading that file.

Therefore for read_parquet_into_micropartition, we will probably not want to pass in the schema (unlike the CSV reads!)

Instead, we can let read_parquet_into_micropartition perform its own schema inference/data parsing, and then later on we can coerce the resultant MicroPartition into the inferred schema. The overall flow would look something like:

// Naively read Parquet file(s) into a MicroPartition, no schema coercion applied
// Note that this all happens lazily because of the nature of MicroPartitions
// being a lazy-loading abstraction
let mp = read_parquet_into_micropartition(...);

let applied_schema = mp.schema().apply(schema_hints);
let mp = mp.cast_to_schema(&applied_schema);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah got it, made this change in latest commit

tests/dataframe/test_creation.py Outdated Show resolved Hide resolved
@jaychia
Copy link
Contributor

jaychia commented Dec 1, 2023

Hey @colin-ho - we recently merged #1686 which should help the full test suite turn green here!

Looks like there's a small 1-line merge conflict left. Feel free to resolve that and we should be able to get this merged :) apologies for the delay!

@colin-ho
Copy link
Contributor Author

colin-ho commented Dec 1, 2023

Awesome, just fixed the merge conflict!

@samster25 samster25 added the bug Something isn't working label Dec 1, 2023
@jaychia jaychia merged commit 3a7fe3b into Eventual-Inc:main Dec 3, 2023
42 of 44 checks passed
jaychia added a commit that referenced this pull request Feb 21, 2024
Schema hint documentation was out of date after: #1636 

This PR fixes our docs

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
samster25 pushed a commit that referenced this pull request Feb 27, 2024
Schema hint documentation was out of date after: #1636 

This PR fixes our docs

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants