diff --git a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml index 3a89a1e11..a7752388a 100644 --- a/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml +++ b/bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-hadoop/pom.xml @@ -26,9 +26,6 @@ bitsail-connector-hadoop - - - com.bytedance.bitsail diff --git a/bitsail-connectors/connector-hadoop/pom.xml b/bitsail-connectors/connector-hadoop/pom.xml index 1a2f20947..37182aafe 100644 --- a/bitsail-connectors/connector-hadoop/pom.xml +++ b/bitsail-connectors/connector-hadoop/pom.xml @@ -36,87 +36,27 @@ - org.apache.hadoop - hadoop-common - - - log4j - log4j - - - org.codehaus.jackson - jackson-mapper-asl - - - org.codehaus.jackson - jackson-core-asl - - - com.codahale.metrics - metrics-core - - + com.bytedance.bitsail + bitsail-shaded-hadoop + ${revision} provided - org.apache.hadoop - hadoop-mapreduce-client-core + com.bytedance.bitsail + bitsail-shaded-hive + ${revision} + provided - guice-servlet - com.google.inject.extensions - - - guice - com.google.inject - - - jersey-guice - com.sun.jersey.contribs - - - log4j - * - - - log4j - log4j - - - metrics-core - com.codahale.metrics - - - guava - com.google.guava - - - asm - asm - - + netty-common io.netty - netty - - - org.codehaus.jackson - jackson-mapper-asl - - - org.codehaus.jackson - jackson-xc - - - org.codehaus.jackson - jackson-jaxrs - org.codehaus.jackson - jackson-core-asl + netty-buffer + io.netty - provided @@ -145,27 +85,7 @@ com.bytedance.bitsail connector-print ${revision} - provided - - - - com.bytedance.bitsail - bitsail-shaded-hive - ${revision} - - - org.apache.ant - ant - - - log4j - log4j - - - commons-net - commons-net - - + test @@ -173,21 +93,10 @@ bitsail-connector-test ${revision} test - - - - org.apache.hadoop - hadoop-minicluster - ${hadoop.version} - test - guice - com.google.inject - - - guice-servlet - com.google.inject.extensions + hppc + com.carrotsearch diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopOrcInputFormatITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopOrcInputFormatITCase.java deleted file mode 100644 index 3f953983b..000000000 --- a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopOrcInputFormatITCase.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2022 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.hadoop.source; - -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; -import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; -import com.bytedance.bitsail.test.connector.test.minicluster.MiniClusterUtil; -import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class HadoopOrcInputFormatITCase { - private static final Logger LOG = LoggerFactory.getLogger(HadoopOrcInputFormatITCase.class); - private static FileSystem fs; - - @Before - public void setUp() throws IOException, InterruptedException { - MiniClusterUtil.setUp(); - fs = MiniClusterUtil.fileSystem; - } - - @After - public void close() { - MiniClusterUtil.shutdown(); - fs = null; - } - - @Test - public void testHadoopToPrintOrc() throws Exception { - String localJsonFile = "test_namespace/source/test_orc"; - String remoteJsonFile = "/test_namespace/source/test_orc"; - Configuration conf = fs.getConf(); - String defaultFS = conf.get("fs.defaultFS"); - LOG.info("fs.defaultFS: {}", defaultFS); - ClassLoader classLoader = JobConfUtils.class.getClassLoader(); - fs.copyFromLocalFile(new Path(classLoader.getResource(localJsonFile).getPath()), new Path(remoteJsonFile)); - String inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; - BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json"); - jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat); - jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + remoteJsonFile); - EmbeddedFlinkCluster.submitJob(jobConf); - } -} diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopParquetInputFormatITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopParquetInputFormatITCase.java deleted file mode 100644 index 6e69f3900..000000000 --- a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopParquetInputFormatITCase.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2022 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.hadoop.source; - -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; -import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; -import com.bytedance.bitsail.test.connector.test.minicluster.MiniClusterUtil; -import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class HadoopParquetInputFormatITCase { - private static final Logger LOG = LoggerFactory.getLogger(HadoopParquetInputFormatITCase.class); - - private static FileSystem fs; - - @Before - public void setUp() throws IOException, InterruptedException { - MiniClusterUtil.setUp(); - fs = MiniClusterUtil.fileSystem; - } - - @After - public void close() { - MiniClusterUtil.shutdown(); - fs = null; - } - - @Test - public void testHadoopToPrintParquet() throws Exception { - String localJsonFile = "test_namespace/source/test_parquet"; - String remoteJsonFile = "/test_namespace/source/test_parquet"; - Configuration conf = fs.getConf(); - String defaultFS = conf.get("fs.defaultFS"); - LOG.info("fs.defaultFS: {}", defaultFS); - ClassLoader classLoader = JobConfUtils.class.getClassLoader(); - fs.copyFromLocalFile(new Path(classLoader.getResource(localJsonFile).getPath()), new Path(remoteJsonFile)); - String inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; - BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json"); - jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat); - jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + remoteJsonFile); - EmbeddedFlinkCluster.submitJob(jobConf); - } -} diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java new file mode 100644 index 000000000..3e0c4c0bf --- /dev/null +++ b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopSourceITCase.java @@ -0,0 +1,116 @@ +/* + * Copyright 2022 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.hadoop.source; + +import com.bytedance.bitsail.common.configuration.BitSailConfiguration; +import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; +import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; +import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class HadoopSourceITCase { + private static final Logger LOG = LoggerFactory.getLogger(HadoopSourceITCase.class); + + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(new File("/tmp")); + + private static FileSystem FILESYSTEM; + File folder; + + @Before + public void setUp() throws IOException { + FILESYSTEM = LocalFileSystem.getLocal(new Configuration()); + folder = TEMP_FOLDER.newFolder(); + } + + @After + public void close() { + FILESYSTEM = null; + folder = null; + } + + @Test + public void testHadoopToPrintJson() throws Exception { + Path source = Paths.get(HadoopSourceITCase.class.getClassLoader() + .getResource("source/test.json") + .toURI() + .getPath()); + + Path target = Paths.get(folder.getAbsolutePath(), source.getFileName().toString()); + Files.copy(source, target); + Configuration conf = FILESYSTEM.getConf(); + String defaultFS = conf.get("fs.defaultFS"); + LOG.info("fs.defaultFS: {}", defaultFS); + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_text.json"); + jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + target); + EmbeddedFlinkCluster.submitJob(jobConf); + } + + @Test + public void testHadoopToPrintParquet() throws Exception { + Path source = Paths.get(HadoopSourceITCase.class.getClassLoader() + .getResource("source/test_parquet") + .toURI() + .getPath()); + + Path target = Paths.get(folder.getAbsolutePath(), source.getFileName().toString()); + Files.copy(source, target); + Configuration conf = FILESYSTEM.getConf(); + String defaultFS = conf.get("fs.defaultFS"); + LOG.info("fs.defaultFS: {}", defaultFS); + String inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json"); + jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat); + jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + target); + EmbeddedFlinkCluster.submitJob(jobConf); + } + + @Test + public void testHadoopToPrintOrc() throws Exception { + Path source = Paths.get(HadoopSourceITCase.class.getClassLoader() + .getResource("source/test_orc") + .toURI() + .getPath()); + + Path target = Paths.get(folder.getAbsolutePath(), source.getFileName().toString()); + Files.copy(source, target); + Configuration conf = FILESYSTEM.getConf(); + String defaultFS = conf.get("fs.defaultFS"); + LOG.info("fs.defaultFS: {}", defaultFS); + String inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; + BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_hive.json"); + jobConf.set(HadoopReaderOptions.HADOOP_INPUT_FORMAT_CLASS, inputFormat); + jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + target); + EmbeddedFlinkCluster.submitJob(jobConf); + } +} diff --git a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopTextInputFormatITCase.java b/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopTextInputFormatITCase.java deleted file mode 100644 index a06a7c05a..000000000 --- a/bitsail-connectors/connector-hadoop/src/test/java/com/bytedance/bitsail/connector/hadoop/source/HadoopTextInputFormatITCase.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2022 Bytedance Ltd. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.bytedance.bitsail.connector.hadoop.source; - -import com.bytedance.bitsail.common.configuration.BitSailConfiguration; -import com.bytedance.bitsail.connector.hadoop.option.HadoopReaderOptions; -import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster; -import com.bytedance.bitsail.test.connector.test.minicluster.MiniClusterUtil; -import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class HadoopTextInputFormatITCase { - private static final Logger LOG = LoggerFactory.getLogger(HadoopTextInputFormatITCase.class); - private static FileSystem fs; - - @Before - public void setUp() throws IOException, InterruptedException { - MiniClusterUtil.setUp(); - fs = MiniClusterUtil.fileSystem; - } - - @After - public void close() { - MiniClusterUtil.shutdown(); - fs = null; - } - - @Test - public void testHadoopToPrintJson() throws Exception { - String localJsonFile = "test_namespace/source/test.json"; - String remoteJsonFile = "/test_namespace/source/test.json"; - Configuration conf = fs.getConf(); - String defaultFS = conf.get("fs.defaultFS"); - LOG.info("fs.defaultFS: {}", defaultFS); - ClassLoader classLoader = JobConfUtils.class.getClassLoader(); - fs.copyFromLocalFile(new Path(classLoader.getResource(localJsonFile).getPath()), new Path(remoteJsonFile)); - BitSailConfiguration jobConf = JobConfUtils.fromClasspath("hadoop_to_print_text.json"); - jobConf.set(HadoopReaderOptions.PATH_LIST, defaultFS + remoteJsonFile); - EmbeddedFlinkCluster.submitJob(jobConf); - } -} diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test.json b/bitsail-connectors/connector-hadoop/src/test/resources/source/test.json similarity index 100% rename from bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test.json rename to bitsail-connectors/connector-hadoop/src/test/resources/source/test.json diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test_orc b/bitsail-connectors/connector-hadoop/src/test/resources/source/test_orc similarity index 100% rename from bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test_orc rename to bitsail-connectors/connector-hadoop/src/test/resources/source/test_orc diff --git a/bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test_parquet b/bitsail-connectors/connector-hadoop/src/test/resources/source/test_parquet similarity index 100% rename from bitsail-connectors/connector-hadoop/src/test/resources/test_namespace/source/test_parquet rename to bitsail-connectors/connector-hadoop/src/test/resources/source/test_parquet