[cdc] Refactor Kafka and MongoDB cdc schema build (apache#2069)
yuzelin authored Sep 26, 2023
1 parent aca3fd5 commit 613a261
Showing 12 changed files with 153 additions and 353 deletions.
CdcActionCommonUtils.java
@@ -29,12 +29,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -133,6 +132,18 @@ public static List<String> listCaseConvert(List<String> origin, boolean caseSensitive
: origin.stream().map(String::toLowerCase).collect(Collectors.toList());
}

public static String columnCaseConvertAndDuplicateCheck(
String column,
Set<String> existedFields,
boolean caseSensitive,
Function<String, String> columnDuplicateErrMsg) {
if (caseSensitive) {
return column;
}
checkArgument(existedFields.add(column.toLowerCase()), columnDuplicateErrMsg.apply(column));
return column.toLowerCase();
}

public static Schema buildPaimonSchema(
List<String> specifiedPartitionKeys,
List<String> specifiedPrimaryKeys,
@@ -194,66 +205,6 @@ public static Schema buildPaimonSchema(
return builder.build();
}

public static Schema buildPaimonSchema(
List<String> specifiedPartitionKeys,
List<String> specifiedPrimaryKeys,
List<ComputedColumn> computedColumns,
Map<String, String> tableConfig,
LinkedHashMap<String, DataType> sourceColumns,
@Nullable List<String> sourceColumnComments,
List<String> sourcePrimaryKeys) {
Schema.Builder builder = Schema.newBuilder();

// options
builder.options(tableConfig);

// columns
if (sourceColumnComments != null) {
checkArgument(
sourceColumns.size() == sourceColumnComments.size(),
"Source table columns count and column comments count should be equal.");

int i = 0;
for (Map.Entry<String, DataType> entry : sourceColumns.entrySet()) {
builder.column(entry.getKey(), entry.getValue(), sourceColumnComments.get(i++));
}
} else {
sourceColumns.forEach(builder::column);
}

for (ComputedColumn computedColumn : computedColumns) {
builder.column(computedColumn.columnName(), computedColumn.columnType());
}

// primary keys
if (!specifiedPrimaryKeys.isEmpty()) {
for (String key : specifiedPrimaryKeys) {
if (!sourceColumns.containsKey(key)
&& computedColumns.stream().noneMatch(c -> c.columnName().equals(key))) {
throw new IllegalArgumentException(
"Specified primary key '"
+ key
+ "' does not exist in source tables or computed columns.");
}
}
builder.primaryKey(specifiedPrimaryKeys);
} else if (!sourcePrimaryKeys.isEmpty()) {
builder.primaryKey(sourcePrimaryKeys);
} else {
throw new IllegalArgumentException(
"Primary keys are not specified. "
+ "Also, can't infer primary keys from source table schemas because "
+ "source tables have no primary keys or have different primary keys.");
}

// partition keys
if (!specifiedPartitionKeys.isEmpty()) {
builder.partitionKeys(specifiedPartitionKeys);
}

return builder.build();
}

public static String tableList(
MultiTablesSinkMode mode,
String databasePattern,
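The new columnCaseConvertAndDuplicateCheck helper above lowercases a column name when the catalog is case-insensitive and fails fast if two source columns collapse to the same lowercase name. A minimal usage sketch, not part of the commit; the wrapper class and the "orders" table name are hypothetical, while the two statically imported helpers come from CdcActionCommonUtils as shown in this diff:

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;

import java.util.HashSet;
import java.util.Set;

class CaseConvertSketch {

    static String demo() {
        Set<String> existedFields = new HashSet<>();
        boolean caseSensitive = false; // pretend the target catalog lowercases identifiers

        // "ID" is lowercased to "id" and recorded in existedFields.
        String converted =
                columnCaseConvertAndDuplicateCheck(
                        "ID", existedFields, caseSensitive, columnDuplicateErrMsg("orders"));

        // A later column such as "Id" would also collapse to "id", so the same call
        // would fail checkArgument with the message built by columnDuplicateErrMsg.
        return converted;
    }
}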
ComputedColumnUtils.java
@@ -36,15 +36,11 @@ public class ComputedColumnUtils {

public static List<ComputedColumn> buildComputedColumns(
List<String> computedColumnArgs, Schema schema) {
Map<String, DataType> dataFields =
Map<String, DataType> typeMapping =
schema.fields().stream()
.collect(
Collectors.toMap(DataField::name, DataField::type, (v1, v2) -> v2));
return buildComputedColumns(computedColumnArgs, dataFields);
}

public static List<ComputedColumn> buildComputedColumns(
List<String> computedColumnArgs, Map<String, DataType> typeMapping) {
List<ComputedColumn> computedColumns = new ArrayList<>();
for (String columnArg : computedColumnArgs) {
String[] kv = columnArg.split("=");
KafkaActionUtils.java
@@ -18,10 +18,6 @@

package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -41,7 +37,6 @@

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -50,9 +45,6 @@
import java.util.stream.Collectors;

import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static org.apache.paimon.utils.Preconditions.checkArgument;

class KafkaActionUtils {
@@ -62,30 +54,6 @@ class KafkaActionUtils {
private static final String PARTITION = "partition";
private static final String OFFSET = "offset";

static Schema buildPaimonSchema(
KafkaSchema kafkaSchema,
List<String> specifiedPartitionKeys,
List<String> specifiedPrimaryKeys,
List<ComputedColumn> computedColumns,
Map<String, String> tableConfig,
boolean caseSensitive) {
LinkedHashMap<String, DataType> sourceColumns =
mapKeyCaseConvert(
kafkaSchema.fields(),
caseSensitive,
columnDuplicateErrMsg(kafkaSchema.tableName()));
List<String> primaryKeys = listCaseConvert(kafkaSchema.primaryKeys(), caseSensitive);

return CdcActionCommonUtils.buildPaimonSchema(
specifiedPartitionKeys,
specifiedPrimaryKeys,
computedColumns,
tableConfig,
sourceColumns,
null,
primaryKeys);
}

static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
validateKafkaConfig(kafkaConfig);
KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
KafkaSchema.java → KafkaSchemaUtils.java
@@ -21,7 +21,7 @@
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.types.DataType;
import org.apache.paimon.schema.Schema;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
@@ -36,7 +36,6 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -47,42 +46,11 @@
import static org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.kafkaPropertiesGroupId;
import static org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat.getDataFormat;

/** Utility class to load canal kafka schema. */
public class KafkaSchema {
/** Utility class to load kafka schema. */
public class KafkaSchemaUtils {

private static final int MAX_RETRY = 5;
private static final int POLL_TIMEOUT_MILLIS = 1000;
private final String databaseName;
private final String tableName;
private final LinkedHashMap<String, DataType> fields;
private final List<String> primaryKeys;

public KafkaSchema(
String databaseName,
String tableName,
LinkedHashMap<String, DataType> fields,
List<String> primaryKeys) {
this.databaseName = databaseName;
this.tableName = tableName;
this.fields = fields;
this.primaryKeys = primaryKeys;
}

public String tableName() {
return tableName;
}

public String databaseName() {
return databaseName;
}

public LinkedHashMap<String, DataType> fields() {
return fields;
}

public List<String> primaryKeys() {
return primaryKeys;
}

private static KafkaConsumer<String, String> getKafkaEarliestConsumer(
Configuration kafkaConfig, String topic) {
@@ -123,8 +91,8 @@ private static KafkaConsumer<String, String> getKafkaEarliestConsumer(
* @return The Kafka schema for the topic.
* @throws KafkaSchemaRetrievalException If unable to retrieve the schema after max retries.
*/
public static KafkaSchema getKafkaSchema(
Configuration kafkaConfig, String topic, TypeMapping typeMapping)
public static Schema getKafkaSchema(
Configuration kafkaConfig, String topic, TypeMapping typeMapping, boolean caseSensitive)
throws KafkaSchemaRetrievalException {
KafkaConsumer<String, String> consumer = getKafkaEarliestConsumer(kafkaConfig, topic);
int retry = 0;
@@ -140,7 +108,7 @@ public static KafkaSchema getKafkaSchema(
Stream<ConsumerRecord<String, String>> recordStream =
StreamSupport.stream(records.spliterator(), false);

Optional<KafkaSchema> kafkaSchema =
Optional<Schema> kafkaSchema =
recordStream
.map(record -> recordParser.getKafkaSchema(record.value()))
.filter(Objects::nonNull)
@@ -175,24 +143,4 @@ public KafkaSchemaRetrievalException(String message) {
super(message);
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof KafkaSchema)) {
return false;
}
KafkaSchema that = (KafkaSchema) o;
return databaseName.equals(that.databaseName)
&& tableName.equals(that.tableName)
&& fields.equals(that.fields)
&& primaryKeys.equals(that.primaryKeys);
}

@Override
public int hashCode() {
return Objects.hash(databaseName, tableName, fields, primaryKeys);
}
}
KafkaSyncTableAction.java
@@ -24,7 +24,6 @@
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
@@ -48,6 +47,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.assertSchemaCompatible;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.buildPaimonSchema;
import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;

/**
@@ -144,28 +145,25 @@ public KafkaSyncTableAction withTypeMapping(TypeMapping typeMapping) {
public void build(StreamExecutionEnvironment env) throws Exception {
KafkaSource<String> source = KafkaActionUtils.buildKafkaSource(kafkaConfig);
String topic = kafkaConfig.get(KafkaConnectorOptions.TOPIC).get(0);
KafkaSchema kafkaSchema = KafkaSchema.getKafkaSchema(kafkaConfig, topic, typeMapping);

catalog.createDatabase(database, true);
boolean caseSensitive = catalog.caseSensitive();
Schema kafkaSchema =
KafkaSchemaUtils.getKafkaSchema(kafkaConfig, topic, typeMapping, caseSensitive);

Identifier identifier = new Identifier(database, table);
FileStoreTable table;
List<ComputedColumn> computedColumns =
buildComputedColumns(computedColumnArgs, kafkaSchema.fields());
Schema fromCanal =
KafkaActionUtils.buildPaimonSchema(
kafkaSchema,
partitionKeys,
primaryKeys,
computedColumns,
tableConfig,
caseSensitive);
buildComputedColumns(computedColumnArgs, kafkaSchema);
Schema fromKafka =
buildPaimonSchema(
partitionKeys, primaryKeys, computedColumns, tableConfig, kafkaSchema);

try {
table = (FileStoreTable) catalog.getTable(identifier);
CdcActionCommonUtils.assertSchemaCompatible(table.schema(), fromCanal.fields());
assertSchemaCompatible(table.schema(), fromKafka.fields());
} catch (Catalog.TableNotExistException e) {
catalog.createTable(identifier, fromCanal, false);
catalog.createTable(identifier, fromKafka, false);
table = (FileStoreTable) catalog.getTable(identifier);
}
DataFormat format = DataFormat.getDataFormat(kafkaConfig);
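With the Kafka-specific wrapper gone, KafkaSyncTableAction feeds the discovered Kafka schema straight into the shared CdcActionCommonUtils.buildPaimonSchema overload, as the call above shows. A standalone sketch of that call, not part of the commit, assuming the retained five-argument overload accepts the source schema directly as the call site suggests; the column names and types below are made up:

import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

import java.util.Collections;
import java.util.HashMap;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.buildPaimonSchema;

class BuildSchemaSketch {

    static Schema demo() {
        // Source schema as it might be discovered from a Kafka topic.
        Schema kafkaSchema =
                Schema.newBuilder()
                        .column("id", DataTypes.BIGINT())
                        .column("name", DataTypes.STRING())
                        .primaryKey(Collections.singletonList("id"))
                        .build();

        // No partition keys, no explicitly specified primary keys (the source
        // primary key "id" is reused), no computed columns, empty table config.
        return buildPaimonSchema(
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.emptyList(),
                new HashMap<>(),
                kafkaSchema);
    }
}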