Skip to content

Commit

Permalink
add null checks
Browse files Browse the repository at this point in the history
  • Loading branch information
Bryan Keller committed Jan 17, 2024
1 parent de2e62b commit c0dbe41
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,33 @@ private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer updat
}

private static boolean columnExists(org.apache.iceberg.Schema schema, AddColumn update) {
StructType struct =
update.parentName() == null
? schema.asStruct()
: schema.findType(update.parentName()).asStructType();
StructType struct;
if (update.parentName() == null) {
struct = schema.asStruct();
} else {
Type type = schema.findType(update.parentName()).asStructType();
if (type == null) {
return false;
}
struct = type.asStructType();
}
return struct.field(update.name()) != null;
}

private static boolean typeMatches(org.apache.iceberg.Schema schema, UpdateType update) {
return schema.findType(update.name()).typeId() == update.type().typeId();
Type type = schema.findType(update.name());
if (type == null) {
throw new IllegalArgumentException("Invalid column: " + update.name());
}
return type.typeId() == update.type().typeId();
}

private static boolean isOptional(org.apache.iceberg.Schema schema, MakeOptional update) {
return schema.findField(update.name()).isOptional();
NestedField field = schema.findField(update.name());
if (field == null) {
throw new IllegalArgumentException("Invalid column: " + update.name());
}
return field.isOptional();
}

public static PartitionSpec createPartitionSpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
Expand Down Expand Up @@ -175,7 +176,14 @@ public static TaskWriter<Record> createTableWriter(
if (!idCols.isEmpty()) {
identifierFieldIds =
idCols.stream()
.map(colName -> table.schema().findField(colName).fieldId())
.map(
colName -> {
NestedField field = table.schema().findField(colName);
if (field == null) {
throw new IllegalArgumentException("ID column not found: " + colName);
}
return field.fieldId();
})
.collect(toSet());
}

Expand Down

0 comments on commit c0dbe41

Please sign in to comment.