Skip to content

Commit

Permalink
support scan nested type(struct, map, list)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Jan 7, 2025
1 parent e34f428 commit d744a4d
Show file tree
Hide file tree
Showing 7 changed files with 431 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 46 additions & 7 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, PrimitiveType, Schema};
use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

Expand Down Expand Up @@ -273,6 +273,28 @@ impl ArrowReader {
Ok((iceberg_field_ids, field_id_map))
}

/// Recursively collect the ids of all leaf (primitive) fields reachable
/// from `field` into `field_ids`, which is used for projection.
///
/// Struct, list and map types are descended into; only primitive fields
/// contribute an id.
fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
    match field.field_type.as_ref() {
        // A primitive field is itself a leaf: record its id.
        Type::Primitive(_) => field_ids.push(field.id),
        // Descend into every child of a struct.
        Type::Struct(inner) => inner
            .fields()
            .iter()
            .for_each(|child| Self::include_leaf_field_id(child, field_ids)),
        // A list has a single element field to descend into.
        Type::List(inner) => Self::include_leaf_field_id(&inner.element_field, field_ids),
        // A map contributes leaves from both its key and value fields.
        Type::Map(inner) => {
            Self::include_leaf_field_id(&inner.key_field, field_ids);
            Self::include_leaf_field_id(&inner.value_field, field_ids);
        }
    }
}

fn get_arrow_projection_mask(
field_ids: &[i32],
iceberg_schema_of_task: &Schema,
Expand All @@ -297,11 +319,21 @@ impl ArrowReader {
scale: requested_scale,
}),
) if requested_precision >= file_precision && file_scale == requested_scale => true,
// Uuid will be stored as Fixed(16) in the parquet file, so the read-back type will be Fixed(16).
(Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
_ => false,
}
}

if field_ids.is_empty() {
let mut leaf_field_ids = vec![];
for field_id in field_ids {
let field = iceberg_schema_of_task.field_by_id(*field_id);
if let Some(field) = field {
Self::include_leaf_field_id(field, &mut leaf_field_ids);
}
}

if leaf_field_ids.is_empty() {
Ok(ProjectionMask::all())
} else {
// Build the map between field id and column index in Parquet schema.
Expand All @@ -318,7 +350,7 @@ impl ArrowReader {
.and_then(|field_id| i32::from_str(field_id).ok())
.map_or(false, |field_id| {
projected_fields.insert((*f).clone(), field_id);
field_ids.contains(&field_id)
leaf_field_ids.contains(&field_id)
})
}),
arrow_schema.metadata().clone(),
Expand Down Expand Up @@ -351,19 +383,26 @@ impl ArrowReader {
true
});

if column_map.len() != field_ids.len() {
if column_map.len() != leaf_field_ids.len() {
let miss_field = leaf_field_ids
.iter()
.filter(|field_id| !column_map.contains_key(field_id))
.collect::<Vec<_>>();
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Parquet schema {} and Iceberg schema {} do not match.",
iceberg_schema, iceberg_schema_of_task
),
));
)
.with_context("column_map", format! {"{:?}", column_map})
.with_context("field_ids", format! {"{:?}", leaf_field_ids})
.with_context("miss_field", format! {"{:?}", miss_field}));
}

let mut indices = vec![];
for field_id in field_ids {
if let Some(col_idx) = column_map.get(field_id) {
for field_id in leaf_field_ids {
if let Some(col_idx) = column_map.get(&field_id) {
indices.push(*col_idx);
} else {
return Err(Error::new(
Expand Down
9 changes: 7 additions & 2 deletions crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ use crate::{Error, ErrorKind};
/// When an Iceberg map type is converted to an Arrow map type, the default map field name is "key_value".
pub(crate) const DEFAULT_MAP_FIELD_NAME: &str = "key_value";

/// UTC time zone for Arrow timestamp type.
pub const UTC_TIME_ZONE: &str = "+00:00";
/// When an Iceberg map type is converted to an Arrow map type, the default map field name is "key_value".
pub const MAP_KEY_VALUE_FIELD_NAME: &str = "key_value";

/// A post order arrow schema visitor.
///
/// For order of methods called, please refer to [`visit_schema`].
Expand Down Expand Up @@ -594,14 +599,14 @@ impl SchemaVisitor for ToArrowSchemaConverter {
)),
crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type(
// Timestamptz is always stored as UTC
DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
DataType::Timestamp(TimeUnit::Microsecond, Some(UTC_TIME_ZONE.into())),
)),
crate::spec::PrimitiveType::TimestampNs => Ok(ArrowSchemaOrFieldOrType::Type(
DataType::Timestamp(TimeUnit::Nanosecond, None),
)),
crate::spec::PrimitiveType::TimestamptzNs => Ok(ArrowSchemaOrFieldOrType::Type(
// Store timestamptz_ns as UTC
DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
DataType::Timestamp(TimeUnit::Nanosecond, Some(UTC_TIME_ZONE.into())),
)),
crate::spec::PrimitiveType::String => {
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8))
Expand Down
23 changes: 0 additions & 23 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,29 +248,6 @@ impl<'a> TableScanBuilder<'a> {
)
})?;

let field = schema
.as_struct()
.field_by_id(field_id)
.ok_or_else(|| {
Error::new(
ErrorKind::FeatureUnsupported,
format!(
"Column {} is not a direct child of schema but a nested field, which is not supported now. Schema: {}",
column_name, schema
),
)
})?;

if !field.field_type.is_primitive() {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
format!(
"Column {} is not a primitive type. Schema: {}",
column_name, schema
),
));
}

field_ids.push(field_id);
}

Expand Down
10 changes: 7 additions & 3 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH};
use crate::spec::PrimitiveLiteral;

/// Field name for list type.
pub(crate) const LIST_FILED_NAME: &str = "element";
pub(crate) const MAP_KEY_FIELD_NAME: &str = "key";
pub(crate) const MAP_VALUE_FIELD_NAME: &str = "value";
pub const LIST_FILED_NAME: &str = "element";
/// Field name for map type's key.
pub const MAP_KEY_FIELD_NAME: &str = "key";
/// Field name for map type's value.
pub const MAP_VALUE_FIELD_NAME: &str = "value";

pub(crate) const MAX_DECIMAL_BYTES: u32 = 24;
pub(crate) const MAX_DECIMAL_PRECISION: u32 = 38;
Expand Down Expand Up @@ -226,8 +228,10 @@ pub enum PrimitiveType {
/// Timestamp in microsecond precision, with timezone
Timestamptz,
/// Timestamp in nanosecond precision, without timezone
#[serde(rename = "timestamp_ns")]
TimestampNs,
/// Timestamp in nanosecond precision with timezone
#[serde(rename = "timestamptz_ns")]
TimestamptzNs,
/// Arbitrary-length character sequences encoded in utf-8
String,
Expand Down
1 change: 1 addition & 0 deletions crates/integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ iceberg-catalog-rest = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
parquet = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }
Loading

0 comments on commit d744a4d

Please sign in to comment.