Skip to content

Commit

Permalink
simpler nested field routine
Browse files Browse the repository at this point in the history
  • Loading branch information
Bryan Keller committed Jan 21, 2024
1 parent 8892606 commit 11daa38
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -121,37 +123,49 @@ private static Object loadHadoopConfig(IcebergSinkConfig config) {
}

public static Object extractFromRecordValue(Object recordValue, String fieldName) {
String[] fields = fieldName.split("\\.");
List<String> fields = Splitter.on('.').splitToList(fieldName);
if (recordValue instanceof Struct) {
return valueFromStruct((Struct) recordValue, fields, 0);
return valueFromStruct((Struct) recordValue, fields);
} else if (recordValue instanceof Map) {
return valueFromMap((Map<?, ?>) recordValue, fields, 0);
return valueFromMap((Map<?, ?>) recordValue, fields);
} else {
throw new UnsupportedOperationException(
"Cannot extract value from type: " + recordValue.getClass().getName());
}
}

private static Object valueFromStruct(Struct struct, String[] fields, int idx) {
Preconditions.checkArgument(idx < fields.length, "Invalid field index");
Object value = struct.get(fields[idx]);
if (value == null || idx == fields.length - 1) {
return value;
private static Object valueFromStruct(Struct parent, List<String> fields) {
Struct struct = parent;
for (int idx = 0; idx < fields.size() - 1; idx++) {
Object value = fieldValueFromStruct(struct, fields.get(idx));
if (value == null) {
return null;
}
Preconditions.checkState(value instanceof Struct, "Expected a struct type");
struct = (Struct) value;
}

Preconditions.checkState(value instanceof Struct, "Expected a struct type");
return valueFromStruct((Struct) value, fields, idx + 1);
return fieldValueFromStruct(struct, fields.get(fields.size() - 1));
}

private static Object valueFromMap(Map<?, ?> map, String[] fields, int idx) {
Preconditions.checkArgument(idx < fields.length, "Invalid field index");
Object value = map.get(fields[idx]);
if (value == null || idx == fields.length - 1) {
return value;
private static Object fieldValueFromStruct(Struct struct, String fieldName) {
Field structField = struct.schema().field(fieldName);
if (structField == null) {
return null;
}
return struct.get(structField);
}

Preconditions.checkState(value instanceof Map, "Expected a map type");
return valueFromMap((Map<?, ?>) value, fields, idx + 1);
private static Object valueFromMap(Map<?, ?> parent, List<String> fields) {
Map<?, ?> map = parent;
for (int idx = 0; idx < fields.size() - 1; idx++) {
Object value = map.get(fields.get(idx));
if (value == null) {
return null;
}
Preconditions.checkState(value instanceof Map, "Expected a map type");
map = (Map<?, ?>) value;
}
return map.get(fields.get(fields.size() - 1));
}

public static TaskWriter<Record> createTableWriter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,18 @@ public void testExtractFromRecordValueStructNested() {
assertThat(result).isEqualTo(123L);
}

@Test
public void testExtractFromRecordValueStructNull() {
Schema valSchema = SchemaBuilder.struct().field("key", Schema.INT64_SCHEMA).build();
Struct val = new Struct(valSchema).put("key", 123L);

Object result = Utilities.extractFromRecordValue(val, "");
assertThat(result).isNull();

result = Utilities.extractFromRecordValue(val, "xkey");
assertThat(result).isNull();
}

@Test
public void testExtractFromRecordValueMap() {
Map<String, Object> val = ImmutableMap.of("key", 123L);
Expand All @@ -157,4 +169,15 @@ public void testExtractFromRecordValueMapNested() {
Object result = Utilities.extractFromRecordValue(val, "data.id.key");
assertThat(result).isEqualTo(123L);
}

@Test
public void testExtractFromRecordValueMapNull() {
Map<String, Object> val = ImmutableMap.of("key", 123L);

Object result = Utilities.extractFromRecordValue(val, "");
assertThat(result).isNull();

result = Utilities.extractFromRecordValue(val, "xkey");
assertThat(result).isNull();
}
}

0 comments on commit 11daa38

Please sign in to comment.