Skip to content

Commit

Permalink
add catalog to ref
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Oct 27, 2023
1 parent 94fb590 commit ff5d269
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
/** Element representing a table identifier, with namespace and name. */
public class TableReference implements Element {

private String catalog;
private List<String> namespace;
private String name;
private final Schema avroSchema;
Expand All @@ -40,37 +41,47 @@ public class TableReference implements Element {
SchemaBuilder.builder()
.record(TableReference.class.getName())
.fields()
.name("namespace")
.name("catalog")
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1600)
.type()
.stringType()
.noDefault()
.name("namespace")
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1601)
.type()
.array()
.items()
.stringType()
.noDefault()
.name("name")
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1601)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1602)
.type()
.stringType()
.noDefault()
.endRecord();

public static TableReference of(TableIdentifier tableIdentifier) {
public static TableReference of(String catalog, TableIdentifier tableIdentifier) {
return new TableReference(
Arrays.asList(tableIdentifier.namespace().levels()), tableIdentifier.name());
catalog, Arrays.asList(tableIdentifier.namespace().levels()), tableIdentifier.name());
}

// Used by Avro reflection to instantiate this class when reading events
public TableReference(Schema avroSchema) {
this.avroSchema = avroSchema;
}

public TableReference(List<String> namespace, String name) {
public TableReference(String catalog, List<String> namespace, String name) {
this.catalog = catalog;
this.namespace = namespace;
this.name = name;
this.avroSchema = AVRO_SCHEMA;
}

public TableIdentifier toIdentifier() {
public String catalog() {
return catalog;
}

public TableIdentifier identifier() {
Namespace icebergNamespace = Namespace.of(namespace.toArray(new String[0]));
return TableIdentifier.of(icebergNamespace, name);
}
Expand All @@ -85,10 +96,13 @@ public Schema getSchema() {
public void put(int i, Object v) {
switch (i) {
case 0:
this.catalog = v == null ? null : v.toString();
return;
case 1:
this.namespace =
v == null ? null : ((List<Utf8>) v).stream().map(Utf8::toString).collect(toList());
return;
case 1:
case 2:
this.name = v == null ? null : v.toString();
return;
default:
Expand All @@ -100,8 +114,10 @@ public void put(int i, Object v) {
public Object get(int i) {
switch (i) {
case 0:
return namespace;
return catalog;
case 1:
return namespace;
case 2:
return name;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testCommitResponseSerialization() {
new CommitResponsePayload(
StructType.of(),
commitId,
new TableReference(Collections.singletonList("db"), "tbl"),
new TableReference("catalog", Collections.singletonList("db"), "tbl"),
Arrays.asList(EventTestUtil.createDataFile(), EventTestUtil.createDataFile()),
Arrays.asList(EventTestUtil.createDeleteFile(), EventTestUtil.createDeleteFile())));

Expand All @@ -63,7 +63,8 @@ public void testCommitResponseSerialization() {
assertThat(result.type()).isEqualTo(event.type());
CommitResponsePayload payload = (CommitResponsePayload) result.payload();
assertThat(payload.commitId()).isEqualTo(commitId);
assertThat(payload.tableReference().toIdentifier()).isEqualTo(TableIdentifier.parse("db.tbl"));
assertThat(payload.tableReference().catalog()).isEqualTo("catalog");
assertThat(payload.tableReference().identifier()).isEqualTo(TableIdentifier.parse("db.tbl"));
assertThat(payload.dataFiles()).hasSize(2);
assertThat(payload.dataFiles()).allMatch(f -> f.specId() == 1);
assertThat(payload.deleteFiles()).hasSize(2);
Expand Down Expand Up @@ -101,15 +102,19 @@ public void testCommitTableSerialization() {
"cg-connector",
EventType.COMMIT_TABLE,
new CommitTablePayload(
commitId, new TableReference(Collections.singletonList("db"), "tbl"), 1L, 2L));
commitId,
new TableReference("catalog", Collections.singletonList("db"), "tbl"),
1L,
2L));

byte[] data = Event.encode(event);
Event result = Event.decode(data);

assertThat(result.type()).isEqualTo(event.type());
CommitTablePayload payload = (CommitTablePayload) result.payload();
assertThat(payload.commitId()).isEqualTo(commitId);
assertThat(payload.tableReference().toIdentifier()).isEqualTo(TableIdentifier.parse("db.tbl"));
assertThat(payload.tableReference().catalog()).isEqualTo("catalog");
assertThat(payload.tableReference().identifier()).isEqualTo(TableIdentifier.parse("db.tbl"));
assertThat(payload.snapshotId()).isEqualTo(1L);
assertThat(payload.validThroughTs()).isEqualTo(2L);
}
Expand Down

0 comments on commit ff5d269

Please sign in to comment.