From 613a261c5fdc07accae0690e38dc4dfdfad58fe6 Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Tue, 26 Sep 2023 12:43:32 +0800 Subject: [PATCH] [cdc] Refactor Kafka and MongoDB cdc schema build (#2069) --- .../action/cdc/CdcActionCommonUtils.java | 75 ++--------- .../flink/action/cdc/ComputedColumnUtils.java | 6 +- .../action/cdc/kafka/KafkaActionUtils.java | 32 ----- ...KafkaSchema.java => KafkaSchemaUtils.java} | 64 +--------- .../cdc/kafka/KafkaSyncTableAction.java | 24 ++-- .../cdc/kafka/formats/RecordParser.java | 30 ++++- .../cdc/mongodb/MongoDBActionUtils.java | 35 ------ .../cdc/mongodb/MongoDBSyncTableAction.java | 23 ++-- ...odbSchema.java => MongodbSchemaUtils.java} | 119 +++++------------- .../cdc/mysql/schema/MySqlSchemaUtils.java | 31 +++-- .../action/cdc/kafka/KafkaSchemaITCase.java | 28 ++--- .../cdc/mongodb/MongodbSchemaITCase.java | 39 +++--- 12 files changed, 153 insertions(+), 353 deletions(-) rename paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/{KafkaSchema.java => KafkaSchemaUtils.java} (78%) rename paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/{MongodbSchema.java => MongodbSchemaUtils.java} (65%) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index a73d98ba69d4..19e3283fc926 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -29,12 +29,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; - import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -133,6 +132,18 @@ public static List listCaseConvert(List origin, boolean caseSens : origin.stream().map(String::toLowerCase).collect(Collectors.toList()); } + public static String columnCaseConvertAndDuplicateCheck( + String column, + Set existedFields, + boolean caseSensitive, + Function columnDuplicateErrMsg) { + if (caseSensitive) { + return column; + } + checkArgument(existedFields.add(column.toLowerCase()), columnDuplicateErrMsg.apply(column)); + return column.toLowerCase(); + } + public static Schema buildPaimonSchema( List specifiedPartitionKeys, List specifiedPrimaryKeys, @@ -194,66 +205,6 @@ public static Schema buildPaimonSchema( return builder.build(); } - public static Schema buildPaimonSchema( - List specifiedPartitionKeys, - List specifiedPrimaryKeys, - List computedColumns, - Map tableConfig, - LinkedHashMap sourceColumns, - @Nullable List sourceColumnComments, - List sourcePrimaryKeys) { - Schema.Builder builder = Schema.newBuilder(); - - // options - builder.options(tableConfig); - - // columns - if (sourceColumnComments != null) { - checkArgument( - sourceColumns.size() == sourceColumnComments.size(), - "Source table columns count and column comments count should be equal."); - - int i = 0; - for (Map.Entry entry : sourceColumns.entrySet()) { - builder.column(entry.getKey(), entry.getValue(), sourceColumnComments.get(i++)); - } - } else { - sourceColumns.forEach(builder::column); - } - - for (ComputedColumn 
computedColumn : computedColumns) { - builder.column(computedColumn.columnName(), computedColumn.columnType()); - } - - // primary keys - if (!specifiedPrimaryKeys.isEmpty()) { - for (String key : specifiedPrimaryKeys) { - if (!sourceColumns.containsKey(key) - && computedColumns.stream().noneMatch(c -> c.columnName().equals(key))) { - throw new IllegalArgumentException( - "Specified primary key '" - + key - + "' does not exist in source tables or computed columns."); - } - } - builder.primaryKey(specifiedPrimaryKeys); - } else if (!sourcePrimaryKeys.isEmpty()) { - builder.primaryKey(sourcePrimaryKeys); - } else { - throw new IllegalArgumentException( - "Primary keys are not specified. " - + "Also, can't infer primary keys from source table schemas because " - + "source tables have no primary keys or have different primary keys."); - } - - // partition keys - if (!specifiedPartitionKeys.isEmpty()) { - builder.partitionKeys(specifiedPartitionKeys); - } - - return builder.build(); - } - public static String tableList( MultiTablesSinkMode mode, String databasePattern, diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java index 8a3e3a86699b..ccc0764a23ed 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java @@ -36,15 +36,11 @@ public class ComputedColumnUtils { public static List buildComputedColumns( List computedColumnArgs, Schema schema) { - Map dataFields = + Map typeMapping = schema.fields().stream() .collect( Collectors.toMap(DataField::name, DataField::type, (v1, v2) -> v2)); - return buildComputedColumns(computedColumnArgs, dataFields); - } - public static List buildComputedColumns( - List computedColumnArgs, Map typeMapping) { List computedColumns = new ArrayList<>(); for (String columnArg : computedColumnArgs) { String[] kv = columnArg.split("="); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java index c249db8eecd0..7a5f46bf7057 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java @@ -18,10 +18,6 @@ package org.apache.paimon.flink.action.cdc.kafka; -import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils; -import org.apache.paimon.flink.action.cdc.ComputedColumn; -import org.apache.paimon.schema.Schema; -import org.apache.paimon.types.DataType; import org.apache.paimon.utils.StringUtils; import org.apache.flink.api.common.serialization.SimpleStringSchema; @@ -41,7 +37,6 @@ import java.util.Arrays; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -50,9 +45,6 @@ import java.util.stream.Collectors; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert; -import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert; import static org.apache.paimon.utils.Preconditions.checkArgument; class KafkaActionUtils { @@ -62,30 +54,6 @@ class KafkaActionUtils { private static final String PARTITION = "partition"; private static final String OFFSET = "offset"; - static Schema buildPaimonSchema( - KafkaSchema kafkaSchema, - List specifiedPartitionKeys, - List specifiedPrimaryKeys, - List computedColumns, - Map tableConfig, - boolean caseSensitive) { - LinkedHashMap sourceColumns = - mapKeyCaseConvert( - kafkaSchema.fields(), - caseSensitive, - columnDuplicateErrMsg(kafkaSchema.tableName())); - List primaryKeys = listCaseConvert(kafkaSchema.primaryKeys(), caseSensitive); - - return CdcActionCommonUtils.buildPaimonSchema( - specifiedPartitionKeys, - specifiedPrimaryKeys, - computedColumns, - tableConfig, - sourceColumns, - null, - primaryKeys); - } - static KafkaSource buildKafkaSource(Configuration kafkaConfig) { validateKafkaConfig(kafkaConfig); KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java similarity index 78% rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java index a01ce3de6936..330df8e79918 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java @@ -21,7 +21,7 @@ import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat; import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser; -import org.apache.paimon.types.DataType; +import org.apache.paimon.schema.Schema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions; @@ -36,7 +36,6 @@ import java.time.Duration; import java.util.Collection; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -47,42 +46,11 @@ import static org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.kafkaPropertiesGroupId; import static org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat.getDataFormat; -/** Utility class to load canal kafka schema. */ -public class KafkaSchema { +/** Utility class to load kafka schema. 
*/ +public class KafkaSchemaUtils { private static final int MAX_RETRY = 5; private static final int POLL_TIMEOUT_MILLIS = 1000; - private final String databaseName; - private final String tableName; - private final LinkedHashMap fields; - private final List primaryKeys; - - public KafkaSchema( - String databaseName, - String tableName, - LinkedHashMap fields, - List primaryKeys) { - this.databaseName = databaseName; - this.tableName = tableName; - this.fields = fields; - this.primaryKeys = primaryKeys; - } - - public String tableName() { - return tableName; - } - - public String databaseName() { - return databaseName; - } - - public LinkedHashMap fields() { - return fields; - } - - public List primaryKeys() { - return primaryKeys; - } private static KafkaConsumer getKafkaEarliestConsumer( Configuration kafkaConfig, String topic) { @@ -123,8 +91,8 @@ private static KafkaConsumer getKafkaEarliestConsumer( * @return The Kafka schema for the topic. * @throws KafkaSchemaRetrievalException If unable to retrieve the schema after max retries. */ - public static KafkaSchema getKafkaSchema( - Configuration kafkaConfig, String topic, TypeMapping typeMapping) + public static Schema getKafkaSchema( + Configuration kafkaConfig, String topic, TypeMapping typeMapping, boolean caseSensitive) throws KafkaSchemaRetrievalException { KafkaConsumer consumer = getKafkaEarliestConsumer(kafkaConfig, topic); int retry = 0; @@ -140,7 +108,7 @@ public static KafkaSchema getKafkaSchema( Stream> recordStream = StreamSupport.stream(records.spliterator(), false); - Optional kafkaSchema = + Optional kafkaSchema = recordStream .map(record -> recordParser.getKafkaSchema(record.value())) .filter(Objects::nonNull) @@ -175,24 +143,4 @@ public KafkaSchemaRetrievalException(String message) { super(message); } } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof KafkaSchema)) { - return false; - } - KafkaSchema that = (KafkaSchema) o; - return databaseName.equals(that.databaseName) - && tableName.equals(that.tableName) - && fields.equals(that.fields) - && primaryKeys.equals(that.primaryKeys); - } - - @Override - public int hashCode() { - return Objects.hash(databaseName, tableName, fields, primaryKeys); - } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java index 2804b04e0a3f..87d3f0ddbe0c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java @@ -24,7 +24,6 @@ import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.action.Action; import org.apache.paimon.flink.action.ActionBase; -import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat; @@ -48,6 +47,8 @@ import java.util.List; import java.util.Map; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.assertSchemaCompatible; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.buildPaimonSchema; import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns; /** @@ -144,28 +145,25 
@@ public KafkaSyncTableAction withTypeMapping(TypeMapping typeMapping) { public void build(StreamExecutionEnvironment env) throws Exception { KafkaSource source = KafkaActionUtils.buildKafkaSource(kafkaConfig); String topic = kafkaConfig.get(KafkaConnectorOptions.TOPIC).get(0); - KafkaSchema kafkaSchema = KafkaSchema.getKafkaSchema(kafkaConfig, topic, typeMapping); catalog.createDatabase(database, true); boolean caseSensitive = catalog.caseSensitive(); + Schema kafkaSchema = + KafkaSchemaUtils.getKafkaSchema(kafkaConfig, topic, typeMapping, caseSensitive); Identifier identifier = new Identifier(database, table); FileStoreTable table; List computedColumns = - buildComputedColumns(computedColumnArgs, kafkaSchema.fields()); - Schema fromCanal = - KafkaActionUtils.buildPaimonSchema( - kafkaSchema, - partitionKeys, - primaryKeys, - computedColumns, - tableConfig, - caseSensitive); + buildComputedColumns(computedColumnArgs, kafkaSchema); + Schema fromKafka = + buildPaimonSchema( + partitionKeys, primaryKeys, computedColumns, tableConfig, kafkaSchema); + try { table = (FileStoreTable) catalog.getTable(identifier); - CdcActionCommonUtils.assertSchemaCompatible(table.schema(), fromCanal.fields()); + assertSchemaCompatible(table.schema(), fromKafka.fields()); } catch (Catalog.TableNotExistException e) { - catalog.createTable(identifier, fromCanal, false); + catalog.createTable(identifier, fromKafka, false); table = (FileStoreTable) catalog.getTable(identifier); } DataFormat format = DataFormat.getDataFormat(kafkaConfig); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java index 5e3d0f886cb6..bf6be1b7de17 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java @@ -20,9 +20,9 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; -import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema; import org.apache.paimon.flink.sink.cdc.CdcRecord; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.schema.Schema; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; @@ -39,14 +39,22 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg; @@ -81,12 +89,13 @@ public RecordParser( this.computedColumns = computedColumns; } - public KafkaSchema 
getKafkaSchema(String record) { + @Nullable + public Schema getKafkaSchema(String record) { this.parseRootJson(record); if (this.isDDL()) { return null; } - databaseName = extractStringFromRootJson(FIELD_DATABASE); + tableName = extractStringFromRootJson(FIELD_TABLE); this.setPrimaryField(); this.setDataField(); @@ -94,7 +103,20 @@ public KafkaSchema getKafkaSchema(String record) { this.extractPrimaryKeys(); this.extractFieldTypesFromDatabaseSchema(); LinkedHashMap paimonFieldTypes = this.setPaimonFieldType(); - return new KafkaSchema(databaseName, tableName, paimonFieldTypes, primaryKeys); + + Schema.Builder builder = Schema.newBuilder(); + Set existedFields = new HashSet<>(); + Function columnDuplicateErrMsg = columnDuplicateErrMsg(tableName); + for (Map.Entry entry : paimonFieldTypes.entrySet()) { + builder.column( + columnCaseConvertAndDuplicateCheck( + entry.getKey(), existedFields, caseSensitive, columnDuplicateErrMsg), + entry.getValue()); + } + + builder.primaryKey(listCaseConvert(primaryKeys, caseSensitive)); + + return builder.build(); } protected abstract List extractRecords(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java index fefc4a6873e5..3c13a48d3d04 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java @@ -18,13 +18,6 @@ package org.apache.paimon.flink.action.cdc.mongodb; -import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils; -import org.apache.paimon.flink.action.cdc.ComputedColumn; -import org.apache.paimon.schema.Schema; -import org.apache.paimon.types.DataType; - -import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; - import com.ververica.cdc.connectors.base.options.SourceOptions; import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.mongodb.source.MongoDBSource; @@ -36,15 +29,10 @@ import org.apache.flink.configuration.Configuration; import org.apache.kafka.connect.json.JsonConverterConfig; -import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.Optional; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert; import static org.apache.paimon.utils.Preconditions.checkArgument; /** @@ -70,7 +58,6 @@ public class MongoDBActionUtils { private static final String INITIAL_MODE = "initial"; private static final String LATEST_OFFSET_MODE = "latest-offset"; private static final String TIMESTAMP_MODE = "timestamp"; - private static final String PRIMARY_KEY = "_id"; public static final ConfigOption FIELD_NAME = ConfigOptions.key("field.name") @@ -158,26 +145,4 @@ private static void validateMongodbConfig(Configuration mongodbConfig) { "mongodb-conf [%s] must be specified.", MongoDBSourceOptions.DATABASE.key())); } - - static Schema buildPaimonSchema( - MongodbSchema mongodbSchema, - List specifiedPartitionKeys, - List computedColumns, - Map tableConfig, - boolean caseSensitive) { - LinkedHashMap sourceColumns = - mapKeyCaseConvert( - mongodbSchema.fields(), - caseSensitive, - 
columnDuplicateErrMsg(mongodbSchema.tableName())); - - return CdcActionCommonUtils.buildPaimonSchema( - specifiedPartitionKeys, - Lists.newArrayList(PRIMARY_KEY), - computedColumns, - tableConfig, - sourceColumns, - null, - Collections.emptyList()); - } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java index 3b25c2b6993e..ddadcc3054e5 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java @@ -38,10 +38,13 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.assertSchemaCompatible; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.buildPaimonSchema; import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -125,25 +128,25 @@ public void build(StreamExecutionEnvironment env) throws Exception { validateCaseInsensitive(); } - MongodbSchema mongodbSchema = MongodbSchema.getMongodbSchema(mongodbConfig); + Schema mongodbSchema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig, caseSensitive); catalog.createDatabase(database, true); List computedColumns = - buildComputedColumns(computedColumnArgs, mongodbSchema.fields()); + buildComputedColumns(computedColumnArgs, mongodbSchema); Identifier identifier = new Identifier(database, table); FileStoreTable table; - + Schema fromMongodb = + buildPaimonSchema( + partitionKeys, + Collections.emptyList(), + computedColumns, + tableConfig, + mongodbSchema); // Check if table exists before trying to get or create it if (catalog.tableExists(identifier)) { table = (FileStoreTable) catalog.getTable(identifier); + assertSchemaCompatible(table.schema(), fromMongodb.fields()); } else { - Schema fromMongodb = - MongoDBActionUtils.buildPaimonSchema( - mongodbSchema, - partitionKeys, - computedColumns, - tableConfig, - caseSensitive); catalog.createTable(identifier, fromMongodb, false); table = (FileStoreTable) catalog.getTable(identifier); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaUtils.java similarity index 65% rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaUtils.java index fbf648eebb6c..f81bdfaf75e4 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaUtils.java @@ -18,7 +18,7 @@ package org.apache.paimon.flink.action.cdc.mongodb; -import org.apache.paimon.types.DataType; +import org.apache.paimon.schema.Schema; import org.apache.paimon.types.DataTypes; import com.mongodb.ConnectionString; @@ -37,65 +37,32 @@ import java.util.ArrayList; import java.util.Arrays; 
import java.util.Collections; -import java.util.LinkedHashMap; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; +import java.util.function.Function; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg; import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME; import static org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE; /** - * Represents the schema of a MongoDB collection. + * Build schema from a MongoDB collection. * - *

- * <p>This class provides methods to retrieve and manage the schema details of a MongoDB collection,
- * including the database name, table (collection) name, fields, and primary keys. The schema can be
- * acquired in two modes: SPECIFIED and DYNAMIC. In the SPECIFIED mode, the schema details are
- * provided explicitly, while in the DYNAMIC mode, the schema is inferred from the first document in
- * the collection.
- *
- * <p>The class also provides utility methods to generate schema fields and create a new MongoDB
- * schema instance.
+ * <p>The schema can be acquired in two modes: SPECIFIED and DYNAMIC. In the SPECIFIED mode, the
+ * schema details are provided explicitly, while in the DYNAMIC mode, the schema is inferred from
+ * the first document in the collection.
  */
-public class MongodbSchema {
+public class MongodbSchemaUtils {
 
     private static final String ID_FIELD = "_id";
 
-    private final String databaseName;
-    private final String tableName;
-    private final LinkedHashMap<String, DataType> fields;
-    private final List<String> primaryKeys;
-
-    public MongodbSchema(
-            String databaseName,
-            String tableName,
-            LinkedHashMap<String, DataType> fields,
-            List<String> primaryKeys) {
-        this.databaseName = databaseName;
-        this.tableName = tableName;
-        this.fields = fields;
-        this.primaryKeys = primaryKeys;
-    }
-
-    public String tableName() {
-        return tableName;
-    }
-
-    public String databaseName() {
-        return databaseName;
-    }
-
-    public LinkedHashMap<String, DataType> fields() {
-        return fields;
-    }
-
-    public List<String> primaryKeys() {
-        return primaryKeys;
-    }
 
     /**
-     * Utility class for creating a MongoDB schema based on the provided configuration. The schema
-     * can be created in one of the two modes:
      *
      * <ul>
  • SPECIFIED: In this mode, the schema is created based on the explicit column @@ -110,7 +77,7 @@ public List primaryKeys() { * name, and optionally, the username and password for authentication. For the SPECIFIED mode, * the field names should also be specified in the configuration. */ - public static MongodbSchema getMongodbSchema(Configuration mongodbConfig) { + public static Schema getMongodbSchema(Configuration mongodbConfig, boolean caseSensitive) { SchemaAcquisitionMode mode = getModeFromConfig(mongodbConfig); String databaseName = Objects.requireNonNull( @@ -127,13 +94,8 @@ public static MongodbSchema getMongodbSchema(Configuration mongodbConfig) { Objects.requireNonNull( mongodbConfig.get(FIELD_NAME), "Field names cannot be null") .split(","); - LinkedHashMap schemaFields = - generateSchemaFields(Arrays.asList(columnNames)); - return new MongodbSchema( - databaseName, - collectionName, - schemaFields, - Collections.singletonList(ID_FIELD)); + + return createMongodbSchema(collectionName, columnNames, caseSensitive); case DYNAMIC: String hosts = Objects.requireNonNull( @@ -165,7 +127,7 @@ public static MongodbSchema getMongodbSchema(Configuration mongodbConfig) { } return createMongodbSchema( - databaseName, collectionName, getColumnNames(firstDocument)); + collectionName, getColumnNames(firstDocument), caseSensitive); } catch (Exception e) { throw new RuntimeException( "Failed to create schema from MongoDB collection", e); @@ -175,7 +137,7 @@ public static MongodbSchema getMongodbSchema(Configuration mongodbConfig) { } } - public static String buildConnectionString( + private static String buildConnectionString( @Nullable String username, @Nullable String password, String scheme, @@ -204,40 +166,25 @@ private static List getColumnNames(Document document) { return document != null ? 
new ArrayList<>(document.keySet()) : Collections.emptyList(); } - private static LinkedHashMap generateSchemaFields(List columnNames) { - LinkedHashMap schemaFields = new LinkedHashMap<>(); - for (String columnName : columnNames) { - schemaFields.put(columnName, DataTypes.STRING()); - } - return schemaFields; - } - - private static MongodbSchema createMongodbSchema( - String databaseName, String collectionName, List columnNames) { - return new MongodbSchema( - databaseName, - collectionName, - generateSchemaFields(columnNames), - Collections.singletonList(ID_FIELD)); + private static Schema createMongodbSchema( + String collectionName, String[] columnNames, boolean caseSensitive) { + return createMongodbSchema(collectionName, Arrays.asList(columnNames), caseSensitive); } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof MongodbSchema)) { - return false; + private static Schema createMongodbSchema( + String collectionName, List columnNames, boolean caseSensitive) { + Schema.Builder builder = Schema.newBuilder(); + Set existedFields = new HashSet<>(); + Function columnDuplicateErrMsg = columnDuplicateErrMsg(collectionName); + for (String column : columnNames) { + builder.column( + columnCaseConvertAndDuplicateCheck( + column, existedFields, caseSensitive, columnDuplicateErrMsg), + DataTypes.STRING()); } - MongodbSchema that = (MongodbSchema) o; - return databaseName.equals(that.databaseName) - && tableName.equals(that.tableName) - && fields.equals(that.fields) - && primaryKeys.equals(that.primaryKeys); - } - @Override - public int hashCode() { - return Objects.hash(databaseName, tableName, fields, primaryKeys); + builder.primaryKey(ID_FIELD); + + return builder.build(); } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemaUtils.java index 27a5388c07fb..45762d5500f0 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchemaUtils.java @@ -32,15 +32,16 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; -import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg; import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE; -import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion; /** Utility class to load MySQL table schema with JDBC. 
*/ @@ -56,9 +57,10 @@ public static Schema buildSchema( TypeMapping typeMapping, boolean caseSensitive) throws SQLException { - Map duplicateFields = new HashMap<>(); Schema.Builder builder = Schema.newBuilder(); try (ResultSet rs = metaData.getColumns(databaseName, null, tableName, null)) { + Set existedFields = new HashSet<>(); + Function columnDuplicateErrMsg = columnDuplicateErrMsg(tableName); while (rs.next()) { String fieldName = rs.getString("COLUMN_NAME"); String fieldType = rs.getString("TYPE_NAME"); @@ -73,23 +75,18 @@ public static Schema buildSchema( if (rs.wasNull()) { scale = null; } - DataType paimonType = - MySqlTypeUtils.toDataType(fieldType, precision, scale, typeMapping); - - if (!caseSensitive) { - checkArgument( - !duplicateFields.containsKey(fieldName.toLowerCase()), - columnDuplicateErrMsg(tableName).apply(fieldName)); - fieldName = fieldName.toLowerCase(); - } - boolean isNullable = typeMapping.containsMode(TO_NULLABLE) || isNullableColumn(rs.getString("IS_NULLABLE")); - DataType updateType = paimonType.copy(isNullable); + DataType paimonType = + MySqlTypeUtils.toDataType(fieldType, precision, scale, typeMapping) + .copy(isNullable); + + fieldName = + columnCaseConvertAndDuplicateCheck( + fieldName, existedFields, caseSensitive, columnDuplicateErrMsg); - builder.column(fieldName, updateType, fieldComment); - duplicateFields.put(fieldName, 1); + builder.column(fieldName, paimonType, fieldComment); } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java index 6f389c22012e..58a8779efd8d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java @@ -19,20 +19,21 @@ package org.apache.paimon.flink.action.cdc.kafka; import org.apache.paimon.flink.action.cdc.TypeMapping; -import org.apache.paimon.types.DataType; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import java.util.LinkedHashMap; +import java.util.ArrayList; import java.util.List; import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link KafkaSchema}. */ +/** Tests for {@link KafkaSchemaUtils}. 
*/ public class KafkaSchemaITCase extends KafkaActionITCaseBase { @Test @Timeout(60) @@ -50,17 +51,16 @@ public void testKafkaSchema() throws Exception { kafkaConfig.put("value.format", "canal-json"); kafkaConfig.put("topic", topic); - KafkaSchema kafkaSchema = - KafkaSchema.getKafkaSchema( - Configuration.fromMap(kafkaConfig), topic, TypeMapping.defaultMapping()); - Map fields = new LinkedHashMap<>(); - fields.put("pt", DataTypes.INT()); - fields.put("_id", DataTypes.INT()); - fields.put("v1", DataTypes.VARCHAR(10)); - String tableName = "schema_evolution_1"; - String databasesName = "paimon_sync_table"; + Schema kafkaSchema = + KafkaSchemaUtils.getKafkaSchema( + Configuration.fromMap(kafkaConfig), + topic, + TypeMapping.defaultMapping(), + true); + List fields = new ArrayList<>(); + fields.add(new DataField(0, "pt", DataTypes.INT())); + fields.add(new DataField(1, "_id", DataTypes.INT().notNull())); + fields.add(new DataField(2, "v1", DataTypes.VARCHAR(10))); assertThat(kafkaSchema.fields()).isEqualTo(fields); - assertThat(kafkaSchema.tableName()).isEqualTo(tableName); - assertThat(kafkaSchema.databaseName()).isEqualTo(databasesName); } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java index fa08d2fc8431..1881747d10ee 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchemaITCase.java @@ -18,7 +18,8 @@ package org.apache.paimon.flink.action.cdc.mongodb; -import org.apache.paimon.types.DataType; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import com.mongodb.MongoClientSettings; @@ -34,14 +35,15 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedHashMap; +import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; -/** Tests for {@link MongodbSchema}. */ +/** Tests for {@link MongodbSchemaUtils}. 
*/ public class MongodbSchemaITCase extends MongoDBActionITCaseBase { @BeforeAll @@ -81,10 +83,8 @@ public void testCreateSchemaFromValidConfig() { mongodbConfig.setString(MongoDBSourceOptions.CONNECTION_OPTIONS, "authSource=admin"); mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase"); mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection"); - MongodbSchema schema = MongodbSchema.getMongodbSchema(mongodbConfig); + Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig, true); assertNotNull(schema); - assertEquals("testDatabase", schema.databaseName()); - assertEquals("testCollection", schema.tableName()); } @Test @@ -98,7 +98,9 @@ public void testCreateSchemaFromInvalidHost() { mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase"); mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection"); - assertThrows(RuntimeException.class, () -> MongodbSchema.getMongodbSchema(mongodbConfig)); + assertThrows( + RuntimeException.class, + () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig, true)); } @Test @@ -108,7 +110,8 @@ public void testCreateSchemaFromIncompleteConfig() { mongodbConfig.setString(MongoDBSourceOptions.HOSTS, MONGODB_CONTAINER.getHostAndPort()); // Expect an exception to be thrown due to missing necessary settings assertThrows( - NullPointerException.class, () -> MongodbSchema.getMongodbSchema(mongodbConfig)); + NullPointerException.class, + () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig, true)); } @Test @@ -124,17 +127,15 @@ public void testCreateSchemaFromDynamicConfig() { mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection"); // Call the method and check the results - MongodbSchema schema = MongodbSchema.getMongodbSchema(mongodbConfig); + Schema schema = MongodbSchemaUtils.getMongodbSchema(mongodbConfig, true); // Verify the schema assertNotNull(schema); - assertEquals("testDatabase", schema.databaseName()); - assertEquals("testCollection", schema.tableName()); - LinkedHashMap expectedFields = new LinkedHashMap<>(); - expectedFields.put("name", DataTypes.STRING()); - expectedFields.put("age", DataTypes.STRING()); - expectedFields.put("_id", DataTypes.STRING()); + List expectedFields = new ArrayList<>(); + expectedFields.add(new DataField(0, "_id", DataTypes.STRING().notNull())); + expectedFields.add(new DataField(1, "name", DataTypes.STRING())); + expectedFields.add(new DataField(2, "age", DataTypes.STRING())); assertEquals(expectedFields, schema.fields()); } @@ -150,7 +151,9 @@ public void testCreateSchemaFromInvalidDatabase() { mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "invalidDatabase"); mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "testCollection"); - assertThrows(RuntimeException.class, () -> MongodbSchema.getMongodbSchema(mongodbConfig)); + assertThrows( + RuntimeException.class, + () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig, true)); } @Test @@ -164,6 +167,8 @@ public void testCreateSchemaFromInvalidCollection() { mongodbConfig.setString(MongoDBSourceOptions.DATABASE, "testDatabase"); mongodbConfig.setString(MongoDBSourceOptions.COLLECTION, "invalidCollection"); - assertThrows(RuntimeException.class, () -> MongodbSchema.getMongodbSchema(mongodbConfig)); + assertThrows( + RuntimeException.class, + () -> MongodbSchemaUtils.getMongodbSchema(mongodbConfig, true)); } }
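
A minimal usage sketch of the new CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck helper introduced above, mirroring how MongodbSchemaUtils.createMongodbSchema and RecordParser.getKafkaSchema now build a Paimon Schema from raw source columns. The class name SchemaBuildSketch and the sample table/column names are illustrative only and are not part of this patch.

import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;

/** Hypothetical helper class, not part of the patch; it only shows the intended call pattern. */
public class SchemaBuildSketch {

    static Schema stringSchema(String tableName, List<String> columnNames, boolean caseSensitive) {
        Schema.Builder builder = Schema.newBuilder();
        // Tracks already-seen lower-cased names so that a case-insensitive catalog
        // rejects duplicates such as "ID" vs "id" with the table-specific error message.
        Set<String> existedFields = new HashSet<>();
        Function<String, String> errMsg = columnDuplicateErrMsg(tableName);
        for (String column : columnNames) {
            builder.column(
                    columnCaseConvertAndDuplicateCheck(column, existedFields, caseSensitive, errMsg),
                    DataTypes.STRING());
        }
        return builder.build();
    }

    public static void main(String[] args) {
        // With caseSensitive = false every column name is lower-cased before reaching the builder.
        Schema schema = stringSchema("testCollection", Arrays.asList("_id", "Name", "Age"), false);
        System.out.println(schema.fields());
    }
}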
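
A second, hedged sketch of the consolidated buildPaimonSchema path that the Kafka and MongoDB sync actions now share. It assumes, based on the new call sites in this patch, that the retained overload takes the source Schema as its last argument; the class name BuildPaimonSchemaSketch is hypothetical. With no partition keys and no explicitly specified primary keys, the source primary key is carried over; a specified key that exists in neither the source columns nor the computed columns fails with IllegalArgumentException.

import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

import java.util.Collections;
import java.util.HashMap;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.buildPaimonSchema;

/** Hypothetical sketch class, not part of the patch. */
public class BuildPaimonSchemaSketch {

    public static void main(String[] args) {
        // Source schema in the form KafkaSchemaUtils / MongodbSchemaUtils now return it.
        Schema sourceSchema =
                Schema.newBuilder()
                        .column("_id", DataTypes.INT().notNull())
                        .column("v1", DataTypes.VARCHAR(10))
                        .primaryKey("_id")
                        .build();

        // No partition keys, no specified primary keys, no computed columns, empty table config:
        // the source primary key "_id" is inherited by the resulting Paimon schema.
        Schema paimonSchema =
                buildPaimonSchema(
                        Collections.emptyList(),
                        Collections.emptyList(),
                        Collections.emptyList(),
                        new HashMap<>(),
                        sourceSchema);
        System.out.println(paimonSchema.primaryKeys());
    }
}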