feat: support scan nested type (struct, map, list) #882

Open · wants to merge 2 commits into base: main
Changes from 1 commit
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

53 changes: 46 additions & 7 deletions crates/iceberg/src/arrow/reader.rs
@@ -48,7 +48,7 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
-use crate::spec::{Datum, PrimitiveType, Schema};
+use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

@@ -273,6 +273,28 @@ impl ArrowReader {
Ok((iceberg_field_ids, field_id_map))
}

+    /// Insert the leaf field ids into `field_ids`, which is used for projection.
+    /// For a nested type, this recursively collects the ids of its leaf fields.
+    fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
+        match field.field_type.as_ref() {
+            Type::Primitive(_) => {
+                field_ids.push(field.id);
+            }
+            Type::Struct(struct_type) => {
+                for nested_field in struct_type.fields() {
+                    Self::include_leaf_field_id(nested_field, field_ids);
+                }
+            }
+            Type::List(list_type) => {
+                Self::include_leaf_field_id(&list_type.element_field, field_ids);
+            }
+            Type::Map(map_type) => {
+                Self::include_leaf_field_id(&map_type.key_field, field_ids);
+                Self::include_leaf_field_id(&map_type.value_field, field_ids);
+            }
+        }
+    }

fn get_arrow_projection_mask(
field_ids: &[i32],
iceberg_schema_of_task: &Schema,
@@ -297,11 +319,21 @@
scale: requested_scale,
}),
) if requested_precision >= file_precision && file_scale == requested_scale => true,
+            // Uuid is stored as Fixed(16) in the Parquet file, so the type read back will be Fixed(16).
+            (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
_ => false,
}
}

-        if field_ids.is_empty() {
+        let mut leaf_field_ids = vec![];
+        for field_id in field_ids {
+            let field = iceberg_schema_of_task.field_by_id(*field_id);
+            if let Some(field) = field {
+                Self::include_leaf_field_id(field, &mut leaf_field_ids);
+            }
+        }
+
+        if leaf_field_ids.is_empty() {
Ok(ProjectionMask::all())
} else {
// Build the map between field id and column index in Parquet schema.
@@ -318,7 +350,7 @@
.and_then(|field_id| i32::from_str(field_id).ok())
.map_or(false, |field_id| {
projected_fields.insert((*f).clone(), field_id);
-                        field_ids.contains(&field_id)
+                        leaf_field_ids.contains(&field_id)
})
}),
arrow_schema.metadata().clone(),
@@ -351,19 +383,26 @@
true
});

-            if column_map.len() != field_ids.len() {
+            if column_map.len() != leaf_field_ids.len() {
+                let miss_field = leaf_field_ids
+                    .iter()
+                    .filter(|field_id| !column_map.contains_key(field_id))
+                    .collect::<Vec<_>>();
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Parquet schema {} and Iceberg schema {} do not match.",
iceberg_schema, iceberg_schema_of_task
),
-                ));
+                )
+                .with_context("column_map", format!("{:?}", column_map))
+                .with_context("field_ids", format!("{:?}", leaf_field_ids))
+                .with_context("miss_field", format!("{:?}", miss_field)));
}

let mut indices = vec![];
-            for field_id in field_ids {
-                if let Some(col_idx) = column_map.get(field_id) {
+            for field_id in leaf_field_ids {
+                if let Some(col_idx) = column_map.get(&field_id) {
indices.push(*col_idx);
} else {
return Err(Error::new(
(diff truncated)
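
The new `include_leaf_field_id` helper is what makes a projected nested column work: the scan's top-level field ids are expanded into the ids of their primitive leaves before the Parquet `ProjectionMask` is built. Here is a minimal, self-contained sketch of the same recursion, using simplified stand-in types rather than the crate's real `NestedField`/`Type` (illustration only, not the PR's code):

```rust
// Simplified stand-ins for the crate's NestedField / Type, for illustration only.
struct Field {
    id: i32,
    field_type: FieldType,
}

enum FieldType {
    Primitive,
    Struct(Vec<Field>),
    List(Box<Field>),
    Map(Box<Field>, Box<Field>), // key field, value field
}

/// Collect the ids of all primitive leaves under `field`, mirroring the
/// recursion in `ArrowReader::include_leaf_field_id` above.
fn include_leaf_field_id(field: &Field, field_ids: &mut Vec<i32>) {
    match &field.field_type {
        FieldType::Primitive => field_ids.push(field.id),
        FieldType::Struct(children) => {
            for child in children {
                include_leaf_field_id(child, field_ids);
            }
        }
        FieldType::List(element) => include_leaf_field_id(element, field_ids),
        FieldType::Map(key, value) => {
            include_leaf_field_id(key, field_ids);
            include_leaf_field_id(value, field_ids);
        }
    }
}

fn main() {
    // person (1): struct<name (2), addresses (3): list<struct (4)<zip (5)>>, tags (6): map<key (7), value (8)>>
    let person = Field {
        id: 1,
        field_type: FieldType::Struct(vec![
            Field { id: 2, field_type: FieldType::Primitive },
            Field {
                id: 3,
                field_type: FieldType::List(Box::new(Field {
                    id: 4,
                    field_type: FieldType::Struct(vec![Field { id: 5, field_type: FieldType::Primitive }]),
                })),
            },
            Field {
                id: 6,
                field_type: FieldType::Map(
                    Box::new(Field { id: 7, field_type: FieldType::Primitive }),
                    Box::new(Field { id: 8, field_type: FieldType::Primitive }),
                ),
            },
        ]),
    };

    let mut leaf_ids = Vec::new();
    include_leaf_field_id(&person, &mut leaf_ids);
    // Only primitive leaves are collected; intermediate struct/list/map ids are not.
    assert_eq!(leaf_ids, vec![2, 5, 7, 8]);
}
```

Only leaf ids reach `leaf_field_ids`, which is why the column-index map, the projection mask, and the enriched error context above all work in terms of leaf field ids.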
9 changes: 7 additions & 2 deletions crates/iceberg/src/arrow/schema.rs
@@ -45,6 +45,11 @@ use crate::{Error, ErrorKind};
/// When an Iceberg map type is converted to an Arrow map type, the default map field name is "key_value".
pub(crate) const DEFAULT_MAP_FIELD_NAME: &str = "key_value";

+/// UTC time zone for Arrow timestamp type.
+pub const UTC_TIME_ZONE: &str = "+00:00";
+/// When an Iceberg map type is converted to an Arrow map type, the default map field name is "key_value".
+pub const MAP_KEY_VALUE_FIELD_NAME: &str = "key_value";

@sdd (Contributor) commented on Jan 9, 2025:

> How does this differ from DEFAULT_MAP_FIELD_NAME above? The descriptions in their comments are the same.

Author replied:

> Nice catch. Removed the duplicate one.

/// A post order arrow schema visitor.
///
/// For order of methods called, please refer to [`visit_schema`].
@@ -594,14 +599,14 @@ impl SchemaVisitor for ToArrowSchemaConverter {
)),
crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type(
                // Timestamptz is always stored as UTC
-                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
+                DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_TIME_ZONE.into())),
)),
crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type(
DataType::Timestamp(TimeUnit::Nanosecond, None),
)),
crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type(
// Store timestamptz_ns as UTC
-                DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
+                DataType::Timestamp(TimeUnit::Nanosecond, Some(UTC_TIME_ZONE.into())),
)),
crate::spec::PrimitiveType::String => {
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8))
(diff truncated)
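
The schema.rs change is a constant extraction: both timestamptz conversions now share `UTC_TIME_ZONE` instead of repeating the `"+00:00"` literal. A small sketch of the resulting Arrow types, with the constant inlined rather than imported from the crate (uses the `arrow-schema` crate; not code from the PR):

```rust
use arrow_schema::{DataType, TimeUnit};

// Mirrors the UTC_TIME_ZONE constant added in schema.rs.
const UTC_TIME_ZONE: &str = "+00:00";

fn main() {
    // Iceberg timestamptz / timestamptz_ns map to UTC-anchored Arrow timestamps.
    let timestamptz = DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_TIME_ZONE.into()));
    let timestamptz_ns = DataType::Timestamp(TimeUnit::Nanosecond, Some(UTC_TIME_ZONE.into()));
    assert_ne!(timestamptz, timestamptz_ns); // same zone, different precision
    println!("{timestamptz:?} / {timestamptz_ns:?}");
}
```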
23 changes: 0 additions & 23 deletions crates/iceberg/src/scan.rs
@@ -248,29 +248,6 @@ impl<'a> TableScanBuilder<'a> {
)
})?;

-            let field = schema
-                .as_struct()
-                .field_by_id(field_id)
-                .ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::FeatureUnsupported,
-                        format!(
-                            "Column {} is not a direct child of schema but a nested field, which is not supported now. Schema: {}",
-                            column_name, schema
-                        ),
-                    )
-                })?;
-
-            if !field.field_type.is_primitive() {
-                return Err(Error::new(
-                    ErrorKind::FeatureUnsupported,
-                    format!(
-                        "Column {} is not a primitive type. Schema: {}",
-                        column_name, schema
-                    ),
-                ));
-            }

field_ids.push(field_id);
}

(diff truncated)
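
These two removed checks are what previously failed a scan that selected a nested column with `FeatureUnsupported`; with them gone, selection falls through to `include_leaf_field_id` in the reader. A hedged sketch of what a caller can now do, assuming a `Table` value and the existing `scan()`/`select()`/`to_arrow()` builder API; the snippet is illustrative, not taken from the PR:

```rust
use futures::TryStreamExt;
use iceberg::table::Table;

// Illustrative helper: scan only a nested (struct) column named `person`,
// which the removed checks used to reject as unsupported.
async fn scan_nested_column(table: &Table) -> iceberg::Result<()> {
    let stream = table
        .scan()
        .select(["person"]) // e.g. struct<name: string, age: int>
        .build()?
        .to_arrow()
        .await?;
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```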
10 changes: 7 additions & 3 deletions crates/iceberg/src/spec/datatypes.rs
@@ -36,9 +36,11 @@ use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH};
use crate::spec::PrimitiveLiteral;

/// Field name for list type.
-pub(crate) const LIST_FILED_NAME: &str = "element";
-pub(crate) const MAP_KEY_FIELD_NAME: &str = "key";
-pub(crate) const MAP_VALUE_FIELD_NAME: &str = "value";
+pub const LIST_FILED_NAME: &str = "element";
+/// Field name for map type's key.
+pub const MAP_KEY_FIELD_NAME: &str = "key";
+/// Field name for map type's value.
+pub const MAP_VALUE_FIELD_NAME: &str = "value";

pub(crate) const MAX_DECIMAL_BYTES: u32 = 24;
pub(crate) const MAX_DECIMAL_PRECISION: u32 = 38;
@@ -226,8 +228,10 @@ pub enum PrimitiveType {
/// Timestamp in microsecond precision, with timezone
Timestamptz,
/// Timestamp in nanosecond precision, without timezone
+    #[serde(rename = "timestamp_ns")]
TimestampNs,
/// Timestamp in nanosecond precision with timezone
+    #[serde(rename = "timestamptz_ns")]
TimestamptzNs,
/// Arbitrary-length character sequences encoded in utf-8
String,
(diff truncated)
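
The two added `#[serde(rename = ...)]` attributes pin the wire names of the nanosecond variants to the spec's `timestamp_ns` / `timestamptz_ns`. A self-contained sketch of the effect, assuming the enum uses a lowercase `rename_all` rule as the surrounding code suggests (mirror enum for illustration, not the crate's full `PrimitiveType`):

```rust
use serde::{Deserialize, Serialize};

// Mirror of the relevant PrimitiveType variants; requires serde and serde_json.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "lowercase")]
enum PrimitiveType {
    Timestamp,
    Timestamptz,
    #[serde(rename = "timestamp_ns")]
    TimestampNs,
    #[serde(rename = "timestamptz_ns")]
    TimestamptzNs,
}

fn main() {
    // Without the explicit renames, rename_all = "lowercase" would emit
    // "timestampns" / "timestamptzns", which is not the spec's spelling.
    assert_eq!(
        serde_json::to_string(&PrimitiveType::TimestampNs).unwrap(),
        "\"timestamp_ns\""
    );
    assert_eq!(
        serde_json::from_str::<PrimitiveType>("\"timestamptz_ns\"").unwrap(),
        PrimitiveType::TimestamptzNs
    );
}
```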
1 change: 1 addition & 0 deletions crates/integration_tests/Cargo.toml
@@ -33,3 +33,4 @@ iceberg-catalog-rest = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
parquet = { workspace = true }
tokio = { workspace = true }
+uuid = { workspace = true }