From ec127eceaa92a214d45bb5228bab93b92b5046fd Mon Sep 17 00:00:00 2001
From: Willi Raschkowski
Date: Mon, 30 Dec 2024 14:56:10 +0100
Subject: [PATCH] Support 'entries' metadata table

---
 crates/iceberg/Cargo.toml                    |   1 +
 crates/iceberg/src/arrow/schema.rs           | 187 +++++
 crates/iceberg/src/inspect/entries.rs        | 688 ++++++++++++++++++
 crates/iceberg/src/inspect/manifests.rs      |  74 +-
 crates/iceberg/src/inspect/metadata_table.rs |  66 +-
 crates/iceberg/src/inspect/mod.rs            |   2 +
 crates/iceberg/src/inspect/snapshots.rs      |  59 +-
 crates/iceberg/src/scan.rs                   | 160 +++-
 crates/iceberg/src/spec/manifest.rs          |   6 +
 .../testdata/example_table_metadata_v2.json  |  10 +-
 10 files changed, 1114 insertions(+), 139 deletions(-)
 create mode 100644 crates/iceberg/src/inspect/entries.rs

diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 97e77a2c5..9ad0fe145 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -85,6 +85,7 @@ uuid = { workspace = true }
 zstd = { workspace = true }
 
 [dev-dependencies]
+arrow-cast = { workspace = true, features = ["prettyprint"] }
 ctor = { workspace = true }
 expect-test = { workspace = true }
 iceberg-catalog-memory = { workspace = true }
diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs
index b590c8bc8..d9c3e79f3 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -821,6 +821,193 @@ get_parquet_stat_as_datum!(min);
 
 get_parquet_stat_as_datum!(max);
 
+/// Utilities to deal with [arrow_array::builder] types in the Iceberg context.
+pub(crate) mod builder {
+    use arrow_array::builder::*;
+    use arrow_array::cast::AsArray;
+    use arrow_array::types::*;
+    use arrow_array::{ArrayRef, Datum as ArrowDatum};
+    use arrow_schema::{DataType, TimeUnit};
+    use ordered_float::OrderedFloat;
+
+    use crate::spec::{Literal, PrimitiveLiteral};
+    use crate::{Error, ErrorKind};
+
+    /// A helper wrapping [ArrayBuilder] for building arrays without declaring the inner type at
+    /// compile time, for when types are determined dynamically (e.g. based on some column type).
+    /// A [DataType] is given at construction time and is later used to downcast the inner
+    /// builder and the provided values.
+    pub(crate) struct AnyArrayBuilder {
+        data_type: DataType,
+        inner: Box<dyn ArrayBuilder>,
+    }
+
+    impl AnyArrayBuilder {
+        pub(crate) fn new(data_type: &DataType) -> Self {
+            Self {
+                data_type: data_type.clone(),
+                inner: make_builder(data_type, 0),
+            }
+        }
+
+        pub(crate) fn finish(&mut self) -> ArrayRef {
+            self.inner.finish()
+        }
+
+        /// Append an [arrow_array::Datum] value.
+        pub(crate) fn append_datum(&mut self, value: &dyn ArrowDatum) -> crate::Result<()> {
+            let (array, is_scalar) = value.get();
+            assert!(is_scalar, "Can only append scalar datum");
+
+            match array.data_type() {
+                DataType::Boolean => self
+                    .builder::<BooleanBuilder>()?
+                    .append_value(array.as_boolean().value(0)),
+                DataType::Int32 => self
+                    .builder::<Int32Builder>()?
+                    .append_value(array.as_primitive::<Int32Type>().value(0)),
+                DataType::Int64 => self
+                    .builder::<Int64Builder>()?
+                    .append_value(array.as_primitive::<Int64Type>().value(0)),
+                DataType::Float32 => self
+                    .builder::<Float32Builder>()?
+                    .append_value(array.as_primitive::<Float32Type>().value(0)),
+                DataType::Float64 => self
+                    .builder::<Float64Builder>()?
+                    .append_value(array.as_primitive::<Float64Type>().value(0)),
+                DataType::Decimal128(_, _) => self
+                    .builder::<Decimal128Builder>()?
+                    .append_value(array.as_primitive::<Decimal128Type>().value(0)),
+                DataType::Date32 => self
+                    .builder::<Date32Builder>()?
+                    .append_value(array.as_primitive::<Date32Type>().value(0)),
+                DataType::Time64(TimeUnit::Microsecond) => self
+                    .builder::<Time64MicrosecondBuilder>()?
+                    .append_value(array.as_primitive::<Time64MicrosecondType>().value(0)),
+                DataType::Timestamp(TimeUnit::Microsecond, _) => self
+                    .builder::<TimestampMicrosecondBuilder>()?
+                    .append_value(array.as_primitive::<TimestampMicrosecondType>().value(0)),
+                DataType::Timestamp(TimeUnit::Nanosecond, _) => self
+                    .builder::<TimestampNanosecondBuilder>()?
+                    .append_value(array.as_primitive::<TimestampNanosecondType>().value(0)),
+                DataType::Utf8 => self
+                    .builder::<StringBuilder>()?
+                    .append_value(array.as_string::<i32>().value(0)),
+                DataType::FixedSizeBinary(_) => self
+                    .builder::<FixedSizeBinaryBuilder>()?
+                    .append_value(array.as_fixed_size_binary().value(0))?,
+                DataType::LargeBinary => self
+                    .builder::<LargeBinaryBuilder>()?
+                    .append_value(array.as_binary::<i64>().value(0)),
+                _ => {
+                    return Err(Error::new(
+                        ErrorKind::FeatureUnsupported,
+                        format!("Cannot append data type: {:?}", array.data_type()),
+                    ));
+                }
+            }
+            Ok(())
+        }
+
+        /// Append a literal with the provided [DataType]. We're not relying solely on the
+        /// literal to infer the type because [Literal] values do not specify the expected type
+        /// of builder. E.g., a [PrimitiveLiteral::Long] may go into an array builder for longs
+        /// but also into one for timestamps.
+        pub(crate) fn append_literal(&mut self, value: &Literal) -> crate::Result<()> {
+            let Some(primitive) = value.as_primitive_literal() else {
+                return Err(Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    "Expected primitive type",
+                ));
+            };
+
+            match (&self.data_type, primitive.clone()) {
+                (DataType::Boolean, PrimitiveLiteral::Boolean(value)) => {
+                    self.builder::<BooleanBuilder>()?.append_value(value)
+                }
+                (DataType::Int32, PrimitiveLiteral::Int(value)) => {
+                    self.builder::<Int32Builder>()?.append_value(value)
+                }
+                (DataType::Int64, PrimitiveLiteral::Long(value)) => {
+                    self.builder::<Int64Builder>()?.append_value(value)
+                }
+                (DataType::Float32, PrimitiveLiteral::Float(OrderedFloat(value))) => {
+                    self.builder::<Float32Builder>()?.append_value(value)
+                }
+                (DataType::Float64, PrimitiveLiteral::Double(OrderedFloat(value))) => {
+                    self.builder::<Float64Builder>()?.append_value(value)
+                }
+                (DataType::Utf8, PrimitiveLiteral::String(value)) => {
+                    self.builder::<StringBuilder>()?.append_value(value)
+                }
+                (DataType::FixedSizeBinary(_), PrimitiveLiteral::Binary(value)) => self
+                    .builder::<FixedSizeBinaryBuilder>()?
+                    .append_value(value)?,
+                (DataType::LargeBinary, PrimitiveLiteral::Binary(value)) => {
+                    self.builder::<LargeBinaryBuilder>()?.append_value(value)
+                }
+                (_, _) => {
+                    return Err(Error::new(
+                        ErrorKind::FeatureUnsupported,
+                        format!(
+                            "Builder of type {:?} does not accept literal {:?}",
+                            self.data_type, primitive
+                        ),
+                    ));
+                }
+            }
+
+            Ok(())
+        }
+
+        /// Append a null value for the provided [DataType].
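+        /// Unmatched [DataType]s return a [ErrorKind::FeatureUnsupported] error, mirroring
+        /// [Self::append_datum] and [Self::append_literal].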
+        pub(crate) fn append_null(&mut self) -> crate::Result<()> {
+            match self.data_type {
+                DataType::Boolean => self.builder::<BooleanBuilder>()?.append_null(),
+                DataType::Int32 => self.builder::<Int32Builder>()?.append_null(),
+                DataType::Int64 => self.builder::<Int64Builder>()?.append_null(),
+                DataType::Float32 => self.builder::<Float32Builder>()?.append_null(),
+                DataType::Float64 => self.builder::<Float64Builder>()?.append_null(),
+                DataType::Decimal128(_, _) => self.builder::<Decimal128Builder>()?.append_null(),
+                DataType::Date32 => self.builder::<Date32Builder>()?.append_null(),
+                DataType::Time64(TimeUnit::Microsecond) => {
+                    self.builder::<Time64MicrosecondBuilder>()?.append_null()
+                }
+                DataType::Timestamp(TimeUnit::Microsecond, _) => {
+                    self.builder::<TimestampMicrosecondBuilder>()?.append_null()
+                }
+                DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+                    self.builder::<TimestampNanosecondBuilder>()?.append_null()
+                }
+                DataType::Utf8 => self.builder::<StringBuilder>()?.append_null(),
+                DataType::FixedSizeBinary(_) => {
+                    self.builder::<FixedSizeBinaryBuilder>()?.append_null()
+                }
+                DataType::LargeBinary => self.builder::<LargeBinaryBuilder>()?.append_null(),
+                _ => {
+                    return Err(Error::new(
+                        ErrorKind::FeatureUnsupported,
+                        format!(
+                            "Cannot append null values for data type: {:?}",
+                            self.data_type
+                        ),
+                    ))
+                }
+            }
+            Ok(())
+        }
+
+        /// Cast the `inner` builder to a specific type or return [Error].
+        fn builder<T: ArrayBuilder>(&mut self) -> crate::Result<&mut T> {
+            self.inner.as_any_mut().downcast_mut::<T>().ok_or_else(|| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "Failed to cast builder to expected type",
+                )
+            })
+        }
+    }
+}
+
 impl TryFrom<&ArrowSchema> for crate::spec::Schema {
     type Error = Error;
 
diff --git a/crates/iceberg/src/inspect/entries.rs b/crates/iceberg/src/inspect/entries.rs
new file mode 100644
index 000000000..ecb3c7770
--- /dev/null
+++ b/crates/iceberg/src/inspect/entries.rs
@@ -0,0 +1,688 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow_array::builder::{
+    BinaryBuilder, Int32Builder, Int64Builder, Int8Builder, ListBuilder, MapBuilder, StringBuilder,
+};
+use arrow_array::{ArrayRef, RecordBatch, StructArray};
+use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef as ArrowSchemaRef};
+use futures::StreamExt;
+use itertools::Itertools;
+
+use crate::arrow::builder::AnyArrayBuilder;
+use crate::arrow::{get_arrow_datum, schema_to_arrow_schema, type_to_arrow_type};
+use crate::io::FileIO;
+use crate::scan::ArrowRecordBatchStream;
+use crate::spec::{
+    DataFile, ManifestFile, PartitionField, PartitionSpec, SchemaRef, Struct, TableMetadata,
+};
+use crate::table::Table;
+use crate::{Error, ErrorKind};
+
+/// Entries table containing the entries of the current snapshot's manifest files.
+///
+/// The table has one row for each manifest file entry in the current snapshot's manifest list file.
+/// For reference, see the Java implementation of [`ManifestEntry`][1].
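+///
+/// A minimal usage sketch (assuming a loaded [`Table`]; the test at the bottom of this file
+/// shows a complete example):
+///
+/// ```ignore
+/// use futures::TryStreamExt;
+///
+/// let mut entries = table.inspect().entries().scan().await?;
+/// while let Some(batch) = entries.try_next().await? {
+///     // One RecordBatch per manifest file in the current snapshot's manifest list.
+/// }
+/// ```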
+///
+/// [1]: https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+pub struct EntriesTable<'a> {
+    table: &'a Table,
+}
+
+impl<'a> EntriesTable<'a> {
+    /// Create a new Entries table instance.
+    pub fn new(table: &'a Table) -> Self {
+        Self { table }
+    }
+
+    /// Get the schema for the manifest entries table.
+    pub fn schema(&self) -> Schema {
+        Schema::new(vec![
+            Field::new("status", DataType::Int32, false),
+            Field::new("snapshot_id", DataType::Int64, true),
+            Field::new("sequence_number", DataType::Int64, true),
+            Field::new("file_sequence_number", DataType::Int64, true),
+            Field::new(
+                "data_file",
+                DataType::Struct(DataFileStructBuilder::fields(self.table.metadata())),
+                false,
+            ),
+            Field::new(
+                "readable_metrics",
+                DataType::Struct(
+                    ReadableMetricsStructBuilder::fields(self.table.metadata().current_schema())
+                        .expect("Failed to build schema for readable metrics"),
+                ),
+                false,
+            ),
+        ])
+    }
+
+    /// Scan the manifest entries table.
+    pub async fn scan(&self) -> crate::Result<ArrowRecordBatchStream> {
+        let current_snapshot = self.table.metadata().current_snapshot().ok_or_else(|| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "Cannot scan entries for table without current snapshot",
+            )
+        })?;
+
+        let manifest_list = current_snapshot
+            .load_manifest_list(self.table.file_io(), self.table.metadata())
+            .await?;
+
+        let arrow_schema = Arc::new(self.schema());
+        let table_metadata = self.table.metadata_ref();
+        let file_io = Arc::new(self.table.file_io().clone());
+
+        Ok(futures::stream::iter(manifest_list.entries().to_vec())
+            .then(move |manifest_file| {
+                let arrow_schema = arrow_schema.clone();
+                let table_metadata = table_metadata.clone();
+                let file_io = file_io.clone();
+
+                async move {
+                    Self::batch_for_manifest_file(
+                        &arrow_schema,
+                        &table_metadata,
+                        &file_io,
+                        &manifest_file,
+                    )
+                    .await
+                }
+            })
+            .boxed())
+    }
+
+    async fn batch_for_manifest_file(
+        arrow_schema: &ArrowSchemaRef,
+        table_metadata: &TableMetadata,
+        file_io: &FileIO,
+        manifest_file: &ManifestFile,
+    ) -> crate::Result<RecordBatch> {
+        let mut status = Int32Builder::new();
+        let mut snapshot_id = Int64Builder::new();
+        let mut sequence_number = Int64Builder::new();
+        let mut file_sequence_number = Int64Builder::new();
+        let mut data_file = DataFileStructBuilder::new(table_metadata);
+        let mut readable_metrics =
+            ReadableMetricsStructBuilder::new(table_metadata.current_schema())?;
+
+        for manifest_entry in manifest_file.load_manifest(file_io).await?.entries() {
+            status.append_value(manifest_entry.status() as i32);
+            snapshot_id.append_option(manifest_entry.snapshot_id());
+            sequence_number.append_option(manifest_entry.sequence_number());
+            file_sequence_number.append_option(manifest_entry.file_sequence_number());
+            data_file.append(manifest_file, manifest_entry.data_file())?;
+            readable_metrics.append(manifest_entry.data_file())?;
+        }
+
+        Ok(RecordBatch::try_new(arrow_schema.clone(), vec![
+            Arc::new(status.finish()),
+            Arc::new(snapshot_id.finish()),
+            Arc::new(sequence_number.finish()),
+            Arc::new(file_sequence_number.finish()),
+            Arc::new(data_file.finish()),
+            Arc::new(readable_metrics.finish()),
+        ])?)
+    }
+}
+
+/// Builds the struct describing data files listed in a table manifest.
+///
+/// For reference, see the Java implementation of [`DataFile`][1].
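+///
+/// One row is appended per manifest entry; the `partition` struct covers the union of fields
+/// across all partition specs in the table's history (see [PartitionValuesStructBuilder]).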
+///
+/// [1]: https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/api/src/main/java/org/apache/iceberg/DataFile.java
+struct DataFileStructBuilder<'a> {
+    // Reference to table metadata to retrieve partition specs based on partition spec ids
+    table_metadata: &'a TableMetadata,
+    // Below are the field builders of the "data_file" struct
+    content: Int8Builder,
+    file_path: StringBuilder,
+    file_format: StringBuilder,
+    partition: PartitionValuesStructBuilder,
+    record_count: Int64Builder,
+    file_size_in_bytes: Int64Builder,
+    column_sizes: MapBuilder<Int32Builder, Int64Builder>,
+    value_counts: MapBuilder<Int32Builder, Int64Builder>,
+    null_value_counts: MapBuilder<Int32Builder, Int64Builder>,
+    nan_value_counts: MapBuilder<Int32Builder, Int64Builder>,
+    lower_bounds: MapBuilder<Int32Builder, BinaryBuilder>,
+    upper_bounds: MapBuilder<Int32Builder, BinaryBuilder>,
+    key_metadata: BinaryBuilder,
+    split_offsets: ListBuilder<Int64Builder>,
+    equality_ids: ListBuilder<Int32Builder>,
+    sort_order_ids: Int32Builder,
+}
+
+impl<'a> DataFileStructBuilder<'a> {
+    fn new(table_metadata: &'a TableMetadata) -> Self {
+        Self {
+            table_metadata,
+            content: Int8Builder::new(),
+            file_path: StringBuilder::new(),
+            file_format: StringBuilder::new(),
+            partition: PartitionValuesStructBuilder::new(table_metadata),
+            record_count: Int64Builder::new(),
+            file_size_in_bytes: Int64Builder::new(),
+            column_sizes: MapBuilder::new(None, Int32Builder::new(), Int64Builder::new()),
+            value_counts: MapBuilder::new(None, Int32Builder::new(), Int64Builder::new()),
+            null_value_counts: MapBuilder::new(None, Int32Builder::new(), Int64Builder::new()),
+            nan_value_counts: MapBuilder::new(None, Int32Builder::new(), Int64Builder::new()),
+            lower_bounds: MapBuilder::new(None, Int32Builder::new(), BinaryBuilder::new()),
+            upper_bounds: MapBuilder::new(None, Int32Builder::new(), BinaryBuilder::new()),
+            key_metadata: BinaryBuilder::new(),
+            split_offsets: ListBuilder::new(Int64Builder::new()),
+            equality_ids: ListBuilder::new(Int32Builder::new()),
+            sort_order_ids: Int32Builder::new(),
+        }
+    }
+
+    fn fields(table_metadata: &TableMetadata) -> Fields {
+        vec![
+            Field::new("content", DataType::Int8, false),
+            Field::new("file_path", DataType::Utf8, false),
+            Field::new("file_format", DataType::Utf8, false),
+            Field::new(
+                "partition",
+                DataType::Struct(PartitionValuesStructBuilder::combined_partition_fields(
+                    table_metadata,
+                )),
+                false,
+            ),
+            Field::new("record_count", DataType::Int64, false),
+            Field::new("file_size_in_bytes", DataType::Int64, false),
+            Field::new(
+                "column_sizes",
+                Self::column_id_to_value_type(DataType::Int64),
+                true,
+            ),
+            Field::new(
+                "value_counts",
+                Self::column_id_to_value_type(DataType::Int64),
+                true,
+            ),
+            Field::new(
+                "null_value_counts",
+                Self::column_id_to_value_type(DataType::Int64),
+                true,
+            ),
+            Field::new(
+                "nan_value_counts",
+                Self::column_id_to_value_type(DataType::Int64),
+                true,
+            ),
+            Field::new(
+                "lower_bounds",
+                Self::column_id_to_value_type(DataType::Binary),
+                true,
+            ),
+            Field::new(
+                "upper_bounds",
+                Self::column_id_to_value_type(DataType::Binary),
+                true,
+            ),
+            Field::new("key_metadata", DataType::Binary, true),
+            Field::new(
+                "split_offsets",
+                DataType::new_list(DataType::Int64, true),
+                true,
+            ),
+            Field::new(
+                "equality_ids",
+                DataType::new_list(DataType::Int32, true),
+                true,
+            ),
+            Field::new("sort_order_id", DataType::Int32, true),
+        ]
+        .into()
+    }
+
+    /// Construct a new map type that maps from column ids (i32) to the provided value type.
+    /// Keys and the `entries` struct are non-nullable; values are nullable.
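+    ///
+    /// For example, `column_id_to_value_type(DataType::Int64)` yields the Arrow type
+    /// `Map<"entries": Struct<"keys": Int32, "values": Int64>>`, as used for `column_sizes`
+    /// and the various count columns.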
+    fn column_id_to_value_type(value_type: DataType) -> DataType {
+        DataType::Map(
+            Arc::new(Field::new(
+                "entries",
+                DataType::Struct(
+                    vec![
+                        Field::new("keys", DataType::Int32, false),
+                        Field::new("values", value_type, true),
+                    ]
+                    .into(),
+                ),
+                false,
+            )),
+            false,
+        )
+    }
+
+    fn append(&mut self, manifest_file: &ManifestFile, data_file: &DataFile) -> crate::Result<()> {
+        self.content.append_value(data_file.content as i8);
+        self.file_path.append_value(data_file.file_path());
+        self.file_format
+            .append_value(data_file.file_format().to_string().to_uppercase());
+        self.partition.append(
+            self.partition_spec(manifest_file)?.clone().fields(),
+            data_file.partition(),
+        )?;
+        self.record_count
+            .append_value(data_file.record_count() as i64);
+        self.file_size_in_bytes
+            .append_value(data_file.file_size_in_bytes() as i64);
+
+        // Sort keys to get a deterministic order across rows
+        for (k, v) in data_file.column_sizes.iter().sorted_by_key(|(k, _)| *k) {
+            self.column_sizes.keys().append_value(*k);
+            self.column_sizes.values().append_value(*v as i64);
+        }
+        self.column_sizes.append(true)?;
+
+        for (k, v) in data_file.value_counts.iter().sorted_by_key(|(k, _)| *k) {
+            self.value_counts.keys().append_value(*k);
+            self.value_counts.values().append_value(*v as i64);
+        }
+        self.value_counts.append(true)?;
+
+        for (k, v) in data_file
+            .null_value_counts
+            .iter()
+            .sorted_by_key(|(k, _)| *k)
+        {
+            self.null_value_counts.keys().append_value(*k);
+            self.null_value_counts.values().append_value(*v as i64);
+        }
+        self.null_value_counts.append(true)?;
+
+        for (k, v) in data_file.nan_value_counts.iter().sorted_by_key(|(k, _)| *k) {
+            self.nan_value_counts.keys().append_value(*k);
+            self.nan_value_counts.values().append_value(*v as i64);
+        }
+        self.nan_value_counts.append(true)?;
+
+        for (k, v) in data_file.lower_bounds.iter().sorted_by_key(|(k, _)| *k) {
+            self.lower_bounds.keys().append_value(*k);
+            self.lower_bounds.values().append_value(v.to_bytes()?);
+        }
+        self.lower_bounds.append(true)?;
+
+        for (k, v) in data_file.upper_bounds.iter().sorted_by_key(|(k, _)| *k) {
+            self.upper_bounds.keys().append_value(*k);
+            self.upper_bounds.values().append_value(v.to_bytes()?);
+        }
+        self.upper_bounds.append(true)?;
+
+        self.key_metadata.append_option(data_file.key_metadata());
+
+        self.split_offsets
+            .values()
+            .append_slice(data_file.split_offsets());
+        self.split_offsets.append(true);
+
+        self.equality_ids
+            .values()
+            .append_slice(data_file.equality_ids());
+        self.equality_ids.append(true);
+
+        self.sort_order_ids.append_option(data_file.sort_order_id());
+        Ok(())
+    }
+
+    fn partition_spec(&self, manifest_file: &ManifestFile) -> crate::Result<&PartitionSpec> {
+        self.table_metadata
+            .partition_spec_by_id(manifest_file.partition_spec_id)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "Partition spec not found for manifest file",
+                )
+            })
+            .map(|spec| spec.as_ref())
+    }
+
+    fn finish(&mut self) -> StructArray {
+        let inner_arrays: Vec<ArrayRef> = vec![
+            Arc::new(self.content.finish()),
+            Arc::new(self.file_path.finish()),
+            Arc::new(self.file_format.finish()),
+            Arc::new(self.partition.finish()),
+            Arc::new(self.record_count.finish()),
+            Arc::new(self.file_size_in_bytes.finish()),
+            Arc::new(self.column_sizes.finish()),
+            Arc::new(self.value_counts.finish()),
+            Arc::new(self.null_value_counts.finish()),
+            Arc::new(self.nan_value_counts.finish()),
+            Arc::new(self.lower_bounds.finish()),
+            Arc::new(self.upper_bounds.finish()),
+            Arc::new(self.key_metadata.finish()),
+            Arc::new(self.split_offsets.finish()),
+            Arc::new(self.equality_ids.finish()),
+            Arc::new(self.sort_order_ids.finish()),
+        ];
+
+        StructArray::from(
+            Self::fields(self.table_metadata)
+                .into_iter()
+                .cloned()
+                .zip_eq(inner_arrays)
+                .collect::<Vec<_>>(),
+        )
+    }
+}
+
+/// Builds a readable metrics struct for a single column.
+///
+/// For reference, see [Java][1] and [Python][2] implementations.
+///
+/// [1]: https://github.com/apache/iceberg/blob/4a432839233f2343a9eae8255532f911f06358ef/core/src/main/java/org/apache/iceberg/MetricsUtil.java#L337
+/// [2]: https://github.com/apache/iceberg-python/blob/a051584a3684392d2db6556449eb299145d47d15/pyiceberg/table/inspect.py#L101-L110
+struct PerColumnReadableMetricsBuilder {
+    field_id: i32,
+    data_type: DataType,
+    column_size: Int64Builder,
+    value_count: Int64Builder,
+    null_value_count: Int64Builder,
+    nan_value_count: Int64Builder,
+    lower_bound: AnyArrayBuilder,
+    upper_bound: AnyArrayBuilder,
+}
+
+impl PerColumnReadableMetricsBuilder {
+    fn fields(data_type: &DataType) -> Fields {
+        vec![
+            Field::new("column_size", DataType::Int64, true),
+            Field::new("value_count", DataType::Int64, true),
+            Field::new("null_value_count", DataType::Int64, true),
+            Field::new("nan_value_count", DataType::Int64, true),
+            Field::new("lower_bound", data_type.clone(), true),
+            Field::new("upper_bound", data_type.clone(), true),
+        ]
+        .into()
+    }
+
+    fn new_for_field(field_id: i32, data_type: &DataType) -> Self {
+        Self {
+            field_id,
+            data_type: data_type.clone(),
+            column_size: Int64Builder::new(),
+            value_count: Int64Builder::new(),
+            null_value_count: Int64Builder::new(),
+            nan_value_count: Int64Builder::new(),
+            lower_bound: AnyArrayBuilder::new(data_type),
+            upper_bound: AnyArrayBuilder::new(data_type),
+        }
+    }
+
+    fn append(&mut self, data_file: &DataFile) -> crate::Result<()> {
+        self.column_size.append_option(
+            data_file
+                .column_sizes()
+                .get(&self.field_id)
+                .map(|&v| v as i64),
+        );
+        self.value_count.append_option(
+            data_file
+                .value_counts()
+                .get(&self.field_id)
+                .map(|&v| v as i64),
+        );
+        self.null_value_count.append_option(
+            data_file
+                .null_value_counts()
+                .get(&self.field_id)
+                .map(|&v| v as i64),
+        );
+        self.nan_value_count.append_option(
+            data_file
+                .nan_value_counts()
+                .get(&self.field_id)
+                .map(|&v| v as i64),
+        );
+        match data_file.lower_bounds().get(&self.field_id) {
+            Some(datum) => self
+                .lower_bound
+                .append_datum(get_arrow_datum(datum)?.as_ref())?,
+            None => self.lower_bound.append_null()?,
+        }
+        match data_file.upper_bounds().get(&self.field_id) {
+            Some(datum) => self
+                .upper_bound
+                .append_datum(get_arrow_datum(datum)?.as_ref())?,
+            None => self.upper_bound.append_null()?,
+        }
+        Ok(())
+    }
+
+    fn finish(&mut self) -> StructArray {
+        let inner_arrays: Vec<ArrayRef> = vec![
+            Arc::new(self.column_size.finish()),
+            Arc::new(self.value_count.finish()),
+            Arc::new(self.null_value_count.finish()),
+            Arc::new(self.nan_value_count.finish()),
+            Arc::new(self.lower_bound.finish()),
+            Arc::new(self.upper_bound.finish()),
+        ];
+
+        StructArray::from(
+            Self::fields(&self.data_type)
+                .into_iter()
+                .cloned()
+                .zip_eq(inner_arrays)
+                .collect::<Vec<_>>(),
+        )
+    }
+}
+
+/// Build a [StructArray] with partition columns as fields and partition values as rows.
+struct PartitionValuesStructBuilder {
+    fields: Fields,
+    builders: Vec<AnyArrayBuilder>,
+}
+
+impl PartitionValuesStructBuilder {
+    /// Construct a new builder from the combined partition columns of the table metadata.
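+    ///
+    /// For example, for a (hypothetical) table first partitioned by `x` and later
+    /// re-partitioned by `bucket(16, x)`, the struct holds one nullable column per distinct
+    /// partition field id across both specs, ordered by field id.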
+    fn new(table_metadata: &TableMetadata) -> Self {
+        let combined_fields = Self::combined_partition_fields(table_metadata);
+        Self {
+            builders: combined_fields
+                .iter()
+                .map(|field| AnyArrayBuilder::new(field.data_type()))
+                .collect_vec(),
+            fields: combined_fields,
+        }
+    }
+
+    /// Build the combined partition spec by unioning past and current partition specs
+    fn combined_partition_fields(table_metadata: &TableMetadata) -> Fields {
+        let combined_fields: HashMap<i32, &PartitionField> = table_metadata
+            .partition_specs_iter()
+            .flat_map(|spec| spec.fields())
+            .map(|field| (field.field_id, field))
+            .collect();
+
+        combined_fields
+            .into_iter()
+            // Sort by field id to get a deterministic order
+            .sorted_by_key(|(id, _)| *id)
+            .map(|(_, field)| {
+                let source_type = &table_metadata
+                    .current_schema()
+                    .field_by_id(field.source_id)
+                    .unwrap()
+                    .field_type;
+                let result_type = field.transform.result_type(source_type).unwrap();
+                Field::new(
+                    field.name.clone(),
+                    type_to_arrow_type(&result_type).unwrap(),
+                    true,
+                )
+            })
+            .collect()
+    }
+
+    fn append(
+        &mut self,
+        partition_fields: &[PartitionField],
+        partition_values: &Struct,
+    ) -> crate::Result<()> {
+        for (field, value) in partition_fields.iter().zip_eq(partition_values.iter()) {
+            let index = self.find_field(&field.name)?;
+
+            match value {
+                Some(literal) => self.builders[index].append_literal(literal)?,
+                None => self.builders[index].append_null()?,
+            }
+        }
+        Ok(())
+    }
+
+    fn finish(&mut self) -> StructArray {
+        let arrays: Vec<ArrayRef> = self
+            .builders
+            .iter_mut()
+            .map::<ArrayRef, _>(|builder| Arc::new(builder.finish()))
+            .collect();
+        StructArray::from(
+            self.fields
+                .iter()
+                .cloned()
+                .zip_eq(arrays)
+                .collect::<Vec<_>>(),
+        )
+    }
+
+    fn find_field(&self, name: &str) -> crate::Result<usize> {
+        match self.fields.find(name) {
+            Some((index, _)) => Ok(index),
+            None => Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("Field not found: {}", name),
+            )),
+        }
+    }
+}
+
+struct ReadableMetricsStructBuilder<'a> {
+    table_schema: &'a SchemaRef,
+    column_builders: Vec<PerColumnReadableMetricsBuilder>,
+}
+
+impl<'a> ReadableMetricsStructBuilder<'a> {
+    /// Helper to construct the per-column readable metrics fields. The metrics are "readable"
+    /// in that lower and upper bounds are reported as deserialized values rather than raw
+    /// binary.
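+    ///
+    /// For example, a `long` column `x` surfaces as `x: {column_size: i64, value_count: i64,
+    /// null_value_count: i64, nan_value_count: i64, lower_bound: i64, upper_bound: i64}`.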
+    fn fields(table_schema: &SchemaRef) -> crate::Result<Fields> {
+        let arrow_schema = schema_to_arrow_schema(table_schema)?;
+
+        Ok(arrow_schema
+            .fields()
+            .iter()
+            .map(|field| {
+                Field::new(
+                    field.name(),
+                    DataType::Struct(PerColumnReadableMetricsBuilder::fields(field.data_type())),
+                    false,
+                )
+            })
+            .collect_vec()
+            .into())
+    }
+
+    fn new(table_schema: &'a SchemaRef) -> crate::Result<Self> {
+        Ok(Self {
+            table_schema,
+            column_builders: table_schema
+                .as_struct()
+                .fields()
+                .iter()
+                .map(|field| {
+                    type_to_arrow_type(&field.field_type).map(|arrow_type| {
+                        PerColumnReadableMetricsBuilder::new_for_field(field.id, &arrow_type)
+                    })
+                })
+                .collect::<crate::Result<Vec<_>>>()?,
+        })
+    }
+
+    fn append(&mut self, data_file: &DataFile) -> crate::Result<()> {
+        for column_builder in &mut self.column_builders {
+            column_builder.append(data_file)?;
+        }
+        Ok(())
+    }
+
+    fn finish(&mut self) -> StructArray {
+        let fields: Vec<FieldRef> = Self::fields(self.table_schema)
+            // We already checked the schema conversion in the constructor
+            .unwrap()
+            .into_iter()
+            .cloned()
+            .collect();
+        let arrays: Vec<ArrayRef> = self
+            .column_builders
+            .iter_mut()
+            .map::<ArrayRef, _>(|builder| Arc::new(builder.finish()))
+            .collect();
+        StructArray::from(
+            fields
+                .into_iter()
+                .zip_eq(arrays)
+                .collect::<Vec<_>>(),
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use expect_test::expect;
+
+    use crate::inspect::metadata_table::tests::check_record_batches;
+    use crate::scan::tests::TableTestFixture;
+
+    #[tokio::test]
+    async fn test_entries_table() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+        let table = fixture.table;
+
+        let batch_stream = table.inspect().entries().scan().await.unwrap();
+
+        check_record_batches(
+            batch_stream,
+            expect![[r#"
+                Field { name: "status", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "snapshot_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "file_sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "data_file", data_type: Struct([Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "file_path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "file_format", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Struct([Field { name: "x", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "record_count", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "file_size_in_bytes", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "column_sizes", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_counts",
data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_counts", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_counts", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bounds", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bounds", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "key_metadata", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "split_offsets", data_type: List(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "equality_ids", data_type: List(Field { name: "item", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "sort_order_id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "readable_metrics", data_type: Struct([Field { name: "x", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }, Field { name: "lower_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "y", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "z", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "a", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "dbl", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
Field { name: "i32", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "i64", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "bool", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "float", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "decimal", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: 
"null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Decimal128(3, 2), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Decimal128(3, 2), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "date", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Date32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Date32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestamp", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Timestamp(Microsecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Timestamp(Microsecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestamptz", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Timestamp(Microsecond, Some("+00:00")), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Timestamp(Microsecond, Some("+00:00")), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestampns", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "timestamptzns", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Timestamp(Nanosecond, Some("+00:00")), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Timestamp(Nanosecond, Some("+00:00")), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "binary", data_type: Struct([Field { name: "column_size", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "null_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "nan_value_count", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: LargeBinary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: LargeBinary, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + expect![[r#" + 
+--------+---------------------+-----------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | status | snapshot_id | sequence_number | file_sequence_number | data_file | readable_metrics | + 
+--------+---------------------+-----------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + | 1 | 3055729675574597004 | 1 | 1 | {content: 0, file_format: PARQUET, partition: {x: 100}, record_count: 1, file_size_in_bytes: 100, column_sizes: {1: 1, 2: 1}, value_counts: {1: 2, 2: 2}, null_value_counts: {1: 3, 2: 3}, nan_value_counts: {1: 4, 2: 4}, lower_bounds: {1: 0100000000000000, 2: 0200000000000000, 3: 0300000000000000, 4: 417061636865, 5: 0000000000005940, 6: 64000000, 7: 6400000000000000, 8: 00, 9: 0000c842, 11: 00000000, 12: 0000000000000000, 13: 0000000000000000}, upper_bounds: {1: 0100000000000000, 2: 0500000000000000, 3: 0400000000000000, 4: 49636562657267, 5: 0000000000006940, 6: c8000000, 7: c800000000000000, 8: 01, 9: 00004843, 11: 00000000, 12: 0000000000000000, 13: 0000000000000000}, key_metadata: , split_offsets: [], equality_ids: [], sort_order_id: } | {x: {column_size: 1, value_count: 2, 
null_value_count: 3, nan_value_count: 4, lower_bound: 1, upper_bound: 1}, y: {column_size: 1, value_count: 2, null_value_count: 3, nan_value_count: 4, lower_bound: 2, upper_bound: 5}, z: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 3, upper_bound: 4}, a: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: Apache, upper_bound: Iceberg}, dbl: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 100.0, upper_bound: 200.0}, i32: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 100, upper_bound: 200}, i64: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 100, upper_bound: 200}, bool: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: false, upper_bound: true}, float: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 100.0, upper_bound: 200.0}, decimal: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, date: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 1970-01-01, upper_bound: 1970-01-01}, timestamp: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 1970-01-01T00:00:00, upper_bound: 1970-01-01T00:00:00}, timestamptz: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: 1970-01-01T00:00:00Z, upper_bound: 1970-01-01T00:00:00Z}, timestampns: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamptzns: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, binary: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }} | + | 2 | 3051729675574597004 | 0 | 0 | {content: 0, file_format: PARQUET, partition: {x: 200}, record_count: 1, file_size_in_bytes: 100, column_sizes: {}, value_counts: {}, null_value_counts: {}, nan_value_counts: {}, lower_bounds: {}, upper_bounds: {}, key_metadata: , split_offsets: [], equality_ids: [], sort_order_id: } | {x: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, y: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, z: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, a: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, dbl: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, i32: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, i64: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, bool: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, float: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, decimal: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, date: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamp: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamptz: {column_size: , value_count: , null_value_count: , nan_value_count: , 
lower_bound: , upper_bound: }, timestampns: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamptzns: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, binary: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }} | + | 0 | 3051729675574597004 | 0 | 0 | {content: 0, file_format: PARQUET, partition: {x: 300}, record_count: 1, file_size_in_bytes: 100, column_sizes: {}, value_counts: {}, null_value_counts: {}, nan_value_counts: {}, lower_bounds: {}, upper_bounds: {}, key_metadata: , split_offsets: [], equality_ids: [], sort_order_id: } | {x: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, y: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, z: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, a: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, dbl: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, i32: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, i64: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, bool: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, float: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, decimal: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, date: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamp: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamptz: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestampns: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, timestamptzns: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }, binary: {column_size: , value_count: , null_value_count: , nan_value_count: , lower_bound: , upper_bound: }} | + 
+--------+---------------------+-----------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+"#]], + &[], + &["file_path"], + None, + ).await; + } +} diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs index ab63d2f6e..b0809e290 100644 --- a/crates/iceberg/src/inspect/manifests.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -193,75 +193,13 @@ mod tests { Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
             expect![[r#"
-                content: PrimitiveArray
-                [
-                  0,
-                ],
-                path: (skipped),
-                length: (skipped),
-                partition_spec_id: PrimitiveArray<Int32>
-                [
-                  0,
-                ],
-                added_snapshot_id: PrimitiveArray<Int64>
-                [
-                  3055729675574597004,
-                ],
-                added_data_files_count: PrimitiveArray<Int32>
-                [
-                  1,
-                ],
-                existing_data_files_count: PrimitiveArray<Int32>
-                [
-                  1,
-                ],
-                deleted_data_files_count: PrimitiveArray<Int32>
-                [
-                  1,
-                ],
-                added_delete_files_count: PrimitiveArray<Int32>
-                [
-                  1,
-                ],
-                existing_delete_files_count: PrimitiveArray<Int32>
-                [
-                  1,
-                ],
-                deleted_delete_files_count: PrimitiveArray<Int32>
-                [
-                  1,
-                ],
-                partition_summaries: ListArray
-                [
-                  StructArray
-                -- validity:
-                [
-                  valid,
-                ]
-                [
-                -- child 0: "contains_null" (Boolean)
-                BooleanArray
-                [
-                  false,
-                ]
-                -- child 1: "contains_nan" (Boolean)
-                BooleanArray
-                [
-                  false,
-                ]
-                -- child 2: "lower_bound" (Utf8)
-                StringArray
-                [
-                  "100",
-                ]
-                -- child 3: "upper_bound" (Utf8)
-                StringArray
-                [
-                  "300",
-                ]
-                ],
-                ]"#]],
+                +---------+-------------------+---------------------+------------------------+---------------------------+--------------------------+--------------------------+-----------------------------+----------------------------+-----------------------------------------------------------------------------------+
+                | content | partition_spec_id | added_snapshot_id   | added_data_files_count | existing_data_files_count | deleted_data_files_count | added_delete_files_count | existing_delete_files_count | deleted_delete_files_count | partition_summaries                                                               |
+                +---------+-------------------+---------------------+------------------------+---------------------------+--------------------------+--------------------------+-----------------------------+----------------------------+-----------------------------------------------------------------------------------+
+                | 0       | 0                 | 3055729675574597004 | 1                      | 1                         | 1                        | 1                        | 1                           | 1                          | [{contains_null: false, contains_nan: false, lower_bound: 100, upper_bound: 300}] |
+                +---------+-------------------+---------------------+------------------------+---------------------------+--------------------------+--------------------------+-----------------------------+----------------------------+-----------------------------------------------------------------------------------+"#]],
             &["path", "length"],
+            &[],
             Some("path"),
         ).await;
     }
diff --git a/crates/iceberg/src/inspect/metadata_table.rs b/crates/iceberg/src/inspect/metadata_table.rs
index 75dbc7472..6b74f1e19 100644
--- a/crates/iceberg/src/inspect/metadata_table.rs
+++ b/crates/iceberg/src/inspect/metadata_table.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use super::{ManifestsTable, SnapshotsTable};
+use crate::inspect::entries::EntriesTable;
 use crate::table::Table;
 
 /// Metadata table is used to inspect a table's history, snapshots, and other metadata as a table.
@@ -33,6 +34,11 @@ impl<'a> MetadataTable<'a> {
         Self(table)
     }
 
+    /// Returns a table showing the entries of the current snapshot's manifest files.
+    pub fn entries(&self) -> EntriesTable {
+        EntriesTable::new(self.0)
+    }
+
     /// Get the snapshots table.
     pub fn snapshots(&self) -> SnapshotsTable {
         SnapshotsTable::new(self.0)
@@ -46,9 +52,14 @@ impl<'a> MetadataTable<'a> {
 
 #[cfg(test)]
 pub mod tests {
+    use std::sync::Arc;
+
     use expect_test::Expect;
     use futures::TryStreamExt;
     use itertools::Itertools;
+    use arrow_array::{ArrayRef, RecordBatch, StructArray};
+    use arrow_cast::pretty::pretty_format_batches;
+    use arrow_schema::{DataType, Field, FieldRef, Schema};
 
     use crate::scan::ArrowRecordBatchStream;
 
@@ -59,12 +70,14 @@ pub mod tests {
     /// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
     /// Check the doc of [`expect_test`] for more details.
     /// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
+    /// - `ignore_check_struct_fields`: Same as `ignore_check_columns` but for (top-level) struct fields.
     /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
     pub async fn check_record_batches(
         batch_stream: ArrowRecordBatchStream,
         expected_schema: Expect,
         expected_data: Expect,
         ignore_check_columns: &[&str],
+        ignore_check_struct_fields: &[&str],
         sort_column: Option<&str>,
     ) {
         let record_batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();
@@ -85,25 +98,46 @@ pub mod tests {
                 .collect_vec();
         }
 
+        // Filter columns
+        let (fields, columns): (Vec<_>, Vec<_>) = record_batch
+            .schema()
+            .fields
+            .iter()
+            .zip_eq(columns)
+            // Filter ignored columns
+            .filter(|(field, _)| !ignore_check_columns.contains(&field.name().as_str()))
+            // For struct fields, filter ignored struct fields
+            .map(|(field, column)| match field.data_type() {
+                DataType::Struct(fields) => {
+                    let struct_array = column.as_any().downcast_ref::<StructArray>().unwrap();
+                    let filtered: Vec<(FieldRef, ArrayRef)> = fields
+                        .iter()
+                        .zip_eq(struct_array.columns().iter())
+                        .filter(|(f, _)| !ignore_check_struct_fields.contains(&f.name().as_str()))
+                        .map(|(f, c)| (f.clone(), c.clone()))
+                        .collect_vec();
+                    let filtered_struct_type: DataType = DataType::Struct(
+                        filtered.iter().map(|(f, _)| f.clone()).collect_vec().into(),
+                    );
+                    (
+                        Field::new(field.name(), filtered_struct_type, field.is_nullable()).into(),
+                        Arc::new(StructArray::from(filtered)) as ArrayRef,
+                    )
+                }
+                _ => (field.clone(), column),
+            })
+            .unzip();
+
         expected_schema.assert_eq(&format!(
             "{}",
             record_batch.schema().fields().iter().format(",\n")
         ));
 
-        expected_data.assert_eq(&format!(
-            "{}",
-            record_batch
-                .schema()
-                .fields()
-                .iter()
-                .zip_eq(columns)
-                .map(|(field, column)| {
-                    if ignore_check_columns.contains(&field.name().as_str()) {
-                        format!("{}: (skipped)", field.name())
-                    } else {
-                        format!("{}: {:?}", field.name(), column)
-                    }
-                })
-                .format(",\n")
-        ));
+        expected_data.assert_eq(
+            &pretty_format_batches(&[
+                RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap()
+            ])
+            .unwrap()
+            .to_string(),
+        );
     }
 }
diff --git a/crates/iceberg/src/inspect/mod.rs b/crates/iceberg/src/inspect/mod.rs
index b64420ea1..02a57e664 100644
--- a/crates/iceberg/src/inspect/mod.rs
+++ b/crates/iceberg/src/inspect/mod.rs
@@ -17,10 +17,12 @@
 
 //! Metadata table APIs.
 
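To make the refactored test flow concrete, here is a sketch of how the new `entries()` accessor and the reworked `check_record_batches` helper combine in a test. This is illustrative only and not part of the patch: it mirrors the entries test shown earlier, assumes `EntriesTable` exposes a `scan()` that can be awaited into an `ArrowRecordBatchStream` like the other metadata tables (its definition lives in the new entries.rs, outside this excerpt), and elides the expect strings.

    // Sketch only. `metadata_table` is a MetadataTable for some test table
    // (construction elided); entries().scan() is assumed here to yield an
    // ArrowRecordBatchStream, mirroring the other metadata tables.
    check_record_batches(
        metadata_table.entries().scan().await.unwrap(),
        expect![[r#"..."#]], // expected schema, asserted field by field
        expect![[r#"..."#]], // expected data, rendered via pretty_format_batches
        &[],                 // ignore_check_columns: drop these top-level columns
        &["file_path"],      // ignore_check_struct_fields: drop unstable fields inside structs
        None,                // sort_column
    )
    .await;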
+mod entries;
 mod manifests;
 mod metadata_table;
 mod snapshots;
 
+pub use entries::EntriesTable;
 pub use manifests::ManifestsTable;
 pub use metadata_table::*;
 pub use snapshots::SnapshotsTable;
diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs
index 1ee89963d..f5703b043 100644
--- a/crates/iceberg/src/inspect/snapshots.rs
+++ b/crates/iceberg/src/inspect/snapshots.rs
@@ -130,59 +130,14 @@ mod tests {
                 Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
                 Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
             expect![[r#"
-                committed_at: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
-                [
-                  2018-01-04T21:22:35.770+00:00,
-                  2019-04-12T20:29:15.770+00:00,
-                ],
-                snapshot_id: PrimitiveArray<Int64>
-                [
-                  3051729675574597004,
-                  3055729675574597004,
-                ],
-                parent_id: PrimitiveArray<Int64>
-                [
-                  null,
-                  3051729675574597004,
-                ],
-                operation: StringArray
-                [
-                  "append",
-                  "append",
-                ],
-                manifest_list: (skipped),
-                summary: MapArray
-                [
-                  StructArray
-                -- validity:
-                [
-                ]
-                [
-                -- child 0: "keys" (Utf8)
-                StringArray
-                [
-                ]
-                -- child 1: "values" (Utf8)
-                StringArray
-                [
-                ]
-                ],
-                  StructArray
-                -- validity:
-                [
-                ]
-                [
-                -- child 0: "keys" (Utf8)
-                StringArray
-                [
-                ]
-                -- child 1: "values" (Utf8)
-                StringArray
-                [
-                ]
-                ],
-                ]"#]],
+                +--------------------------+---------------------+---------------------+-----------+---------+
+                | committed_at             | snapshot_id         | parent_id           | operation | summary |
+                +--------------------------+---------------------+---------------------+-----------+---------+
+                | 2018-01-04T21:22:35.770Z | 3051729675574597004 |                     | append    | {}      |
+                | 2019-04-12T20:29:15.770Z | 3055729675574597004 | 3051729675574597004 | append    | {}      |
+                +--------------------------+---------------------+---------------------+-----------+---------+"#]],
             &["manifest_list"],
+            &[],
             Some("committed_at"),
         ).await;
     }
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 5a97e74e7..380424cad 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -968,8 +968,11 @@ pub mod tests {
     use std::sync::Arc;
 
     use arrow_array::{
-        ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
+        ArrayRef, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
+        Int32Array, Int64Array, LargeBinaryArray, RecordBatch, StringArray,
+        TimestampMicrosecondArray, TimestampNanosecondArray,
     };
+    use arrow_schema::{DataType, TimeUnit};
     use futures::{stream, TryStreamExt};
     use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
     use parquet::basic::Compression;
@@ -1084,6 +1087,50 @@ pub mod tests {
                         .record_count(1)
                         .partition(Struct::from_iter([Some(Literal::long(100))]))
                         .key_metadata(None)
+                        // Note:
+                        // The bounds below need to agree with the test data written below
+                        // into the Parquet file. If they do not, tests that rely on filter
+                        // scans fail because of incorrect bounds.
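+                        // For example, field 9 ("float") is given the bounds 100.0 and
+                        // 200.0 below, which matches the Float32 column written further
+                        // down (512 x 100.0, 12 x 150.0, 500 x 200.0).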
+ .lower_bounds(HashMap::from([ + (1, Datum::long(1)), + (2, Datum::long(2)), + (3, Datum::long(3)), + (4, Datum::string("Apache")), + (5, Datum::double(100)), + (6, Datum::int(100)), + (7, Datum::long(100)), + (8, Datum::bool(false)), + (9, Datum::float(100.0)), + // decimal values are not supported by schema::get_arrow_datum + // (10, Datum::decimal(Decimal(123, 2))), + (11, Datum::date(0)), + (12, Datum::timestamp_micros(0)), + (13, Datum::timestamptz_micros(0)), + // ns timestamps, uuid, fixed, binary are currently not + // supported in schema::get_arrow_datum + ])) + .upper_bounds(HashMap::from([ + (1, Datum::long(1)), + (2, Datum::long(5)), + (3, Datum::long(4)), + (4, Datum::string("Iceberg")), + (5, Datum::double(200)), + (6, Datum::int(200)), + (7, Datum::long(200)), + (8, Datum::bool(true)), + (9, Datum::float(200.0)), + // decimal values are not supported by schema::get_arrow_datum + // (10, Datum::decimal(Decimal(123, 2))), + (11, Datum::date(0)), + (12, Datum::timestamp_micros(0)), + (13, Datum::timestamptz_micros(0)), + // ns timestamps, uuid, fixed, binary are currently not + // supported in schema::get_arrow_datum + ])) + .column_sizes(HashMap::from([(1, 1u64), (2, 1u64)])) + .value_counts(HashMap::from([(1, 2u64), (2, 2u64)])) + .null_value_counts(HashMap::from([(1, 3u64), (2, 3u64)])) + .nan_value_counts(HashMap::from([(1, 4u64), (2, 4u64)])) .build() .unwrap(), ) @@ -1185,6 +1232,69 @@ pub mod tests { PARQUET_FIELD_ID_META_KEY.to_string(), "8".to_string(), )])), + arrow_schema::Field::new("float", arrow_schema::DataType::Float32, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "9".to_string(), + )])), + arrow_schema::Field::new( + "decimal", + arrow_schema::DataType::Decimal128(3, 2), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "10".to_string(), + )])), + arrow_schema::Field::new("date", arrow_schema::DataType::Date32, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "11".to_string(), + )])), + arrow_schema::Field::new( + "timestamp", + arrow_schema::DataType::Timestamp(TimeUnit::Microsecond, None), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "12".to_string(), + )])), + arrow_schema::Field::new( + "timestamptz", + arrow_schema::DataType::Timestamp( + TimeUnit::Microsecond, + Some("UTC".into()), + ), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "13".to_string(), + )])), + arrow_schema::Field::new( + "timestampns", + arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "14".to_string(), + )])), + arrow_schema::Field::new( + "timestamptzns", + arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "15".to_string(), + )])), + arrow_schema::Field::new("binary", arrow_schema::DataType::LargeBinary, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "16".to_string(), + )])), ]; Arc::new(arrow_schema::Schema::new(fields)) }; @@ -1234,8 +1344,54 @@ pub mod tests { let values: BooleanArray = values.into(); let col8 = Arc::new(values) as ArrayRef; + // float: + let mut values = vec![100.0f32; 512]; + values.append(vec![150.0f32; 12].as_mut()); + values.append(vec![200.0f32; 500].as_mut()); + let col9 = 
Arc::new(Float32Array::from_iter_values(values)) as ArrayRef;
+
+        // decimal:
+        let values = vec![123i128; 1024];
+        let col10 = Arc::new(
+            Decimal128Array::from_iter_values(values)
+                .with_data_type(DataType::Decimal128(3, 2)),
+        );
+
+        // date:
+        let values = vec![0i32; 1024];
+        let col11 = Arc::new(Date32Array::from_iter_values(values));
+
+        // timestamp:
+        let values = vec![0i64; 1024];
+        let col12 = Arc::new(TimestampMicrosecondArray::from_iter_values(values));
+
+        // timestamptz:
+        let values = vec![0i64; 1024];
+        let col13 = Arc::new(
+            TimestampMicrosecondArray::from_iter_values(values).with_data_type(
+                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
+            ),
+        );
+
+        // timestampns:
+        let values = vec![0i64; 1024];
+        let col14 = Arc::new(TimestampNanosecondArray::from_iter_values(values));
+
+        // timestamptzns:
+        let values = vec![0i64; 1024];
+        let col15 = Arc::new(
+            TimestampNanosecondArray::from_iter_values(values).with_data_type(
+                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
+            ),
+        );
+
+        // binary:
+        let values = vec![[0u8; 8]; 1024];
+        let col16 = Arc::new(LargeBinaryArray::from_iter_values(values));
+
         let to_write = RecordBatch::try_new(schema.clone(), vec![
-            col1, col2, col3, col4, col5, col6, col7, col8,
+            col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
+            col14, col15, col16,
         ])
         .unwrap();
diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index f517b8e0d..fef3a3aa3 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -966,6 +966,12 @@ impl ManifestEntry {
         self.sequence_number
     }
 
+    /// File sequence number.
+    #[inline]
+    pub fn file_sequence_number(&self) -> Option<i64> {
+        self.file_sequence_number
+    }
+
     /// File size in bytes.
     #[inline]
     pub fn file_size_in_bytes(&self) -> u64 {
diff --git a/crates/iceberg/testdata/example_table_metadata_v2.json b/crates/iceberg/testdata/example_table_metadata_v2.json
index 35230966a..2c827de4c 100644
--- a/crates/iceberg/testdata/example_table_metadata_v2.json
+++ b/crates/iceberg/testdata/example_table_metadata_v2.json
@@ -20,7 +20,15 @@
       {"id": 5, "name": "dbl", "required": true, "type": "double"},
       {"id": 6, "name": "i32", "required": true, "type": "int"},
       {"id": 7, "name": "i64", "required": true, "type": "long"},
-      {"id": 8, "name": "bool", "required": true, "type": "boolean"}
+      {"id": 8, "name": "bool", "required": true, "type": "boolean"},
+      {"id": 9, "name": "float", "required": true, "type": "float"},
+      {"id": 10, "name": "decimal", "required": true, "type": "decimal(3,2)"},
+      {"id": 11, "name": "date", "required": true, "type": "date"},
+      {"id": 12, "name": "timestamp", "required": true, "type": "timestamp"},
+      {"id": 13, "name": "timestamptz", "required": true, "type": "timestamptz"},
+      {"id": 14, "name": "timestampns", "required": true, "type": "timestampns"},
+      {"id": 15, "name": "timestamptzns", "required": true, "type": "timestamptzns"},
+      {"id": 16, "name": "binary", "required": true, "type": "binary"}
     ]
   }
 ],