From 224fc3c5ab013aea82a03643fd3537f211a6f8b8 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Sat, 4 Nov 2023 10:15:39 -0700 Subject: [PATCH] remove mapping array --- .../org/apache/iceberg/connect/events/AvroUtil.java | 12 +++++------- .../iceberg/connect/events/CommitComplete.java | 8 ++------ .../apache/iceberg/connect/events/CommitToTable.java | 8 ++------ .../apache/iceberg/connect/events/DataComplete.java | 8 ++------ .../apache/iceberg/connect/events/DataWritten.java | 7 ++----- .../org/apache/iceberg/connect/events/Event.java | 7 ++----- .../apache/iceberg/connect/events/StartCommit.java | 8 ++------ .../iceberg/connect/events/TableReference.java | 8 ++------ .../iceberg/connect/events/TopicPartitionOffset.java | 8 ++------ 9 files changed, 21 insertions(+), 53 deletions(-) diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java index 84714142524f..8e40c37f3230 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java @@ -29,6 +29,7 @@ import org.apache.iceberg.avro.AvroEncoderUtil; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.data.avro.DecoderResolver; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; @@ -82,14 +83,11 @@ static Schema convert( struct.equals(icebergSchema) ? javaClass.getName() : typeMap.get(fieldId)); } - static int[] positionsToIds(Schema avroSchema) { - int[] result = new int[avroSchema.getFields().size()]; + static int positionToId(int position, Schema avroSchema) { List fields = avroSchema.getFields(); - for (int i = 0; i < result.length; i++) { - Object val = fields.get(i).getObjectProp(AvroSchemaUtil.FIELD_ID_PROP); - result[i] = val == null ? -1 : (int) val; - } - return result; + Preconditions.checkArgument(position < fields.size(), "Invalid field position: " + position); + Object val = fields.get(position).getObjectProp(AvroSchemaUtil.FIELD_ID_PROP); + return val == null ? -1 : (int) val; } private AvroUtil() {} diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java index 318de8b077a3..8a2cf632407d 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java @@ -37,7 +37,6 @@ public class CommitComplete implements Payload { private UUID commitId; private OffsetDateTime validThroughTs; private final Schema avroSchema; - private final int[] positionsToIds; static final int COMMIT_ID = 10_000; static final int VALID_THROUGH_TS = 10_001; @@ -47,19 +46,16 @@ public class CommitComplete implements Payload { NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()), NestedField.optional(VALID_THROUGH_TS, "valid_through_ts", TimestampType.withZone())); private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, CommitComplete.class); - private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA); // Used by Avro reflection to instantiate this class when reading events public CommitComplete(Schema avroSchema) { this.avroSchema = avroSchema; - this.positionsToIds = AvroUtil.positionsToIds(avroSchema); } public CommitComplete(UUID commitId, OffsetDateTime validThroughTs) { this.commitId = commitId; this.validThroughTs = validThroughTs; this.avroSchema = AVRO_SCHEMA; - this.positionsToIds = POSITIONS_TO_IDS; } @Override @@ -91,7 +87,7 @@ public Schema getSchema() { @Override public void put(int i, Object v) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case COMMIT_ID: this.commitId = (UUID) v; return; @@ -105,7 +101,7 @@ public void put(int i, Object v) { @Override public Object get(int i) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case COMMIT_ID: return commitId; case VALID_THROUGH_TS: diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java index 9df2bb12f98d..df32ff017690 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java @@ -40,7 +40,6 @@ public class CommitToTable implements Payload { private Long snapshotId; private OffsetDateTime validThroughTs; private final Schema avroSchema; - private final int[] positionsToIds; static final int COMMIT_ID = 10_400; static final int TABLE_REFERENCE = 10_401; @@ -54,12 +53,10 @@ public class CommitToTable implements Payload { NestedField.optional(SNAPSHOT_ID, "snapshot_id", LongType.get()), NestedField.optional(VALID_THROUGH_TS, "valid_through_ts", TimestampType.withZone())); private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, CommitToTable.class); - private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA); // Used by Avro reflection to instantiate this class when reading events public CommitToTable(Schema avroSchema) { this.avroSchema = avroSchema; - this.positionsToIds = AvroUtil.positionsToIds(avroSchema); } public CommitToTable( @@ -72,7 +69,6 @@ public CommitToTable( this.snapshotId = snapshotId; this.validThroughTs = validThroughTs; this.avroSchema = AVRO_SCHEMA; - this.positionsToIds = POSITIONS_TO_IDS; } @Override @@ -108,7 +104,7 @@ public Schema getSchema() { @Override public void put(int i, Object v) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case COMMIT_ID: this.commitId = (UUID) v; return; @@ -128,7 +124,7 @@ public void put(int i, Object v) { @Override public Object get(int i) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case COMMIT_ID: return commitId; case TABLE_REFERENCE: diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java index a4b763b7583d..e91414d2512d 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java @@ -35,7 +35,6 @@ public class DataComplete implements Payload { private UUID commitId; private List assignments; private final Schema avroSchema; - private final int[] positionsToIds; static final int COMMIT_ID = 10_100; static final int ASSIGNMENTS = 10_101; @@ -49,19 +48,16 @@ public class DataComplete implements Payload { "assignments", ListType.ofRequired(ASSIGNMENTS_ELEMENT, TopicPartitionOffset.ICEBERG_SCHEMA))); private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, DataComplete.class); - private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA); // Used by Avro reflection to instantiate this class when reading events public DataComplete(Schema avroSchema) { this.avroSchema = avroSchema; - this.positionsToIds = AvroUtil.positionsToIds(avroSchema); } public DataComplete(UUID commitId, List assignments) { this.commitId = commitId; this.assignments = assignments; this.avroSchema = AVRO_SCHEMA; - this.positionsToIds = POSITIONS_TO_IDS; } @Override @@ -90,7 +86,7 @@ public Schema getSchema() { @Override @SuppressWarnings("unchecked") public void put(int i, Object v) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case COMMIT_ID: this.commitId = (UUID) v; return; @@ -104,7 +100,7 @@ public void put(int i, Object v) { @Override public Object get(int i) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case COMMIT_ID: return commitId; case ASSIGNMENTS: diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java index c79b0c9f9425..5edc401a2919 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java @@ -42,7 +42,6 @@ public class DataWritten implements Payload { private List deleteFiles; private StructType icebergSchema; private final Schema avroSchema; - private final int[] positionsToIds; static final int COMMIT_ID = 10_300; static final int TABLE_REFERENCE = 10_301; @@ -55,7 +54,6 @@ public class DataWritten implements Payload { // set the partition type so the instance cannot be re-serialized public DataWritten(Schema avroSchema) { this.avroSchema = avroSchema; - this.positionsToIds = AvroUtil.positionsToIds(avroSchema); } public DataWritten( @@ -70,7 +68,6 @@ public DataWritten( this.dataFiles = dataFiles; this.deleteFiles = deleteFiles; this.avroSchema = AvroUtil.convert(writeSchema(), getClass()); - this.positionsToIds = AvroUtil.positionsToIds(avroSchema); } @Override @@ -125,7 +122,7 @@ public Schema getSchema() { @Override @SuppressWarnings("unchecked") public void put(int i, Object v) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case COMMIT_ID: this.commitId = (UUID) v; return; @@ -145,7 +142,7 @@ public void put(int i, Object v) { @Override public Object get(int i) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case COMMIT_ID: return commitId; case TABLE_REFERENCE: diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java index 8d2a46509650..f8ee619142c9 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java @@ -46,7 +46,6 @@ public class Event implements IndexedRecord { private String groupId; private Payload payload; private final Schema avroSchema; - private final int[] positionsToIds; static final int ID = 10_500; static final int TYPE = 10_501; @@ -57,7 +56,6 @@ public class Event implements IndexedRecord { // Used by Avro reflection to instantiate this class when reading events public Event(Schema avroSchema) { this.avroSchema = avroSchema; - this.positionsToIds = AvroUtil.positionsToIds(avroSchema); } public Event(String groupId, Payload payload) { @@ -79,7 +77,6 @@ public Event(String groupId, Payload payload) { typeMap.put(PAYLOAD, payload.getClass().getName()); this.avroSchema = AvroUtil.convert(icebergSchema, getClass(), typeMap); - this.positionsToIds = AvroUtil.positionsToIds(avroSchema); } public UUID id() { @@ -109,7 +106,7 @@ public Schema getSchema() { @Override public void put(int i, Object v) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case ID: this.id = (UUID) v; return; @@ -132,7 +129,7 @@ public void put(int i, Object v) { @Override public Object get(int i) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case ID: return id; case TYPE: diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java index ef9c4b694f03..f26a92249507 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java @@ -32,25 +32,21 @@ public class StartCommit implements Payload { private UUID commitId; private final Schema avroSchema; - private final int[] positionsToIds; static final int COMMIT_ID = 10_200; private static final StructType ICEBERG_SCHEMA = StructType.of(NestedField.required(COMMIT_ID, "commit_id", UUIDType.get())); private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, StartCommit.class); - private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA); // Used by Avro reflection to instantiate this class when reading events public StartCommit(Schema avroSchema) { this.avroSchema = avroSchema; - this.positionsToIds = AvroUtil.positionsToIds(avroSchema); } public StartCommit(UUID commitId) { this.commitId = commitId; this.avroSchema = AVRO_SCHEMA; - this.positionsToIds = POSITIONS_TO_IDS; } @Override @@ -74,7 +70,7 @@ public Schema getSchema() { @Override public void put(int i, Object v) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case COMMIT_ID: this.commitId = (UUID) v; return; @@ -85,7 +81,7 @@ public void put(int i, Object v) { @Override public Object get(int i) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case COMMIT_ID: return commitId; default: diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java index 2348865dbb34..4d8d43e69633 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java @@ -39,7 +39,6 @@ public class TableReference implements IndexedRecord { private List namespace; private String name; private final Schema avroSchema; - private final int[] positionsToIds; static final int CATALOG = 10_600; static final int NAMESPACE = 10_601; @@ -52,7 +51,6 @@ public class TableReference implements IndexedRecord { NAMESPACE, "namespace", ListType.ofRequired(NAMESPACE + 1, StringType.get())), NestedField.required(NAME, "name", StringType.get())); private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, TableReference.class); - private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA); public static TableReference of(String catalog, TableIdentifier tableIdentifier) { return new TableReference( @@ -62,7 +60,6 @@ public static TableReference of(String catalog, TableIdentifier tableIdentifier) // Used by Avro reflection to instantiate this class when reading events public TableReference(Schema avroSchema) { this.avroSchema = avroSchema; - this.positionsToIds = AvroUtil.positionsToIds(avroSchema); } public TableReference(String catalog, List namespace, String name) { @@ -70,7 +67,6 @@ public TableReference(String catalog, List namespace, String name) { this.namespace = namespace; this.name = name; this.avroSchema = AVRO_SCHEMA; - this.positionsToIds = POSITIONS_TO_IDS; } public String catalog() { @@ -90,7 +86,7 @@ public Schema getSchema() { @Override @SuppressWarnings("unchecked") public void put(int i, Object v) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case CATALOG: this.catalog = v == null ? null : v.toString(); return; @@ -108,7 +104,7 @@ public void put(int i, Object v) { @Override public Object get(int i) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case CATALOG: return catalog; case NAMESPACE: diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java index 388e52dde81e..7bcf5327a8b5 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java @@ -37,7 +37,6 @@ public class TopicPartitionOffset implements IndexedRecord { private Long offset; private OffsetDateTime timestamp; private final Schema avroSchema; - private final int[] positionsToIds; static final int TOPIC = 10_700; static final int PARTITION = 10_701; @@ -52,12 +51,10 @@ public class TopicPartitionOffset implements IndexedRecord { NestedField.optional(TIMESTAMP, "timestamp", TimestampType.withZone())); private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, TopicPartitionOffset.class); - private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA); // Used by Avro reflection to instantiate this class when reading events public TopicPartitionOffset(Schema avroSchema) { this.avroSchema = avroSchema; - this.positionsToIds = AvroUtil.positionsToIds(avroSchema); } public TopicPartitionOffset(String topic, int partition, Long offset, OffsetDateTime timestamp) { @@ -66,7 +63,6 @@ public TopicPartitionOffset(String topic, int partition, Long offset, OffsetDate this.offset = offset; this.timestamp = timestamp; this.avroSchema = AVRO_SCHEMA; - this.positionsToIds = POSITIONS_TO_IDS; } public String topic() { @@ -92,7 +88,7 @@ public Schema getSchema() { @Override public void put(int i, Object v) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case TOPIC: this.topic = v == null ? null : v.toString(); return; @@ -112,7 +108,7 @@ public void put(int i, Object v) { @Override public Object get(int i) { - switch (positionsToIds[i]) { + switch (AvroUtil.positionToId(i, avroSchema)) { case TOPIC: return topic; case PARTITION: