From e0db14a2ef6c7f09b46fb3b4da930ec620084265 Mon Sep 17 00:00:00 2001 From: Evey Nguyen Date: Sun, 12 Feb 2023 16:18:13 +0800 Subject: [PATCH 1/5] Add support byte type --- .../legacy/hudi/util/RowToAvroConverters.java | 15 ++++++++++++++- .../connector/legacy/hudi/util/SchemaUtils.java | 2 ++ .../connector/legacy/hudi/Fake2HudiITCase.java | 2 +- .../core/typeutils/TypeInfoColumnBridge.java | 2 ++ 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java index 33b584e19..2ca54d5e1 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.typeutils.MapTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row; +import org.apache.hadoop.hbase.util.Bytes; import java.io.Serializable; import java.math.BigDecimal; @@ -73,7 +74,19 @@ public Object convert(Schema schema, Object object) { return ((Short) object).intValue(); } }; - } else if (BasicTypeInfo.BOOLEAN_TYPE_INFO.getTypeClass().equals(typeClass) || BasicTypeInfo.INT_TYPE_INFO.getTypeClass().equals(typeClass) || + } + else if (BasicTypeInfo.BYTE_TYPE_INFO.getTypeClass().equals(typeClass)) { + converter = + new RowToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return Integer.valueOf(Long.toString((Long) object)); + } + }; + } + else if (BasicTypeInfo.BOOLEAN_TYPE_INFO.getTypeClass().equals(typeClass) || BasicTypeInfo.INT_TYPE_INFO.getTypeClass().equals(typeClass) || BasicTypeInfo.LONG_TYPE_INFO.getTypeClass().equals(typeClass) || BasicTypeInfo.FLOAT_TYPE_INFO.getTypeClass().equals(typeClass) || BasicTypeInfo.DOUBLE_TYPE_INFO.getTypeClass().equals(typeClass) || SqlTimeTypeInfo.TIME.getTypeClass().equals(typeClass) || SqlTimeTypeInfo.DATE.getTypeClass().equals(typeClass) || SqlTimeTypeInfo.TIMESTAMP.getTypeClass().equals(typeClass)) { diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/SchemaUtils.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/SchemaUtils.java index fd1b4b474..9052bbfb9 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/SchemaUtils.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/SchemaUtils.java @@ -105,6 +105,8 @@ private static DataType baseType2DataType(String type) { case "tinyint": case "int": return DataTypes.INT(); + case "byte": + return DataTypes.TINYINT(); case "long": case "bigint": return DataTypes.BIGINT(); diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/test/java/com/bytedance/bitsail/connector/legacy/hudi/Fake2HudiITCase.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/test/java/com/bytedance/bitsail/connector/legacy/hudi/Fake2HudiITCase.java index 67023da8a..e36ccc59d 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/test/java/com/bytedance/bitsail/connector/legacy/hudi/Fake2HudiITCase.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/test/java/com/bytedance/bitsail/connector/legacy/hudi/Fake2HudiITCase.java @@ -39,7 +39,7 @@ public class Fake2HudiITCase { private static final Logger LOG = LoggerFactory.getLogger(Fake2HudiITCase.class); private static final String TEST_SCHEMA = - "[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"text\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"string\"}]"; + "[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"text\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"byte\"},{\"name\":\"timestamp\",\"type\":\"string\"}]"; private static final String WRITER_PREFIX = "job.writer."; @Rule diff --git a/bitsail-cores/bitsail-core-flink-base/src/main/java/com/bytedance/bitsail/flink/core/typeutils/TypeInfoColumnBridge.java b/bitsail-cores/bitsail-core-flink-base/src/main/java/com/bytedance/bitsail/flink/core/typeutils/TypeInfoColumnBridge.java index 7f44f447f..fb8542023 100644 --- a/bitsail-cores/bitsail-core-flink-base/src/main/java/com/bytedance/bitsail/flink/core/typeutils/TypeInfoColumnBridge.java +++ b/bitsail-cores/bitsail-core-flink-base/src/main/java/com/bytedance/bitsail/flink/core/typeutils/TypeInfoColumnBridge.java @@ -47,6 +47,8 @@ public class TypeInfoColumnBridge { PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO); COLUMN_BRIDGE_TYPE_INFO_MAPPING.put(TypeInfos.LONG_TYPE_INFO, PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO); + COLUMN_BRIDGE_TYPE_INFO_MAPPING.put(TypeInfos.BYTE_TYPE_INFO, + PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO); COLUMN_BRIDGE_TYPE_INFO_MAPPING.put(TypeInfos.BIG_INTEGER_TYPE_INFO, PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO); From f6435e431c03612a134d602a6aefd8c9297ce309 Mon Sep 17 00:00:00 2001 From: Evey Nguyen Date: Sun, 12 Feb 2023 23:34:56 +0800 Subject: [PATCH 2/5] remove rebundant import --- .../bitsail/connector/legacy/hudi/util/RowToAvroConverters.java | 1 - 1 file changed, 1 deletion(-) diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java index 2ca54d5e1..68663a265 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java @@ -30,7 +30,6 @@ import org.apache.flink.api.java.typeutils.MapTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row; -import org.apache.hadoop.hbase.util.Bytes; import java.io.Serializable; import java.math.BigDecimal; From fc9d1fc740bb848820dc90c34185bca48c7434be Mon Sep 17 00:00:00 2001 From: Evey Nguyen Date: Mon, 27 Feb 2023 23:50:02 +0800 Subject: [PATCH 3/5] add test --- .../legacy/fake/source/FakeSourceTest.java | 19 ++++++++++++++ .../legacy/hudi/util/RowToAvroConverters.java | 25 ++++++++----------- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java index 2bcf6337d..c75e439c5 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java @@ -16,12 +16,21 @@ package com.bytedance.bitsail.conector.legacy.fake.source; +import com.bytedance.bitsail.common.model.ColumnInfo; +import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter; +import com.bytedance.bitsail.common.util.Preconditions; import com.bytedance.bitsail.connector.legacy.fake.source.FakeSource; +import com.bytedance.bitsail.flink.core.typeinfo.PrimitiveColumnTypeInfo; +import com.bytedance.bitsail.flink.core.typeutils.ColumnFlinkTypeInfoUtil; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -61,4 +70,14 @@ public void testConstructRandomValueWithUniqueCheck() { Assert.assertEquals(5678, actualValue); Assert.assertTrue(existValues.contains("5678")); } + + @Test + public void testSupportByteType() { + List columnInfos = new ArrayList<>(); + ColumnInfo age = new ColumnInfo("age", "byte"); + columnInfos.add(age); +// FakeSource fs = new FakeSource() + RowTypeInfo rowTypeInfo = ColumnFlinkTypeInfoUtil.getRowTypeInformation(new BitSailTypeInfoConverter(), columnInfos); + Assert.assertTrue(rowTypeInfo.getFieldTypes()[0] instanceof PrimitiveColumnTypeInfo); + } } diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java index 68663a265..ae29d1a15 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/main/java/com/bytedance/bitsail/connector/legacy/hudi/util/RowToAvroConverters.java @@ -73,19 +73,7 @@ public Object convert(Schema schema, Object object) { return ((Short) object).intValue(); } }; - } - else if (BasicTypeInfo.BYTE_TYPE_INFO.getTypeClass().equals(typeClass)) { - converter = - new RowToAvroConverter() { - private static final long serialVersionUID = 1L; - - @Override - public Object convert(Schema schema, Object object) { - return Integer.valueOf(Long.toString((Long) object)); - } - }; - } - else if (BasicTypeInfo.BOOLEAN_TYPE_INFO.getTypeClass().equals(typeClass) || BasicTypeInfo.INT_TYPE_INFO.getTypeClass().equals(typeClass) || + } else if (BasicTypeInfo.BOOLEAN_TYPE_INFO.getTypeClass().equals(typeClass) || BasicTypeInfo.INT_TYPE_INFO.getTypeClass().equals(typeClass) || BasicTypeInfo.LONG_TYPE_INFO.getTypeClass().equals(typeClass) || BasicTypeInfo.FLOAT_TYPE_INFO.getTypeClass().equals(typeClass) || BasicTypeInfo.DOUBLE_TYPE_INFO.getTypeClass().equals(typeClass) || SqlTimeTypeInfo.TIME.getTypeClass().equals(typeClass) || SqlTimeTypeInfo.DATE.getTypeClass().equals(typeClass) || SqlTimeTypeInfo.TIMESTAMP.getTypeClass().equals(typeClass)) { @@ -98,6 +86,16 @@ public Object convert(Schema schema, Object object) { return object; } }; + } else if (BasicTypeInfo.BYTE_TYPE_INFO.getTypeClass().equals(typeClass)) { + converter = + new RowToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return Integer.valueOf(Long.toString((Long) object)); + } + }; } else if (BasicTypeInfo.BIG_INT_TYPE_INFO.getTypeClass().equals(typeClass)) { converter = new RowToAvroConverter() { @@ -267,4 +265,3 @@ public interface RowToAvroConverter extends Serializable { Object convert(Schema schema, Object object); } } - From c30f57dbc859cabe89f397d433cab456628baeab Mon Sep 17 00:00:00 2001 From: Evey Nguyen Date: Mon, 27 Feb 2023 23:55:42 +0800 Subject: [PATCH 4/5] remove test byte for hudi connector --- .../bitsail/conector/legacy/fake/source/FakeSourceTest.java | 1 - .../bitsail/connector/legacy/hudi/Fake2HudiITCase.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java index c75e439c5..6f4f7ea1d 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java @@ -76,7 +76,6 @@ public void testSupportByteType() { List columnInfos = new ArrayList<>(); ColumnInfo age = new ColumnInfo("age", "byte"); columnInfos.add(age); -// FakeSource fs = new FakeSource() RowTypeInfo rowTypeInfo = ColumnFlinkTypeInfoUtil.getRowTypeInformation(new BitSailTypeInfoConverter(), columnInfos); Assert.assertTrue(rowTypeInfo.getFieldTypes()[0] instanceof PrimitiveColumnTypeInfo); } diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/test/java/com/bytedance/bitsail/connector/legacy/hudi/Fake2HudiITCase.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/test/java/com/bytedance/bitsail/connector/legacy/hudi/Fake2HudiITCase.java index e36ccc59d..79ef0a7c6 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/test/java/com/bytedance/bitsail/connector/legacy/hudi/Fake2HudiITCase.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/test/java/com/bytedance/bitsail/connector/legacy/hudi/Fake2HudiITCase.java @@ -39,7 +39,7 @@ public class Fake2HudiITCase { private static final Logger LOG = LoggerFactory.getLogger(Fake2HudiITCase.class); private static final String TEST_SCHEMA = - "[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"text\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"byte\"},{\"name\":\"timestamp\",\"type\":\"string\"}]"; + "[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"text\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"string\"}]";"[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"text\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"byte\"},{\"name\":\"timestamp\",\"type\":\"string\"}]"; private static final String WRITER_PREFIX = "job.writer."; @Rule From 3016e8d2f7b7ffebbc3b19dd0edd6ca5e7ca48ab Mon Sep 17 00:00:00 2001 From: Evey Nguyen Date: Tue, 28 Feb 2023 22:12:16 +0800 Subject: [PATCH 5/5] fix checkstyle --- .../bitsail/conector/legacy/fake/source/FakeSourceTest.java | 2 -- .../bitsail/connector/legacy/hudi/Fake2HudiITCase.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java index 6f4f7ea1d..3e9c852d9 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-fake/src/test/java/com/bytedance/bitsail/conector/legacy/fake/source/FakeSourceTest.java @@ -18,12 +18,10 @@ import com.bytedance.bitsail.common.model.ColumnInfo; import com.bytedance.bitsail.common.type.BitSailTypeInfoConverter; -import com.bytedance.bitsail.common.util.Preconditions; import com.bytedance.bitsail.connector.legacy.fake.source.FakeSource; import com.bytedance.bitsail.flink.core.typeinfo.PrimitiveColumnTypeInfo; import com.bytedance.bitsail.flink.core.typeutils.ColumnFlinkTypeInfoUtil; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.junit.Assert; import org.junit.Test; diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/test/java/com/bytedance/bitsail/connector/legacy/hudi/Fake2HudiITCase.java b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/test/java/com/bytedance/bitsail/connector/legacy/hudi/Fake2HudiITCase.java index 79ef0a7c6..67023da8a 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/test/java/com/bytedance/bitsail/connector/legacy/hudi/Fake2HudiITCase.java +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hudi/src/test/java/com/bytedance/bitsail/connector/legacy/hudi/Fake2HudiITCase.java @@ -39,7 +39,7 @@ public class Fake2HudiITCase { private static final Logger LOG = LoggerFactory.getLogger(Fake2HudiITCase.class); private static final String TEST_SCHEMA = - "[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"text\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"string\"}]";"[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"text\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"byte\"},{\"name\":\"timestamp\",\"type\":\"string\"}]"; + "[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"text\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"string\"}]"; private static final String WRITER_PREFIX = "job.writer."; @Rule