Skip to content

Commit

Permalink
add interface to help serialize/deserialize DataFile
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Dec 18, 2024
1 parent 2e0b646 commit f766445
Showing 1 changed file with 108 additions and 44 deletions.
152 changes: 108 additions & 44 deletions crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use typed_builder::TypedBuilder;
use self::_const_schema::{manifest_schema_v1, manifest_schema_v2};
use super::{
BoundPartitionSpec, Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile,
Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER,
Schema, SchemaId, SchemaRef, Struct, StructType, INITIAL_SEQUENCE_NUMBER,
UNASSIGNED_SEQUENCE_NUMBER,
};
use crate::error::Result;
use crate::io::OutputFile;
Expand Down Expand Up @@ -625,6 +626,38 @@ mod _const_schema {
})
};

/// Assemble the ordered field list of the v2 `data_file` struct.
///
/// `partition_type` becomes the type of the required `partition` field
/// (field id 102); every other entry is a clone of a predefined field
/// constant.
fn data_file_fields_v2(partition_type: StructType) -> Vec<NestedFieldRef> {
    // The partition field is the only entry that depends on the caller's input.
    let partition = Arc::new(NestedField::required(
        102,
        "partition",
        Type::Struct(partition_type),
    ));

    Vec::from([
        CONTENT.clone(),
        FILE_PATH.clone(),
        FILE_FORMAT.clone(),
        partition,
        RECORD_COUNT.clone(),
        FILE_SIZE_IN_BYTES.clone(),
        COLUMN_SIZES.clone(),
        VALUE_COUNTS.clone(),
        NULL_VALUE_COUNTS.clone(),
        NAN_VALUE_COUNTS.clone(),
        LOWER_BOUNDS.clone(),
        UPPER_BOUNDS.clone(),
        KEY_METADATA.clone(),
        SPLIT_OFFSETS.clone(),
        EQUALITY_IDS.clone(),
        SORT_ORDER_ID.clone(),
    ])
}

/// Build the Avro schema named `data_file` for a standalone v2 data file.
///
/// The schema's fields come from `data_file_fields_v2`, parameterized by
/// `partition_type`. Fails if the intermediate schema cannot be built or
/// cannot be converted to Avro.
pub(super) fn data_file_schema_v2(partition_type: StructType) -> Result<AvroSchema, Error> {
    let fields = data_file_fields_v2(partition_type);
    let data_file = Schema::builder().with_fields(fields).build()?;
    schema_to_avro_schema("data_file", &data_file)
}

