Skip to content

Commit

Permalink
remove mapping array
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Jan 4, 2024
1 parent 43acff8 commit 224fc3c
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.avro.AvroEncoderUtil;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.avro.DecoderResolver;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

Expand Down Expand Up @@ -82,14 +83,11 @@ static Schema convert(
struct.equals(icebergSchema) ? javaClass.getName() : typeMap.get(fieldId));
}

static int[] positionsToIds(Schema avroSchema) {
int[] result = new int[avroSchema.getFields().size()];
static int positionToId(int position, Schema avroSchema) {
List<Schema.Field> fields = avroSchema.getFields();
for (int i = 0; i < result.length; i++) {
Object val = fields.get(i).getObjectProp(AvroSchemaUtil.FIELD_ID_PROP);
result[i] = val == null ? -1 : (int) val;
}
return result;
Preconditions.checkArgument(position < fields.size(), "Invalid field position: " + position);
Object val = fields.get(position).getObjectProp(AvroSchemaUtil.FIELD_ID_PROP);
return val == null ? -1 : (int) val;
}

private AvroUtil() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class CommitComplete implements Payload {
private UUID commitId;
private OffsetDateTime validThroughTs;
private final Schema avroSchema;
private final int[] positionsToIds;

static final int COMMIT_ID = 10_000;
static final int VALID_THROUGH_TS = 10_001;
Expand All @@ -47,19 +46,16 @@ public class CommitComplete implements Payload {
NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()),
NestedField.optional(VALID_THROUGH_TS, "valid_through_ts", TimestampType.withZone()));
private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, CommitComplete.class);
private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);

