Update to use field_ids from field metadata in arrow2 fork
Jay Chia committed Feb 22, 2024
1 parent 56562e5 commit e8e22c2
Showing 4 changed files with 42 additions and 31 deletions.
Cargo.lock (2 changes: 1 addition & 1 deletion)

Some generated files are not rendered by default.

Cargo.toml (5 changes: 3 additions & 2 deletions)
@@ -112,10 +112,11 @@ tokio-util = "0.7.8"
url = "2.4.0"

[workspace.dependencies.arrow2]
# branch = "daft-fork"
# TODO: Update this to daft-fork
# branch = "jay/fd-add-rename-test"
git = "https://github.com/Eventual-Inc/arrow2"
package = "arrow2"
rev = "d5685eebf1d65c3f3d854370ad39f93dcd91971a"
rev = "79654e8475"

[workspace.dependencies.bincode]
version = "1.3.3"
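
The net effect in Cargo.toml: the arrow2 dependency stays on the Eventual-Inc fork but is re-pinned from the old full-length rev to the short rev 79654e8475, which carries the field_id work; the commented-out branch name and the TODO record that the pin should eventually move back to the daft-fork branch.
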
src/daft-parquet/src/file.rs (64 changes: 37 additions & 27 deletions)
@@ -325,7 +325,7 @@ pub(crate) struct RowGroupRange
 pub(crate) struct ParquetFileReader {
     uri: String,
     metadata: Arc<parquet2::metadata::FileMetaData>,
-    arrow_schema: arrow2::datatypes::SchemaRef,
+    arrow_schema_from_pq: arrow2::datatypes::SchemaRef,
     row_ranges: Arc<Vec<RowGroupRange>>,
     field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
 }
@@ -354,25 +354,25 @@ impl ParquetFileReader {
     fn new(
         uri: String,
         metadata: parquet2::metadata::FileMetaData,
-        arrow_schema: arrow2::datatypes::Schema,
+        arrow_schema_from_pq: arrow2::datatypes::Schema,
         row_ranges: Vec<RowGroupRange>,
         field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
     ) -> super::Result<Self> {
         Ok(ParquetFileReader {
             uri,
             metadata: Arc::new(metadata),
-            arrow_schema: arrow_schema.into(),
+            arrow_schema_from_pq: arrow_schema_from_pq.into(),
             row_ranges: Arc::new(row_ranges),
             field_id_mapping,
         })
     }
 
     pub fn arrow_schema(&self) -> &Arc<arrow2::datatypes::Schema> {
-        &self.arrow_schema
+        &self.arrow_schema_from_pq
     }
 
     fn naive_read_plan(&self) -> super::Result<ReadPlanner> {
-        let arrow_fields = &self.arrow_schema.fields;
+        let arrow_fields = &self.arrow_schema_from_pq.fields;
 
         let mut read_planner = ReadPlanner::new(&self.uri);
 
@@ -430,30 +430,45 @@ impl ParquetFileReader {
     ) -> DaftResult<Table> {
         let metadata = self.metadata;
         let all_handles = self
-            .arrow_schema
+            .arrow_schema_from_pq
             .fields
             .iter()
-            .map(|field| {
+            .map(|pq_arrow_field| {
+                // Retrieve the appropriate field name from the field_id mapping
+                // TODO: We should do this recursively as well
+                let target_field_name = if let (Some(field_id_mapping), Some(field_id)) = (
+                    self.field_id_mapping.as_ref(),
+                    pq_arrow_field.metadata.get("field_id"),
+                ) {
+                    field_id_mapping
+                        .get(&str::parse::<i32>(field_id).unwrap())
+                        .map(|f| f.name.clone())
+                        .unwrap_or(pq_arrow_field.name.clone())
+                } else {
+                    pq_arrow_field.name.clone()
+                };
+
                 let owned_row_ranges = self.row_ranges.clone();
 
                 let field_handles = owned_row_ranges
                     .iter()
                     .map(|row_range| {
                         let row_range = *row_range;
                         let rt_handle = tokio::runtime::Handle::current();
-                        let field = field.clone();
+                        let pq_arrow_field = pq_arrow_field.clone();
                         let owned_uri = self.uri.clone();
                         let rg = metadata
                             .row_groups
                             .get(row_range.row_group_index)
                             .expect("Row Group index should be in bounds");
                         let num_rows = rg.num_rows().min(row_range.start + row_range.num_rows);
                         let columns = rg.columns();
-                        let field_name = &field.name;
                         let filtered_cols_idx = columns
                             .iter()
                             .enumerate()
-                            .filter(|(_, x)| &x.descriptor().path_in_schema[0] == field_name)
+                            .filter(|(_, x)| {
+                                x.descriptor().path_in_schema[0] == pq_arrow_field.name
+                            })
                             .map(|(i, _)| i)
                             .collect::<Vec<_>>();
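
A note on the hunk above: the new target_field_name resolution prefers the name registered under the column's parquet field_id (read from the arrow2 field's "field_id" metadata entry) and falls back to the name stored in the file itself. The sketch below restates that lookup as a standalone function; the Field struct and the resolve_target_name helper are hypothetical stand-ins, and unlike the hunk (which calls .unwrap() on the parse) it treats an unparseable id as "no mapping":

    use std::collections::BTreeMap;

    // Hypothetical stand-in for the mapped field type; only the name matters here.
    struct Field {
        name: String,
    }

    /// Pick the output name for a column: prefer the name registered under the
    /// column's parquet field_id, falling back to the name stored in the file.
    fn resolve_target_name(
        field_id_mapping: Option<&BTreeMap<i32, Field>>,
        field_metadata: &BTreeMap<String, String>,
        file_field_name: &str,
    ) -> String {
        match (field_id_mapping, field_metadata.get("field_id")) {
            (Some(mapping), Some(id)) => id
                .parse::<i32>()
                .ok()
                .and_then(|id| mapping.get(&id))
                .map(|f| f.name.clone())
                .unwrap_or_else(|| file_field_name.to_string()),
            _ => file_field_name.to_string(),
        }
    }

    fn main() {
        let mut mapping = BTreeMap::new();
        mapping.insert(2, Field { name: "renamed_col".to_string() });
        let mut metadata = BTreeMap::new();
        metadata.insert("field_id".to_string(), "2".to_string());
        // The file calls the column "old_col", but field_id 2 maps it to "renamed_col".
        assert_eq!(resolve_target_name(Some(&mapping), &metadata, "old_col"), "renamed_col");
    }
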

@@ -471,6 +486,7 @@ impl ParquetFileReader {
                             })
                             .collect::<Vec<_>>();
                         let metadata = metadata.clone();
+                        let cloned_target_field_name = target_field_name.clone();
                         let handle = tokio::task::spawn(async move {
                             let mut decompressed_iters =
                                 Vec::with_capacity(filtered_cols_idx.len());
@@ -516,7 +532,7 @@ impl ParquetFileReader {
                             let arr_iter = column_iter_to_arrays(
                                 decompressed_iters,
                                 ptypes.iter().collect(),
-                                field.clone(),
+                                pq_arrow_field.clone(),
                                 Some(2048),
                                 num_rows,
                             );
@@ -544,7 +560,7 @@ impl ParquetFileReader {
                                 .into_iter()
                                 .map(|a| {
                                     Series::try_from((
-                                        field.name.as_str(),
+                                        cloned_target_field_name.as_str(),
                                         cast_array_for_daft_if_needed(a),
                                     ))
                                 })
@@ -559,7 +575,7 @@ impl ParquetFileReader {
                     })
                     .collect::<DaftResult<Vec<_>>>()?;
                 let owned_uri = self.uri.clone();
-                let owned_field = field.clone();
+                let owned_field = pq_arrow_field.clone();
                 let concated_handle = tokio::task::spawn(async move {
                     let series_to_concat =
                         try_join_all(field_handles).await.context(JoinSnafu {
@@ -595,30 +611,24 @@ impl ParquetFileReader {
         })?
         .into_iter()
         .collect::<DaftResult<Vec<_>>>()?;
-        let daft_schema = daft_core::schema::Schema::try_from(self.arrow_schema.as_ref())?;
+        let daft_schema = daft_core::schema::Schema::try_from(self.arrow_schema_from_pq.as_ref())?;
 
         // Apply `field_id_mapping` to the parsed daft_schema and data (if provided)
         let column_name_mapping = self
             .field_id_mapping
             .map(|field_id_mapping| get_column_name_mapping(&field_id_mapping, metadata.schema()));
-        let (daft_schema, all_series) = if let Some(column_name_mapping) = column_name_mapping {
+        let daft_schema = if let Some(column_name_mapping) = column_name_mapping {
             let new_daft_fields = daft_schema.fields.into_iter().map(|(name, field)| {
                 match column_name_mapping.get(&name) {
                     None => field,
                     Some(mapped_name) => field.rename(mapped_name),
                 }
             });
-            let new_daft_schema = daft_core::schema::Schema::new(new_daft_fields.collect())
-                .expect("Daft schema should be constructed from column name mapping");
-            let new_all_series = all_series
-                .into_iter()
-                .map(|s| match column_name_mapping.get(s.name()) {
-                    None => s,
-                    Some(mapped_name) => s.rename(mapped_name),
-                })
-                .collect();
-            (new_daft_schema, new_all_series)
+
+            daft_core::schema::Schema::new(new_daft_fields.collect())
+                .expect("Daft schema should be constructed from column name mapping")
         } else {
-            (daft_schema, all_series)
+            daft_schema
         };
 
         Table::new(daft_schema, all_series)
@@ -630,7 +640,7 @@ impl ParquetFileReader {
     ) -> DaftResult<Vec<Vec<Box<dyn arrow2::array::Array>>>> {
         let metadata = self.metadata;
         let all_handles = self
-            .arrow_schema
+            .arrow_schema_from_pq
             .fields
             .iter()
             .map(|field| {
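
One pattern worth calling out across these hunks: every value the spawned tasks use (pq_arrow_field, owned_uri, cloned_target_field_name, metadata) is cloned before tokio::task::spawn(async move ...), because the async move block must own everything it captures for the task's 'static lifetime. Below is a minimal standalone sketch of that shape, not Daft code; it assumes a tokio dependency with the "rt-multi-thread" and "macros" features, and the URI value is made up:

    use std::sync::Arc;

    #[tokio::main]
    async fn main() {
        let uri = Arc::new(String::from("s3://bucket/file.parquet")); // hypothetical URI
        let mut handles = Vec::new();
        for row_group in 0..4 {
            // Clone before spawning: the async move block takes ownership,
            // so each task gets its own (cheap) Arc handle on the shared uri.
            let owned_uri = uri.clone();
            handles.push(tokio::task::spawn(async move {
                format!("{owned_uri}: scheduled row group {row_group}")
            }));
        }
        for handle in handles {
            println!("{}", handle.await.expect("task should not panic"));
        }
    }
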
src/daft-table/src/lib.rs (2 changes: 1 addition & 1 deletion)
@@ -43,7 +43,7 @@ impl Table {
         let mut num_rows = 1;
 
         for (field, series) in schema.fields.values().zip(columns.iter()) {
-            if field != series.field() {
+            if field.name != series.field().name || field.dtype != series.field().dtype {
                 return Err(DaftError::SchemaMismatch(format!("While building a Table, we found that the Schema Field and the Series Field did not match. schema field: {field} vs series field: {}", series.field())));
             }
             if (series.len() != 1) && (series.len() != num_rows) {
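
The relaxed check in Table::new compares only the name and dtype of the schema field against the series field, rather than full Field equality, presumably so that fields differing only in auxiliary attributes (such as the field-id metadata this commit introduces) still validate. Daft's Field internals are not shown in this diff, so the structs below are hypothetical stand-ins; the point is just the shape of the comparison:

    // Hypothetical stand-ins for daft_core's Field; a real Field may carry more.
    #[derive(PartialEq)]
    struct DType(String);

    struct Field {
        name: String,
        dtype: DType,
        metadata: Vec<(String, String)>,
    }

    /// The relaxed check: full struct equality would also compare `metadata`,
    /// so compare only the parts Table construction actually cares about.
    fn fields_match(schema_field: &Field, series_field: &Field) -> bool {
        schema_field.name == series_field.name && schema_field.dtype == series_field.dtype
    }

    fn main() {
        let schema_field = Field {
            name: "col".to_string(),
            dtype: DType("Int64".to_string()),
            metadata: vec![("field_id".to_string(), "2".to_string())],
        };
        let series_field = Field {
            name: "col".to_string(),
            dtype: DType("Int64".to_string()),
            metadata: vec![], // differs only in metadata
        };
        assert!(fields_match(&schema_field, &series_field));
    }
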
