From 9577e7db18e8e77b92308a4cde9013c349ab0fe2 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 11 Aug 2023 16:08:22 +0800 Subject: [PATCH] Support catalog in MaxCompute Source --- docs/en/connector-v2/source/Maxcompute.md | 15 + docs/en/connector-v2/source/MyHours.md | 8 +- .../aliyun/odps/type/SimpleArrayTypeInfo.java | 52 --- .../aliyun/odps/type/SimpleMapTypeInfo.java | 63 ---- .../odps/type/SimpleStructTypeInfo.java | 102 ------ .../maxcompute/catalog/MaxComputeCatalog.java | 160 ++++++++++ .../catalog/MaxComputeCatalogFactory.java | 58 ++++ .../catalog/MaxComputeDataTypeConvertor.java | 302 ++++++++++++++++++ .../maxcompute/config/MaxcomputeConfig.java | 3 + .../maxcompute/sink/MaxcomputeSink.java | 4 +- .../sink/MaxcomputeSinkFactory.java | 3 +- .../maxcompute/source/MaxcomputeSource.java | 16 +- .../source/MaxcomputeSourceFactory.java | 6 +- .../maxcompute/util/MaxcomputeTypeMapper.java | 111 +------ .../MaxComputeDataTypeConvertorTest.java | 71 ++++ .../source/MaxcomputeSourceTest.java | 46 +++ 16 files changed, 692 insertions(+), 328 deletions(-) delete mode 100644 seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleArrayTypeInfo.java delete mode 100644 seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleMapTypeInfo.java delete mode 100644 seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleStructTypeInfo.java create mode 100644 seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java create mode 100644 seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java create mode 100644 seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java create mode 100644 
seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java create mode 100644 seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java diff --git a/docs/en/connector-v2/source/Maxcompute.md b/docs/en/connector-v2/source/Maxcompute.md index f30be5a0d26c..cb9bc32dd382 100644 --- a/docs/en/connector-v2/source/Maxcompute.md +++ b/docs/en/connector-v2/source/Maxcompute.md @@ -26,6 +26,7 @@ Used to read data from Maxcompute. | partition_spec | string | no | - | | split_row | int | no | 10000 | | common-options | string | no | | +| schema | config | no | | ### accessId [string] @@ -59,6 +60,12 @@ Used to read data from Maxcompute. Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. +### schema [config] + +#### fields [Config] + +The schema information of upstream data. + ## Examples ```hocon @@ -71,6 +78,13 @@ source { table_name="" #partition_spec="" #split_row = 10000 + schema { + fields { + name = string + age = int + gender = string + } + } } } ``` @@ -80,4 +94,5 @@ source { ### next version - [Feature] Add Maxcompute Source Connector([3640](https://github.com/apache/seatunnel/pull/3640)) +- [Feature] Support Schema in MaxCompute Source([5283](https://github.com/apache/seatunnel/pull/5283)) diff --git a/docs/en/connector-v2/source/MyHours.md b/docs/en/connector-v2/source/MyHours.md index 87d9ab3ce3c1..f90d42ab1cba 100644 --- a/docs/en/connector-v2/source/MyHours.md +++ b/docs/en/connector-v2/source/MyHours.md @@ -35,13 +35,13 @@ Used to read data from My Hours. In order to use the My Hours connector, the following dependencies are required. They can be downloaded via install-plugin.sh or from the Maven central repository. 
-| Datasource | Supported Versions | Dependency | -|------------|--------------------|------------------------------------------------------------------------------------------------| -| My Hours | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2) | +| Datasource | Supported Versions | Dependency | +|------------|--------------------|---------------------------------------------------------------------------------------------| +| My Hours | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2) | ## Source Options -| Name | Type | Required | Default | Description | +| Name | Type | Required | Default | Description | |-----------------------------|---------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------| | url | String | Yes | - | Http request url. | | email | String | Yes | - | My hours login email address. | diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleArrayTypeInfo.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleArrayTypeInfo.java deleted file mode 100644 index 76fddc10c4b8..000000000000 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleArrayTypeInfo.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.aliyun.odps.type; - -import org.apache.seatunnel.common.exception.CommonErrorCode; -import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; - -import com.aliyun.odps.OdpsType; - -public class SimpleArrayTypeInfo implements ArrayTypeInfo { - private final TypeInfo valueType; - - SimpleArrayTypeInfo(TypeInfo typeInfo) { - if (typeInfo == null) { - throw new MaxcomputeConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Invalid element type."); - } else { - this.valueType = typeInfo; - } - } - - public String getTypeName() { - return this.getOdpsType().name() + "<" + this.valueType.getTypeName() + ">"; - } - - public TypeInfo getElementTypeInfo() { - return this.valueType; - } - - public OdpsType getOdpsType() { - return OdpsType.ARRAY; - } - - public String toString() { - return this.getTypeName(); - } -} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleMapTypeInfo.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleMapTypeInfo.java deleted file mode 100644 index 6cedcd22bba0..000000000000 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleMapTypeInfo.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.aliyun.odps.type; - -import org.apache.seatunnel.common.exception.CommonErrorCode; -import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; - -import com.aliyun.odps.OdpsType; - -public class SimpleMapTypeInfo implements MapTypeInfo { - private final TypeInfo keyType; - private final TypeInfo valueType; - - SimpleMapTypeInfo(TypeInfo keyType, TypeInfo valueType) { - if (keyType != null && valueType != null) { - this.keyType = keyType; - this.valueType = valueType; - } else { - throw new MaxcomputeConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Invalid key or value type for map."); - } - } - - public String getTypeName() { - return this.getOdpsType().name() - + "<" - + this.keyType.getTypeName() - + "," - + this.valueType.getTypeName() - + ">"; - } - - public TypeInfo getKeyTypeInfo() { - return this.keyType; - } - - public TypeInfo getValueTypeInfo() { - return this.valueType; - } - - public OdpsType getOdpsType() { - return OdpsType.MAP; - } - - public String toString() { - return this.getTypeName(); - } -} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleStructTypeInfo.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleStructTypeInfo.java deleted file mode 100644 index a6283552217d..000000000000 --- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleStructTypeInfo.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.aliyun.odps.type; - -import org.apache.seatunnel.common.exception.CommonErrorCode; -import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; - -import com.aliyun.odps.OdpsType; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -public class SimpleStructTypeInfo implements StructTypeInfo { - private final List fieldNames; - private final List fieldTypeInfos; - - SimpleStructTypeInfo(List names, List typeInfos) { - this.validateParameters(names, typeInfos); - this.fieldNames = this.toLowerCase(names); - this.fieldTypeInfos = new ArrayList(typeInfos); - } - - private List toLowerCase(List names) { - List lowerNames = new ArrayList(names.size()); - Iterator var3 = names.iterator(); - - while (var3.hasNext()) { - String name = (String) var3.next(); - lowerNames.add(name.toLowerCase()); - } - - return lowerNames; - } - - private void validateParameters(List names, List typeInfos) { - if (names != null && typeInfos != null && !names.isEmpty() && 
!typeInfos.isEmpty()) { - if (names.size() != typeInfos.size()) { - throw new MaxcomputeConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - "The amount of field names must be equal to the amount of field types."); - } - } else { - throw new MaxcomputeConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - "Invalid name or element type for struct."); - } - } - - public String getTypeName() { - StringBuilder stringBuilder = new StringBuilder(this.getOdpsType().name()); - stringBuilder.append("<"); - - for (int i = 0; i < this.fieldNames.size(); ++i) { - if (i > 0) { - stringBuilder.append(","); - } - - stringBuilder.append((String) this.fieldNames.get(i)); - stringBuilder.append(":"); - stringBuilder.append(((TypeInfo) this.fieldTypeInfos.get(i)).getTypeName()); - } - - stringBuilder.append(">"); - return stringBuilder.toString(); - } - - public List getFieldNames() { - return this.fieldNames; - } - - public List getFieldTypeInfos() { - return this.fieldTypeInfos; - } - - public int getFieldCount() { - return this.fieldNames.size(); - } - - public OdpsType getOdpsType() { - return OdpsType.STRUCT; - } - - public String toString() { - return this.getTypeName(); - } -} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java new file mode 100644 index 000000000000..b131277bd782 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; + +import com.aliyun.odps.Odps; +import com.aliyun.odps.OdpsException; +import com.aliyun.odps.Projects; +import com.aliyun.odps.Tables; +import com.aliyun.odps.account.Account; +import com.aliyun.odps.account.AliyunAccount; +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY; +import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+
+@Slf4j
+public class MaxComputeCatalog implements Catalog {
+
+    private final ReadonlyConfig readonlyConfig;
+    private final String catalogName;
+
+    private Account account;
+
+    public MaxComputeCatalog(String catalogName, ReadonlyConfig options) {
+        this.readonlyConfig = options;
+        this.catalogName = catalogName;
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        account = new AliyunAccount(readonlyConfig.get(ACCESS_ID), readonlyConfig.get(ACCESS_KEY));
+    }
+
+    @Override
+    public void close() throws CatalogException {}
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return readonlyConfig.get(PROJECT);
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        Odps odps = new Odps(account);
+        odps.setEndpoint(readonlyConfig.get(ENDPOINT));
+        odps.setDefaultProject(readonlyConfig.get(PROJECT));
+        Projects projects = odps.projects();
+        try {
+            return projects.exists(databaseName);
+        } catch (OdpsException e) {
+            throw new CatalogException("Check " + databaseName + " exist error", e);
+        }
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try {
+            // todo: how to get all projects
+            String project = readonlyConfig.get(PROJECT);
+            if (databaseExists(project)) {
+                return Lists.newArrayList(project);
+            }
+            return Collections.emptyList();
+        } catch (Exception e) {
+            throw new CatalogException("listDatabases exist error", e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) { // fail with the declared exception, not mid-iteration
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        }
+
+        Odps odps = new Odps(account);
+        odps.setEndpoint(readonlyConfig.get(ENDPOINT));
+        odps.setDefaultProject(databaseName);
+
+        List<String> tableNames = new ArrayList<>();
+        odps.tables().forEach(table -> tableNames.add(table.getName()));
+        return tableNames;
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        Odps odps = new Odps(account);
+        odps.setEndpoint(readonlyConfig.get(ENDPOINT));
+        odps.setDefaultProject(tablePath.getDatabaseName());
+
+        Tables tables = odps.tables();
+        try {
+            return tables.exists(tablePath.getTableName());
+        } catch (OdpsException e) {
+            throw new CatalogException("Check table " + tablePath + " exist error", e);
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
new file mode 100644
index 000000000000..4ed29c0ea64c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; + +import com.google.auto.service.AutoService; + +import static org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC; +import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
+
+@AutoService(Factory.class)
+public class MaxComputeCatalogFactory implements CatalogFactory {
+
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        return new MaxComputeCatalog(catalogName, options);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return PLUGIN_NAME;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
+                .optional(PARTITION_SPEC, SPLIT_ROW, SCHEMA) // each option listed exactly once
+                .build();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
new file mode 100644
index 000000000000..4e48f938aa49
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
+
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.type.ArrayTypeInfo;
+import com.aliyun.odps.type.DecimalTypeInfo;
+import com.aliyun.odps.type.MapTypeInfo;
+import com.aliyun.odps.type.StructTypeInfo;
+import com.aliyun.odps.type.TypeInfo;
+import com.aliyun.odps.type.TypeInfoFactory;
+import com.google.auto.service.AutoService;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@AutoService(DataTypeConvertor.class)
+public class MaxComputeDataTypeConvertor implements DataTypeConvertor<TypeInfo> {
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        if (connectorDataType.startsWith("MAP")) {
+            // MAP<key,value>
+            int i = connectorDataType.indexOf(",");
+            return new MapType(
+                    toSeaTunnelType(connectorDataType.substring(4, i)),
+                    toSeaTunnelType(
+                            connectorDataType.substring(i + 1, connectorDataType.length() - 1)));
+        }
+        if (connectorDataType.startsWith("ARRAY")) {
+            // ARRAY<element>
+            SeaTunnelDataType<?> seaTunnelType =
+                    toSeaTunnelType(connectorDataType.substring(6, connectorDataType.length() - 1));
+            switch (seaTunnelType.getSqlType()) {
+                case STRING:
+                    return ArrayType.STRING_ARRAY_TYPE;
+                case BOOLEAN:
+                    return ArrayType.BOOLEAN_ARRAY_TYPE;
+                case BYTES:
+                    return ArrayType.BYTE_ARRAY_TYPE;
+                case SMALLINT:
+                    return ArrayType.SHORT_ARRAY_TYPE;
+                case INT:
+                    return ArrayType.INT_ARRAY_TYPE;
+                case BIGINT:
+                    return ArrayType.LONG_ARRAY_TYPE;
+                case FLOAT:
+                    return ArrayType.FLOAT_ARRAY_TYPE;
+                case DOUBLE:
+                    return ArrayType.DOUBLE_ARRAY_TYPE;
+                default:
+                    throw new MaxcomputeConnectorException(
+                            CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                            "Unsupported array element type: " + seaTunnelType);
+            }
+        }
+        if (connectorDataType.startsWith("STRUCT")) {
+            // STRUCT<name:type,...>
+            // todo: support struct type
+            String substring = connectorDataType.substring(7, connectorDataType.length() - 1);
+            String[] entryArray = substring.split(",");
+            String[] fieldNames = new String[entryArray.length];
+            SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType<?>[entryArray.length];
+            for (int i = 0; i < entryArray.length; i++) {
+                String[] field = entryArray[i].split(":");
+                fieldNames[i] = field[0];
+                fieldTypes[i] = toSeaTunnelType(field[1]);
+            }
+            return new SeaTunnelRowType(fieldNames, fieldTypes);
+        }
+        if (connectorDataType.startsWith("DECIMAL")) {
+            // DECIMAL(precision,scale)
+            if (connectorDataType.contains("(")) {
+                String substring = connectorDataType.substring(8, connectorDataType.length() - 1);
+                String[] split = substring.split(",");
+                return new DecimalType(Integer.parseInt(split[0]), Integer.parseInt(split[1]));
+            } else {
+                return new DecimalType(54, 18);
+            }
+        }
+        if (connectorDataType.startsWith("CHAR") || connectorDataType.startsWith("VARCHAR")) {
+            // CHAR(n) or VARCHAR(n)
+            return BasicType.STRING_TYPE;
+        }
+        switch (connectorDataType) {
+            case "TINYINT":
+            case "BINARY":
+                return BasicType.BYTE_TYPE;
+            case "SMALLINT":
+                return BasicType.SHORT_TYPE;
+            case "INT":
+                return BasicType.INT_TYPE;
+            case "BIGINT":
+                return BasicType.LONG_TYPE;
+            case "FLOAT":
+                return BasicType.FLOAT_TYPE;
+            case "DOUBLE":
+                return BasicType.DOUBLE_TYPE;
+            case "STRING":
+                return BasicType.STRING_TYPE;
+            case "DATE":
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case "TIMESTAMP":
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE; // was LOCAL_TIME_TYPE, dropping the date
+            case "TIME":
+                return LocalTimeType.LOCAL_TIME_TYPE; // was LOCAL_DATE_TYPE
+            case "BOOLEAN":
+                return BasicType.BOOLEAN_TYPE;
+            case "NULL":
+                return BasicType.VOID_TYPE;
+            default:
+                throw new MaxcomputeConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        String.format(
+                                "SeaTunnel type not support this type [%s] now",
+                                connectorDataType));
+        }
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(
+            TypeInfo connectorDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        switch (connectorDataType.getOdpsType()) {
+            case MAP:
+                MapTypeInfo mapTypeInfo = (MapTypeInfo) connectorDataType;
+                return new MapType(
+                        toSeaTunnelType(mapTypeInfo.getKeyTypeInfo(), dataTypeProperties),
+                        toSeaTunnelType(mapTypeInfo.getValueTypeInfo(), dataTypeProperties));
+            case ARRAY:
+                ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo) connectorDataType;
+                switch (arrayTypeInfo.getElementTypeInfo().getOdpsType()) {
+                    case BOOLEAN:
+                        return ArrayType.BOOLEAN_ARRAY_TYPE;
+                    case INT:
+                        return ArrayType.INT_ARRAY_TYPE;
+                    case BIGINT:
+                        return ArrayType.LONG_ARRAY_TYPE;
+                    case FLOAT:
+                        return ArrayType.FLOAT_ARRAY_TYPE;
+                    case DOUBLE:
+                        return ArrayType.DOUBLE_ARRAY_TYPE;
+                    case STRING:
+                        return ArrayType.STRING_ARRAY_TYPE;
+                    default:
+                        throw new MaxcomputeConnectorException(
+                                CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                                String.format(
+                                        "SeaTunnel type not support this type [%s] now",
+                                        connectorDataType.getTypeName()));
+                }
+            case STRUCT:
+                StructTypeInfo structTypeInfo = (StructTypeInfo) connectorDataType;
+                List<TypeInfo> fields = structTypeInfo.getFieldTypeInfos();
+                List<String> fieldNames = structTypeInfo.getFieldNames();
+                List<SeaTunnelDataType<?>> fieldTypes = new ArrayList<>(fields.size());
+                for (TypeInfo field : fields) {
+                    // names come from the struct; field.getTypeName() is the type's name
+                    fieldTypes.add(toSeaTunnelType(field, dataTypeProperties));
+                }
+                return new SeaTunnelRowType(
+                        fieldNames.toArray(new String[0]),
+                        fieldTypes.toArray(new SeaTunnelDataType[0]));
+            case TINYINT:
+                return BasicType.BYTE_TYPE;
+            case SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case INT:
+                return BasicType.INT_TYPE;
+            case BIGINT:
+                return BasicType.LONG_TYPE;
+            case BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case DECIMAL:
+                DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) connectorDataType;
+                return new DecimalType(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
+            case VARCHAR:
+            case CHAR:
+            case STRING:
+                return BasicType.STRING_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case DATETIME:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE; // round-trips with toConnectorType
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case VOID:
+                return BasicType.VOID_TYPE;
+            case INTERVAL_DAY_TIME:
+            case INTERVAL_YEAR_MONTH:
+            default:
+                throw new MaxcomputeConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        String.format(
+                                "SeaTunnel type not support this type [%s] now",
+                                connectorDataType.getTypeName()));
+        }
+    }
+
+    @Override
+    public TypeInfo toConnectorType(
+            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        switch (seaTunnelDataType.getSqlType()) {
+            case MAP:
+                MapType mapType = (MapType) seaTunnelDataType;
+                return TypeInfoFactory.getMapTypeInfo(
+                        toConnectorType(mapType.getKeyType(), dataTypeProperties),
+                        toConnectorType(mapType.getValueType(), dataTypeProperties));
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) seaTunnelDataType;
+                return TypeInfoFactory.getArrayTypeInfo(
+                        toConnectorType(arrayType.getElementType(), dataTypeProperties));
+            case ROW:
+                SeaTunnelRowType rowType = (SeaTunnelRowType) seaTunnelDataType;
+                List<String> fieldNames = new ArrayList<>(rowType.getTotalFields());
+                List<TypeInfo> fieldTypes = new ArrayList<>(rowType.getTotalFields());
+                for (int i = 0; i < rowType.getTotalFields(); i++) {
+                    fieldNames.add(rowType.getFieldName(i));
+                    fieldTypes.add(toConnectorType(rowType.getFieldType(i), dataTypeProperties));
+                }
+                return TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypes);
+            case TINYINT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TINYINT);
+            case SMALLINT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.SMALLINT);
+            case INT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT);
+            case BIGINT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BIGINT);
+            case BYTES:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BINARY);
+            case FLOAT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.FLOAT);
+            case DOUBLE:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DOUBLE);
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) seaTunnelDataType;
+                return TypeInfoFactory.getDecimalTypeInfo(
+                        decimalType.getPrecision(), decimalType.getScale());
+            case STRING:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING);
+            case DATE:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE);
+            case TIMESTAMP:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP);
+            case TIME:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATETIME);
+            case BOOLEAN:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN);
+            case NULL:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.VOID);
+            default:
+                throw new MaxcomputeConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        String.format(
+                                "Maxcompute type not support this type [%s] now",
+                                seaTunnelDataType.getSqlType()));
+        }
+    }
+
+    @Override
+    public String getIdentity() {
+        return MaxcomputeConfig.PLUGIN_NAME;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
index 18aaafad417e..84bbccddb790 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
@@ -23,6 +23,9 @@ import java.io.Serializable; public class MaxcomputeConfig implements Serializable { + + public static final String PLUGIN_NAME = "Maxcompute"; + private static final int SPLIT_ROW_DEFAULT = 10000; public static final Option ACCESS_ID = Options.key("accessId")
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index cf6d639ed129..31c5ab0791a1 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -34,6 +34,8 @@ import com.google.auto.service.AutoService; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME; +
@AutoService(SeaTunnelSink.class) public class MaxcomputeSink extends AbstractSimpleSink { private static final Logger LOG = LoggerFactory.getLogger(MaxcomputeSink.class); @@ -42,7 +44,7 @@ public class MaxcomputeSink extends AbstractSimpleSink { @Override public String getPluginName() { - return "Maxcompute"; + return PLUGIN_NAME; } @Override diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java index 5fbf1608d33d..faa2f6291025 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java @@ -28,6 +28,7 @@ import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME; @@ -35,7 +36,7 @@ public class MaxcomputeSinkFactory implements TableSinkFactory { @Override public String factoryIdentifier() { - return "Maxcompute"; + return PLUGIN_NAME; } @Override diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java index 3f6dd6cc7674..417d660b86d8 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java @@ -23,7 +23,9 @@ import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.source.SupportColumnProjection; import org.apache.seatunnel.api.source.SupportParallelism; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper; @@ -31,22 +33,30 @@ import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; +import static org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME; + @Slf4j @AutoService(SeaTunnelSource.class) public class MaxcomputeSource implements SeaTunnelSource, - SupportParallelism { + SupportParallelism, + SupportColumnProjection { private SeaTunnelRowType typeInfo; private Config pluginConfig; @Override public String getPluginName() { - return "Maxcompute"; + return PLUGIN_NAME; } @Override public void prepare(Config pluginConfig) { - this.typeInfo = MaxcomputeTypeMapper.getSeaTunnelRowType(pluginConfig); + if (pluginConfig.hasPath(SCHEMA.key())) { + this.typeInfo = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); + } else { + this.typeInfo = 
MaxcomputeTypeMapper.getSeaTunnelRowType(pluginConfig); + } this.pluginConfig = pluginConfig; } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java index f5b4fd03b79e..a1600f106d39 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java @@ -24,10 +24,12 @@ import com.google.auto.service.AutoService; +import static org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC; +import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW; import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME; @@ -36,14 +38,14 @@ public class MaxcomputeSourceFactory implements TableSourceFactory { @Override public String factoryIdentifier() { - return "Maxcompute"; + return PLUGIN_NAME; } @Override public OptionRule optionRule() { return OptionRule.builder() .required(ACCESS_ID, 
ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME) - .optional(PARTITION_SPEC, SPLIT_ROW) + .optional(PARTITION_SPEC, SPLIT_ROW, SCHEMA) .build(); } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java index ac772140150d..8e9eaf078514 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java @@ -20,15 +20,12 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.table.type.ArrayType; -import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.DecimalType; -import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.api.table.type.MapType; -import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeDataTypeConvertor; import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException; import com.aliyun.odps.Column; @@ -41,11 +38,7 @@ import com.aliyun.odps.data.SimpleStruct; import com.aliyun.odps.data.Varchar; import com.aliyun.odps.type.ArrayTypeInfo; -import com.aliyun.odps.type.DecimalTypeInfo; import com.aliyun.odps.type.MapTypeInfo; -import com.aliyun.odps.type.SimpleArrayTypeInfo; -import 
com.aliyun.odps.type.SimpleMapTypeInfo; -import com.aliyun.odps.type.SimpleStructTypeInfo; import com.aliyun.odps.type.StructTypeInfo; import com.aliyun.odps.type.TypeInfo; import lombok.extern.slf4j.Slf4j; @@ -67,9 +60,9 @@ public class MaxcomputeTypeMapper implements Serializable { public static SeaTunnelRow getSeaTunnelRowData(Record rs, SeaTunnelRowType typeInfo) { List fields = new ArrayList<>(); - SeaTunnelDataType[] seaTunnelDataTypes = typeInfo.getFieldTypes(); - for (int i = 0; i < rs.getColumns().length; i++) { - fields.add(resolveObject2SeaTunnel(rs.get(i), seaTunnelDataTypes[i])); + for (int i = 0; i < typeInfo.getTotalFields(); i++) { + String typeName = typeInfo.getFieldName(i); + fields.add(resolveObject2SeaTunnel(rs.get(typeName), typeInfo.getFieldType(i))); } return new SeaTunnelRow(fields.toArray()); } @@ -92,11 +85,12 @@ public static SeaTunnelRowType getSeaTunnelRowType(Config pluginConfig) { ArrayList> seaTunnelDataTypes = new ArrayList<>(); ArrayList fieldNames = new ArrayList<>(); try { + MaxComputeDataTypeConvertor typeConvertor = new MaxComputeDataTypeConvertor(); for (int i = 0; i < tableSchema.getColumns().size(); i++) { fieldNames.add(tableSchema.getColumns().get(i).getName()); TypeInfo maxcomputeTypeInfo = tableSchema.getColumns().get(i).getTypeInfo(); SeaTunnelDataType seaTunnelDataType = - maxcompute2SeaTunnelType(maxcomputeTypeInfo); + typeConvertor.toSeaTunnelType(maxcomputeTypeInfo, null); seaTunnelDataTypes.add(seaTunnelDataType); } } catch (Exception e) { @@ -107,88 +101,6 @@ public static SeaTunnelRowType getSeaTunnelRowType(Config pluginConfig) { seaTunnelDataTypes.toArray(new SeaTunnelDataType[seaTunnelDataTypes.size()])); } - private static SeaTunnelDataType maxcompute2SeaTunnelType(TypeInfo typeInfo) { - switch (typeInfo.getOdpsType()) { - case MAP: - MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo; - return new MapType( - maxcompute2SeaTunnelType(mapTypeInfo.getKeyTypeInfo()), - 
maxcompute2SeaTunnelType(mapTypeInfo.getValueTypeInfo())); - case ARRAY: - ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo) typeInfo; - switch (arrayTypeInfo.getElementTypeInfo().getOdpsType()) { - case BOOLEAN: - return ArrayType.BOOLEAN_ARRAY_TYPE; - case INT: - return ArrayType.INT_ARRAY_TYPE; - case BIGINT: - return ArrayType.LONG_ARRAY_TYPE; - case FLOAT: - return ArrayType.FLOAT_ARRAY_TYPE; - case DOUBLE: - return ArrayType.DOUBLE_ARRAY_TYPE; - case STRING: - return ArrayType.STRING_ARRAY_TYPE; - default: - throw new MaxcomputeConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - String.format( - "SeaTunnel type not support this type [%s] now", - typeInfo.getTypeName())); - } - case STRUCT: - StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo; - List fields = structTypeInfo.getFieldTypeInfos(); - List fieldNames = new ArrayList<>(fields.size()); - List> fieldTypes = new ArrayList<>(fields.size()); - for (TypeInfo field : fields) { - fieldNames.add(field.getTypeName()); - fieldTypes.add(maxcompute2SeaTunnelType(field)); - } - return new SeaTunnelRowType( - fieldNames.toArray(new String[0]), - fieldTypes.toArray(new SeaTunnelDataType[0])); - case TINYINT: - return BasicType.BYTE_TYPE; - case SMALLINT: - return BasicType.SHORT_TYPE; - case INT: - return BasicType.INT_TYPE; - case BIGINT: - return BasicType.LONG_TYPE; - case BINARY: - return PrimitiveByteArrayType.INSTANCE; - case FLOAT: - return BasicType.FLOAT_TYPE; - case DOUBLE: - return BasicType.DOUBLE_TYPE; - case DECIMAL: - DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo; - return new DecimalType(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale()); - case VARCHAR: - case CHAR: - case STRING: - return BasicType.STRING_TYPE; - case DATE: - return LocalTimeType.LOCAL_DATE_TYPE; - case DATETIME: - case TIMESTAMP: - return LocalTimeType.LOCAL_DATE_TIME_TYPE; - case BOOLEAN: - return BasicType.BOOLEAN_TYPE; - case VOID: - return BasicType.VOID_TYPE; - case INTERVAL_DAY_TIME: - 
case INTERVAL_YEAR_MONTH: - default: - throw new MaxcomputeConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - String.format( - "SeaTunnel type not support this type [%s] now", - typeInfo.getTypeName())); - } - } - private static Object resolveObject2SeaTunnel(Object field, SeaTunnelDataType fieldType) { if (field == null) { return null; @@ -245,11 +157,10 @@ private static Object resolveObject2SeaTunnel(Object field, SeaTunnelDataType case DOUBLE: case BIGINT: case BOOLEAN: + case DECIMAL: return field; case BYTES: return ((Binary) field).data(); - case DECIMAL: - return null; case STRING: if (field instanceof byte[]) { return new String((byte[]) field); @@ -288,7 +199,7 @@ private static Object resolveObject2Maxcompute(Object field, TypeInfo typeInfo) case ARRAY: ArrayList origArray = new ArrayList<>(); Arrays.stream((Object[]) field).iterator().forEachRemaining(origArray::add); - switch (((SimpleArrayTypeInfo) typeInfo).getElementTypeInfo().getOdpsType()) { + switch (((ArrayTypeInfo) typeInfo).getElementTypeInfo().getOdpsType()) { case STRING: case BOOLEAN: case INT: @@ -305,8 +216,8 @@ private static Object resolveObject2Maxcompute(Object field, TypeInfo typeInfo) } case MAP: HashMap dataMap = new HashMap<>(); - TypeInfo keyTypeInfo = ((SimpleMapTypeInfo) typeInfo).getKeyTypeInfo(); - TypeInfo valueTypeInfo = ((SimpleMapTypeInfo) typeInfo).getValueTypeInfo(); + TypeInfo keyTypeInfo = ((MapTypeInfo) typeInfo).getKeyTypeInfo(); + TypeInfo valueTypeInfo = ((MapTypeInfo) typeInfo).getValueTypeInfo(); HashMap origDataMap = (HashMap) field; origDataMap.forEach( (key, value) -> @@ -316,7 +227,7 @@ private static Object resolveObject2Maxcompute(Object field, TypeInfo typeInfo) return origDataMap; case STRUCT: Object[] fields = ((SeaTunnelRow) field).getFields(); - List typeInfos = ((SimpleStructTypeInfo) typeInfo).getFieldTypeInfos(); + List typeInfos = ((StructTypeInfo) typeInfo).getFieldTypeInfos(); ArrayList origStruct = new ArrayList<>(); for (int i = 0; 
i < fields.length; i++) { origStruct.add(resolveObject2Maxcompute(fields[i], typeInfos.get(i))); diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java new file mode 100644 index 000000000000..0af30301ca15 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.aliyun.odps.OdpsType; +import com.aliyun.odps.type.MapTypeInfo; +import com.aliyun.odps.type.TypeInfoFactory; +import com.aliyun.odps.type.VarcharTypeInfo; + +public class MaxComputeDataTypeConvertorTest { + + private final MaxComputeDataTypeConvertor maxComputeDataTypeConvertor = + new MaxComputeDataTypeConvertor(); + + @Test + public void testTypeInfoStrToSeaTunnelType() { + String typeInfoStr = "MAP"; + SeaTunnelDataType seaTunnelType = + maxComputeDataTypeConvertor.toSeaTunnelType(typeInfoStr); + Assertions.assertEquals(BasicType.STRING_TYPE, ((MapType) seaTunnelType).getKeyType()); + Assertions.assertEquals(BasicType.STRING_TYPE, ((MapType) seaTunnelType).getKeyType()); + } + + @Test + public void testTypeInfoToSeaTunnelType() { + MapTypeInfo simpleMapTypeInfo = + TypeInfoFactory.getMapTypeInfo(new VarcharTypeInfo(10), new VarcharTypeInfo(10)); + MapType seaTunnelMapType = + (MapType) maxComputeDataTypeConvertor.toSeaTunnelType(simpleMapTypeInfo, null); + Assertions.assertEquals(BasicType.STRING_TYPE, seaTunnelMapType.getKeyType()); + Assertions.assertEquals(BasicType.STRING_TYPE, seaTunnelMapType.getValueType()); + } + + @Test + public void testSeaTunnelTypeToTypeInfo() { + MapType mapType = new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE); + MapTypeInfo mapTypeInfo = + (MapTypeInfo) maxComputeDataTypeConvertor.toConnectorType(mapType, null); + Assertions.assertEquals(OdpsType.STRING, mapTypeInfo.getKeyTypeInfo().getOdpsType()); + Assertions.assertEquals(OdpsType.STRING, mapTypeInfo.getValueTypeInfo().getOdpsType()); + } + + 
@Test + public void getIdentity() { + Assertions.assertEquals( + MaxcomputeConfig.PLUGIN_NAME, maxComputeDataTypeConvertor.getIdentity()); + } +} diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java new file mode 100644 index 000000000000..6100081c8195 --- /dev/null +++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.maxcompute.source; + +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class MaxcomputeSourceTest { + + @Test + public void prepare() { + Config fields = ConfigFactory.empty() + .withValue("id", ConfigValueFactory.fromAnyRef("int")) + .withValue("name", ConfigValueFactory.fromAnyRef("string")) + .withValue("age", ConfigValueFactory.fromAnyRef("int")); + + Config schema = fields.atKey("fields").atKey("schema"); + + MaxcomputeSource maxcomputeSource = new MaxcomputeSource(); + Assertions.assertDoesNotThrow(() -> maxcomputeSource.prepare(schema)); + + SeaTunnelRowType seaTunnelRowType = maxcomputeSource.getProducedType(); + Assertions.assertEquals(SqlType.INT, seaTunnelRowType.getFieldType(0).getSqlType()); + } +}