Skip to content

Commit

Permalink
add catalog to ref
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Oct 27, 2023
1 parent 94fb590 commit a1daf94
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
/** Element representing a table identifier, with namespace and name. */
public class TableReference implements Element {

private String catalog;
private List<String> namespace;
private String name;
private final Schema avroSchema;
Expand All @@ -40,31 +41,37 @@ public class TableReference implements Element {
SchemaBuilder.builder()
.record(TableReference.class.getName())
.fields()
.name("namespace")
.name("catalog")
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1600)
.type()
.stringType()
.noDefault()
.name("namespace")
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1601)
.type()
.array()
.items()
.stringType()
.noDefault()
.name("name")
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1601)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1602)
.type()
.stringType()
.noDefault()
.endRecord();

public static TableReference of(TableIdentifier tableIdentifier) {
public static TableReference of(String catalog, TableIdentifier tableIdentifier) {
return new TableReference(
Arrays.asList(tableIdentifier.namespace().levels()), tableIdentifier.name());
catalog, Arrays.asList(tableIdentifier.namespace().levels()), tableIdentifier.name());
}

// Used by Avro reflection to instantiate this class when reading events
public TableReference(Schema avroSchema) {
this.avroSchema = avroSchema;
}

public TableReference(List<String> namespace, String name) {
public TableReference(String catalog, List<String> namespace, String name) {
this.catalog = catalog;
this.namespace = namespace;
this.name = name;
this.avroSchema = AVRO_SCHEMA;
Expand All @@ -85,10 +92,13 @@ public Schema getSchema() {
public void put(int i, Object v) {
switch (i) {
case 0:
this.catalog = v == null ? null : v.toString();
return;
case 1:
this.namespace =
v == null ? null : ((List<Utf8>) v).stream().map(Utf8::toString).collect(toList());
return;
case 1:
case 2:
this.name = v == null ? null : v.toString();
return;
default:
Expand All @@ -100,8 +110,10 @@ public void put(int i, Object v) {
public Object get(int i) {
switch (i) {
case 0:
return namespace;
return catalog;
case 1:
return namespace;
case 2:
return name;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testCommitResponseSerialization() {
new CommitResponsePayload(
StructType.of(),
commitId,
new TableReference(Collections.singletonList("db"), "tbl"),
new TableReference("catalog", Collections.singletonList("db"), "tbl"),
Arrays.asList(EventTestUtil.createDataFile(), EventTestUtil.createDataFile()),
Arrays.asList(EventTestUtil.createDeleteFile(), EventTestUtil.createDeleteFile())));

Expand Down Expand Up @@ -101,7 +101,10 @@ public void testCommitTableSerialization() {
"cg-connector",
EventType.COMMIT_TABLE,
new CommitTablePayload(
commitId, new TableReference(Collections.singletonList("db"), "tbl"), 1L, 2L));
commitId,
new TableReference("catalog", Collections.singletonList("db"), "tbl"),
1L,
2L));

byte[] data = Event.encode(event);
Event result = Event.decode(data);
Expand Down

0 comments on commit a1daf94

Please sign in to comment.