From c0dbe4139de443e56fbcb04e37a0d5a4d7610879 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Tue, 16 Jan 2024 16:35:22 -0800 Subject: [PATCH] add null checks --- .../iceberg/connect/data/SchemaUtils.java | 26 ++++++++++++++----- .../iceberg/connect/data/Utilities.java | 10 ++++++- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java index 204fe867f7c8..9d528bd8b268 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java @@ -133,19 +133,33 @@ private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer updat } private static boolean columnExists(org.apache.iceberg.Schema schema, AddColumn update) { - StructType struct = - update.parentName() == null - ? schema.asStruct() - : schema.findType(update.parentName()).asStructType(); + StructType struct; + if (update.parentName() == null) { + struct = schema.asStruct(); + } else { + Type type = schema.findType(update.parentName()).asStructType(); + if (type == null) { + return false; + } + struct = type.asStructType(); + } return struct.field(update.name()) != null; } private static boolean typeMatches(org.apache.iceberg.Schema schema, UpdateType update) { - return schema.findType(update.name()).typeId() == update.type().typeId(); + Type type = schema.findType(update.name()); + if (type == null) { + throw new IllegalArgumentException("Invalid column: " + update.name()); + } + return type.typeId() == update.type().typeId(); } private static boolean isOptional(org.apache.iceberg.Schema schema, MakeOptional update) { - return schema.findField(update.name()).isOptional(); + NestedField field = schema.findField(update.name()); + if (field == null) { + throw new IllegalArgumentException("Invalid column: " + update.name()); + } + return field.isOptional(); } public static PartitionSpec createPartitionSpec( diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java index 2d3cac06253e..7d909db82adb 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/Utilities.java @@ -54,6 +54,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.primitives.Ints; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.util.PropertyUtil; import org.apache.kafka.connect.data.Struct; import org.slf4j.Logger; @@ -175,7 +176,14 @@ public static TaskWriter createTableWriter( if (!idCols.isEmpty()) { identifierFieldIds = idCols.stream() - .map(colName -> table.schema().findField(colName).fieldId()) + .map( + colName -> { + NestedField field = table.schema().findField(colName); + if (field == null) { + throw new IllegalArgumentException("ID column not found: " + colName); + } + return field.fieldId(); + }) .collect(toSet()); }