Skip to content

Commit

Permalink
field IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Oct 21, 2023
1 parent b5e9312 commit 0808530
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.iceberg.avro.AvroSchemaUtil;

/**
* A control event payload for events sent by a coordinator that indicates it has completed a commit
Expand All @@ -38,11 +39,11 @@ public class CommitCompletePayload implements Payload {
.record(CommitCompletePayload.class.getName())
.fields()
.name("commitId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1000)
.type(UUID_SCHEMA)
.noDefault()
.name("vtts")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1001)
.type()
.nullable()
.longType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.iceberg.avro.AvroSchemaUtil;

/**
* A control event payload for events sent by a worker that indicates it has finished sending all
Expand All @@ -38,11 +39,11 @@ public class CommitReadyPayload implements Payload {
.record(CommitReadyPayload.class.getName())
.fields()
.name("commitId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1100)
.type(UUID_SCHEMA)
.noDefault()
.name("assignments")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1101)
.type()
.nullable()
.array()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.iceberg.avro.AvroSchemaUtil;

/**
* A control event payload for events sent by a coordinator to request workers to send back the
Expand All @@ -36,7 +37,7 @@ public class CommitRequestPayload implements Payload {
.record(CommitRequestPayload.class.getName())
.fields()
.name("commitId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1200)
.type(UUID_SCHEMA)
.noDefault()
.endRecord();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,22 @@ public CommitResponsePayload(
.record(getClass().getName())
.fields()
.name("commitId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1300)
.type(UUID_SCHEMA)
.noDefault()
.name("tableName")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1301)
.type(TableName.AVRO_SCHEMA)
.noDefault()
.name("dataFiles")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1302)
.type()
.nullable()
.array()
.items(dataFileSchema)
.noDefault()
.name("deleteFiles")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1303)
.type()
.nullable()
.array()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.iceberg.avro.AvroSchemaUtil;

/**
* A control event payload for events sent by a coordinator that indicates it has completed a commit
Expand All @@ -40,21 +41,21 @@ public class CommitTablePayload implements Payload {
.record(CommitTablePayload.class.getName())
.fields()
.name("commitId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1400)
.type(UUID_SCHEMA)
.noDefault()
.name("tableName")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1401)
.type(TableName.AVRO_SCHEMA)
.noDefault()
.name("snapshotId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1402)
.type()
.nullable()
.longType()
.noDefault()
.name("vtts")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1403)
.type()
.nullable()
.longType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,9 @@
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData.SchemaConstructable;
import org.apache.iceberg.avro.AvroSchemaUtil;

/** Interface for complex types that will be used in an event schema. */
public interface Element extends IndexedRecord, SchemaConstructable {
// this is required by Iceberg's Avro deserializer to check for special metadata
// fields, but we aren't using any
String DUMMY_FIELD_ID = "-1";

String FIELD_ID_PROP = AvroSchemaUtil.FIELD_ID_PROP;

Schema UUID_SCHEMA =
LogicalTypes.uuid().addToSchema(SchemaBuilder.builder().fixed("uuid").size(16));
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.iceberg.avro.AvroEncoderUtil;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.avro.DecoderResolver;

/**
Expand Down Expand Up @@ -75,25 +76,25 @@ public Event(String groupId, EventType type, Payload payload) {
.record(getClass().getName())
.fields()
.name("id")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1500)
.type(UUID_SCHEMA)
.noDefault()
.name("type")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1501)
.type()
.intType()
.noDefault()
.name("timestamp")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1502)
.type()
.longType()
.noDefault()
.name("payload")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1503)
.type(payload.getSchema())
.noDefault()
.name("groupId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1504)
.type()
.stringType()
.noDefault()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

Expand All @@ -40,14 +41,14 @@ public class TableName implements Element {
.record(TableName.class.getName())
.fields()
.name("namespace")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1600)
.type()
.array()
.items()
.stringType()
.noDefault()
.name("name")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1601)
.type()
.stringType()
.noDefault()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.iceberg.avro.AvroSchemaUtil;

/** Element representing an offset, with topic name, partition number, and offset. */
public class TopicPartitionOffset implements Element {
Expand All @@ -35,23 +36,23 @@ public class TopicPartitionOffset implements Element {
.record(TopicPartitionOffset.class.getName())
.fields()
.name("topic")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1700)
.type()
.stringType()
.noDefault()
.name("partition")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1701)
.type()
.intType()
.noDefault()
.name("offset")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1702)
.type()
.nullable()
.longType()
.noDefault()
.name("timestamp")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.prop(AvroSchemaUtil.FIELD_ID_PROP, 1703)
.type()
.nullable()
.longType()
Expand Down

0 comments on commit 0808530

Please sign in to comment.