From 11daa386072a8d9bfae8d0424ba703e36074c73f Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Sat, 20 Jan 2024 20:04:48 -0800 Subject: [PATCH] simpler nested field routine --- .../iceberg/connect/data/Utilities.java | 50 ++++++++++++------- .../iceberg/connect/data/UtilitiesTest.java | 23 +++++++++ 2 files changed, 55 insertions(+), 18 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java index abf9b24153c4..d47957fe6376 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java @@ -49,6 +49,7 @@ import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.UnpartitionedWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -56,6 +57,7 @@ import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.util.PropertyUtil; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,37 +123,49 @@ private static Object loadHadoopConfig(IcebergSinkConfig config) { } public static Object extractFromRecordValue(Object recordValue, String fieldName) { - String[] fields = fieldName.split("\\."); + List fields = Splitter.on('.').splitToList(fieldName); if (recordValue instanceof Struct) { - return valueFromStruct((Struct) recordValue, fields, 0); + return valueFromStruct((Struct) recordValue, fields); } else if (recordValue instanceof Map) { - return valueFromMap((Map) recordValue, fields, 0); + return valueFromMap((Map) recordValue, fields); } else { throw new UnsupportedOperationException( "Cannot extract value from type: " + recordValue.getClass().getName()); } } - private static Object valueFromStruct(Struct struct, String[] fields, int idx) { - Preconditions.checkArgument(idx < fields.length, "Invalid field index"); - Object value = struct.get(fields[idx]); - if (value == null || idx == fields.length - 1) { - return value; + private static Object valueFromStruct(Struct parent, List fields) { + Struct struct = parent; + for (int idx = 0; idx < fields.size() - 1; idx++) { + Object value = fieldValueFromStruct(struct, fields.get(idx)); + if (value == null) { + return null; + } + Preconditions.checkState(value instanceof Struct, "Expected a struct type"); + struct = (Struct) value; } - - Preconditions.checkState(value instanceof Struct, "Expected a struct type"); - return valueFromStruct((Struct) value, fields, idx + 1); + return fieldValueFromStruct(struct, fields.get(fields.size() - 1)); } - private static Object valueFromMap(Map map, String[] fields, int idx) { - Preconditions.checkArgument(idx < fields.length, "Invalid field index"); - Object value = map.get(fields[idx]); - if (value == null || idx == fields.length - 1) { - return value; + private static Object fieldValueFromStruct(Struct struct, String fieldName) { + Field structField = struct.schema().field(fieldName); + if (structField == null) { + return null; } + return struct.get(structField); + } - Preconditions.checkState(value instanceof Map, "Expected a map type"); - return valueFromMap((Map) value, fields, idx + 1); + private static Object valueFromMap(Map parent, List fields) { + Map map = parent; + for (int idx = 0; idx < fields.size() - 1; idx++) { + Object value = map.get(fields.get(idx)); + if (value == null) { + return null; + } + Preconditions.checkState(value instanceof Map, "Expected a map type"); + map = (Map) value; + } + return map.get(fields.get(fields.size() - 1)); } public static TaskWriter createTableWriter( diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java index 46c35b79b98c..cfa1709da744 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/UtilitiesTest.java @@ -141,6 +141,18 @@ public void testExtractFromRecordValueStructNested() { assertThat(result).isEqualTo(123L); } + @Test + public void testExtractFromRecordValueStructNull() { + Schema valSchema = SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build(); + Struct val = new Struct(valSchema).put("key", 123L); + + Object result = Utilities.extractFromRecordValue(val, ""); + assertThat(result).isNull(); + + result = Utilities.extractFromRecordValue(val, "xkey"); + assertThat(result).isNull(); + } + @Test public void testExtractFromRecordValueMap() { Map val = ImmutableMap.of("key", 123L); @@ -157,4 +169,15 @@ public void testExtractFromRecordValueMapNested() { Object result = Utilities.extractFromRecordValue(val, "data.id.key"); assertThat(result).isEqualTo(123L); } + + @Test + public void testExtractFromRecordValueMapNull() { + Map val = ImmutableMap.of("key", 123L); + + Object result = Utilities.extractFromRecordValue(val, ""); + assertThat(result).isNull(); + + result = Utilities.extractFromRecordValue(val, "xkey"); + assertThat(result).isNull(); + } }