From a5819a598ce46af0595ca4d3379595f8d9da939d Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Wed, 10 Jan 2024 06:07:38 -0800 Subject: [PATCH] Kafka Connect: Initial project setup and event data structures (#8701) --- .github/labeler.yml | 6 + .../apache/iceberg/avro/AvroSchemaUtil.java | 13 +- .../org/apache/iceberg/avro/TypeToSchema.java | 94 ++++++++--- .../iceberg/data/avro/DecoderResolver.java | 4 + kafka-connect/build.gradle | 32 ++++ .../iceberg/connect/events/AvroUtil.java | 95 +++++++++++ .../connect/events/CommitComplete.java | 113 +++++++++++++ .../iceberg/connect/events/CommitToTable.java | 140 ++++++++++++++++ .../iceberg/connect/events/DataComplete.java | 112 +++++++++++++ .../iceberg/connect/events/DataWritten.java | 158 ++++++++++++++++++ .../apache/iceberg/connect/events/Event.java | 147 ++++++++++++++++ .../iceberg/connect/events/Payload.java | 33 ++++ .../iceberg/connect/events/PayloadType.java | 43 +++++ .../iceberg/connect/events/StartCommit.java | 91 ++++++++++ .../connect/events/TableReference.java | 118 +++++++++++++ .../connect/events/TopicPartitionOffset.java | 124 ++++++++++++++ .../events/EventSerializationTest.java | 126 ++++++++++++++ .../iceberg/connect/events/EventTestUtil.java | 99 +++++++++++ settings.gradle | 5 + 19 files changed, 1529 insertions(+), 24 deletions(-) create mode 100644 kafka-connect/build.gradle create mode 100644 kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java create mode 100644 kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java create mode 100644 kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java create mode 100644 kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java create mode 100644 kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java create mode 100644 kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java create mode 100644 kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Payload.java create mode 100644 kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/PayloadType.java create mode 100644 kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java create mode 100644 kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java create mode 100644 kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java create mode 100644 kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventSerializationTest.java create mode 100644 kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java diff --git a/.github/labeler.yml b/.github/labeler.yml index 554cbc03f59b..50eecfb90feb 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -185,3 +185,9 @@ AZURE: 'azure/**/*', 'azure-bundle/**/*' ] + +KAFKACONNECT: + - changed-files: + - any-glob-to-any-file: [ + 'kafka-connect/**/*' + ] diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index 2d7c5e4a88a3..89cebf7598b5 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java 
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiFunction;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
@@ -59,7 +60,7 @@ public static Schema convert(org.apache.iceberg.Schema schema, String tableName)
 
   public static Schema convert(
       org.apache.iceberg.Schema schema, Map<Types.StructType, String> names) {
-    return TypeUtil.visit(schema, new TypeToSchema(names));
+    return TypeUtil.visit(schema, new TypeToSchema.WithTypeToName(names));
   }
 
   public static Schema convert(Type type) {
@@ -71,7 +72,12 @@ public static Schema convert(Types.StructType type, String name) {
   }
 
   public static Schema convert(Type type, Map<Types.StructType, String> names) {
-    return TypeUtil.visit(type, new TypeToSchema(names));
+    return TypeUtil.visit(type, new TypeToSchema.WithTypeToName(names));
+  }
+
+  public static Schema convert(
+      Type type, BiFunction<Integer, Types.StructType, String> namesFunction) {
+    return TypeUtil.visit(type, new TypeToSchema.WithNamesFunction(namesFunction));
   }
 
   public static Type convert(Schema schema) {
@@ -111,7 +117,8 @@ static boolean missingIds(Schema schema) {
   }
 
   public static Map<Types.StructType, Schema> convertTypes(Types.StructType type, String name) {
-    TypeToSchema converter = new TypeToSchema(ImmutableMap.of(type, name));
+    TypeToSchema.WithTypeToName converter =
+        new TypeToSchema.WithTypeToName(ImmutableMap.of(type, name));
     TypeUtil.visit(type, converter);
     return ImmutableMap.copyOf(converter.getConversionMap());
   }
diff --git a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
index bc2847e1b4ba..05ce4e618662 100644
--- a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
+++ b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
@@ -21,6 +21,7 @@
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiFunction;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -30,7 +31,7 @@
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
-class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
+abstract class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
   private static final Schema BOOLEAN_SCHEMA = Schema.create(Schema.Type.BOOLEAN);
   private static final Schema INTEGER_SCHEMA = Schema.create(Schema.Type.INT);
   private static final Schema LONG_SCHEMA = Schema.create(Schema.Type.LONG);
@@ -55,15 +56,10 @@ class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
   }
 
   private final Deque<Integer> fieldIds = Lists.newLinkedList();
-  private final Map<Type, Schema> results = Maps.newHashMap();
-  private final Map<Types.StructType, String> names;
+  private final BiFunction<Integer, Types.StructType, String> namesFunction;
 
-  TypeToSchema(Map<Types.StructType, String> names) {
-    this.names = names;
-  }
-
-  Map<Type, Schema> getConversionMap() {
-    return results;
+  TypeToSchema(BiFunction<Integer, Types.StructType, String> namesFunction) {
+    this.namesFunction = namesFunction;
   }
 
   @Override
@@ -81,16 +77,29 @@ public void afterField(Types.NestedField field) {
     fieldIds.pop();
   }
 
+  Schema lookupSchema(Type type) {
+    return lookupSchema(type, null);
+  }
+
+  abstract Schema lookupSchema(Type type, String recordName);
+
+  void cacheSchema(Type struct, Schema schema) {
+    cacheSchema(struct, null, schema);
+  }
+
+  abstract void cacheSchema(Type struct, String recordName, Schema schema);
+
   @Override
   public Schema struct(Types.StructType struct, List<Schema> fieldSchemas) {
-    Schema recordSchema = results.get(struct);
-    if (recordSchema != null) {
-      return recordSchema;
+    Integer fieldId = fieldIds.peek();
+    String recordName = namesFunction.apply(fieldId, struct);
+    if (recordName == null) {
+      recordName = "r" + fieldId;
     }
 
-    String recordName = names.get(struct);
-    if (recordName == null) {
-      recordName = "r" + fieldIds.peek();
+    Schema recordSchema = lookupSchema(struct, recordName);
+    if (recordSchema != null) {
+      return recordSchema;
     }
 
     List<Types.NestedField> structFields = struct.fields();
@@ -115,7 +124,7 @@ public Schema struct(Types.StructType struct, List<Schema> fieldSchemas) {
 
     recordSchema = Schema.createRecord(recordName, null, null, false, fields);
 
-    results.put(struct, recordSchema);
+    cacheSchema(struct, recordName, recordSchema);
 
     return recordSchema;
   }
@@ -131,7 +140,7 @@ public Schema field(Types.NestedField field, Schema fieldSchema) {
 
   @Override
   public Schema list(Types.ListType list, Schema elementSchema) {
-    Schema listSchema = results.get(list);
+    Schema listSchema = lookupSchema(list);
     if (listSchema != null) {
       return listSchema;
     }
@@ -144,14 +153,14 @@ public Schema list(Types.ListType list, Schema elementSchema) {
 
     listSchema.addProp(AvroSchemaUtil.ELEMENT_ID_PROP, list.elementId());
 
-    results.put(list, listSchema);
+    cacheSchema(list, listSchema);
 
     return listSchema;
   }
 
   @Override
   public Schema map(Types.MapType map, Schema keySchema, Schema valueSchema) {
-    Schema mapSchema = results.get(map);
+    Schema mapSchema = lookupSchema(map);
     if (mapSchema != null) {
       return mapSchema;
     }
@@ -173,7 +182,7 @@ public Schema map(Types.MapType map, Schema keySchema, Schema valueSchema) {
           map.isValueOptional() ? AvroSchemaUtil.toOption(valueSchema) : valueSchema);
     }
 
-    results.put(map, mapSchema);
+    cacheSchema(map, mapSchema);
 
     return mapSchema;
   }
@@ -238,8 +247,51 @@ public Schema primitive(Type.PrimitiveType primitive) {
         throw new UnsupportedOperationException("Unsupported type ID: " + primitive.typeId());
     }
 
-    results.put(primitive, primitiveSchema);
+    cacheSchema(primitive, primitiveSchema);
 
     return primitiveSchema;
   }
+
+  static class WithTypeToName extends TypeToSchema {
+
+    private final Map<Type, Schema> results = Maps.newHashMap();
+
+    WithTypeToName(Map<Types.StructType, String> names) {
+      super((id, struct) -> names.get(struct));
+    }
+
+    Map<Type, Schema> getConversionMap() {
+      return results;
+    }
+
+    @Override
+    void cacheSchema(Type type, String recordName, Schema schema) {
+      results.put(type, schema);
+    }
+
+    @Override
+    Schema lookupSchema(Type type, String recordName) {
+      return results.get(type);
+    }
+  }
+
+  static class WithNamesFunction extends TypeToSchema {
+    private final Map<String, Schema> schemaCache = Maps.newHashMap();
+
+    WithNamesFunction(BiFunction<Integer, Types.StructType, String> namesFunction) {
+      super(namesFunction);
+    }
+
+    @Override
+    void cacheSchema(Type type, String recordName, Schema schema) {
+      if (recordName != null) {
+        schemaCache.put(recordName, schema);
+      }
+    }
+
+    @Override
+    Schema lookupSchema(Type type, String recordName) {
+      return recordName == null ? null : schemaCache.get(recordName);
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java b/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java
index 3cf3d1bcde2f..03de0ac987e7 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DecoderResolver.java
@@ -51,6 +51,10 @@ public static <T> T resolveAndRead(
     return value;
   }
 
+  public static void clearCache() {
+    DECODER_CACHES.get().clear();
+  }
+
   @VisibleForTesting
   static ResolvingDecoder resolve(Decoder decoder, Schema readSchema, Schema fileSchema)
       throws IOException {
diff --git a/kafka-connect/build.gradle b/kafka-connect/build.gradle
new file mode 100644
index 000000000000..102cf6a89283
--- /dev/null
+++ b/kafka-connect/build.gradle
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+project(":iceberg-kafka-connect:iceberg-kafka-connect-events") {
+  dependencies {
+    api project(':iceberg-api')
+    implementation project(':iceberg-core')
+    implementation project(':iceberg-common')
+    implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
+    implementation libs.avro.avro
+  }
+
+  test {
+    useJUnitPlatform()
+  }
+}
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java
new file mode 100644
index 000000000000..8f184c21b401
--- /dev/null
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/AvroUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.events;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.avro.AvroEncoderUtil;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.data.avro.DecoderResolver;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+
+/** Class for Avro-related utility methods. */
+class AvroUtil {
+  static final Map<Integer, String> FIELD_ID_TO_CLASS =
+      ImmutableMap.of(
+          DataComplete.ASSIGNMENTS_ELEMENT,
+          TopicPartitionOffset.class.getName(),
+          DataFile.PARTITION_ID,
+          PartitionData.class.getName(),
+          DataWritten.TABLE_REFERENCE,
+          TableReference.class.getName(),
+          DataWritten.DATA_FILES_ELEMENT,
+          "org.apache.iceberg.GenericDataFile",
+          DataWritten.DELETE_FILES_ELEMENT,
+          "org.apache.iceberg.GenericDeleteFile",
+          CommitToTable.TABLE_REFERENCE,
+          TableReference.class.getName());
+
+  public static byte[] encode(Event event) {
+    try {
+      return AvroEncoderUtil.encode(event, event.getSchema());
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  public static Event decode(byte[] bytes) {
+    try {
+      Event event = AvroEncoderUtil.decode(bytes);
+      // clear the cache to avoid memory leak
+      DecoderResolver.clearCache();
+      return event;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  static Schema convert(Types.StructType icebergSchema, Class<? extends IndexedRecord> javaClass) {
+    return convert(icebergSchema, javaClass, FIELD_ID_TO_CLASS);
+  }
+
+  static Schema convert(
+      Types.StructType icebergSchema,
+      Class<? extends IndexedRecord> javaClass,
+      Map<Integer, String> typeMap) {
+    return AvroSchemaUtil.convert(
+        icebergSchema,
+        (fieldId, struct) ->
+            struct.equals(icebergSchema) ? javaClass.getName() : typeMap.get(fieldId));
+  }
+
+  static int positionToId(int position, Schema avroSchema) {
+    List<Schema.Field> fields = avroSchema.getFields();
+    Preconditions.checkArgument(
+        position >= 0 && position < fields.size(), "Invalid field position: " + position);
+    Object val = fields.get(position).getObjectProp(AvroSchemaUtil.FIELD_ID_PROP);
+    return val == null ? -1 : (int) val;
+  }
+
+  private AvroUtil() {}
+}
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java
new file mode 100644
index 000000000000..8a2cf632407d
--- /dev/null
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitComplete.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.events;
+
+import java.time.OffsetDateTime;
+import java.util.UUID;
+import org.apache.avro.Schema;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.types.Types.UUIDType;
+import org.apache.iceberg.util.DateTimeUtil;
+
+/**
+ * A control event payload for events sent by a coordinator that indicates it has completed a
+ * commit cycle. Events with this payload are not consumed by the sink; they are informational and
+ * can be used by consumers to trigger downstream processes.
+ */
+public class CommitComplete implements Payload {
+
+  private UUID commitId;
+  private OffsetDateTime validThroughTs;
+  private final Schema avroSchema;
+
+  static final int COMMIT_ID = 10_000;
+  static final int VALID_THROUGH_TS = 10_001;
+
+  private static final StructType ICEBERG_SCHEMA =
+      StructType.of(
+          NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()),
+          NestedField.optional(VALID_THROUGH_TS, "valid_through_ts", TimestampType.withZone()));
+  private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, CommitComplete.class);
+
+  // Used by Avro reflection to instantiate this class when reading events
+  public CommitComplete(Schema avroSchema) {
+    this.avroSchema = avroSchema;
+  }
+
+  public CommitComplete(UUID commitId, OffsetDateTime validThroughTs) {
+    this.commitId = commitId;
+    this.validThroughTs = validThroughTs;
+    this.avroSchema = AVRO_SCHEMA;
+  }
+
+  @Override
+  public PayloadType type() {
+    return PayloadType.COMMIT_COMPLETE;
+  }
+
+  public UUID commitId() {
+    return commitId;
+  }
+
+  /**
+   * Valid-through timestamp, which is the min-of-max record timestamps across all workers for the
+   * commit.
+   */
+  public OffsetDateTime validThroughTs() {
+    return validThroughTs;
+  }
+
+  @Override
+  public StructType writeSchema() {
+    return ICEBERG_SCHEMA;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return avroSchema;
+  }
+
+  @Override
+  public void put(int i, Object v) {
+    switch (AvroUtil.positionToId(i, avroSchema)) {
+      case COMMIT_ID:
+        this.commitId = (UUID) v;
+        return;
+      case VALID_THROUGH_TS:
+        this.validThroughTs = v == null ? null : DateTimeUtil.timestamptzFromMicros((Long) v);
+        return;
+      default:
+        // ignore the object, it must be from a newer version of the format
+    }
+  }
+
+  @Override
+  public Object get(int i) {
+    switch (AvroUtil.positionToId(i, avroSchema)) {
+      case COMMIT_ID:
+        return commitId;
+      case VALID_THROUGH_TS:
+        return validThroughTs == null ? null : DateTimeUtil.microsFromTimestamptz(validThroughTs);
+      default:
+        throw new UnsupportedOperationException("Unknown field ordinal: " + i);
+    }
+  }
+}
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java
new file mode 100644
index 000000000000..df32ff017690
--- /dev/null
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/CommitToTable.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.events;
+
+import java.time.OffsetDateTime;
+import java.util.UUID;
+import org.apache.avro.Schema;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.types.Types.UUIDType;
+import org.apache.iceberg.util.DateTimeUtil;
+
+/**
+ * A control event payload for events sent by a coordinator that indicates a commit to a table has
+ * completed. Events with this payload are not consumed by the sink; they are informational and can
+ * be used by consumers to trigger downstream processes.
+ */
+public class CommitToTable implements Payload {
+
+  private UUID commitId;
+  private TableReference tableReference;
+  private Long snapshotId;
+  private OffsetDateTime validThroughTs;
+  private final Schema avroSchema;
+
+  static final int COMMIT_ID = 10_400;
+  static final int TABLE_REFERENCE = 10_401;
+  static final int SNAPSHOT_ID = 10_402;
+  static final int VALID_THROUGH_TS = 10_403;
+
+  private static final StructType ICEBERG_SCHEMA =
+      StructType.of(
+          NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()),
+          NestedField.required(TABLE_REFERENCE, "table_reference", TableReference.ICEBERG_SCHEMA),
+          NestedField.optional(SNAPSHOT_ID, "snapshot_id", LongType.get()),
+          NestedField.optional(VALID_THROUGH_TS, "valid_through_ts", TimestampType.withZone()));
+  private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, CommitToTable.class);
+
+  // Used by Avro reflection to instantiate this class when reading events
+  public CommitToTable(Schema avroSchema) {
+    this.avroSchema = avroSchema;
+  }
+
+  public CommitToTable(
+      UUID commitId,
+      TableReference tableReference,
+      Long snapshotId,
+      OffsetDateTime validThroughTs) {
+    this.commitId = commitId;
+    this.tableReference = tableReference;
+    this.snapshotId = snapshotId;
+    this.validThroughTs = validThroughTs;
+    this.avroSchema = AVRO_SCHEMA;
+  }
+
+  @Override
+  public PayloadType type() {
+    return PayloadType.COMMIT_TO_TABLE;
+  }
+
+  public UUID commitId() {
+    return commitId;
+  }
+
+  public TableReference tableReference() {
+    return tableReference;
+  }
+
+  public Long snapshotId() {
+    return snapshotId;
+  }
+
+  public OffsetDateTime validThroughTs() {
+    return validThroughTs;
+  }
+
+  @Override
+  public StructType writeSchema() {
+    return ICEBERG_SCHEMA;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return avroSchema;
+  }
+
+  @Override
+  public void put(int i, Object v) {
+    switch (AvroUtil.positionToId(i, avroSchema)) {
+      case COMMIT_ID:
+        this.commitId = (UUID) v;
+        return;
+      case TABLE_REFERENCE:
+        this.tableReference = (TableReference) v;
+        return;
+      case SNAPSHOT_ID:
+        this.snapshotId = (Long) v;
+        return;
+      case VALID_THROUGH_TS:
+        this.validThroughTs = v == null ?
null : DateTimeUtil.timestamptzFromMicros((Long) v); + return; + default: + // ignore the object, it must be from a newer version of the format + } + } + + @Override + public Object get(int i) { + switch (AvroUtil.positionToId(i, avroSchema)) { + case COMMIT_ID: + return commitId; + case TABLE_REFERENCE: + return tableReference; + case SNAPSHOT_ID: + return snapshotId; + case VALID_THROUGH_TS: + return validThroughTs == null ? null : DateTimeUtil.microsFromTimestamptz(validThroughTs); + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + i); + } + } +} diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java new file mode 100644 index 000000000000..e91414d2512d --- /dev/null +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataComplete.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.events; + +import java.util.List; +import java.util.UUID; +import org.apache.avro.Schema; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.UUIDType; + +/** + * A control event payload for events sent by a worker that indicates it has finished sending all + * data for a commit request. 
+ */
+public class DataComplete implements Payload {
+
+  private UUID commitId;
+  private List<TopicPartitionOffset> assignments;
+  private final Schema avroSchema;
+
+  static final int COMMIT_ID = 10_100;
+  static final int ASSIGNMENTS = 10_101;
+  static final int ASSIGNMENTS_ELEMENT = 10_102;
+
+  private static final StructType ICEBERG_SCHEMA =
+      StructType.of(
+          NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()),
+          NestedField.optional(
+              ASSIGNMENTS,
+              "assignments",
+              ListType.ofRequired(ASSIGNMENTS_ELEMENT, TopicPartitionOffset.ICEBERG_SCHEMA)));
+  private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, DataComplete.class);
+
+  // Used by Avro reflection to instantiate this class when reading events
+  public DataComplete(Schema avroSchema) {
+    this.avroSchema = avroSchema;
+  }
+
+  public DataComplete(UUID commitId, List<TopicPartitionOffset> assignments) {
+    this.commitId = commitId;
+    this.assignments = assignments;
+    this.avroSchema = AVRO_SCHEMA;
+  }
+
+  @Override
+  public PayloadType type() {
+    return PayloadType.DATA_COMPLETE;
+  }
+
+  public UUID commitId() {
+    return commitId;
+  }
+
+  public List<TopicPartitionOffset> assignments() {
+    return assignments;
+  }
+
+  @Override
+  public StructType writeSchema() {
+    return ICEBERG_SCHEMA;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return avroSchema;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void put(int i, Object v) {
+    switch (AvroUtil.positionToId(i, avroSchema)) {
+      case COMMIT_ID:
+        this.commitId = (UUID) v;
+        return;
+      case ASSIGNMENTS:
+        this.assignments = (List<TopicPartitionOffset>) v;
+        return;
+      default:
+        // ignore the object, it must be from a newer version of the format
+    }
+  }
+
+  @Override
+  public Object get(int i) {
+    switch (AvroUtil.positionToId(i, avroSchema)) {
+      case COMMIT_ID:
+        return commitId;
+      case ASSIGNMENTS:
+        return assignments;
+      default:
+        throw new UnsupportedOperationException("Unknown field ordinal: " + i);
+    }
+  }
+}
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java
new file mode 100644
index 000000000000..5edc401a2919
--- /dev/null
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/DataWritten.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.events;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.avro.Schema;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.types.Types.UUIDType;
+
+/**
+ * A control event payload for events sent by a worker that contains the table data that has been
+ * written and is ready to commit.
+ */
+public class DataWritten implements Payload {
+
+  private StructType partitionType;
+
+  private UUID commitId;
+  private TableReference tableReference;
+  private List<DataFile> dataFiles;
+  private List<DeleteFile> deleteFiles;
+  private StructType icebergSchema;
+  private final Schema avroSchema;
+
+  static final int COMMIT_ID = 10_300;
+  static final int TABLE_REFERENCE = 10_301;
+  static final int DATA_FILES = 10_302;
+  static final int DATA_FILES_ELEMENT = 10_303;
+  static final int DELETE_FILES = 10_304;
+  static final int DELETE_FILES_ELEMENT = 10_305;
+
+  // Used by Avro reflection to instantiate this class when reading events, note that this does not
+  // set the partition type so the instance cannot be re-serialized
+  public DataWritten(Schema avroSchema) {
+    this.avroSchema = avroSchema;
+  }
+
+  public DataWritten(
+      StructType partitionType,
+      UUID commitId,
+      TableReference tableReference,
+      List<DataFile> dataFiles,
+      List<DeleteFile> deleteFiles) {
+    this.partitionType = partitionType;
+    this.commitId = commitId;
+    this.tableReference = tableReference;
+    this.dataFiles = dataFiles;
+    this.deleteFiles = deleteFiles;
+    this.avroSchema = AvroUtil.convert(writeSchema(), getClass());
+  }
+
+  @Override
+  public PayloadType type() {
+    return PayloadType.DATA_WRITTEN;
+  }
+
+  public UUID commitId() {
+    return commitId;
+  }
+
+  public TableReference tableReference() {
+    return tableReference;
+  }
+
+  public List<DataFile> dataFiles() {
+    return dataFiles;
+  }
+
+  public List<DeleteFile> deleteFiles() {
+    return deleteFiles;
+  }
+
+  @Override
+  public StructType writeSchema() {
+    if (icebergSchema == null) {
+      StructType dataFileStruct = DataFile.getType(partitionType);
+
+      this.icebergSchema =
+          StructType.of(
+              NestedField.required(COMMIT_ID, "commit_id", UUIDType.get()),
+              NestedField.required(
+                  TABLE_REFERENCE, "table_reference", TableReference.ICEBERG_SCHEMA),
+              NestedField.optional(
+                  DATA_FILES,
+                  "data_files",
+                  ListType.ofRequired(DATA_FILES_ELEMENT, dataFileStruct)),
+              NestedField.optional(
+                  DELETE_FILES,
+                  "delete_files",
+                  ListType.ofRequired(DELETE_FILES_ELEMENT, dataFileStruct)));
+    }
+
+    return icebergSchema;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return avroSchema;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void put(int i, Object v) {
+    switch (AvroUtil.positionToId(i, avroSchema)) {
+      case COMMIT_ID:
+        this.commitId = (UUID) v;
+        return;
+      case TABLE_REFERENCE:
+        this.tableReference = (TableReference) v;
+        return;
+      case DATA_FILES:
+        this.dataFiles = (List<DataFile>) v;
+        return;
+      case DELETE_FILES:
+        this.deleteFiles = (List<DeleteFile>) v;
+        return;
+      default:
+        // ignore the object, it must be from a newer version of the format
+    }
+  }
+
+  @Override
+  public Object get(int i) {
+    switch (AvroUtil.positionToId(i, avroSchema)) {
+      case COMMIT_ID:
+        return commitId;
+      case TABLE_REFERENCE:
+        return tableReference;
+      case DATA_FILES:
+        return dataFiles;
+      case DELETE_FILES:
+        return deleteFiles;
+      default:
+        throw new UnsupportedOperationException("Unknown field ordinal: " + i);
+ } + } +} diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java new file mode 100644 index 000000000000..f8ee619142c9 --- /dev/null +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Event.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.events; + +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.Map; +import java.util.UUID; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimestampType; +import org.apache.iceberg.types.Types.UUIDType; +import org.apache.iceberg.util.DateTimeUtil; + +/** + * Class representing all events produced to the control topic. Different event types have different + * payloads. 
+ */
+public class Event implements IndexedRecord {
+
+  private UUID id;
+  private PayloadType type;
+  private OffsetDateTime timestamp;
+  private String groupId;
+  private Payload payload;
+  private final Schema avroSchema;
+
+  static final int ID = 10_500;
+  static final int TYPE = 10_501;
+  static final int TIMESTAMP = 10_502;
+  static final int GROUP_ID = 10_503;
+  static final int PAYLOAD = 10_504;
+
+  // Used by Avro reflection to instantiate this class when reading events
+  public Event(Schema avroSchema) {
+    this.avroSchema = avroSchema;
+  }
+
+  public Event(String groupId, Payload payload) {
+    this.id = UUID.randomUUID();
+    this.type = payload.type();
+    this.timestamp = OffsetDateTime.now(ZoneOffset.UTC).truncatedTo(ChronoUnit.MICROS);
+    this.groupId = groupId;
+    this.payload = payload;
+
+    StructType icebergSchema =
+        StructType.of(
+            NestedField.required(ID, "id", UUIDType.get()),
+            NestedField.required(TYPE, "type", IntegerType.get()),
+            NestedField.required(TIMESTAMP, "timestamp", TimestampType.withZone()),
+            NestedField.required(GROUP_ID, "group_id", StringType.get()),
+            NestedField.required(PAYLOAD, "payload", payload.writeSchema()));
+
+    Map<Integer, String> typeMap = Maps.newHashMap(AvroUtil.FIELD_ID_TO_CLASS);
+    typeMap.put(PAYLOAD, payload.getClass().getName());
+
+    this.avroSchema = AvroUtil.convert(icebergSchema, getClass(), typeMap);
+  }
+
+  public UUID id() {
+    return id;
+  }
+
+  public PayloadType type() {
+    return type;
+  }
+
+  public OffsetDateTime timestamp() {
+    return timestamp;
+  }
+
+  public Payload payload() {
+    return payload;
+  }
+
+  public String groupId() {
+    return groupId;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return avroSchema;
+  }
+
+  @Override
+  public void put(int i, Object v) {
+    switch (AvroUtil.positionToId(i, avroSchema)) {
+      case ID:
+        this.id = (UUID) v;
+        return;
+      case TYPE:
+        this.type = v == null ? null : PayloadType.values()[(Integer) v];
+        return;
+      case TIMESTAMP:
+        this.timestamp = v == null ? null : DateTimeUtil.timestamptzFromMicros((Long) v);
+        return;
+      case GROUP_ID:
+        this.groupId = v == null ? null : v.toString();
+        return;
+      case PAYLOAD:
+        this.payload = (Payload) v;
+        return;
+      default:
+        // ignore the object, it must be from a newer version of the format
+    }
+  }
+
+  @Override
+  public Object get(int i) {
+    switch (AvroUtil.positionToId(i, avroSchema)) {
+      case ID:
+        return id;
+      case TYPE:
+        return type == null ? null : type.id();
+      case TIMESTAMP:
+        return timestamp == null ? null : DateTimeUtil.microsFromTimestamptz(timestamp);
+      case GROUP_ID:
+        return groupId;
+      case PAYLOAD:
+        return payload;
+      default:
+        throw new UnsupportedOperationException("Unknown field ordinal: " + i);
+    }
+  }
+}
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Payload.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Payload.java
new file mode 100644
index 000000000000..0c0c883bda98
--- /dev/null
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/Payload.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.events; + +import org.apache.avro.generic.IndexedRecord; +import org.apache.iceberg.types.Types; + +/** + * Interface for an element that is an event payload. Different event types contain different + * payloads. + */ +public interface Payload extends IndexedRecord { + + PayloadType type(); + + Types.StructType writeSchema(); +} diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/PayloadType.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/PayloadType.java new file mode 100644 index 000000000000..6e114f116888 --- /dev/null +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/PayloadType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.events; + +/** Control event types. */ +public enum PayloadType { + /** Maps to payload of type {@link StartCommit} */ + START_COMMIT(0), + /** Maps to payload of type {@link DataWritten} */ + DATA_WRITTEN(1), + /** Maps to payload of type {@link DataComplete} */ + DATA_COMPLETE(2), + /** Maps to payload of type {@link CommitToTable} */ + COMMIT_TO_TABLE(3), + /** Maps to payload of type {@link CommitComplete} */ + COMMIT_COMPLETE(4); + + private final int id; + + PayloadType(int id) { + this.id = id; + } + + public int id() { + return id; + } +} diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java new file mode 100644 index 000000000000..f26a92249507 --- /dev/null +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/StartCommit.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.events; + +import java.util.UUID; +import org.apache.avro.Schema; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.UUIDType; + +/** + * A control event payload for events sent by a coordinator to request workers to send back the + * table data that has been written and is ready to commit. + */ +public class StartCommit implements Payload { + + private UUID commitId; + private final Schema avroSchema; + + static final int COMMIT_ID = 10_200; + + private static final StructType ICEBERG_SCHEMA = + StructType.of(NestedField.required(COMMIT_ID, "commit_id", UUIDType.get())); + private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, StartCommit.class); + + // Used by Avro reflection to instantiate this class when reading events + public StartCommit(Schema avroSchema) { + this.avroSchema = avroSchema; + } + + public StartCommit(UUID commitId) { + this.commitId = commitId; + this.avroSchema = AVRO_SCHEMA; + } + + @Override + public PayloadType type() { + return PayloadType.START_COMMIT; + } + + public UUID commitId() { + return commitId; + } + + @Override + public StructType writeSchema() { + return ICEBERG_SCHEMA; + } + + @Override + public Schema getSchema() { + return avroSchema; + } + + @Override + public void put(int i, Object v) { + switch (AvroUtil.positionToId(i, avroSchema)) { + case COMMIT_ID: + this.commitId = (UUID) v; + return; + default: + // ignore the object, it must be from a newer version of the format + } + } + + @Override + public Object get(int i) { + switch (AvroUtil.positionToId(i, avroSchema)) { + case COMMIT_ID: + return commitId; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + i); + } + } +} diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java new file mode 100644 index 000000000000..4d8d43e69633 --- /dev/null +++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TableReference.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.connect.events;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.StructType;
+
+/** Element representing a table identifier, with namespace and name. */
+public class TableReference implements IndexedRecord {
+
+  private String catalog;
+  private List<String> namespace;
+  private String name;
+  private final Schema avroSchema;
+
+  static final int CATALOG = 10_600;
+  static final int NAMESPACE = 10_601;
+  static final int NAME = 10_603;
+
+  public static final StructType ICEBERG_SCHEMA =
+      StructType.of(
+          NestedField.required(CATALOG, "catalog", StringType.get()),
+          NestedField.required(
+              NAMESPACE, "namespace", ListType.ofRequired(NAMESPACE + 1, StringType.get())),
+          NestedField.required(NAME, "name", StringType.get()));
+  private static final Schema AVRO_SCHEMA = AvroUtil.convert(ICEBERG_SCHEMA, TableReference.class);
+
+  public static TableReference of(String catalog, TableIdentifier tableIdentifier) {
+    return new TableReference(
+        catalog, Arrays.asList(tableIdentifier.namespace().levels()), tableIdentifier.name());
+  }
+
+  // Used by Avro reflection to instantiate this class when reading events
+  public TableReference(Schema avroSchema) {
+    this.avroSchema = avroSchema;
+  }
+
+  public TableReference(String catalog, List<String> namespace, String name) {
+    this.catalog = catalog;
+    this.namespace = namespace;
+    this.name = name;
+    this.avroSchema = AVRO_SCHEMA;
+  }
+
+  public String catalog() {
+    return catalog;
+  }
+
+  public TableIdentifier identifier() {
+    Namespace icebergNamespace = Namespace.of(namespace.toArray(new String[0]));
+    return TableIdentifier.of(icebergNamespace, name);
+  }
+
+  @Override
+  public Schema getSchema() {
+    return avroSchema;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void put(int i, Object v) {
+    switch (AvroUtil.positionToId(i, avroSchema)) {
+      case CATALOG:
+        this.catalog = v == null ? null : v.toString();
+        return;
+      case NAMESPACE:
+        this.namespace =
+            v == null ? null : ((List<Utf8>) v).stream().map(Utf8::toString).collect(toList());
+        return;
+      case NAME:
+        this.name = v == null ? null : v.toString();
+        return;
+      default:
+        // ignore the object, it must be from a newer version of the format
+    }
+  }
+
+  @Override
+  public Object get(int i) {
+    switch (AvroUtil.positionToId(i, avroSchema)) {
+      case CATALOG:
+        return catalog;
+      case NAMESPACE:
+        return namespace;
+      case NAME:
+        return name;
+      default:
+        throw new UnsupportedOperationException("Unknown field ordinal: " + i);
+    }
+  }
+}
diff --git a/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java
new file mode 100644
index 000000000000..7bcf5327a8b5
--- /dev/null
+++ b/kafka-connect/kafka-connect-events/src/main/java/org/apache/iceberg/connect/events/TopicPartitionOffset.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.events; + +import java.time.OffsetDateTime; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types.TimestampType; +import org.apache.iceberg.util.DateTimeUtil; + +/** Element representing an offset, with topic name, partition number, and offset. */ +public class TopicPartitionOffset implements IndexedRecord { + + private String topic; + private Integer partition; + private Long offset; + private OffsetDateTime timestamp; + private final Schema avroSchema; + + static final int TOPIC = 10_700; + static final int PARTITION = 10_701; + static final int OFFSET = 10_702; + static final int TIMESTAMP = 10_703; + + public static final StructType ICEBERG_SCHEMA = + StructType.of( + NestedField.required(TOPIC, "topic", StringType.get()), + NestedField.required(PARTITION, "partition", IntegerType.get()), + NestedField.optional(OFFSET, "offset", LongType.get()), + NestedField.optional(TIMESTAMP, "timestamp", TimestampType.withZone())); + private static final Schema AVRO_SCHEMA = + AvroUtil.convert(ICEBERG_SCHEMA, TopicPartitionOffset.class); + + // Used by Avro reflection to instantiate this class when reading events + public TopicPartitionOffset(Schema avroSchema) { + this.avroSchema = avroSchema; + } + + public TopicPartitionOffset(String topic, int partition, Long offset, OffsetDateTime timestamp) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.timestamp = timestamp; + this.avroSchema = AVRO_SCHEMA; + } + + public String topic() { + return topic; + } + + public Integer partition() { + return partition; + } + + public Long offset() { + return offset; + } + + public OffsetDateTime timestamp() { + return timestamp; + } + + @Override + public Schema getSchema() { + return avroSchema; + } + + @Override + public void put(int i, Object v) { + switch (AvroUtil.positionToId(i, avroSchema)) { + case TOPIC: + this.topic = v == null ? null : v.toString(); + return; + case PARTITION: + this.partition = (Integer) v; + return; + case OFFSET: + this.offset = (Long) v; + return; + case TIMESTAMP: + this.timestamp = v == null ? null : DateTimeUtil.timestamptzFromMicros((Long) v); + return; + default: + // ignore the object, it must be from a newer version of the format + } + } + + @Override + public Object get(int i) { + switch (AvroUtil.positionToId(i, avroSchema)) { + case TOPIC: + return topic; + case PARTITION: + return partition; + case OFFSET: + return offset; + case TIMESTAMP: + return timestamp == null ? 
null : DateTimeUtil.microsFromTimestamptz(timestamp); + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + i); + } + } +} diff --git a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventSerializationTest.java b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventSerializationTest.java new file mode 100644 index 000000000000..3ea6143a1861 --- /dev/null +++ b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventSerializationTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.connect.events; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Arrays; +import java.util.Collections; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +public class EventSerializationTest { + + @Test + public void testStartCommitSerialization() { + UUID commitId = UUID.randomUUID(); + Event event = new Event("cg-connector", new StartCommit(commitId)); + + byte[] data = AvroUtil.encode(event); + Event result = AvroUtil.decode(data); + + assertThat(result) + .usingRecursiveComparison() + .ignoringFieldsMatchingRegexes(".*avroSchema") + .isEqualTo(event); + } + + @Test + public void testDataWrittenSerialization() { + UUID commitId = UUID.randomUUID(); + Event event = + new Event( + "cg-connector", + new DataWritten( + EventTestUtil.SPEC.partitionType(), + commitId, + new TableReference("catalog", Collections.singletonList("db"), "tbl"), + Arrays.asList(EventTestUtil.createDataFile(), EventTestUtil.createDataFile()), + Arrays.asList(EventTestUtil.createDeleteFile(), EventTestUtil.createDeleteFile()))); + + byte[] data = AvroUtil.encode(event); + Event result = AvroUtil.decode(data); + + assertThat(result) + .usingRecursiveComparison() + .ignoringFieldsMatchingRegexes( + "payload\\.partitionType", + ".*avroSchema", + ".*icebergSchema", + ".*schema", + ".*fromProjectionPos") + .isEqualTo(event); + } + + @Test + public void testDataCompleteSerialization() { + UUID commitId = UUID.randomUUID(); + Event event = + new Event( + "cg-connector", + new DataComplete( + commitId, + Arrays.asList( + new TopicPartitionOffset("topic", 1, 1L, EventTestUtil.now()), + new TopicPartitionOffset("topic", 2, null, null)))); + + byte[] data = AvroUtil.encode(event); + Event result = AvroUtil.decode(data); + + assertThat(result) + .usingRecursiveComparison() + .ignoringFieldsMatchingRegexes(".*avroSchema") + .isEqualTo(event); + } + + @Test + public void testCommitToTableSerialization() { + UUID commitId = UUID.randomUUID(); + Event event = + new Event( + "cg-connector", + new CommitToTable( + commitId, + new TableReference("catalog", 
Collections.singletonList("db"), "tbl"), + 1L, + EventTestUtil.now())); + + byte[] data = AvroUtil.encode(event); + Event result = AvroUtil.decode(data); + + assertThat(result) + .usingRecursiveComparison() + .ignoringFieldsMatchingRegexes(".*avroSchema") + .isEqualTo(event); + } + + @Test + public void testCommitCompleteSerialization() { + UUID commitId = UUID.randomUUID(); + Event event = new Event("cg-connector", new CommitComplete(commitId, EventTestUtil.now())); + + byte[] data = AvroUtil.encode(event); + Event result = AvroUtil.decode(data); + + assertThat(result) + .usingRecursiveComparison() + .ignoringFieldsMatchingRegexes(".*avroSchema") + .isEqualTo(event); + } +} diff --git a/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java new file mode 100644 index 000000000000..8f1f7a601f86 --- /dev/null +++ b/kafka-connect/kafka-connect-events/src/test/java/org/apache/iceberg/connect/events/EventTestUtil.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.connect.events; + +import java.nio.ByteBuffer; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Types; + +class EventTestUtil { + private EventTestUtil() {} + + static final Schema SCHEMA = + new Schema(ImmutableList.of(Types.NestedField.required(1, "id", Types.LongType.get()))); + + static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).identity("id").withSpecId(1).build(); + + static final SortOrder ORDER = + SortOrder.builderFor(SCHEMA).sortBy("id", SortDirection.ASC, NullOrder.NULLS_FIRST).build(); + + static final Metrics METRICS = + new Metrics( + 1L, + ImmutableMap.of(1, 1L), + ImmutableMap.of(1, 1L), + ImmutableMap.of(1, 1L), + ImmutableMap.of(1, 1L), + ImmutableMap.of(1, ByteBuffer.wrap(new byte[10])), + ImmutableMap.of(1, ByteBuffer.wrap(new byte[10]))); + + static OffsetDateTime now() { + return OffsetDateTime.now(ZoneOffset.UTC).truncatedTo(ChronoUnit.MICROS); + } + + static DataFile createDataFile() { + PartitionData data = new PartitionData(SPEC.partitionType()); + data.set(0, 1L); + + return DataFiles.builder(SPEC) + .withEncryptionKeyMetadata(ByteBuffer.wrap(new byte[] {0})) + .withFileSizeInBytes(100L) + .withFormat(FileFormat.PARQUET) + .withMetrics(METRICS) + .withPartition(data) + .withPath("path/to/file.parquet") + .withSortOrder(ORDER) + .withSplitOffsets(ImmutableList.of(4L)) + .build(); + } + + static DeleteFile createDeleteFile() { + PartitionData data = new PartitionData(SPEC.partitionType()); + data.set(0, 1L); + + return FileMetadata.deleteFileBuilder(SPEC) + .ofEqualityDeletes(1) + .withEncryptionKeyMetadata(ByteBuffer.wrap(new byte[] {0})) + .withFileSizeInBytes(100L) + .withFormat(FileFormat.PARQUET) + .withMetrics(METRICS) + .withPartition(data) + .withPath("path/to/file.parquet") + .withSortOrder(ORDER) + .withSplitOffsets(ImmutableList.of(4L)) + .build(); + } +} diff --git a/settings.gradle b/settings.gradle index d2c64da78a36..17be2bfc67a7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -40,6 +40,7 @@ include 'gcp-bundle' include 'dell' include 'snowflake' include 'delta-lake' +include 'kafka-connect' project(':api').name = 'iceberg-api' project(':common').name = 'iceberg-common' @@ -63,6 +64,7 @@ project(':gcp-bundle').name = 'iceberg-gcp-bundle' project(':dell').name = 'iceberg-dell' project(':snowflake').name = 'iceberg-snowflake' project(':delta-lake').name = 'iceberg-delta-lake' +project(':kafka-connect').name = 'iceberg-kafka-connect' if (null != System.getProperty("allVersions")) { System.setProperty("flinkVersions", System.getProperty("knownFlinkVersions")) @@ -188,3 +190,6 @@ if (JavaVersion.current() == JavaVersion.VERSION_1_8) { } } +include ":iceberg-kafka-connect:kafka-connect-events" +project(":iceberg-kafka-connect:kafka-connect-events").projectDir = file('kafka-connect/kafka-connect-events') 
+project(":iceberg-kafka-connect:kafka-connect-events").name = "iceberg-kafka-connect-events"