// Used by Avro reflection to instantiate this class when reading events
public CommitComplete(Schema avroSchema) {
this.avroSchema = avroSchema;
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

public CommitComplete(UUID commitId, OffsetDateTime validThroughTs) {
this.commitId = commitId;
this.validThroughTs = validThroughTs;
this.avroSchema = AVRO_SCHEMA;
this.positionsToIds = POSITIONS_TO_IDS;
}

@Override
Expand Down Expand Up @@ -91,7 +87,7 @@ public Schema getSchema() {

@Override
public void put(int i, Object v) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case COMMIT_ID:
this.commitId = (UUID) v;
return;
Expand All @@ -105,7 +101,7 @@ public void put(int i, Object v) {

@Override
public Object get(int i) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case COMMIT_ID:
return commitId;
case VALID_THROUGH_TS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class CommitToTable implements Payload {
private Long snapshotId;
private OffsetDateTime validThroughTs;
private final Schema avroSchema;
private final int[] positionsToIds;

static final int COMMIT_ID = 10_400;
static final int TABLE_REFERENCE = 10_401;
Expand All @@ -54,12 +53,10 @@ public class CommitToTable implements Payload {
NestedField.optional(SNAPSHOT_ID, "snapshot_id", LongType.get()),
NestedField.optional(VALID_THROUGH_TS, "valid_through_ts", TimestampType.withZone()));
private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, CommitToTable.class);
private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);

// Used by Avro reflection to instantiate this class when reading events
public CommitToTable(Schema avroSchema) {
this.avroSchema = avroSchema;
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

public CommitToTable(
Expand All @@ -72,7 +69,6 @@ public CommitToTable(
this.snapshotId = snapshotId;
this.validThroughTs = validThroughTs;
this.avroSchema = AVRO_SCHEMA;
this.positionsToIds = POSITIONS_TO_IDS;
}

@Override
Expand Down Expand Up @@ -108,7 +104,7 @@ public Schema getSchema() {

@Override
public void put(int i, Object v) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case COMMIT_ID:
this.commitId = (UUID) v;
return;
Expand All @@ -128,7 +124,7 @@ public void put(int i, Object v) {

@Override
public Object get(int i) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case COMMIT_ID:
return commitId;
case TABLE_REFERENCE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class DataComplete implements Payload {
private UUID commitId;
private List<TopicPartitionOffset> assignments;
private final Schema avroSchema;
private final int[] positionsToIds;

static final int COMMIT_ID = 10_100;
static final int ASSIGNMENTS = 10_101;
Expand All @@ -49,19 +48,16 @@ public class DataComplete implements Payload {
"assignments",
ListType.ofRequired(ASSIGNMENTS_ELEMENT, TopicPartitionOffset.ICEBERG_SCHEMA)));
private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, DataComplete.class);
private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);

// Used by Avro reflection to instantiate this class when reading events
public DataComplete(Schema avroSchema) {
this.avroSchema = avroSchema;
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

public DataComplete(UUID commitId, List<TopicPartitionOffset> assignments) {
this.commitId = commitId;
this.assignments = assignments;
this.avroSchema = AVRO_SCHEMA;
this.positionsToIds = POSITIONS_TO_IDS;
}

@Override
Expand Down Expand Up @@ -90,7 +86,7 @@ public Schema getSchema() {
@Override
@SuppressWarnings("unchecked")
public void put(int i, Object v) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case COMMIT_ID:
this.commitId = (UUID) v;
return;
Expand All @@ -104,7 +100,7 @@ public void put(int i, Object v) {

@Override
public Object get(int i) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case COMMIT_ID:
return commitId;
case ASSIGNMENTS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class DataWritten implements Payload {
private List<DeleteFile> deleteFiles;
private StructType icebergSchema;
private final Schema avroSchema;
private final int[] positionsToIds;

static final int COMMIT_ID = 10_300;
static final int TABLE_REFERENCE = 10_301;
Expand All @@ -55,7 +54,6 @@ public class DataWritten implements Payload {
// set the partition type so the instance cannot be re-serialized
public DataWritten(Schema avroSchema) {
this.avroSchema = avroSchema;
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

public DataWritten(
Expand All @@ -70,7 +68,6 @@ public DataWritten(
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
this.avroSchema = AvroUtil.convert(writeSchema(), getClass());
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

@Override
Expand Down Expand Up @@ -125,7 +122,7 @@ public Schema getSchema() {
@Override
@SuppressWarnings("unchecked")
public void put(int i, Object v) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case COMMIT_ID:
this.commitId = (UUID) v;
return;
Expand All @@ -145,7 +142,7 @@ public void put(int i, Object v) {

@Override
public Object get(int i) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case COMMIT_ID:
return commitId;
case TABLE_REFERENCE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public class Event implements IndexedRecord {
private String groupId;
private Payload payload;
private final Schema avroSchema;
private final int[] positionsToIds;

static final int ID = 10_500;
static final int TYPE = 10_501;
Expand All @@ -57,7 +56,6 @@ public class Event implements IndexedRecord {
// Used by Avro reflection to instantiate this class when reading events
public Event(Schema avroSchema) {
this.avroSchema = avroSchema;
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

public Event(String groupId, Payload payload) {
Expand All @@ -79,7 +77,6 @@ public Event(String groupId, Payload payload) {
typeMap.put(PAYLOAD, payload.getClass().getName());

this.avroSchema = AvroUtil.convert(icebergSchema, getClass(), typeMap);
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

public UUID id() {
Expand Down Expand Up @@ -109,7 +106,7 @@ public Schema getSchema() {

@Override
public void put(int i, Object v) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case ID:
this.id = (UUID) v;
return;
Expand All @@ -132,7 +129,7 @@ public void put(int i, Object v) {

@Override
public Object get(int i) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case ID:
return id;
case TYPE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,21 @@ public class StartCommit implements Payload {

private UUID commitId;
private final Schema avroSchema;
private final int[] positionsToIds;

static final int COMMIT_ID = 10_200;

private static final StructType ICEBERG_SCHEMA =
StructType.of(NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()));
private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, StartCommit.class);
private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);

// Used by Avro reflection to instantiate this class when reading events
public StartCommit(Schema avroSchema) {
this.avroSchema = avroSchema;
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

public StartCommit(UUID commitId) {
this.commitId = commitId;
this.avroSchema = AVRO_SCHEMA;
this.positionsToIds = POSITIONS_TO_IDS;
}

@Override
Expand All @@ -74,7 +70,7 @@ public Schema getSchema() {

@Override
public void put(int i, Object v) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case COMMIT_ID:
this.commitId = (UUID) v;
return;
Expand All @@ -85,7 +81,7 @@ public void put(int i, Object v) {

@Override
public Object get(int i) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case COMMIT_ID:
return commitId;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public class TableReference implements IndexedRecord {
private List<String> namespace;
private String name;
private final Schema avroSchema;
private final int[] positionsToIds;

static final int CATALOG = 10_600;
static final int NAMESPACE = 10_601;
Expand All @@ -52,7 +51,6 @@ public class TableReference implements IndexedRecord {
NAMESPACE, "namespace", ListType.ofRequired(NAMESPACE + 1, StringType.get())),
NestedField.required(NAME, "name", StringType.get()));
private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, TableReference.class);
private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);

public static TableReference of(String catalog, TableIdentifier tableIdentifier) {
return new TableReference(
Expand All @@ -62,15 +60,13 @@ public static TableReference of(String catalog, TableIdentifier tableIdentifier)
// Used by Avro reflection to instantiate this class when reading events
public TableReference(Schema avroSchema) {
this.avroSchema = avroSchema;
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

public TableReference(String catalog, List<String> namespace, String name) {
this.catalog = catalog;
this.namespace = namespace;
this.name = name;
this.avroSchema = AVRO_SCHEMA;
this.positionsToIds = POSITIONS_TO_IDS;
}

public String catalog() {
Expand All @@ -90,7 +86,7 @@ public Schema getSchema() {
@Override
@SuppressWarnings("unchecked")
public void put(int i, Object v) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case CATALOG:
this.catalog = v == null ? null : v.toString();
return;
Expand All @@ -108,7 +104,7 @@ public void put(int i, Object v) {

@Override
public Object get(int i) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case CATALOG:
return catalog;
case NAMESPACE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class TopicPartitionOffset implements IndexedRecord {
private Long offset;
private OffsetDateTime timestamp;
private final Schema avroSchema;
private final int[] positionsToIds;

static final int TOPIC = 10_700;
static final int PARTITION = 10_701;
Expand All @@ -52,12 +51,10 @@ public class TopicPartitionOffset implements IndexedRecord {
NestedField.optional(TIMESTAMP, "timestamp", TimestampType.withZone()));
private static final Schema AVRO_SCHEMA =
AvroUtil.convert(ICEBERG_SCHEMA, TopicPartitionOffset.class);
private static final int[] POSITIONS_TO_IDS = AvroUtil.positionsToIds(AVRO_SCHEMA);

// Used by Avro reflection to instantiate this class when reading events
public TopicPartitionOffset(Schema avroSchema) {
this.avroSchema = avroSchema;
this.positionsToIds = AvroUtil.positionsToIds(avroSchema);
}

public TopicPartitionOffset(String topic, int partition, Long offset, OffsetDateTime timestamp) {
Expand All @@ -66,7 +63,6 @@ public TopicPartitionOffset(String topic, int partition, Long offset, OffsetDate
this.offset = offset;
this.timestamp = timestamp;
this.avroSchema = AVRO_SCHEMA;
this.positionsToIds = POSITIONS_TO_IDS;
}

public String topic() {
Expand All @@ -92,7 +88,7 @@ public Schema getSchema() {

@Override
public void put(int i, Object v) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case TOPIC:
this.topic = v == null ? null : v.toString();
return;
Expand All @@ -112,7 +108,7 @@ public void put(int i, Object v) {

@Override
public Object get(int i) {
switch (positionsToIds[i]) {
switch (AvroUtil.positionToId(i, avroSchema)) {
case TOPIC:
return topic;
case PARTITION:
Expand Down

0 comments on commit 224fc3c

Please sign in to comment.