From 1129b69f07bb430b2c5c3e77cf5d19ffdca44252 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=A8=E5=B7=9E?= <54611681+javaht@users.noreply.github.com> Date: Sat, 9 Mar 2024 23:47:28 +0800 Subject: [PATCH] [Fix] fix the decimal precision of postgres transform (#3263) --- .../org/dinky/cdc/AbstractSinkBuilder.java | 2 +- .../dinky/cdc/utils/FlinkStatementUtil.java | 18 +++++++++++++++--- .../convert/PostgreSqlTypeConvert.java | 19 +++++++++++++++++-- 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java index 58a1aefcfe..82109481e9 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/AbstractSinkBuilder.java @@ -238,7 +238,7 @@ public void addSink( protected List createInsertOperations( CustomTableEnvironment customTableEnvironment, Table table, String viewName, String tableName) { - String cdcSqlInsert = FlinkStatementUtil.getCDCInsertSql(table, tableName, viewName); + String cdcSqlInsert = FlinkStatementUtil.getCDCInsertSql(table, tableName, viewName, config); logger.info(cdcSqlInsert); List operations = customTableEnvironment.getParser().parse(cdcSqlInsert); diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/utils/FlinkStatementUtil.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/utils/FlinkStatementUtil.java index c5b388a742..ab3956eb7c 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/utils/FlinkStatementUtil.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/utils/FlinkStatementUtil.java @@ -19,6 +19,7 @@ package org.dinky.cdc.utils; +import org.dinky.data.model.Column; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Table; import org.dinky.utils.SqlUtil; @@ -33,7 +34,7 @@ public class FlinkStatementUtil { private FlinkStatementUtil() {} - public static String getCDCInsertSql(Table table, String targetName, String sourceName) { + public static String getCDCInsertSql(Table table, String targetName, String sourceName, FlinkCDCConfig config) { StringBuilder sb = new StringBuilder("INSERT INTO "); sb.append("`").append(targetName).append("`"); sb.append(" SELECT\n"); @@ -42,8 +43,7 @@ public static String getCDCInsertSql(Table table, String targetName, String sour if (i > 0) { sb.append(","); } - sb.append(String.format("`%s`", table.getColumns().get(i).getName())) - .append(" \n"); + sb.append(getColumnProcessing(table.getColumns().get(i), config)).append(" \n"); } sb.append(" FROM `"); sb.append(sourceName); @@ -51,6 +51,18 @@ public static String getCDCInsertSql(Table table, String targetName, String sour return sb.toString(); } + public static String getColumnProcessing(Column column, FlinkCDCConfig config) { + String configType = config.getType(); + String columnType = column.getType(); + if (configType.contains("postgres-cdc") + && (columnType.contains("numeric") || columnType.contains("decimal")) + && column.getPrecision().intValue() > 38) { + return " CAST(" + column.getName() + " AS STRING) AS `" + column.getName() + "`"; + } else { + return String.format("`%s`", column.getName()); + } + } + public static String getFlinkDDL( Table table, String tableName, diff --git a/dinky-metadata/dinky-metadata-postgresql/src/main/java/org/dinky/metadata/convert/PostgreSqlTypeConvert.java b/dinky-metadata/dinky-metadata-postgresql/src/main/java/org/dinky/metadata/convert/PostgreSqlTypeConvert.java index 64315decca..4ed4621d07 100644 --- a/dinky-metadata/dinky-metadata-postgresql/src/main/java/org/dinky/metadata/convert/PostgreSqlTypeConvert.java +++ b/dinky-metadata/dinky-metadata-postgresql/src/main/java/org/dinky/metadata/convert/PostgreSqlTypeConvert.java @@ -20,6 +20,11 @@ package org.dinky.metadata.convert; import org.dinky.data.enums.ColumnType; +import org.dinky.data.model.Column; +import org.dinky.metadata.config.AbstractJdbcConfig; +import org.dinky.metadata.config.DriverConfig; + +import java.util.Optional; /** * PostgreSqlTypeConvert @@ -44,8 +49,8 @@ public PostgreSqlTypeConvert() { register("float4", ColumnType.FLOAT, ColumnType.JAVA_LANG_FLOAT); register("float8", ColumnType.DOUBLE, ColumnType.JAVA_LANG_DOUBLE); register("double precision", ColumnType.DOUBLE, ColumnType.JAVA_LANG_DOUBLE); - register("numeric", ColumnType.DECIMAL); - register("decimal", ColumnType.DECIMAL); + register("numeric", PostgreSqlTypeConvert::convertDecimalOrNumeric); + register("decimal", PostgreSqlTypeConvert::convertDecimalOrNumeric); register("boolean", ColumnType.BOOLEAN, ColumnType.JAVA_LANG_BOOLEAN); register("bool", ColumnType.BOOLEAN, ColumnType.JAVA_LANG_BOOLEAN); register("timestamp", ColumnType.TIMESTAMP); @@ -57,4 +62,14 @@ public PostgreSqlTypeConvert() { register("jsonb", ColumnType.STRING); register("json", ColumnType.STRING); } + + private static Optional convertDecimalOrNumeric( + Column column, DriverConfig driverConfig) { + // 该字段的精度 + int intValue = column.getPrecision().intValue(); + if (intValue > 38) { + return Optional.of(ColumnType.STRING); + } + return Optional.of(ColumnType.DECIMAL); + } }