Skip to content

Commit

Permalink
[Improve][SQL-Transform] Remove escape identifier from output fields (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 authored Aug 3, 2024
1 parent 34a6b8e commit 82f5d8c
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6, cast(name as bytes) as c7 from fake"
query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6, cast(name as bytes) as c7, name as `apply` from fake"
}
}

Expand Down Expand Up @@ -164,6 +164,13 @@ sink {
rule_type = NOT_NULL
}
]
},
{
field_name = "apply"
field_type = "string"
field_value = [
{equals_to = "Joy Ding"}
]
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@

public class ZetaSQLEngine implements SQLEngine {
private static final Logger log = LoggerFactory.getLogger(ZetaSQLEngine.class);
public static final String ESCAPE_IDENTIFIER = "`";

private String inputTableName;
@Nullable private String catalogTableName;
private SeaTunnelRowType inputRowType;
Expand Down Expand Up @@ -193,9 +195,13 @@ public SeaTunnelRowType typeMapping(List<String> inputColumnsMapping) {
} else if (selectItem instanceof SelectExpressionItem) {
SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem;
Expression expression = expressionItem.getExpression();

if (expressionItem.getAlias() != null) {
fieldNames[idx] = expressionItem.getAlias().getName();
String aliasName = expressionItem.getAlias().getName();
if (aliasName.startsWith(ESCAPE_IDENTIFIER)
&& aliasName.endsWith(ESCAPE_IDENTIFIER)) {
aliasName = aliasName.substring(1, aliasName.length() - 1);
}
fieldNames[idx] = aliasName;
} else {
if (expression instanceof Column) {
fieldNames[idx] = ((Column) expression).getColumnName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ public Object computeForValue(Expression expression, Object[] inputFields) {
Column columnExp = (Column) expression;
String columnName = columnExp.getColumnName();
int index = inputRowType.indexOf(columnName, false);
if (index == -1
&& columnName.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
&& columnName.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
columnName = columnName.substring(1, columnName.length() - 1);
index = inputRowType.indexOf(columnName, false);
}

if (index != -1) {
return inputFields[index];
} else {
Expand All @@ -237,11 +244,26 @@ public Object computeForValue(Expression expression, Object[] inputFields) {
SeaTunnelRow parRowValues = new SeaTunnelRow(inputFields);
Object res = parRowValues;
for (int i = 0; i < deep; i++) {
String key = columnNames[i];
if (parDataType instanceof MapType) {
return ((Map) res).get(columnNames[i]);
Map<String, Object> mapValue = ((Map) res);
if (mapValue.containsKey(key)) {
return mapValue.get(key);
} else if (key.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
&& key.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
key = key.substring(1, key.length() - 1);
return mapValue.get(key);
}
return null;
}
parRowValues = (SeaTunnelRow) res;
int idx = ((SeaTunnelRowType) parDataType).indexOf(columnNames[i], false);
int idx = ((SeaTunnelRowType) parDataType).indexOf(key, false);
if (idx == -1
&& key.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
&& key.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
key = key.substring(1, key.length() - 1);
idx = ((SeaTunnelRowType) parDataType).indexOf(key, false);
}
if (idx == -1) {
throw new IllegalArgumentException(
String.format("can't find field [%s]", fullyQualifiedName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ public SeaTunnelDataType<?> getExpressionType(Expression expression) {
Column columnExp = (Column) expression;
String columnName = columnExp.getColumnName();
int index = inputRowType.indexOf(columnName, false);
if (index == -1
&& columnName.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
&& columnName.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
columnName = columnName.substring(1, columnName.length() - 1);
index = inputRowType.indexOf(columnName, false);
}

if (index != -1) {
return inputRowType.getFieldType(index);
} else {
Expand All @@ -121,7 +128,14 @@ public SeaTunnelDataType<?> getExpressionType(Expression expression) {
SeaTunnelRowType parRowType = inputRowType;
SeaTunnelDataType<?> filedTypeRes = null;
for (int i = 0; i < deep; i++) {
int idx = parRowType.indexOf(columnNames[i], false);
String key = columnNames[i];
int idx = parRowType.indexOf(key, false);
if (idx == -1
&& key.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
&& key.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
key = key.substring(1, key.length() - 1);
idx = parRowType.indexOf(key, false);
}
if (idx == -1) {
throw new IllegalArgumentException(
String.format("can't find field [%s]", fullyQualifiedName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;

Expand Down Expand Up @@ -144,4 +148,136 @@ private CatalogTable getCatalogTable() {
new ArrayList<>(),
"It has column information.");
}

/**
 * Exercises queries whose column references and aliases are wrapped in backticks.
 *
 * <p>Each scenario builds a fresh {@link SQLTransform} and checks that the second output
 * field is named {@code apply} (without backticks), that a matching row yields the expected
 * value, and - where the scenario includes a non-matching row - that the row is filtered out
 * (the transform returns {@code null}).
 */
@Test
public void testEscapeIdentifier() {
    final String tableName = "test";
    final String[] fields = new String[] {"id", "apply"};

    // Scenario 1: escaped column passed to a function, STRING column.
    final CatalogTable stringTable =
            CatalogTableUtil.getCatalogTable(
                    tableName,
                    new SeaTunnelRowType(
                            fields,
                            new SeaTunnelDataType[] {
                                BasicType.INT_TYPE, BasicType.STRING_TYPE
                            }));
    final String trimQuery =
            "select id, trim(`apply`) as `apply` from test where `apply` = 'a'";
    final SQLTransform trimTransform =
            new SQLTransform(
                    ReadonlyConfig.fromMap(Collections.singletonMap("query", trimQuery)),
                    stringTable);
    final TableSchema trimSchema = trimTransform.transformTableSchema();
    final SeaTunnelRow trimHit = trimTransform.transformRow(new SeaTunnelRow(new Object[] {1, "a"}));
    Assertions.assertEquals("apply", trimSchema.getFieldNames()[1]);
    Assertions.assertEquals("a", trimHit.getField(1));
    final SeaTunnelRow trimMiss =
            trimTransform.transformRow(new SeaTunnelRow(new Object[] {1, "b"}));
    Assertions.assertNull(trimMiss);

    // Scenario 2: escaped column inside IFNULL, same STRING table; output type is checked too.
    final String ifnullQuery =
            "select id, IFNULL(`apply`, '1') as `apply` from test where `apply` = 'a'";
    final SQLTransform ifnullTransform =
            new SQLTransform(
                    ReadonlyConfig.fromMap(Collections.singletonMap("query", ifnullQuery)),
                    stringTable);
    final TableSchema ifnullSchema = ifnullTransform.transformTableSchema();
    final SeaTunnelRow ifnullHit =
            ifnullTransform.transformRow(new SeaTunnelRow(new Object[] {1, "a"}));
    Assertions.assertEquals("apply", ifnullSchema.getFieldNames()[1]);
    Assertions.assertEquals(
            BasicType.STRING_TYPE, ifnullSchema.getColumns().get(1).getDataType());
    Assertions.assertEquals("a", ifnullHit.getField(1));

    // Scenario 3: escaped column in an arithmetic expression, LONG column.
    final CatalogTable longTable =
            CatalogTableUtil.getCatalogTable(
                    tableName,
                    new SeaTunnelRowType(
                            fields,
                            new SeaTunnelDataType[] {BasicType.INT_TYPE, BasicType.LONG_TYPE}));
    final String plusQuery = "select id, `apply` + 1 as `apply` from test where `apply` > 0";
    final SQLTransform plusTransform =
            new SQLTransform(
                    ReadonlyConfig.fromMap(Collections.singletonMap("query", plusQuery)),
                    longTable);
    final TableSchema plusSchema = plusTransform.transformTableSchema();
    final SeaTunnelRow plusHit =
            plusTransform.transformRow(new SeaTunnelRow(new Object[] {1, 1L}));
    Assertions.assertEquals("apply", plusSchema.getFieldNames()[1]);
    Assertions.assertEquals(BasicType.LONG_TYPE, plusSchema.getColumns().get(1).getDataType());
    Assertions.assertEquals(Long.valueOf(2), plusHit.getField(1));
    final SeaTunnelRow plusMiss =
            plusTransform.transformRow(new SeaTunnelRow(new Object[] {1, 0L}));
    Assertions.assertNull(plusMiss);

    // Scenario 4: the escaped identifier is the MAP column itself, accessed by key k1.
    final CatalogTable escapedMapTable =
            CatalogTableUtil.getCatalogTable(
                    tableName,
                    new SeaTunnelRowType(
                            fields,
                            new SeaTunnelDataType[] {
                                BasicType.INT_TYPE,
                                new MapType<String, String>(
                                        BasicType.STRING_TYPE, BasicType.STRING_TYPE)
                            }));
    final String mapColumnQuery =
            "select id, `apply`.k1 as `apply` from test where `apply`.k1 = 'a'";
    final SQLTransform mapColumnTransform =
            new SQLTransform(
                    ReadonlyConfig.fromMap(Collections.singletonMap("query", mapColumnQuery)),
                    escapedMapTable);
    final TableSchema mapColumnSchema = mapColumnTransform.transformTableSchema();
    final SeaTunnelRow mapColumnHit =
            mapColumnTransform.transformRow(
                    new SeaTunnelRow(new Object[] {1, Collections.singletonMap("k1", "a")}));
    Assertions.assertEquals("apply", mapColumnSchema.getFieldNames()[1]);
    Assertions.assertEquals(
            BasicType.STRING_TYPE, mapColumnSchema.getColumns().get(1).getDataType());
    Assertions.assertEquals("a", mapColumnHit.getField(1));
    final SeaTunnelRow mapColumnMiss =
            mapColumnTransform.transformRow(
                    new SeaTunnelRow(new Object[] {1, Collections.singletonMap("k1", "b")}));
    Assertions.assertNull(mapColumnMiss);

    // Scenario 5: the escaped identifier is the key looked up inside a MAP column named "map".
    final CatalogTable plainMapTable =
            CatalogTableUtil.getCatalogTable(
                    tableName,
                    new SeaTunnelRowType(
                            new String[] {"id", "map"},
                            new SeaTunnelDataType[] {
                                BasicType.INT_TYPE,
                                new MapType<String, String>(
                                        BasicType.STRING_TYPE, BasicType.STRING_TYPE)
                            }));
    final String mapKeyQuery =
            "select id, map.`apply` as `apply` from test where map.`apply` = 'a'";
    final SQLTransform mapKeyTransform =
            new SQLTransform(
                    ReadonlyConfig.fromMap(Collections.singletonMap("query", mapKeyQuery)),
                    plainMapTable);
    final TableSchema mapKeySchema = mapKeyTransform.transformTableSchema();
    final SeaTunnelRow mapKeyHit =
            mapKeyTransform.transformRow(
                    new SeaTunnelRow(new Object[] {1, Collections.singletonMap("apply", "a")}));
    Assertions.assertEquals("apply", mapKeySchema.getFieldNames()[1]);
    Assertions.assertEquals(
            BasicType.STRING_TYPE, mapKeySchema.getColumns().get(1).getDataType());
    Assertions.assertEquals("a", mapKeyHit.getField(1));
}
}

0 comments on commit 82f5d8c

Please sign in to comment.