Skip to content

Commit

Permalink
Merge branch 'main' into dev/iceberg-provider-table
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeiPatiakin committed Dec 9, 2024
2 parents 3cdb1e1 + 1a76e1c commit bc61289
Show file tree
Hide file tree
Showing 15 changed files with 541 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ github:
required_approving_review_count: 1

required_linear_history: true

del_branch_on_merge: true
features:
wiki: false
issues: true
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci_typos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Check typos
uses: crate-ci/typos@v1.27.3
uses: crate-ci/typos@v1.28.2
2 changes: 1 addition & 1 deletion .github/workflows/release_python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ jobs:
- { os: ubuntu-latest, target: "armv7l" }
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v4
- uses: actions/setup-python@v5
with:
python-version: 3.9
- name: Setup Rust toolchain
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ members = [
"crates/iceberg",
"crates/integration_tests",
"crates/integrations/*",
"crates/puffin",
"crates/test_utils",
]
exclude = ["bindings/python"]
Expand Down Expand Up @@ -98,3 +99,4 @@ uuid = { version = "1.6.1", features = ["v7"] }
volo-thrift = "0.10"
hive_metastore = "0.1"
tera = "1"
zstd = "0.13.2"
15 changes: 13 additions & 2 deletions crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,10 +723,21 @@ macro_rules! get_parquet_stat_as_datum {
let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else {
return Ok(None);
};

Some(Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from_le_bytes(bytes.try_into()?)),
PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
))
}
(PrimitiveType::Decimal {
precision: _,
scale: _,
}, Statistics::FixedLenByteArray(stats)) => {
let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else {
return Ok(None);
};
Some(Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)),
))
}
(
Expand Down
10 changes: 10 additions & 0 deletions crates/iceberg/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,16 @@ impl OutputFile {
writer.close().await
}

/// Writes `bs` to this output file's location as a brand-new file.
///
/// The write is issued with the `if_none_match("*")` condition, so it
/// errors out instead of overwriting when the file already exists.
pub async fn write_exclusive(&self, bs: Bytes) -> crate::Result<()> {
    // The operator is addressed with the path relative to its root.
    let relative_path = &self.path[self.relative_path_pos..];
    let conditional_write = self.op.write_with(relative_path, bs).if_none_match("*");
    conditional_write.await?;
    Ok(())
}

/// Creates output file for continuous writing.
///
/// # Notes
Expand Down
18 changes: 17 additions & 1 deletion crates/iceberg/src/io/storage_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ pub const S3_ASSUME_ROLE_ARN: &str = "client.assume-role.arn";
pub const S3_ASSUME_ROLE_EXTERNAL_ID: &str = "client.assume-role.external-id";
/// Optional session name used to assume an IAM role.
pub const S3_ASSUME_ROLE_SESSION_NAME: &str = "client.assume-role.session-name";
/// Property key for skipping request signing (e.g. for public buckets/folders).
///
/// `s3_config_parse` turns anonymous access on when the value, compared
/// case-insensitively, is one of `true`, `t`, `1` or `on`; any other value
/// leaves the setting untouched.
pub const S3_ALLOW_ANONYMOUS: &str = "s3.allow-anonymous";
/// Property key for skipping the credential lookup from EC2 metadata
/// (typically used in conjunction with `S3_ALLOW_ANONYMOUS`).
///
/// `s3_config_parse` accepts the same truthy values as for
/// `S3_ALLOW_ANONYMOUS`: `true`, `t`, `1` or `on`, compared case-insensitively.
pub const S3_DISABLE_EC2_METADATA: &str = "s3.disable-ec2-metadata";

/// Parse iceberg props to s3 config.
pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
Expand All @@ -81,7 +86,7 @@ pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config
cfg.region = Some(region);
};
if let Some(path_style_access) = m.remove(S3_PATH_STYLE_ACCESS) {
if ["true", "True", "1"].contains(&path_style_access.as_str()) {
if ["true", "t", "1"].contains(&path_style_access.to_lowercase().as_str()) {
cfg.enable_virtual_host_style = true;
}
};
Expand Down Expand Up @@ -126,6 +131,17 @@ pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config
}
};

