-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] Use schema_hints as hints instead of definitive schema #1636
Conversation
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## main #1636 +/- ##
==========================================
- Coverage 85.07% 85.06% -0.01%
==========================================
Files 55 55
Lines 5345 5350 +5
==========================================
+ Hits 4547 4551 +4
- Misses 798 799 +1
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Amazing work, and nice tests!! Two main questions on my end:
Added option to pass in schema into read_parquet_into_micropartition (this was necessary because the schema created from scan operator was not passed in)
I think read_parquet_into_micropartition
should stay agnostic to any external schema information. Could we perform schema hint application and coercion after performing a "naive read" in read_parquet_into_micropartition
? See: PR comment.
For read_csv, I added a test case to ensure that if has_headers=false, then the schema_hints should be used as definitive schema.
Any reason why you decided to go with these semantics? Wouldn't the code be simpler and easier to reason about if we allowed for partial schema hints as well for has_headers=false
? Are there any fundamental limitations preventing us from doing this?
daft/io/common.py
Outdated
# If CSV and no headers, then use the schema hint as the schema | ||
if isinstance(file_format_config.config, CsvSourceConfig) and file_format_config.config.has_headers == False: | ||
if len(schema) != len(schema_hint): | ||
raise ValueError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you decide to enforce this invariant here?
Wouldn't the code still work naively if we provided partial hints like "column_0": DataType.string()
, and those hints were applied as per the rest of the PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on this existing test case
Daft/tests/dataframe/test_creation.py
Lines 484 to 511 in 06c2ccf
def test_create_dataframe_csv_specify_schema_no_headers( | |
valid_data: list[dict[str, float]], use_native_downloader | |
) -> None: | |
with create_temp_filename() as fname: | |
with open(fname, "w") as f: | |
header = list(valid_data[0].keys()) | |
writer = csv.writer(f, delimiter="\t") | |
writer.writerows([[item[col] for col in header] for item in valid_data]) | |
f.flush() | |
df = daft.read_csv( | |
fname, | |
delimiter="\t", | |
schema_hints={ | |
"sepal_length": DataType.float64(), | |
"sepal_width": DataType.float64(), | |
"petal_length": DataType.float64(), | |
"petal_width": DataType.float64(), | |
"variety": DataType.string(), | |
}, | |
has_headers=False, | |
use_native_downloader=use_native_downloader, | |
) | |
assert df.column_names == COL_NAMES | |
pd_df = df.to_pandas() | |
assert list(pd_df.columns) == COL_NAMES | |
assert len(pd_df) == len(valid_data) |
I thought maybe it makes sense for schema_hints to be the definitive schema when csv has no headers, as a way to provide named columns instead of the default "column_0" or "column_1", and this would only work if hints for all columns are provided.
But I also agree that it would be simpler and consistent to remove this invariant and let the user realize that column names will default to "column_0" etc., and they can rename their schema hints accordingly. and I also realize that column names can be changed with .alias
😅
removed these checks in latest commit
src/daft-core/src/schema.rs
Outdated
@@ -86,6 +86,17 @@ impl Schema { | |||
} | |||
} | |||
|
|||
pub fn apply_hints(&self, hints: &Schema) -> DaftResult<Schema> { | |||
let mut fields = IndexMap::new(); | |||
for (name, field) in self.fields.iter() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! This preserves ordering of the original schema as well which is important.
This is completely fine as-is, but if you wanted you can try to use Rust iterators instead which would help you avoid needing an intermediate mut fields
variable.
I think IndexMaps can be "collected" from an iterator and something like this might work:
let applied_fields = self.fields
.iter()
.map(|(name, field)| match hints.fields.get(name) {
None => (name.clone(), field.clone()),
Some(hint_field) => (name.clone(), hint_field.clone()),
})
.collect::<IndexMap<String, Field>>();
Ok(Schema {fields: applied_fields});
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup! it works, made the changes. I like it a lot better too, it's more concise and expressive (and more performant? not sure tho will need to learn more about rust)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
more performant
Maybe -- depending on how the compiler chooses to optimize it!
Iterators are pretty idiomatic in Rust :)
@@ -615,6 +616,7 @@ pub(crate) fn read_csv_into_micropartition( | |||
pub(crate) fn read_parquet_into_micropartition( | |||
uris: &[&str], | |||
columns: Option<&[&str]>, | |||
schema: Option<SchemaRef>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Parquet reads differ a little from CSV reads: for Parquet, the file format itself contains a schema and thus no external schema information is required when reading that file.
Therefore for read_parquet_into_micropartition
, we will probably not want to pass in the schema (unlike the CSV reads!)
Instead, we can let read_parquet_into_micropartition
perform its own schema inference/data parsing, and then later on we can coerce the resultant MicroPartition
into the inferred schema. The overall flow would look something like:
// Naively read Parquet file(s) into a MicroPartition, no schema coercion applied
// Note that this all happens lazily because of the nature of MicroPartitions
// being a lazy-loading abstraction
let mp = read_parquet_into_micropartition(...);
let applied_schema = mp.schema().apply(schema_hints);
let mp = mp.cast_to_schema(&applied_schema);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah got it, made this change in latest commit
Awesome, just fixed the merge conflict! |
Schema hint documentation was out of date after: #1636 This PR fixes our docs Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
Schema hint documentation was out of date after: #1636 This PR fixes our docs Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
Addresses #1599
Instead of using
schema_hints
as a definitive schema, use them as 'hints' as to the intended datatype of each column.This is implemented via running schema inference first, then applying the 'hints' onto the inferred schema.
Tests:
Feedback greatly appreciated! Let me know if this is the correctly intended behaviour, and also if the code can be optimized/refactored since this is my first time writing Rust!