Skip to content

Commit

Permalink
add generic dialect #7956
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Nov 25, 2024
1 parent e1010dc commit 83a1b07
Show file tree
Hide file tree
Showing 11 changed files with 661 additions and 15 deletions.
12 changes: 12 additions & 0 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
<iris.jdbc.version>3.0.0</iris.jdbc.version>
<tikv.version>3.2.0</tikv.version>
<opengauss.jdbc.version>5.1.0-og</opengauss.jdbc.version>
<mariadb.jdbc.version>3.5.1</mariadb.jdbc.version>

</properties>

<dependencyManagement>
Expand Down Expand Up @@ -215,6 +217,12 @@
<version>${opengauss.jdbc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>${mariadb.jdbc.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -329,6 +337,10 @@
<groupId>org.opengauss</groupId>
<artifactId>opengauss-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

public class DatabaseIdentifier {
public static final String GENERIC = "Generic";
public static final String DB_2 = "DB2";
public static final String DAMENG = "Dameng";
public static final String GBASE_8A = "Gbase8a";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;

import lombok.extern.slf4j.Slf4j;

import java.util.Optional;

@Slf4j
public class GenericDialect implements JdbcDialect {

public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();

public GenericDialect() {}

public GenericDialect(String fieldIde) {
this.fieldIde = fieldIde;
}

@Override
public String dialectName() {
return DatabaseIdentifier.GENERIC;
}

@Override
public JdbcRowConverter getRowConverter() {
return new AbstractJdbcRowConverter() {
@Override
public String converterName() {
return DatabaseIdentifier.GENERIC;
}
};
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new GenericTypeMapper();
}

@Override
public String quoteIdentifier(String identifier) {
return getFieldIde(identifier, fieldIde);
}

@Override
public String quoteDatabaseIdentifier(String identifier) {
return identifier;
}

@Override
public String tableIdentifier(TablePath tablePath) {
return tableIdentifier(tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
throw new UnsupportedOperationException();
}

@Override
public TablePath parse(String tablePath) {
return TablePath.of(tablePath, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

import com.google.auto.service.AutoService;

import javax.annotation.Nonnull;

/** Factory for {@link GenericDialect}. */
@AutoService(JdbcDialectFactory.class)
public class GenericDialectFactory implements JdbcDialectFactory {

// GenericDialect does not have any special requirements.
@Override
public boolean acceptsURL(String url) {
return true;
}

@Override
public JdbcDialect create() {
return new GenericDialect();
}

@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
return new GenericDialect(fieldIde);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

import org.apache.seatunnel.shade.com.google.common.base.Preconditions;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;

import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;

import java.sql.JDBCType;

@Slf4j
@AutoService(TypeConverter.class)
public class GenericTypeConverter implements TypeConverter<BasicTypeDefine> {

public static final GenericTypeConverter DEFAULT_INSTANCE = new GenericTypeConverter();

public static final int MAX_PRECISION = 65;
public static final int DEFAULT_PRECISION = 38;
public static final int MAX_SCALE = MAX_PRECISION - 1;
public static final int DEFAULT_SCALE = 18;

@Override
public String identifier() {
return DatabaseIdentifier.GENERIC;
}

/**
* Convert an external system's type definition to {@link Column}.
*
* @param typeDefine type define
* @return column
*/
@Override
public Column convert(BasicTypeDefine typeDefine) {
PhysicalColumn.PhysicalColumnBuilder builder =
PhysicalColumn.builder()
.name(typeDefine.getName())
.nullable(typeDefine.isNullable())
.defaultValue(typeDefine.getDefaultValue())
.comment(typeDefine.getComment());
String dataType = typeDefine.getDataType().toUpperCase();
JDBCType sqlType = JDBCType.valueOf(dataType);
switch (sqlType) {
case NULL:
builder.dataType(BasicType.VOID_TYPE);
break;
case BOOLEAN:
builder.dataType(BasicType.BOOLEAN_TYPE);
break;
case BIT:
if (typeDefine.getLength() == null
|| typeDefine.getLength() <= 0
|| typeDefine.getLength() == 1) {
builder.dataType(BasicType.BOOLEAN_TYPE);
} else {
builder.dataType(PrimitiveByteArrayType.INSTANCE);
// BIT(M) -> BYTE(M/8)
long byteLength = typeDefine.getLength() / 8;
byteLength += typeDefine.getLength() % 8 > 0 ? 1 : 0;
builder.columnLength(byteLength);
}
break;
case TINYINT:
builder.dataType(BasicType.BYTE_TYPE);
break;
case SMALLINT:
builder.dataType(BasicType.SHORT_TYPE);
break;
case INTEGER:
builder.dataType(BasicType.INT_TYPE);
break;
case BIGINT:
builder.dataType(BasicType.LONG_TYPE);
break;
case REAL:
case FLOAT:
builder.dataType(BasicType.FLOAT_TYPE);
break;
case DOUBLE:
builder.dataType(BasicType.DOUBLE_TYPE);
break;
case NUMERIC:
DecimalType decimalTypeForNumeric;
if (typeDefine.getPrecision() != null && typeDefine.getPrecision() > 0) {
decimalTypeForNumeric =
new DecimalType(
typeDefine.getPrecision().intValue(), typeDefine.getScale());
} else {
decimalTypeForNumeric = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
}
builder.dataType(decimalTypeForNumeric);
break;
case DECIMAL:
Preconditions.checkArgument(typeDefine.getPrecision() > 0);
DecimalType decimalType;
if (typeDefine.getPrecision() > DEFAULT_PRECISION) {
decimalType = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
} else {
decimalType =
new DecimalType(
typeDefine.getPrecision().intValue(),
typeDefine.getScale() == null
? 0
: typeDefine.getScale().intValue());
}
builder.dataType(decimalType);
builder.columnLength(Long.valueOf(decimalType.getPrecision()));
builder.scale(decimalType.getScale());
break;

case CHAR:
case VARCHAR:
case LONGVARCHAR:
case NCHAR:
case NVARCHAR:
case LONGNVARCHAR:
case CLOB:
case DATALINK:
case NCLOB:
case SQLXML:
builder.dataType(BasicType.STRING_TYPE);
break;

case BINARY:
case BLOB:
case VARBINARY:
case LONGVARBINARY:
if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
builder.columnLength(1L);
} else {
builder.columnLength(typeDefine.getLength());
}
builder.dataType(PrimitiveByteArrayType.INSTANCE);
break;
case DATE:
builder.dataType(LocalTimeType.LOCAL_DATE_TYPE);
break;
case TIME:
builder.dataType(LocalTimeType.LOCAL_TIME_TYPE);
builder.scale(typeDefine.getScale());
break;
case TIMESTAMP:
builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
builder.scale(typeDefine.getScale());
break;

case OTHER:
case ARRAY:
case JAVA_OBJECT:
case DISTINCT:
case STRUCT:
case REF:
case ROWID:
default:
log.warn(
"JDBC type {} ({}) not currently supported",
sqlType.getVendorTypeNumber(),
sqlType.getName());
}
return builder.build();
}

/**
* Convert {@link Column} to an external system's type definition.
*
* @param column
* @return
*/
@Override
public BasicTypeDefine reconvert(Column column) {
throw new UnsupportedOperationException(
String.format(
"%s (%s) type doesn't have a mapping to the SQL database column type",
column.getName(), column.getDataType().getSqlType().name()));
}
}
Loading

0 comments on commit 83a1b07

Please sign in to comment.