From a1daf9484936f81afb7b1343d33523dfabf2473b Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Fri, 27 Oct 2023 15:49:15 -0700 Subject: [PATCH] add catalog to ref --- .../connect/events/TableReference.java | 26 ++++++++++++++----- .../events/EventSerializationTest.java | 7 +++-- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java index f6acd077ed97..517aba25255e 100644 --- a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java @@ -32,6 +32,7 @@ /** Element representing a table identifier, with namespace and name. */ public class TableReference implements Element { + private String catalog; private List namespace; private String name; private final Schema avroSchema; @@ -40,23 +41,28 @@ public class TableReference implements Element { SchemaBuilder.builder() .record(TableReference.class.getName()) .fields() - .name("namespace") + .name("catalog") .prop(AvroSchemaUtil.FIELD_ID_PROP, 1600) .type() + .stringType() + .noDefault() + .name("namespace") + .prop(AvroSchemaUtil.FIELD_ID_PROP, 1601) + .type() .array() .items() .stringType() .noDefault() .name("name") - .prop(AvroSchemaUtil.FIELD_ID_PROP, 1601) + .prop(AvroSchemaUtil.FIELD_ID_PROP, 1602) .type() .stringType() .noDefault() .endRecord(); - public static TableReference of(TableIdentifier tableIdentifier) { + public static TableReference of(String catalog, TableIdentifier tableIdentifier) { return new TableReference( - Arrays.asList(tableIdentifier.namespace().levels()), tableIdentifier.name()); + catalog, Arrays.asList(tableIdentifier.namespace().levels()), tableIdentifier.name()); } // Used by Avro reflection to instantiate this class when reading events @@ -64,7 +70,8 @@ public TableReference(Schema avroSchema) { this.avroSchema = avroSchema; } - public TableReference(List namespace, String name) { + public TableReference(String catalog, List namespace, String name) { + this.catalog = catalog; this.namespace = namespace; this.name = name; this.avroSchema = AVRO_SCHEMA; @@ -85,10 +92,13 @@ public Schema getSchema() { public void put(int i, Object v) { switch (i) { case 0: + this.catalog = v == null ? null : v.toString(); + return; + case 1: this.namespace = v == null ? null : ((List) v).stream().map(Utf8::toString).collect(toList()); return; - case 1: + case 2: this.name = v == null ? null : v.toString(); return; default: @@ -100,8 +110,10 @@ public void put(int i, Object v) { public Object get(int i) { switch (i) { case 0: - return namespace; + return catalog; case 1: + return namespace; + case 2: return name; default: throw new UnsupportedOperationException("Unknown field ordinal: " + i); diff --git a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventSerializationTest.java b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventSerializationTest.java index 318324d8d60e..8b7a51d63c9f 100644 --- a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventSerializationTest.java +++ b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventSerializationTest.java @@ -53,7 +53,7 @@ public void testCommitResponseSerialization() { new CommitResponsePayload( StructType.of(), commitId, - new TableReference(Collections.singletonList("db"), "tbl"), + new TableReference("catalog", Collections.singletonList("db"), "tbl"), Arrays.asList(EventTestUtil.createDataFile(), EventTestUtil.createDataFile()), Arrays.asList(EventTestUtil.createDeleteFile(), EventTestUtil.createDeleteFile()))); @@ -101,7 +101,10 @@ public void testCommitTableSerialization() { "cg-connector", EventType.COMMIT_TABLE, new CommitTablePayload( - commitId, new TableReference(Collections.singletonList("db"), "tbl"), 1L, 2L)); + commitId, + new TableReference("catalog", Collections.singletonList("db"), "tbl"), + 1L, + 2L)); byte[] data = Event.encode(event); Event result = Event.decode(data);