Skip to content

Commit

Permalink
field ID mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Jan 4, 2024
1 parent 8f93de1 commit 43acff8
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
Expand All @@ -35,17 +36,17 @@
class AvroUtil {
// Maps an Iceberg field ID to the fully-qualified Java class that Avro reflection
// should instantiate when reading that field. Keys are the named field-ID constants
// declared on the corresponding payload classes; values are class names (strings,
// because GenericDataFile/GenericDeleteFile are not visible from this module).
static final Map<Integer, String> FIELD_ID_TO_CLASS =
    ImmutableMap.of(
        DataComplete.ASSIGNMENTS_ELEMENT,
        TopicPartitionOffset.class.getName(),
        DataFile.PARTITION_ID,
        PartitionData.class.getName(),
        DataWritten.TABLE_REFERENCE,
        TableReference.class.getName(),
        DataWritten.DATA_FILES_ELEMENT,
        "org.apache.iceberg.GenericDataFile",
        DataWritten.DELETE_FILES_ELEMENT,
        "org.apache.iceberg.GenericDeleteFile",
        CommitToTable.TABLE_REFERENCE,
        TableReference.class.getName());

public static byte[] encode(Event event) {
Expand Down Expand Up @@ -81,5 +82,15 @@ static Schema convert(
struct.equals(icebergSchema) ? javaClass.getName() : typeMap.get(fieldId));
}

/**
 * Builds a lookup table from Avro field position (ordinal) to Iceberg field ID.
 *
 * @param avroSchema record schema whose fields may carry the Iceberg field-id property
 * @return array where index {@code i} holds the field ID of the i-th Avro field, or -1 when
 *     the field has no field-id property
 */
static int[] positionsToIds(Schema avroSchema) {
  // Fetch the field list once instead of calling getFields() for the size and again for lookups.
  List<Schema.Field> fields = avroSchema.getFields();
  int[] result = new int[fields.size()];
  for (int i = 0; i < result.length; i++) {
    Object val = fields.get(i).getObjectProp(AvroSchemaUtil.FIELD_ID_PROP);
    // Cast assumes the property is stored as an Integer — TODO confirm against the writer side.
    result[i] = val == null ? -1 : (int) val;
  }
  return result;
}

private AvroUtil() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,29 @@ public class CommitComplete implements Payload {
private UUID commitId;
private OffsetDateTime validThroughTs;
private final Schema avroSchema;
private final int[] positionsToIds;

static final int COMMIT_ID = 10_000;
static final int VALID_THROUGH_TS = 10_001;

private static final StructType ICEBERG_SCHEMA =
StructType.of(
NestedField.required(10_000, "commit_id", UUIDType.get()),
NestedField.optional(10_001, "valid_through_ts", TimestampType.withZone()));

NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()),
NestedField.optional(VALID_THROUGH_TS, "valid_through_ts", TimestampType.withZone()));
private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, CommitComplete.class);
private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);

