From 43acff834b65f0692c6dabb0a550577d6ad49564 Mon Sep 17 00:00:00 2001
From: Bryan Keller
Date: Sat, 4 Nov 2023 09:37:07 -0700
Subject: [PATCH] field ID mapping

---
 .../iceberg/connect/events/AvroUtil.java      | 21 ++++--
 .../connect/events/CommitComplete.java        | 24 +++++----
 .../iceberg/connect/events/CommitToTable.java | 38 ++++++++------
 .../iceberg/connect/events/DataComplete.java  | 27 ++++++----
 .../iceberg/connect/events/DataWritten.java   | 49 ++++++++++++-------
 .../apache/iceberg/connect/events/Event.java  | 45 ++++++++++-------
 .../iceberg/connect/events/StartCommit.java   | 17 ++++---
 .../connect/events/TableReference.java        | 32 +++++++-----
 .../connect/events/TopicPartitionOffset.java  | 38 ++++++++------
 9 files changed, 183 insertions(+), 108 deletions(-)

diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java
index fc14a1f28771..84714142524f 100644
--- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.List;
 import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
@@ -35,17 +36,17 @@ class AvroUtil {
 
   static final Map<Integer, String> FIELD_ID_TO_CLASS =
       ImmutableMap.of(
-          10_102,
+          DataComplete.ASSIGNMENTS_ELEMENT,
           TopicPartitionOffset.class.getName(),
           DataFile.PARTITION_ID,
           PartitionData.class.getName(),
-          10_301,
+          DataWritten.TABLE_REFERENCE,
           TableReference.class.getName(),
-          10_303,
+          DataWritten.DATA_FILES_ELEMENT,
           "org.apache.iceberg.GenericDataFile",
-          10_305,
+          DataWritten.DELETE_FILES_ELEMENT,
           "org.apache.iceberg.GenericDeleteFile",
-          10_401,
+          CommitToTable.TABLE_REFERENCE,
           TableReference.class.getName());
 
   public static byte[] encode(Event event) {
@@ -81,5 +82,15 @@ static Schema convert(
         struct.equals(icebergSchema) ? javaClass.getName() : typeMap.get(fieldId));
   }
 
+  static int[] positionsToIds(Schema avroSchema) {
+    int[] result = new int[avroSchema.getFields().size()];
+    List<Schema.Field> fields = avroSchema.getFields();
+    for (int i = 0; i < result.length; i++) {
+      Object val = fields.get(i).getObjectProp(AvroSchemaUtil.FIELD_ID_PROP);
+      result[i] = val == null ? -1 : (int) val;
+    }
+    return result;
+  }
+
   private AvroUtil() {}
 }
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java
index 70acadc62ccc..318de8b077a3 100644
--- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java
@@ -37,23 +37,29 @@ public class CommitComplete implements Payload {
   private UUID commitId;
   private OffsetDateTime validThroughTs;
   private final Schema avroSchema;
+  private final int[] positionsToIds;
+
+  static final int COMMIT_ID = 10_000;
+  static final int VALID_THROUGH_TS = 10_001;
 
   private static final StructType ICEBERG_SCHEMA =
       StructType.of(
-          NestedField.required(10_000, "commit_id", UUIDType.get()),
-          NestedField.optional(10_001, "valid_through_ts", TimestampType.withZone()));
-
+          NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()),
+          NestedField.optional(VALID_THROUGH_TS, "valid_through_ts", TimestampType.withZone()));
   private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, CommitComplete.class);
+  private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);
 
   // Used by Avro reflection to instantiate this class when reading events
   public CommitComplete(Schema avroSchema) {
     this.avroSchema = avroSchema;
+    this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
   }
 
   public CommitComplete(UUID commitId, OffsetDateTime validThroughTs) {
     this.commitId = commitId;
     this.validThroughTs = validThroughTs;
     this.avroSchema = AVRO_SCHEMA;
+    this.positionsToIds = POSITIONS_TO_IDS;
   }
 
   @Override
@@ -85,11 +91,11 @@ public Schema getSchema() {
 
   @Override
   public void put(int i, Object v) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case COMMIT_ID:
         this.commitId = (UUID) v;
         return;
-      case 1:
+      case VALID_THROUGH_TS:
         this.validThroughTs = v == null ? null : DateTimeUtil.timestamptzFromMicros((Long) v);
         return;
       default:
@@ -99,10 +105,10 @@ public void put(int i, Object v) {
 
   @Override
   public Object get(int i) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case COMMIT_ID:
         return commitId;
-      case 1:
+      case VALID_THROUGH_TS:
         return validThroughTs == null ? null : DateTimeUtil.microsFromTimestamptz(validThroughTs);
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + i);
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java
index ece42be6a357..9df2bb12f98d 100644
--- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java
@@ -40,19 +40,26 @@ public class CommitToTable implements Payload {
   private Long snapshotId;
   private OffsetDateTime validThroughTs;
   private final Schema avroSchema;
+  private final int[] positionsToIds;
+
+  static final int COMMIT_ID = 10_400;
+  static final int TABLE_REFERENCE = 10_401;
+  static final int SNAPSHOT_ID = 10_402;
+  static final int VALID_THROUGH_TS = 10_403;
 
   private static final StructType ICEBERG_SCHEMA =
       StructType.of(
-          NestedField.required(10_400, "commit_id", UUIDType.get()),
-          NestedField.required(10_401, "table_reference", TableReference.ICEBERG_SCHEMA),
-          NestedField.optional(10_402, "snapshot_id", LongType.get()),
-          NestedField.optional(10_403, "valid_through_ts", TimestampType.withZone()));
-
+          NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()),
+          NestedField.required(TABLE_REFERENCE, "table_reference", TableReference.ICEBERG_SCHEMA),
+          NestedField.optional(SNAPSHOT_ID, "snapshot_id", LongType.get()),
+          NestedField.optional(VALID_THROUGH_TS, "valid_through_ts", TimestampType.withZone()));
   private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, CommitToTable.class);
+  private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);
 
   // Used by Avro reflection to instantiate this class when reading events
   public CommitToTable(Schema avroSchema) {
     this.avroSchema = avroSchema;
+    this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
   }
 
   public CommitToTable(
@@ -65,6 +72,7 @@ public CommitToTable(
     this.snapshotId = snapshotId;
     this.validThroughTs = validThroughTs;
     this.avroSchema = AVRO_SCHEMA;
+    this.positionsToIds = POSITIONS_TO_IDS;
   }
 
   @Override
@@ -100,17 +108,17 @@ public Schema getSchema() {
 
   @Override
   public void put(int i, Object v) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case COMMIT_ID:
         this.commitId = (UUID) v;
         return;
-      case 1:
+      case TABLE_REFERENCE:
         this.tableReference = (TableReference) v;
         return;
-      case 2:
+      case SNAPSHOT_ID:
         this.snapshotId = (Long) v;
         return;
-      case 3:
+      case VALID_THROUGH_TS:
         this.validThroughTs = v == null ? null : DateTimeUtil.timestamptzFromMicros((Long) v);
         return;
       default:
@@ -120,14 +128,14 @@ public void put(int i, Object v) {
 
   @Override
   public Object get(int i) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case COMMIT_ID:
         return commitId;
-      case 1:
+      case TABLE_REFERENCE:
         return tableReference;
-      case 2:
+      case SNAPSHOT_ID:
         return snapshotId;
-      case 3:
+      case VALID_THROUGH_TS:
         return validThroughTs == null ? null : DateTimeUtil.microsFromTimestamptz(validThroughTs);
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + i);
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java
index 23555892eff1..a4b763b7583d 100644
--- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java
@@ -35,26 +35,33 @@ public class DataComplete implements Payload {
   private UUID commitId;
   private List<TopicPartitionOffset> assignments;
   private final Schema avroSchema;
+  private final int[] positionsToIds;
+
+  static final int COMMIT_ID = 10_100;
+  static final int ASSIGNMENTS = 10_101;
+  static final int ASSIGNMENTS_ELEMENT = 10_102;
 
   private static final StructType ICEBERG_SCHEMA =
       StructType.of(
-          NestedField.required(10_100, "commit_id", UUIDType.get()),
+          NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()),
           NestedField.optional(
-              10_101,
+              ASSIGNMENTS,
               "assignments",
-              ListType.ofRequired(10_102, TopicPartitionOffset.ICEBERG_SCHEMA)));
-
+              ListType.ofRequired(ASSIGNMENTS_ELEMENT, TopicPartitionOffset.ICEBERG_SCHEMA)));
   private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, DataComplete.class);
+  private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);
 
   // Used by Avro reflection to instantiate this class when reading events
   public DataComplete(Schema avroSchema) {
     this.avroSchema = avroSchema;
+    this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
   }
 
   public DataComplete(UUID commitId, List<TopicPartitionOffset> assignments) {
     this.commitId = commitId;
     this.assignments = assignments;
     this.avroSchema = AVRO_SCHEMA;
+    this.positionsToIds = POSITIONS_TO_IDS;
   }
 
   @Override
@@ -83,11 +90,11 @@ public Schema getSchema() {
   @Override
   @SuppressWarnings("unchecked")
   public void put(int i, Object v) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case COMMIT_ID:
         this.commitId = (UUID) v;
         return;
-      case 1:
+      case ASSIGNMENTS:
         this.assignments = (List<TopicPartitionOffset>) v;
         return;
       default:
@@ -97,10 +104,10 @@ public void put(int i, Object v) {
 
   @Override
   public Object get(int i) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case COMMIT_ID:
         return commitId;
-      case 1:
+      case ASSIGNMENTS:
         return assignments;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + i);
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java
index 89887888442d..c79b0c9f9425 100644
--- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java
@@ -41,12 +41,21 @@ public class DataWritten implements Payload {
   private List<DataFile> dataFiles;
   private List<DeleteFile> deleteFiles;
   private StructType icebergSchema;
-  private Schema avroSchema;
+  private final Schema avroSchema;
+  private final int[] positionsToIds;
+
+  static final int COMMIT_ID = 10_300;
+  static final int TABLE_REFERENCE = 10_301;
+  static final int DATA_FILES = 10_302;
+  static final int DATA_FILES_ELEMENT = 10_303;
+  static final int DELETE_FILES = 10_304;
+  static final int DELETE_FILES_ELEMENT = 10_305;
 
   // Used by Avro reflection to instantiate this class when reading events, note that this does not
   // set the partition type so the instance cannot be re-serialized
   public DataWritten(Schema avroSchema) {
     this.avroSchema = avroSchema;
+    this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
   }
 
   public DataWritten(
@@ -60,6 +69,8 @@ public DataWritten(
     this.tableReference = tableReference;
     this.dataFiles = dataFiles;
     this.deleteFiles = deleteFiles;
+    this.avroSchema = AvroUtil.convert(writeSchema(), getClass());
+    this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
   }
 
   @Override
@@ -90,12 +101,17 @@ public StructType writeSchema() {
 
       this.icebergSchema =
           StructType.of(
-              NestedField.required(10_300, "commit_id", UUIDType.get()),
-              NestedField.required(10_301, "table_reference", TableReference.ICEBERG_SCHEMA),
+              NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()),
+              NestedField.required(
+                  TABLE_REFERENCE, "table_reference", TableReference.ICEBERG_SCHEMA),
               NestedField.optional(
-                  10_302, "data_files", ListType.ofRequired(10_303, dataFileStruct)),
+                  DATA_FILES,
+                  "data_files",
+                  ListType.ofRequired(DATA_FILES_ELEMENT, dataFileStruct)),
               NestedField.optional(
-                  10_304, "delete_files", ListType.ofRequired(10_305, dataFileStruct)));
+                  DELETE_FILES,
+                  "delete_files",
+                  ListType.ofRequired(DELETE_FILES_ELEMENT, dataFileStruct)));
     }
 
     return icebergSchema;
@@ -103,26 +119,23 @@ public StructType writeSchema() {
 
   @Override
   public Schema getSchema() {
-    if (avroSchema == null) {
-      this.avroSchema = AvroUtil.convert(writeSchema(), getClass());
-    }
     return avroSchema;
   }
 
   @Override
   @SuppressWarnings("unchecked")
   public void put(int i, Object v) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case COMMIT_ID:
         this.commitId = (UUID) v;
         return;
-      case 1:
+      case TABLE_REFERENCE:
         this.tableReference = (TableReference) v;
         return;
-      case 2:
+      case DATA_FILES:
         this.dataFiles = (List<DataFile>) v;
         return;
-      case 3:
+      case DELETE_FILES:
         this.deleteFiles = (List<DeleteFile>) v;
         return;
       default:
@@ -132,14 +145,14 @@ public void put(int i, Object v) {
 
   @Override
   public Object get(int i) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case COMMIT_ID:
         return commitId;
-      case 1:
+      case TABLE_REFERENCE:
         return tableReference;
-      case 2:
+      case DATA_FILES:
         return dataFiles;
-      case 3:
+      case DELETE_FILES:
         return deleteFiles;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + i);
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java
index ea8cb6a9de92..8d2a46509650 100644
--- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java
@@ -46,10 +46,18 @@ public class Event implements IndexedRecord {
   private String groupId;
   private Payload payload;
   private final Schema avroSchema;
+  private final int[] positionsToIds;
+
+  static final int ID = 10_500;
+  static final int TYPE = 10_501;
+  static final int TIMESTAMP = 10_502;
+  static final int GROUP_ID = 10_503;
+  static final int PAYLOAD = 10_504;
 
   // Used by Avro reflection to instantiate this class when reading events
   public Event(Schema avroSchema) {
     this.avroSchema = avroSchema;
+    this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
   }
 
   public Event(String groupId, Payload payload) {
@@ -61,16 +69,17 @@
 
     StructType icebergSchema =
         StructType.of(
-            NestedField.required(10_500, "id", UUIDType.get()),
-            NestedField.required(10_501, "type", IntegerType.get()),
-            NestedField.required(10_502, "timestamp", TimestampType.withZone()),
-            NestedField.required(10_503, "group_id", StringType.get()),
-            NestedField.required(10_504, "payload", payload.writeSchema()));
+            NestedField.required(ID, "id", UUIDType.get()),
+            NestedField.required(TYPE, "type", IntegerType.get()),
+            NestedField.required(TIMESTAMP, "timestamp", TimestampType.withZone()),
+            NestedField.required(GROUP_ID, "group_id", StringType.get()),
+            NestedField.required(PAYLOAD, "payload", payload.writeSchema()));
 
     Map<Integer, String> typeMap = Maps.newHashMap(AvroUtil.FIELD_ID_TO_CLASS);
-    typeMap.put(10_504, payload.getClass().getName());
+    typeMap.put(PAYLOAD, payload.getClass().getName());
 
     this.avroSchema = AvroUtil.convert(icebergSchema, getClass(), typeMap);
+    this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
   }
 
   public UUID id() {
@@ -100,20 +109,20 @@ public Schema getSchema() {
 
   @Override
   public void put(int i, Object v) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case ID:
         this.id = (UUID) v;
         return;
-      case 1:
+      case TYPE:
         this.type = v == null ? null : PayloadType.values()[(Integer) v];
         return;
-      case 2:
+      case TIMESTAMP:
         this.timestamp = v == null ? null : DateTimeUtil.timestamptzFromMicros((Long) v);
         return;
-      case 3:
+      case GROUP_ID:
         this.groupId = v == null ? null : v.toString();
         return;
-      case 4:
+      case PAYLOAD:
         this.payload = (Payload) v;
         return;
       default:
@@ -123,16 +132,16 @@ public void put(int i, Object v) {
 
   @Override
   public Object get(int i) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case ID:
         return id;
-      case 1:
+      case TYPE:
         return type == null ? null : type.id();
-      case 2:
+      case TIMESTAMP:
         return timestamp == null ? null : DateTimeUtil.microsFromTimestamptz(timestamp);
-      case 3:
+      case GROUP_ID:
         return groupId;
-      case 4:
+      case PAYLOAD:
         return payload;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + i);
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java
index 3c46ec88b999..ef9c4b694f03 100644
--- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java
@@ -32,20 +32,25 @@ public class StartCommit implements Payload {
 
   private UUID commitId;
   private final Schema avroSchema;
+  private final int[] positionsToIds;
 
-  private static final StructType ICEBERG_SCHEMA =
-      StructType.of(NestedField.required(10_200, "commit_id", UUIDType.get()));
+  static final int COMMIT_ID = 10_200;
 
+  private static final StructType ICEBERG_SCHEMA =
+      StructType.of(NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()));
   private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, StartCommit.class);
+  private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);
 
   // Used by Avro reflection to instantiate this class when reading events
   public StartCommit(Schema avroSchema) {
     this.avroSchema = avroSchema;
+    this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
   }
 
   public StartCommit(UUID commitId) {
     this.commitId = commitId;
    this.avroSchema = AVRO_SCHEMA;
+    this.positionsToIds = POSITIONS_TO_IDS;
   }
 
   @Override
@@ -69,8 +74,8 @@ public Schema getSchema() {
 
   @Override
   public void put(int i, Object v) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case COMMIT_ID:
         this.commitId = (UUID) v;
         return;
       default:
@@ -80,8 +85,8 @@ public void put(int i, Object v) {
 
   @Override
   public Object get(int i) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case COMMIT_ID:
         return commitId;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + i);
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
index f741658cd1f4..2348865dbb34 100644
--- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java
@@ -39,14 +39,20 @@ public class TableReference implements IndexedRecord {
   private List<String> namespace;
   private String name;
   private final Schema avroSchema;
+  private final int[] positionsToIds;
+
+  static final int CATALOG = 10_600;
+  static final int NAMESPACE = 10_601;
+  static final int NAME = 10_603;
 
   public static final StructType ICEBERG_SCHEMA =
       StructType.of(
-          NestedField.required(10_600, "catalog", StringType.get()),
-          NestedField.required(10_601, "namespace", ListType.ofRequired(10_602, StringType.get())),
-          NestedField.required(10_603, "name", StringType.get()));
-
+          NestedField.required(CATALOG, "catalog", StringType.get()),
+          NestedField.required(
+              NAMESPACE, "namespace", ListType.ofRequired(NAMESPACE + 1, StringType.get())),
+          NestedField.required(NAME, "name", StringType.get()));
   private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, TableReference.class);
+  private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);
 
   public static TableReference of(String catalog, TableIdentifier tableIdentifier) {
     return new TableReference(
@@ -56,6 +62,7 @@ public static TableReference of(String catalog, TableIdentifier tableIdentifier
 
   // Used by Avro reflection to instantiate this class when reading events
   public TableReference(Schema avroSchema) {
     this.avroSchema = avroSchema;
+    this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
   }
 
@@ -63,6 +70,7 @@ public TableReference(String catalog, List<String> namespace, String name) {
     this.namespace = namespace;
     this.name = name;
     this.avroSchema = AVRO_SCHEMA;
+    this.positionsToIds = POSITIONS_TO_IDS;
   }
 
   public String catalog() {
@@ -82,15 +90,15 @@ public Schema getSchema() {
   @Override
   @SuppressWarnings("unchecked")
   public void put(int i, Object v) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case CATALOG:
         this.catalog = v == null ? null : v.toString();
         return;
-      case 1:
+      case NAMESPACE:
         this.namespace =
             v == null ? null : ((List<Utf8>) v).stream().map(Utf8::toString).collect(toList());
         return;
-      case 2:
+      case NAME:
         this.name = v == null ? null : v.toString();
         return;
       default:
@@ -100,12 +108,12 @@ public void put(int i, Object v) {
 
   @Override
   public Object get(int i) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case CATALOG:
         return catalog;
-      case 1:
+      case NAMESPACE:
         return namespace;
-      case 2:
+      case NAME:
         return name;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + i);
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java
index efb1c085d592..388e52dde81e 100644
--- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java
@@ -37,20 +37,27 @@ public class TopicPartitionOffset implements IndexedRecord {
   private Long offset;
   private OffsetDateTime timestamp;
   private final Schema avroSchema;
+  private final int[] positionsToIds;
+
+  static final int TOPIC = 10_700;
+  static final int PARTITION = 10_701;
+  static final int OFFSET = 10_702;
+  static final int TIMESTAMP = 10_703;
 
   public static final StructType ICEBERG_SCHEMA =
       StructType.of(
-          NestedField.required(10_700, "topic", StringType.get()),
-          NestedField.required(10_701, "partition", IntegerType.get()),
-          NestedField.optional(10_702, "offset", LongType.get()),
-          NestedField.optional(10_703, "timestamp", TimestampType.withZone()));
-
+          NestedField.required(TOPIC, "topic", StringType.get()),
+          NestedField.required(PARTITION, "partition", IntegerType.get()),
+          NestedField.optional(OFFSET, "offset", LongType.get()),
+          NestedField.optional(TIMESTAMP, "timestamp", TimestampType.withZone()));
   private static final Schema AVRO_SCHEMA =
       AvroUtil.convert(ICEBERG_SCHEMA, TopicPartitionOffset.class);
+  private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);
 
   // Used by Avro reflection to instantiate this class when reading events
   public TopicPartitionOffset(Schema avroSchema) {
     this.avroSchema = avroSchema;
+    this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
   }
 
   public TopicPartitionOffset(String topic, int partition, Long offset, OffsetDateTime timestamp) {
@@ -59,6 +66,7 @@ public TopicPartitionOffset(String topic, int partition, Long offset, OffsetDate
     this.offset = offset;
     this.timestamp = timestamp;
     this.avroSchema = AVRO_SCHEMA;
+    this.positionsToIds = POSITIONS_TO_IDS;
   }
 
   public String topic() {
@@ -84,17 +92,17 @@ public Schema getSchema() {
 
   @Override
   public void put(int i, Object v) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case TOPIC:
         this.topic = v == null ? null : v.toString();
         return;
-      case 1:
+      case PARTITION:
         this.partition = (Integer) v;
         return;
-      case 2:
+      case OFFSET:
         this.offset = (Long) v;
         return;
-      case 3:
+      case TIMESTAMP:
         this.timestamp = v == null ? null : DateTimeUtil.timestamptzFromMicros((Long) v);
         return;
       default:
@@ -104,14 +112,14 @@ public void put(int i, Object v) {
 
   @Override
   public Object get(int i) {
-    switch (i) {
-      case 0:
+    switch (positionsToIds[i]) {
+      case TOPIC:
         return topic;
-      case 1:
+      case PARTITION:
         return partition;
-      case 2:
+      case OFFSET:
         return offset;
-      case 3:
+      case TIMESTAMP:
         return timestamp == null ? null : DateTimeUtil.microsFromTimestamptz(timestamp);
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + i);
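
Aside (not part of the patch above): every put()/get() in this change switches on positionsToIds[i] instead of the bare Avro position i, so a reader routes each decoded value by its stable Iceberg field ID even when the writer schema reordered or inserted fields. Below is a minimal standalone sketch of that translation step, assuming Avro 1.9+ and assuming AvroSchemaUtil.FIELD_ID_PROP resolves to the "field-id" schema property; the record name and IDs are illustrative only, not taken from the patch.

import java.util.Arrays;
import java.util.List;
import org.apache.avro.Schema;

public class FieldIdDispatchSketch {

  // Assumed property key; the patch reads it via AvroSchemaUtil.FIELD_ID_PROP.
  static final String FIELD_ID_PROP = "field-id";

  // Mirrors AvroUtil.positionsToIds: index = field position in the Avro schema,
  // value = Iceberg field ID, or -1 when a field carries no ID property.
  static int[] positionsToIds(Schema avroSchema) {
    List<Schema.Field> fields = avroSchema.getFields();
    int[] result = new int[fields.size()];
    for (int i = 0; i < result.length; i++) {
      Object val = fields.get(i).getObjectProp(FIELD_ID_PROP);
      result[i] = val == null ? -1 : ((Number) val).intValue();
    }
    return result;
  }

  public static void main(String[] args) {
    // Build a two-field record the way a writer might lay it out.
    Schema.Field commitId = new Schema.Field("commit_id", Schema.create(Schema.Type.STRING), null);
    commitId.addProp(FIELD_ID_PROP, 10_000);
    Schema.Field validThroughTs =
        new Schema.Field("valid_through_ts", Schema.create(Schema.Type.LONG), null);
    validThroughTs.addProp(FIELD_ID_PROP, 10_001);
    Schema schema =
        Schema.createRecord(
            "CommitComplete", null, "demo", false, Arrays.asList(commitId, validThroughTs));

    // Prints [10000, 10001]: a switch on these IDs keeps working even if a future
    // writer reorders or inserts fields, whereas a switch on the bare position i
    // would silently bind values to the wrong member.
    System.out.println(Arrays.toString(positionsToIds(schema)));
  }
}

The -1 sentinel covers fields without an ID property, which fall through to the default branch, and precomputing POSITIONS_TO_IDS once per class for the writer-side AVRO_SCHEMA avoids recomputing the mapping on every event.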