pub(super) fn manifest_schema_v2(partition_type: StructType) -> Result<AvroSchema, Error> {
let fields = vec![
STATUS.clone(),
Expand All @@ -634,62 +667,52 @@ mod _const_schema {
Arc::new(NestedField::required(
2,
"data_file",
Type::Struct(StructType::new(vec![
CONTENT.clone(),
FILE_PATH.clone(),
FILE_FORMAT.clone(),
Arc::new(NestedField::required(
102,
"partition",
Type::Struct(partition_type),
)),
RECORD_COUNT.clone(),
FILE_SIZE_IN_BYTES.clone(),
COLUMN_SIZES.clone(),
VALUE_COUNTS.clone(),
NULL_VALUE_COUNTS.clone(),
NAN_VALUE_COUNTS.clone(),
LOWER_BOUNDS.clone(),
UPPER_BOUNDS.clone(),
KEY_METADATA.clone(),
SPLIT_OFFSETS.clone(),
EQUALITY_IDS.clone(),
SORT_ORDER_ID.clone(),
])),
Type::Struct(StructType::new(data_file_fields_v2(partition_type))),
)),
];
let schema = Schema::builder().with_fields(fields).build()?;
schema_to_avro_schema("manifest_entry", &schema)
}

/// Assemble the ordered field list of the v1 `data_file` struct.
///
/// `partition_type` becomes the type of the required `partition` field
/// (field id 102); every other entry is a clone of a predefined field
/// constant.
fn data_file_fields_v1(partition_type: StructType) -> Vec<NestedFieldRef> {
    // The partition field is the only entry that depends on the caller's input.
    let partition = Arc::new(NestedField::required(
        102,
        "partition",
        Type::Struct(partition_type),
    ));

    Vec::from([
        FILE_PATH.clone(),
        FILE_FORMAT.clone(),
        partition,
        RECORD_COUNT.clone(),
        FILE_SIZE_IN_BYTES.clone(),
        BLOCK_SIZE_IN_BYTES.clone(),
        COLUMN_SIZES.clone(),
        VALUE_COUNTS.clone(),
        NULL_VALUE_COUNTS.clone(),
        NAN_VALUE_COUNTS.clone(),
        LOWER_BOUNDS.clone(),
        UPPER_BOUNDS.clone(),
        KEY_METADATA.clone(),
        SPLIT_OFFSETS.clone(),
        SORT_ORDER_ID.clone(),
    ])
}

/// Build the Avro schema named `data_file` for a standalone v1 data file.
///
/// The schema's fields come from `data_file_fields_v1`, parameterized by
/// `partition_type`. Fails if the intermediate schema cannot be built or
/// cannot be converted to Avro.
pub(super) fn data_file_schema_v1(partition_type: StructType) -> Result<AvroSchema, Error> {
    let fields = data_file_fields_v1(partition_type);
    let data_file = Schema::builder().with_fields(fields).build()?;
    schema_to_avro_schema("data_file", &data_file)
}

pub(super) fn manifest_schema_v1(partition_type: StructType) -> Result<AvroSchema, Error> {
let fields = vec![
STATUS.clone(),
SNAPSHOT_ID_V1.clone(),
Arc::new(NestedField::required(
2,
"data_file",
Type::Struct(StructType::new(vec![
FILE_PATH.clone(),
FILE_FORMAT.clone(),
Arc::new(NestedField::required(
102,
"partition",
Type::Struct(partition_type),
)),
RECORD_COUNT.clone(),
FILE_SIZE_IN_BYTES.clone(),
BLOCK_SIZE_IN_BYTES.clone(),
COLUMN_SIZES.clone(),
VALUE_COUNTS.clone(),
NULL_VALUE_COUNTS.clone(),
NAN_VALUE_COUNTS.clone(),
LOWER_BOUNDS.clone(),
UPPER_BOUNDS.clone(),
KEY_METADATA.clone(),
SPLIT_OFFSETS.clone(),
SORT_ORDER_ID.clone(),
])),
Type::Struct(StructType::new(data_file_fields_v1(partition_type))),
)),
];
let schema = Schema::builder().with_fields(fields).build()?;
Expand Down Expand Up @@ -1158,6 +1181,47 @@ impl DataFile {
self.sort_order_id
}
}

/// Serialize data files into Avro-encoded bytes.
///
/// The Avro schema is derived from `partition_type` using the data-file
/// schema that matches `version`. Each data file is converted to its serde
/// representation, resolved against that schema and appended to an
/// in-memory Avro writer whose buffer is returned.
///
/// # Errors
///
/// Returns an error if the Avro schema cannot be built, if a data file
/// cannot be converted or resolved against the schema, or if the Avro
/// writer fails.
pub fn data_files_to_avro(
    data_files: impl IntoIterator<Item = DataFile>,
    partition_type: &StructType,
    version: FormatVersion,
) -> Result<Vec<u8>> {
    // Propagate schema-construction failures instead of panicking.
    let avro_schema = match version {
        FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type.clone())?,
        FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type.clone())?,
    };
    let mut writer = AvroWriter::new(&avro_schema, Vec::new());

    for data_file in data_files {
        // NOTE(review): the trailing `true` is passed for every `version`;
        // confirm it is not a V1-only flag that should follow `version`.
        let value = to_value(_serde::DataFile::try_from(data_file, partition_type, true)?)?
            .resolve(&avro_schema)?;
        writer.append(value)?;
    }

    Ok(writer.into_inner()?)
}

/// Parse data files from Avro-encoded bytes.
///
/// The reader schema is derived from `partition_type` using the data-file
/// schema that matches `version`. Every decoded Avro value is deserialized
/// into the serde representation and then converted into a [`DataFile`]
/// using `partition_type` and `schema`.
///
/// # Errors
///
/// Returns an error if the Avro schema cannot be built, if `bytes` cannot be
/// read with that schema, or if any record fails to deserialize or convert.
/// The first failing record aborts the whole parse.
pub fn parse_from_avro(
    bytes: &[u8],
    schema: &Schema,
    partition_type: &StructType,
    version: FormatVersion,
) -> Result<Vec<DataFile>> {
    // Propagate schema-construction failures instead of panicking.
    let avro_schema = match version {
        FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type.clone())?,
        FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type.clone())?,
    };

    let reader = AvroReader::with_schema(&avro_schema, bytes)?;
    reader
        .into_iter()
        // `schema` is already a reference; pass it through without re-borrowing.
        .map(|value| from_value::<_serde::DataFile>(&value?)?.try_into(partition_type, schema))
        .collect::<Result<Vec<_>>>()
}

/// Type of content stored by the data file: data, equality deletes, or
/// position deletes (all v1 files are data files)
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
Expand Down

0 comments on commit f766445

Please sign in to comment.