diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java index f6acd077ed97..c40bafb8bf08 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java @@ -32,6 +32,7 @@ /** Element representing a table identifier, with namespace and name. */ public class TableReference implements Element { + private String catalog; private List namespace; private String name; private final Schema avroSchema; @@ -40,23 +41,28 @@ public class TableReference implements Element { SchemaBuilder.builder() .record(TableReference.class.getName()) .fields() - .name("namespace") + .name("catalog") .prop(AvroSchemaUtil.FIELD_ID_PROP, 1600) .type() + .stringType() + .noDefault() + .name("namespace") + .prop(AvroSchemaUtil.FIELD_ID_PROP, 1601) + .type() .array() .items() .stringType() .noDefault() .name("name") - .prop(AvroSchemaUtil.FIELD_ID_PROP, 1601) + .prop(AvroSchemaUtil.FIELD_ID_PROP, 1602) .type() .stringType() .noDefault() .endRecord(); - public static TableReference of(TableIdentifier tableIdentifier) { + public static TableReference of(String catalog, TableIdentifier tableIdentifier) { return new TableReference( - Arrays.asList(tableIdentifier.namespace().levels()), tableIdentifier.name()); + catalog, Arrays.asList(tableIdentifier.namespace().levels()), tableIdentifier.name()); } // Used by Avro reflection to instantiate this class when reading events @@ -64,13 +70,18 @@ public TableReference(Schema avroSchema) { this.avroSchema = avroSchema; } - public TableReference(List namespace, String name) { + public TableReference(String catalog, List namespace, String name) { + this.catalog = catalog; this.namespace = namespace; this.name = name; this.avroSchema = AVRO_SCHEMA; } - public TableIdentifier toIdentifier() { + public String catalog() { + return catalog; + } + + public TableIdentifier identifier() { Namespace icebergNamespace = Namespace.of(namespace.toArray(new String[0])); return TableIdentifier.of(icebergNamespace, name); } @@ -85,10 +96,13 @@ public Schema getSchema() { public void put(int i, Object v) { switch (i) { case 0: + this.catalog = v == null ? null : v.toString(); + return; + case 1: this.namespace = v == null ? null : ((List) v).stream().map(Utf8::toString).collect(toList()); return; - case 1: + case 2: this.name = v == null ? null : v.toString(); return; default: @@ -100,8 +114,10 @@ public void put(int i, Object v) { public Object get(int i) { switch (i) { case 0: - return namespace; + return catalog; case 1: + return namespace; + case 2: return name; default: throw new UnsupportedOperationException("Unknown field ordinal: " + i); diff --git a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventSerializationTest.java b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventSerializationTest.java index 318324d8d60e..d99754d96182 100644 --- a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventSerializationTest.java +++ b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventSerializationTest.java @@ -53,7 +53,7 @@ public void testCommitResponseSerialization() { new CommitResponsePayload( StructType.of(), commitId, - new TableReference(Collections.singletonList("db"), "tbl"), + new TableReference("catalog", Collections.singletonList("db"), "tbl"), Arrays.asList(EventTestUtil.createDataFile(), EventTestUtil.createDataFile()), Arrays.asList(EventTestUtil.createDeleteFile(), EventTestUtil.createDeleteFile()))); @@ -63,7 +63,8 @@ public void testCommitResponseSerialization() { assertThat(result.type()).isEqualTo(event.type()); CommitResponsePayload payload = (CommitResponsePayload) result.payload(); assertThat(payload.commitId()).isEqualTo(commitId); - assertThat(payload.tableReference().toIdentifier()).isEqualTo(TableIdentifier.parse("db.tbl")); + assertThat(payload.tableReference().catalog()).isEqualTo("catalog"); + assertThat(payload.tableReference().identifier()).isEqualTo(TableIdentifier.parse("db.tbl")); assertThat(payload.dataFiles()).hasSize(2); assertThat(payload.dataFiles()).allMatch(f -> f.specId() == 1); assertThat(payload.deleteFiles()).hasSize(2); @@ -101,7 +102,10 @@ public void testCommitTableSerialization() { "cg-connector", EventType.COMMIT_TABLE, new CommitTablePayload( - commitId, new TableReference(Collections.singletonList("db"), "tbl"), 1L, 2L)); + commitId, + new TableReference("catalog", Collections.singletonList("db"), "tbl"), + 1L, + 2L)); byte[] data = Event.encode(event); Event result = Event.decode(data); @@ -109,7 +113,8 @@ public void testCommitTableSerialization() { assertThat(result.type()).isEqualTo(event.type()); CommitTablePayload payload = (CommitTablePayload) result.payload(); assertThat(payload.commitId()).isEqualTo(commitId); - assertThat(payload.tableReference().toIdentifier()).isEqualTo(TableIdentifier.parse("db.tbl")); + assertThat(payload.tableReference().catalog()).isEqualTo("catalog"); + assertThat(payload.tableReference().identifier()).isEqualTo(TableIdentifier.parse("db.tbl")); assertThat(payload.snapshotId()).isEqualTo(1L); assertThat(payload.validThroughTs()).isEqualTo(2L); }