From 77f10a04af71449aecb1e4f76acdf6d559b159da Mon Sep 17 00:00:00 2001 From: Kerwin <37063904+zhuangchong@users.noreply.github.com> Date: Tue, 26 Sep 2023 15:02:03 +0800 Subject: [PATCH] [cdc] Refactor mysql debezium json event parser (#2053) --- .../action/cdc/kafka/KafkaSchemaUtils.java | 6 +- .../cdc/kafka/KafkaSyncDatabaseAction.java | 4 +- .../cdc/kafka/KafkaSyncTableAction.java | 4 +- .../kafka/{formats => format}/DataFormat.java | 8 +- .../{formats => format}/RecordParser.java | 2 +- .../RecordParserFactory.java | 2 +- .../canal/CanalFieldParser.java | 2 +- .../canal/CanalRecordParser.java | 4 +- .../maxwell/MaxwellRecordParser.java | 4 +- .../ogg/OggRecordParser.java | 4 +- .../mysql/MySqlDebeziumJsonEventParser.java | 179 +++++-------- .../cdc/mysql/MySqlTableSchemaBuilder.java | 70 +++-- .../action/cdc/mysql/MySqlTypeUtils.java | 4 +- .../cdc/mysql/format/DebeziumEvent.java | 245 ++++++++++++++++++ .../cdc/mysql/format/DebeziumEventUtils.java | 45 ++++ .../action/cdc/mysql/DebeziumEventTest.java | 71 +++++ .../mysql/debezium-event-change.json | 141 ++++++++++ 17 files changed, 626 insertions(+), 169 deletions(-) rename paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/{formats => format}/DataFormat.java (91%) rename paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/{formats => format}/RecordParser.java (99%) rename paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/{formats => format}/RecordParserFactory.java (96%) rename paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/{formats => format}/canal/CanalFieldParser.java (98%) rename paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/{formats => format}/canal/CanalRecordParser.java (98%) rename paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/{formats => format}/maxwell/MaxwellRecordParser.java (96%) 
rename paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/{formats => format}/ogg/OggRecordParser.java (97%) create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/DebeziumEventTest.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/mysql/debezium-event-change.json diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java index 330df8e79918..7ed9a4aab708 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java @@ -19,8 +19,8 @@ package org.apache.paimon.flink.action.cdc.kafka; import org.apache.paimon.flink.action.cdc.TypeMapping; -import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat; -import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser; +import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat; +import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser; import org.apache.paimon.schema.Schema; import org.apache.flink.configuration.Configuration; @@ -44,7 +44,7 @@ import java.util.stream.StreamSupport; import static org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.kafkaPropertiesGroupId; -import static org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat.getDataFormat; +import static org.apache.paimon.flink.action.cdc.kafka.format.DataFormat.getDataFormat; /** Utility class to load kafka schema. 
*/ public class KafkaSchemaUtils { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java index 61aa45108ba9..e8df881fbfb9 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java @@ -24,8 +24,8 @@ import org.apache.paimon.flink.action.MultiTablesSinkMode; import org.apache.paimon.flink.action.cdc.TableNameConverter; import org.apache.paimon.flink.action.cdc.TypeMapping; -import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat; -import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser; +import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat; +import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser; import org.apache.paimon.flink.sink.cdc.EventParser; import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java index 87d3f0ddbe0c..b00fa38b050d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java @@ -26,8 +26,8 @@ import org.apache.paimon.flink.action.ActionBase; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; -import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat; -import 
org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser; +import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat; +import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser; import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder; import org.apache.paimon.flink.sink.cdc.EventParser; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/DataFormat.java similarity index 91% rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/DataFormat.java index 71c4af8f4d6f..e178fec4014a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/DataFormat.java @@ -16,13 +16,13 @@ * limitations under the License. 
*/ -package org.apache.paimon.flink.action.cdc.kafka.formats; +package org.apache.paimon.flink.action.cdc.kafka.format; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; -import org.apache.paimon.flink.action.cdc.kafka.formats.canal.CanalRecordParser; -import org.apache.paimon.flink.action.cdc.kafka.formats.maxwell.MaxwellRecordParser; -import org.apache.paimon.flink.action.cdc.kafka.formats.ogg.OggRecordParser; +import org.apache.paimon.flink.action.cdc.kafka.format.canal.CanalRecordParser; +import org.apache.paimon.flink.action.cdc.kafka.format.maxwell.MaxwellRecordParser; +import org.apache.paimon.flink.action.cdc.kafka.format.ogg.OggRecordParser; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParser.java similarity index 99% rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParser.java index bf6be1b7de17..2c0ffb19ab09 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParser.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.paimon.flink.action.cdc.kafka.formats; +package org.apache.paimon.flink.action.cdc.kafka.format; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParserFactory.java similarity index 96% rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParserFactory.java index a47bca23f118..e83d4b0d78ce 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParserFactory.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.paimon.flink.action.cdc.kafka.formats; +package org.apache.paimon.flink.action.cdc.kafka.format; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalFieldParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalFieldParser.java similarity index 98% rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalFieldParser.java rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalFieldParser.java index cb77fe8e7f80..619da6b79db6 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalFieldParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalFieldParser.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.flink.action.cdc.kafka.formats.canal; +package org.apache.paimon.flink.action.cdc.kafka.format.canal; /** Converts some special types such as enum、set、geometry. 
*/ public class CanalFieldParser { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalRecordParser.java similarity index 98% rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalRecordParser.java index e0f2ec748656..62ca27a4fa6a 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalRecordParser.java @@ -16,11 +16,11 @@ * limitations under the License. */ -package org.apache.paimon.flink.action.cdc.kafka.formats.canal; +package org.apache.paimon.flink.action.cdc.kafka.format.canal; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; -import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser; +import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser; import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; import org.apache.paimon.types.DataType; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/maxwell/MaxwellRecordParser.java similarity index 96% rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/maxwell/MaxwellRecordParser.java index 8b51edad98e7..f116d5d27969 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/maxwell/MaxwellRecordParser.java @@ -16,11 +16,11 @@ * limitations under the License. */ -package org.apache.paimon.flink.action.cdc.kafka.formats.maxwell; +package org.apache.paimon.flink.action.cdc.kafka.format.maxwell; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; -import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser; +import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; import org.apache.paimon.types.RowKind; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/ogg/OggRecordParser.java similarity index 97% rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/ogg/OggRecordParser.java index 7f4fcc76a405..0c389d48d712 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/ogg/OggRecordParser.java @@ -16,11 +16,11 @@ * limitations under the License. 
*/ -package org.apache.paimon.flink.action.cdc.kafka.formats.ogg; +package org.apache.paimon.flink.action.cdc.kafka.format.ogg; import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TypeMapping; -import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser; +import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; import org.apache.paimon.types.RowKind; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java index fd6c23e0db79..e18cc1931d93 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java @@ -27,18 +27,17 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn; import org.apache.paimon.flink.action.cdc.TableNameConverter; import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent; import org.apache.paimon.flink.sink.cdc.CdcRecord; import org.apache.paimon.flink.sink.cdc.EventParser; import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder; import org.apache.paimon.schema.Schema; import org.apache.paimon.types.DataField; -import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StringUtils; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -69,6 +68,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -77,7 +78,6 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg; -import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE; import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING; /** {@link EventParser} for MySQL Debezium JSON. */ @@ -90,15 +90,15 @@ public class MySqlDebeziumJsonEventParser implements EventParser { private final boolean caseSensitive; private final TableNameConverter tableNameConverter; private final List computedColumns; - private final NewTableSchemaBuilder schemaBuilder; + private final NewTableSchemaBuilder schemaBuilder; @Nullable private final Pattern includingPattern; @Nullable private final Pattern excludingPattern; private final Set includedTables = new HashSet<>(); private final Set excludedTables = new HashSet<>(); private final TypeMapping typeMapping; - private JsonNode root; - private JsonNode payload; + private DebeziumEvent root; + // NOTE: current table name is not converted by tableNameConverter private String currentTable; private boolean shouldSynchronizeCurrentTable; @@ -113,7 +113,7 @@ public MySqlDebeziumJsonEventParser( caseSensitive, computedColumns, new TableNameConverter(caseSensitive), - ddl -> Optional.empty(), + new MySqlTableSchemaBuilder(new HashMap<>(), caseSensitive, typeMapping), null, null, typeMapping); @@ -123,7 +123,7 @@ public MySqlDebeziumJsonEventParser( ZoneId serverTimeZone, boolean caseSensitive, TableNameConverter 
tableNameConverter, - NewTableSchemaBuilder schemaBuilder, + NewTableSchemaBuilder schemaBuilder, @Nullable Pattern includingPattern, @Nullable Pattern excludingPattern, TypeMapping typeMapping) { @@ -143,7 +143,7 @@ public MySqlDebeziumJsonEventParser( boolean caseSensitive, List computedColumns, TableNameConverter tableNameConverter, - NewTableSchemaBuilder schemaBuilder, + NewTableSchemaBuilder schemaBuilder, @Nullable Pattern includingPattern, @Nullable Pattern excludingPattern, TypeMapping typeMapping) { @@ -160,10 +160,11 @@ public MySqlDebeziumJsonEventParser( @Override public void setRawEvent(String rawEvent) { try { - objectMapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true); - root = objectMapper.readValue(rawEvent, JsonNode.class); - payload = root.get("payload"); - currentTable = payload.get("source").get("table").asText(); + objectMapper + .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + root = objectMapper.readValue(rawEvent, DebeziumEvent.class); + currentTable = root.payload().source().table(); shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable(); } catch (Exception e) { throw new RuntimeException(e); @@ -175,63 +176,37 @@ public String parseTableName() { return tableNameConverter.convert(Identifier.create(getDatabaseName(), currentTable)); } - private boolean isSchemaChange() { - return payload.get("op") == null; - } - @Override public List parseSchemaChange() { - if (!shouldSynchronizeCurrentTable || !isSchemaChange()) { + if (!shouldSynchronizeCurrentTable || !root.payload().isSchemaChange()) { return Collections.emptyList(); } - JsonNode historyRecord = payload.get("historyRecord"); - if (historyRecord == null) { + DebeziumEvent.Payload payload = root.payload(); + if (!payload.hasHistoryRecord()) { return Collections.emptyList(); } - JsonNode columns; + TableChanges.TableChange tableChange = null; try { - String 
historyRecordString = historyRecord.asText(); - JsonNode tableChanges = objectMapper.readTree(historyRecordString).get("tableChanges"); - if (tableChanges.size() != 1) { + Iterator tableChanges = payload.getTableChanges(); + long count; + for (count = 0L; tableChanges.hasNext(); ++count) { + tableChange = tableChanges.next(); + } + if (count != 1) { LOG.error( "Invalid historyRecord, because tableChanges should contain exactly 1 item.\n" - + historyRecord.asText()); + + payload.historyRecord()); return Collections.emptyList(); } - columns = tableChanges.get(0).get("table").get("columns"); } catch (Exception e) { LOG.info("Failed to parse history record for schema changes", e); return Collections.emptyList(); } - if (columns == null) { - return Collections.emptyList(); - } - List result = new ArrayList<>(); - for (int i = 0; i < columns.size(); i++) { - JsonNode column = columns.get(i); - JsonNode length = column.get("length"); - JsonNode scale = column.get("scale"); - DataType dataType = - MySqlTypeUtils.toDataType( - column.get("typeName").asText(), - length == null ? null : length.asInt(), - scale == null ? null : scale.asInt(), - typeMapping); - - dataType = - dataType.copy( - typeMapping.containsMode(TO_NULLABLE) - || column.get("optional").asBoolean()); - - String fieldName = column.get("name").asText(); - result.add( - new DataField( - i, caseSensitive ? 
fieldName : fieldName.toLowerCase(), dataType)); - } - return result; + Optional schema = schemaBuilder.build(tableChange); + return schema.get().fields(); } @Override @@ -240,31 +215,31 @@ public Optional parseNewTable() { return Optional.empty(); } - JsonNode historyRecord = payload.get("historyRecord"); - if (historyRecord == null) { + DebeziumEvent.Payload payload = root.payload(); + if (!payload.hasHistoryRecord()) { return Optional.empty(); } try { - String historyRecordString = historyRecord.asText(); - JsonNode tableChanges = objectMapper.readTree(historyRecordString).get("tableChanges"); - if (tableChanges.size() != 1) { + TableChanges.TableChange tableChange = null; + Iterator tableChanges = payload.getTableChanges(); + long count; + for (count = 0L; tableChanges.hasNext(); ++count) { + tableChange = tableChanges.next(); + } + if (count != 1) { LOG.error( "Invalid historyRecord, because tableChanges should contain exactly 1 item.\n" - + historyRecord.asText()); + + payload.historyRecord()); return Optional.empty(); } - JsonNode tableChange = tableChanges.get(0); - if (!tableChange - .get("type") - .asText() - .equals(TableChanges.TableChangeType.CREATE.name())) { + if (TableChanges.TableChangeType.CREATE != tableChange.getType()) { return Optional.empty(); } - JsonNode primaryKeyColumnNames = tableChange.get("table").get("primaryKeyColumnNames"); - if (primaryKeyColumnNames.size() == 0) { + List primaryKeyColumnNames = tableChange.getTable().primaryKeyColumnNames(); + if (primaryKeyColumnNames.isEmpty()) { LOG.debug( "Didn't find primary keys from MySQL DDL for table '{}'. 
" + "This table won't be synchronized.", @@ -283,19 +258,19 @@ public Optional parseNewTable() { @Override public List parseRecords() { - if (!shouldSynchronizeCurrentTable || isSchemaChange()) { + if (!shouldSynchronizeCurrentTable || root.payload().isSchemaChange()) { return Collections.emptyList(); } List records = new ArrayList<>(); - Map before = extractRow(payload.get("before")); - if (before.size() > 0) { + Map before = extractRow(root.payload().before()); + if (!before.isEmpty()) { before = mapKeyCaseConvert(before, caseSensitive, recordKeyDuplicateErrMsg(before)); records.add(new CdcRecord(RowKind.DELETE, before)); } - Map after = extractRow(payload.get("after")); - if (after.size() > 0) { + Map after = extractRow(root.payload().after()); + if (!after.isEmpty()) { after = mapKeyCaseConvert(after, caseSensitive, recordKeyDuplicateErrMsg(after)); records.add(new CdcRecord(RowKind.INSERT, after)); } @@ -304,57 +279,34 @@ public List parseRecords() { } private String getDatabaseName() { - return payload.get("source").get("db").asText(); + return root.payload().source().db(); } private Map extractRow(JsonNode recordRow) { - JsonNode schema = + if (recordRow == null) { + return new HashMap<>(); + } + + DebeziumEvent.Field schema = Preconditions.checkNotNull( - root.get("schema"), + root.schema(), "MySqlDebeziumJsonEventParser only supports debezium JSON with schema. 
" + "Please make sure that `includeSchema` is true " + "in the JsonDebeziumDeserializationSchema you created"); - Map mySqlFieldTypes = new HashMap<>(); - Map fieldClassNames = new HashMap<>(); - JsonNode arrayNode = schema.get("fields"); - for (int i = 0; i < arrayNode.size(); i++) { - JsonNode elementNode = arrayNode.get(i); - String field = elementNode.get("field").asText(); - if ("before".equals(field) || "after".equals(field)) { - JsonNode innerArrayNode = elementNode.get("fields"); - for (int j = 0; j < innerArrayNode.size(); j++) { - JsonNode innerElementNode = innerArrayNode.get(j); - String fieldName = innerElementNode.get("field").asText(); - String fieldType = innerElementNode.get("type").asText(); - mySqlFieldTypes.put(fieldName, fieldType); - if (innerElementNode.get("name") != null) { - String className = innerElementNode.get("name").asText(); - fieldClassNames.put(fieldName, className); - } - } - } - } - - // the geometry, point type can not be converted to string, so we convert it to Object - // first. 
- Map jsonMap = - objectMapper.convertValue(recordRow, new TypeReference>() {}); - if (jsonMap == null) { - return new HashMap<>(); - } + Map fields = schema.beforeAndAfterFields(); - Map resultMap = new HashMap<>(); - for (Map.Entry field : mySqlFieldTypes.entrySet()) { + LinkedHashMap resultMap = new LinkedHashMap<>(); + for (Map.Entry field : fields.entrySet()) { String fieldName = field.getKey(); - String mySqlType = field.getValue(); - Object objectValue = jsonMap.get(fieldName); - if (objectValue == null) { + String mySqlType = field.getValue().type(); + JsonNode objectValue = recordRow.get(fieldName); + if (objectValue == null || objectValue.isNull()) { continue; } - String className = fieldClassNames.get(fieldName); - String oldValue = objectValue.toString(); + String className = field.getValue().name(); + String oldValue = objectValue.asText(); String newValue = oldValue; if (Bits.LOGICAL_NAME.equals(className)) { @@ -444,13 +396,13 @@ else if (Date.SCHEMA_NAME.equals(className)) { .toString(); } else if (Point.LOGICAL_NAME.equals(className) || Geometry.LOGICAL_NAME.equals(className)) { - JsonNode jsonNode = recordRow.get(fieldName); try { - byte[] wkb = jsonNode.get("wkb").binaryValue(); + byte[] wkb = objectValue.get(Geometry.WKB_FIELD).binaryValue(); newValue = MySqlTypeUtils.convertWkbArray(wkb); } catch (Exception e) { throw new IllegalArgumentException( - String.format("Failed to convert %s to geometry JSON.", jsonNode), e); + String.format("Failed to convert %s to geometry JSON.", objectValue), + e); } } @@ -468,6 +420,11 @@ else if (Date.SCHEMA_NAME.equals(className)) { } private boolean shouldSynchronizeCurrentTable() { + // When database DDL operation, the current table is null. 
+ if (currentTable == null) { + return false; + } + if (excludedTables.contains(currentTable)) { return false; } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java index aec731fb2112..24edbc66fc08 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java @@ -23,22 +23,22 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.types.DataType; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import io.debezium.relational.Column; +import io.debezium.relational.Table; +import io.debezium.relational.history.TableChanges; -import java.util.ArrayList; -import java.util.LinkedHashMap; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert; -import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert; import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** Schema builder for MySQL cdc. 
*/ -public class MySqlTableSchemaBuilder implements NewTableSchemaBuilder { +public class MySqlTableSchemaBuilder implements NewTableSchemaBuilder { private final Map tableConfig; private final boolean caseSensitive; @@ -52,47 +52,45 @@ public MySqlTableSchemaBuilder( } @Override - public Optional build(JsonNode tableChange) { - JsonNode jsonTable = tableChange.get("table"); - String tableName = tableChange.get("id").asText(); - ArrayNode columns = (ArrayNode) jsonTable.get("columns"); - LinkedHashMap fields = new LinkedHashMap<>(); - - for (JsonNode element : columns) { - JsonNode length = element.get("length"); - JsonNode scale = element.get("scale"); + public Optional build(TableChanges.TableChange tableChange) { + Table table = tableChange.getTable(); + String tableName = tableChange.getId().toString(); + List columns = table.columns(); + + Schema.Builder builder = Schema.newBuilder(); + Map duplicateFields = new HashMap<>(); + + // column + for (Column column : columns) { DataType dataType = MySqlTypeUtils.toDataType( - element.get("typeExpression").asText(), - length == null ? null : length.asInt(), - scale == null ? 
null : scale.asInt(), + column.typeExpression(), + column.length(), + column.scale().orElse(null), typeMapping); - dataType = - dataType.copy( - typeMapping.containsMode(TO_NULLABLE) - || element.get("optional").asBoolean()); + dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) || column.isOptional()); + String columnName = column.name(); + if (!caseSensitive) { + checkArgument( + !duplicateFields.containsKey(columnName.toLowerCase()), + columnDuplicateErrMsg(tableName).apply(columnName)); + columnName = columnName.toLowerCase(); + } // TODO : add table comment and column comment when we upgrade flink cdc to 2.4 - fields.put(element.get("name").asText(), dataType); + builder.column(columnName, dataType, null); + duplicateFields.put(columnName, 1); } - ArrayNode arrayNode = (ArrayNode) jsonTable.get("primaryKeyColumnNames"); - List primaryKeys = new ArrayList<>(); - for (JsonNode primary : arrayNode) { - primaryKeys.add(primary.asText()); - } - - fields = mapKeyCaseConvert(fields, caseSensitive, columnDuplicateErrMsg(tableName)); + // primaryKey + List primaryKeys = table.primaryKeyColumnNames(); primaryKeys = listCaseConvert(primaryKeys, caseSensitive); + builder.primaryKey(primaryKeys); - Schema.Builder builder = Schema.newBuilder(); + // options builder.options(tableConfig); - for (Map.Entry entry : fields.entrySet()) { - builder.column(entry.getKey(), entry.getValue()); - } - Schema schema = builder.primaryKey(primaryKeys).build(); - return Optional.of(schema); + return Optional.of(builder.build()); } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java index a9a3c9630a4f..616ba572645c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java +++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java @@ -228,7 +228,7 @@ public static DataType toDataType( return DataTypes.TIME(); case DATETIME: case TIMESTAMP: - if (length == null) { + if (length == null || length <= 0) { // default precision is 0 // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html return DataTypes.TIMESTAMP(0); @@ -241,7 +241,7 @@ public static DataType toDataType( } else { return DataTypes.TIMESTAMP(0); } - } else if (length >= 0 && length <= TimestampType.MAX_PRECISION) { + } else if (length <= TimestampType.MAX_PRECISION) { return DataTypes.TIMESTAMP(length); } else { throw new UnsupportedOperationException( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java new file mode 100644 index 000000000000..48ab55ba4157 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.format;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import io.debezium.relational.history.TableChanges;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Debezium event record entity.
+ *
+ * <p>Binds the top-level {@code payload} and {@code schema} elements of a Debezium JSON event
+ * through the Jackson annotations declared on its members.
+ */
+public class DebeziumEvent {
+
+    private static final String FIELD_PAYLOAD = "payload";
+    private static final String FIELD_SCHEMA = "schema";
+    private static final String FIELD_SOURCE = "source";
+    private static final String FIELD_BEFORE = "before";
+    private static final String FIELD_AFTER = "after";
+    private static final String FIELD_HISTORY_RECORD = "historyRecord";
+    private static final String FIELD_OP = "op";
+    private static final String FIELD_DB = "db";
+    private static final String FIELD_TABLE = "table";
+    private static final String FIELD_FIELDS = "fields";
+    private static final String FIELD_NAME = "name";
+    private static final String FIELD_TYPE = "type";
+    private static final String FIELD_FIELD = "field";
+    private static final String FIELD_OPTIONAL = "optional";
+
+    @JsonProperty(FIELD_PAYLOAD)
+    private final Payload payload;
+
+    @JsonProperty(FIELD_SCHEMA)
+    private final Field schema;
+
+    @JsonCreator
+    public DebeziumEvent(
+            @JsonProperty(FIELD_PAYLOAD) Payload payload,
+            @JsonProperty(FIELD_SCHEMA) Field schema) {
+        this.payload = payload;
+        this.schema = schema;
+    }
+
+    /** Returns the {@code payload} element, or {@code null} if none was bound. */
+    @JsonGetter(FIELD_PAYLOAD)
+    public Payload payload() {
+        return payload;
+    }
+
+    /** Returns the {@code schema} element, or {@code null} if none was bound. */
+    @JsonGetter(FIELD_SCHEMA)
+    public Field schema() {
+        return schema;
+    }
+
+    /** Payload elements in Debezium event record. */
+    public static class Payload {
+        @JsonProperty(FIELD_SOURCE)
+        private final Source source;
+
+        @JsonProperty(FIELD_BEFORE)
+        private final JsonNode before;
+
+        @JsonProperty(FIELD_AFTER)
+        private final JsonNode after;
+
+        @JsonProperty(FIELD_HISTORY_RECORD)
+        private final String historyRecord;
+
+        @JsonProperty(FIELD_OP)
+        private final String op;
+
+        @JsonCreator
+        public Payload(
+                @JsonProperty(FIELD_SOURCE) Source source,
+                @JsonProperty(FIELD_BEFORE) JsonNode before,
+                @JsonProperty(FIELD_AFTER) JsonNode after,
+                @JsonProperty(FIELD_HISTORY_RECORD) String historyRecord,
+                @JsonProperty(FIELD_OP) String op) {
+            this.source = source;
+            this.before = before;
+            this.after = after;
+            this.historyRecord = historyRecord;
+            this.op = op;
+        }
+
+        @JsonGetter(FIELD_SOURCE)
+        public Source source() {
+            return source;
+        }
+
+        @JsonGetter(FIELD_BEFORE)
+        public JsonNode before() {
+            return before;
+        }
+
+        @JsonGetter(FIELD_AFTER)
+        public JsonNode after() {
+            return after;
+        }
+
+        @JsonGetter(FIELD_HISTORY_RECORD)
+        public String historyRecord() {
+            return historyRecord;
+        }
+
+        @JsonGetter(FIELD_OP)
+        public String op() {
+            return op;
+        }
+
+        /** Returns {@code true} when the payload carries no {@code op} value. */
+        public boolean isSchemaChange() {
+            return op() == null;
+        }
+
+        /** Returns {@code true} when the payload carries a {@code historyRecord} value. */
+        public boolean hasHistoryRecord() {
+            return historyRecord != null;
+        }
+
+        /**
+         * Get table changes in history record.
+         *
+         * <p>The history record string is handed to {@link DebeziumEventUtils#getTableChanges}
+         * as is. NOTE(review): callers presumably check {@link #hasHistoryRecord()} first; the
+         * behavior for a missing history record depends on that helper - confirm.
+         *
+         * @return an iterator over the table changes parsed from the history record
+         * @throws IOException if the history record cannot be read
+         */
+        public Iterator<TableChanges.TableChange> getTableChanges() throws IOException {
+            return DebeziumEventUtils.getTableChanges(historyRecord).iterator();
+        }
+    }
+
+    /**
+     * Schema field elements in Debezium event record. A field may nest further fields through
+     * its {@code fields} element.
+     */
+    public static class Field {
+
+        @JsonProperty(FIELD_FIELD)
+        private final String field;
+
+        @JsonProperty(FIELD_TYPE)
+        private final String type;
+
+        @JsonProperty(FIELD_NAME)
+        private final String name;
+
+        @JsonProperty(FIELD_OPTIONAL)
+        private final Boolean optional;
+
+        @JsonProperty(FIELD_FIELDS)
+        private final List<Field> fields;
+
+        @JsonCreator
+        public Field(
+                @JsonProperty(FIELD_FIELD) String field,
+                @JsonProperty(FIELD_TYPE) String type,
+                @JsonProperty(FIELD_NAME) String name,
+                @JsonProperty(FIELD_OPTIONAL) Boolean optional,
+                @JsonProperty(FIELD_FIELDS) List<Field> fields) {
+            this.field = field;
+            this.type = type;
+            this.name = name;
+            this.optional = optional;
+            this.fields = fields;
+        }
+
+        @JsonGetter(FIELD_FIELD)
+        public String field() {
+            return field;
+        }
+
+        @JsonGetter(FIELD_TYPE)
+        public String type() {
+            return type;
+        }
+
+        @JsonGetter(FIELD_NAME)
+        public String name() {
+            return name;
+        }
+
+        @JsonGetter(FIELD_OPTIONAL)
+        public Boolean optional() {
+            return optional;
+        }
+
+        @JsonGetter(FIELD_FIELDS)
+        public List<Field> fields() {
+            return fields;
+        }
+
+        /**
+         * Collects the fields nested directly under the {@code before} and {@code after} children
+         * of this field.
+         *
+         * @return the nested fields keyed by their {@code field} value, in encounter order; when
+         *     the same key occurs more than once the later field wins. The map is empty when this
+         *     field has no children or no {@code before}/{@code after} child with nested fields.
+         */
+        public Map<String, Field> beforeAndAfterFields() {
+            // A field without a "fields" element is bound to null; treat it as "no children".
+            if (fields == null) {
+                return new LinkedHashMap<>();
+            }
+            return fields.stream()
+                    .filter(
+                            item ->
+                                    FIELD_BEFORE.equals(item.field)
+                                            || FIELD_AFTER.equals(item.field))
+                    // Skip before/after entries that carry no nested field list.
+                    .filter(item -> item.fields != null)
+                    .flatMap(item -> item.fields.stream())
+                    .collect(
+                            Collectors.toMap(
+                                    Field::field,
+                                    Function.identity(),
+                                    (v1, v2) -> v2,
+                                    LinkedHashMap::new));
+        }
+    }
+
+    /** Source element of payload in Debezium event record. */
+    public static class Source {
+        @JsonProperty(FIELD_DB)
+        private final String db;
+
+        @JsonProperty(FIELD_TABLE)
+        private final String table;
+
+        @JsonCreator
+        public Source(@JsonProperty(FIELD_DB) String db, @JsonProperty(FIELD_TABLE) String table) {
+            this.db = db;
+            this.table = table;
+        }
+
+        @JsonGetter(FIELD_DB)
+        public String db() {
+            return db;
+        }
+
+        @JsonGetter(FIELD_TABLE)
+        public String table() {
+            return table;
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
new file mode 100644
index 000000000000..562d138ad822
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.format;
+
+import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import io.debezium.document.Array;
+import io.debezium.document.DocumentReader;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges;
+
+import java.io.IOException;
+
+/** A utility class that provides abilities for debezium event {@link DebeziumEvent}. */
+public class DebeziumEventUtils {
+
+    private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader();
+    private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
+            new FlinkJsonTableChangeSerializer();
+
+    /** Static helpers only; not meant to be instantiated. */
+    private DebeziumEventUtils() {}
+
+    /**
+     * Parses the JSON text of a history record.
+     *
+     * @param historyRecordStr JSON text of the history record
+     * @return the history record wrapping the parsed document
+     * @throws IOException if the text cannot be read as a document
+     */
+    public static HistoryRecord getHistoryRecord(String historyRecordStr) throws IOException {
+        return new HistoryRecord(DOCUMENT_READER.read(historyRecordStr));
+    }
+
+    /**
+     * Extracts the table changes carried by a history record.
+     *
+     * @param historyRecordStr JSON text of the history record
+     * @return the deserialized table changes; empty when the record has no table changes array
+     * @throws IOException if the text cannot be read as a document
+     */
+    public static TableChanges getTableChanges(String historyRecordStr) throws IOException {
+        HistoryRecord historyRecord = getHistoryRecord(historyRecordStr);
+        Array tableChanges = historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
+        if (tableChanges == null) {
+            // No table changes array in the record: report "no changes" instead of failing
+            // inside the serializer.
+            return new TableChanges();
+        }
+        // NOTE(review): the boolean flag is presumably "useCatalogBeforeSchema" - confirm
+        // against FlinkJsonTableChangeSerializer.
+        return TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/DebeziumEventTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/DebeziumEventTest.java
new file mode 100644
index 000000000000..0e27d0b1111f
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/DebeziumEventTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.mysql; + +import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import io.debezium.relational.history.TableChanges; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.URL; +import java.util.Iterator; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link DebeziumEvent}. 
*/
+public class DebeziumEventTest {
+
+    private ObjectMapper objectMapper;
+
+    @BeforeEach
+    public void before() {
+        objectMapper = new ObjectMapper();
+        objectMapper
+                .configure(JsonParser.Feature.ALLOW_COMMENTS, true)
+                .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
+                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    @Test
+    public void testDeserialize() throws IOException {
+        final URL url =
+                DebeziumEventTest.class
+                        .getClassLoader()
+                        .getResource("mysql/debezium-event-change.json");
+        assertThat(url).isNotNull();
+        DebeziumEvent debeziumEvent = objectMapper.readValue(url, DebeziumEvent.class);
+        assertThat(debeziumEvent).isNotNull();
+        assertThat(debeziumEvent.payload().hasHistoryRecord()).isTrue();
+
+        // Iterators.size drains the iterator it is given, so count on a dedicated iterator and
+        // request a fresh one for the per-element assertions; otherwise they would never run.
+        assertThat(Iterators.size(debeziumEvent.payload().getTableChanges())).isEqualTo(1);
+
+        Iterator<TableChanges.TableChange> tableChanges =
+                debeziumEvent.payload().getTableChanges();
+        tableChanges.forEachRemaining(
+                tableChange -> {
+                    assertThat(tableChange.getType()).isEqualTo(TableChanges.TableChangeType.ALTER);
+                    assertThat(tableChange.getTable().id().toString())
+                            .isEqualTo("tinyint1_not_bool_test.t1");
+                });
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/debezium-event-change.json b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/debezium-event-change.json
new file mode 100644
index 000000000000..d3c11acefcaa
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/debezium-event-change.json
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "schema":{ + "type":"struct", + "fields":[ + { + "type":"struct", + "fields":[ + { + "type":"string", + "optional":false, + "field":"version" + }, + { + "type":"string", + "optional":false, + "field":"connector" + }, + { + "type":"string", + "optional":false, + "field":"name" + }, + { + "type":"int64", + "optional":false, + "field":"ts_ms" + }, + { + "type":"string", + "optional":true, + "name":"io.debezium.data.Enum", + "version":1, + "parameters":{ + "allowed":"true,last,false" + }, + "default":"false", + "field":"snapshot" + }, + { + "type":"string", + "optional":false, + "field":"db" + }, + { + "type":"string", + "optional":true, + "field":"sequence" + }, + { + "type":"string", + "optional":true, + "field":"table" + }, + { + "type":"int64", + "optional":false, + "field":"server_id" + }, + { + "type":"string", + "optional":true, + "field":"gtid" + }, + { + "type":"string", + "optional":false, + "field":"file" + }, + { + "type":"int64", + "optional":false, + "field":"pos" + }, + { + "type":"int32", + "optional":false, + "field":"row" + }, + { + "type":"int64", + "optional":true, + "field":"thread" + }, + { + "type":"string", + "optional":true, + "field":"query" + } + ], + "optional":false, + "name":"io.debezium.connector.mysql.Source", + "field":"source" + }, + { + "type":"string", + "optional":true, + "field":"historyRecord" + } + ], + "optional":false, + "name":"io.debezium.connector.mysql.SchemaChangeValue" + }, + "payload":{ + "source":{ + "version":"1.6.4.Final", + "connector":"mysql", + "name":"mysql_binlog_source", + 
"ts_ms":1695203563233, + "snapshot":"false", + "db":"tinyint1_not_bool_test", + "sequence":null, + "table":"t1", + "server_id":223344, + "gtid":null, + "file":"mysql-bin.000003", + "pos":219, + "row":0, + "thread":null, + "query":null + }, + "historyRecord":"{\"source\":{\"file\":\"mysql-bin.000003\",\"pos\":219,\"server_id\":223344},\"position\":{\"transaction_id\":null,\"ts_sec\":1695203563,\"file\":\"mysql-bin.000003\",\"pos\":379,\"server_id\":223344},\"databaseName\":\"tinyint1_not_bool_test\",\"ddl\":\"ALTER TABLE t1 ADD COLUMN _new_tinyint1 TINYINT(1)\",\"tableChanges\":[{\"type\":\"ALTER\",\"id\":\"\\\"tinyint1_not_bool_test\\\".\\\"t1\\\"\",\"table\":{\"defaultCharsetName\":\"latin1\",\"primaryKeyColumnNames\":[\"pk\"],\"columns\":[{\"name\":\"pk\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"length\":11,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false},{\"name\":\"_tinyint1\",\"jdbcType\":5,\"typeName\":\"TINYINT\",\"typeExpression\":\"TINYINT\",\"charsetName\":null,\"length\":1,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false},{\"name\":\"_new_tinyint1\",\"jdbcType\":5,\"typeName\":\"TINYINT\",\"typeExpression\":\"TINYINT\",\"charsetName\":null,\"length\":1,\"position\":3,\"optional\":true,\"autoIncremented\":false,\"generated\":false}]}}]}" + } +}