if let Some(allow_anonymous) = m.remove(S3_ALLOW_ANONYMOUS) {
if ["true", "t", "1", "on"].contains(&allow_anonymous.to_lowercase().as_str()) {
cfg.allow_anonymous = true;
}
}
if let Some(disable_ec2_metadata) = m.remove(S3_DISABLE_EC2_METADATA) {
if ["true", "t", "1", "on"].contains(&disable_ec2_metadata.to_lowercase().as_str()) {
cfg.disable_ec2_metadata = true;
}
};

Ok(cfg)
}

Expand Down
117 changes: 113 additions & 4 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1010,10 +1010,14 @@ pub(super) mod _serde {
.collect(),
default_spec_id: v.default_spec.spec_id(),
last_partition_id: v.last_partition_id,
properties: Some(v.properties),
current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
properties: if v.properties.is_empty() {
None
} else {
Some(v.properties)
},
current_snapshot_id: v.current_snapshot_id,
snapshots: if v.snapshots.is_empty() {
Some(vec![])
None
} else {
Some(
v.snapshots
Expand Down Expand Up @@ -1091,7 +1095,7 @@ pub(super) mod _serde {
} else {
Some(v.properties)
},
current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
current_snapshot_id: v.current_snapshot_id,
snapshots: if v.snapshots.is_empty() {
None
} else {
Expand Down Expand Up @@ -1279,6 +1283,7 @@ mod tests {
"timestamp-ms": 1515100
}
],
"refs": {},
"sort-orders": [
{
"order-id": 0,
Expand Down Expand Up @@ -1349,7 +1354,11 @@ mod tests {
refs: HashMap::new(),
};

let expected_json_value = serde_json::to_value(&expected).unwrap();
check_table_metadata_serde(data, expected);

let json_value = serde_json::from_str::<serde_json::Value>(data).unwrap();
assert_eq!(json_value, expected_json_value);
}

#[test]
Expand Down Expand Up @@ -1519,6 +1528,106 @@ mod tests {
check_table_metadata_serde(data, expected);
}

/// A V2 metadata document without `snapshots`, `current-snapshot-id`,
/// `properties` or `snapshot-log` keys must map to the expected
/// `TableMetadata`, and serializing that metadata must reproduce exactly the
/// input JSON (i.e. none of the absent keys reappear in the output).
#[test]
fn test_table_data_v2_no_snapshots() {
    let metadata_json = r#"
{
"format-version" : 2,
"table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94",
"location": "s3://b/wh/data.db/table",
"last-sequence-number" : 1,
"last-updated-ms": 1515100955770,
"last-column-id": 1,
"schemas": [
{
"schema-id" : 1,
"type" : "struct",
"fields" :[
{
"id": 1,
"name": "struct_name",
"required": true,
"type": "fixed[1]"
}
]
}
],
"current-schema-id" : 1,
"partition-specs": [
{
"spec-id": 0,
"fields": []
}
],
"refs": {},
"default-spec-id": 0,
"last-partition-id": 1000,
"metadata-log": [
{
"metadata-file": "s3://bucket/.../v1.json",
"timestamp-ms": 1515100
}
],
"sort-orders": [
{
"order-id": 0,
"fields": []
}
],
"default-sort-order-id": 0
}
"#;

    // Single required `fixed[1]` column, matching the schema in the JSON above.
    let fixed_field = Arc::new(NestedField::required(
        1,
        "struct_name",
        Type::Primitive(PrimitiveType::Fixed(1)),
    ));
    let table_schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![fixed_field])
        .build()
        .unwrap();

    // Spec 0 has no partition fields.
    let unpartitioned_spec = BoundPartitionSpec::builder(table_schema.clone())
        .with_spec_id(0)
        .build()
        .unwrap();

    let expected_metadata = TableMetadata {
        format_version: FormatVersion::V2,
        table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
        location: String::from("s3://b/wh/data.db/table"),
        last_updated_ms: 1515100955770,
        last_column_id: 1,
        schemas: HashMap::from([(1, Arc::new(table_schema))]),
        current_schema_id: 1,
        partition_specs: HashMap::from([(
            0,
            unpartitioned_spec.clone().into_schemaless().into(),
        )]),
        default_spec: unpartitioned_spec.into(),
        last_partition_id: 1000,
        default_sort_order_id: 0,
        sort_orders: HashMap::from([(0, SortOrder::unsorted_order().into())]),
        snapshots: HashMap::new(),
        current_snapshot_id: None,
        last_sequence_number: 1,
        properties: HashMap::new(),
        snapshot_log: vec![],
        metadata_log: vec![MetadataLog {
            metadata_file: String::from("s3://bucket/.../v1.json"),
            timestamp_ms: 1515100,
        }],
        refs: HashMap::new(),
    };

    // Serialize before the helper below takes ownership of the expected value.
    let expected_as_json = serde_json::to_value(&expected_metadata).unwrap();
    check_table_metadata_serde(metadata_json, expected_metadata);

    // The serialized form must be structurally identical to the input document.
    let input_as_json: serde_json::Value = serde_json::from_str(metadata_json).unwrap();
    assert_eq!(input_as_json, expected_as_json);
}

#[test]
fn test_current_snapshot_id_must_match_main_branch() {
let data = r#"
Expand Down
6 changes: 4 additions & 2 deletions crates/iceberg/src/spec/table_metadata_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ impl TableMetadataBuilder {
#[must_use]
pub fn new_from_metadata(
previous: TableMetadata,
previous_file_location: Option<String>,
current_file_location: Option<String>,
) -> Self {
Self {
previous_history_entry: previous_file_location.map(|l| MetadataLog {
previous_history_entry: current_file_location.map(|l| MetadataLog {
metadata_file: l,
timestamp_ms: previous.last_updated_ms,
}),
Expand Down Expand Up @@ -1220,6 +1220,7 @@ mod tests {
assert_eq!(metadata.last_partition_id, 1000);
assert_eq!(metadata.last_column_id, 3);
assert_eq!(metadata.snapshots.len(), 0);
assert_eq!(metadata.current_snapshot_id, None);
assert_eq!(metadata.refs.len(), 0);
assert_eq!(metadata.properties.len(), 0);
assert_eq!(metadata.metadata_log.len(), 0);
Expand Down Expand Up @@ -1268,6 +1269,7 @@ mod tests {
assert_eq!(metadata.last_partition_id, UNPARTITIONED_LAST_ASSIGNED_ID);
assert_eq!(metadata.last_column_id, 0);
assert_eq!(metadata.snapshots.len(), 0);
assert_eq!(metadata.current_snapshot_id, None);
assert_eq!(metadata.refs.len(), 0);
assert_eq!(metadata.properties.len(), 0);
assert_eq!(metadata.metadata_log.len(), 0);
Expand Down
6 changes: 3 additions & 3 deletions crates/iceberg/src/transform/truncate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,11 +712,11 @@ mod test {
);

// test decimal
let mut buidler = PrimitiveBuilder::<Decimal128Type>::new()
let mut builder = PrimitiveBuilder::<Decimal128Type>::new()
.with_precision_and_scale(20, 2)
.unwrap();
buidler.append_value(1065);
let input = Arc::new(buidler.finish());
builder.append_value(1065);
let input = Arc::new(builder.finish());
let res = super::Truncate::new(50).transform(input).unwrap();
assert_eq!(
res.as_any()
Expand Down
Loading

0 comments on commit bc61289

Please sign in to comment.