--ignore-incompatible |
Defaults to false; in this case, if a MySQL table name exists in Paimon and their schemas are incompatible, an exception will be thrown. You can set it to true explicitly to ignore incompatible tables and the exception. |
diff --git a/docs/layouts/shortcodes/generated/pulsar_sync_database.html b/docs/layouts/shortcodes/generated/pulsar_sync_database.html
new file mode 100644
index 000000000000..8b3cde7678ef
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/pulsar_sync_database.html
@@ -0,0 +1,73 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+
+
+
+ Configuration |
+ Description |
+
+
+
+
+ --warehouse |
+ The path to Paimon warehouse. |
+
+
+ --database |
+ The database name in Paimon catalog. |
+
+
+ --ignore-incompatible |
+ Defaults to false; in this case, if a MySQL table name exists in Paimon and their schemas are incompatible, an exception will be thrown. You can set it to true explicitly to ignore incompatible tables and the exception. |
+
+
+ --table-prefix |
+ The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table-prefix ods_". |
+
+
+ --table-suffix |
+ The suffix of all Paimon tables to be synchronized. The usage is the same as "--table-prefix". |
+
+
+ --including-tables |
+ It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables. Because '|' is a special character, the whole expression should be quoted, for example: 'a|b|c'. Regular expressions are supported; for example, specifying "--including-tables test|paimon.*" means to synchronize the table 'test' and all tables starting with 'paimon'. |
+
+
+ --excluding-tables |
+ It is used to specify which source tables are not to be synchronized. The usage is the same as "--including-tables". "--excluding-tables" has higher priority than "--including-tables" if you specify both. |
+
+
+ --type-mapping |
+ It is used to specify how to map MySQL data types to Paimon types. Currently, the only supported option is "to-string", which maps all MySQL types to STRING. |
+
+
+ --pulsar-conf |
+ The configuration for the Flink Pulsar source. Each configuration should be specified in the format `key=value`. `topic`, `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl`, and `pulsar.consumer.subscriptionName` are required configurations; others are optional. See the Flink Pulsar connector documentation for a complete list of configurations. |
+
+
+ --catalog-conf |
+ The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See here for a complete list of catalog configurations. |
+
+
+ --table-conf |
+ The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations. |
+
+
+
\ No newline at end of file
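
As a reading aid for the option table above (not part of the patch): the --including-tables / --excluding-tables semantics reduce to two regular expressions, with the excluding pattern taking priority. A minimal sketch, assuming a hypothetical shouldSynchronize helper; the real filtering happens inside RichCdcMultiplexRecordEventParser further down in this patch.

    import java.util.regex.Pattern;

    public class TableFilterSketch {

        // Mirrors the table above: --including-tables defaults to ".*", --excluding-tables may be absent.
        static boolean shouldSynchronize(String table, Pattern including, Pattern excluding) {
            if (!including.matcher(table).matches()) {
                return false;
            }
            // "--excluding-tables" has higher priority than "--including-tables".
            return excluding == null || !excluding.matcher(table).matches();
        }

        public static void main(String[] args) {
            Pattern including = Pattern.compile("test|paimon.*");
            Pattern excluding = Pattern.compile("paimon_tmp.*");

            System.out.println(shouldSynchronize("test", including, excluding));          // true
            System.out.println(shouldSynchronize("paimon_orders", including, excluding)); // true
            System.out.println(shouldSynchronize("paimon_tmp_1", including, excluding));  // false: excluded
            System.out.println(shouldSynchronize("other", including, excluding));         // false: not included
        }
    }
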
diff --git a/docs/layouts/shortcodes/generated/pulsar_sync_table.html b/docs/layouts/shortcodes/generated/pulsar_sync_table.html
new file mode 100644
index 000000000000..c957524eb10e
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/pulsar_sync_table.html
@@ -0,0 +1,69 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+
+
+
+ Configuration |
+ Description |
+
+
+
+
+ --warehouse |
+ The path to Paimon warehouse. |
+
+
+ --database |
+ The database name in Paimon catalog. |
+
+
+ --table |
+ The Paimon table name. |
+
+
+ --partition-keys |
+ The partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example "dt,hh,mm". |
+
+
+ --primary-keys |
+ The primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example "buyer_id,seller_id". |
+
+
+ --type-mapping |
+ It is used to specify how to map MySQL data types to Paimon types. Currently, the only supported option is "to-string", which maps all MySQL types to STRING. |
+
+
+ --computed-column |
+ The definitions of computed columns. The argument fields come from the field names of the Pulsar topic's tables. See here for a complete list of configurations. |
+
+
+ --pulsar-conf |
+ The configuration for the Flink Pulsar source. Each configuration should be specified in the format `key=value`. `topic`, `value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl`, and `pulsar.consumer.subscriptionName` are required configurations; others are optional. See the Flink Pulsar connector documentation for a complete list of configurations. |
+
+
+ --catalog-conf |
+ The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See here for a complete list of catalog configurations. |
+
+
+ --table-conf |
+ The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations. |
+
+
+
\ No newline at end of file
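
Each --pulsar-conf, --catalog-conf, and --table-conf argument above is a key=value pair collected into a map. The actual CLI parsing goes through Flink's MultipleParameterTool in the action factories later in this patch; the sketch below (toConfigMap is a made-up helper) only illustrates the key=value split.

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class KeyValueConfSketch {

        // Splits each "key=value" pair on the first '=' so values may themselves contain '='.
        static Map<String, String> toConfigMap(Iterable<String> keyValues) {
            Map<String, String> conf = new LinkedHashMap<>();
            for (String kv : keyValues) {
                int idx = kv.indexOf('=');
                if (idx < 0) {
                    throw new IllegalArgumentException("Invalid key=value pair: " + kv);
                }
                conf.put(kv.substring(0, idx), kv.substring(idx + 1));
            }
            return conf;
        }

        public static void main(String[] args) {
            Map<String, String> pulsarConf =
                    toConfigMap(
                            Arrays.asList(
                                    "topic=order",
                                    "value.format=canal-json",
                                    "pulsar.client.serviceUrl=pulsar://127.0.0.1:6650"));
            System.out.println(pulsarConf);
        }
    }
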
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncDatabaseActionBase.java
new file mode 100644
index 000000000000..8ed169a7375e
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncDatabaseActionBase.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.MultiTablesSinkMode;
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.format.RecordParser;
+import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
+import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * An {@link Action} which synchronizes multiple message queue topics into one Paimon database.
+ *
+ * <p>For each message queue topic's table to be synchronized, if the corresponding Paimon table
+ * does not exist, this action will automatically create the table, and its schema will be derived
+ * from all specified message queue topic's tables. If the Paimon table already exists and its
+ * schema is different from that parsed from the message queue record, this action will try to
+ * perform schema evolution.
+ *
+ * <p>This action supports a limited number of schema changes. Currently, the framework cannot
+ * drop columns, so the behaviors of `DROP` will be ignored and `RENAME` will add a new column.
+ * Currently supported schema changes include:
+ *
+ * <ul>
+ *   <li>Adding columns.
+ *   <li>Altering column types. More specifically,
+ *       <ul>
+ *         <li>altering from a string type (char, varchar, text) to another string type with longer
+ *             length,
+ *         <li>altering from a binary type (binary, varbinary, blob) to another binary type with
+ *             longer length,
+ *         <li>altering from an integer type (tinyint, smallint, int, bigint) to another integer
+ *             type with wider range,
+ *         <li>altering from a floating-point type (float, double) to another floating-point type
+ *             with wider range,
+ *       </ul>
+ *       are supported.
+ * </ul>
+ *
+ * <p>To automatically synchronize new tables, this action creates a single sink for all Paimon
+ * tables to be written. See {@link MultiTablesSinkMode#COMBINED}.
+ */
+public abstract class MessageQueueSyncDatabaseActionBase extends ActionBase {
+
+ protected final String database;
+ protected final Configuration cdcSourceConfig;
+
+ private Map<String, String> tableConfig = new HashMap<>();
+ private String tablePrefix = "";
+ private String tableSuffix = "";
+ private String includingTables = ".*";
+ @Nullable String excludingTables;
+ private TypeMapping typeMapping = TypeMapping.defaultMapping();
+
+ public MessageQueueSyncDatabaseActionBase(
+ String warehouse,
+ String database,
+ Map<String, String> catalogConfig,
+ Map<String, String> mqConfig) {
+ super(warehouse, catalogConfig);
+ this.database = database;
+ this.cdcSourceConfig = Configuration.fromMap(mqConfig);
+ }
+
+ public MessageQueueSyncDatabaseActionBase withTableConfig(Map<String, String> tableConfig) {
+ this.tableConfig = tableConfig;
+ return this;
+ }
+
+ public MessageQueueSyncDatabaseActionBase withTablePrefix(@Nullable String tablePrefix) {
+ if (tablePrefix != null) {
+ this.tablePrefix = tablePrefix;
+ }
+ return this;
+ }
+
+ public MessageQueueSyncDatabaseActionBase withTableSuffix(@Nullable String tableSuffix) {
+ if (tableSuffix != null) {
+ this.tableSuffix = tableSuffix;
+ }
+ return this;
+ }
+
+ public MessageQueueSyncDatabaseActionBase includingTables(@Nullable String includingTables) {
+ if (includingTables != null) {
+ this.includingTables = includingTables;
+ }
+ return this;
+ }
+
+ public MessageQueueSyncDatabaseActionBase excludingTables(@Nullable String excludingTables) {
+ this.excludingTables = excludingTables;
+ return this;
+ }
+
+ public MessageQueueSyncDatabaseActionBase withTypeMapping(TypeMapping typeMapping) {
+ this.typeMapping = typeMapping;
+ return this;
+ }
+
+ protected abstract DataStreamSource<String> buildSource() throws Exception;
+
+ protected abstract String sourceName();
+
+ protected abstract DataFormat getDataFormat();
+
+ protected abstract String jobName();
+
+ @Override
+ public void build() throws Exception {
+ boolean caseSensitive = catalog.caseSensitive();
+
+ validateCaseInsensitive(caseSensitive);
+
+ catalog.createDatabase(database, true);
+
+ DataFormat format = getDataFormat();
+ RecordParser recordParser =
+ format.createParser(caseSensitive, typeMapping, Collections.emptyList());
+ NewTableSchemaBuilder schemaBuilder = new NewTableSchemaBuilder(tableConfig, caseSensitive);
+ Pattern includingPattern = Pattern.compile(includingTables);
+ Pattern excludingPattern =
+ excludingTables == null ? null : Pattern.compile(excludingTables);
+ TableNameConverter tableNameConverter =
+ new TableNameConverter(caseSensitive, true, tablePrefix, tableSuffix);
+ EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
+ () ->
+ new RichCdcMultiplexRecordEventParser(
+ schemaBuilder,
+ includingPattern,
+ excludingPattern,
+ tableNameConverter);
+
+ new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
+ .withInput(buildSource().flatMap(recordParser).name("Parse"))
+ .withParserFactory(parserFactory)
+ .withCatalogLoader(catalogLoader())
+ .withDatabase(database)
+ .withMode(MultiTablesSinkMode.COMBINED)
+ .withTableOptions(tableConfig)
+ .build();
+ }
+
+ private void validateCaseInsensitive(boolean caseSensitive) {
+ AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", database);
+ AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table prefix", tablePrefix);
+ AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table suffix", tableSuffix);
+ }
+
+ @VisibleForTesting
+ public Map<String, String> tableConfig() {
+ return tableConfig;
+ }
+
+ // ------------------------------------------------------------------------
+ // Flink run methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void run() throws Exception {
+ build();
+ execute(jobName());
+ }
+}
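
The class javadoc above restricts column type changes to widening conversions within the same type family. A framework-free sketch of that rule, with illustrative names that are not Paimon's internal check:

    import java.util.Arrays;
    import java.util.List;

    public class WideningRuleSketch {

        private static final List<String> INTEGER_ORDER = Arrays.asList("tinyint", "smallint", "int", "bigint");
        private static final List<String> FLOAT_ORDER = Arrays.asList("float", "double");

        // A column type change is accepted only if it stays in the same numeric family and does not narrow.
        // (String and binary types follow the same idea, comparing declared lengths instead.)
        static boolean isSupportedAlter(String from, String to) {
            return widens(INTEGER_ORDER, from, to) || widens(FLOAT_ORDER, from, to);
        }

        private static boolean widens(List<String> order, String from, String to) {
            int i = order.indexOf(from);
            int j = order.indexOf(to);
            return i >= 0 && j >= 0 && i <= j;
        }

        public static void main(String[] args) {
            System.out.println(isSupportedAlter("int", "bigint"));   // true: widening
            System.out.println(isSupportedAlter("bigint", "int"));   // false: narrowing
            System.out.println(isSupportedAlter("float", "double")); // true: widening
            System.out.println(isSupportedAlter("int", "double"));   // false: different family
        }
    }
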
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 51f7b8c261a6..88419bc1b465 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -18,185 +18,45 @@
package org.apache.paimon.flink.action.cdc.kafka;
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.catalog.AbstractCatalog;
-import org.apache.paimon.flink.action.Action;
-import org.apache.paimon.flink.action.ActionBase;
-import org.apache.paimon.flink.action.MultiTablesSinkMode;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
-import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.MessageQueueSyncDatabaseActionBase;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
-import org.apache.paimon.flink.action.cdc.format.RecordParser;
-import org.apache.paimon.flink.sink.cdc.EventParser;
-import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
-import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
-import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
-import java.util.regex.Pattern;
-
-import static org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.getDataFormat;
-/**
- * An {@link Action} which synchronize the Multiple topics into one Paimon database.
- *
- * <p>You should specify Kafka source topic in {@code kafkaConfig}. See document
- * of flink-connectors for detailed keys and values.
- *
- * <p>For each Kafka topic's table to be synchronized, if the corresponding Paimon table does not
- * exist, this action will automatically create the table, and its schema will be derived from all
- * specified Kafka topic's tables. If the Paimon table already exists and its schema is different
- * from that parsed from Kafka record, this action will try to preform schema evolution.
- *
- * <p>This action supports a limited number of schema changes. Currently, the framework can not drop
- * columns, so the behaviors of `DROP` will be ignored, `RENAME` will add a new column. Currently
- * supported schema changes includes:
- *
- * <ul>
- *   <li>Adding columns.
- *   <li>Altering column types. More specifically,
- *       <ul>
- *         <li>altering from a string type (char, varchar, text) to another string type with longer
- *             length,
- *         <li>altering from a binary type (binary, varbinary, blob) to another binary type with
- *             longer length,
- *         <li>altering from an integer type (tinyint, smallint, int, bigint) to another integer
- *             type with wider range,
- *         <li>altering from a floating-point type (float, double) to another floating-point type
- *             with wider range,
- *       </ul>
- *       are supported.
- * </ul>
- *
- * <p>To automatically synchronize new table, This action creates a single sink for all Paimon
- * tables to be written. See {@link MultiTablesSinkMode#COMBINED}.
- */
-public class KafkaSyncDatabaseAction extends ActionBase {
-
- private final String database;
- private final Configuration kafkaConfig;
-
- private Map<String, String> tableConfig = new HashMap<>();
- private String tablePrefix = "";
- private String tableSuffix = "";
- private String includingTables = ".*";
- @Nullable String excludingTables;
- private TypeMapping typeMapping = TypeMapping.defaultMapping();
+/** Synchronize database from Kafka. */
+public class KafkaSyncDatabaseAction extends MessageQueueSyncDatabaseActionBase {
public KafkaSyncDatabaseAction(
String warehouse,
String database,
Map<String, String> catalogConfig,
Map<String, String> kafkaConfig) {
- super(warehouse, catalogConfig);
- this.database = database;
- this.kafkaConfig = Configuration.fromMap(kafkaConfig);
- }
-
- public KafkaSyncDatabaseAction withTableConfig(Map<String, String> tableConfig) {
- this.tableConfig = tableConfig;
- return this;
- }
-
- public KafkaSyncDatabaseAction withTablePrefix(@Nullable String tablePrefix) {
- if (tablePrefix != null) {
- this.tablePrefix = tablePrefix;
- }
- return this;
- }
-
- public KafkaSyncDatabaseAction withTableSuffix(@Nullable String tableSuffix) {
- if (tableSuffix != null) {
- this.tableSuffix = tableSuffix;
- }
- return this;
- }
-
- public KafkaSyncDatabaseAction includingTables(@Nullable String includingTables) {
- if (includingTables != null) {
- this.includingTables = includingTables;
- }
- return this;
- }
-
- public KafkaSyncDatabaseAction excludingTables(@Nullable String excludingTables) {
- this.excludingTables = excludingTables;
- return this;
- }
-
- public KafkaSyncDatabaseAction withTypeMapping(TypeMapping typeMapping) {
- this.typeMapping = typeMapping;
- return this;
+ super(warehouse, database, catalogConfig, kafkaConfig);
}
@Override
- public void build() throws Exception {
- boolean caseSensitive = catalog.caseSensitive();
-
- validateCaseInsensitive(caseSensitive);
-
- catalog.createDatabase(database, true);
-
- KafkaSource<String> source = KafkaActionUtils.buildKafkaSource(kafkaConfig);
-
- DataFormat format = getDataFormat(kafkaConfig);
- RecordParser recordParser =
- format.createParser(caseSensitive, typeMapping, Collections.emptyList());
- NewTableSchemaBuilder schemaBuilder = new NewTableSchemaBuilder(tableConfig, caseSensitive);
- Pattern includingPattern = Pattern.compile(includingTables);
- Pattern excludingPattern =
- excludingTables == null ? null : Pattern.compile(excludingTables);
- TableNameConverter tableNameConverter =
- new TableNameConverter(caseSensitive, true, tablePrefix, tableSuffix);
- EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
- () ->
- new RichCdcMultiplexRecordEventParser(
- schemaBuilder,
- includingPattern,
- excludingPattern,
- tableNameConverter);
-
- new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
- .withInput(
- env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
- .flatMap(recordParser)
- .name("Parse"))
- .withParserFactory(parserFactory)
- .withCatalogLoader(catalogLoader())
- .withDatabase(database)
- .withMode(MultiTablesSinkMode.COMBINED)
- .withTableOptions(tableConfig)
- .build();
+ protected DataStreamSource<String> buildSource() throws Exception {
+ return env.fromSource(
+ KafkaActionUtils.buildKafkaSource(cdcSourceConfig),
+ WatermarkStrategy.noWatermarks(),
+ sourceName());
}
- private void validateCaseInsensitive(boolean caseSensitive) {
- AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", database);
- AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table prefix", tablePrefix);
- AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table suffix", tableSuffix);
+ @Override
+ protected String sourceName() {
+ return "Kafka Source";
}
- @VisibleForTesting
- public Map<String, String> tableConfig() {
- return tableConfig;
+ @Override
+ protected DataFormat getDataFormat() {
+ return KafkaActionUtils.getDataFormat(cdcSourceConfig);
}
- // ------------------------------------------------------------------------
- // Flink run methods
- // ------------------------------------------------------------------------
-
@Override
- public void run() throws Exception {
- build();
- execute(String.format("KAFKA-Paimon Database Sync: %s", database));
+ protected String jobName() {
+ return String.format("Kafka-Paimon Database Sync: %s", database);
}
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
index 9a33e6f91eec..c20d925fe279 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
@@ -106,11 +106,11 @@ public void printHelp() {
System.out.println("kafka source conf syntax:");
System.out.println(" key=value");
System.out.println(
- "'topic', 'properties.bootstrap.servers', 'properties.group.id'"
+ "'topic', 'properties.bootstrap.servers', 'properties.group.id', 'value.format' "
+ "are required configurations, others are optional.");
System.out.println(
"For a complete list of supported configurations, "
- + "see https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/");
+ + "see https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options");
System.out.println();
System.out.println();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
index 08663d6343d1..e9367b7f041d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionFactory.java
@@ -115,11 +115,11 @@ public void printHelp() {
System.out.println("kafka source conf syntax:");
System.out.println(" key=value");
System.out.println(
- "'topic', 'properties.bootstrap.servers', 'properties.group.id'"
+ "'topic', 'properties.bootstrap.servers', 'properties.group.id', 'value.format' "
+ "are required configurations, others are optional.");
System.out.println(
"For a complete list of supported configurations, "
- + "see https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/");
+ + "see https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options");
System.out.println();
System.out.println("Paimon catalog and table sink conf syntax:");
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
index c49c985f3755..bc8bc3f01ea1 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
@@ -324,8 +324,7 @@ static DataFormat getDataFormat(Configuration pulsarConfig) {
/** Referenced to {@link PulsarPartitionSplitReader#createPulsarConsumer}. */
static MessageQueueSchemaUtils.ConsumerWrapper createPulsarConsumer(
Configuration pulsarConfig, String topic) throws PulsarClientException {
- SourceConfiguration pulsarSourceConfiguration =
- toSourceConfiguration(preprocessPulsarConfig(pulsarConfig));
+ SourceConfiguration pulsarSourceConfiguration = toSourceConfiguration(pulsarConfig);
PulsarClient pulsarClient = PulsarClientFactory.createClient(pulsarSourceConfiguration);
ConsumerBuilder consumerBuilder =
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction.java
new file mode 100644
index 000000000000..589332cdf54d
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.pulsar;
+
+import org.apache.paimon.flink.action.cdc.MessageQueueSyncDatabaseActionBase;
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+
+import java.util.Map;
+
+/** Synchronize database from Pulsar. */
+public class PulsarSyncDatabaseAction extends MessageQueueSyncDatabaseActionBase {
+
+ public PulsarSyncDatabaseAction(
+ String warehouse,
+ String database,
+ Map catalogConfig,
+ Map pulsarConfig) {
+ super(warehouse, database, catalogConfig, pulsarConfig);
+ }
+
+ @Override
+ protected DataStreamSource<String> buildSource() throws Exception {
+ return env.fromSource(
+ PulsarActionUtils.buildPulsarSource(cdcSourceConfig),
+ WatermarkStrategy.noWatermarks(),
+ sourceName());
+ }
+
+ @Override
+ protected DataFormat getDataFormat() {
+ return PulsarActionUtils.getDataFormat(cdcSourceConfig);
+ }
+
+ @Override
+ protected String sourceName() {
+ return "Pulsar Source";
+ }
+
+ @Override
+ protected String jobName() {
+ return String.format("Pulsar-Paimon Database Sync: %s", database);
+ }
+}
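
For completeness, a hedged sketch of driving the new Pulsar action programmatically instead of through the CLI factory. The option keys and values mirror the pulsar_sync_database example above; the warehouse path, Hive metastore, and Pulsar endpoints are placeholders, and running it requires a reachable warehouse and cluster.

    import org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncDatabaseAction;

    import java.util.HashMap;
    import java.util.Map;

    public class PulsarSyncDatabaseExample {

        public static void main(String[] args) throws Exception {
            Map<String, String> catalogConfig = new HashMap<>();
            catalogConfig.put("metastore", "hive");
            catalogConfig.put("uri", "thrift://hive-metastore:9083");

            Map<String, String> pulsarConfig = new HashMap<>();
            pulsarConfig.put("topic", "order");
            pulsarConfig.put("value.format", "canal-json");
            pulsarConfig.put("pulsar.client.serviceUrl", "pulsar://127.0.0.1:6650");
            pulsarConfig.put("pulsar.admin.adminUrl", "http://127.0.0.1:8080");
            pulsarConfig.put("pulsar.consumer.subscriptionName", "paimon-tests");

            Map<String, String> tableConfig = new HashMap<>();
            tableConfig.put("bucket", "4");
            tableConfig.put("changelog-producer", "input");

            PulsarSyncDatabaseAction action =
                    new PulsarSyncDatabaseAction(
                            "hdfs:///path/to/warehouse", "test_db", catalogConfig, pulsarConfig);
            action.withTableConfig(tableConfig)
                    .withTablePrefix("ods_")
                    .includingTables("test|paimon.*");

            // Builds the Flink topology and submits the job, same as the CLI factory path.
            action.run();
        }
    }
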
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java
new file mode 100644
index 000000000000..b7f0d3ce0a64
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionFactory.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.pulsar;
+
+import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+
+import java.util.Optional;
+
+/** Factory to create {@link PulsarSyncDatabaseAction}. */
+public class PulsarSyncDatabaseActionFactory implements ActionFactory {
+
+ public static final String IDENTIFIER = "pulsar-sync-database";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Optional<Action> create(MultipleParameterTool params) {
+ checkRequiredArgument(params, "pulsar-conf");
+
+ PulsarSyncDatabaseAction action =
+ new PulsarSyncDatabaseAction(
+ getRequiredValue(params, "warehouse"),
+ getRequiredValue(params, "database"),
+ optionalConfigMap(params, "catalog-conf"),
+ optionalConfigMap(params, "pulsar-conf"));
+
+ action.withTableConfig(optionalConfigMap(params, "table-conf"))
+ .withTablePrefix(params.get("table-prefix"))
+ .withTableSuffix(params.get("table-suffix"))
+ .includingTables(params.get("including-tables"))
+ .excludingTables(params.get("excluding-tables"));
+
+ if (params.has("type-mapping")) {
+ String[] options = params.get("type-mapping").split(",");
+ action.withTypeMapping(TypeMapping.parse(options));
+ }
+
+ return Optional.of(action);
+ }
+
+ @Override
+ public void printHelp() {
+ System.out.println(
+ "Action \"pulsar-sync-database\" creates a streaming job "
+ + "with a Flink Pulsar source and multiple Paimon table sinks "
+ + "to synchronize multiple tables into one Paimon database.\n"
+ + "Only tables with primary keys will be considered. ");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " pulsar-sync-database --warehouse <warehouse-path> --database <database-name> "
+ + "[--table-prefix <paimon-table-prefix>] "
+ + "[--table-suffix <paimon-table-suffix>] "
+ + "[--including-tables <table-name|name-regular-expr>] "
+ + "[--excluding-tables <table-name|name-regular-expr>] "
+ + "[--type-mapping <option1,option2...>] "
+ + "[--pulsar-conf <pulsar-source-conf> [--pulsar-conf <pulsar-source-conf> ...]] "
+ + "[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] "
+ + "[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]");
+ System.out.println();
+
+ System.out.println(
+ "--table-prefix is the prefix of all Paimon tables to be synchronized. For example, if you want all "
+ + "synchronized tables to have \"ods_\" as prefix, you can specify `--table-prefix ods_`.");
+ System.out.println("The usage of --table-suffix is same as `--table-prefix`");
+ System.out.println();
+
+ System.out.println(
+ "--including-tables is used to specify which source tables are to be synchronized. "
+ + "You must use '|' to separate multiple tables. Regular expression is supported.");
+ System.out.println(
+ "--excluding-tables is used to specify which source tables are not to be synchronized. "
+ + "The usage is same as --including-tables.");
+ System.out.println(
+ "--excluding-tables has higher priority than --including-tables if you specified both.");
+ System.out.println();
+
+ System.out.println(
+ "--type-mapping is used to specify how to map MySQL type to Paimon type. Please see the doc for usage.");
+ System.out.println();
+
+ System.out.println("pulsar source conf syntax:");
+ System.out.println(" key=value");
+ System.out.println(
+ "'topic', 'value.format', 'pulsar.client.serviceUrl', 'pulsar.admin.adminUrl', 'pulsar.consumer.subscriptionName' "
+ + "are required configurations, others are optional.");
+ System.out.println(
+ "For a complete list of supported configurations, "
+ + "see https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options");
+ System.out.println();
+ System.out.println();
+
+ System.out.println("Paimon catalog and table sink conf syntax:");
+ System.out.println(" key=value");
+ System.out.println("All Paimon sink tables will be applied the same set of configurations.");
+ System.out.println(
+ "For a complete list of supported configurations, "
+ + "see https://paimon.apache.org/docs/master/maintenance/configurations/");
+ System.out.println();
+
+ System.out.println("Examples:");
+ System.out.println(
+ " pulsar-sync-database \\\n"
+ + " --warehouse hdfs:///path/to/warehouse \\\n"
+ + " --database test_db \\\n"
+ + " --pulsar-conf topic=order,logistic,user \\\n"
+ + " --pulsar-conf value.format=canal-json \\\n"
+ + " --pulsar-conf pulsar.client.serviceUrl=pulsar://127.0.0.1:6650 \\\n"
+ + " --pulsar-conf pulsar.admin.adminUrl=http://127.0.0.1:8080 \\\n"
+ + " --pulsar-conf pulsar.consumer.subscriptionName=paimon-tests \\\n"
+ + " --catalog-conf metastore=hive \\\n"
+ + " --catalog-conf uri=thrift://hive-metastore:9083 \\\n"
+ + " --table-conf bucket=4 \\\n"
+ + " --table-conf changelog-producer=input \\\n"
+ + " --table-conf sink.parallelism=4");
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionFactory.java
index 0a19763be646..4876086da15f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionFactory.java
@@ -77,9 +77,75 @@ public Optional create(MultipleParameterTool params) {
public void printHelp() {
System.out.println(
"Action \"pulsar-sync-table\" creates a streaming job "
- + "with a Flink pulsar CDC source and a Paimon table sink to consume CDC events.");
+ + "with a Flink Pulsar CDC source and a Paimon table sink to consume CDC events.");
System.out.println();
- // TODO
+ System.out.println("Syntax:");
+ System.out.println(
+ " pulsar-sync-table --warehouse <warehouse-path> --database <database-name> "
+ + "--table <table-name> "
+ + "[--partition-keys <partition-keys>] "
+ + "[--primary-keys <primary-keys>] "
+ + "[--type-mapping <option1,option2...>] "
+ + "[--computed-column <'column-name=expr-name(args[, ...])'> [--computed-column ...]] "
+ + "[--pulsar-conf <pulsar-source-conf> [--pulsar-conf <pulsar-source-conf> ...]] "
+ + "[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] "
+ + "[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]");
+ System.out.println();
+
+ System.out.println("Partition keys syntax:");
+ System.out.println(" key1,key2,...");
+ System.out.println(
+ "If partition key is not defined and the specified Paimon table does not exist, "
+ + "this action will automatically create an unpartitioned Paimon table.");
+ System.out.println();
+
+ System.out.println("Primary keys syntax:");
+ System.out.println(" key1,key2,...");
+ System.out.println("Primary keys will be derived from tables if not specified.");
+ System.out.println();
+
+ System.out.println(
+ "--type-mapping is used to specify how to map MySQL type to Paimon type. Please see the doc for usage.");
+ System.out.println();
+
+ System.out.println("Please see doc for usage of --computed-column.");
+ System.out.println();
+
+ System.out.println("pulsar source conf syntax:");
+ System.out.println(" key=value");
+ System.out.println(
+ "'topic', 'value.format', 'pulsar.client.serviceUrl', 'pulsar.admin.adminUrl', 'pulsar.consumer.subscriptionName' "
+ + "are required configurations, others are optional.");
+ System.out.println(
+ "For a complete list of supported configurations, "
+ + "see https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options");
+ System.out.println();
+
+ System.out.println("Paimon catalog and table sink conf syntax:");
+ System.out.println(" key=value");
+ System.out.println(
+ "For a complete list of supported configurations, "
+ + "see https://paimon.apache.org/docs/master/maintenance/configurations/");
+ System.out.println();
+
+ System.out.println("Examples:");
+ System.out.println(
+ " pulsar-sync-table \\\n"
+ + " --warehouse hdfs:///path/to/warehouse \\\n"
+ + " --database test_db \\\n"
+ + " --table test_table \\\n"
+ + " --partition-keys pt \\\n"
+ + " --primary-keys pt,uid \\\n"
+ + " --pulsar-conf topic=order \\\n"
+ + " --pulsar-conf value.format=canal-json \\\n"
+ + " --pulsar-conf pulsar.client.serviceUrl=pulsar://127.0.0.1:6650 \\\n"
+ + " --pulsar-conf pulsar.admin.adminUrl=http://127.0.0.1:8080 \\\n"
+ + " --pulsar-conf pulsar.consumer.subscriptionName=paimon-tests \\\n"
+ + " --catalog-conf metastore=hive \\\n"
+ + " --catalog-conf uri=thrift://hive-metastore:9083 \\\n"
+ + " --table-conf bucket=4 \\\n"
+ + " --table-conf changelog-producer=input \\\n"
+ + " --table-conf sink.parallelism=4");
}
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index d536d8c52000..e866606d0d8d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -21,5 +21,6 @@ org.apache.paimon.flink.action.cdc.mysql.MySqlSyncDatabaseActionFactory
org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableActionFactory
org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseActionFactory
org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncTableActionFactory
+org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncDatabaseActionFactory
org.apache.paimon.flink.action.cdc.mongodb.MongoDBSyncTableActionFactory
org.apache.paimon.flink.action.cdc.mongodb.MongoDBSyncDatabaseActionFactory
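
The new factory is picked up through the Java service file above. Below is a rough sketch of how an action identifier such as "pulsar-sync-database" could be resolved to its factory via the standard ServiceLoader mechanism; the real lookup goes through Paimon's factory utilities, which this sketch does not reproduce, and it assumes ActionFactory extends org.apache.paimon.factories.Factory, as the registration implies.

    import org.apache.paimon.factories.Factory;
    import org.apache.paimon.flink.action.ActionFactory;

    import java.util.ServiceLoader;

    public class ActionFactoryLookupSketch {

        // Scans the META-INF/services/org.apache.paimon.factories.Factory entries on the classpath
        // and picks the ActionFactory whose identifier() matches the requested action name.
        static ActionFactory find(String identifier) {
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                if (factory instanceof ActionFactory
                        && ((ActionFactory) factory).identifier().equals(identifier)) {
                    return (ActionFactory) factory;
                }
            }
            throw new IllegalArgumentException("No action factory registered for: " + identifier);
        }

        public static void main(String[] args) {
            System.out.println(find("pulsar-sync-database").getClass().getName());
        }
    }
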
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 87fccc74dbe1..2a0b6ef3c5c4 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -36,6 +36,8 @@
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
@@ -71,8 +73,8 @@ public void testSchemaEvolutionMultiTopic() throws Exception {
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.withTableConfig(getBasicTableConfig())
@@ -107,8 +109,8 @@ public void testSchemaEvolutionOneTopic() throws Exception {
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
@@ -242,7 +244,7 @@ private void testSchemaEvolutionImpl(List topics, boolean writeOne, int
@Test
public void testTopicIsEmpty() {
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
KafkaSyncDatabaseAction action = syncDatabaseActionBuilder(kafkaConfig).build();
@@ -290,8 +292,8 @@ public void testTableAffixMultiTopic() throws Exception {
// try synchronization
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.withTablePrefix("test_prefix_")
@@ -341,8 +343,8 @@ public void testTableAffixOneTopic() throws Exception {
// try synchronization
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.withTablePrefix("test_prefix_")
@@ -514,8 +516,8 @@ private void includingAndExcludingTablesImpl(
}
// try synchronization
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.includingTables(includingTables)
@@ -539,8 +541,8 @@ public void testTypeMappingToString() throws Exception {
writeRecordsToKafka(topic, readLines("kafka/canal/database/tostring/canal-data-1.txt"));
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
@@ -588,8 +590,8 @@ public void testCaseInsensitive() throws Exception {
topic, readLines("kafka/canal/database/case-insensitive/canal-data-1.txt"));
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
@@ -624,8 +626,8 @@ public void testCannotSynchronizeIncompleteJson() throws Exception {
writeRecordsToKafka(topic, readLines("kafka/canal/database/incomplete/canal-data-1.txt"));
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index e4047cf32155..1af4e117f24e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -33,6 +33,16 @@
import java.util.Map;
import java.util.UUID;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode.GROUP_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode.SPECIFIC_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode.TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
@@ -65,8 +75,8 @@ private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPartitionKeys("pt")
@@ -223,9 +233,9 @@ public void testMultipleSchemaEvolutions() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig).withPrimaryKeys("_id").build();
runActionWithDefaultEnv(action);
@@ -299,9 +309,9 @@ private void testAllTypesOnce() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPartitionKeys("pt")
@@ -564,8 +574,8 @@ public void testNotSupportFormat() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "togg-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "togg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPartitionKeys("pt")
@@ -592,8 +602,8 @@ public void testKafkaNoNonDdlData() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPartitionKeys("pt")
@@ -620,8 +630,8 @@ public void testAssertSchemaCompatible() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
// create an incompatible table
createFileStoreTable(
@@ -661,10 +671,10 @@ public void testStarUpOptionSpecific() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
- kafkaConfig.put("scan.startup.mode", "specific-offsets");
- kafkaConfig.put("scan.startup.specific-offsets", "partition:0,offset:1");
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ kafkaConfig.put(SCAN_STARTUP_MODE.key(), SPECIFIC_OFFSETS.toString());
+ kafkaConfig.put(SCAN_STARTUP_SPECIFIC_OFFSETS.key(), "partition:0,offset:1");
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPartitionKeys("pt")
@@ -702,9 +712,9 @@ public void testStarUpOptionLatest() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
- kafkaConfig.put("scan.startup.mode", "latest-offset");
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ kafkaConfig.put(SCAN_STARTUP_MODE.key(), LATEST_OFFSET.toString());
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPartitionKeys("pt")
@@ -747,11 +757,11 @@ public void testStarUpOptionTimestamp() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
- kafkaConfig.put("scan.startup.mode", "timestamp");
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ kafkaConfig.put(SCAN_STARTUP_MODE.key(), TIMESTAMP.toString());
kafkaConfig.put(
- "scan.startup.timestamp-millis", String.valueOf(System.currentTimeMillis()));
+ SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPartitionKeys("pt")
@@ -794,9 +804,9 @@ public void testStarUpOptionEarliest() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
- kafkaConfig.put("scan.startup.mode", "earliest-offset");
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ kafkaConfig.put(SCAN_STARTUP_MODE.key(), EARLIEST_OFFSET.toString());
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPartitionKeys("pt")
@@ -841,9 +851,9 @@ public void testStarUpOptionGroup() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
- kafkaConfig.put("scan.startup.mode", "group-offsets");
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ kafkaConfig.put(SCAN_STARTUP_MODE.key(), GROUP_OFFSETS.toString());
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPartitionKeys("pt")
@@ -888,8 +898,8 @@ public void testComputedColumn() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPartitionKeys("_year")
@@ -922,8 +932,8 @@ public void testTypeMappingToString() throws Exception {
writeRecordsToKafka(topic, readLines("kafka/canal/table/tostring/canal-data-1.txt"));
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
@@ -968,8 +978,8 @@ public void testCDCOperations() throws Exception {
writeRecordsToKafka(topic, readLines("kafka/canal/table/event/event-row.txt"));
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
@@ -1045,8 +1055,8 @@ public void testSyncWithInitialEmptyTopic() throws Exception {
Collections.emptyMap());
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withTableConfig(getBasicTableConfig())
@@ -1082,8 +1092,8 @@ public void testSynchronizeIncompleteJson() throws Exception {
writeRecordsToKafka(topic, lines);
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("k")
@@ -1114,8 +1124,8 @@ public void testSynchronizeNonPkTable() throws Exception {
writeRecordsToKafka(topic, lines);
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
@@ -1142,8 +1152,8 @@ public void testMissingDecimalPrecision() throws Exception {
writeRecordsToKafka(topic, readLines("kafka/canal/table/incomplete/canal-data-2.txt"));
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index f73734df30bb..c9b66d12c953 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -31,6 +31,9 @@
import java.util.List;
import java.util.Map;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+
/** IT cases for {@link KafkaDebeziumSyncTableActionITCase}. */
public class KafkaDebeziumSyncTableActionITCase extends KafkaActionITCaseBase {
@@ -52,8 +55,8 @@ private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
throw new Exception("Failed to write debezium data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "debezium-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
@@ -153,8 +156,8 @@ public void testComputedColumn() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "debezium-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumWithSchemaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumWithSchemaSyncTableActionITCase.java
index cf2847951510..9e56c538d25c 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumWithSchemaSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumWithSchemaSyncTableActionITCase.java
@@ -31,6 +31,9 @@
import java.util.List;
import java.util.Map;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+
/** IT cases for {@link KafkaSyncTableAction}. */
public class KafkaDebeziumWithSchemaSyncTableActionITCase extends KafkaActionITCaseBase {
@@ -54,9 +57,9 @@ private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
throw new Exception("Failed to write debezium data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "debezium-json");
+ kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
kafkaConfig.put("value.debezium-json.schema-include", "true");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
@@ -159,9 +162,9 @@ public void testComputedColumn() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "debezium-json");
+ kafkaConfig.put(VALUE_FORMAT.key(), "debezium-json");
kafkaConfig.put("value.debezium-json.schema-include", "true");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
index 645a0a9c9e92..d85217844867 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
@@ -36,6 +36,8 @@
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT cases for {@link KafkaSyncDatabaseAction}. */
@@ -67,8 +69,8 @@ public void testSchemaEvolutionMultiTopic() throws Exception {
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.withTableConfig(getBasicTableConfig())
@@ -103,8 +105,8 @@ public void testSchemaEvolutionOneTopic() throws Exception {
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.withTableConfig(getBasicTableConfig())
@@ -204,7 +206,7 @@ private void testSchemaEvolutionImpl(List<String> topics, boolean writeOne, int
@Test
public void testTopicIsEmpty() {
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
KafkaSyncDatabaseAction action = syncDatabaseActionBuilder(kafkaConfig).build();
@@ -257,8 +259,8 @@ public void testTableAffixMultiTopic() throws Exception {
// try synchronization
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.withTablePrefix("test_prefix_")
@@ -313,8 +315,8 @@ public void testTableAffixOneTopic() throws Exception {
// try synchronization
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.withTablePrefix("test_prefix_")
@@ -463,8 +465,8 @@ private void includingAndExcludingTablesImpl(
}
// try synchronization
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.includingTables(includingTables)
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
index 8bfae94930c4..6a860508129a 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncTableActionITCase.java
@@ -31,6 +31,16 @@
import java.util.List;
import java.util.Map;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode.EARLIEST_OFFSET;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode.GROUP_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode.SPECIFIC_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode.TIMESTAMP;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
import static org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -55,8 +65,8 @@ private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
throw new Exception("Failed to write maxwell data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
@@ -154,8 +164,8 @@ public void testNotSupportFormat() throws Exception {
throw new Exception("Failed to write maxwell data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "togg-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "togg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
@@ -182,8 +192,8 @@ public void testAssertSchemaCompatible() throws Exception {
throw new Exception("Failed to write maxwell data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), topic);
// create an incompatible table
createFileStoreTable(
@@ -222,10 +232,10 @@ public void testStarUpOptionSpecific() throws Exception {
throw new Exception("Failed to write maxwell data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
- kafkaConfig.put("topic", topic);
- kafkaConfig.put("scan.startup.mode", "specific-offsets");
- kafkaConfig.put("scan.startup.specific-offsets", "partition:0,offset:1");
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ kafkaConfig.put(SCAN_STARTUP_MODE.key(), SPECIFIC_OFFSETS.toString());
+ kafkaConfig.put(SCAN_STARTUP_SPECIFIC_OFFSETS.key(), "partition:0,offset:1");
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
@@ -264,9 +274,9 @@ public void testStarUpOptionLatest() throws Exception {
throw new Exception("Failed to write maxwell data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
- kafkaConfig.put("topic", topic);
- kafkaConfig.put("scan.startup.mode", "latest-offset");
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ kafkaConfig.put(SCAN_STARTUP_MODE.key(), LATEST_OFFSET.toString());
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
@@ -314,11 +324,11 @@ public void testStarUpOptionTimestamp() throws Exception {
throw new Exception("Failed to write maxwell data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
- kafkaConfig.put("topic", topic);
- kafkaConfig.put("scan.startup.mode", "timestamp");
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ kafkaConfig.put(SCAN_STARTUP_MODE.key(), TIMESTAMP.toString());
kafkaConfig.put(
- "scan.startup.timestamp-millis", String.valueOf(System.currentTimeMillis()));
+ SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
@@ -365,9 +375,9 @@ public void testStarUpOptionEarliest() throws Exception {
throw new Exception("Failed to write maxwell data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
- kafkaConfig.put("topic", topic);
- kafkaConfig.put("scan.startup.mode", "earliest-offset");
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ kafkaConfig.put(SCAN_STARTUP_MODE.key(), EARLIEST_OFFSET.toString());
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
@@ -416,9 +426,9 @@ public void testStarUpOptionGroup() throws Exception {
throw new Exception("Failed to write maxwell data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
- kafkaConfig.put("topic", topic);
- kafkaConfig.put("scan.startup.mode", "group-offsets");
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ kafkaConfig.put(SCAN_STARTUP_MODE.key(), GROUP_OFFSETS.toString());
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
@@ -467,8 +477,8 @@ public void testComputedColumn() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "maxwell-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "maxwell-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPartitionKeys("_year")
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
index c19611712a20..d13c61b99ded 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
@@ -37,6 +37,8 @@
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT cases for {@link KafkaSyncDatabaseAction}. */
@@ -68,8 +70,8 @@ public void testSchemaEvolutionMultiTopic() throws Exception {
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.withTableConfig(getBasicTableConfig())
@@ -104,8 +106,8 @@ public void testSchemaEvolutionOneTopic() throws Exception {
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.withTableConfig(getBasicTableConfig())
@@ -205,7 +207,7 @@ private void testSchemaEvolutionImpl(List<String> topics, boolean writeOne, int
@Test
public void testTopicIsEmpty() {
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
KafkaSyncDatabaseAction action = syncDatabaseActionBuilder(kafkaConfig).build();
@@ -255,8 +257,8 @@ public void testTableAffixMultiTopic() throws Exception {
// try synchronization
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.withTablePrefix("TEST_PREFIX_")
@@ -308,8 +310,8 @@ public void testTableAffixOneTopic() throws Exception {
// try synchronization
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.withTablePrefix("TEST_PREFIX_")
@@ -454,8 +456,8 @@ private void includingAndExcludingTablesImpl(
}
// try synchronization
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", String.join(";", topics));
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), String.join(";", topics));
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
.includingTables(includingTables)
@@ -480,8 +482,8 @@ public void testCaseInsensitive() throws Exception {
writeRecordsToKafka(topic, readLines("kafka/ogg/database/case-insensitive/ogg-data-1.txt"));
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncDatabaseAction action =
syncDatabaseActionBuilder(kafkaConfig)
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
index 588119854b20..3ca5504cc5b9 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
@@ -31,6 +31,8 @@
import java.util.List;
import java.util.Map;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
import static org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -55,8 +57,8 @@ private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
throw new Exception("Failed to write ogg data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
@@ -152,8 +154,8 @@ public void testNotSupportFormat() throws Exception {
throw new Exception("Failed to write ogg data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "togg-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "togg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
@@ -180,8 +182,8 @@ public void testAssertSchemaCompatible() throws Exception {
throw new Exception("Failed to write ogg data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
// create an incompatible table
createFileStoreTable(
@@ -220,8 +222,8 @@ public void testStarUpOptionSpecific() throws Exception {
throw new Exception("Failed to write ogg data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
kafkaConfig.put("scan.startup.mode", "specific-offsets");
kafkaConfig.put("scan.startup.specific-offsets", "partition:0,offset:1");
KafkaSyncTableAction action =
@@ -263,8 +265,8 @@ public void testStarUpOptionLatest() throws Exception {
throw new Exception("Failed to write ogg data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
kafkaConfig.put("scan.startup.mode", "latest-offset");
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
@@ -312,8 +314,8 @@ public void testStarUpOptionTimestamp() throws Exception {
throw new Exception("Failed to write ogg data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
kafkaConfig.put("scan.startup.mode", "timestamp");
kafkaConfig.put(
"scan.startup.timestamp-millis", String.valueOf(System.currentTimeMillis()));
@@ -362,8 +364,8 @@ public void testStarUpOptionEarliest() throws Exception {
throw new Exception("Failed to write ogg data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
kafkaConfig.put("scan.startup.mode", "earliest-offset");
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
@@ -412,8 +414,8 @@ public void testStarUpOptionGroup() throws Exception {
throw new Exception("Failed to write ogg data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
kafkaConfig.put("scan.startup.mode", "group-offsets");
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
@@ -462,8 +464,8 @@ public void testComputedColumn() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPartitionKeys("_year")
@@ -501,8 +503,8 @@ public void testCDCOperations() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "ogg-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "ogg-json");
+ kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig)
.withPrimaryKeys("id")
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
index e7113feb0110..a62a4380e6c1 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaITCase.java
@@ -35,6 +35,8 @@
import java.util.List;
import java.util.Map;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
import static org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.getDataFormat;
import static org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.getKafkaEarliestConsumer;
import static org.assertj.core.api.Assertions.assertThat;
@@ -54,8 +56,8 @@ public void testKafkaSchema() throws Exception {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Configuration kafkaConfig = Configuration.fromMap(getBasicKafkaConfig());
- kafkaConfig.setString("value.format", "canal-json");
- kafkaConfig.setString("topic", topic);
+ kafkaConfig.setString(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.setString(TOPIC.key(), topic);
Schema kafkaSchema =
MessageQueueSchemaUtils.getSchema(
@@ -80,8 +82,8 @@ public void testTableOptionsChange() throws Exception {
writeRecordsToKafka(topic, readLines("kafka/canal/table/optionschange/canal-data-1.txt"));
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
Map<String, String> tableConfig = new HashMap<>();
tableConfig.put("bucket", "1");
tableConfig.put("sink.parallelism", "1");
@@ -122,8 +124,8 @@ public void testNewlyAddedTablesOptionsChange() throws Exception {
writeRecordsToKafka(
topic, readLines("kafka/canal/database/schemaevolution/topic0/canal-data-1.txt"));
Map<String, String> kafkaConfig = getBasicKafkaConfig();
- kafkaConfig.put("value.format", "canal-json");
- kafkaConfig.put("topic", topic);
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
Map<String, String> tableConfig = new HashMap<>();
tableConfig.put("bucket", "1");
tableConfig.put("sink.parallelism", "1");
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
index 8172cd65206d..c696124c8500 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionITCaseBase.java
@@ -81,6 +81,7 @@ public class PulsarActionITCaseBase extends CdcActionITCaseBase {
private PulsarAdmin admin;
private PulsarClient client;
+ protected List<String> topics = new ArrayList<>();
@RegisterExtension
public static final PulsarContainerExtension PULSAR_CONTAINER =
@@ -191,7 +192,6 @@ private void createPartitionedTopic(String topic, int numberOfPartitions) {
}
private void deleteTopics() throws Exception {
- List<String> topics = admin.topics().getList("public/default");
for (String topic : topics) {
String topicName = topicName(topic);
PartitionedTopicMetadata metadata =
@@ -260,6 +260,11 @@ protected PulsarSyncTableActionBuilder syncTableActionBuilder(
return new PulsarSyncTableActionBuilder(pulsarConfig);
}
+ protected PulsarSyncDatabaseActionBuilder syncDatabaseActionBuilder(
+ Map<String, String> pulsarConfig) {
+ return new PulsarSyncDatabaseActionBuilder(pulsarConfig);
+ }
+
/** Builder to build {@link PulsarSyncTableAction} from action arguments. */
protected class PulsarSyncTableActionBuilder
extends SyncTableActionBuilder<PulsarSyncTableAction> {
@@ -298,6 +303,51 @@ public PulsarSyncTableAction build() {
}
}
+ /** Builder to build {@link PulsarSyncDatabaseAction} from action arguments. */
+ protected class PulsarSyncDatabaseActionBuilder
+ extends SyncDatabaseActionBuilder<PulsarSyncDatabaseAction> {
+
+ public PulsarSyncDatabaseActionBuilder(Map<String, String> pulsarConfig) {
+ super(pulsarConfig);
+ }
+
+ public PulsarSyncDatabaseActionBuilder ignoreIncompatible(boolean ignoreIncompatible) {
+ throw new UnsupportedOperationException();
+ }
+
+ public PulsarSyncDatabaseActionBuilder mergeShards(boolean mergeShards) {
+ throw new UnsupportedOperationException();
+ }
+
+ public PulsarSyncDatabaseActionBuilder withMode(String mode) {
+ throw new UnsupportedOperationException();
+ }
+
+ public PulsarSyncDatabaseAction build() {
+ List<String> args =
+ new ArrayList<>(
+ Arrays.asList("--warehouse", warehouse, "--database", database));
+
+ args.addAll(mapToArgs("--pulsar-conf", sourceConfig));
+ args.addAll(mapToArgs("--catalog-conf", catalogConfig));
+ args.addAll(mapToArgs("--table-conf", tableConfig));
+
+ args.addAll(nullableToArgs("--table-prefix", tablePrefix));
+ args.addAll(nullableToArgs("--table-suffix", tableSuffix));
+ args.addAll(nullableToArgs("--including-tables", includingTables));
+ args.addAll(nullableToArgs("--excluding-tables", excludingTables));
+
+ args.addAll(listToArgs("--type-mapping", typeMappingModes));
+
+ MultipleParameterTool params =
+ MultipleParameterTool.fromArgs(args.toArray(new String[0]));
+ return (PulsarSyncDatabaseAction)
+ new PulsarSyncDatabaseActionFactory()
+ .create(params)
+ .orElseThrow(RuntimeException::new);
+ }
+ }
+
/** Pulsar container extension for junit5. */
private static class PulsarContainerExtension extends PulsarContainer
implements BeforeAllCallback, AfterAllCallback {
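Editor's note: the new `PulsarSyncDatabaseActionBuilder` follows the existing table builder: it flattens the supplied maps into repeated CLI-style arguments and hands them to the action factory. The `mapToArgs` helper comes from the shared test base and is not part of this patch; the sketch below only illustrates the assumed shape of its output:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ActionArgsSketch {

    // Assumed behaviour of the base-class helper: each map entry becomes
    // "--pulsar-conf key=value" on the argument list.
    static List<String> mapToArgs(String key, Map<String, String> map) {
        List<String> args = new ArrayList<>();
        map.forEach(
                (k, v) -> {
                    args.add(key);
                    args.add(k + "=" + v);
                });
        return args;
    }

    public static void main(String[] args) {
        Map<String, String> pulsarConf = new LinkedHashMap<>();
        pulsarConf.put("topic", "schema_evolution");
        pulsarConf.put("value.format", "canal-json");

        List<String> actionArgs =
                new ArrayList<>(
                        Arrays.asList("--warehouse", "/tmp/warehouse", "--database", "default"));
        actionArgs.addAll(mapToArgs("--pulsar-conf", pulsarConf));

        // [--warehouse, /tmp/warehouse, --database, default,
        //  --pulsar-conf, topic=schema_evolution, --pulsar-conf, value.format=canal-json]
        System.out.println(actionArgs);
    }
}
```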
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
new file mode 100644
index 000000000000..946a1f6e4b2e
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseActionITCase.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.pulsar;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils.TOPIC;
+import static org.apache.paimon.flink.action.cdc.pulsar.PulsarActionUtils.VALUE_FORMAT;
+
+/** IT cases for {@link PulsarSyncDatabaseAction}. */
+public class PulsarSyncDatabaseActionITCase extends PulsarActionITCaseBase {
+
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolutionMultiTopic() throws Exception {
+ final String topic1 = "schema_evolution_0";
+ final String topic2 = "schema_evolution_1";
+ final String topic3 = "schema_evolution_2";
+ boolean writeOne = false;
+ int fileCount = 3;
+ topics = Arrays.asList(topic1, topic2, topic3);
+ topics.forEach(topic -> createTopic(topic, 1));
+
+ // ---------- Write the Canal json into Pulsar -------------------
+
+ for (int i = 0; i < fileCount; i++) {
+ try {
+ sendMessages(
+ topics.get(i),
+ getMessages(
+ "kafka/canal/database/schemaevolution/topic"
+ + i
+ + "/canal-data-1.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write canal data to Pulsar.", e);
+ }
+ }
+
+ Map<String, String> pulsarConfig = getBasicPulsarConfig();
+ pulsarConfig.put(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
+ pulsarConfig.put(VALUE_FORMAT.key(), "canal-json");
+ pulsarConfig.put(TOPIC.key(), String.join(",", topics));
+ PulsarSyncDatabaseAction action =
+ syncDatabaseActionBuilder(pulsarConfig)
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ testSchemaEvolutionImpl(topics, writeOne, fileCount);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolutionOneTopic() throws Exception {
+ final String topic = "schema_evolution";
+ boolean writeOne = true;
+ int fileCount = 3;
+ topics = Collections.singletonList(topic);
+ topics.forEach(t -> createTopic(t, 1));
+
+ // ---------- Write the Canal json into Pulsar -------------------
+
+ for (int i = 0; i < fileCount; i++) {
+ try {
+ sendMessages(
+ topics.get(0),
+ getMessages(
+ "kafka/canal/database/schemaevolution/topic"
+ + i
+ + "/canal-data-1.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write canal data to Pulsar.", e);
+ }
+ }
+
+ Map<String, String> pulsarConfig = getBasicPulsarConfig();
+ pulsarConfig.put(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS.key(), "-1");
+ pulsarConfig.put(VALUE_FORMAT.key(), "canal-json");
+ pulsarConfig.put(TOPIC.key(), String.join(";", topics));
+
+ PulsarSyncDatabaseAction action =
+ syncDatabaseActionBuilder(pulsarConfig)
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+ testSchemaEvolutionImpl(topics, writeOne, fileCount);
+ }
+
+ private void testSchemaEvolutionImpl(List<String> topics, boolean writeOne, int fileCount)
+ throws Exception {
+ waitingTables("t1", "t2");
+
+ FileStoreTable table1 = getFileStoreTable("t1");
+ FileStoreTable table2 = getFileStoreTable("t2");
+
+ RowType rowType1 =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+ new String[] {"k", "v1"});
+ List<String> primaryKeys1 = Collections.singletonList("k");
+ List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
+ waitForResult(expected, table1, rowType1, primaryKeys1);
+
+ RowType rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10).notNull(),
+ DataTypes.INT(),
+ DataTypes.BIGINT()
+ },
+ new String[] {"k1", "k2", "v1", "v2"});
+ List<String> primaryKeys2 = Arrays.asList("k1", "k2");
+ expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]");
+ waitForResult(expected, table2, rowType2, primaryKeys2);
+
+ for (int i = 0; i < fileCount; i++) {
+ try {
+ sendMessages(
+ writeOne ? topics.get(0) : topics.get(i),
+ getMessages(
+ "kafka/canal/database/schemaevolution/topic"
+ + i
+ + "/canal-data-2.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write canal data to Pulsar.", e);
+ }
+ }
+
+ rowType1 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()
+ },
+ new String[] {"k", "v1", "v2"});
+ expected =
+ Arrays.asList(
+ "+I[1, one, NULL]",
+ "+I[3, three, NULL]",
+ "+I[5, five, 50]",
+ "+I[7, seven, 70]");
+ waitForResult(expected, table1, rowType1, primaryKeys1);
+
+ rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10).notNull(),
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.VARCHAR(10)
+ },
+ new String[] {"k1", "k2", "v1", "v2", "v3"});
+ expected =
+ Arrays.asList(
+ "+I[2, two, 20, 200, NULL]",
+ "+I[4, four, 40, 400, NULL]",
+ "+I[6, six, 60, 600, string_6]",
+ "+I[8, eight, 80, 800, string_8]");
+ waitForResult(expected, table2, rowType2, primaryKeys2);
+
+ for (int i = 0; i < fileCount; i++) {
+ try {
+ sendMessages(
+ writeOne ? topics.get(0) : topics.get(i),
+ getMessages(
+ "kafka/canal/database/schemaevolution/topic"
+ + i
+ + "/canal-data-3.txt"));
+ } catch (Exception e) {
+ throw new Exception("Failed to write canal data to Pulsar.", e);
+ }
+ }
+
+ rowType1 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.BIGINT()
+ },
+ new String[] {"k", "v1", "v2"});
+ expected =
+ Arrays.asList(
+ "+I[1, one, NULL]",
+ "+I[3, three, NULL]",
+ "+I[5, five, 50]",
+ "+I[7, seven, 70]",
+ "+I[9, nine, 9000000000000]");
+ waitForResult(expected, table1, rowType1, primaryKeys1);
+
+ rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10).notNull(),
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.VARCHAR(20)
+ },
+ new String[] {"k1", "k2", "v1", "v2", "v3"});
+ expected =
+ Arrays.asList(
+ "+I[2, two, 20, 200, NULL]",
+ "+I[4, four, 40, 400, NULL]",
+ "+I[6, six, 60, 600, string_6]",
+ "+I[8, eight, 80, 800, string_8]",
+ "+I[10, ten, 100, 1000, long_long_string_10]");
+ waitForResult(expected, table2, rowType2, primaryKeys2);
+ }
+}
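Editor's note: for context on how the builder's argument list is consumed, `MultipleParameterTool` collects repeated flags such as `--pulsar-conf key=value` instead of overwriting them, which is what the action factory relies on. A small standalone sketch (the real factories do considerably more):

```java
import org.apache.flink.api.java.utils.MultipleParameterTool;

public class MultipleParameterToolSketch {

    public static void main(String[] args) {
        String[] actionArgs = {
            "--warehouse", "/tmp/warehouse",
            "--database", "default",
            "--pulsar-conf", "topic=schema_evolution",
            "--pulsar-conf", "value.format=canal-json"
        };

        MultipleParameterTool params = MultipleParameterTool.fromArgs(actionArgs);

        System.out.println(params.get("warehouse")); // /tmp/warehouse
        // Repeated flags are collected rather than overwritten,
        // e.g. [topic=schema_evolution, value.format=canal-json].
        System.out.println(params.getMultiParameter("pulsar-conf"));
    }
}
```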
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java
index a39507b5f537..e61b94261042 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableActionITCase.java
@@ -27,6 +27,7 @@
import org.junit.jupiter.api.Timeout;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -45,6 +46,7 @@ public void testSchemaEvolution() throws Exception {
private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
final String topic = "schema_evolution";
+ topics = Collections.singletonList(topic);
createTopic(topic, 1);
// ---------- Write the Canal json into Pulsar -------------------
sendMessages(