// Used by Avro reflection to instantiate this class when reading events
public CommitComplete(Schema avroSchema) {
this.avroSchema = avroSchema;
// Precompute position -> field ID for this reader schema, so put/get can switch on
// stable field IDs instead of schema-dependent ordinals.
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

/**
 * Creates a CommitComplete payload for writing.
 *
 * @param commitId commit UUID
 * @param validThroughTs timestamp through which the commit is valid; may be null (the
 *     corresponding Iceberg field is optional)
 */
public CommitComplete(UUID commitId, OffsetDateTime validThroughTs) {
this.commitId = commitId;
this.validThroughTs = validThroughTs;
// Writer side always uses the canonical schema, so the shared precomputed tables apply.
this.avroSchema = AVRO_SCHEMA;
this.positionsToIds = POSITIONS_TO_IDS;
}

@Override
Expand Down Expand Up @@ -85,11 +91,11 @@ public Schema getSchema() {

@Override
public void put(int i, Object v) {
switch (i) {
case 0:
switch (positionsToIds[i]) {
case COMMIT_ID:
this.commitId = (UUID) v;
return;
case 1:
case VALID_THROUGH_TS:
this.validThroughTs = v == null ? null : DateTimeUtil.timestamptzFromMicros((Long) v);
return;
default:
Expand All @@ -99,10 +105,10 @@ public void put(int i, Object v) {

@Override
public Object get(int i) {
switch (i) {
case 0:
switch (positionsToIds[i]) {
case COMMIT_ID:
return commitId;
case 1:
case VALID_THROUGH_TS:
return validThroughTs == null ? null : DateTimeUtil.microsFromTimestamptz(validThroughTs);
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,26 @@ public class CommitToTable implements Payload {
private Long snapshotId;
private OffsetDateTime validThroughTs;
private final Schema avroSchema;
private final int[] positionsToIds;

static final int COMMIT_ID = 10_400;
static final int TABLE_REFERENCE = 10_401;
static final int SNAPSHOT_ID = 10_402;
static final int VALID_THROUGH_TS = 10_403;

private static final StructType ICEBERG_SCHEMA =
StructType.of(
NestedField.required(10_400, "commit_id", UUIDType.get()),
NestedField.required(10_401, "table_reference", TableReference.ICEBERG_SCHEMA),
NestedField.optional(10_402, "snapshot_id", LongType.get()),
NestedField.optional(10_403, "valid_through_ts", TimestampType.withZone()));

NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()),
NestedField.required(TABLE_REFERENCE, "table_reference", TableReference.ICEBERG_SCHEMA),
NestedField.optional(SNAPSHOT_ID, "snapshot_id", LongType.get()),
NestedField.optional(VALID_THROUGH_TS, "valid_through_ts", TimestampType.withZone()));
private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, CommitToTable.class);
private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);

// Used by Avro reflection to instantiate this class when reading events
public CommitToTable(Schema avroSchema) {
this.avroSchema = avroSchema;
// Precompute position -> field ID for this reader schema, so put/get can switch on
// stable field IDs instead of schema-dependent ordinals.
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

public CommitToTable(
Expand All @@ -65,6 +72,7 @@ public CommitToTable(
this.snapshotId = snapshotId;
this.validThroughTs = validThroughTs;
this.avroSchema = AVRO_SCHEMA;
this.positionsToIds = POSITIONS_TO_IDS;
}

@Override
Expand Down Expand Up @@ -100,17 +108,17 @@ public Schema getSchema() {

@Override
public void put(int i, Object v) {
switch (i) {
case 0:
switch (positionsToIds[i]) {
case COMMIT_ID:
this.commitId = (UUID) v;
return;
case 1:
case TABLE_REFERENCE:
this.tableReference = (TableReference) v;
return;
case 2:
case SNAPSHOT_ID:
this.snapshotId = (Long) v;
return;
case 3:
case VALID_THROUGH_TS:
this.validThroughTs = v == null ? null : DateTimeUtil.timestamptzFromMicros((Long) v);
return;
default:
Expand All @@ -120,14 +128,14 @@ public void put(int i, Object v) {

@Override
public Object get(int i) {
switch (i) {
case 0:
switch (positionsToIds[i]) {
case COMMIT_ID:
return commitId;
case 1:
case TABLE_REFERENCE:
return tableReference;
case 2:
case SNAPSHOT_ID:
return snapshotId;
case 3:
case VALID_THROUGH_TS:
return validThroughTs == null ? null : DateTimeUtil.microsFromTimestamptz(validThroughTs);
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,33 @@ public class DataComplete implements Payload {
private UUID commitId;
private List<TopicPartitionOffset> assignments;
private final Schema avroSchema;
private final int[] positionsToIds;

static final int COMMIT_ID = 10_100;
static final int ASSIGNMENTS = 10_101;
static final int ASSIGNMENTS_ELEMENT = 10_102;

private static final StructType ICEBERG_SCHEMA =
StructType.of(
NestedField.required(10_100, "commit_id", UUIDType.get()),
NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()),
NestedField.optional(
10_101,
ASSIGNMENTS,
"assignments",
ListType.ofRequired(10_102, TopicPartitionOffset.ICEBERG_SCHEMA)));

ListType.ofRequired(ASSIGNMENTS_ELEMENT, TopicPartitionOffset.ICEBERG_SCHEMA)));
private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, DataComplete.class);
private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);

// Used by Avro reflection to instantiate this class when reading events
public DataComplete(Schema avroSchema) {
this.avroSchema = avroSchema;
// Precompute position -> field ID for this reader schema, so put/get can switch on
// stable field IDs instead of schema-dependent ordinals.
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

/**
 * Creates a DataComplete payload for writing.
 *
 * @param commitId commit UUID
 * @param assignments per topic-partition offsets for the commit; may be null (the
 *     corresponding Iceberg field is optional)
 */
public DataComplete(UUID commitId, List<TopicPartitionOffset> assignments) {
this.commitId = commitId;
this.assignments = assignments;
// Writer side always uses the canonical schema, so the shared precomputed tables apply.
this.avroSchema = AVRO_SCHEMA;
this.positionsToIds = POSITIONS_TO_IDS;
}

@Override
Expand Down Expand Up @@ -83,11 +90,11 @@ public Schema getSchema() {
@Override
@SuppressWarnings("unchecked")
public void put(int i, Object v) {
switch (i) {
case 0:
switch (positionsToIds[i]) {
case COMMIT_ID:
this.commitId = (UUID) v;
return;
case 1:
case ASSIGNMENTS:
this.assignments = (List<TopicPartitionOffset>) v;
return;
default:
Expand All @@ -97,10 +104,10 @@ public void put(int i, Object v) {

@Override
public Object get(int i) {
switch (i) {
case 0:
switch (positionsToIds[i]) {
case COMMIT_ID:
return commitId;
case 1:
case ASSIGNMENTS:
return assignments;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,21 @@ public class DataWritten implements Payload {
private List<DataFile> dataFiles;
private List<DeleteFile> deleteFiles;
private StructType icebergSchema;
private Schema avroSchema;
private final Schema avroSchema;
private final int[] positionsToIds;

// Field IDs for this payload's Iceberg schema. Each ID must be unique within the event
// schemas, since AvroUtil.FIELD_ID_TO_CLASS and the put/get switches key on them.
static final int COMMIT_ID = 10_300;
static final int TABLE_REFERENCE = 10_301;
static final int DATA_FILES = 10_302;
static final int DATA_FILES_ELEMENT = 10_303;
static final int DELETE_FILES = 10_304;
// Fixed: was 10_304, colliding with DELETE_FILES. The pre-change code and the
// AvroUtil.FIELD_ID_TO_CLASS mapping use 10_305 for the delete-files list element.
static final int DELETE_FILES_ELEMENT = 10_305;

// Used by Avro reflection to instantiate this class when reading events, note that this does not
// set the partition type so the instance cannot be re-serialized
public DataWritten(Schema avroSchema) {
this.avroSchema = avroSchema;
// Precompute position -> field ID for this reader schema, so put/get can switch on
// stable field IDs instead of schema-dependent ordinals.
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

public DataWritten(
Expand All @@ -60,6 +69,8 @@ public DataWritten(
this.tableReference = tableReference;
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
this.avroSchema = AvroUtil.convert(writeSchema(), getClass());
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

@Override
Expand Down Expand Up @@ -90,39 +101,41 @@ public StructType writeSchema() {

this.icebergSchema =
StructType.of(
NestedField.required(10_300, "commit_id", UUIDType.get()),
NestedField.required(10_301, "table_reference", TableReference.ICEBERG_SCHEMA),
NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()),
NestedField.required(
TABLE_REFERENCE, "table_reference", TableReference.ICEBERG_SCHEMA),
NestedField.optional(
10_302, "data_files", ListType.ofRequired(10_303, dataFileStruct)),
DATA_FILES,
"data_files",
ListType.ofRequired(DATA_FILES_ELEMENT, dataFileStruct)),
NestedField.optional(
10_304, "delete_files", ListType.ofRequired(10_305, dataFileStruct)));
DELETE_FILES,
"delete_files",
ListType.ofRequired(DELETE_FILES_ELEMENT, dataFileStruct)));
}

return icebergSchema;
}

@Override
public Schema getSchema() {
  // avroSchema is declared final and assigned in every constructor, so the former
  // lazy initialization (which reassigned the field) is no longer possible or needed.
  return avroSchema;
}

@Override
@SuppressWarnings("unchecked")
public void put(int i, Object v) {
switch (i) {
case 0:
switch (positionsToIds[i]) {
case COMMIT_ID:
this.commitId = (UUID) v;
return;
case 1:
case TABLE_REFERENCE:
this.tableReference = (TableReference) v;
return;
case 2:
case DATA_FILES:
this.dataFiles = (List<DataFile>) v;
return;
case 3:
case DELETE_FILES:
this.deleteFiles = (List<DeleteFile>) v;
return;
default:
Expand All @@ -132,14 +145,14 @@ public void put(int i, Object v) {

@Override
public Object get(int i) {
switch (i) {
case 0:
switch (positionsToIds[i]) {
case COMMIT_ID:
return commitId;
case 1:
case TABLE_REFERENCE:
return tableReference;
case 2:
case DATA_FILES:
return dataFiles;
case 3:
case DELETE_FILES:
return deleteFiles;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
Expand Down
Loading

0 comments on commit 43acff8

Please sign in to comment.