From b2930f6fd2cb5f36a8e28da0bfb301a00ce175fd Mon Sep 17 00:00:00 2001
From: Ryan D'Souza <93551010+ryandsouzaappfolio@users.noreply.github.com>
Date: Thu, 1 Aug 2024 14:31:51 -0700
Subject: [PATCH] [FLINK-20454] Support reading Debezium metadata fields using
 `debezium-avro-confluent` format

---
 .../debezium/DebeziumAvroDecodingFormat.java  | 267 ++++++++++++++++++
 .../DebeziumAvroDeserializationSchema.java    | 116 ++++++--
 .../debezium/DebeziumAvroFormatFactory.java   |  33 +--
 .../DebeziumAvroFormatFactoryTest.java        |  17 +-
 .../debezium/DebeziumAvroSerDeSchemaTest.java |   7 +-
 5 files changed, 383 insertions(+), 57 deletions(-)
 create mode 100644 flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java
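As a usage sketch (illustrative only, not part of the patch: the table, topic, and
registry URL are made up, and exact option keys can differ between Flink versions),
the new metadata keys surface in SQL DDL through the Kafka connector's `value.`
prefix:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DebeziumMetadataExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Metadata columns are declared VIRTUAL so they are excluded on writes.
            tEnv.executeSql(
                    "CREATE TABLE products ("
                            + "  id INT,"
                            + "  name STRING,"
                            + "  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,"
                            + "  ingest_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL"
                            + ") WITH ("
                            + "  'connector' = 'kafka',"
                            + "  'topic' = 'dbserver1.inventory.products',"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',"
                            + "  'format' = 'debezium-avro-confluent',"
                            + "  'debezium-avro-confluent.url' = 'http://localhost:8081'"
                            + ")");
        }
    }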
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java
new file mode 100644
index 0000000000000..f8e2a8a09ce16
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent.debezium;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** {@link DecodingFormat} for Debezium using Avro encoding. */
+public class DebeziumAvroDecodingFormat
+        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    private List<String> metadataKeys;
+
+    // --------------------------------------------------------------------------------------------
+    // Debezium-specific attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final String schemaRegistryURL;
+    private final String schema;
+    private final Map<String, ?> optionalPropertiesMap;
+
+    public DebeziumAvroDecodingFormat(
+            String schemaRegistryURL, String schema, Map<String, ?> optionalPropertiesMap) {
+        this.schemaRegistryURL = schemaRegistryURL;
+        this.schema = schema;
+        this.optionalPropertiesMap = optionalPropertiesMap;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) {
+        physicalDataType = Projection.of(projections).project(physicalDataType);
+
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .orElseThrow(IllegalStateException::new))
+                        .collect(Collectors.toList());
+        final List<DataTypes.Field> metadataFields =
+                readableMetadata.stream()
+                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                        .collect(Collectors.toList());
+
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        return new DebeziumAvroDeserializationSchema(
+                physicalDataType,
+                readableMetadata,
+                producedTypeInfo,
+                schemaRegistryURL,
+                schema,
+                optionalPropertiesMap);
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys) {
+        this.metadataKeys = metadataKeys;
+    }
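+
+    // createRuntimeDecoder(...) appends one column per key, in list order, so the
+    // produced row type and the converter array built there stay aligned with the
+    // metadata keys the planner applies here.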
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    /** List of metadata that can be read with this format. */
+    enum ReadableMetadata {
+        INGESTION_TIMESTAMP(
+                "ingestion-timestamp",
+                DataTypes.TIMESTAMP(3).nullable(),
+                DataTypes.FIELD("ts_ms", DataTypes.TIMESTAMP(3)),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        // "ts_ms" sits at the root of the Debezium record, so the wrapping
+                        // converter in DebeziumAvroDeserializationSchema reads it directly
+                        // and never drills into a nested row here.
+                        return row;
+                    }
+                }),
+
+        SOURCE_TIMESTAMP(
+                "source.timestamp",
+                DataTypes.TIMESTAMP(3).nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        int pos = SOURCE_PROPERTY_POSITION.get("ts_ms");
+                        return row.getField(pos);
+                    }
+                }),
+
+        SOURCE_DATABASE(
+                "source.database",
+                DataTypes.STRING().nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        int pos = SOURCE_PROPERTY_POSITION.get("db");
+                        return row.getField(pos);
+                    }
+                }),
+
+        SOURCE_SCHEMA(
+                "source.schema",
+                DataTypes.STRING().nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        int pos = SOURCE_PROPERTY_POSITION.get("schema");
+                        return row.getField(pos);
+                    }
+                }),
+
+        SOURCE_TABLE(
+                "source.table",
+                DataTypes.STRING().nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        int pos = SOURCE_PROPERTY_POSITION.get("table");
+                        return row.getField(pos);
+                    }
+                }),
+
+        SOURCE_PROPERTIES(
+                "source.properties",
+                // key and value of the map are nullable to make handling easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable())
+                        .nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        Map<StringData, StringData> result = new HashMap<>();
+                        for (int i = 0; i < SOURCE_PROPERTY_FIELDS.length; i++) {
+                            Object value = row.getField(i);
+                            result.put(
+                                    StringData.fromString(SOURCE_PROPERTY_FIELDS[i].getName()),
+                                    value == null ? null : StringData.fromString(value.toString()));
+                        }
+                        return new GenericMapData(result);
+                    }
+                });
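+
+        // The positions used above come from SOURCE_PROPERTY_FIELDS (declared below);
+        // SOURCE_PROPERTIES stringifies every field of the Debezium "source" struct, so
+        // connector-specific properties such as "scn" or "lcr_position" remain reachable
+        // even without a dedicated metadata key.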
+
+        final String key;
+
+        final DataType dataType;
+
+        final DataTypes.Field requiredAvroField;
+
+        final MetadataConverter converter;
+
+        ReadableMetadata(
+                String key,
+                DataType dataType,
+                DataTypes.Field requiredAvroField,
+                MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.requiredAvroField = requiredAvroField;
+            this.converter = converter;
+        }
+    }
+
+    private static final DataTypes.Field[] SOURCE_PROPERTY_FIELDS = {
+        DataTypes.FIELD("version", DataTypes.STRING()),
+        DataTypes.FIELD("connector", DataTypes.STRING()),
+        DataTypes.FIELD("name", DataTypes.STRING()),
+        DataTypes.FIELD("ts_ms", DataTypes.TIMESTAMP(3).nullable()),
+        DataTypes.FIELD("snapshot", DataTypes.STRING().nullable()),
+        DataTypes.FIELD("db", DataTypes.STRING()),
+        DataTypes.FIELD("sequence", DataTypes.STRING().nullable()),
+        DataTypes.FIELD("schema", DataTypes.STRING().nullable()),
+        DataTypes.FIELD("table", DataTypes.STRING()),
+        DataTypes.FIELD("txId", DataTypes.STRING().nullable()),
+        DataTypes.FIELD("scn", DataTypes.STRING().nullable()),
+        DataTypes.FIELD("commit_scn", DataTypes.STRING().nullable()),
+        DataTypes.FIELD("lcr_position", DataTypes.STRING().nullable())
+    };
+    private static final Map<String, Integer> SOURCE_PROPERTY_POSITION =
+            IntStream.range(0, SOURCE_PROPERTY_FIELDS.length)
+                    .boxed()
+                    .collect(Collectors.toMap(i -> SOURCE_PROPERTY_FIELDS[i].getName(), i -> i));
+    private static final DataTypes.Field SOURCE_FIELD =
+            DataTypes.FIELD("source", DataTypes.ROW(SOURCE_PROPERTY_FIELDS));
+}
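For intuition, a sketch of the produced-type assembly performed by createRuntimeDecoder
above (illustrative only; the physical schema and chosen metadata keys are made up, and
the DataTypes/DataTypeUtils/java.util.Arrays imports used elsewhere in this patch are
assumed):

    // Physical columns come first; metadata columns are appended in key order.
    DataType physical =
            DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.INT()),
                    DataTypes.FIELD("name", DataTypes.STRING()));
    DataType produced =
            DataTypeUtils.appendRowFields(
                    physical,
                    Arrays.asList(
                            DataTypes.FIELD("source.table", DataTypes.STRING().nullable()),
                            DataTypes.FIELD(
                                    "ingestion-timestamp", DataTypes.TIMESTAMP(3).nullable())));
    // produced: ROW<id INT, name STRING, `source.table` STRING, `ingestion-timestamp` TIMESTAMP(3)>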
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDeserializationSchema.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDeserializationSchema.java
index f72309e69801d..4a06adbb54de7 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDeserializationSchema.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDeserializationSchema.java
@@ -25,12 +25,14 @@
 import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
 import org.apache.flink.formats.avro.AvroToRowDataConverters;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDecodingFormat.ReadableMetadata;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
@@ -40,12 +42,14 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static java.lang.String.format;
 import static org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroFormatFactory.validateSchemaString;
-import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 
 /**
  * Deserialization schema from Debezium Avro to Flink Table/SQL internal data structure {@link
@@ -80,14 +84,22 @@ public final class DebeziumAvroDeserializationSchema implements DeserializationS
     /** TypeInformation of the produced {@link RowData}. */
     private final TypeInformation<RowData> producedTypeInfo;
 
+    /** Flag that indicates that an additional projection is required for metadata. */
+    private final boolean hasMetadata;
+
+    /** Metadata to be extracted for every record. */
+    private final MetadataConverter[] metadataConverters;
+
     public DebeziumAvroDeserializationSchema(
-            RowType rowType,
+            DataType physicalDataType,
+            List<ReadableMetadata> requestedMetadata,
             TypeInformation<RowData> producedTypeInfo,
             String schemaRegistryUrl,
             @Nullable String schemaString,
             @Nullable Map<String, ?> registryConfigs) {
         this.producedTypeInfo = producedTypeInfo;
-        RowType debeziumAvroRowType = createDebeziumAvroRowType(fromLogicalToDataType(rowType));
+        RowType debeziumAvroRowType =
+                createDebeziumAvroRowType(physicalDataType, requestedMetadata);
         validateSchemaString(schemaString, debeziumAvroRowType);
 
         Schema schema =
@@ -101,6 +113,28 @@ public DebeziumAvroDeserializationSchema(
                         schema, schemaRegistryUrl, registryConfigs),
                 AvroToRowDataConverters.createRowConverter(debeziumAvroRowType),
                 producedTypeInfo);
+
+        this.hasMetadata = requestedMetadata.size() > 0;
+        this.metadataConverters =
+                requestedMetadata.stream()
+                        .map(
+                                m -> {
+                                    final int rootPosition =
+                                            debeziumAvroRowType
+                                                    .getFieldNames()
+                                                    .indexOf(m.requiredAvroField.getName());
+                                    return (MetadataConverter)
+                                            (row, pos) -> {
+                                                Object result = row.getField(rootPosition);
+                                                if (result instanceof GenericRowData) {
+                                                    result =
+                                                            m.converter.convert(
+                                                                    (GenericRowData) result, pos);
+                                                }
+                                                return result;
+                                            };
+                                })
+                        .toArray(MetadataConverter[]::new);
     }
 
     @VisibleForTesting
@@ -109,6 +143,8 @@ public DebeziumAvroDeserializationSchema(
             AvroRowDataDeserializationSchema avroDeserializer) {
         this.producedTypeInfo = producedTypeInfo;
         this.avroDeserializer = avroDeserializer;
+        this.hasMetadata = false;
+        this.metadataConverters = new MetadataConverter[0];
     }
 
     @Override
@@ -137,7 +173,7 @@ public void deserialize(byte[] message, Collector<RowData> out) throws IOExcepti
             String op = row.getField(2).toString();
             if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
                 after.setRowKind(RowKind.INSERT);
-                out.collect(after);
+                emitRow(row, after, out);
             } else if (OP_UPDATE.equals(op)) {
                 if (before == null) {
                     throw new IllegalStateException(
@@ -145,15 +181,15 @@
                 }
                 before.setRowKind(RowKind.UPDATE_BEFORE);
                 after.setRowKind(RowKind.UPDATE_AFTER);
-                out.collect(before);
-                out.collect(after);
+                emitRow(row, before, out);
+                emitRow(row, after, out);
             } else if (OP_DELETE.equals(op)) {
                 if (before == null) {
                     throw new IllegalStateException(
                             String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
                 }
                 before.setRowKind(RowKind.DELETE);
-                out.collect(before);
+                emitRow(row, before, out);
             } else {
                 throw new IOException(
                         format(
@@ -166,6 +202,33 @@
         }
     }
 
+    private void emitRow(
+            GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
+        // shortcut in case no output projection is required
+        if (!hasMetadata) {
+            out.collect(physicalRow);
+            return;
+        }
+
+        final int physicalArity = physicalRow.getArity();
+        final int metadataArity = metadataConverters.length;
+
+        final GenericRowData producedRow =
+                new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+
+        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+            producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
+        }
+
+        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+            producedRow.setField(
+                    physicalArity + metadataPos,
+                    metadataConverters[metadataPos].convert(rootRow, metadataPos));
+        }
+
+        out.collect(producedRow);
+    }
+
     @Override
     public boolean isEndOfStream(RowData nextElement) {
         return false;
@@ -186,22 +249,41 @@ public boolean equals(Object o) {
         }
         DebeziumAvroDeserializationSchema that = (DebeziumAvroDeserializationSchema) o;
         return Objects.equals(avroDeserializer, that.avroDeserializer)
-                && Objects.equals(producedTypeInfo, that.producedTypeInfo);
+                && Objects.equals(producedTypeInfo, that.producedTypeInfo)
+                && hasMetadata == that.hasMetadata;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(avroDeserializer, producedTypeInfo);
+        return Objects.hash(avroDeserializer, producedTypeInfo, hasMetadata);
     }
 
-    public static RowType createDebeziumAvroRowType(DataType databaseSchema) {
-        // Debezium Avro contains other information, e.g. "source", "ts_ms"
-        // but we don't need them
-        return (RowType)
+    public static RowType createDebeziumAvroRowType(
+            DataType databaseSchema, List<ReadableMetadata> readableMetadata) {
+        DataType payload =
                 DataTypes.ROW(
-                                DataTypes.FIELD("before", databaseSchema.nullable()),
-                                DataTypes.FIELD("after", databaseSchema.nullable()),
-                                DataTypes.FIELD("op", DataTypes.STRING()))
-                        .getLogicalType();
+                        DataTypes.FIELD("before", databaseSchema.nullable()),
+                        DataTypes.FIELD("after", databaseSchema.nullable()),
+                        DataTypes.FIELD("op", DataTypes.STRING()));
+
+        // append fields that are required for reading metadata in the payload
+        final List<DataTypes.Field> payloadMetadataFields =
+                readableMetadata.stream()
+                        .map(m -> m.requiredAvroField)
+                        .distinct()
+                        .collect(Collectors.toList());
+        payload = DataTypeUtils.appendRowFields(payload, payloadMetadataFields);
+
+        return (RowType) payload.getLogicalType();
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Converter that extracts a metadata field from the row payload that comes out of the Avro
+     * schema and converts it to the desired data type.
+     */
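For reference, the constructor above composes each converter in two steps: look up the
metadata's root field in the decoded Debezium record, then (for nested fields) let the
enum's converter drill into that struct. A hand-rolled equivalent for source.table
(illustrative only; it assumes "source" is the only metadata field appended after
before/after/op, so it sits at root position 3, and that "table" is at position 8 of
SOURCE_PROPERTY_FIELDS; MetadataConverter is package-private, so such a sketch would
live in the same package):

    MetadataConverter sourceTable =
            (rootRow, pos) -> {
                Object source = rootRow.getField(3); // the appended "source" struct
                if (source instanceof GenericRowData) {
                    // position of "table" within SOURCE_PROPERTY_FIELDS
                    return ((GenericRowData) source).getField(8);
                }
                return source; // a null source struct stays null
            };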
+    interface MetadataConverter extends Serializable {
+        Object convert(GenericRowData row, int pos);
     }
 }
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java
index 5925f23f76325..08fece5a7e18d 100644
--- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java
@@ -21,18 +21,14 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
-import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
@@ -84,34 +80,7 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
         String schema = formatOptions.getOptional(SCHEMA).orElse(null);
         Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions);
 
-        return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
-            @Override
-            public DeserializationSchema<RowData> createRuntimeDecoder(
-                    DynamicTableSource.Context context,
-                    DataType producedDataType,
-                    int[][] projections) {
-                producedDataType = Projection.of(projections).project(producedDataType);
-                final RowType rowType = (RowType) producedDataType.getLogicalType();
-                final TypeInformation<RowData> producedTypeInfo =
-                        context.createTypeInformation(producedDataType);
-                return new DebeziumAvroDeserializationSchema(
-                        rowType,
-                        producedTypeInfo,
-                        schemaRegistryURL,
-                        schema,
-                        optionalPropertiesMap);
-            }
-
-            @Override
-            public ChangelogMode getChangelogMode() {
-                return ChangelogMode.newBuilder()
-                        .addContainedKind(RowKind.INSERT)
-                        .addContainedKind(RowKind.UPDATE_BEFORE)
-                        .addContainedKind(RowKind.UPDATE_AFTER)
-                        .addContainedKind(RowKind.DELETE)
-                        .build();
-            }
-        };
+        return new DebeziumAvroDecodingFormat(schemaRegistryURL, schema, optionalPropertiesMap);
     }
 
     @Override
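With the anonymous ProjectableDecodingFormat extracted into the named class, the
metadata keys it advertises can be inspected directly (a sketch, not part of the
patch; the registry URL is made up and no connection is made just to list keys):

    import java.util.Collections;

    DebeziumAvroDecodingFormat format =
            new DebeziumAvroDecodingFormat(
                    "http://localhost:8081", null, Collections.emptyMap());
    // Prints, in declaration order: ingestion-timestamp, source.timestamp,
    // source.database, source.schema, source.table, source.properties
    format.listReadableMetadata().keySet().forEach(System.out::println);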
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java
index b39cc1e283fb3..c14664547f771 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java
@@ -20,6 +20,7 @@
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDecodingFormat.ReadableMetadata;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -32,14 +33,18 @@
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -117,7 +122,7 @@ class DebeziumAvroFormatFactoryTest {
 
     private static final RowType ROW_TYPE =
             (RowType) SCHEMA.toPhysicalRowDataType().getLogicalType();
-
+    private static final List<ReadableMetadata> REQUESTED_METADATA = Collections.emptyList();
     private static final String SUBJECT = "test-debezium-avro";
     private static final String REGISTRY_URL = "http://localhost:8081";
 
@@ -128,7 +133,8 @@ void testSeDeSchema() {
 
         DebeziumAvroDeserializationSchema expectedDeser =
                 new DebeziumAvroDeserializationSchema(
-                        ROW_TYPE,
+                        fromLogicalToDataType(ROW_TYPE),
+                        REQUESTED_METADATA,
                         InternalTypeInfo.of(ROW_TYPE),
                         REGISTRY_URL,
                         null,
@@ -152,7 +158,8 @@ public void testSeDeSchemaWithSchemaOption() {
 
         DebeziumAvroDeserializationSchema expectedDeser =
                 new DebeziumAvroDeserializationSchema(
-                        ROW_TYPE,
+                        fromLogicalToDataType(ROW_TYPE),
+                        REQUESTED_METADATA,
                         InternalTypeInfo.of(ROW_TYPE),
                         REGISTRY_URL,
                         AVRO_SCHEMA,
@@ -203,7 +210,7 @@ public void testSeDeSchemaWithInvalidSchemaOption() {
         assertThrows(IllegalArgumentException.class, () -> createSerializationSchema(options));
     }
 
-    @NotNull
+    @Nonnull
     private Map<String, String> getRegistryConfigs() {
         final Map<String, String> registryConfigs = new HashMap<>();
         registryConfigs.put("basic.auth.user.info", "something1");
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java
index 2640b50e44c59..334b20e3730ad 100644
--- a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java
@@ -27,6 +27,7 @@
 import org.apache.flink.formats.avro.RegistryAvroSerializationSchema;
 import org.apache.flink.formats.avro.RowDataToAvroConverters;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder;
+import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDecodingFormat.ReadableMetadata;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -77,7 +78,7 @@ class DebeziumAvroSerDeSchemaTest {
                             FIELD("description", STRING()),
                             FIELD("weight", DOUBLE()))
                     .getLogicalType();
-
+    private static final List<ReadableMetadata> requestedMetadata = Collections.emptyList();
     private static final Schema DEBEZIUM_SCHEMA_COMPATIBLE_TEST =
             new Schema.Parser().parse(new String(readBytesFromFile("debezium-test-schema.json")));
 
@@ -88,7 +89,7 @@ void testSerializationDeserialization() throws Exception {
 
         RowType rowTypeDe =
                 DebeziumAvroDeserializationSchema.createDebeziumAvroRowType(
-                        fromLogicalToDataType(rowType));
+                        fromLogicalToDataType(rowType), requestedMetadata);
         RowType rowTypeSe =
                 DebeziumAvroSerializationSchema.createDebeziumAvroRowType(
                         fromLogicalToDataType(rowType));
@@ -149,7 +150,7 @@ void testDeleteDataDeserialization() throws Exception {
     public List<RowData> testDeserialization(String dataPath) throws Exception {
         RowType rowTypeDe =
                 DebeziumAvroDeserializationSchema.createDebeziumAvroRowType(
-                        fromLogicalToDataType(rowType));
+                        fromLogicalToDataType(rowType), requestedMetadata);
 
         client.register(SUBJECT, DEBEZIUM_SCHEMA_COMPATIBLE_TEST, 1, 81);