-
Notifications
You must be signed in to change notification settings - Fork 174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] Use schema_hints as hints instead of definitive schema #1636
Changes from 1 commit
bf32432
115fcec
6ecc859
f5d4c92
ff67fa7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -86,6 +86,17 @@ impl Schema { | |
} | ||
} | ||
|
||
pub fn apply_hints(&self, hints: &Schema) -> DaftResult<Schema> { | ||
let mut fields = IndexMap::new(); | ||
for (name, field) in self.fields.iter() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! This preserves ordering of the original schema as well which is important. This is completely fine as-is, but if you wanted you can try to use Rust iterators instead which would help you avoid needing an intermediate I think IndexMaps can be "collected" from an iterator and something like this might work: let applied_fields = self.fields
.iter()
.map(|(name, field)| match hints.fields.get(name) {
None => (name.clone(), field.clone()),
Some(hint_field) => (name.clone(), hint_field.clone()),
})
.collect::<IndexMap<String, Field>>();
Ok(Schema {fields: applied_fields}); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup! it works, made the changes. I like it a lot better too, it's more concise and expressive (and more performant? not sure tho will need to learn more about rust) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Maybe -- depending on how the compiler chooses to optimize it! Iterators are pretty idiomatic in Rust :) |
||
match hints.fields.get(name) { | ||
None => fields.insert(name.clone(), field.clone()), | ||
Some(hint_field) => fields.insert(name.clone(), hint_field.clone()), | ||
}; | ||
} | ||
Ok(Schema { fields }) | ||
} | ||
|
||
pub fn to_arrow(&self) -> DaftResult<arrow2::datatypes::Schema> { | ||
let arrow_fields: DaftResult<Vec<arrow2::datatypes::Field>> = | ||
self.fields.iter().map(|(_, f)| f.to_arrow()).collect(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -348,6 +348,7 @@ impl MicroPartition { | |
read_parquet_into_micropartition( | ||
uris.as_slice(), | ||
columns.as_deref(), | ||
Some(schema), | ||
None, | ||
scan_task.pushdowns.limit, | ||
row_groups, | ||
|
@@ -615,6 +616,7 @@ pub(crate) fn read_csv_into_micropartition( | |
pub(crate) fn read_parquet_into_micropartition( | ||
uris: &[&str], | ||
columns: Option<&[&str]>, | ||
schema: Option<SchemaRef>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Parquet reads differ a little from CSV reads: for Parquet, the file format itself contains a schema and thus no external schema information is required when reading that file. Therefore for Instead, we can let // Naively read Parquet file(s) into a MicroPartition, no schema coercion applied
// Note that this all happens lazily because of the nature of MicroPartitions
// being a lazy-loading abstraction
let mp = read_parquet_into_micropartition(...);
let applied_schema = mp.schema().apply(schema_hints);
let mp = mp.cast_to_schema(&applied_schema); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah got it, made this change in latest commit |
||
start_offset: Option<usize>, | ||
num_rows: Option<usize>, | ||
row_groups: Option<Vec<Option<Vec<i64>>>>, | ||
|
@@ -659,8 +661,18 @@ pub(crate) fn read_parquet_into_micropartition( | |
let schemas = metadata | ||
.iter() | ||
.map(|m| { | ||
let schema = infer_schema_with_options(m, &Some((*schema_infer_options).into()))?; | ||
let daft_schema = daft_core::schema::Schema::try_from(&schema)?; | ||
// if schema provided use schema, else use inferred schema | ||
let daft_schema = match schema.as_ref() { | ||
Some(s) => Schema { | ||
fields: s.fields.clone(), | ||
}, | ||
None => { | ||
let inferred_schema = | ||
infer_schema_with_options(m, &Some((*schema_infer_options).into()))?; | ||
daft_core::schema::Schema::try_from(&inferred_schema)? | ||
} | ||
}; | ||
|
||
DaftResult::Ok(daft_schema) | ||
}) | ||
.collect::<DaftResult<Vec<_>>>()?; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you decide to enforce this invariant here?
Wouldn't the code still work naively if we provided partial hints like
"column_0": DataType.string()
, and those hints were applied as per the rest of the PR?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on this existing test case
Daft/tests/dataframe/test_creation.py
Lines 484 to 511 in 06c2ccf
I thought maybe it makes sense for schema_hints to be the definitive schema when csv has no headers, as a way to provide named columns instead of the default "column_0" or "column_1", and this would only work if hints for all columns are provided.
But I also agree that it would be simpler and consistent to remove this invariant and let the user realize that column names will default to "column_0" etc., and they can rename their schema hints accordingly. and I also realize that column names can be changed with
.alias
😅removed these checks in latest commit