From c4688ee17e9993c609758f5a641e479093320d9d Mon Sep 17 00:00:00 2001
From: kindbgen <153709372+kindbgen@users.noreply.github.com>
Date: Wed, 7 Feb 2024 10:49:03 +0800
Subject: [PATCH] [Optimization-3146][CDCSOURCE] Optimized CDCSOURCE from Mysql
to Doris, with support for light_schema_change (#3151)
---
dinky-assembly/src/main/assembly/package.xml | 8 +
.../org/dinky/cdc/debezium/DataBaseType.java | 54 +++++
.../cdc/debezium/DebeziumCustomConverter.java | 116 ++++++++++
.../converter/MysqlDebeziumConverter.java | 105 +++++++++
.../converter/OracleDebeziumConverter.java | 80 +++++++
.../converter/PostgresDebeziumConverter.java | 90 ++++++++
.../converter/SqlServerDebeziumConverter.java | 104 +++++++++
.../DorisSchemaEvolutionSinkBuilder.java | 16 +-
.../org/dinky/cdc/doris/DorisSinkOptions.java | 8 +
.../java/org/dinky/assertion/Asserts.java | 4 +
.../main/java/org/dinky/data/model/Table.java | 3 +
.../metadata/driver/AbstractJdbcDriver.java | 2 +
.../org/dinky/metadata/driver/Driver.java | 21 +-
.../org/dinky/metadata/enums/DriverType.java | 61 ++++++
.../metadata/driver/ClickHouseDriver.java | 3 +-
.../metadata/constant/DorisConstant.java | 11 +
.../org/dinky/metadata/convert/DorisType.java | 47 ++++
.../metadata/convert/source/MysqlType.java | 200 ++++++++++++++++++
.../metadata/convert/source/OracleType.java | 109 ++++++++++
.../metadata/convert/source/PostgresType.java | 156 ++++++++++++++
.../convert/source/SqlServerType.java | 110 ++++++++++
.../dinky/metadata/driver/DorisDriver.java | 113 +++++++++-
.../org/dinky/metadata/driver/HiveDriver.java | 3 +-
.../dinky/metadata/driver/MySqlDriver.java | 3 +-
.../dinky/metadata/driver/OracleDriver.java | 3 +-
.../dinky/metadata/driver/PhoenixDriver.java | 3 +-
.../metadata/driver/PostgreSqlDriver.java | 3 +-
.../dinky/metadata/driver/PrestoDriver.java | 3 +-
.../metadata/driver/SqlServerDriver.java | 3 +-
.../metadata/driver/StarRocksDriver.java | 3 +-
30 files changed, 1420 insertions(+), 25 deletions(-)
create mode 100644 dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/DataBaseType.java
create mode 100644 dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/DebeziumCustomConverter.java
create mode 100644 dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/MysqlDebeziumConverter.java
create mode 100644 dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/OracleDebeziumConverter.java
create mode 100644 dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/PostgresDebeziumConverter.java
create mode 100644 dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/SqlServerDebeziumConverter.java
create mode 100644 dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/enums/DriverType.java
create mode 100644 dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/DorisType.java
create mode 100644 dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/MysqlType.java
create mode 100644 dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/OracleType.java
create mode 100644 dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/PostgresType.java
create mode 100644 dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/SqlServerType.java
diff --git a/dinky-assembly/src/main/assembly/package.xml b/dinky-assembly/src/main/assembly/package.xml
index 3673874523..bc7fc05962 100644
--- a/dinky-assembly/src/main/assembly/package.xml
+++ b/dinky-assembly/src/main/assembly/package.xml
@@ -122,6 +122,14 @@
%regex[dinky-app-1.\d+-${project.version}-jar-with-dependencies.jar]
+
+ ${project.parent.basedir}/dinky-cdc/dinky-cdc-plus/target
+
+ lib
+
+ dinky-cdc-plus-${project.version}.jar
+
+
${project.parent.basedir}/dinky-metadata/dinky-metadata-clickhouse/target
diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/DataBaseType.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/DataBaseType.java
new file mode 100644
index 0000000000..0db7be4808
--- /dev/null
+++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/DataBaseType.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.cdc.debezium;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Database types that the custom Debezium converters can be configured for.
+ *
+ * <p>Each constant carries the lower-case key used in configuration (for example {@code "mysql"}).
+ *
+ * @author Kindbgen
+ * @date 2024/2/6
+ */
+public enum DataBaseType {
+    MYSQL("mysql"),
+    SQLSERVER("sqlserver"),
+    ORACLE("oracle"),
+    POSTGRESQL("postgresql");
+
+    /** Lookup table from the configuration key returned by {@link #getType()} to its constant. */
+    private static final Map<String, DataBaseType> MAP =
+            Arrays.stream(values()).collect(Collectors.toMap(DataBaseType::getType, Function.identity()));
+
+    /** Configuration key of this database type; never reassigned after construction. */
+    private final String type;
+
+    DataBaseType(String type) {
+        this.type = type;
+    }
+
+    /**
+     * @return the configuration key of this database type
+     */
+    public String getType() {
+        return type;
+    }
+
+    /**
+     * Resolves a configuration key to its constant. The match is exact (case-sensitive).
+     *
+     * @param type the configuration key, may be {@code null}
+     * @return the matching constant, or {@code null} when no constant uses that key
+     */
+    public static DataBaseType get(String type) {
+        return MAP.get(type);
+    }
+}
diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/DebeziumCustomConverter.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/DebeziumCustomConverter.java
new file mode 100644
index 0000000000..0b09e27c58
--- /dev/null
+++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/DebeziumCustomConverter.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.cdc.debezium;
+
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+
+/**
+ * Base custom Debezium converter that addresses time conversion issues of the source database.
+ *
+ * <p>By default Debezium turns a source {@code datetime} column into a UTC epoch timestamp
+ * ({@link io.debezium.time.Timestamp}). The time zone is hard-coded and cannot be changed, so a value
+ * stored with UTC+8 arrives at the target database as a long timestamp that is eight hours ahead.
+ * By default Debezium turns a source {@code timestamp} column into a UTC string.
+ *
+ * <p>Mapping between MySQL temporal column types and the types used by Debezium:
+ *
+ * <pre>
+ * | mysql                           | mysql-binlog-connector               | debezium                      |
+ * | ------------------------------- | ------------------------------------ | ----------------------------- |
+ * | date (2021-01-28)               | LocalDate (2021-01-28)               | Integer (18655)               |
+ * | time (17:29:04)                 | Duration (PT17H29M4S)                | Long (62944000000)            |
+ * | timestamp (2021-01-28 17:29:04) | ZonedDateTime (2021-01-28T09:29:04Z) | String (2021-01-28T09:29:04Z) |
+ * | datetime (2021-01-28 17:29:04)  | LocalDateTime (2021-01-28T17:29:04)  | Long (1611854944000)          |
+ * </pre>
+ *
+ * @author Kindbgen
+ * @date 2024/2/6
+ * @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
+ */
+public class DebeziumCustomConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
+
+    private static final Logger logger = LoggerFactory.getLogger(DebeziumCustomConverter.class);
+    /** Default pattern used when {@code format.date} is not configured. */
+    protected static final String DATE_FORMAT = "yyyy-MM-dd";
+    /** Default pattern used when {@code format.time} is not configured. */
+    protected static final String TIME_FORMAT = "HH:mm:ss";
+    /** Default pattern used when {@code format.datetime} is not configured. */
+    protected static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    protected DateTimeFormatter dateFormatter;
+    protected DateTimeFormatter timeFormatter;
+    protected DateTimeFormatter datetimeFormatter;
+    protected SchemaBuilder schemaBuilder;
+    protected String databaseType;
+    protected String schemaNamePrefix;
+    // Default time zone of the JVM (the static method is inherited from ZoneId).
+    protected final ZoneId zoneId = ZoneOffset.systemDefault();
+
+    /**
+     * Reads the converter settings.
+     *
+     * <p>Required property: {@code database.type} (mysql, sqlserver, oracle or postgresql).
+     * Optional properties: {@code format.date}, {@code format.time}, {@code format.datetime} and
+     * {@code schema.name.prefix}; the latter defaults to the runtime class name followed by
+     * {@code "."} and the database type.
+     *
+     * @param properties the converter configuration
+     * @throws UnsupportedOperationException when {@code database.type} is missing or not supported
+     */
+    @Override
+    public void configure(Properties properties) {
+        this.databaseType = properties.getProperty("database.type");
+        // DataBaseType.get returns null for a missing or unknown key. Switching on null would raise a
+        // NullPointerException and bypass the intended error below, so the lookup is checked explicitly.
+        if (!isSupported(DataBaseType.get(this.databaseType))) {
+            String errMsg = "Not support " + databaseType + " database type";
+            logger.error(errMsg);
+            throw new UnsupportedOperationException(errMsg);
+        }
+        // Optional formatting patterns.
+        String dateFormat = properties.getProperty("format.date", DATE_FORMAT);
+        String timeFormat = properties.getProperty("format.time", TIME_FORMAT);
+        String datetimeFormat = properties.getProperty("format.datetime", DATETIME_FORMAT);
+        // The default schema name prefix is this class' name plus the database type.
+        String className = this.getClass().getName();
+        this.schemaNamePrefix = properties.getProperty("schema.name.prefix", className + "." + this.databaseType);
+        // Build the formatters once; they are reused for every converted value.
+        dateFormatter = DateTimeFormatter.ofPattern(dateFormat);
+        timeFormatter = DateTimeFormatter.ofPattern(timeFormat);
+        datetimeFormatter = DateTimeFormatter.ofPattern(datetimeFormat);
+    }
+
+    /** Returns whether the resolved database type is one this converter accepts; {@code null} is rejected. */
+    private static boolean isSupported(DataBaseType type) {
+        if (type == null) {
+            return false;
+        }
+        switch (type) {
+            case MYSQL:
+            case SQLSERVER:
+            case ORACLE:
+            case POSTGRESQL:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Base implementation: registers no converter and clears {@link #schemaBuilder}.
+     *
+     * @param relationalColumn the column being inspected
+     * @param converterRegistration the registration callback, unused here
+     */
+    @Override
+    public void converterFor(
+            RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
+        schemaBuilder = null;
+    }
+
+    /**
+     * Fallback for values whose runtime type has no dedicated conversion.
+     *
+     * @param value the value that could not be converted, may be {@code null}
+     * @param type the schema name of the column; currently not used
+     * @return {@code value.toString()}, or {@code null} when {@code value} is {@code null}
+     */
+    public String failConvert(Object value, String type) {
+        return value == null ? null : value.toString();
+    }
+
+    /**
+     * @param value any object, may be {@code null}
+     * @return the fully qualified class name of {@code value}, or {@code null} when it is {@code null}
+     */
+    public String getClassName(Object value) {
+        if (value == null) {
+            return null;
+        }
+        return value.getClass().getName();
+    }
+}
diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/MysqlDebeziumConverter.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/MysqlDebeziumConverter.java
new file mode 100644
index 0000000000..2a363b30c2
--- /dev/null
+++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/MysqlDebeziumConverter.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.cdc.debezium.converter;
+
+import org.dinky.cdc.debezium.DebeziumCustomConverter;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.spi.converter.RelationalColumn;
+
+/**
+ * MySQL converter: registers string converters for DATE, TIME, DATETIME and TIMESTAMP columns.
+ *
+ * @author Kindbgen
+ * @date 2024/2/6
+ */
+public class MysqlDebeziumConverter extends DebeziumCustomConverter {
+
+    /** Upper bound (exclusive) of what {@link java.time.LocalTime} can represent. */
+    private static final java.time.Duration ONE_DAY = java.time.Duration.ofDays(1);
+
+    /**
+     * Looks up the column's type name and registers the matching converter, if any.
+     *
+     * @param relationalColumn the column being inspected
+     * @param converterRegistration the callback used to register a converter
+     */
+    @Override
+    public void converterFor(
+            RelationalColumn relationalColumn, ConverterRegistration converterRegistration) {
+        // Column type name, normalised to upper case for the switch below.
+        String columnType = relationalColumn.typeName().toUpperCase();
+        this.registerConverter(columnType, converterRegistration);
+    }
+
+    /**
+     * Registers a converter that renders temporal values as formatted strings.
+     *
+     * <p>The schema is a string schema named {@code schemaNamePrefix + "." + columnType} (lower case).
+     * For column types other than DATE, TIME, DATETIME and TIMESTAMP nothing is registered and
+     * {@code schemaBuilder} is reset to {@code null}.
+     *
+     * @param columnType upper-case column type name
+     * @param converterRegistration the callback used to register a converter
+     */
+    public void registerConverter(String columnType, ConverterRegistration converterRegistration) {
+        String schemaName = this.schemaNamePrefix + "." + columnType.toLowerCase();
+        schemaBuilder = SchemaBuilder.string().name(schemaName);
+        switch (columnType) {
+            case "DATE":
+                converterRegistration.register(schemaBuilder, value -> {
+                    if (value == null) {
+                        return null;
+                    } else if (value instanceof java.time.LocalDate) {
+                        return dateFormatter.format((java.time.LocalDate) value);
+                    } else if (value instanceof java.time.LocalDateTime) {
+                        return datetimeFormatter.format((java.time.LocalDateTime) value);
+                    } else {
+                        return this.failConvert(value, schemaName);
+                    }
+                });
+                break;
+            case "TIME":
+                converterRegistration.register(schemaBuilder, value -> {
+                    if (value == null) {
+                        return null;
+                    } else if (value instanceof java.time.Duration) {
+                        java.time.Duration duration = (java.time.Duration) value;
+                        // A MySQL TIME value may be negative or reach 24 hours and beyond, which
+                        // LocalTime.ofNanoOfDay rejects with a DateTimeException. Such values use the
+                        // generic fallback instead of failing the conversion.
+                        if (duration.isNegative() || duration.compareTo(ONE_DAY) >= 0) {
+                            return this.failConvert(value, schemaName);
+                        }
+                        return timeFormatter.format(java.time.LocalTime.ofNanoOfDay(duration.toNanos()));
+                    } else if (value instanceof java.time.LocalDateTime) {
+                        return datetimeFormatter.format((java.time.LocalDateTime) value);
+                    } else {
+                        return this.failConvert(value, schemaName);
+                    }
+                });
+                break;
+            case "DATETIME":
+            case "TIMESTAMP":
+                converterRegistration.register(schemaBuilder, value -> {
+                    if (value == null) {
+                        return null;
+                    } else if (value instanceof java.time.LocalDateTime) {
+                        return datetimeFormatter.format((java.time.LocalDateTime) value);
+                    } else if (value instanceof java.time.ZonedDateTime) {
+                        // Shift to the JVM default zone before dropping the zone information.
+                        return datetimeFormatter.format(((java.time.ZonedDateTime) value)
+                                .withZoneSameInstant(zoneId)
+                                .toLocalDateTime());
+                    } else if (value instanceof java.sql.Timestamp) {
+                        return datetimeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime());
+                    } else if (value instanceof String) {
+                        // During initialisation values such as 1970-01-01T00:00:00Z show up as strings
+                        // and need converting.
+                        // NOTE(review): this branch renders the instant in UTC while the ZonedDateTime
+                        // branch uses the JVM default zone - confirm the difference is intended.
+                        Instant instant = Instant.parse((String) value);
+                        java.time.LocalDateTime dateTime = java.time.LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
+                        return datetimeFormatter.format(dateTime);
+                    } else {
+                        return this.failConvert(value, schemaName);
+                    }
+                });
+                break;
+            default:
+                schemaBuilder = null;
+                break;
+        }
+    }
+}
diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/OracleDebeziumConverter.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/OracleDebeziumConverter.java
new file mode 100644
index 0000000000..84a30caab5
--- /dev/null
+++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/OracleDebeziumConverter.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.cdc.debezium.converter;
+
+import org.dinky.cdc.debezium.DebeziumCustomConverter;
+
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.spi.converter.RelationalColumn;
+
+/**
+ * Oracle converter: registers string converters for DATE and TIMESTAMP column types.
+ *
+ * @author Kindbgen
+ * @date 2024/2/6
+ */
+public class OracleDebeziumConverter extends DebeziumCustomConverter {
+
+    /**
+     * Resolves the upper-case type name of the column and delegates to
+     * {@link #registerConverter(String, ConverterRegistration)}.
+     */
+    @Override
+    public void converterFor(
+            RelationalColumn relationalColumn, ConverterRegistration converterRegistration) {
+        this.registerConverter(relationalColumn.typeName().toUpperCase(), converterRegistration);
+    }
+
+    /**
+     * Registers a string converter for the given column type. Types other than DATE and the three
+     * TIMESTAMP variants register nothing and reset {@code schemaBuilder} to {@code null}.
+     *
+     * @param columnType upper-case column type name
+     * @param converterRegistration the callback used to register a converter
+     */
+    public void registerConverter(String columnType, ConverterRegistration converterRegistration) {
+        final String schemaName = this.schemaNamePrefix + "." + columnType.toLowerCase();
+        schemaBuilder = SchemaBuilder.string().name(schemaName);
+        switch (columnType) {
+            case "DATE":
+                converterRegistration.register(schemaBuilder, raw -> formatDate(raw, schemaName));
+                break;
+            case "TIMESTAMP":
+            case "TIMESTAMP WITH TIME ZONE":
+            case "TIMESTAMP WITH LOCAL TIME ZONE":
+                converterRegistration.register(schemaBuilder, raw -> formatTimestamp(raw, schemaName));
+                break;
+            default:
+                schemaBuilder = null;
+                break;
+        }
+    }
+
+    /** Formats a DATE value with the date pattern; unknown value types go to {@code failConvert}. */
+    private String formatDate(Object raw, String schemaName) {
+        if (raw == null) {
+            return null;
+        }
+        if (raw instanceof java.sql.Date) {
+            return dateFormatter.format(((java.sql.Date) raw).toLocalDate());
+        }
+        if (raw instanceof java.time.LocalDate) {
+            return dateFormatter.format((java.time.LocalDate) raw);
+        }
+        return this.failConvert(raw, schemaName);
+    }
+
+    /** Formats a TIMESTAMP value with the datetime pattern; unknown value types go to {@code failConvert}. */
+    private String formatTimestamp(Object raw, String schemaName) {
+        if (raw == null) {
+            return null;
+        }
+        if (raw instanceof java.sql.Timestamp) {
+            return datetimeFormatter.format(((java.sql.Timestamp) raw).toLocalDateTime());
+        }
+        if (raw instanceof java.time.LocalDateTime) {
+            return datetimeFormatter.format((java.time.LocalDateTime) raw);
+        }
+        return this.failConvert(raw, schemaName);
+    }
+}
diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/PostgresDebeziumConverter.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/PostgresDebeziumConverter.java
new file mode 100644
index 0000000000..c888e593d9
--- /dev/null
+++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/PostgresDebeziumConverter.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.cdc.debezium.converter;
+
+import org.dinky.cdc.debezium.DebeziumCustomConverter;
+
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.spi.converter.RelationalColumn;
+
+/**
+ * PostgreSQL converter: registers string converters for DATE, TIME and TIMESTAMP columns.
+ *
+ * @author Kindbgen
+ * @date 2024/2/6
+ */
+public class PostgresDebeziumConverter extends DebeziumCustomConverter {
+
+    /**
+     * Resolves the upper-case type name of the column and delegates to
+     * {@link #registerConverter(String, ConverterRegistration)}.
+     */
+    @Override
+    public void converterFor(
+            RelationalColumn relationalColumn, ConverterRegistration converterRegistration) {
+        this.registerConverter(relationalColumn.typeName().toUpperCase(), converterRegistration);
+    }
+
+    /**
+     * Registers a string converter for the given column type. Types other than DATE, TIME and
+     * TIMESTAMP register nothing and reset {@code schemaBuilder} to {@code null}.
+     *
+     * @param columnType upper-case column type name
+     * @param converterRegistration the callback used to register a converter
+     */
+    public void registerConverter(String columnType, ConverterRegistration converterRegistration) {
+        final String schemaName = this.schemaNamePrefix + "." + columnType.toLowerCase();
+        schemaBuilder = SchemaBuilder.string().name(schemaName);
+        switch (columnType) {
+            case "DATE":
+                converterRegistration.register(schemaBuilder, raw -> formatDate(raw, schemaName));
+                break;
+            case "TIME":
+                converterRegistration.register(schemaBuilder, raw -> formatTime(raw, schemaName));
+                break;
+            case "TIMESTAMP":
+                converterRegistration.register(schemaBuilder, raw -> formatTimestamp(raw, schemaName));
+                break;
+            default:
+                schemaBuilder = null;
+                break;
+        }
+    }
+
+    /** Formats a DATE value with the date pattern; unknown value types go to {@code failConvert}. */
+    private String formatDate(Object raw, String schemaName) {
+        if (raw == null) {
+            return null;
+        }
+        if (raw instanceof java.sql.Date) {
+            return dateFormatter.format(((java.sql.Date) raw).toLocalDate());
+        }
+        if (raw instanceof java.time.LocalDate) {
+            return dateFormatter.format((java.time.LocalDate) raw);
+        }
+        return this.failConvert(raw, schemaName);
+    }
+
+    /** Formats a TIME value with the time pattern; unknown value types go to {@code failConvert}. */
+    private String formatTime(Object raw, String schemaName) {
+        if (raw == null) {
+            return null;
+        }
+        if (raw instanceof java.sql.Time) {
+            return timeFormatter.format(((java.sql.Time) raw).toLocalTime());
+        }
+        if (raw instanceof java.time.LocalTime) {
+            return timeFormatter.format((java.time.LocalTime) raw);
+        }
+        return this.failConvert(raw, schemaName);
+    }
+
+    /** Formats a TIMESTAMP value with the datetime pattern; unknown value types go to {@code failConvert}. */
+    private String formatTimestamp(Object raw, String schemaName) {
+        if (raw == null) {
+            return null;
+        }
+        if (raw instanceof java.sql.Timestamp) {
+            return datetimeFormatter.format(((java.sql.Timestamp) raw).toLocalDateTime());
+        }
+        if (raw instanceof java.time.LocalDateTime) {
+            return datetimeFormatter.format((java.time.LocalDateTime) raw);
+        }
+        return this.failConvert(raw, schemaName);
+    }
+}
diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/SqlServerDebeziumConverter.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/SqlServerDebeziumConverter.java
new file mode 100644
index 0000000000..8f980746e2
--- /dev/null
+++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/debezium/converter/SqlServerDebeziumConverter.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.cdc.debezium.converter;
+
+import org.dinky.cdc.debezium.DebeziumCustomConverter;
+
+import java.time.ZoneOffset;
+
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.spi.converter.RelationalColumn;
+
+/**
+ * SQL Server converter: registers string converters for DATE, TIME and the DATETIME family
+ * (DATETIME, DATETIME2, SMALLDATETIME, DATETIMEOFFSET).
+ *
+ * @author Kindbgen
+ * @date 2024/2/6
+ */
+public class SqlServerDebeziumConverter extends DebeziumCustomConverter {
+
+    /**
+     * Resolves the upper-case type name of the column and delegates to
+     * {@link #registerConverter(String, ConverterRegistration)}.
+     */
+    @Override
+    public void converterFor(
+            RelationalColumn relationalColumn, ConverterRegistration converterRegistration) {
+        this.registerConverter(relationalColumn.typeName().toUpperCase(), converterRegistration);
+    }
+
+    /**
+     * Registers a string converter for the given column type. Unlisted types register nothing and
+     * reset {@code schemaBuilder} to {@code null}.
+     *
+     * @param columnType upper-case column type name
+     * @param converterRegistration the callback used to register a converter
+     */
+    public void registerConverter(String columnType, ConverterRegistration converterRegistration) {
+        final String schemaName = this.schemaNamePrefix + "." + columnType.toLowerCase();
+        schemaBuilder = SchemaBuilder.string().name(schemaName);
+        switch (columnType) {
+            case "DATE":
+                converterRegistration.register(schemaBuilder, raw -> formatDate(raw, schemaName));
+                break;
+            case "TIME":
+                converterRegistration.register(schemaBuilder, raw -> formatTime(raw, schemaName));
+                break;
+            case "DATETIME":
+            case "DATETIME2":
+            case "SMALLDATETIME":
+            case "DATETIMEOFFSET":
+                converterRegistration.register(schemaBuilder, raw -> formatDateTime(raw, schemaName));
+                break;
+            default:
+                schemaBuilder = null;
+                break;
+        }
+    }
+
+    /**
+     * Formats a DATE value: {@code java.sql.Date} uses the date pattern, {@code LocalDateTime} uses
+     * the datetime pattern, anything else goes to {@code failConvert}.
+     */
+    private String formatDate(Object raw, String schemaName) {
+        if (raw == null) {
+            return null;
+        }
+        if (raw instanceof java.sql.Date) {
+            return dateFormatter.format(((java.sql.Date) raw).toLocalDate());
+        }
+        if (raw instanceof java.time.LocalDateTime) {
+            return datetimeFormatter.format((java.time.LocalDateTime) raw);
+        }
+        return this.failConvert(raw, schemaName);
+    }
+
+    /**
+     * Formats a TIME value: {@code java.sql.Time} and {@code java.sql.Timestamp} use the time pattern,
+     * {@code LocalDateTime} uses the datetime pattern, anything else goes to {@code failConvert}.
+     */
+    private String formatTime(Object raw, String schemaName) {
+        if (raw == null) {
+            return null;
+        }
+        if (raw instanceof java.sql.Time) {
+            return timeFormatter.format(((java.sql.Time) raw).toLocalTime());
+        }
+        if (raw instanceof java.sql.Timestamp) {
+            java.time.LocalDateTime stamp = ((java.sql.Timestamp) raw).toLocalDateTime();
+            return timeFormatter.format(stamp.toLocalTime());
+        }
+        if (raw instanceof java.time.LocalDateTime) {
+            return datetimeFormatter.format((java.time.LocalDateTime) raw);
+        }
+        return this.failConvert(raw, schemaName);
+    }
+
+    /**
+     * Formats a DATETIME-family value with the datetime pattern. A {@code DateTimeOffset} is first
+     * shifted to UTC; unknown value types go to {@code failConvert}.
+     */
+    private String formatDateTime(Object raw, String schemaName) {
+        if (raw == null) {
+            return null;
+        }
+        if (raw instanceof java.sql.Timestamp) {
+            return datetimeFormatter.format(((java.sql.Timestamp) raw).toLocalDateTime());
+        }
+        if (raw instanceof microsoft.sql.DateTimeOffset) {
+            java.time.OffsetDateTime atUtc = ((microsoft.sql.DateTimeOffset) raw)
+                    .getOffsetDateTime()
+                    .withOffsetSameInstant(ZoneOffset.UTC);
+            return datetimeFormatter.format(atUtc.toLocalDateTime());
+        }
+        if (raw instanceof java.time.LocalDateTime) {
+            return datetimeFormatter.format((java.time.LocalDateTime) raw);
+        }
+        return this.failConvert(raw, schemaName);
+    }
+}
diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSchemaEvolutionSinkBuilder.java b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSchemaEvolutionSinkBuilder.java
index 92c6182b74..a6d348441b 100644
--- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSchemaEvolutionSinkBuilder.java
+++ b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSchemaEvolutionSinkBuilder.java
@@ -32,8 +32,7 @@
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
-import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
+import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -151,8 +150,8 @@ public void processElement(Map map, Context ctx, Collector out) throws E
getSinkSchemaName(table),
getSinkTableName(table)));
} else {
- executionBuilder.setLabelPrefix(String.format(
- "dinky-%s_%s%s", getSinkSchemaName(table), getSinkTableName(table), UUID.randomUUID()));
+ // flink-cdc-pipeline-connector-doris 3.0.0 and later already append SchemaName + SinkTableName internally, and the table label must match the regex ^[-_A-Za-z0-9]{1,128}$
+ executionBuilder.setLabelPrefix("dinky");
}
if (sink.containsKey(DorisSinkOptions.SINK_MAX_RETRIES.key())) {
@@ -161,11 +160,18 @@ public void processElement(Map map, Context ctx, Collector out) throws E
executionBuilder.setStreamLoadProp(properties).setDeletable(true);
+ JsonDebeziumSchemaSerializer.Builder jsonDebeziumSchemaSerializerBuilder = JsonDebeziumSchemaSerializer.builder();
+
+ // use new schema change
+ if (sink.containsKey(DorisSinkOptions.SINK_USE_NEW_SCHEMA_CHANGE.key())) {
+ jsonDebeziumSchemaSerializerBuilder.setNewSchemaChange(Boolean.valueOf(sink.get(DorisSinkOptions.SINK_USE_NEW_SCHEMA_CHANGE.key())));
+ }
+
DorisSink.Builder builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisOptions)
- .setSerializer((DorisRecordSerializer) JsonDebeziumSchemaSerializer.builder()
+ .setSerializer(jsonDebeziumSchemaSerializerBuilder
.setDorisOptions(dorisOptions)
.build());
diff --git a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkOptions.java b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkOptions.java
index 5186797dc2..383e1d3b65 100644
--- a/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkOptions.java
+++ b/dinky-cdc/dinky-cdc-plus/src/main/java/org/dinky/cdc/doris/DorisSinkOptions.java
@@ -118,4 +118,12 @@ public class DorisSinkOptions {
.intType()
.defaultValue(1)
.withDescription("In the 2pc scenario, the number of retries after the commit phase fails.");
+
+ /**
+ * Boolean option {@code sink.use-new-schema-change}, {@code false} by default.
+ * When the key is present in the sink configuration, DorisSchemaEvolutionSinkBuilder passes its
+ * value to {@code JsonDebeziumSchemaSerializer.Builder#setNewSchemaChange}.
+ */
+ public static final ConfigOption SINK_USE_NEW_SCHEMA_CHANGE = ConfigOptions.key("sink.use-new-schema-change")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "supports table column name, column type, default, comment synchronization, supports multi-column changes, "
+ +"and supports column name rename. Need to be enabled by configuring use-new-schema-change.");
+
}
diff --git a/dinky-common/src/main/java/org/dinky/assertion/Asserts.java b/dinky-common/src/main/java/org/dinky/assertion/Asserts.java
index 0d8b2bd2ce..71dadd0c1c 100644
--- a/dinky-common/src/main/java/org/dinky/assertion/Asserts.java
+++ b/dinky-common/src/main/java/org/dinky/assertion/Asserts.java
@@ -116,4 +116,8 @@ public static void checkNullMap(Map, ?> map, String msg) {
throw new BusException(msg);
}
}
+
+    /**
+     * Checks whether {@code str1} contains {@code str2}. The comparison is case-sensitive.
+     *
+     * @param str1 the string to search in; may be {@code null}
+     * @param str2 the substring to look for; may be {@code null}
+     * @return {@code true} only when {@code str2} is not {@code null}, {@code isNullString(str1)} is
+     *     {@code false} and {@code str1} contains {@code str2}
+     */
+    public static boolean isContainsString(String str1, String str2) {
+        // String#contains throws a NullPointerException for a null argument, so reject it up front.
+        return str2 != null && !isNullString(str1) && str1.contains(str2);
+    }
}
diff --git a/dinky-common/src/main/java/org/dinky/data/model/Table.java b/dinky-common/src/main/java/org/dinky/data/model/Table.java
index 1aba079b15..75741dfdcf 100644
--- a/dinky-common/src/main/java/org/dinky/data/model/Table.java
+++ b/dinky-common/src/main/java/org/dinky/data/model/Table.java
@@ -63,6 +63,9 @@ public class Table implements Serializable, Comparable, Cloneable {
private List columns;
+ /** Driver type; see org.dinky.metadata.enums.DriverType. */
+ private String driverType;
+
public Table() {}
public Table(String name, String schema, List columns) {
diff --git a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java
index c8f6e62ca3..ffa0aaa14b 100644
--- a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java
+++ b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/AbstractJdbcDriver.java
@@ -254,6 +254,7 @@ public List listTables(String schemaName) {
String tableName = results.getString(dbQuery.tableName());
if (Asserts.isNotNullString(tableName)) {
Table tableInfo = new Table();
+ tableInfo.setDriverType(getType());
tableInfo.setName(tableName);
if (columnList.contains(dbQuery.tableComment())) {
tableInfo.setComment(results.getString(dbQuery.tableComment()));
@@ -806,6 +807,7 @@ && contains(tableName, x.get(dbQuery.tableName())))
.stream()
.map(x -> {
Table tableInfo = new Table();
+ tableInfo.setDriverType(getType());
tableInfo.setName(getReValue(x.get(dbQuery.tableName()), splitConfig));
tableInfo.setComment(x.get(dbQuery.tableComment()));
tableInfo.setSchema(getReValue(x.get(dbQuery.schemaName()), splitConfig));
diff --git a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/Driver.java b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/Driver.java
index 504829d2f8..989ea7aee7 100644
--- a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/Driver.java
+++ b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/driver/Driver.java
@@ -29,6 +29,7 @@
import org.dinky.data.result.SqlExplainResult;
import org.dinky.metadata.config.AbstractJdbcConfig;
import org.dinky.metadata.config.DriverConfig;
+import org.dinky.metadata.enums.DriverType;
import org.dinky.metadata.result.JdbcSelectResult;
import org.dinky.utils.JsonUtils;
@@ -110,25 +111,25 @@ static Driver getHealthDriver(String key) {
static Driver build(String connector, String url, String username, String password) {
String type = null;
- if (Asserts.isEqualsIgnoreCase(connector, "doris")) {
- type = "Doris";
+ if (Asserts.isContainsString(connector, "doris")) {
+ type = DriverType.DORIS.getValue();
} else if (Asserts.isEqualsIgnoreCase(connector, "starrocks")) {
- type = "StarRocks";
+ type = DriverType.STARROCKS.getValue();
} else if (Asserts.isEqualsIgnoreCase(connector, "clickhouse")) {
- type = "ClickHouse";
+ type = DriverType.CLICKHOUSE.getValue();
} else if (Asserts.isEqualsIgnoreCase(connector, "jdbc")) {
if (url.startsWith("jdbc:mysql")) {
- type = "MySQL";
+ type = DriverType.MYSQL.getValue();
} else if (url.startsWith("jdbc:postgresql")) {
- type = "PostgreSql";
+ type = DriverType.POSTGRESQL.getValue();
} else if (url.startsWith("jdbc:oracle")) {
- type = "Oracle";
+ type = DriverType.ORACLE.getValue();
} else if (url.startsWith("jdbc:sqlserver")) {
- type = "SQLServer";
+ type = DriverType.SQLSERVER.getValue();
} else if (url.startsWith("jdbc:phoenix")) {
- type = "Phoenix";
+ type = DriverType.PHOENIX.getValue();
} else if (url.startsWith("jdbc:pivotal")) {
- type = "Greenplum";
+ type = DriverType.GREENPLUM.getValue();
}
}
diff --git a/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/enums/DriverType.java b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/enums/DriverType.java
new file mode 100644
index 0000000000..a0232e9159
--- /dev/null
+++ b/dinky-metadata/dinky-metadata-base/src/main/java/org/dinky/metadata/enums/DriverType.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.metadata.enums;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Driver type identifiers used by the metadata drivers.
+ *
+ * <p>Each constant carries the type string that the matching driver reports from
+ * {@code getType()} (for example {@code "MySQL"} or {@code "Doris"}).
+ *
+ * @since 2024/2/6
+ */
+public enum DriverType {
+    MYSQL("MySQL"),
+    ORACLE("Oracle"),
+    POSTGRESQL("PostgreSql"),
+    SQLSERVER("SQLServer"),
+    DORIS("Doris"),
+    STARROCKS("StarRocks"),
+    CLICKHOUSE("ClickHouse"),
+    PHOENIX("Phoenix"),
+    GREENPLUM("Greenplum"),
+    HIVE("Hive"),
+    PRESTO("Presto");
+
+    /** Type string of this driver, e.g. {@code "MySQL"}. */
+    public final String value;
+
+    DriverType(String value) {
+        this.value = value;
+    }
+
+    /**
+     * Returns the type string of this driver.
+     *
+     * @return the type string, never {@code null}
+     */
+    public String getValue() {
+        return value;
+    }
+
+    /** Lookup table from type string to constant, built once when the enum is initialized. */
+    private static final Map<String, DriverType> MAP =
+            Arrays.stream(values()).collect(Collectors.toMap(DriverType::getValue, Function.identity()));
+
+    /**
+     * Looks up a constant by its type string. The match is exact and case-sensitive.
+     *
+     * @param value type string such as {@code "Doris"}
+     * @return the matching constant, or {@code null} when no constant has that value
+     */
+    public static DriverType get(String value) {
+        return MAP.get(value);
+    }
+}
diff --git a/dinky-metadata/dinky-metadata-clickhouse/src/main/java/org/dinky/metadata/driver/ClickHouseDriver.java b/dinky-metadata/dinky-metadata-clickhouse/src/main/java/org/dinky/metadata/driver/ClickHouseDriver.java
index 41239b39d9..b9d4a0de7b 100644
--- a/dinky-metadata/dinky-metadata-clickhouse/src/main/java/org/dinky/metadata/driver/ClickHouseDriver.java
+++ b/dinky-metadata/dinky-metadata-clickhouse/src/main/java/org/dinky/metadata/driver/ClickHouseDriver.java
@@ -27,6 +27,7 @@
import org.dinky.metadata.config.AbstractJdbcConfig;
import org.dinky.metadata.convert.ClickHouseTypeConvert;
import org.dinky.metadata.convert.ITypeConvert;
+import org.dinky.metadata.enums.DriverType;
import org.dinky.metadata.parser.Clickhouse20StatementParser;
import org.dinky.metadata.query.ClickHouseQuery;
import org.dinky.metadata.query.IDBQuery;
@@ -74,7 +75,7 @@ public ITypeConvert getTypeConvert() {
@Override
public String getType() {
- return "ClickHouse";
+ return DriverType.CLICKHOUSE.getValue();
}
@Override
diff --git a/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/constant/DorisConstant.java b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/constant/DorisConstant.java
index 983d74daf6..b3dd12be91 100644
--- a/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/constant/DorisConstant.java
+++ b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/constant/DorisConstant.java
@@ -38,4 +38,15 @@ public interface DorisConstant {
+ " TABLE_SCHEMA = '%s' ";
/** 查询指定schema.table下的所有列信息 */
String QUERY_COLUMNS_BY_TABLE_AND_SCHEMA = " show full columns from `%s`.`%s` ";
+
+    /** Maximum length of a Doris CHAR column. */
+    int MAX_CHAR_SIZE = 255;
+
+    /** Maximum length of a Doris VARCHAR column. */
+    int MAX_VARCHAR_SIZE = 65533;
+    /** Maximum precision of the Doris datetime type. */
+    int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;
+
+    /**
+     * Maximum precision of a source timestamp type. MysqlType treats a DATETIME/TIMESTAMP length up to
+     * this value as a precision rather than a column width.
+     */
+    int MAX_TIMESTAMP_PRECISION = 9;
}
diff --git a/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/DorisType.java b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/DorisType.java
new file mode 100644
index 0000000000..313b03d395
--- /dev/null
+++ b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/DorisType.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.metadata.convert;
+
+/**
+ * Names of the Doris column types, as they are written into generated Doris type definitions.
+ *
+ * <p>This class only holds constants; it cannot be instantiated or extended.
+ */
+public final class DorisType {
+    public static final String BOOLEAN = "BOOLEAN";
+    public static final String TINYINT = "TINYINT";
+    public static final String SMALLINT = "SMALLINT";
+    public static final String INT = "INT";
+    public static final String BIGINT = "BIGINT";
+    public static final String LARGEINT = "LARGEINT";
+    public static final String FLOAT = "FLOAT";
+    public static final String DOUBLE = "DOUBLE";
+    public static final String DECIMAL = "DECIMAL";
+    public static final String DECIMAL_V3 = "DECIMALV3";
+    public static final String DATE = "DATE";
+    public static final String DATE_V2 = "DATEV2";
+    public static final String DATETIME = "DATETIME";
+    public static final String DATETIME_V2 = "DATETIMEV2";
+    public static final String CHAR = "CHAR";
+    public static final String VARCHAR = "VARCHAR";
+    public static final String STRING = "STRING";
+    public static final String HLL = "HLL";
+    public static final String BITMAP = "BITMAP";
+    public static final String ARRAY = "ARRAY";
+    public static final String JSONB = "JSONB";
+    public static final String JSON = "JSON";
+    public static final String MAP = "MAP";
+    public static final String STRUCT = "STRUCT";
+
+    private DorisType() {
+        // Constant holder, not meant to be instantiated.
+    }
+}
diff --git a/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/MysqlType.java b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/MysqlType.java
new file mode 100644
index 0000000000..e4306b7578
--- /dev/null
+++ b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/MysqlType.java
@@ -0,0 +1,200 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.metadata.convert.source;
+
+import org.dinky.assertion.Asserts;
+import org.dinky.metadata.constant.DorisConstant;
+import org.dinky.metadata.convert.DorisType;
+
+/**
+ * Converts MySQL column type names into Doris column type definitions.
+ *
+ * <p>Unsigned integer types are mapped to the next wider Doris integer type, and types without a
+ * direct Doris counterpart (text, binary, TIME, ENUM, SET) are mapped to {@code STRING}.
+ */
+public final class MysqlType {
+
+    // MySQL driver returns width of timestamp types instead of precision.
+    // 19 characters are used for zero-precision timestamps while others
+    // require 19 + precision + 1 characters with the additional character
+    // required for the decimal separator.
+    private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19;
+    private static final String BIT = "BIT";
+    private static final String BOOLEAN = "BOOLEAN";
+    private static final String BOOL = "BOOL";
+    private static final String TINYINT = "TINYINT";
+    private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL";
+    private static final String SMALLINT = "SMALLINT";
+    private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT UNSIGNED ZEROFILL";
+    private static final String MEDIUMINT = "MEDIUMINT";
+    private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT UNSIGNED ZEROFILL";
+    private static final String INT = "INT";
+    private static final String INT_UNSIGNED = "INT UNSIGNED";
+    private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL";
+    private static final String BIGINT = "BIGINT";
+    private static final String SERIAL = "SERIAL";
+    private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED ZEROFILL";
+    private static final String REAL = "REAL";
+    private static final String REAL_UNSIGNED = "REAL UNSIGNED";
+    private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED ZEROFILL";
+    private static final String FLOAT = "FLOAT";
+    private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED ZEROFILL";
+    private static final String DOUBLE = "DOUBLE";
+    private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+    private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED ZEROFILL";
+    private static final String DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION UNSIGNED";
+    private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL = "DOUBLE PRECISION UNSIGNED ZEROFILL";
+    private static final String NUMERIC = "NUMERIC";
+    private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED";
+    private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED ZEROFILL";
+    private static final String FIXED = "FIXED";
+    private static final String FIXED_UNSIGNED = "FIXED UNSIGNED";
+    private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED ZEROFILL";
+    private static final String DECIMAL = "DECIMAL";
+    private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED ZEROFILL";
+    private static final String CHAR = "CHAR";
+    private static final String VARCHAR = "VARCHAR";
+    private static final String TINYTEXT = "TINYTEXT";
+    private static final String MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String TEXT = "TEXT";
+    private static final String LONGTEXT = "LONGTEXT";
+    private static final String DATE = "DATE";
+    private static final String TIME = "TIME";
+    private static final String DATETIME = "DATETIME";
+    private static final String TIMESTAMP = "TIMESTAMP";
+    private static final String YEAR = "YEAR";
+    private static final String BINARY = "BINARY";
+    private static final String VARBINARY = "VARBINARY";
+    private static final String TINYBLOB = "TINYBLOB";
+    private static final String MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String BLOB = "BLOB";
+    private static final String LONGBLOB = "LONGBLOB";
+    private static final String JSON = "JSON";
+    private static final String ENUM = "ENUM";
+    private static final String SET = "SET";
+
+    private MysqlType() {
+        // Static utility, not meant to be instantiated.
+    }
+
+    /**
+     * Maps a MySQL column type to the Doris type definition used for the same column.
+     *
+     * @param type MySQL type name, matched case-insensitively; must not be {@code null}
+     * @param length column length. For decimal types it is the precision. For DATETIME/TIMESTAMP it is
+     *     either the column width (19, or 21 and above) or the fractional-second precision (0 to 9).
+     *     It is checked with {@code Asserts.checkNotNull} for CHAR/VARCHAR.
+     * @param scale decimal scale; {@code null} or a negative value is treated as 0
+     * @return the Doris type definition, including length/precision arguments where applicable
+     * @throws UnsupportedOperationException if the type is unknown, or if a DATETIME/TIMESTAMP length
+     *     is neither a valid precision nor a valid column width
+     */
+    public static String toDorisType(String type, Integer length, Integer scale) {
+        // Locale.ROOT keeps the match independent of the JVM default locale; with e.g. a Turkish
+        // locale "int".toUpperCase() does not yield "INT".
+        switch (type.toUpperCase(java.util.Locale.ROOT)) {
+            // NOTE(review): BIT(n) with n > 1 is also mapped to BOOLEAN here - confirm this is intended.
+            case BIT:
+            case BOOLEAN:
+            case BOOL:
+                return DorisType.BOOLEAN;
+            case TINYINT:
+                return DorisType.TINYINT;
+            case TINYINT_UNSIGNED:
+            case TINYINT_UNSIGNED_ZEROFILL:
+            case SMALLINT:
+                return DorisType.SMALLINT;
+            case SMALLINT_UNSIGNED:
+            case SMALLINT_UNSIGNED_ZEROFILL:
+            case INT:
+            case MEDIUMINT:
+            case YEAR:
+                return DorisType.INT;
+            case INT_UNSIGNED:
+            case INT_UNSIGNED_ZEROFILL:
+            case MEDIUMINT_UNSIGNED:
+            case MEDIUMINT_UNSIGNED_ZEROFILL:
+            case BIGINT:
+                return DorisType.BIGINT;
+            // SERIAL is MySQL shorthand for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE.
+            case SERIAL:
+            case BIGINT_UNSIGNED:
+            case BIGINT_UNSIGNED_ZEROFILL:
+                return DorisType.LARGEINT;
+            case FLOAT:
+            case FLOAT_UNSIGNED:
+            case FLOAT_UNSIGNED_ZEROFILL:
+                return DorisType.FLOAT;
+            case REAL:
+            case REAL_UNSIGNED:
+            case REAL_UNSIGNED_ZEROFILL:
+            case DOUBLE:
+            case DOUBLE_UNSIGNED:
+            case DOUBLE_UNSIGNED_ZEROFILL:
+            case DOUBLE_PRECISION:
+            case DOUBLE_PRECISION_UNSIGNED:
+            case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
+                return DorisType.DOUBLE;
+            case NUMERIC:
+            case NUMERIC_UNSIGNED:
+            case NUMERIC_UNSIGNED_ZEROFILL:
+            case FIXED:
+            case FIXED_UNSIGNED:
+            case FIXED_UNSIGNED_ZEROFILL:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+            case DECIMAL_UNSIGNED_ZEROFILL:
+                // Precisions above 38 (or an unknown precision) are stored as text.
+                return length != null && length <= 38
+                        ? String.format(
+                                "%s(%s,%s)", DorisType.DECIMAL_V3, length, scale != null && scale >= 0 ? scale : 0)
+                        : DorisType.STRING;
+            case DATE:
+                return DorisType.DATE_V2;
+            case DATETIME:
+            case TIMESTAMP:
+                // default precision is 0
+                // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
+                if (length == null || length <= 0 || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
+                    return String.format("%s(%s)", DorisType.DATETIME_V2, 0);
+                } else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) {
+                    // Timestamp with a fraction of seconds.
+                    // For example, 2024-01-01 01:01:01.1
+                    // The decimal point will occupy 1 character.
+                    // Thus,the length of the timestamp is 21.
+                    return String.format(
+                            "%s(%s)",
+                            DorisType.DATETIME_V2,
+                            Math.min(
+                                    length - ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1,
+                                    DorisConstant.MAX_SUPPORTED_DATE_TIME_PRECISION));
+                } else if (length <= DorisConstant.MAX_TIMESTAMP_PRECISION) {
+                    // For Debezium JSON data, the timestamp/datetime length ranges from 0 to 9.
+                    return String.format(
+                            "%s(%s)",
+                            DorisType.DATETIME_V2, Math.min(length, DorisConstant.MAX_SUPPORTED_DATE_TIME_PRECISION));
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported length: " + length + " for MySQL TIMESTAMP/DATETIME types");
+                }
+            case CHAR:
+            case VARCHAR:
+                Asserts.checkNotNull(length, "VARCHAR length is null");
+                // The source length is multiplied by 3 for the Doris column; anything beyond the
+                // Doris VARCHAR limit becomes STRING.
+                return length * 3 > DorisConstant.MAX_VARCHAR_SIZE
+                        ? DorisType.STRING
+                        : String.format("%s(%s)", DorisType.VARCHAR, length * 3);
+            case TINYTEXT:
+            case TEXT:
+            case MEDIUMTEXT:
+            case LONGTEXT:
+            case ENUM:
+            case TIME:
+            case TINYBLOB:
+            case BLOB:
+            case MEDIUMBLOB:
+            case LONGBLOB:
+            case BINARY:
+            case VARBINARY:
+            case SET:
+                return DorisType.STRING;
+            case JSON:
+                return DorisType.JSONB;
+            default:
+                throw new UnsupportedOperationException("Unsupported MySQL Type: " + type);
+        }
+    }
+}
diff --git a/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/OracleType.java b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/OracleType.java
new file mode 100644
index 0000000000..7168f1eefa
--- /dev/null
+++ b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/OracleType.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.metadata.convert.source;
+
+import org.dinky.assertion.Asserts;
+import org.dinky.metadata.convert.DorisType;
+
+/**
+ * Converts Oracle column type names into Doris column type definitions.
+ *
+ * <p>NUMBER columns without a fractional part are mapped to the smallest Doris integer type that
+ * holds the declared number of digits; other NUMBER columns become {@code DECIMALV3}. Values that
+ * do not fit either are stored as {@code STRING}.
+ */
+public final class OracleType {
+    private static final String VARCHAR2 = "VARCHAR2";
+    private static final String NVARCHAR2 = "NVARCHAR2";
+    private static final String NUMBER = "NUMBER";
+    private static final String FLOAT = "FLOAT";
+    private static final String LONG = "LONG";
+    private static final String DATE = "DATE";
+    private static final String BINARY_FLOAT = "BINARY_FLOAT";
+    private static final String BINARY_DOUBLE = "BINARY_DOUBLE";
+    private static final String TIMESTAMP = "TIMESTAMP";
+    private static final String INTERVAL = "INTERVAL";
+    private static final String RAW = "RAW";
+    private static final String LONG_RAW = "LONG RAW";
+    private static final String ROWID = "ROWID";
+    private static final String UROWID = "UROWID";
+    private static final String CHAR = "CHAR";
+    private static final String NCHAR = "NCHAR";
+    private static final String CLOB = "CLOB";
+    private static final String NCLOB = "NCLOB";
+    private static final String BLOB = "BLOB";
+    private static final String BFILE = "BFILE";
+
+    /** Largest fractional-second precision emitted for DATETIMEV2. */
+    private static final int MAX_DATETIME_SCALE = 6;
+
+    /** Fractional-second precision Oracle applies to a TIMESTAMP declared without one. */
+    private static final int DEFAULT_TIMESTAMP_SCALE = 6;
+
+    private OracleType() {
+        // Static utility, not meant to be instantiated.
+    }
+
+    /**
+     * Maps an Oracle column type to the Doris type definition used for the same column.
+     *
+     * @param oracleType Oracle type name, matched case-insensitively; must not be {@code null}. Any type
+     *     starting with {@code INTERVAL} or {@code TIMESTAMP} is handled by prefix.
+     * @param precision declared precision (NUMBER) or length (character types); may be {@code null} for
+     *     NUMBER, in which case the column is mapped to {@code STRING}
+     * @param scale declared scale (NUMBER) or fractional-second precision (TIMESTAMP); {@code null} is
+     *     treated as 0 for NUMBER and as Oracle's default of 6 for TIMESTAMP
+     * @return the Doris type definition, including length/precision arguments where applicable
+     * @throws UnsupportedOperationException if the type has no mapping (this includes BFILE,
+     *     BINARY_FLOAT and BINARY_DOUBLE)
+     */
+    public static String toDorisType(String oracleType, Integer precision, Integer scale) {
+        // Locale.ROOT keeps the match independent of the JVM default locale.
+        oracleType = oracleType.toUpperCase(java.util.Locale.ROOT);
+        if (oracleType.startsWith(INTERVAL)) {
+            // Collapse e.g. "INTERVAL YEAR(2) TO MONTH" to the bare keyword handled by the switch.
+            oracleType = INTERVAL;
+        } else if (oracleType.startsWith(TIMESTAMP)) {
+            // A missing scale used to fail on unboxing and a negative one produced an invalid
+            // DATETIMEV2 argument; clamp into the range Doris accepts instead.
+            int fraction = scale == null ? DEFAULT_TIMESTAMP_SCALE : scale;
+            return String.format(
+                    "%s(%s)", DorisType.DATETIME_V2, Math.max(0, Math.min(fraction, MAX_DATETIME_SCALE)));
+        }
+        switch (oracleType) {
+            case NUMBER:
+                if (precision == null) {
+                    // No declared precision: the value range is unknown, so keep the value as text.
+                    return DorisType.STRING;
+                }
+                int numberScale = scale == null ? 0 : scale;
+                if (numberScale <= 0) {
+                    // A negative scale adds digits to the left of the decimal point.
+                    int digits = precision - numberScale;
+                    if (digits < 3) {
+                        return DorisType.TINYINT;
+                    } else if (digits < 5) {
+                        return DorisType.SMALLINT;
+                    } else if (digits < 10) {
+                        return DorisType.INT;
+                    } else if (digits < 19) {
+                        return DorisType.BIGINT;
+                    } else if (digits < 39) {
+                        // LARGEINT supports up to 38 numbers.
+                        return DorisType.LARGEINT;
+                    } else {
+                        return DorisType.STRING;
+                    }
+                }
+                // scale > 0: the precision must be at least as large as the scale.
+                int decimalPrecision = Math.max(precision, numberScale);
+                return decimalPrecision <= 38
+                        ? String.format("%s(%s,%s)", DorisType.DECIMAL_V3, decimalPrecision, numberScale)
+                        : DorisType.STRING;
+            case FLOAT:
+                return DorisType.DOUBLE;
+            case DATE:
+                // can save date and time with second precision
+                return DorisType.DATETIME_V2;
+            case CHAR:
+            case VARCHAR2:
+            case NCHAR:
+            case NVARCHAR2:
+                Asserts.checkNotNull(precision, "NVARCHAR2 precision is null");
+                // The source length is multiplied by 3 for the Doris column; beyond 65533 use STRING.
+                return precision * 3 > 65533
+                        ? DorisType.STRING
+                        : String.format("%s(%s)", DorisType.VARCHAR, precision * 3);
+            case LONG:
+            case RAW:
+            case LONG_RAW:
+            case INTERVAL:
+            case BLOB:
+            case CLOB:
+            case NCLOB:
+                return DorisType.STRING;
+            case BFILE:
+            case BINARY_FLOAT:
+            case BINARY_DOUBLE:
+            default:
+                throw new UnsupportedOperationException("Unsupported Oracle Type: " + oracleType);
+        }
+    }
+}
diff --git a/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/PostgresType.java b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/PostgresType.java
new file mode 100644
index 0000000000..c5c72ea0c2
--- /dev/null
+++ b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/PostgresType.java
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.metadata.convert.source;
+
+import org.dinky.assertion.Asserts;
+import org.dinky.metadata.convert.DorisType;
+
+/**
+ * Converts PostgreSQL column type names into Doris column type definitions.
+ *
+ * <p>Array types (names starting with an underscore) and types without a direct Doris counterpart
+ * are mapped to {@code STRING}.
+ */
+public final class PostgresType {
+    private static final String INT2 = "int2";
+    private static final String SMALLSERIAL = "smallserial";
+    private static final String INT4 = "int4";
+    private static final String SERIAL = "serial";
+    private static final String INT8 = "int8";
+    private static final String BIGSERIAL = "bigserial";
+    private static final String NUMERIC = "numeric";
+    private static final String FLOAT4 = "float4";
+    private static final String FLOAT8 = "float8";
+    private static final String BPCHAR = "bpchar";
+    private static final String TIMESTAMP = "timestamp";
+    private static final String TIMESTAMPTZ = "timestamptz";
+    private static final String DATE = "date";
+    private static final String BOOL = "bool";
+    private static final String BIT = "bit";
+    private static final String POINT = "point";
+    private static final String LINE = "line";
+    private static final String LSEG = "lseg";
+    private static final String BOX = "box";
+    private static final String PATH = "path";
+    private static final String POLYGON = "polygon";
+    private static final String CIRCLE = "circle";
+    private static final String VARCHAR = "varchar";
+    private static final String TEXT = "text";
+    private static final String TIME = "time";
+    private static final String TIMETZ = "timetz";
+    private static final String INTERVAL = "interval";
+    private static final String CIDR = "cidr";
+    private static final String INET = "inet";
+    private static final String MACADDR = "macaddr";
+    private static final String VARBIT = "varbit";
+    private static final String UUID = "uuid";
+    private static final String BYTEA = "bytea";
+    private static final String JSON = "json";
+    private static final String JSONB = "jsonb";
+    // The array type names below are only referenced by the disabled ARRAY mapping further down.
+    private static final String _INT2 = "_int2";
+    private static final String _INT4 = "_int4";
+    private static final String _INT8 = "_int8";
+    private static final String _FLOAT4 = "_float4";
+    private static final String _FLOAT8 = "_float8";
+    private static final String _DATE = "_date";
+    private static final String _TIMESTAMP = "_timestamp";
+    private static final String _BOOL = "_bool";
+    private static final String _TEXT = "_text";
+
+    private PostgresType() {
+        // Static utility, not meant to be instantiated.
+    }
+
+    /**
+     * Maps a PostgreSQL column type to the Doris type definition used for the same column.
+     *
+     * @param postgresType PostgreSQL type name, matched case-insensitively; must not be {@code null}
+     * @param precision declared precision (numeric), bit count (bit) or length (character types); it is
+     *     checked with {@code Asserts.checkNotNull} for bpchar/varchar
+     * @param scale declared scale (numeric) or fractional-second precision (timestamp); {@code null} is
+     *     treated as 0
+     * @return the Doris type definition, including length/precision arguments where applicable
+     * @throws UnsupportedOperationException if the type has no mapping
+     */
+    public static String toDorisType(String postgresType, Integer precision, Integer scale) {
+        // Locale.ROOT keeps the match independent of the JVM default locale.
+        postgresType = postgresType.toLowerCase(java.util.Locale.ROOT);
+        if (postgresType.startsWith("_")) {
+            // Array types are stored as text for now, see the disabled mapping below.
+            return DorisType.STRING;
+        }
+        switch (postgresType) {
+            case INT2:
+            case SMALLSERIAL:
+                // int2 is a two-byte integer; TINYINT would overflow for values outside -128..127.
+                return DorisType.SMALLINT;
+            case INT4:
+            case SERIAL:
+                return DorisType.INT;
+            case INT8:
+            case BIGSERIAL:
+                return DorisType.BIGINT;
+            case NUMERIC:
+                // Unknown precision or precision above 38 is stored as text.
+                return precision != null && precision > 0 && precision <= 38
+                        ? String.format(
+                                "%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale != null && scale >= 0 ? scale : 0)
+                        : DorisType.STRING;
+            case FLOAT4:
+                return DorisType.FLOAT;
+            case FLOAT8:
+                return DorisType.DOUBLE;
+            case TIMESTAMP:
+            case TIMESTAMPTZ:
+                return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(scale == null ? 0 : scale, 6));
+            case DATE:
+                return DorisType.DATE_V2;
+            case BOOL:
+                return DorisType.BOOLEAN;
+            case BIT:
+                // Null-safe comparison: an unknown bit count is treated like a multi-bit column.
+                return Integer.valueOf(1).equals(precision) ? DorisType.BOOLEAN : DorisType.STRING;
+            case BPCHAR:
+            case VARCHAR:
+                Asserts.checkNotNull(precision, "VARCHAR precision is null");
+                // The source length is multiplied by 3 for the Doris column; beyond 65533 use STRING.
+                return precision * 3 > 65533
+                        ? DorisType.STRING
+                        : String.format("%s(%s)", DorisType.VARCHAR, precision * 3);
+            case POINT:
+            case LINE:
+            case LSEG:
+            case BOX:
+            case PATH:
+            case POLYGON:
+            case CIRCLE:
+            case TEXT:
+            case TIME:
+            case TIMETZ:
+            case INTERVAL:
+            case CIDR:
+            case INET:
+            case MACADDR:
+            case VARBIT:
+            case UUID:
+            case BYTEA:
+                return DorisType.STRING;
+            case JSON:
+            case JSONB:
+                return DorisType.JSONB;
+                /* Compatible with doris1.2 array type can only be used in dup table,
+                and then converted to array in the next version
+                case _BOOL:
+                    return String.format("%s<%s>", DorisType.ARRAY, DorisType.BOOLEAN);
+                case _INT2:
+                    return String.format("%s<%s>", DorisType.ARRAY, DorisType.TINYINT);
+                case _INT4:
+                    return String.format("%s<%s>", DorisType.ARRAY, DorisType.INT);
+                case _INT8:
+                    return String.format("%s<%s>", DorisType.ARRAY, DorisType.BIGINT);
+                case _FLOAT4:
+                    return String.format("%s<%s>", DorisType.ARRAY, DorisType.FLOAT);
+                case _FLOAT8:
+                    return String.format("%s<%s>", DorisType.ARRAY, DorisType.DOUBLE);
+                case _TEXT:
+                    return String.format("%s<%s>", DorisType.ARRAY, DorisType.STRING);
+                case _DATE:
+                    return String.format("%s<%s>", DorisType.ARRAY, DorisType.DATE_V2);
+                case _TIMESTAMP:
+                    return String.format("%s<%s>", DorisType.ARRAY, DorisType.DATETIME_V2);
+                **/
+            default:
+                throw new UnsupportedOperationException("Unsupported Postgres Type: " + postgresType);
+        }
+    }
+}
diff --git a/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/SqlServerType.java b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/SqlServerType.java
new file mode 100644
index 0000000000..1b49752ffa
--- /dev/null
+++ b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/convert/source/SqlServerType.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.metadata.convert.source;
+
+import org.dinky.metadata.convert.DorisType;
+
+/**
+ * Converts SQL Server column type names into Doris column type definitions.
+ *
+ * <p>Only the first word of the type name is considered, so {@code "int identity"} is handled as
+ * {@code int}. Types without a direct Doris counterpart are mapped to {@code STRING}.
+ */
+public final class SqlServerType {
+    private static final String BIT = "bit";
+    private static final String TINYINT = "tinyint";
+    private static final String SMALLINT = "smallint";
+    private static final String INT = "int";
+    private static final String BIGINT = "bigint";
+    private static final String REAL = "real";
+    private static final String FLOAT = "float";
+    private static final String MONEY = "money";
+    private static final String SMALLMONEY = "smallmoney";
+    private static final String DECIMAL = "decimal";
+    private static final String NUMERIC = "numeric";
+    private static final String DATE = "date";
+    private static final String DATETIME = "datetime";
+    private static final String DATETIME2 = "datetime2";
+    private static final String SMALLDATETIME = "smalldatetime";
+    private static final String CHAR = "char";
+    private static final String VARCHAR = "varchar";
+    private static final String NCHAR = "nchar";
+    private static final String NVARCHAR = "nvarchar";
+    private static final String TEXT = "text";
+    private static final String NTEXT = "ntext";
+    private static final String UNIQUEIDENTIFIER = "uniqueidentifier";
+    private static final String TIME = "time";
+    private static final String TIMESTAMP = "timestamp";
+    private static final String DATETIMEOFFSET = "datetimeoffset";
+    private static final String IMAGE = "image";
+    private static final String BINARY = "binary";
+    private static final String VARBINARY = "varbinary";
+
+    /** Longest VARCHAR this converter emits; longer columns become STRING. */
+    private static final int MAX_DORIS_VARCHAR_SIZE = 65533;
+
+    private SqlServerType() {
+        // Static utility, not meant to be instantiated.
+    }
+
+    /**
+     * Maps a SQL Server column type to the Doris type definition used for the same column.
+     *
+     * @param originSqlServerType SQL Server type name, matched case-insensitively; must not be
+     *     {@code null}. Anything after the first space (e.g. {@code identity}) is ignored.
+     * @param precision declared precision (decimal/numeric) or length (character types); for character
+     *     types a {@code null}, non-positive or too large value yields {@code STRING}
+     * @param scale declared scale (decimal/numeric) or fractional-second precision (datetime types);
+     *     {@code null} is treated as 0
+     * @return the Doris type definition, including length/precision arguments where applicable
+     * @throws UnsupportedOperationException if the type has no mapping
+     */
+    public static String toDorisType(String originSqlServerType, Integer precision, Integer scale) {
+        // Locale.ROOT keeps the match independent of the JVM default locale.
+        originSqlServerType = originSqlServerType.toLowerCase(java.util.Locale.ROOT);
+        // For sqlserver IDENTITY type, such as 'INT IDENTITY'
+        // originSqlServerType is "int identity", so we only get "int".
+        String sqlServerType = originSqlServerType.split(" ")[0];
+        switch (sqlServerType) {
+            case BIT:
+                return DorisType.BOOLEAN;
+            case TINYINT:
+                return DorisType.TINYINT;
+            case SMALLINT:
+                return DorisType.SMALLINT;
+            case INT:
+                return DorisType.INT;
+            case BIGINT:
+                return DorisType.BIGINT;
+            case REAL:
+                return DorisType.FLOAT;
+            case FLOAT:
+                return DorisType.DOUBLE;
+            case MONEY:
+                return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, 19, 4);
+            case SMALLMONEY:
+                return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, 10, 4);
+            case DECIMAL:
+            case NUMERIC:
+                // Unknown precision or precision above 38 is stored as text.
+                return precision != null && precision > 0 && precision <= 38
+                        ? String.format(
+                                "%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale != null && scale >= 0 ? scale : 0)
+                        : DorisType.STRING;
+            case DATE:
+                return DorisType.DATE_V2;
+            case DATETIME:
+            case DATETIME2:
+            case SMALLDATETIME:
+                return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(scale == null ? 0 : scale, 6));
+            case CHAR:
+            case VARCHAR:
+            case NCHAR:
+            case NVARCHAR:
+                // A missing or non-positive length cannot be written as VARCHAR(n).
+                // NOTE(review): presumably this is what "(max)" columns look like here - confirm with the caller.
+                if (precision == null || precision <= 0) {
+                    return DorisType.STRING;
+                }
+                // Multiply as long: with int arithmetic a large reported length (e.g. 1073741823)
+                // wraps around to a negative number and slips past the size check.
+                long dorisLength = precision * 3L;
+                return dorisLength > MAX_DORIS_VARCHAR_SIZE
+                        ? DorisType.STRING
+                        : String.format("%s(%s)", DorisType.VARCHAR, dorisLength);
+            case TEXT:
+            case NTEXT:
+            case TIME:
+            case DATETIMEOFFSET:
+            case TIMESTAMP:
+            case UNIQUEIDENTIFIER:
+            case BINARY:
+            case VARBINARY:
+                return DorisType.STRING;
+            default:
+                throw new UnsupportedOperationException("Unsupported SqlServer Type: " + sqlServerType);
+        }
+    }
+}
diff --git a/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/driver/DorisDriver.java b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/driver/DorisDriver.java
index 81cb8533ba..5920c6b9da 100644
--- a/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/driver/DorisDriver.java
+++ b/dinky-metadata/dinky-metadata-doris/src/main/java/org/dinky/metadata/driver/DorisDriver.java
@@ -19,19 +19,29 @@
package org.dinky.metadata.driver;
+import org.dinky.assertion.Asserts;
+import org.dinky.data.model.Column;
+import org.dinky.data.model.Table;
import org.dinky.metadata.config.AbstractJdbcConfig;
import org.dinky.metadata.convert.DorisTypeConvert;
import org.dinky.metadata.convert.ITypeConvert;
+import org.dinky.metadata.convert.source.MysqlType;
+import org.dinky.metadata.convert.source.OracleType;
+import org.dinky.metadata.convert.source.PostgresType;
+import org.dinky.metadata.convert.source.SqlServerType;
+import org.dinky.metadata.enums.DriverType;
import org.dinky.metadata.query.DorisQuery;
import org.dinky.metadata.query.IDBQuery;
import org.dinky.metadata.result.JdbcSelectResult;
import org.dinky.utils.LogUtil;
import org.dinky.utils.SqlUtil;
+import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import cn.hutool.core.text.CharSequenceUtil;
import lombok.extern.slf4j.Slf4j;
@@ -56,7 +66,7 @@ String getDriverClass() {
@Override
public String getType() {
- return "Doris";
+ return DriverType.DORIS.getValue();
}
@Override
@@ -122,4 +132,105 @@ public Map getFlinkColumnTypeConversion() {
map.put("DATETIME", "TIMESTAMP");
return map;
}
+
+    /**
+     * Builds the Doris {@code CREATE TABLE} statement for the given table through
+     * {@code genTable} and writes it to the log at INFO level before returning it.
+     *
+     * @param table table metadata to turn into DDL
+     * @return the generated DDL text
+     */
+    @Override
+    public String generateCreateTableSql(Table table) {
+        final String ddl = genTable(table);
+        log.info("Auto generateCreateTableSql {}", ddl);
+        return ddl;
+    }
+
+    /**
+     * Returns the Doris {@code CREATE TABLE} statement for the given table.
+     * Delegates to {@code genTable}; unlike {@code generateCreateTableSql}, nothing is logged.
+     *
+     * @param table table metadata to turn into DDL
+     * @return the generated DDL text
+     */
+    @Override
+    public String getCreateTableSql(Table table) {
+        final String ddl = genTable(table);
+        return ddl;
+    }
+
+    /**
+     * Assembles a Doris {@code CREATE TABLE IF NOT EXISTS} statement for {@code table}.
+     *
+     * <p>Columns flagged as keys are emitted first and also form the {@code UNIQUE KEY} and
+     * {@code DISTRIBUTED BY HASH ... BUCKETS AUTO} clauses; both clauses are omitted when the
+     * table has no key column. {@code light_schema_change} is always switched on.
+     *
+     * @param table source table metadata; its driver type selects the column type mapping
+     * @return the DDL text
+     */
+    private String genTable(Table table) {
+        // Doris only accepts key columns as a leading prefix of the column list, so they are
+        // moved to the front. Relative order inside each group is kept.
+        List<Column> keyColumns =
+                table.getColumns().stream().filter(Column::isKeyFlag).collect(Collectors.toList());
+        List<Column> orderedColumns = new ArrayList<>(keyColumns);
+        table.getColumns().stream()
+                .filter(column -> !column.isKeyFlag())
+                .forEach(orderedColumns::add);
+
+        String columnStrs = orderedColumns.stream()
+                .map(column -> generateColumnSql(column, table.getDriverType()))
+                .collect(Collectors.joining(",\n"));
+
+        // Back-quoted key names, shared by the UNIQUE KEY and DISTRIBUTED BY clauses.
+        String keyList = keyColumns.stream()
+                .map(column -> String.format("`%s`", column.getName()))
+                .collect(Collectors.joining(","));
+
+        String primaryKeyStr = keyColumns.isEmpty() ? "" : "\n UNIQUE KEY (" + keyList + ")";
+
+        // BUCKETS AUTO is enabled by default.
+        String distributeKeyStr =
+                keyColumns.isEmpty() ? "" : "\n DISTRIBUTED BY HASH (" + keyList + ") BUCKETS AUTO";
+
+        // light_schema_change is enabled by default.
+        String propertiesStr = "\n PROPERTIES ( \"light_schema_change\" = \"true\" )";
+
+        // Escape backslashes first, then quotes, so the comment cannot end the literal early.
+        String commentStr = Asserts.isNullString(table.getComment())
+                ? ""
+                : String.format(
+                        "\n COMMENT '%s'",
+                        table.getComment().replace("\\", "\\\\").replace("'", "\\'"));
+
+        return MessageFormat.format(
+                "CREATE TABLE IF NOT EXISTS `{0}`.`{1}` (\n{2}\n) ENGINE=OLAP{3}{4}{5}{6}",
+                table.getSchema(),
+                table.getName(),
+                columnStrs,
+                primaryKeyStr,
+                commentStr,
+                distributeKeyStr,
+                propertiesStr);
+    }
+
+    /**
+     * Renders one column definition line for the Doris {@code CREATE TABLE} statement.
+     *
+     * <p>The source column type is mapped to a Doris type by the converter matching
+     * {@code driverType}; a missing length or scale is passed to the converter as 0. Clauses are
+     * emitted in the order name, type, nullability, {@code AUTO_INCREMENT}, {@code DEFAULT},
+     * {@code COMMENT}. Nullability is always written, so a source column that is
+     * {@code NOT NULL} with a default keeps its {@code NOT NULL} constraint.
+     *
+     * @param column source column metadata
+     * @param driverType type name of the source database driver
+     * @return the column definition text, without a trailing comma
+     * @throws UnsupportedOperationException if {@code driverType} has no Doris type mapping here
+     */
+    private String generateColumnSql(Column column, String driverType) {
+        int length = Asserts.isNull(column.getLength()) ? 0 : column.getLength();
+        int scale = Asserts.isNull(column.getScale()) ? 0 : column.getScale();
+
+        DriverType sourceConnector = DriverType.get(driverType);
+        // Defensive: switching on a null enum would raise a bare NullPointerException.
+        if (sourceConnector == null) {
+            throw new UnsupportedOperationException("Not support " + driverType + " schema change.");
+        }
+
+        final String columnType;
+        switch (sourceConnector) {
+            case MYSQL:
+                columnType = MysqlType.toDorisType(column.getType(), length, scale);
+                break;
+            case ORACLE:
+                columnType = OracleType.toDorisType(column.getType(), length, scale);
+                break;
+            case POSTGRESQL:
+                columnType = PostgresType.toDorisType(column.getType(), length, scale);
+                break;
+            case SQLSERVER:
+                columnType = SqlServerType.toDorisType(column.getType(), length, scale);
+                break;
+            default:
+                String errMsg = "Not support " + driverType + " schema change.";
+                throw new UnsupportedOperationException(errMsg);
+        }
+
+        String nullability = column.isNullable() ? " NULL" : " NOT NULL";
+        String autoIncrement = column.isAutoIncrement() ? " AUTO_INCREMENT" : "";
+
+        String dv = column.getDefaultValue();
+        String defaultClause = Asserts.isNotNull(dv) ? String.format(" DEFAULT %s", quoteDefaultValue(dv)) : "";
+
+        // Escape backslashes first, then quotes, so the comment cannot end the literal early.
+        String commentClause = Asserts.isNotNullString(column.getComment())
+                ? String.format(
+                        " COMMENT '%s'",
+                        column.getComment().replace("\\", "\\\\").replace("'", "\\'"))
+                : "";
+
+        return String.format(
+                " `%s` %s%s%s%s%s",
+                column.getName(),
+                columnType,
+                nullability,
+                autoIncrement,
+                defaultClause,
+                commentClause);
+    }
+
+    /**
+     * Converts a source column default into the expression placed after {@code DEFAULT}.
+     *
+     * <p>Any value containing {@code current_timestamp} (case-insensitive) becomes the bare
+     * keyword {@code CURRENT_TIMESTAMP}. Every other value, including the empty string, is
+     * emitted as a single-quoted string literal with backslashes and single quotes escaped.
+     *
+     * @param defaultValue default value reported for the source column; must not be null
+     * @return SQL text for the {@code DEFAULT} clause
+     */
+    private String quoteDefaultValue(String defaultValue) {
+        // Locale.ROOT keeps the match working under default locales with special casing rules
+        // (e.g. Turkish, where "I" lower-cases to a dotless i).
+        if (defaultValue.toLowerCase(java.util.Locale.ROOT).contains("current_timestamp")) {
+            return "CURRENT_TIMESTAMP";
+        }
+        // Escape backslashes first, then quotes, so the value cannot end the literal early.
+        String escaped = defaultValue.replace("\\", "\\\\").replace("'", "\\'");
+        return "'" + escaped + "'";
+    }
}
diff --git a/dinky-metadata/dinky-metadata-hive/src/main/java/org/dinky/metadata/driver/HiveDriver.java b/dinky-metadata/dinky-metadata-hive/src/main/java/org/dinky/metadata/driver/HiveDriver.java
index 04dc5a3fb1..9946f9a3de 100644
--- a/dinky-metadata/dinky-metadata-hive/src/main/java/org/dinky/metadata/driver/HiveDriver.java
+++ b/dinky-metadata/dinky-metadata-hive/src/main/java/org/dinky/metadata/driver/HiveDriver.java
@@ -27,6 +27,7 @@
import org.dinky.metadata.constant.HiveConstant;
import org.dinky.metadata.convert.HiveTypeConvert;
import org.dinky.metadata.convert.ITypeConvert;
+import org.dinky.metadata.enums.DriverType;
import org.dinky.metadata.query.HiveQuery;
import org.dinky.metadata.query.IDBQuery;
import org.dinky.metadata.result.JdbcSelectResult;
@@ -299,7 +300,7 @@ String getDriverClass() {
@Override
public String getType() {
- return "Hive";
+ return DriverType.HIVE.getValue();
}
@Override
diff --git a/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java b/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java
index adc1d963a3..759224c952 100644
--- a/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java
+++ b/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/driver/MySqlDriver.java
@@ -26,6 +26,7 @@
import org.dinky.metadata.config.AbstractJdbcConfig;
import org.dinky.metadata.convert.ITypeConvert;
import org.dinky.metadata.convert.MySqlTypeConvert;
+import org.dinky.metadata.enums.DriverType;
import org.dinky.metadata.query.IDBQuery;
import org.dinky.metadata.query.MySqlQuery;
import org.dinky.utils.TextUtil;
@@ -58,7 +59,7 @@ public ITypeConvert getTypeConvert() {
@Override
public String getType() {
- return "MySql";
+ return DriverType.MYSQL.getValue();
}
@Override
diff --git a/dinky-metadata/dinky-metadata-oracle/src/main/java/org/dinky/metadata/driver/OracleDriver.java b/dinky-metadata/dinky-metadata-oracle/src/main/java/org/dinky/metadata/driver/OracleDriver.java
index 6b963b3925..4bb897d745 100644
--- a/dinky-metadata/dinky-metadata-oracle/src/main/java/org/dinky/metadata/driver/OracleDriver.java
+++ b/dinky-metadata/dinky-metadata-oracle/src/main/java/org/dinky/metadata/driver/OracleDriver.java
@@ -26,6 +26,7 @@
import org.dinky.metadata.config.AbstractJdbcConfig;
import org.dinky.metadata.convert.ITypeConvert;
import org.dinky.metadata.convert.OracleTypeConvert;
+import org.dinky.metadata.enums.DriverType;
import org.dinky.metadata.query.IDBQuery;
import org.dinky.metadata.query.OracleQuery;
@@ -66,7 +67,7 @@ public ITypeConvert getTypeConvert() {
@Override
public String getType() {
- return "Oracle";
+ return DriverType.ORACLE.getValue();
}
@Override
diff --git a/dinky-metadata/dinky-metadata-phoenix/src/main/java/org/dinky/metadata/driver/PhoenixDriver.java b/dinky-metadata/dinky-metadata-phoenix/src/main/java/org/dinky/metadata/driver/PhoenixDriver.java
index 0676f1e991..f138d00052 100644
--- a/dinky-metadata/dinky-metadata-phoenix/src/main/java/org/dinky/metadata/driver/PhoenixDriver.java
+++ b/dinky-metadata/dinky-metadata-phoenix/src/main/java/org/dinky/metadata/driver/PhoenixDriver.java
@@ -26,6 +26,7 @@
import org.dinky.metadata.constant.PhoenixConstant;
import org.dinky.metadata.convert.ITypeConvert;
import org.dinky.metadata.convert.PhoenixTypeConvert;
+import org.dinky.metadata.enums.DriverType;
import org.dinky.metadata.query.IDBQuery;
import org.dinky.metadata.query.PhoenixQuery;
import org.dinky.metadata.result.JdbcSelectResult;
@@ -57,7 +58,7 @@ String getDriverClass() {
@Override
public String getType() {
- return "Phoenix";
+ return DriverType.PHOENIX.getValue();
}
/** sql拼接,目前还未实现limit方法 */
diff --git a/dinky-metadata/dinky-metadata-postgresql/src/main/java/org/dinky/metadata/driver/PostgreSqlDriver.java b/dinky-metadata/dinky-metadata-postgresql/src/main/java/org/dinky/metadata/driver/PostgreSqlDriver.java
index 4b4687a1af..fcdbb7bb65 100644
--- a/dinky-metadata/dinky-metadata-postgresql/src/main/java/org/dinky/metadata/driver/PostgreSqlDriver.java
+++ b/dinky-metadata/dinky-metadata-postgresql/src/main/java/org/dinky/metadata/driver/PostgreSqlDriver.java
@@ -26,6 +26,7 @@
import org.dinky.metadata.config.AbstractJdbcConfig;
import org.dinky.metadata.convert.ITypeConvert;
import org.dinky.metadata.convert.PostgreSqlTypeConvert;
+import org.dinky.metadata.enums.DriverType;
import org.dinky.metadata.query.IDBQuery;
import org.dinky.metadata.query.PostgreSqlQuery;
import org.dinky.utils.TextUtil;
@@ -59,7 +60,7 @@ public ITypeConvert getTypeConvert() {
@Override
public String getType() {
- return "PostgreSql";
+ return DriverType.POSTGRESQL.getValue();
}
@Override
diff --git a/dinky-metadata/dinky-metadata-presto/src/main/java/org/dinky/metadata/driver/PrestoDriver.java b/dinky-metadata/dinky-metadata-presto/src/main/java/org/dinky/metadata/driver/PrestoDriver.java
index c376233b10..f8a5498132 100644
--- a/dinky-metadata/dinky-metadata-presto/src/main/java/org/dinky/metadata/driver/PrestoDriver.java
+++ b/dinky-metadata/dinky-metadata-presto/src/main/java/org/dinky/metadata/driver/PrestoDriver.java
@@ -28,6 +28,7 @@
import org.dinky.metadata.constant.PrestoConstant;
import org.dinky.metadata.convert.ITypeConvert;
import org.dinky.metadata.convert.PrestoTypeConvert;
+import org.dinky.metadata.enums.DriverType;
import org.dinky.metadata.query.IDBQuery;
import org.dinky.metadata.query.PrestoQuery;
import org.dinky.metadata.result.JdbcSelectResult;
@@ -335,7 +336,7 @@ String getDriverClass() {
@Override
public String getType() {
- return "Presto";
+ return DriverType.PRESTO.getValue();
}
@Override
diff --git a/dinky-metadata/dinky-metadata-sqlserver/src/main/java/org/dinky/metadata/driver/SqlServerDriver.java b/dinky-metadata/dinky-metadata-sqlserver/src/main/java/org/dinky/metadata/driver/SqlServerDriver.java
index 1686cecdac..6acbfbe47b 100644
--- a/dinky-metadata/dinky-metadata-sqlserver/src/main/java/org/dinky/metadata/driver/SqlServerDriver.java
+++ b/dinky-metadata/dinky-metadata-sqlserver/src/main/java/org/dinky/metadata/driver/SqlServerDriver.java
@@ -27,6 +27,7 @@
import org.dinky.metadata.constant.SqlServerConstant;
import org.dinky.metadata.convert.ITypeConvert;
import org.dinky.metadata.convert.SqlServerTypeConvert;
+import org.dinky.metadata.enums.DriverType;
import org.dinky.metadata.query.IDBQuery;
import org.dinky.metadata.query.SqlServerQuery;
@@ -54,7 +55,7 @@ String getDriverClass() {
@Override
public String getType() {
- return "SqlServer";
+ return DriverType.SQLSERVER.getValue();
}
@Override
diff --git a/dinky-metadata/dinky-metadata-starrocks/src/main/java/org/dinky/metadata/driver/StarRocksDriver.java b/dinky-metadata/dinky-metadata-starrocks/src/main/java/org/dinky/metadata/driver/StarRocksDriver.java
index 52554c5719..beff9972d9 100644
--- a/dinky-metadata/dinky-metadata-starrocks/src/main/java/org/dinky/metadata/driver/StarRocksDriver.java
+++ b/dinky-metadata/dinky-metadata-starrocks/src/main/java/org/dinky/metadata/driver/StarRocksDriver.java
@@ -22,6 +22,7 @@
import org.dinky.metadata.config.AbstractJdbcConfig;
import org.dinky.metadata.convert.ITypeConvert;
import org.dinky.metadata.convert.StarRocksTypeConvert;
+import org.dinky.metadata.enums.DriverType;
import org.dinky.metadata.query.IDBQuery;
import org.dinky.metadata.query.StarRocksQuery;
import org.dinky.metadata.result.JdbcSelectResult;
@@ -52,7 +53,7 @@ String getDriverClass() {
@Override
public String getType() {
- return "StarRocks";
+ return DriverType.STARROCKS.getValue();
}
@Override